m/n/c/curator/watcher: init

curator/watcher is a new library which simplifies access to the Curator
from client code.

It implements common logic found in a bunch of Metropolis code, where a
node or multiple nodes are watched for updates. The single-node version
of this RPC is not that complex, but the many-node version is and
definitely deserves a client library to remove possible sources of
implementation bugs.

This doesn't yet switch over any of the existing code to use this
library - that will come up in another CR on the stack.

Change-Id: Ia1df6f21473f86cb9af1e962d157350a5cfd7d07
Reviewed-on: https://review.monogon.dev/c/monogon/+/2262
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
Tested-by: Jenkins CI
diff --git a/metropolis/node/core/curator/watcher/BUILD.bazel b/metropolis/node/core/curator/watcher/BUILD.bazel
new file mode 100644
index 0000000..7db4e52
--- /dev/null
+++ b/metropolis/node/core/curator/watcher/BUILD.bazel
@@ -0,0 +1,12 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library")
+
+go_library(
+    name = "watcher",
+    srcs = [
+        "watch_node.go",  # WatchNode / Watcher: follow a single node.
+        "watch_nodes.go",  # WatchNodes / Follower: follow the nodes in the cluster.
+    ],
+    importpath = "source.monogon.dev/metropolis/node/core/curator/watcher",
+    visibility = ["//visibility:public"],
+    deps = ["//metropolis/node/core/curator/proto/api"],
+)
diff --git a/metropolis/node/core/curator/watcher/watch_node.go b/metropolis/node/core/curator/watcher/watch_node.go
new file mode 100644
index 0000000..f71940e
--- /dev/null
+++ b/metropolis/node/core/curator/watcher/watch_node.go
@@ -0,0 +1,89 @@
+package watcher
+
+import (
+	"context"
+	"fmt"
+
+	ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
+)
+
+// WatchNode runs a WatchRequest for NodeInCluster with the given Curator
+// channel. The returned Watcher can then be queried in a for loop to detect
+// changes to the targeted node. A failed request is reported through Error().
+func WatchNode(ctx context.Context, cur ipb.CuratorClient, nid string) *Watcher {
+	ctx, cancel := context.WithCancel(ctx)
+	wa, err := cur.Watch(ctx, &ipb.WatchRequest{
+		Kind: &ipb.WatchRequest_NodeInCluster_{
+			NodeInCluster: &ipb.WatchRequest_NodeInCluster{
+				NodeId: nid,
+			},
+		},
+	})
+	if err != nil {
+		cancel()
+		return &Watcher{err: fmt.Errorf("could not watch node: %w", err)}
+	}
+
+	return &Watcher{wa: wa, cancel: cancel}
+}
+
+// Watcher returned by WatchNode. Must be closed by calling Close().
+type Watcher struct {
+	err    error                   // sticky error, reported by Error()
+	wa     ipb.Curator_WatchClient // underlying Watch stream, nil once closed
+	cancel context.CancelFunc      // cancels the context the stream runs under
+	ev     *ipb.WatchEvent         // most recently received event
+	ix     int                     // index into ev.Nodes served by Node()
+}
+
+// Close the RPC call associated with this Watcher by canceling its context. Must
+// be called at least once after the Watcher is not used anymore.
+func (w *Watcher) Close() {
+	if w.wa != nil {
+		w.wa.CloseSend()
+		w.cancel() // CloseSend only half-closes; canceling releases the stream.
+		w.wa = nil
+	}
+}
+
+// Next returns true if the next call to Node() is valid, false otherwise. Each
+// call to Next blocks until an update to the node data is available. Events
+// which carry no nodes are skipped, so Node() never indexes an empty list.
+//
+// If false is returned, Error() should be called to get the underlying error.
+func (w *Watcher) Next() bool {
+	if w.err != nil {
+		w.Close()
+		return false
+	}
+	if w.wa == nil {
+		w.err = fmt.Errorf("watcher closed")
+		return false
+	}
+	// Step within the current event; fetch new events until one contains nodes.
+	w.ix++
+	for w.ev == nil || w.ix >= len(w.ev.Nodes) {
+		ev, err := w.wa.Recv()
+		if err != nil {
+			w.err = err
+			return false
+		}
+		w.ev = ev
+		w.ix = 0
+	}
+	return true
+}
+
+// Error returns the underlying error for this Watcher, or nil if no error is
+// present. After an error is returned, the Watcher cannot be used anymore.
+func (w *Watcher) Error() error {
+	return w.err
+}
+
+// Node returns the cached node state for this Watcher. It must only be called
+// after Next() returned true, and keeps returning the same node until Next() is
+// called again. The caller can hold on to the returned Node pointer, as updates
+// are delivered as new Node objects instead of modifying the old one in place.
+func (w *Watcher) Node() *ipb.Node {
+	return w.ev.Nodes[w.ix]
+}
diff --git a/metropolis/node/core/curator/watcher/watch_nodes.go b/metropolis/node/core/curator/watcher/watch_nodes.go
new file mode 100644
index 0000000..4c08d34
--- /dev/null
+++ b/metropolis/node/core/curator/watcher/watch_nodes.go
@@ -0,0 +1,306 @@
+package watcher
+
+import (
+	"context"
+	"fmt"
+	"slices"
+	"sort"
+
+	ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
+)
+
+// nodeSet is a collection of Node data. It accumulates updates from Events
+// returned from a Curator Watch RPC.
+//
+// Node data stored within a nodeSet are immutable - any modifications to Node
+// data are performed by replacing the entirety of the structure. This means that
+// it is safe to hold on to Node pointers acquired from the nodeSet and access
+// their fields concurrently from other goroutines, but that these pointers
+// represent a snapshot in time of a node's state, and will not get updated as
+// the watched node gets updated.
+//
+// This structure is safe to use in its zero form.
+type nodeSet struct {
+	// nodes is a map from node ID to node data.
+	nodes map[string]*ipb.Node
+	// nodeNames is the list of node IDs, kept sorted (see the sort.Strings calls).
+	// In combination with the nodes map, it provides an 'ordered set' semantic.
+	nodeNames []string
+}
+
+// clone returns a shallow copy of the nodeSet: the ID list and the ID-to-Node
+// map are duplicated, but the Node structures themselves are shared with the
+// original. Sharing is fine, as Node structures stored in a nodeSet are
+// immutable. Only nodes listed in nodeNames are carried over.
+func (n *nodeSet) clone() *nodeSet {
+	out := &nodeSet{
+		nodes:     make(map[string]*ipb.Node, len(n.nodeNames)),
+		nodeNames: append(make([]string, 0, len(n.nodeNames)), n.nodeNames...),
+	}
+	// Copy the pointers only, in the order given by the ID list.
+	for _, id := range out.nodeNames {
+		out.nodes[id] = n.nodes[id]
+	}
+	return out
+}
+
+// updateFromEvent applies a Curator WatchEvent to the node set: nodes carried by
+// the event are inserted or replaced, and tombstoned nodes are removed.
+//
+// The stored Nodes are not mutated, but instead are replaced with the Node
+// structures from the event. nodeNames is re-sorted whenever membership changes.
+func (n *nodeSet) updateFromEvent(ev *ipb.WatchEvent) {
+	if n.nodes == nil {
+		n.nodes = make(map[string]*ipb.Node)
+	}
+
+	// membershipChanged records whether nodeNames has to be sorted again.
+	membershipChanged := false
+
+	// Insert or replace every node carried by the event.
+	for _, node := range ev.Nodes {
+		if _, known := n.nodes[node.Id]; !known {
+			n.nodeNames = append(n.nodeNames, node.Id)
+			membershipChanged = true
+		}
+		n.nodes[node.Id] = node
+	}
+
+	// Drop tombstoned nodes from both the map and the ordered ID list.
+	if len(ev.NodeTombstones) > 0 {
+		tombstoned := make(map[string]bool, len(ev.NodeTombstones))
+		for _, ts := range ev.NodeTombstones {
+			tombstoned[ts.NodeId] = true
+			delete(n.nodes, ts.NodeId)
+		}
+		n.nodeNames = slices.DeleteFunc(n.nodeNames, func(id string) bool { return tombstoned[id] })
+		membershipChanged = true
+	}
+
+	if membershipChanged {
+		sort.Strings(n.nodeNames)
+	}
+}
+
+// follow updates a nodeSet from another nodeSet, but uses a Follower interface
+// to filter out nodes and call back into external systems with information about
+// node lifecycle events. Nodes reported as Deleted are forgotten entirely.
+func (n *nodeSet) follow(origin *nodeSet, f Follower) error {
+	if n.nodes == nil {
+		n.nodes = make(map[string]*ipb.Node)
+	}
+	seen := make(map[string]bool)
+	for _, name := range origin.nodeNames {
+		if !f.Filter(origin.nodes[name]) {
+			continue
+		}
+		seen[name] = true
+		if _, ok := n.nodes[name]; !ok {
+			// New node.
+			n.nodes[name] = origin.nodes[name]
+			if err := f.New(n.nodes[name]); err != nil {
+				return fmt.Errorf("new node %s: %w", name, err)
+			}
+			n.nodeNames = append(n.nodeNames, name)
+		} else {
+			// Updated node.
+			if !f.Equals(n.nodes[name], origin.nodes[name]) {
+				if err := f.Updated(n.nodes[name], origin.nodes[name]); err != nil {
+					return fmt.Errorf("updated node %s: %w", name, err)
+				}
+			}
+			n.nodes[name] = origin.nodes[name]
+		}
+	}
+
+	for _, name := range n.nodeNames {
+		if seen[name] {
+			continue
+		}
+		if err := f.Deleted(n.nodes[name]); err != nil {
+			return fmt.Errorf("deleted node %s: %w", name, err)
+		}
+		// Forget the node, otherwise its reappearance is mistaken for an update.
+		delete(n.nodes, name)
+	}
+
+	if err := f.BatchDone(); err != nil {
+		return fmt.Errorf("batch done: %w", err)
+	}
+	n.nodeNames = slices.DeleteFunc(n.nodeNames, func(id string) bool { return !seen[id] })
+	sort.Strings(n.nodeNames)
+	return nil
+}
+
+// A Follower is some subsystem which wishes to be notified about changes to a
+// cluster's node state.
+//
+// It provides functions to filter out state and state transitions that are
+// interesting to itself, and functions which will be called when the filtered
+// state changes.
+//
+// The Filter and Equals functions make up a 'view' of the cluster state from the
+// point of view of the Follower. That is, a Follower which only cares about some
+// subset of nodes and expresses said subset with Filter will only see these
+// nodes in its nodeSet and in its callbacks' calls. Similarly, updates to the
+// nodes will also be filtered out according to Equals.
+//
+// A simple callback-based implementation is available in SimpleFollower.
+type Follower interface {
+	// Filter should return true if a node is of interest to the follower - when it
+	// has all required fields present and at a requested state.
+	//
+	// For example, a Follower which wishes to watch for nodes' external IP
+	// addresses would filter out all nodes which don't have an address assigned.
+	Filter(a *ipb.Node) bool
+
+	// Equals should return true if a given node's state is identical, from the point
+	// of view of the Follower, to some other state. Correctly implementing this
+	// function allows the Follower to only receive calls to New/Updated/Deleted when
+	// the node actually changed in a meaningful and actionable way.
+	//
+	// For example, a Follower which wishes to watch for nodes' external IP addresses
+	// would return true only if the two nodes' external IP addresses were
+	// identical.
+	Equals(a *ipb.Node, b *ipb.Node) bool
+
+	// New will be called when a node has appeared from the point of view of the
+	// Follower (i.e. started existing on the cluster and then also passed the Filter
+	// function).
+	//
+	// Any returned error is considered fatal and will stop future use of the
+	// Follower, e.g. WatchNodes will return.
+	New(new *ipb.Node) error
+
+	// Updated will be called when a node has been updated from the point of view of
+	// the Follower (i.e. has not been filtered out, and Equals returned false).
+	//
+	// Any returned error is considered fatal and will stop future use of the
+	// Follower, e.g. WatchNodes will return.
+	Updated(prev *ipb.Node, new *ipb.Node) error
+
+	// Deleted will be called when a node has been removed from the point of view of
+	// the Follower (i.e. has been filtered out, or has been removed from the cluster
+	// altogether).
+	//
+	// Any returned error is considered fatal and will stop future use of the
+	// Follower, e.g. WatchNodes will return.
+	Deleted(prev *ipb.Node) error
+
+	// BatchDone will be called at the end of any batch of node updates (either New,
+	// Updated or Deleted calls). This can be used by Followers to reduce the number
+	// of mutations of an expensive resource, for example if the Nodes watch
+	// mechanism is used to feed some other stateful system which also supports
+	// batch-based updates.
+	//
+	// Just exactly how large batches are is an implementation detail of the
+	// underlying Curator watch protocol and the way update events get created by the
+	// Curator and sent over the wire.
+	//
+	// Note: BatchDone() will not be called if any of the New/Updated/Deleted
+	// implementations returned an error - the follower will be terminated
+	// immediately!
+	BatchDone() error
+}
+
+// SimpleFollower is a callback struct based implementation of a Follower, with
+// the additional collapse of New and Updated into a NewUpdated function.
+//
+// This is the simplest way to use the Follower / WatchNodes system from a
+// function.
+type SimpleFollower struct {
+	// FilterFn corresponds to Follower.Filter - see its documentation for more
+	// details. If nil, no node is filtered out.
+	FilterFn func(a *ipb.Node) bool
+	// EqualsFn corresponds to Follower.Equals - see its documentation for more
+	// details.
+	EqualsFn func(a *ipb.Node, b *ipb.Node) bool
+
+	// OnNewUpdated will be called whenever a node is updated or appears for the
+	// first time from the point of view of the Follower. May be left nil.
+	OnNewUpdated func(new *ipb.Node) error
+	// OnDeleted will be called whenever a node disappears from the point of view of
+	// the Follower. May be left nil.
+	OnDeleted func(prev *ipb.Node) error
+
+	// OnBatchDone will be called at the end of a batch of NewUpdated/Deleted calls
+	// from the underlying Curator watch mechanism. May be left nil.
+	OnBatchDone func() error
+}
+
+// Filter implements Follower.Filter by delegating to FilterFn.
+//
+// When no FilterFn has been set, every node is accepted.
+func (f SimpleFollower) Filter(a *ipb.Node) bool {
+	return f.FilterFn == nil || f.FilterFn(a)
+}
+
+func (f SimpleFollower) Equals(a *ipb.Node, b *ipb.Node) bool {
+	return f.EqualsFn != nil && f.EqualsFn(a, b) // nil EqualsFn: never equal, no panic
+}
+
+func (f SimpleFollower) New(new *ipb.Node) error {
+	if cb := f.OnNewUpdated; cb != nil {
+		return cb(new)
+	}
+	return nil // No OnNewUpdated callback registered, nothing to do.
+}
+
+// Updated forwards to the same OnNewUpdated callback as New, as SimpleFollower
+// collapses both events into one. The previous node state is discarded, and a
+// nil OnNewUpdated makes this a no-op.
+func (f SimpleFollower) Updated(_prev *ipb.Node, new *ipb.Node) error {
+	return f.New(new)
+}
+
+func (f SimpleFollower) Deleted(prev *ipb.Node) error {
+	if cb := f.OnDeleted; cb != nil {
+		return cb(prev)
+	}
+	return nil // No OnDeleted callback registered, nothing to do.
+}
+
+func (f SimpleFollower) BatchDone() error {
+	if cb := f.OnBatchDone; cb != nil {
+		return cb()
+	}
+	return nil // No OnBatchDone callback registered, nothing to do.
+}
+
+// WatchNodes runs a WatchRequest for NodesInCluster with the given Curator
+// channel. Any updates to the state of the nodes are processed through the given
+// Follower.
+//
+// This is the main interface to follow a state of nodes in the cluster and act
+// upon any changes. SimpleFollower is given as a type to implement the simplest
+// kind of callback-driven interface to various events, but users are free to
+// implement Follower on their own.
+//
+// This function only returns on failure: when the watch request or a receive
+// fails (including when the given context is canceled), or with whatever error
+// is returned by the Follower implementation. The RPC is canceled on return.
+func WatchNodes(ctx context.Context, cur ipb.CuratorClient, f Follower) error {
+	// Cancel the RPC on every return path; CloseSend would only half-close it.
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+	wa, err := cur.Watch(ctx, &ipb.WatchRequest{
+		Kind: &ipb.WatchRequest_NodesInCluster_{
+			NodesInCluster: &ipb.WatchRequest_NodesInCluster{},
+		},
+	})
+	if err != nil {
+		return fmt.Errorf("watch request failed: %w", err)
+	}
+
+	var ons, ns nodeSet // ons: all nodes seen; ns: the Follower's filtered view.
+	for {
+		ev, err := wa.Recv()
+		if err != nil {
+			return fmt.Errorf("receive failed: %w", err)
+		}
+		ons.updateFromEvent(ev)
+		if err := ns.follow(&ons, f); err != nil {
+			return err
+		}
+	}
+}