m/n/c/curator: add UpdateNodeStatus
This implements Curator.UpdateNodeStatus, which lets nodes self-report some
status items. Currently this is only their external IP address, which is
needed to generate a Cluster Directory, which is in turn needed to
register into a cluster.
Change-Id: Ib5464ca78ee3466d9b9f89b7af8b40f613ae8dcc
Reviewed-on: https://review.monogon.dev/c/monogon/+/332
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
diff --git a/metropolis/node/core/curator/BUILD.bazel b/metropolis/node/core/curator/BUILD.bazel
index 22e237f..8eb9a49 100644
--- a/metropolis/node/core/curator/BUILD.bazel
+++ b/metropolis/node/core/curator/BUILD.bazel
@@ -54,6 +54,7 @@
embed = [":go_default_library"],
deps = [
"//metropolis/node/core/consensus/client:go_default_library",
+ "//metropolis/node/core/curator/proto/api:go_default_library",
"//metropolis/node/core/identity:go_default_library",
"//metropolis/node/core/localstorage:go_default_library",
"//metropolis/node/core/localstorage/declarative:go_default_library",
@@ -61,6 +62,7 @@
"//metropolis/pkg/event/memory:go_default_library",
"//metropolis/pkg/supervisor:go_default_library",
"//metropolis/proto/api:go_default_library",
+ "//metropolis/proto/common:go_default_library",
"@io_etcd_go_etcd//clientv3:go_default_library",
"@io_etcd_go_etcd//integration:go_default_library",
"@org_golang_google_grpc//:go_default_library",
diff --git a/metropolis/node/core/curator/impl_follower.go b/metropolis/node/core/curator/impl_follower.go
index cf01b47..f1556a0 100644
--- a/metropolis/node/core/curator/impl_follower.go
+++ b/metropolis/node/core/curator/impl_follower.go
@@ -24,3 +24,7 @@
func (f *curatorFollower) GetRegisterTicket(_ context.Context, _ *apb.GetRegisterTicketRequest) (*apb.GetRegisterTicketResponse, error) {
return nil, status.Error(codes.Unimplemented, "curator follower not implemented")
}
+
+func (f *curatorFollower) UpdateNodeStatus(ctx context.Context, req *cpb.UpdateNodeStatusRequest) (*cpb.UpdateNodeStatusResponse, error) {
+ return nil, status.Error(codes.Unimplemented, "curator follower not implemented")
+}
diff --git a/metropolis/node/core/curator/impl_leader.go b/metropolis/node/core/curator/impl_leader.go
index 3ebfcfb..7767e48 100644
--- a/metropolis/node/core/curator/impl_leader.go
+++ b/metropolis/node/core/curator/impl_leader.go
@@ -77,8 +77,8 @@
func newCuratorLeader(l leadership) *curatorLeader {
return &curatorLeader{
- leaderCurator{l},
- leaderAAA{l},
- leaderManagement{l},
+ leaderCurator{leadership: l},
+ leaderAAA{leadership: l},
+ leaderManagement{leadership: l},
}
}
diff --git a/metropolis/node/core/curator/impl_leader_curator.go b/metropolis/node/core/curator/impl_leader_curator.go
index 1043d58..5a15c94 100644
--- a/metropolis/node/core/curator/impl_leader_curator.go
+++ b/metropolis/node/core/curator/impl_leader_curator.go
@@ -1,10 +1,17 @@
package curator
import (
+ "context"
+ "sync"
+
+ "go.etcd.io/etcd/clientv3"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
+ "google.golang.org/protobuf/proto"
cpb "source.monogon.dev/metropolis/node/core/curator/proto/api"
+ "source.monogon.dev/metropolis/node/core/identity"
+ "source.monogon.dev/metropolis/node/core/rpc"
"source.monogon.dev/metropolis/pkg/event/etcd"
)
@@ -12,6 +19,15 @@
// leader.
type leaderCurator struct {
leadership
+
+ // muNodes guards any changes to nodes, and prevents race conditions where the
+ // curator performs a read-modify-write operation to node data. The curator's
+ // leadership ensures no two curators run simultaneously, and this lock ensures
+ // no two parallel curator operations race each other.
+ //
+ // This lock has to be taken any time such an RMW operation takes place when
+ // not additionally guarded using etcd transactions.
+ muNodes sync.Mutex
}
// Watch returns a stream of updates concerning some part of the cluster
@@ -54,8 +70,9 @@
ev := &cpb.WatchEvent{
Nodes: []*cpb.Node{
{
- Id: node.ID(),
- Roles: node.proto().Roles,
+ Id: node.ID(),
+ Roles: node.proto().Roles,
+ Status: node.status,
},
},
}
@@ -64,3 +81,69 @@
}
}
}
+
+// UpdateNodeStatus is called by nodes in the cluster to report their own
+// status. This status is recorded by the curator and can be retrieved via
+// Watch.
+func (l *leaderCurator) UpdateNodeStatus(ctx context.Context, req *cpb.UpdateNodeStatusRequest) (*cpb.UpdateNodeStatusResponse, error) {
+ // Ensure that the given node_id matches the calling node. We currently
+ // only allow for direct self-reporting of status by nodes.
+ pi := rpc.GetPeerInfo(ctx)
+ if pi == nil || pi.Node == nil {
+ return nil, status.Error(codes.PermissionDenied, "only nodes can update node status")
+ }
+ id := identity.NodeID(pi.Node.PublicKey)
+ if id != req.NodeId {
+ return nil, status.Errorf(codes.PermissionDenied, "node %q cannot update the status of node %q", id, req.NodeId)
+ }
+
+ // Verify sent status. Currently we assume the entire status must be set at
+ // once, and cannot be unset.
+ if req.Status == nil || req.Status.ExternalAddress == "" {
+ return nil, status.Errorf(codes.InvalidArgument, "Status and Status.ExternalAddress must be set")
+ }
+
+ // As we're performing a node update with two etcd transactions below (one
+ // to retrieve, one to save and update node), take a local lock to ensure
+ // that we don't have a race between either two UpdateNodeStatus calls or
+ // an UpdateNodeStatus call and some other mutation to the node store.
+ l.muNodes.Lock()
+ defer l.muNodes.Unlock()
+
+ // Retrieve node ...
+ key, err := nodeEtcdPrefix.Key(id)
+ if err != nil {
+ return nil, status.Errorf(codes.InvalidArgument, "invalid node id")
+ }
+ res, err := l.txnAsLeader(ctx, clientv3.OpGet(key))
+ if err != nil {
+ if rpcErr, ok := rpcError(err); ok {
+ return nil, rpcErr
+ }
+ return nil, status.Errorf(codes.Unavailable, "could not retrieve node: %v", err)
+ }
+ kvs := res.Responses[0].GetResponseRange().Kvs
+ if len(kvs) < 1 {
+ return nil, status.Error(codes.NotFound, "no such node")
+ }
+ node, err := nodeUnmarshal(kvs[0].Value)
+ if err != nil {
+ return nil, status.Errorf(codes.Unavailable, "failed to unmarshal node: %v", err)
+ }
+ // ... update its status ...
+ node.status = req.Status
+ // ... and save it to etcd.
+ bytes, err := proto.Marshal(node.proto())
+ if err != nil {
+ return nil, status.Errorf(codes.Unavailable, "failed to marshal node: %v", err)
+ }
+ _, err = l.txnAsLeader(ctx, clientv3.OpPut(key, string(bytes)))
+ if err != nil {
+ if rpcErr, ok := rpcError(err); ok {
+ return nil, rpcErr
+ }
+ return nil, status.Errorf(codes.Unavailable, "could not update node: %v", err)
+ }
+
+ return &cpb.UpdateNodeStatusResponse{}, nil
+}
diff --git a/metropolis/node/core/curator/impl_leader_management.go b/metropolis/node/core/curator/impl_leader_management.go
index 1895888..7284eb0 100644
--- a/metropolis/node/core/curator/impl_leader_management.go
+++ b/metropolis/node/core/curator/impl_leader_management.go
@@ -9,7 +9,7 @@
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
- cpb "source.monogon.dev/metropolis/node/core/curator/proto/private"
+ ppb "source.monogon.dev/metropolis/node/core/curator/proto/private"
apb "source.monogon.dev/metropolis/proto/api"
)
@@ -52,7 +52,7 @@
}
// No ticket, generate one.
- ticket := &cpb.RegisterTicket{
+ ticket := &ppb.RegisterTicket{
Opaque: make([]byte, registerTicketSize),
}
_, err = rand.Read(ticket.Opaque)
diff --git a/metropolis/node/core/curator/impl_leader_test.go b/metropolis/node/core/curator/impl_leader_test.go
index 593830d..d20541e 100644
--- a/metropolis/node/core/curator/impl_leader_test.go
+++ b/metropolis/node/core/curator/impl_leader_test.go
@@ -10,8 +10,10 @@
"google.golang.org/grpc/test/bufconn"
"source.monogon.dev/metropolis/node/core/consensus/client"
+ ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
"source.monogon.dev/metropolis/node/core/rpc"
apb "source.monogon.dev/metropolis/proto/api"
+ cpb "source.monogon.dev/metropolis/proto/common"
)
// fakeLeader creates a curatorLeader without any underlying leader election, in
@@ -63,7 +65,7 @@
// Build a test cluster PKI and node/manager certificates, and create the
// listener security parameters which will authenticate incoming requests.
- ephemeral := rpc.NewEphemeralClusterCredentials(t, 1)
+ ephemeral := rpc.NewEphemeralClusterCredentials(t, 2)
nodeCredentials := ephemeral.Nodes[0]
cNode := NewNodeForBootstrap(nil, nodeCredentials.PublicKey())
@@ -127,6 +129,7 @@
mgmtConn: mcl,
localNodeConn: lcl,
localNodeID: nodeCredentials.ID(),
+ otherNodeID: ephemeral.Nodes[1].ID(),
cancel: ctxC,
}
}
@@ -143,6 +146,9 @@
localNodeConn grpc.ClientConnInterface
// localNodeID is the NodeID of the fake node that the leader is running on.
localNodeID string
+ // otherNodeID is the NodeID of some other node present in the curator
+ // state.
+ otherNodeID string
// cancel shuts down the fake leader and all client connections.
cancel context.CancelFunc
}
@@ -175,3 +181,74 @@
t.Errorf("Unexpected ticket change between calls")
}
}
+
+// TestClusterUpdateNodeStatus exercises the Curator.UpdateNodeStatus RPC by
+// sending node updates and making sure they are reflected in subsequent Watch
+// events.
+func TestClusterUpdateNodeStatus(t *testing.T) {
+ cl := fakeLeader(t)
+ defer cl.cancel()
+
+ curator := ipb.NewCuratorClient(cl.localNodeConn)
+
+ ctx, ctxC := context.WithCancel(context.Background())
+ defer ctxC()
+
+ // Retrieve initial node data, it should have no status set.
+ value, err := curator.Watch(ctx, &ipb.WatchRequest{
+ Kind: &ipb.WatchRequest_NodeInCluster_{
+ NodeInCluster: &ipb.WatchRequest_NodeInCluster{
+ NodeId: cl.localNodeID,
+ },
+ },
+ })
+ if err != nil {
+ t.Fatalf("Could not request node watch: %v", err)
+ }
+ ev, err := value.Recv()
+ if err != nil {
+ t.Fatalf("Could not receive initial node value: %v", err)
+ }
+ if status := ev.Nodes[0].Status; status != nil {
+ t.Errorf("Initial node value contains status, should be nil: %+v", status)
+ }
+
+ // Update status...
+ _, err = curator.UpdateNodeStatus(ctx, &ipb.UpdateNodeStatusRequest{
+ NodeId: cl.localNodeID,
+ Status: &cpb.NodeStatus{
+ ExternalAddress: "192.0.2.10",
+ },
+ })
+ if err != nil {
+ t.Fatalf("UpdateNodeStatus: %v", err)
+ }
+
+ // ... and expect it to be reflected in the new node value.
+ for {
+ ev, err = value.Recv()
+ if err != nil {
+ t.Fatalf("Could not receive second node value: %v", err)
+ }
+ // Keep waiting until we get a status.
+ status := ev.Nodes[0].Status
+ if status == nil {
+ continue
+ }
+ if want, got := "192.0.2.10", status.ExternalAddress; want != got {
+ t.Errorf("Wanted external address %q, got %q", want, got)
+ }
+ break
+ }
+
+ // Expect updating some other node's status to fail.
+ _, err = curator.UpdateNodeStatus(ctx, &ipb.UpdateNodeStatusRequest{
+ NodeId: cl.otherNodeID,
+ Status: &cpb.NodeStatus{
+ ExternalAddress: "192.0.2.10",
+ },
+ })
+ if err == nil {
+ t.Errorf("UpdateNodeStatus for other node (%q vs local %q) succeeded, should have failed", cl.otherNodeID, cl.localNodeID)
+ }
+}
diff --git a/metropolis/node/core/curator/listener.go b/metropolis/node/core/curator/listener.go
index d21d951..a190167 100644
--- a/metropolis/node/core/curator/listener.go
+++ b/metropolis/node/core/curator/listener.go
@@ -358,3 +358,12 @@
})
return
}
+
+func (l *listener) UpdateNodeStatus(ctx context.Context, req *cpb.UpdateNodeStatusRequest) (res *cpb.UpdateNodeStatusResponse, err error) {
+ err = l.callImpl(ctx, func(ctx context.Context, impl rpc.ClusterExternalServices) error {
+ var err2 error
+ res, err2 = impl.UpdateNodeStatus(ctx, req)
+ return err2
+ })
+ return
+}
diff --git a/metropolis/node/core/curator/proto/api/api.proto b/metropolis/node/core/curator/proto/api/api.proto
index da3e3c3..ab78ba6 100644
--- a/metropolis/node/core/curator/proto/api/api.proto
+++ b/metropolis/node/core/curator/proto/api/api.proto
@@ -46,6 +46,14 @@
need: PERMISSION_READ_CLUSTER_STATUS;
};
}
+ // UpdateNodeStatus is called by nodes in the cluster to report their own
+ // status. This status is recorded by the curator and can be retrieved via
+ // Watch.
+ rpc UpdateNodeStatus(UpdateNodeStatusRequest) returns (UpdateNodeStatusResponse) {
+ option (metropolis.proto.ext.authorization) = {
+ need: PERMISSION_UPDATE_NODE_SELF;
+ };
+ }
}
// Node is the state and configuration of a node in the cluster.
@@ -54,6 +62,8 @@
string id = 1;
// Roles that the nodes is supposed to take on.
metropolis.proto.common.NodeRoles roles = 2;
+ // Last reported status of the node, if available.
+ metropolis.proto.common.NodeStatus status = 3;
};
// WatchRequest specifies what data the caller is interested in. This influences
@@ -81,3 +91,15 @@
// filter out anything spurious.
repeated Node nodes = 1;
}
+
+message UpdateNodeStatusRequest {
+ // node_id is the Metropolis node identity string of the node for which to
+ // set a new status. This currently must be the same node as the one
+ // performing the RPC and is included for safety.
+ string node_id = 1;
+ // status to be set. All fields are overwritten.
+ metropolis.proto.common.NodeStatus status = 2;
+}
+
+message UpdateNodeStatusResponse {
+}
diff --git a/metropolis/node/core/curator/proto/private/storage.proto b/metropolis/node/core/curator/proto/private/storage.proto
index 3208cdb..c8b6b79 100644
--- a/metropolis/node/core/curator/proto/private/storage.proto
+++ b/metropolis/node/core/curator/proto/private/storage.proto
@@ -24,6 +24,8 @@
// The node's intended roles when running.
metropolis.proto.common.NodeRoles roles = 4;
+
+ metropolis.proto.common.NodeStatus status = 5;
}
// Information about the cluster owner, currently the only Metropolis management
diff --git a/metropolis/node/core/curator/state_node.go b/metropolis/node/core/curator/state_node.go
index e0763c4..9a84bb7 100644
--- a/metropolis/node/core/curator/state_node.go
+++ b/metropolis/node/core/curator/state_node.go
@@ -64,6 +64,8 @@
// cluster. See //metropolis/proto:common.proto for more information.
state cpb.NodeState
+ status *cpb.NodeStatus
+
// A Node can have multiple Roles. Each Role is represented by the presence
// of NodeRole* structures in this structure, with a nil pointer
// representing the lack of a role.
@@ -131,6 +133,7 @@
PublicKey: n.pubkey,
FsmState: n.state,
Roles: &cpb.NodeRoles{},
+ Status: n.status,
}
if n.kubernetesWorker != nil {
msg.Roles.KubernetesWorker = &cpb.NodeRoles_KubernetesWorker{}
@@ -147,6 +150,7 @@
clusterUnlockKey: msg.ClusterUnlockKey,
pubkey: msg.PublicKey,
state: msg.FsmState,
+ status: msg.Status,
}
if msg.Roles.KubernetesWorker != nil {
n.kubernetesWorker = &NodeRoleKubernetesWorker{}
diff --git a/metropolis/node/core/rpc/server.go b/metropolis/node/core/rpc/server.go
index 70a0a74..0e823f4 100644
--- a/metropolis/node/core/rpc/server.go
+++ b/metropolis/node/core/rpc/server.go
@@ -12,6 +12,7 @@
// services, either locally or remotely.
nodePermissions = Permissions{
epb.Permission_PERMISSION_READ_CLUSTER_STATUS: true,
+ epb.Permission_PERMISSION_UPDATE_NODE_SELF: true,
}
)