m/n/c/curator: implement Management.GetNodes
This is a management call that provides detailed per-node details.
Currently it returns all information about all nodes, but can be then
extended to allow filtering and selective/masked field retrieval.
This call is then used to implement a test which exercises
Curator.NodeRegister and GetNodes.
Change-Id: Ia093d9f03a4213b01acbb0fdac9714d8e7b02dd3
Reviewed-on: https://review.monogon.dev/c/monogon/+/434
Reviewed-by: Mateusz Zalega <mateusz@monogon.tech>
diff --git a/metropolis/node/core/curator/impl_leader_aaa.go b/metropolis/node/core/curator/impl_leader_aaa.go
index 43e325c..a66c249 100644
--- a/metropolis/node/core/curator/impl_leader_aaa.go
+++ b/metropolis/node/core/curator/impl_leader_aaa.go
@@ -33,7 +33,7 @@
res, err := a.etcd.Get(ctx, initialOwnerEtcdPath)
if err != nil {
if !errors.Is(err, ctx.Err()) {
- // TODO(q3k): log
+ // TODO(issues/85): log
return nil, status.Error(codes.Unavailable, "could not retrieve initial owner status in etcd")
}
return nil, err
diff --git a/metropolis/node/core/curator/impl_leader_curator.go b/metropolis/node/core/curator/impl_leader_curator.go
index c566040..9ff7d91 100644
--- a/metropolis/node/core/curator/impl_leader_curator.go
+++ b/metropolis/node/core/curator/impl_leader_curator.go
@@ -73,7 +73,7 @@
if rpcErr, ok := rpcError(err); ok {
return rpcErr
}
- // TODO(q3k): log err
+ // TODO(issues/85): log err
return status.Error(codes.Unavailable, "internal error")
}
@@ -106,7 +106,7 @@
break
}
if err != nil {
- // TODO(q3k): log err
+ // TODO(issues/85): log err
return status.Error(codes.Unavailable, "internal error during initial fetch")
}
nodeKV := v.(nodeAtID)
@@ -144,7 +144,7 @@
for {
v, err := w.Get(ctx)
if err != nil {
- // TODO(q3k): log err
+ // TODO(issues/85): log err
return status.Errorf(codes.Unavailable, "internal error during update")
}
we := &ipb.WatchEvent{}
@@ -285,7 +285,7 @@
// valid.
wantTicket, err := l.ensureRegisterTicket(ctx)
if err != nil {
- // TODO(q3k): log err
+ // TODO(issues/85): log err
return nil, status.Error(codes.Unavailable, "could not retrieve register ticket")
}
gotTicket := req.RegisterTicket
@@ -304,7 +304,7 @@
id := identity.NodeID(pubkey)
key, err := nodeEtcdPrefix.Key(id)
if err != nil {
- // TODO(q3k): log err
+ // TODO(issues/85): log err
return nil, status.Errorf(codes.InvalidArgument, "invalid node id")
}
res, err := l.txnAsLeader(ctx, clientv3.OpGet(key))
@@ -312,14 +312,14 @@
if rpcErr, ok := rpcError(err); ok {
return nil, rpcErr
}
- // TODO(q3k): log this
+ // TODO(issues/85): log this
return nil, status.Errorf(codes.Unavailable, "could not retrieve node %s: %v", id, err)
}
kvs := res.Responses[0].GetResponseRange().Kvs
if len(kvs) > 0 {
node, err := nodeUnmarshal(kvs[0].Value)
if err != nil {
- // TODO(q3k): log this
+ // TODO(issues/85): log this
return nil, status.Errorf(codes.Unavailable, "could not unmarshal node")
}
// If the existing node is in the NEW state already, there's nothing to do,
@@ -331,7 +331,9 @@
// We can return a bit more information to the calling node here, as if it's in
// possession of the private key corresponding to an existing node in the
// cluster, it should have access to the status of the node without danger of
- // leaking data about other nodes. TODO(q3k): log this
+ // leaking data about other nodes.
+ //
+ // TODO(issues/85): log this
return nil, status.Errorf(codes.FailedPrecondition, "node already exists in cluster, state %s", node.state.String())
}
@@ -342,12 +344,12 @@
}
nodeBytes, err := proto.Marshal(node.proto())
if err != nil {
- // TODO(q3k): log this
+ // TODO(issues/85): log this
return nil, status.Errorf(codes.Unavailable, "could not marshal new node")
}
_, err = l.txnAsLeader(ctx, clientv3.OpPut(key, string(nodeBytes)))
if err != nil {
- // TODO(q3k): log this
+ // TODO(issues/85): log this
return nil, status.Error(codes.Unavailable, "could not save new node")
}
diff --git a/metropolis/node/core/curator/impl_leader_management.go b/metropolis/node/core/curator/impl_leader_management.go
index dfc07a8..361a251 100644
--- a/metropolis/node/core/curator/impl_leader_management.go
+++ b/metropolis/node/core/curator/impl_leader_management.go
@@ -53,7 +53,7 @@
}, nil
}
-// GetClusterInfo implements Curator.GetClusterInfo, which returns summary
+// GetClusterInfo implements Management.GetClusterInfo, which returns summary
// information about the Metropolis cluster.
func (l *leaderManagement) GetClusterInfo(ctx context.Context, req *apb.GetClusterInfoRequest) (*apb.GetClusterInfoResponse, error) {
res, err := l.txnAsLeader(ctx, nodeEtcdPrefix.Range())
@@ -71,7 +71,7 @@
for _, kv := range kvs {
node, err := nodeUnmarshal(kv.Value)
if err != nil {
- // TODO(q3k): log this
+ // TODO(issues/85): log this
continue
}
if node.state != cpb.NodeState_NODE_STATE_UP {
@@ -108,3 +108,46 @@
CaCertificate: l.node.ClusterCA().Raw,
}, nil
}
+
+// GetNodes implements Management.GetNodes, which returns a list of nodes from
+// the point of view of the cluster.
+func (l *leaderManagement) GetNodes(_ *apb.GetNodesRequest, srv apb.Management_GetNodesServer) error {
+ ctx := srv.Context()
+
+ l.muNodes.Lock()
+ defer l.muNodes.Unlock()
+
+ // Retrieve all nodes from etcd in a single Get call.
+ res, err := l.txnAsLeader(ctx, nodeEtcdPrefix.Range())
+ if err != nil {
+ return status.Errorf(codes.Unavailable, "could not retrieve list of nodes: %v", err)
+ }
+
+ // Convert etcd data into proto nodes, send one streaming response for each
+ // node.
+ kvs := res.Responses[0].GetResponseRange().Kvs
+ for _, kv := range kvs {
+ node, err := nodeUnmarshal(kv.Value)
+ if err != nil {
+ // TODO(issues/85): log this
+ continue
+ }
+
+ // Convert node roles.
+ roles := &cpb.NodeRoles{}
+ if node.kubernetesWorker != nil {
+ roles.KubernetesWorker = &cpb.NodeRoles_KubernetesWorker{}
+ }
+
+ if err := srv.Send(&apb.Node{
+ Pubkey: node.pubkey,
+ State: node.state,
+ Status: node.status,
+ Roles: roles,
+ }); err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
diff --git a/metropolis/node/core/curator/impl_leader_test.go b/metropolis/node/core/curator/impl_leader_test.go
index 2e1dfe8..2608d08 100644
--- a/metropolis/node/core/curator/impl_leader_test.go
+++ b/metropolis/node/core/curator/impl_leader_test.go
@@ -597,3 +597,51 @@
t.Fatalf("CaPublicKey mismatch (wanted %s, got %s)", hex.EncodeToString(want), hex.EncodeToString(got))
}
}
+
+// TestRegisterNode is a smoke test for Curator.RegisterNode and
+// Management.GetNodes.
+func TestRegisterNode(t *testing.T) {
+ cl := fakeLeader(t)
+ defer cl.cancel()
+
+ ctx, ctxC := context.WithCancel(context.Background())
+ defer ctxC()
+
+ // Get RegisterTicket.
+ mgmt := apb.NewManagementClient(cl.mgmtConn)
+ resT, err := mgmt.GetRegisterTicket(ctx, &apb.GetRegisterTicketRequest{})
+ if err != nil {
+ t.Fatalf("GetRegisterTicket: %v", err)
+ }
+
+ // Register as other node.
+ curOther := ipb.NewCuratorClient(cl.otherNodeConn)
+ _, err = curOther.RegisterNode(ctx, &ipb.RegisterNodeRequest{RegisterTicket: resT.Ticket})
+ if err != nil {
+ t.Fatalf("RegisterNode: %v", err)
+ }
+
+ // Ensure that the cluster sees the node as NEW via GetNodes.
+ resN, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{})
+ if err != nil {
+ t.Fatalf("GetNodes: %v", err)
+ }
+ // Receive nodes from GetNodes until we find the node that we just registered.
+ var otherNode *apb.Node
+ for {
+ node, err := resN.Recv()
+ if err != nil {
+ t.Fatalf("GetNodes.Recv: %v", err)
+ }
+
+ id := identity.NodeID(node.Pubkey)
+ if id == cl.otherNodeID {
+ otherNode = node
+ break
+ }
+ }
+ // Ensure that this node is marked as NEW.
+ if want, got := cpb.NodeState_NODE_STATE_NEW, otherNode.State; want != got {
+ t.Fatalf("Newly registerd should have state %s, got %s", want, got)
+ }
+}
diff --git a/metropolis/node/core/curator/listener.go b/metropolis/node/core/curator/listener.go
index aea6f1d..a562063 100644
--- a/metropolis/node/core/curator/listener.go
+++ b/metropolis/node/core/curator/listener.go
@@ -392,3 +392,26 @@
})
return
}
+
+type managementGetNodesServer struct {
+ grpc.ServerStream
+ ctx context.Context
+}
+
+func (s *managementGetNodesServer) Context() context.Context {
+ return s.ctx
+}
+
+func (s *managementGetNodesServer) Send(m *apb.Node) error {
+ return s.ServerStream.SendMsg(m)
+}
+
+func (l *listener) GetNodes(req *apb.GetNodesRequest, srv apb.Management_GetNodesServer) error {
+ proxy := func(ctx context.Context, impl rpc.ClusterExternalServices) error {
+ return impl.GetNodes(req, &managementGetNodesServer{
+ ServerStream: srv,
+ ctx: ctx,
+ })
+ }
+ return l.callImpl(srv.Context(), proxy)
+}
diff --git a/metropolis/proto/api/management.proto b/metropolis/proto/api/management.proto
index 44db0c5..63336c8 100644
--- a/metropolis/proto/api/management.proto
+++ b/metropolis/proto/api/management.proto
@@ -5,7 +5,9 @@
import "metropolis/proto/common/common.proto";
import "metropolis/proto/ext/authorization.proto";
-// Management service available to Cluster Managers.
+// Management service available to Cluster Managers, allowing operational work
+// to be performed on the cluster (eg. adding nodes, retrieving information
+// about a running cluster, etc.).
service Management {
// GetRegisterTicket retrieves the current RegisterTicket which is required
// for new nodes to register into the cluster. Presenting this ticket on
@@ -18,6 +20,7 @@
need: PERMISSION_GET_REGISTER_TICKET
};
}
+
// GetClusterInfo retrieves publicly available summary information about
// this cluster, notably data required for nodes to register into a cluster
// or join it (other than the Register Ticket, which is gated by an
@@ -27,6 +30,14 @@
need: PERMISSION_READ_CLUSTER_STATUS
};
}
+
+ // GetNodes retrieves information about nodes in the cluster. Currently,
+ // it returns all available data about all nodes.
+ rpc GetNodes(GetNodesRequest) returns (stream Node) {
+ option (metropolis.proto.ext.authorization) = {
+ need: PERMISSION_READ_CLUSTER_STATUS
+ };
+ }
}
message GetRegisterTicketRequest {
@@ -48,3 +59,29 @@
// ca_certificate is the x509 DER encoded CA certificate of the cluster.
bytes ca_certificate = 2;
}
+
+message GetNodesRequest {
+}
+
+// Node in a Metropolis cluster, streamed by Management.GetNodes. For each node
+// in the cluster, this message will be emitted and will contain information
+// about that node.
+//
+// The fields contained are node fields that PERMISSION_READ_CLUSTER_STATUS
+// allows access to, ie. 'non-private' fields, ones that might be internal to
+// the cluster and possibly considered sensitive information about the
+// infrastructure, but whose knowledge does not allow to escalate privileges
+// within the cluster.
+message Node {
+ // Raw Ed25519 public key of this node, which can be used to generate
+ // the node's ID. This is always set.
+ bytes pubkey = 1;
+ // State of the node from the point of view of the cluster. This is
+ // always set.
+ metropolis.proto.common.NodeState state = 2;
+ // Last reported status by the Node, absent if a node hasn't yet reported
+ // its status.
+ metropolis.proto.common.NodeStatus status = 3;
+ // Roles assigned by the cluster. This is always set.
+ metropolis.proto.common.NodeRoles roles = 4;
+}