m/n/c/curator: add UpdateNodeStatus

This implements Curator.UpdateNodeStatus, which lets nodes self-report some
status items. Currently this is only their external IP address, which is
needed to generate a Cluster Directory, which in turn is needed to
register into a cluster.

Change-Id: Ib5464ca78ee3466d9b9f89b7af8b40f613ae8dcc
Reviewed-on: https://review.monogon.dev/c/monogon/+/332
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
diff --git a/metropolis/node/core/curator/BUILD.bazel b/metropolis/node/core/curator/BUILD.bazel
index 22e237f..8eb9a49 100644
--- a/metropolis/node/core/curator/BUILD.bazel
+++ b/metropolis/node/core/curator/BUILD.bazel
@@ -54,6 +54,7 @@
     embed = [":go_default_library"],
     deps = [
         "//metropolis/node/core/consensus/client:go_default_library",
+        "//metropolis/node/core/curator/proto/api:go_default_library",
         "//metropolis/node/core/identity:go_default_library",
         "//metropolis/node/core/localstorage:go_default_library",
         "//metropolis/node/core/localstorage/declarative:go_default_library",
@@ -61,6 +62,7 @@
         "//metropolis/pkg/event/memory:go_default_library",
         "//metropolis/pkg/supervisor:go_default_library",
         "//metropolis/proto/api:go_default_library",
+        "//metropolis/proto/common:go_default_library",
         "@io_etcd_go_etcd//clientv3:go_default_library",
         "@io_etcd_go_etcd//integration:go_default_library",
         "@org_golang_google_grpc//:go_default_library",
diff --git a/metropolis/node/core/curator/impl_follower.go b/metropolis/node/core/curator/impl_follower.go
index cf01b47..f1556a0 100644
--- a/metropolis/node/core/curator/impl_follower.go
+++ b/metropolis/node/core/curator/impl_follower.go
@@ -24,3 +24,7 @@
 func (f *curatorFollower) GetRegisterTicket(_ context.Context, _ *apb.GetRegisterTicketRequest) (*apb.GetRegisterTicketResponse, error) {
 	return nil, status.Error(codes.Unimplemented, "curator follower not implemented")
 }
+
+func (f *curatorFollower) UpdateNodeStatus(ctx context.Context, req *cpb.UpdateNodeStatusRequest) (*cpb.UpdateNodeStatusResponse, error) {
+	return nil, status.Error(codes.Unimplemented, "curator follower not implemented")
+}
diff --git a/metropolis/node/core/curator/impl_leader.go b/metropolis/node/core/curator/impl_leader.go
index 3ebfcfb..7767e48 100644
--- a/metropolis/node/core/curator/impl_leader.go
+++ b/metropolis/node/core/curator/impl_leader.go
@@ -77,8 +77,8 @@
 
 func newCuratorLeader(l leadership) *curatorLeader {
 	return &curatorLeader{
-		leaderCurator{l},
-		leaderAAA{l},
-		leaderManagement{l},
+		leaderCurator{leadership: l},
+		leaderAAA{leadership: l},
+		leaderManagement{leadership: l},
 	}
 }
diff --git a/metropolis/node/core/curator/impl_leader_curator.go b/metropolis/node/core/curator/impl_leader_curator.go
index 1043d58..5a15c94 100644
--- a/metropolis/node/core/curator/impl_leader_curator.go
+++ b/metropolis/node/core/curator/impl_leader_curator.go
@@ -1,10 +1,17 @@
 package curator
 
 import (
+	"context"
+	"sync"
+
+	"go.etcd.io/etcd/clientv3"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
+	"google.golang.org/protobuf/proto"
 
 	cpb "source.monogon.dev/metropolis/node/core/curator/proto/api"
+	"source.monogon.dev/metropolis/node/core/identity"
+	"source.monogon.dev/metropolis/node/core/rpc"
 	"source.monogon.dev/metropolis/pkg/event/etcd"
 )
 
@@ -12,6 +19,15 @@
 // leader.
 type leaderCurator struct {
 	leadership
+
+	// muNodes guards any changes to nodes, and prevents race conditions where the
+	// curator performs a read-modify-write operation to node data. The curator's
+	// leadership ensures no two curators run simultaneously, and this lock ensures
+	// no two parallel curator operations race each other.
+	//
+	// This lock has to be taken any time such an RMW operation takes place when not
+	// additionally guarded using etcd transactions.
+	muNodes sync.Mutex
 }
 
 // Watch returns a stream of updates concerning some part of the cluster
@@ -54,8 +70,9 @@
 		ev := &cpb.WatchEvent{
 			Nodes: []*cpb.Node{
 				{
-					Id:    node.ID(),
-					Roles: node.proto().Roles,
+					Id:     node.ID(),
+					Roles:  node.proto().Roles,
+					Status: node.status,
 				},
 			},
 		}
@@ -64,3 +81,69 @@
 		}
 	}
 }
+
+// UpdateNodeStatus is called by nodes in the cluster to report their own
+// status. This status is recorded by the curator and can be retrieved via
+// Watch.
+func (l *leaderCurator) UpdateNodeStatus(ctx context.Context, req *cpb.UpdateNodeStatusRequest) (*cpb.UpdateNodeStatusResponse, error) {
+	// Ensure that the given node_id matches the calling node. We currently
+	// only allow for direct self-reporting of status by nodes.
+	pi := rpc.GetPeerInfo(ctx)
+	if pi == nil || pi.Node == nil {
+		return nil, status.Error(codes.PermissionDenied, "only nodes can update node status")
+	}
+	id := identity.NodeID(pi.Node.PublicKey)
+	if id != req.NodeId {
+		return nil, status.Errorf(codes.PermissionDenied, "node %q cannot update the status of node %q", id, req.NodeId)
+	}
+
+	// Verify sent status. Currently we assume the entire status must be set at
+	// once, and cannot be unset.
+	if req.Status == nil || req.Status.ExternalAddress == "" {
+		return nil, status.Errorf(codes.InvalidArgument, "Status and Status.ExternalAddress must be set")
+	}
+
+	// As we're performing a node update with two etcd transactions below (one
+	// to retrieve, one to save and update node), take a local lock to ensure
+	// that we don't have a race between either two UpdateNodeStatus calls or
+	// an UpdateNodeStatus call and some other mutation to the node store.
+	l.muNodes.Lock()
+	defer l.muNodes.Unlock()
+
+	// Retrieve node ...
+	key, err := nodeEtcdPrefix.Key(id)
+	if err != nil {
+		return nil, status.Errorf(codes.InvalidArgument, "invalid node id")
+	}
+	res, err := l.txnAsLeader(ctx, clientv3.OpGet(key))
+	if err != nil {
+		if rpcErr, ok := rpcError(err); ok {
+			return nil, rpcErr
+		}
+		return nil, status.Errorf(codes.Unavailable, "could not retrieve node: %v", err)
+	}
+	kvs := res.Responses[0].GetResponseRange().Kvs
+	if len(kvs) < 1 {
+		return nil, status.Error(codes.NotFound, "no such node")
+	}
+	node, err := nodeUnmarshal(kvs[0].Value)
+	if err != nil {
+		return nil, status.Errorf(codes.Unavailable, "failed to unmarshal node: %v", err)
+	}
+	// ... update its status ...
+	node.status = req.Status
+	// ... and save it to etcd.
+	bytes, err := proto.Marshal(node.proto())
+	if err != nil {
+		return nil, status.Errorf(codes.Unavailable, "failed to marshal node: %v", err)
+	}
+	_, err = l.txnAsLeader(ctx, clientv3.OpPut(key, string(bytes)))
+	if err != nil {
+		if rpcErr, ok := rpcError(err); ok {
+			return nil, rpcErr
+		}
+		return nil, status.Errorf(codes.Unavailable, "could not update node: %v", err)
+	}
+
+	return &cpb.UpdateNodeStatusResponse{}, nil
+}
diff --git a/metropolis/node/core/curator/impl_leader_management.go b/metropolis/node/core/curator/impl_leader_management.go
index 1895888..7284eb0 100644
--- a/metropolis/node/core/curator/impl_leader_management.go
+++ b/metropolis/node/core/curator/impl_leader_management.go
@@ -9,7 +9,7 @@
 	"google.golang.org/grpc/status"
 	"google.golang.org/protobuf/proto"
 
-	cpb "source.monogon.dev/metropolis/node/core/curator/proto/private"
+	ppb "source.monogon.dev/metropolis/node/core/curator/proto/private"
 	apb "source.monogon.dev/metropolis/proto/api"
 )
 
@@ -52,7 +52,7 @@
 	}
 
 	// No ticket, generate one.
-	ticket := &cpb.RegisterTicket{
+	ticket := &ppb.RegisterTicket{
 		Opaque: make([]byte, registerTicketSize),
 	}
 	_, err = rand.Read(ticket.Opaque)
diff --git a/metropolis/node/core/curator/impl_leader_test.go b/metropolis/node/core/curator/impl_leader_test.go
index 593830d..d20541e 100644
--- a/metropolis/node/core/curator/impl_leader_test.go
+++ b/metropolis/node/core/curator/impl_leader_test.go
@@ -10,8 +10,10 @@
 	"google.golang.org/grpc/test/bufconn"
 
 	"source.monogon.dev/metropolis/node/core/consensus/client"
+	ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
 	"source.monogon.dev/metropolis/node/core/rpc"
 	apb "source.monogon.dev/metropolis/proto/api"
+	cpb "source.monogon.dev/metropolis/proto/common"
 )
 
 // fakeLeader creates a curatorLeader without any underlying leader election, in
@@ -63,7 +65,7 @@
 
 	// Build a test cluster PKI and node/manager certificates, and create the
 	// listener security parameters which will authenticate incoming requests.
-	ephemeral := rpc.NewEphemeralClusterCredentials(t, 1)
+	ephemeral := rpc.NewEphemeralClusterCredentials(t, 2)
 	nodeCredentials := ephemeral.Nodes[0]
 
 	cNode := NewNodeForBootstrap(nil, nodeCredentials.PublicKey())
@@ -127,6 +129,7 @@
 		mgmtConn:      mcl,
 		localNodeConn: lcl,
 		localNodeID:   nodeCredentials.ID(),
+		otherNodeID:   ephemeral.Nodes[1].ID(),
 		cancel:        ctxC,
 	}
 }
@@ -143,6 +146,9 @@
 	localNodeConn grpc.ClientConnInterface
 	// localNodeID is the NodeID of the fake node that the leader is running on.
 	localNodeID string
+	// otherNodeID is the NodeID of some other node present in the curator
+	// state.
+	otherNodeID string
 	// cancel shuts down the fake leader and all client connections.
 	cancel context.CancelFunc
 }
@@ -175,3 +181,74 @@
 		t.Errorf("Unexpected ticket change between calls")
 	}
 }
+
+// TestClusterUpdateNodeStatus exercises the Curator.UpdateNodeStatus RPC by
+// sending node updates and making sure they are reflected in subsequent Watch
+// events.
+func TestClusterUpdateNodeStatus(t *testing.T) {
+	cl := fakeLeader(t)
+	defer cl.cancel()
+
+	curator := ipb.NewCuratorClient(cl.localNodeConn)
+
+	ctx, ctxC := context.WithCancel(context.Background())
+	defer ctxC()
+
+	// Retrieve initial node data, it should have no status set.
+	value, err := curator.Watch(ctx, &ipb.WatchRequest{
+		Kind: &ipb.WatchRequest_NodeInCluster_{
+			NodeInCluster: &ipb.WatchRequest_NodeInCluster{
+				NodeId: cl.localNodeID,
+			},
+		},
+	})
+	if err != nil {
+		t.Fatalf("Could not request node watch: %v", err)
+	}
+	ev, err := value.Recv()
+	if err != nil {
+		t.Fatalf("Could not receive initial node value: %v", err)
+	}
+	if status := ev.Nodes[0].Status; status != nil {
+		t.Errorf("Initial node value contains status, should be nil: %+v", status)
+	}
+
+	// Update status...
+	_, err = curator.UpdateNodeStatus(ctx, &ipb.UpdateNodeStatusRequest{
+		NodeId: cl.localNodeID,
+		Status: &cpb.NodeStatus{
+			ExternalAddress: "192.0.2.10",
+		},
+	})
+	if err != nil {
+		t.Fatalf("UpdateNodeStatus: %v", err)
+	}
+
+	// ... and expect it to be reflected in the new node value.
+	for {
+		ev, err = value.Recv()
+		if err != nil {
+			t.Fatalf("Could not receive second node value: %v", err)
+		}
+		// Keep waiting until we get a status.
+		status := ev.Nodes[0].Status
+		if status == nil {
+			continue
+		}
+		if want, got := "192.0.2.10", status.ExternalAddress; want != got {
+			t.Errorf("Wanted external address %q, got %q", want, got)
+		}
+		break
+	}
+
+	// Expect updating some other node's status to fail.
+	_, err = curator.UpdateNodeStatus(ctx, &ipb.UpdateNodeStatusRequest{
+		NodeId: cl.otherNodeID,
+		Status: &cpb.NodeStatus{
+			ExternalAddress: "192.0.2.10",
+		},
+	})
+	if err == nil {
+		t.Errorf("UpdateNodeStatus for other node (%q vs local %q) succeeded, should have failed", cl.otherNodeID, cl.localNodeID)
+	}
+}
diff --git a/metropolis/node/core/curator/listener.go b/metropolis/node/core/curator/listener.go
index d21d951..a190167 100644
--- a/metropolis/node/core/curator/listener.go
+++ b/metropolis/node/core/curator/listener.go
@@ -358,3 +358,12 @@
 	})
 	return
 }
+
+func (l *listener) UpdateNodeStatus(ctx context.Context, req *cpb.UpdateNodeStatusRequest) (res *cpb.UpdateNodeStatusResponse, err error) {
+	err = l.callImpl(ctx, func(ctx context.Context, impl rpc.ClusterExternalServices) error {
+		var err2 error
+		res, err2 = impl.UpdateNodeStatus(ctx, req)
+		return err2
+	})
+	return
+}
diff --git a/metropolis/node/core/curator/proto/api/api.proto b/metropolis/node/core/curator/proto/api/api.proto
index da3e3c3..ab78ba6 100644
--- a/metropolis/node/core/curator/proto/api/api.proto
+++ b/metropolis/node/core/curator/proto/api/api.proto
@@ -46,6 +46,14 @@
             need: PERMISSION_READ_CLUSTER_STATUS;
         };
     }
+    // UpdateNodeStatus is called by nodes in the cluster to report their own
+    // status. This status is recorded by the curator and can be retrieved via
+    // Watch.
+    rpc UpdateNodeStatus(UpdateNodeStatusRequest) returns (UpdateNodeStatusResponse) {
+        option (metropolis.proto.ext.authorization) = {
+            need: PERMISSION_UPDATE_NODE_SELF;
+        };
+    }
 }
 
 // Node is the state and configuration of a node in the cluster.
@@ -54,6 +62,8 @@
     string id = 1;
     // Roles that the nodes is supposed to take on.
     metropolis.proto.common.NodeRoles roles = 2;
+    // Last reported status of the node, if available.
+    metropolis.proto.common.NodeStatus status = 3;
 };
 
 // WatchRequest specifies what data the caller is interested in. This influences
@@ -81,3 +91,15 @@
     // filter out anything spurious.
     repeated Node nodes = 1;
 }
+
+message UpdateNodeStatusRequest {
+    // node_id is the Metropolis node identity string of the node for which to
+    // set a new status. This currently must be the same node as the one
+    // performing the RPC and is included for safety.
+    string node_id = 1;
+    // status to be set. All fields are overwritten.
+    metropolis.proto.common.NodeStatus status = 2;
+}
+
+message UpdateNodeStatusResponse {
+}
diff --git a/metropolis/node/core/curator/proto/private/storage.proto b/metropolis/node/core/curator/proto/private/storage.proto
index 3208cdb..c8b6b79 100644
--- a/metropolis/node/core/curator/proto/private/storage.proto
+++ b/metropolis/node/core/curator/proto/private/storage.proto
@@ -24,6 +24,8 @@
 
     // The node's intended roles when running.
     metropolis.proto.common.NodeRoles roles = 4;
+
+    metropolis.proto.common.NodeStatus status = 5;
 }
 
 // Information about the cluster owner, currently the only Metropolis management
diff --git a/metropolis/node/core/curator/state_node.go b/metropolis/node/core/curator/state_node.go
index e0763c4..9a84bb7 100644
--- a/metropolis/node/core/curator/state_node.go
+++ b/metropolis/node/core/curator/state_node.go
@@ -64,6 +64,8 @@
 	// cluster. See //metropolis/proto:common.proto for more information.
 	state cpb.NodeState
 
+	status *cpb.NodeStatus
+
 	// A Node can have multiple Roles. Each Role is represented by the presence
 	// of NodeRole* structures in this structure, with a nil pointer
 	// representing the lack of a role.
@@ -131,6 +133,7 @@
 		PublicKey:        n.pubkey,
 		FsmState:         n.state,
 		Roles:            &cpb.NodeRoles{},
+		Status:           n.status,
 	}
 	if n.kubernetesWorker != nil {
 		msg.Roles.KubernetesWorker = &cpb.NodeRoles_KubernetesWorker{}
@@ -147,6 +150,7 @@
 		clusterUnlockKey: msg.ClusterUnlockKey,
 		pubkey:           msg.PublicKey,
 		state:            msg.FsmState,
+		status:           msg.Status,
 	}
 	if msg.Roles.KubernetesWorker != nil {
 		n.kubernetesWorker = &NodeRoleKubernetesWorker{}
diff --git a/metropolis/node/core/rpc/server.go b/metropolis/node/core/rpc/server.go
index 70a0a74..0e823f4 100644
--- a/metropolis/node/core/rpc/server.go
+++ b/metropolis/node/core/rpc/server.go
@@ -12,6 +12,7 @@
 	// services, either locally or remotely.
 	nodePermissions = Permissions{
 		epb.Permission_PERMISSION_READ_CLUSTER_STATUS: true,
+		epb.Permission_PERMISSION_UPDATE_NODE_SELF:    true,
 	}
 )