m/n/c/curator: implement Watching NodesInCluster
This pipes etcd ranged watchers for nodes into a Curator RPC. This is to
be used by systems that need to compile information based on all/some
nodes in the cluster, eg. when building a cluster directory or hosts
file with DNS mappings.
The existence of both NodeInCluster and NodesInCluster could be argued
as unnecessary, and it might make sense to merge NodeInCluster
functionality into NodesInCluster with a filter-by-node-id field. We
should consider doing this once the dust settles.
We also use this opportunity to write tests for Node{,s}InCluster.
Change-Id: I544657b1bfe266a37230760236510024c6007c24
Reviewed-on: https://review.monogon.dev/c/monogon/+/420
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
diff --git a/metropolis/node/core/curator/BUILD.bazel b/metropolis/node/core/curator/BUILD.bazel
index 8eb9a49..3d394e4 100644
--- a/metropolis/node/core/curator/BUILD.bazel
+++ b/metropolis/node/core/curator/BUILD.bazel
@@ -55,6 +55,7 @@
deps = [
"//metropolis/node/core/consensus/client:go_default_library",
"//metropolis/node/core/curator/proto/api:go_default_library",
+ "//metropolis/node/core/curator/proto/private:go_default_library",
"//metropolis/node/core/identity:go_default_library",
"//metropolis/node/core/localstorage:go_default_library",
"//metropolis/node/core/localstorage/declarative:go_default_library",
@@ -69,5 +70,6 @@
"@org_golang_google_grpc//codes:go_default_library",
"@org_golang_google_grpc//status:go_default_library",
"@org_golang_google_grpc//test/bufconn:go_default_library",
+ "@org_golang_google_protobuf//proto:go_default_library",
],
)
diff --git a/metropolis/node/core/curator/impl_leader_curator.go b/metropolis/node/core/curator/impl_leader_curator.go
index 30324ca..3c2a87c 100644
--- a/metropolis/node/core/curator/impl_leader_curator.go
+++ b/metropolis/node/core/curator/impl_leader_curator.go
@@ -2,19 +2,20 @@
import (
"context"
+ "fmt"
"go.etcd.io/etcd/clientv3"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
- cpb "source.monogon.dev/metropolis/node/core/curator/proto/api"
+ ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
"source.monogon.dev/metropolis/node/core/identity"
"source.monogon.dev/metropolis/node/core/rpc"
"source.monogon.dev/metropolis/pkg/event/etcd"
)
-// leaderCurator implements the Curator gRPC API (cpb.Curator) as a curator
+// leaderCurator implements the Curator gRPC API (ipb.Curator) as a curator
// leader.
type leaderCurator struct {
*leadership
@@ -23,32 +24,49 @@
// Watch returns a stream of updates concerning some part of the cluster
// managed by the curator.
//
-// See metropolis.node.core.curator.proto.api.Curator for more information.
-func (l *leaderCurator) Watch(req *cpb.WatchRequest, srv cpb.Curator_WatchServer) error {
- nic, ok := req.Kind.(*cpb.WatchRequest_NodeInCluster_)
- if !ok {
+// See metropolis.node.core.curator.proto.api.Curator for more information about
+// the RPC semantics.
+//
+// TODO(q3k): Currently the watch RPCs are individually backed by etcd cluster
+// watches (via individual etcd event values), which might be problematic in
+// case of a significant amount of parallel Watches being issued to the Curator.
+// It might make sense to combine all pending Watch requests into a single watch
+// issued to the cluster, with an intermediary caching stage within the curator
+// instance. However, that is effectively implementing etcd learner/relay logic,
+// which has has to be carefully considered, especially with regards to serving
+// stale data.
+func (l *leaderCurator) Watch(req *ipb.WatchRequest, srv ipb.Curator_WatchServer) error {
+ switch x := req.Kind.(type) {
+ case *ipb.WatchRequest_NodeInCluster_:
+ return l.watchNodeInCluster(x.NodeInCluster, srv)
+ case *ipb.WatchRequest_NodesInCluster_:
+ return l.watchNodesInCluster(x.NodesInCluster, srv)
+ default:
return status.Error(codes.Unimplemented, "unsupported watch kind")
}
- nodeID := nic.NodeInCluster.NodeId
+}
+
+// watchNodeInCluster implements the Watch API when dealing with a single
+// node-in-cluster request. Effectively, it pipes an etcd value watcher into the
+// Watch API.
+func (l *leaderCurator) watchNodeInCluster(nic *ipb.WatchRequest_NodeInCluster, srv ipb.Curator_WatchServer) error {
+ ctx := srv.Context()
+
// Constructing arbitrary etcd path: this is okay, as we only have node objects
// underneath the NodeEtcdPrefix. Worst case an attacker can do is request a node
// that doesn't exist, and that will just hang . All access is privileged, so
// there's also no need to filter anything.
- // TODO(q3k): formalize and strongly type etcd paths for cluster state?
- // Probably worth waiting for type parameters before attempting to do that.
- nodePath, err := nodeEtcdPrefix.Key(nodeID)
+ nodePath, err := nodeEtcdPrefix.Key(nic.NodeId)
if err != nil {
return status.Errorf(codes.InvalidArgument, "invalid node name: %v", err)
}
+ value := etcd.NewValue(l.etcd, nodePath, nodeValueConverter)
- value := etcd.NewValue(l.etcd, nodePath, func(_, data []byte) (interface{}, error) {
- return nodeUnmarshal(data)
- })
w := value.Watch()
defer w.Close()
for {
- v, err := w.Get(srv.Context())
+ v, err := w.Get(ctx)
if err != nil {
if rpcErr, ok := rpcError(err); ok {
return rpcErr
@@ -56,26 +74,139 @@
// TODO(q3k): log err
return status.Error(codes.Unavailable, "internal error")
}
- node := v.(*Node)
- ev := &cpb.WatchEvent{
- Nodes: []*cpb.Node{
- {
- Id: node.ID(),
- Roles: node.proto().Roles,
- Status: node.status,
- },
- },
- }
+
+ ev := &ipb.WatchEvent{}
+ nodeKV := v.(nodeAtID)
+ nodeKV.appendToEvent(ev)
if err := srv.Send(ev); err != nil {
return err
}
}
}
+// watchNodesInCluster implements the Watch API when dealing with a
+// all-nodes-in-cluster request. Effectively, it pipes a ranged etcd value
+// watcher into the Watch API.
+func (l *leaderCurator) watchNodesInCluster(_ *ipb.WatchRequest_NodesInCluster, srv ipb.Curator_WatchServer) error {
+ ctx := srv.Context()
+
+ start, end := nodeEtcdPrefix.KeyRange()
+ value := etcd.NewValue(l.etcd, start, nodeValueConverter, etcd.Range(end))
+
+ w := value.Watch()
+ defer w.Close()
+
+ // Perform initial fetch from etcd.
+ nodes := make(map[string]*Node)
+ for {
+ v, err := w.Get(ctx, etcd.BacklogOnly)
+ if err == etcd.BacklogDone {
+ break
+ }
+ if err != nil {
+ // TODO(q3k): log err
+ return status.Error(codes.Unavailable, "internal error during initial fetch")
+ }
+ nodeKV := v.(nodeAtID)
+ if nodeKV.value != nil {
+ nodes[nodeKV.id] = nodeKV.value
+ }
+ }
+
+ // Initial send, chunked to not go over 2MiB (half of the default gRPC message
+ // size limit).
+ //
+ // TODO(q3k): formalize message limits, set const somewhere.
+ we := &ipb.WatchEvent{}
+ for _, n := range nodes {
+ we.Nodes = append(we.Nodes, &ipb.Node{
+ Id: n.ID(),
+ Roles: n.proto().Roles,
+ Status: n.status,
+ })
+ if proto.Size(we) > (2 << 20) {
+ if err := srv.Send(we); err != nil {
+ return err
+ }
+ we = &ipb.WatchEvent{}
+ }
+ }
+ // Send last update message. This might be empty, but we need to send the
+ // LAST_BACKLOGGED marker.
+ we.Progress = ipb.WatchEvent_PROGRESS_LAST_BACKLOGGED
+ if err := srv.Send(we); err != nil {
+ return err
+ }
+
+ // Send updates as they arrive from etcd watcher.
+ for {
+ v, err := w.Get(ctx)
+ if err != nil {
+ // TODO(q3k): log err
+ return status.Errorf(codes.Unavailable, "internal error during update")
+ }
+ we := &ipb.WatchEvent{}
+ nodeKV := v.(nodeAtID)
+ nodeKV.appendToEvent(we)
+ if err := srv.Send(we); err != nil {
+ return err
+ }
+ }
+}
+
+// nodeAtID is a key/pair container for a node update received from an etcd
+// watcher. The value will be nil if this update represents a node being
+// deleted.
+type nodeAtID struct {
+ id string
+ value *Node
+}
+
+// nodeValueConverter is called by etcd node value watchers to convert updates
+// from the cluster into nodeAtID, ensuring data integrity and checking
+// invariants.
+func nodeValueConverter(key, value []byte) (interface{}, error) {
+ res := nodeAtID{
+ id: nodeEtcdPrefix.ExtractID(string(key)),
+ }
+ if len(value) > 0 {
+ node, err := nodeUnmarshal(value)
+ if err != nil {
+ return nil, err
+ }
+ res.value = node
+ if res.id != res.value.ID() {
+ return nil, fmt.Errorf("node ID mismatch (etcd key: %q, value: %q)", res.id, res.value.ID())
+ }
+ }
+ if res.id == "" {
+ // This shouldn't happen, to the point where this might be better handled by a
+ // panic.
+ return nil, fmt.Errorf("invalid node key %q", key)
+ }
+ return res, nil
+}
+
+// appendToId records a node update represented by nodeAtID into a Curator
+// WatchEvent, either a Node or NodeTombstone.
+func (kv nodeAtID) appendToEvent(ev *ipb.WatchEvent) {
+ if node := kv.value; node != nil {
+ ev.Nodes = append(ev.Nodes, &ipb.Node{
+ Id: node.ID(),
+ Roles: node.proto().Roles,
+ Status: node.status,
+ })
+ } else {
+ ev.NodeTombstones = append(ev.NodeTombstones, &ipb.WatchEvent_NodeTombstone{
+ NodeId: kv.id,
+ })
+ }
+}
+
// UpdateNodeStatus is called by nodes in the cluster to report their own
// status. This status is recorded by the curator and can be retrieed via
// Watch.
-func (l *leaderCurator) UpdateNodeStatus(ctx context.Context, req *cpb.UpdateNodeStatusRequest) (*cpb.UpdateNodeStatusResponse, error) {
+func (l *leaderCurator) UpdateNodeStatus(ctx context.Context, req *ipb.UpdateNodeStatusRequest) (*ipb.UpdateNodeStatusResponse, error) {
// Ensure that the given node_id matches the calling node. We currently
// only allow for direct self-reporting of status by nodes.
pi := rpc.GetPeerInfo(ctx)
@@ -135,5 +266,5 @@
return nil, status.Errorf(codes.Unavailable, "could not update node: %v", err)
}
- return &cpb.UpdateNodeStatusResponse{}, nil
+ return &ipb.UpdateNodeStatusResponse{}, nil
}
diff --git a/metropolis/node/core/curator/impl_leader_test.go b/metropolis/node/core/curator/impl_leader_test.go
index 0b87892..23d24ed 100644
--- a/metropolis/node/core/curator/impl_leader_test.go
+++ b/metropolis/node/core/curator/impl_leader_test.go
@@ -4,15 +4,19 @@
"bytes"
"context"
"crypto/ed25519"
+ "crypto/rand"
"encoding/hex"
"testing"
"go.etcd.io/etcd/integration"
"google.golang.org/grpc"
"google.golang.org/grpc/test/bufconn"
+ "google.golang.org/protobuf/proto"
"source.monogon.dev/metropolis/node/core/consensus/client"
ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
+ ppb "source.monogon.dev/metropolis/node/core/curator/proto/private"
+ "source.monogon.dev/metropolis/node/core/identity"
"source.monogon.dev/metropolis/node/core/rpc"
apb "source.monogon.dev/metropolis/proto/api"
cpb "source.monogon.dev/metropolis/proto/common"
@@ -133,6 +137,7 @@
otherNodeID: ephemeral.Nodes[1].ID(),
caPubKey: ephemeral.CA.PublicKey.(ed25519.PublicKey),
cancel: ctxC,
+ etcd: cl,
}
}
@@ -155,6 +160,267 @@
caPubKey ed25519.PublicKey
// cancel shuts down the fake leader and all client connections.
cancel context.CancelFunc
+ // etcd contains a low-level connection to the curator K/V store, which can be
+ // used to perform low-level changes to the store in tests.
+ etcd client.Namespaced
+}
+
+// TestWatchNodeInCluster exercises a NodeInCluster Watch, from node creation,
+// through updates, to its deletion.
+func TestWatchNodeInCluster(t *testing.T) {
+ cl := fakeLeader(t)
+ defer cl.cancel()
+ ctx, ctxC := context.WithCancel(context.Background())
+ defer ctxC()
+
+ cur := ipb.NewCuratorClient(cl.localNodeConn)
+
+ // We'll be using a fake node throughout, manually updating it in the etcd
+ // cluster.
+ fakeNodePub, _, err := ed25519.GenerateKey(rand.Reader)
+ if err != nil {
+ t.Fatalf("GenerateKey: %v", err)
+ }
+ fakeNodeID := identity.NodeID(fakeNodePub)
+ fakeNodeKey, _ := nodeEtcdPrefix.Key(fakeNodeID)
+
+ w, err := cur.Watch(ctx, &ipb.WatchRequest{
+ Kind: &ipb.WatchRequest_NodeInCluster_{
+ NodeInCluster: &ipb.WatchRequest_NodeInCluster{
+ NodeId: fakeNodeID,
+ },
+ },
+ })
+ if err != nil {
+ t.Fatalf("Watch: %v", err)
+ }
+
+ // Recv() should block here, as we don't yet have a node in the cluster. We
+ // can't really test that reliably, unfortunately.
+
+ // Populate new node.
+ fakeNode := &ppb.Node{
+ PublicKey: fakeNodePub,
+ Roles: &cpb.NodeRoles{},
+ }
+ fakeNodeInit, err := proto.Marshal(fakeNode)
+ if err != nil {
+ t.Fatalf("Marshal: %v", err)
+ }
+ _, err = cl.etcd.Put(ctx, fakeNodeKey, string(fakeNodeInit))
+ if err != nil {
+ t.Fatalf("Put: %v", err)
+ }
+
+ // Receive cluster node status. This should go through immediately.
+ ev, err := w.Recv()
+ if err != nil {
+ t.Fatalf("Recv: %v", err)
+ }
+ if want, got := 1, len(ev.Nodes); want != got {
+ t.Errorf("wanted %d nodes, got %d", want, got)
+ } else {
+ n := ev.Nodes[0]
+ if want, got := fakeNodeID, n.Id; want != got {
+ t.Errorf("wanted node %q, got %q", want, got)
+ }
+ if n.Status != nil {
+ t.Errorf("wanted nil status, got %v", n.Status)
+ }
+ }
+
+ // Update node status. This should trigger an update from the watcher.
+ fakeNode.Status = &cpb.NodeStatus{
+ ExternalAddress: "203.0.113.42",
+ }
+ fakeNodeInit, err = proto.Marshal(fakeNode)
+ if err != nil {
+ t.Fatalf("Marshal: %v", err)
+ }
+ _, err = cl.etcd.Put(ctx, fakeNodeKey, string(fakeNodeInit))
+ if err != nil {
+ t.Fatalf("Put: %v", err)
+ }
+
+ // Receive new node. This should go through immediately.
+ ev, err = w.Recv()
+ if err != nil {
+ t.Fatalf("Recv: %v", err)
+ }
+ if want, got := 1, len(ev.Nodes); want != got {
+ t.Errorf("wanted %d nodes, got %d", want, got)
+ } else {
+ n := ev.Nodes[0]
+ if want, got := fakeNodeID, n.Id; want != got {
+ t.Errorf("wanted node %q, got %q", want, got)
+ }
+ if want := "203.0.113.42"; n.Status == nil || n.Status.ExternalAddress != want {
+ t.Errorf("wanted status with ip address %q, got %v", want, n.Status)
+ }
+ }
+
+ // Remove node. This should trigger an update from the watcher.
+ k, _ := nodeEtcdPrefix.Key(fakeNodeID)
+ if _, err := cl.etcd.Delete(ctx, k); err != nil {
+ t.Fatalf("could not delete node from etcd: %v", err)
+ }
+ ev, err = w.Recv()
+ if err != nil {
+ t.Fatalf("Recv: %v", err)
+ }
+ if want, got := 1, len(ev.NodeTombstones); want != got {
+ t.Errorf("wanted %d node tombstoness, got %d", want, got)
+ } else {
+ n := ev.NodeTombstones[0]
+ if want, got := fakeNodeID, n.NodeId; want != got {
+ t.Errorf("wanted node %q, got %q", want, got)
+ }
+ }
+}
+
+// TestWatchNodeInCluster exercises a NodesInCluster Watch, from node creation,
+// through updates, to not deletion.
+func TestWatchNodesInCluster(t *testing.T) {
+ cl := fakeLeader(t)
+ defer cl.cancel()
+ ctx, ctxC := context.WithCancel(context.Background())
+ defer ctxC()
+
+ cur := ipb.NewCuratorClient(cl.localNodeConn)
+
+ w, err := cur.Watch(ctx, &ipb.WatchRequest{
+ Kind: &ipb.WatchRequest_NodesInCluster_{
+ NodesInCluster: &ipb.WatchRequest_NodesInCluster{},
+ },
+ })
+ if err != nil {
+ t.Fatalf("Watch: %v", err)
+ }
+
+ nodes := make(map[string]*ipb.Node)
+ syncNodes := func() *ipb.WatchEvent {
+ t.Helper()
+ ev, err := w.Recv()
+ if err != nil {
+ t.Fatalf("Recv: %v", err)
+ }
+ for _, n := range ev.Nodes {
+ n := n
+ nodes[n.Id] = n
+ }
+ for _, nt := range ev.NodeTombstones {
+ delete(nodes, nt.NodeId)
+ }
+ return ev
+ }
+
+ // Retrieve initial node fetch. This should yield one node.
+ for {
+ ev := syncNodes()
+ if ev.Progress == ipb.WatchEvent_PROGRESS_LAST_BACKLOGGED {
+ break
+ }
+ }
+ if n := nodes[cl.localNodeID]; n == nil || n.Id != cl.localNodeID {
+ t.Errorf("Expected node %q to be present, got %v", cl.localNodeID, nodes[cl.localNodeID])
+ }
+ if len(nodes) != 1 {
+ t.Errorf("Expected exactly one node, got %d", len(nodes))
+ }
+
+ // Update the node status and expect a corresponding WatchEvent.
+ _, err = cur.UpdateNodeStatus(ctx, &ipb.UpdateNodeStatusRequest{
+ NodeId: cl.localNodeID,
+ Status: &cpb.NodeStatus{
+ ExternalAddress: "203.0.113.43",
+ },
+ })
+ if err != nil {
+ t.Fatalf("UpdateNodeStatus: %v", err)
+ }
+ for {
+ syncNodes()
+ n := nodes[cl.localNodeID]
+ if n == nil {
+ continue
+ }
+ if n.Status == nil || n.Status.ExternalAddress != "203.0.113.43" {
+ continue
+ }
+ break
+ }
+
+ // Add a new (fake) node, and expect a corresponding WatchEvent.
+ fakeNodePub, _, err := ed25519.GenerateKey(rand.Reader)
+ if err != nil {
+ t.Fatalf("GenerateKey: %v", err)
+ }
+ fakeNodeID := identity.NodeID(fakeNodePub)
+ fakeNodeKey, _ := nodeEtcdPrefix.Key(fakeNodeID)
+ fakeNode := &ppb.Node{
+ PublicKey: fakeNodePub,
+ Roles: &cpb.NodeRoles{},
+ }
+ fakeNodeInit, err := proto.Marshal(fakeNode)
+ if err != nil {
+ t.Fatalf("Marshal: %v", err)
+ }
+ _, err = cl.etcd.Put(ctx, fakeNodeKey, string(fakeNodeInit))
+ if err != nil {
+ t.Fatalf("Put: %v", err)
+ }
+
+ for {
+ syncNodes()
+ n := nodes[fakeNodeID]
+ if n == nil {
+ continue
+ }
+ if n.Id != fakeNodeID {
+ t.Errorf("Wanted faked node ID %q, got %q", fakeNodeID, n.Id)
+ }
+ break
+ }
+
+ // Re-open watcher, resynchronize, expect two nodes to be present.
+ nodes = make(map[string]*ipb.Node)
+ w, err = cur.Watch(ctx, &ipb.WatchRequest{
+ Kind: &ipb.WatchRequest_NodesInCluster_{
+ NodesInCluster: &ipb.WatchRequest_NodesInCluster{},
+ },
+ })
+ if err != nil {
+ t.Fatalf("Watch: %v", err)
+ }
+ for {
+ ev := syncNodes()
+ if ev.Progress == ipb.WatchEvent_PROGRESS_LAST_BACKLOGGED {
+ break
+ }
+ }
+ if n := nodes[cl.localNodeID]; n == nil || n.Status == nil || n.Status.ExternalAddress != "203.0.113.43" {
+ t.Errorf("Node %q should exist and have external address, got %v", cl.localNodeID, n)
+ }
+ if n := nodes[fakeNodeID]; n == nil {
+ t.Errorf("Node %q should exist, got %v", fakeNodeID, n)
+ }
+ if len(nodes) != 2 {
+ t.Errorf("Exptected two nodes in map, got %d", len(nodes))
+ }
+
+ // Remove fake node, expect it to be removed from synced map.
+ k, _ := nodeEtcdPrefix.Key(fakeNodeID)
+ if _, err := cl.etcd.Delete(ctx, k); err != nil {
+ t.Fatalf("could not delete node from etcd: %v", err)
+ }
+
+ for {
+ syncNodes()
+ n := nodes[fakeNodeID]
+ if n == nil {
+ break
+ }
+ }
}
// TestManagementRegisterTicket exercises the Management.GetRegisterTicket RPC.
diff --git a/metropolis/node/core/curator/proto/api/api.proto b/metropolis/node/core/curator/proto/api/api.proto
index ab78ba6..8bf35df 100644
--- a/metropolis/node/core/curator/proto/api/api.proto
+++ b/metropolis/node/core/curator/proto/api/api.proto
@@ -74,14 +74,27 @@
// the state of the node and the cluster are for purposes of
// starting/stopping services, performing software updates and general node
// lifecycle management.
+ //
+ // If the requested node is not yet present in the cluster, the Watch will
+ // block until it is available. If a node is then deleted, a tombstone will
+ // be returned and the call Watch will block forever.
message NodeInCluster {
// node_id that the watcher is interested in. The curator will, best
// effort, stream updates (not necessarily all updates) to this node
// within WatchEvents.
string node_id = 1;
}
+ // The watcher wants information about all the nodes in the cluster. This
+ // is designed to be used by node-local code that needs to know the state
+ // of all the nodes within the cluster, for purposes of building aggregate
+ // views of the cluster, eg. the addresses of all nodes or a list of nodes
+ // fitting some criterion. With time, this call might offer filter
+ // functionality to perform some of this filtering server-side.
+ message NodesInCluster {
+ }
oneof kind {
NodeInCluster node_in_cluster = 1;
+ NodesInCluster nodes_in_cluster = 2;
}
}
@@ -90,6 +103,28 @@
// contain just the nodes requested in WatchRequest, so the client needs to
// filter out anything spurious.
repeated Node nodes = 1;
+ // Node tombstones, a list of node IDs that have been removed from the
+ // cluster since the last sent WatchEvent. For any node in this list, the
+ // watcher should perform logic to remove that node from its current state.
+ message NodeTombstone {
+ string node_id = 1;
+ }
+ repeated NodeTombstone node_tombstones = 3;
+
+ // Progress of the watch stream. This is set for any event which fulfills
+ // some criterion within the context of the watch stream, and is unspecified
+ // otherwise.
+ enum Progress {
+ PROGRESS_UNSPECIFIED = 0;
+ // This event contains the last backlogged data from the watcher: all
+ // data pertinent to the request that is already known to the server
+ // has been returned, and subsequent event receives will block until new
+ // data is available. This will be set on exactly one WatchEvent from
+ // a NodesInCluster RPC, its behaviour is not defined for other Watch
+ // RPCs.
+ PROGRESS_LAST_BACKLOGGED = 1;
+ }
+ Progress progress = 2;
}
message UpdateNodeStatusRequest {
diff --git a/metropolis/node/core/curator/state.go b/metropolis/node/core/curator/state.go
index d460e4f..0db9af5 100644
--- a/metropolis/node/core/curator/state.go
+++ b/metropolis/node/core/curator/state.go
@@ -67,6 +67,9 @@
// Key returns the key for an item within an etcdPrefix, or an error if the
// given ID is invalid (ie. contains a / character).
func (p etcdPrefix) Key(id string) (string, error) {
+ if id == "" {
+ return "", fmt.Errorf("invalid id: cannot be empty")
+ }
if strings.Contains(id, "/") {
return "", fmt.Errorf("invalid id: cannot contain / character")
}
@@ -74,12 +77,35 @@
return "/" + strings.Join(path, "/"), nil
}
+// keyRange returns a pair of [start, end) keys for use with etcd range queries
+// to retrieve all keys under a given prefix.
+func (p etcdPrefix) KeyRange() (start, end string) {
+ // Range from /foo/bar/ ... to /foo/bar0 ('0' is one ASCII codepoint after '/').
+ start = "/" + strings.Join(p.parts, "/") + "/"
+ end = "/" + strings.Join(p.parts, "/") + "0"
+ return
+}
+
// Range returns an etcd clientv3.Op that represents a Range Get request over
// all the keys within this etcdPrefix.
func (p etcdPrefix) Range() clientv3.Op {
- // Range from /foo/bar/ ... to /foo/bar0 ('0' is one ASCII codepoint after '/').
- start := "/" + strings.Join(p.parts, "/") + "/"
- end := "/" + strings.Join(p.parts, "/") + "0"
-
+ start, end := p.KeyRange()
return clientv3.OpGet(start, clientv3.WithRange(end))
}
+
+// ExtractID carves out the etcdPrefix ID from an existing etcd key for this
+// range, ie. is the reverse of the Key() function.
+//
+// If the given etcd key does not correspond to a valid ID, an empty string is
+// returned.
+func (p etcdPrefix) ExtractID(key string) string {
+ strPrefix := "/" + strings.Join(p.parts, "/") + "/"
+ if !strings.HasPrefix(key, strPrefix) {
+ return ""
+ }
+ id := key[len(strPrefix):]
+ if strings.Contains(id, "/") {
+ return ""
+ }
+ return id
+}
diff --git a/metropolis/node/core/curator/state_test.go b/metropolis/node/core/curator/state_test.go
index 8b8152a..0bee7e1 100644
--- a/metropolis/node/core/curator/state_test.go
+++ b/metropolis/node/core/curator/state_test.go
@@ -42,7 +42,11 @@
t.Errorf("Wrong key, wanted %q, got %q", want, got)
}
- // Test Key() with invalid ID.
+ // Test Key() with invalid IDs.
+ _, err = p.Key("")
+ if err == nil {
+ t.Error("Key(bar/baz) returned nil, wanted error")
+ }
_, err = p.Key("bar/baz")
if err == nil {
t.Error("Key(bar/baz) returned nil, wanted error")
@@ -57,3 +61,25 @@
t.Errorf("Wrong end key, wanted %q, got %q", want, got)
}
}
+
+func TestEtcdPrefixExtractID(t *testing.T) {
+ p := mustNewEtcdPrefix("/foo/")
+
+ for i, te := range []struct {
+ key string
+ want string
+ }{
+ {"/foo/", ""},
+ {"/foo0", ""},
+ {"/foo", ""},
+ {"bar", ""},
+
+ {"/foo/bar", "bar"},
+ {"/foo/bar/baz", ""},
+ } {
+ got := p.ExtractID(te.key)
+ if te.want != got {
+ t.Errorf("%d: ExtractID(%q) should have returned %q, got %q", i, te.key, te.want, got)
+ }
+ }
+}