m/n/c/curator: implement Management.GetNodes

This is a management call that provides detailed per-node information.
Currently it returns all information about all nodes, but it can later be
extended to allow filtering and selective/masked field retrieval.

This call is then used to implement a test which exercises
Curator.RegisterNode and Management.GetNodes.

Change-Id: Ia093d9f03a4213b01acbb0fdac9714d8e7b02dd3
Reviewed-on: https://review.monogon.dev/c/monogon/+/434
Reviewed-by: Mateusz Zalega <mateusz@monogon.tech>
diff --git a/metropolis/node/core/curator/impl_leader_aaa.go b/metropolis/node/core/curator/impl_leader_aaa.go
index 43e325c..a66c249 100644
--- a/metropolis/node/core/curator/impl_leader_aaa.go
+++ b/metropolis/node/core/curator/impl_leader_aaa.go
@@ -33,7 +33,7 @@
 	res, err := a.etcd.Get(ctx, initialOwnerEtcdPath)
 	if err != nil {
 		if !errors.Is(err, ctx.Err()) {
-			// TODO(q3k): log
+			// TODO(issues/85): log
 			return nil, status.Error(codes.Unavailable, "could not retrieve initial owner status in etcd")
 		}
 		return nil, err
diff --git a/metropolis/node/core/curator/impl_leader_curator.go b/metropolis/node/core/curator/impl_leader_curator.go
index c566040..9ff7d91 100644
--- a/metropolis/node/core/curator/impl_leader_curator.go
+++ b/metropolis/node/core/curator/impl_leader_curator.go
@@ -73,7 +73,7 @@
 			if rpcErr, ok := rpcError(err); ok {
 				return rpcErr
 			}
-			// TODO(q3k): log err
+			// TODO(issues/85): log err
 			return status.Error(codes.Unavailable, "internal error")
 		}
 
@@ -106,7 +106,7 @@
 			break
 		}
 		if err != nil {
-			// TODO(q3k): log err
+			// TODO(issues/85): log err
 			return status.Error(codes.Unavailable, "internal error during initial fetch")
 		}
 		nodeKV := v.(nodeAtID)
@@ -144,7 +144,7 @@
 	for {
 		v, err := w.Get(ctx)
 		if err != nil {
-			// TODO(q3k): log err
+			// TODO(issues/85): log err
 			return status.Errorf(codes.Unavailable, "internal error during update")
 		}
 		we := &ipb.WatchEvent{}
@@ -285,7 +285,7 @@
 	// valid.
 	wantTicket, err := l.ensureRegisterTicket(ctx)
 	if err != nil {
-		// TODO(q3k): log err
+		// TODO(issues/85): log err
 		return nil, status.Error(codes.Unavailable, "could not retrieve register ticket")
 	}
 	gotTicket := req.RegisterTicket
@@ -304,7 +304,7 @@
 	id := identity.NodeID(pubkey)
 	key, err := nodeEtcdPrefix.Key(id)
 	if err != nil {
-		// TODO(q3k): log err
+		// TODO(issues/85): log err
 		return nil, status.Errorf(codes.InvalidArgument, "invalid node id")
 	}
 	res, err := l.txnAsLeader(ctx, clientv3.OpGet(key))
@@ -312,14 +312,14 @@
 		if rpcErr, ok := rpcError(err); ok {
 			return nil, rpcErr
 		}
-		// TODO(q3k): log this
+		// TODO(issues/85): log this
 		return nil, status.Errorf(codes.Unavailable, "could not retrieve node %s: %v", id, err)
 	}
 	kvs := res.Responses[0].GetResponseRange().Kvs
 	if len(kvs) > 0 {
 		node, err := nodeUnmarshal(kvs[0].Value)
 		if err != nil {
-			// TODO(q3k): log this
+			// TODO(issues/85): log this
 			return nil, status.Errorf(codes.Unavailable, "could not unmarshal node")
 		}
 		// If the existing node is in the NEW state already, there's nothing to do,
@@ -331,7 +331,9 @@
 		// We can return a bit more information to the calling node here, as if it's in
 		// possession of the private key corresponding to an existing node in the
 		// cluster, it should have access to the status of the node without danger of
-		// leaking data about other nodes. TODO(q3k): log this
+		// leaking data about other nodes.
+		//
+		// TODO(issues/85): log this
 		return nil, status.Errorf(codes.FailedPrecondition, "node already exists in cluster, state %s", node.state.String())
 	}
 
@@ -342,12 +344,12 @@
 	}
 	nodeBytes, err := proto.Marshal(node.proto())
 	if err != nil {
-		// TODO(q3k): log this
+		// TODO(issues/85): log this
 		return nil, status.Errorf(codes.Unavailable, "could not marshal new node")
 	}
 	_, err = l.txnAsLeader(ctx, clientv3.OpPut(key, string(nodeBytes)))
 	if err != nil {
-		// TODO(q3k): log this
+		// TODO(issues/85): log this
 		return nil, status.Error(codes.Unavailable, "could not save new node")
 	}
 
diff --git a/metropolis/node/core/curator/impl_leader_management.go b/metropolis/node/core/curator/impl_leader_management.go
index dfc07a8..361a251 100644
--- a/metropolis/node/core/curator/impl_leader_management.go
+++ b/metropolis/node/core/curator/impl_leader_management.go
@@ -53,7 +53,7 @@
 	}, nil
 }
 
-// GetClusterInfo implements Curator.GetClusterInfo, which returns summary
+// GetClusterInfo implements Management.GetClusterInfo, which returns summary
 // information about the Metropolis cluster.
 func (l *leaderManagement) GetClusterInfo(ctx context.Context, req *apb.GetClusterInfoRequest) (*apb.GetClusterInfoResponse, error) {
 	res, err := l.txnAsLeader(ctx, nodeEtcdPrefix.Range())
@@ -71,7 +71,7 @@
 	for _, kv := range kvs {
 		node, err := nodeUnmarshal(kv.Value)
 		if err != nil {
-			// TODO(q3k): log this
+			// TODO(issues/85): log this
 			continue
 		}
 		if node.state != cpb.NodeState_NODE_STATE_UP {
@@ -108,3 +108,46 @@
 		CaCertificate:    l.node.ClusterCA().Raw,
 	}, nil
 }
+
+// GetNodes implements Management.GetNodes, which returns a list of nodes from
+// the point of view of the cluster. muNodes is held for the whole call.
+func (l *leaderManagement) GetNodes(_ *apb.GetNodesRequest, srv apb.Management_GetNodesServer) error {
+	ctx := srv.Context()
+
+	l.muNodes.Lock()
+	defer l.muNodes.Unlock()
+
+	// Retrieve all nodes from etcd with a single ranged read (txnAsLeader).
+	res, err := l.txnAsLeader(ctx, nodeEtcdPrefix.Range())
+	if err != nil {
+		return status.Errorf(codes.Unavailable, "could not retrieve list of nodes: %v", err)
+	}
+
+	// Convert etcd data into proto nodes, sending one streaming response for
+	// each node. Entries that fail to unmarshal are skipped, not reported.
+	kvs := res.Responses[0].GetResponseRange().Kvs
+	for _, kv := range kvs {
+		node, err := nodeUnmarshal(kv.Value)
+		if err != nil {
+			// TODO(issues/85): log this
+			continue
+		}
+
+		// Convert node roles; Roles is always sent, even when empty.
+		roles := &cpb.NodeRoles{}
+		if node.kubernetesWorker != nil {
+			roles.KubernetesWorker = &cpb.NodeRoles_KubernetesWorker{}
+		}
+
+		if err := srv.Send(&apb.Node{
+			Pubkey: node.pubkey,
+			State:  node.state,
+			Status: node.status,
+			Roles:  roles,
+		}); err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
diff --git a/metropolis/node/core/curator/impl_leader_test.go b/metropolis/node/core/curator/impl_leader_test.go
index 2e1dfe8..2608d08 100644
--- a/metropolis/node/core/curator/impl_leader_test.go
+++ b/metropolis/node/core/curator/impl_leader_test.go
@@ -597,3 +597,51 @@
 		t.Fatalf("CaPublicKey mismatch (wanted %s, got %s)", hex.EncodeToString(want), hex.EncodeToString(got))
 	}
 }
+
+// TestRegisterNode is a smoke test for Curator.RegisterNode and
+// Management.GetNodes.
+func TestRegisterNode(t *testing.T) {
+	cl := fakeLeader(t)
+	defer cl.cancel()
+
+	ctx, ctxC := context.WithCancel(context.Background())
+	defer ctxC()
+
+	// Get RegisterTicket.
+	mgmt := apb.NewManagementClient(cl.mgmtConn)
+	resT, err := mgmt.GetRegisterTicket(ctx, &apb.GetRegisterTicketRequest{})
+	if err != nil {
+		t.Fatalf("GetRegisterTicket: %v", err)
+	}
+
+	// Register as other node, presenting the ticket retrieved above.
+	curOther := ipb.NewCuratorClient(cl.otherNodeConn)
+	_, err = curOther.RegisterNode(ctx, &ipb.RegisterNodeRequest{RegisterTicket: resT.Ticket})
+	if err != nil {
+		t.Fatalf("RegisterNode: %v", err)
+	}
+
+	// Ensure that the cluster sees the node as NEW via GetNodes.
+	resN, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{})
+	if err != nil {
+		t.Fatalf("GetNodes: %v", err)
+	}
+	// Scan the stream for the node we just registered; any Recv error is fatal.
+	var otherNode *apb.Node
+	for {
+		node, err := resN.Recv()
+		if err != nil {
+			t.Fatalf("GetNodes.Recv: %v", err)
+		}
+
+		id := identity.NodeID(node.Pubkey)
+		if id == cl.otherNodeID {
+			otherNode = node
+			break
+		}
+	}
+	// Ensure that this node is marked as NEW.
+	if want, got := cpb.NodeState_NODE_STATE_NEW, otherNode.State; want != got {
+		t.Fatalf("Newly registered node should have state %s, got %s", want, got)
+	}
+}
diff --git a/metropolis/node/core/curator/listener.go b/metropolis/node/core/curator/listener.go
index aea6f1d..a562063 100644
--- a/metropolis/node/core/curator/listener.go
+++ b/metropolis/node/core/curator/listener.go
@@ -392,3 +392,26 @@
 	})
 	return
 }
+
+type managementGetNodesServer struct {
+	grpc.ServerStream
+	ctx context.Context // returned by Context(), shadowing ServerStream's own
+}
+
+func (s *managementGetNodesServer) Context() context.Context {
+	return s.ctx
+}
+
+func (s *managementGetNodesServer) Send(m *apb.Node) error {
+	return s.ServerStream.SendMsg(m)
+}
+
+func (l *listener) GetNodes(req *apb.GetNodesRequest, srv apb.Management_GetNodesServer) error {
+	proxy := func(ctx context.Context, impl rpc.ClusterExternalServices) error {
+		return impl.GetNodes(req, &managementGetNodesServer{
+			ServerStream: srv,
+			ctx:          ctx,
+		})
+	}
+	return l.callImpl(srv.Context(), proxy)
+}
diff --git a/metropolis/proto/api/management.proto b/metropolis/proto/api/management.proto
index 44db0c5..63336c8 100644
--- a/metropolis/proto/api/management.proto
+++ b/metropolis/proto/api/management.proto
@@ -5,7 +5,9 @@
 import "metropolis/proto/common/common.proto";
 import "metropolis/proto/ext/authorization.proto";
 
-// Management service available to Cluster Managers.
+// Management service available to Cluster Managers, allowing operational work
+// to be performed on the cluster (eg. adding nodes, retrieving information
+// about a running cluster, etc.).
 service Management {
     // GetRegisterTicket retrieves the current RegisterTicket which is required
     // for new nodes to register into the cluster. Presenting this ticket on
@@ -18,6 +20,7 @@
             need: PERMISSION_GET_REGISTER_TICKET
         };
     }
+
     // GetClusterInfo retrieves publicly available summary information about
     // this cluster, notably data required for nodes to register into a cluster
     // or join it (other than the Register Ticket, which is gated by an
@@ -27,6 +30,14 @@
             need: PERMISSION_READ_CLUSTER_STATUS
         };
     }
+
+    // GetNodes streams information about nodes in the cluster, one Node message
+    // per node. Currently, it returns all available data about all nodes.
+    rpc GetNodes(GetNodesRequest) returns (stream Node) {
+        option (metropolis.proto.ext.authorization) = {
+            need: PERMISSION_READ_CLUSTER_STATUS
+        };
+    }
 }
 
 message GetRegisterTicketRequest {
@@ -48,3 +59,29 @@
     // ca_certificate is the x509 DER encoded CA certificate of the cluster.
     bytes ca_certificate = 2;
 }
+
+message GetNodesRequest {
+}
+
+// Node in a Metropolis cluster, streamed by Management.GetNodes. For each node
+// in the cluster, this message will be emitted and will contain information
+// about that node.
+//
+// The fields contained are node fields that PERMISSION_READ_CLUSTER_STATUS
+// allows access to, i.e. 'non-private' fields, ones that might be internal to
+// the cluster and possibly considered sensitive information about the
+// infrastructure, but whose knowledge does not allow escalating privileges
+// within the cluster.
+message Node {
+    // Raw Ed25519 public key of this node, which can be used to generate
+    // the node's ID. This is always set.
+    bytes pubkey = 1;
+    // State of the node from the point of view of the cluster. This is
+    // always set.
+    metropolis.proto.common.NodeState state = 2;
+    // Status last reported by the node, absent if the node hasn't yet reported
+    // its status.
+    metropolis.proto.common.NodeStatus status = 3;
+    // Roles assigned by the cluster. This is always set.
+    metropolis.proto.common.NodeRoles roles = 4;
+}