m/n/c/curator: add Management.UpdateNodeRoles
This provides an API for node role adjustments.
While changes to KubernetesWorker role are registered, not all side
effects are accounted for as of now. Specifically, disabling this role
within a node won't lead to its removal from the Kubernetes cluster.
Change-Id: Ie8e65990108b8cf82afecf3374f40f2e857fa776
Reviewed-on: https://review.monogon.dev/c/monogon/+/767
Tested-by: Jenkins CI
Reviewed-by: Sergiusz Bazanski <serge@monogon.tech>
diff --git a/metropolis/node/core/consensus/status.go b/metropolis/node/core/consensus/status.go
index e2b15f8..a19fa14 100644
--- a/metropolis/node/core/consensus/status.go
+++ b/metropolis/node/core/consensus/status.go
@@ -76,6 +76,11 @@
// stopped is set to true if the underlying service has been stopped or hasn't
// yet been started.
stopped bool
+
+ // noClusterMemberManagement disables etcd cluster member management in
+ // UpdateNodeRoles. This is currently necessary in order to test the call,
+ // due to limitations of the test harness.
+ noClusterMemberManagement bool
}
// Running returns true if this status represents a running consensus service
@@ -188,7 +193,7 @@
return nil, fmt.Errorf("could not retrieve initial CRL: %w", err)
}
- if !newExists {
+ if !newExists && !s.noClusterMemberManagement {
addr := fmt.Sprintf("https://%s", net.JoinHostPort(name, strconv.Itoa(port)))
if _, err := s.cl.MemberAddAsLearner(ctx, []string{addr}); err != nil {
return nil, fmt.Errorf("could not add new member as learner: %w", err)
diff --git a/metropolis/node/core/consensus/testhelpers.go b/metropolis/node/core/consensus/testhelpers.go
index 32bbc46..f69f73e 100644
--- a/metropolis/node/core/consensus/testhelpers.go
+++ b/metropolis/node/core/consensus/testhelpers.go
@@ -27,8 +27,9 @@
tsh := testServiceHandle{}
st := &Status{
- cl: cl,
- ca: ca,
+ cl: cl,
+ ca: ca,
+ noClusterMemberManagement: true,
}
etcdPKI, err := st.pkiClient()
if err != nil {
diff --git a/metropolis/node/core/curator/impl_leader_management.go b/metropolis/node/core/curator/impl_leader_management.go
index ff9a1f3..bc65e42 100644
--- a/metropolis/node/core/curator/impl_leader_management.go
+++ b/metropolis/node/core/curator/impl_leader_management.go
@@ -282,3 +282,65 @@
return &apb.ApproveNodeResponse{}, nil
}
+
+// UpdateNodeRoles implements Management.UpdateNodeRoles, which in addition to
+// adjusting the affected node's representation within the cluster, can also
+// trigger the addition of a new etcd learner node.
+func (l *leaderManagement) UpdateNodeRoles(ctx context.Context, req *apb.UpdateNodeRolesRequest) (*apb.UpdateNodeRolesResponse, error) {
+ if len(req.Pubkey) != ed25519.PublicKeySize {
+ return nil, status.Errorf(codes.InvalidArgument, "pubkey must be %d bytes long", ed25519.PublicKeySize)
+ }
+
+ // Take l.muNodes before modifying the node.
+ l.muNodes.Lock()
+ defer l.muNodes.Unlock()
+
+ // Find the node matching the requested public key.
+ id := identity.NodeID(req.Pubkey)
+ node, err := nodeLoad(ctx, l.leadership, id)
+ if err != nil {
+ return nil, status.Errorf(codes.InvalidArgument, "while loading node %s: %v", id, err)
+ }
+
+ // Adjust each role, if a corresponding value is set within the request. Do
+ // nothing, if the role is already matches the requested value.
+
+ if req.ConsensusMember != nil {
+ if *req.ConsensusMember {
+ // Add a new etcd learner node.
+ w := l.consensus.Watch()
+ defer w.Close()
+
+ st, err := w.GetRunning(ctx)
+ if err != nil {
+ return nil, status.Errorf(codes.Unavailable, "could not get running consensus: %v", err)
+ }
+
+ join, err := st.AddNode(ctx, node.pubkey)
+ if err != nil {
+ return nil, status.Errorf(codes.Unavailable, "could not add node: %v", err)
+ }
+
+ // Modify the node's state to reflect the change.
+ node.EnableConsensusMember(join)
+ } else {
+ node.DisableConsensusMember()
+ }
+ }
+
+ if req.KubernetesWorker != nil {
+ if *req.KubernetesWorker {
+ if node.consensusMember == nil {
+ return nil, status.Errorf(codes.InvalidArgument, "could not set role: Kubernetes worker nodes must also be consensus members.")
+ }
+ node.EnableKubernetesWorker()
+ } else {
+ node.DisableKubernetesWorker()
+ }
+ }
+
+ if err := nodeSave(ctx, l.leadership, node); err != nil {
+ return nil, err
+ }
+ return &apb.UpdateNodeRolesResponse{}, nil
+}
diff --git a/metropolis/node/core/curator/impl_leader_test.go b/metropolis/node/core/curator/impl_leader_test.go
index 337be8e..cf16af8 100644
--- a/metropolis/node/core/curator/impl_leader_test.go
+++ b/metropolis/node/core/curator/impl_leader_test.go
@@ -235,6 +235,58 @@
etcd client.Namespaced
}
+// putNode is a helper function that creates a new node within the cluster,
+// given its initial state.
+func putNode(t *testing.T, ctx context.Context, l *leadership, state cpb.NodeState) *Node {
+ t.Helper()
+
+ npub, _, err := ed25519.GenerateKey(rand.Reader)
+ if err != nil {
+ t.Fatalf("could not generate node keypair: %v", err)
+ }
+ jpub, _, err := ed25519.GenerateKey(rand.Reader)
+ if err != nil {
+ t.Fatalf("could not generate join keypair: %v", err)
+ }
+ cuk := []byte("fakefakefakefakefakefakefakefake")
+ node := &Node{
+ clusterUnlockKey: cuk,
+ pubkey: npub,
+ jkey: jpub,
+ state: state,
+ }
+ if err := nodeSave(ctx, l, node); err != nil {
+ t.Fatalf("nodeSave failed: %v", err)
+ }
+ return node
+}
+
+// getNodes wraps management.GetNodes, given a CEL filter expression as
+// the request payload.
+func getNodes(t *testing.T, ctx context.Context, mgmt apb.ManagementClient, filter string) []*apb.Node {
+ t.Helper()
+
+ res, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{
+ Filter: filter,
+ })
+ if err != nil {
+ t.Fatalf("GetNodes failed: %v", err)
+ }
+
+ var nodes []*apb.Node
+ for {
+ node, err := res.Recv()
+ if err != nil && err != io.EOF {
+ t.Fatalf("Recv failed: %v", err)
+ }
+ if err == io.EOF {
+ break
+ }
+ nodes = append(nodes, node)
+ }
+ return nodes
+}
+
// TestWatchNodeInCluster exercises a NodeInCluster Watch, from node creation,
// through updates, to its deletion.
func TestWatchNodeInCluster(t *testing.T) {
@@ -878,56 +930,8 @@
ctx, ctxC := context.WithCancel(context.Background())
defer ctxC()
- // putNode creates a new node within the cluster, given its initial state.
- putNode := func(state cpb.NodeState) *Node {
- npub, _, err := ed25519.GenerateKey(rand.Reader)
- if err != nil {
- t.Fatalf("could not generate node keypair: %v", err)
- }
- jpub, _, err := ed25519.GenerateKey(rand.Reader)
- if err != nil {
- t.Fatalf("could not generate join keypair: %v", err)
- }
- cuk := []byte("fakefakefakefakefakefakefakefake")
- node := &Node{
- clusterUnlockKey: cuk,
- pubkey: npub,
- jkey: jpub,
- state: state,
- }
- if err := nodeSave(ctx, cl.l, node); err != nil {
- t.Fatalf("nodeSave failed: %v", err)
- }
- return node
- }
-
mgmt := apb.NewManagementClient(cl.mgmtConn)
- // getNodes calls mgmt.GetNodes, given a CEL filter expression as
- // an argument.
- getNodes := func(filter string) []*apb.Node {
- var nodes []*apb.Node
-
- res, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{
- Filter: filter,
- })
- if err != nil {
- t.Fatalf("GetNodes failed: %v", err)
- }
-
- for {
- node, err := res.Recv()
- if err != nil && err != io.EOF {
- t.Fatalf("Recv failed: %v", err)
- }
- if err == io.EOF {
- break
- }
- nodes = append(nodes, node)
- }
- return nodes
- }
-
// exists returns true, if node n exists within nodes returned by getNodes.
exists := func(n *Node, nodes []*apb.Node) bool {
for _, e := range nodes {
@@ -940,13 +944,13 @@
// Create additional nodes, to be used in test cases below.
var nodes []*Node
- nodes = append(nodes, putNode(cpb.NodeState_NODE_STATE_NEW))
- nodes = append(nodes, putNode(cpb.NodeState_NODE_STATE_UP))
- nodes = append(nodes, putNode(cpb.NodeState_NODE_STATE_UP))
+ nodes = append(nodes, putNode(t, ctx, cl.l, cpb.NodeState_NODE_STATE_NEW))
+ nodes = append(nodes, putNode(t, ctx, cl.l, cpb.NodeState_NODE_STATE_UP))
+ nodes = append(nodes, putNode(t, ctx, cl.l, cpb.NodeState_NODE_STATE_UP))
// Call mgmt.GetNodes without a filter expression. The result r should contain
// all existing nodes.
- r := getNodes("")
+ r := getNodes(t, ctx, mgmt, "")
if !exists(nodes[0], r) {
t.Fatalf("a node is missing in management.GetNodes result.")
}
@@ -956,7 +960,7 @@
// mgmt.GetNodes, provided with the below expression, should return all nodes
// which state matches NODE_STATE_UP.
- r = getNodes("node.state == NODE_STATE_UP")
+ r = getNodes(t, ctx, mgmt, "node.state == NODE_STATE_UP")
// Hence, the second and third node both should be included in the query
// result.
if !exists(nodes[1], r) {
@@ -970,3 +974,101 @@
t.Fatalf("management.GetNodes didn't filter out an undesired node.")
}
}
+
+// TestUpdateNodeRoles exercises management.UpdateNodeRoles by running it
+// against some newly created nodes, and verifying the effect by examining
+// results delivered by a subsequent call to management.GetNodes.
+func TestUpdateNodeRoles(t *testing.T) {
+ cl := fakeLeader(t)
+ ctx, ctxC := context.WithCancel(context.Background())
+ defer ctxC()
+
+ // Create the test nodes.
+ var tn []*Node
+ tn = append(tn, putNode(t, ctx, cl.l, cpb.NodeState_NODE_STATE_UP))
+ tn = append(tn, putNode(t, ctx, cl.l, cpb.NodeState_NODE_STATE_UP))
+ tn = append(tn, putNode(t, ctx, cl.l, cpb.NodeState_NODE_STATE_UP))
+
+ // Define the test payloads. Each role is optional, and will be updated
+ // only if it's not nil, and its value differs from the current state.
+ opt := func(v bool) *bool { return &v }
+ ue := []*apb.UpdateNodeRolesRequest{
+ &apb.UpdateNodeRolesRequest{
+ Pubkey: tn[0].pubkey,
+ KubernetesWorker: opt(false),
+ ConsensusMember: opt(false),
+ },
+ &apb.UpdateNodeRolesRequest{
+ Pubkey: tn[1].pubkey,
+ KubernetesWorker: opt(false),
+ ConsensusMember: opt(true),
+ },
+ &apb.UpdateNodeRolesRequest{
+ Pubkey: tn[2].pubkey,
+ KubernetesWorker: opt(true),
+ ConsensusMember: opt(true),
+ },
+ &apb.UpdateNodeRolesRequest{
+ Pubkey: tn[2].pubkey,
+ KubernetesWorker: nil,
+ ConsensusMember: nil,
+ },
+ }
+
+ // The following UpdateNodeRoles requests rely on noClusterMemberManagement
+ // being set in the running consensus Status. (see: consensus/testhelpers.go)
+ // Normally, adding another ConsensusMember would result in a creation of
+ // another etcd learner. Since the cluster can accomodate only one learner
+ // at a time, UpdateNodeRoles would block.
+
+ // Run all the request payloads defined in ue, with an expectation of all of
+ // them succeeding.
+ mgmt := apb.NewManagementClient(cl.mgmtConn)
+ for _, e := range ue {
+ _, err := mgmt.UpdateNodeRoles(ctx, e)
+ if err != nil {
+ t.Fatalf("management.UpdateNodeRoles: %v", err)
+ }
+ }
+
+ // Verify that node roles have indeed been updated.
+ cn := getNodes(t, ctx, mgmt, "")
+ for i, e := range ue {
+ for _, n := range cn {
+ if bytes.Equal(n.Pubkey, e.Pubkey) {
+ if e.KubernetesWorker != nil {
+ if *e.KubernetesWorker != (n.Roles.KubernetesWorker != nil) {
+ t.Fatalf("KubernetesWorker role mismatch (node %d/%d).", i+1, len(ue))
+ }
+ }
+ if e.ConsensusMember != nil {
+ if *e.ConsensusMember != (n.Roles.ConsensusMember != nil) {
+ t.Fatalf("ConsensusMember role mismatch (node %d/%d).", i+1, len(ue))
+ }
+ }
+ }
+ }
+ }
+
+ // Try running a request containing a contradictory set of roles. A cluster
+ // node currently can't be a KubernetesWorker if it's not a ConsensusMember
+ // as well.
+ uf := []*apb.UpdateNodeRolesRequest{
+ &apb.UpdateNodeRolesRequest{
+ Pubkey: tn[0].pubkey,
+ KubernetesWorker: opt(true),
+ ConsensusMember: opt(false),
+ },
+ &apb.UpdateNodeRolesRequest{
+ Pubkey: tn[0].pubkey,
+ KubernetesWorker: opt(true),
+ ConsensusMember: nil,
+ },
+ }
+ for _, e := range uf {
+ _, err := mgmt.UpdateNodeRoles(ctx, e)
+ if err == nil {
+ t.Fatalf("expected an error from management.UpdateNodeRoles, got nil.")
+ }
+ }
+}
diff --git a/metropolis/node/core/curator/state_node.go b/metropolis/node/core/curator/state_node.go
index 9989987..f376c28 100644
--- a/metropolis/node/core/curator/state_node.go
+++ b/metropolis/node/core/curator/state_node.go
@@ -159,6 +159,10 @@
n.kubernetesWorker = &NodeRoleKubernetesWorker{}
}
+func (n *Node) DisableKubernetesWorker() {
+ n.kubernetesWorker = nil
+}
+
func (n *Node) EnableConsensusMember(jc *consensus.JoinCluster) {
peers := make([]NodeRoleConsensusMemberPeer, len(jc.ExistingNodes))
for i, n := range jc.ExistingNodes {
@@ -173,6 +177,10 @@
}
}
+func (n *Node) DisableConsensusMember() {
+ n.consensusMember = nil
+}
+
var (
// nodeEtcdPrefix is an etcd key prefix preceding cluster member node IDs,
// mapping to ppb.Node values.
diff --git a/metropolis/proto/api/management.proto b/metropolis/proto/api/management.proto
index 17ff6af..ba20cee 100644
--- a/metropolis/proto/api/management.proto
+++ b/metropolis/proto/api/management.proto
@@ -60,6 +60,13 @@
need: PERMISSION_APPROVE_NODE
};
}
+
+ // UpdateNodeRoles updates a single node's roles.
+ rpc UpdateNodeRoles(UpdateNodeRolesRequest) returns (UpdateNodeRolesResponse) {
+ option (metropolis.proto.ext.authorization) = {
+ need: PERMISSION_UPDATE_NODE_ROLES
+ };
+ }
}
message GetRegisterTicketRequest {
@@ -144,3 +151,20 @@
message ApproveNodeResponse {
}
+
+// UpdateNodeRolesRequest updates roles of a single node matching pubkey. All
+// role fields are optional, and no change will result if they're either unset
+// or if their value matches existing state.
+message UpdateNodeRolesRequest {
+ // pubkey is the Ed25519 public key of this node, which can be used to
+ // generate the node's ID. This is always set.
+ bytes pubkey = 1;
+
+ // kubernetesWorker adjusts the appropriate role when set. Nodes performing
+ // this role must also be consensus members.
+ optional bool kubernetesWorker = 2;
+ optional bool consensusMember = 3;
+}
+
+message UpdateNodeRolesResponse {
+}
diff --git a/metropolis/proto/ext/authorization.proto b/metropolis/proto/ext/authorization.proto
index 60ad68a..0275bba 100644
--- a/metropolis/proto/ext/authorization.proto
+++ b/metropolis/proto/ext/authorization.proto
@@ -23,6 +23,7 @@
PERMISSION_READ_CLUSTER_STATUS = 2;
PERMISSION_UPDATE_NODE_SELF = 3;
PERMISSION_APPROVE_NODE = 4;
+ PERMISSION_UPDATE_NODE_ROLES = 5;
}
// Authorization policy for an RPC method. This message/API does not have the