m/proto: Add RunningCurator to status, report in status pusher

This data allows more dynamic reporting of a node's Curator status, and
notably allows reporting which port it's running on.

We weren't planning on supporting running on non-standard ports, and we
probably still aren't, but it's very useful to have this ability
available in (future) tests.
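
As an illustration, a consumer of the reported status could derive a
curator dial target from the new field roughly as sketched below. The
helper is hypothetical and not part of this change; only the NodeStatus
and RunningCurator shapes are taken from the code touched here.

    // Hypothetical example: derive a curator host:port from a reported
    // NodeStatus, if the node reports a running curator.
    package example

    import (
        "net"
        "strconv"

        cpb "source.monogon.dev/metropolis/proto/common"
    )

    func curatorEndpoint(st *cpb.NodeStatus) (string, bool) {
        if st == nil || st.RunningCurator == nil || st.ExternalAddress == "" {
            return "", false
        }
        port := strconv.Itoa(int(st.RunningCurator.Port))
        return net.JoinHostPort(st.ExternalAddress, port), true
    }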

We take the opportunity to refactor the roleserver's statuspush worker,
and to add a test for it.

Change-Id: I53322e6c8d268186ede085d4a05b646acb422a6b
Reviewed-on: https://review.monogon.dev/c/monogon/+/793
Reviewed-by: Mateusz Zalega <mateusz@monogon.tech>
Tested-by: Jenkins CI
diff --git a/metropolis/node/core/roleserve/BUILD.bazel b/metropolis/node/core/roleserve/BUILD.bazel
index 3ac818d..2c9610c 100644
--- a/metropolis/node/core/roleserve/BUILD.bazel
+++ b/metropolis/node/core/roleserve/BUILD.bazel
@@ -1,4 +1,4 @@
-load("@io_bazel_rules_go//go:def.bzl", "go_library")
+load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
 
 go_library(
     name = "roleserve",
@@ -35,7 +35,26 @@
         "//metropolis/proto/common",
         "//metropolis/proto/private",
         "@org_golang_google_grpc//:go_default_library",
+        "@org_golang_google_protobuf//encoding/prototext",
         "@org_golang_google_protobuf//proto",
         "@org_golang_x_sys//unix",
     ],
 )
+
+go_test(
+    name = "roleserve_test",
+    srcs = ["worker_statuspush_test.go"],
+    embed = [":roleserve"],
+    deps = [
+        "//metropolis/node",
+        "//metropolis/node/core/curator/proto/api",
+        "//metropolis/pkg/supervisor",
+        "//metropolis/proto/common",
+        "@com_github_cenkalti_backoff_v4//:backoff",
+        "@com_github_google_go_cmp//cmp",
+        "@org_golang_google_grpc//:go_default_library",
+        "@org_golang_google_grpc//credentials/insecure",
+        "@org_golang_google_grpc//test/bufconn",
+        "@org_golang_google_protobuf//testing/protocmp",
+    ],
+)
diff --git a/metropolis/node/core/roleserve/value_clustermembership.go b/metropolis/node/core/roleserve/value_clustermembership.go
index 1a9ebc5..37f67e7 100644
--- a/metropolis/node/core/roleserve/value_clustermembership.go
+++ b/metropolis/node/core/roleserve/value_clustermembership.go
@@ -10,6 +10,7 @@
 
 	common "source.monogon.dev/metropolis/node"
 	"source.monogon.dev/metropolis/node/core/consensus"
+	"source.monogon.dev/metropolis/node/core/curator"
 	"source.monogon.dev/metropolis/node/core/identity"
 	"source.monogon.dev/metropolis/node/core/rpc"
 	"source.monogon.dev/metropolis/pkg/event"
@@ -32,9 +33,10 @@
 // but also accesses it to pass over information about already known remote
 // curators and to get the local node's identity.
 type ClusterMembership struct {
-	// localConsensus is set by the Control Plane Worker when this node runs control
-	// plane services.
+	// localConsensus and localCurator are set by the Control Plane Worker when this
+	// node runs control plane services.
 	localConsensus *consensus.Service
+	localCurator   *curator.Service
 	// remoteCurators gets set by Cluster Enrolment code when Registering into a
 	// cluster and gets propagated by the Control Plane Worker to maintain
 	// connectivity to external Curators regardless of local curator health.
diff --git a/metropolis/node/core/roleserve/worker_controlplane.go b/metropolis/node/core/roleserve/worker_controlplane.go
index 6f524ee..469df16 100644
--- a/metropolis/node/core/roleserve/worker_controlplane.go
+++ b/metropolis/node/core/roleserve/worker_controlplane.go
@@ -423,6 +423,7 @@
 			// ClusterMembership.
 			s.clusterMembership.set(&ClusterMembership{
 				localConsensus: con,
+				localCurator:   cur,
 				credentials:    creds,
 				remoteCurators: directory,
 				pubkey:         creds.PublicKey(),
diff --git a/metropolis/node/core/roleserve/worker_statuspush.go b/metropolis/node/core/roleserve/worker_statuspush.go
index cdeeb26..732508a 100644
--- a/metropolis/node/core/roleserve/worker_statuspush.go
+++ b/metropolis/node/core/roleserve/worker_statuspush.go
@@ -3,8 +3,11 @@
 import (
 	"context"
 	"fmt"
-	"net"
 
+	"google.golang.org/grpc"
+	"google.golang.org/protobuf/encoding/prototext"
+
+	common "source.monogon.dev/metropolis/node"
 	ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
 	"source.monogon.dev/metropolis/node/core/network"
 	"source.monogon.dev/metropolis/pkg/supervisor"
@@ -13,10 +16,6 @@
 
 // workerStatusPush is the Status Pusher, a service responsible for sending
 // UpdateNodeStatus RPCs to a cluster whenever a Curator is available.
-//
-// TODO(q3k): factor this out of the roleserver, there's no need for this to be
-// internal, as it only depends on ClusterMembership. This could maybe even live
-// in the Network service?
 type workerStatusPush struct {
 	network *network.Service
 
@@ -24,50 +23,155 @@
 	clusterMembership *ClusterMembershipValue
 }
 
-func (s *workerStatusPush) run(ctx context.Context) error {
-	nw := s.network.Watch()
-	defer nw.Close()
+// workerStatusPushChannels contains all the channels between the status pusher's
+// 'map' runnables (waiting on Event Values) and the main loop.
+type workerStatusPushChannels struct {
+	// address of the node, or empty if none. Retrieved from network service.
+	address chan string
+	// nodeID of this node. Populated whenever available from ClusterMembership.
+	nodeID chan string
+	// curatorClient connecting to the cluster, populated whenever available from
+	// ClusterMembership. Used to actually submit the update.
+	curatorClient chan ipb.CuratorClient
+	// localCurator describing whether this node has a locally running curator on
+	// the default port. Retrieved from ClusterMembership.
+	localCurator chan bool
+}
 
-	w := s.clusterMembership.Watch()
-	defer w.Close()
-	supervisor.Logger(ctx).Infof("Waiting for cluster membership...")
-	cm, err := w.GetHome(ctx)
-	if err != nil {
-		return err
-	}
-	supervisor.Logger(ctx).Infof("Got cluster membership, starting...")
+// workerStatusPushLoop runs the main loop acting on data received from
+// workerStatusPushChannels.
+func workerStatusPushLoop(ctx context.Context, chans *workerStatusPushChannels) error {
+	status := cpb.NodeStatus{}
+	var cur ipb.CuratorClient
+	var nodeID string
 
-	nodeID := cm.NodeID()
-	conn, err := cm.DialCurator()
-	if err != nil {
-		return err
-	}
-	defer conn.Close()
-	cur := ipb.NewCuratorClient(conn)
-
-	// Start watch on Network service, update IP address whenever new one is set.
-	supervisor.Signal(ctx, supervisor.SignalHealthy)
-	var external net.IP
 	for {
-		st, err := nw.Get(ctx)
-		if err != nil {
-			return fmt.Errorf("getting network status failed: %w", err)
+		changed := false
+
+		select {
+		case <-ctx.Done():
+			return fmt.Errorf("while waiting for map updates: %w", ctx.Err())
+
+		case address := <-chans.address:
+			if address != status.ExternalAddress {
+				supervisor.Logger(ctx).Infof("Got external address: %s", address)
+				status.ExternalAddress = address
+				changed = true
+			}
+
+		case newNodeID := <-chans.nodeID:
+			if nodeID != newNodeID {
+				supervisor.Logger(ctx).Infof("Got new NodeID: %s", newNodeID)
+				nodeID = newNodeID
+				changed = true
+			}
+
+		case cur = <-chans.curatorClient:
+			supervisor.Logger(ctx).Infof("Got curator connection.")
+
+		case localCurator := <-chans.localCurator:
+			if status.RunningCurator == nil && localCurator {
+				supervisor.Logger(ctx).Infof("Got new local curator state: running")
+				status.RunningCurator = &cpb.NodeStatus_RunningCurator{
+					Port: int32(common.CuratorServicePort),
+				}
+				changed = true
+			}
+			if status.RunningCurator != nil && !localCurator {
+				supervisor.Logger(ctx).Infof("Got new local curator state: not running")
+				status.RunningCurator = nil
+				changed = true
+			}
 		}
 
-		if external.Equal(st.ExternalAddress) {
-			continue
+		if cur != nil && nodeID != "" && changed && status.ExternalAddress != "" {
+			txt, _ := prototext.Marshal(&status)
+			supervisor.Logger(ctx).Infof("Submitting status: %q", txt)
+			_, err := cur.UpdateNodeStatus(ctx, &ipb.UpdateNodeStatusRequest{
+				NodeId: nodeID,
+				Status: &status,
+			})
+			if err != nil {
+				return fmt.Errorf("UpdateNodeStatus failed: %w", err)
+			}
 		}
-		supervisor.Logger(ctx).Infof("New external address (%s), submitting update to cluster...", st.ExternalAddress.String())
-		_, err = cur.UpdateNodeStatus(ctx, &ipb.UpdateNodeStatusRequest{
-			NodeId: nodeID,
-			Status: &cpb.NodeStatus{
-				ExternalAddress: st.ExternalAddress.String(),
-			},
-		})
-		if err != nil {
-			return fmt.Errorf("UpdateNodeStatus failed: %w", err)
-		}
-		external = st.ExternalAddress
-		supervisor.Logger(ctx).Infof("Updated.")
 	}
 }
+
+func (s *workerStatusPush) run(ctx context.Context) error {
+	chans := workerStatusPushChannels{
+		address:       make(chan string),
+		nodeID:        make(chan string),
+		curatorClient: make(chan ipb.CuratorClient),
+		localCurator:  make(chan bool),
+	}
+
+	// All the channel sends in the map runnables are preemptible by a context
+	// cancelation. This is because workerStatusPushLoop can crash while processing
+	// the events, requiring a restart of this runnable. Without the preemption this
+	// runnable could get stuck.
+
+	supervisor.Run(ctx, "map-network", func(ctx context.Context) error {
+		nw := s.network.Watch()
+		defer nw.Close()
+
+		supervisor.Signal(ctx, supervisor.SignalHealthy)
+		for {
+			st, err := nw.Get(ctx)
+			if err != nil {
+				return fmt.Errorf("getting network status failed: %w", err)
+			}
+			select {
+			case chans.address <- st.ExternalAddress.String():
+			case <-ctx.Done():
+				return ctx.Err()
+			}
+		}
+	})
+	supervisor.Run(ctx, "map-cluster-membership", func(ctx context.Context) error {
+		supervisor.Signal(ctx, supervisor.SignalHealthy)
+
+		var conn *grpc.ClientConn
+		defer func() {
+			if conn != nil {
+				conn.Close()
+			}
+		}()
+
+		w := s.clusterMembership.Watch()
+		defer w.Close()
+		supervisor.Logger(ctx).Infof("Waiting for cluster membership...")
+		for {
+			cm, err := w.GetHome(ctx)
+			if err != nil {
+				return fmt.Errorf("getting cluster membership status failed: %w", err)
+			}
+
+			if conn == nil {
+				conn, err = cm.DialCurator()
+				if err != nil {
+					return fmt.Errorf("when attempting to connect to curator: %w", err)
+				}
+				select {
+				case chans.curatorClient <- ipb.NewCuratorClient(conn):
+				case <-ctx.Done():
+					return ctx.Err()
+				}
+			}
+
+			select {
+			case chans.localCurator <- cm.localCurator != nil:
+			case <-ctx.Done():
+				return ctx.Err()
+			}
+			select {
+			case chans.nodeID <- cm.NodeID():
+			case <-ctx.Done():
+				return ctx.Err()
+			}
+		}
+	})
+
+	supervisor.Signal(ctx, supervisor.SignalHealthy)
+	return workerStatusPushLoop(ctx, &chans)
+}
diff --git a/metropolis/node/core/roleserve/worker_statuspush_test.go b/metropolis/node/core/roleserve/worker_statuspush_test.go
new file mode 100644
index 0000000..a285c77
--- /dev/null
+++ b/metropolis/node/core/roleserve/worker_statuspush_test.go
@@ -0,0 +1,144 @@
+package roleserve
+
+import (
+	"context"
+	"fmt"
+	"net"
+	"sync"
+	"testing"
+	"time"
+
+	"github.com/cenkalti/backoff/v4"
+	"github.com/google/go-cmp/cmp"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/credentials/insecure"
+	"google.golang.org/grpc/test/bufconn"
+	"google.golang.org/protobuf/testing/protocmp"
+
+	common "source.monogon.dev/metropolis/node"
+	ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
+	"source.monogon.dev/metropolis/pkg/supervisor"
+	cpb "source.monogon.dev/metropolis/proto/common"
+)
+
+// statusRecordingCurator is a fake implementation of the Curator which records
+// incoming UpdateNodeStatus requests.
+type statusRecordingCurator struct {
+	ipb.UnimplementedCuratorServer
+
+	mu            sync.Mutex
+	statusReports []*ipb.UpdateNodeStatusRequest
+}
+
+func (f *statusRecordingCurator) UpdateNodeStatus(ctx context.Context, req *ipb.UpdateNodeStatusRequest) (*ipb.UpdateNodeStatusResponse, error) {
+	f.mu.Lock()
+	defer f.mu.Unlock()
+	f.statusReports = append(f.statusReports, req)
+	return &ipb.UpdateNodeStatusResponse{}, nil
+}
+
+// expectReports waits until the given requests have been logged by the
+// statusRecordingCurator.
+func (f *statusRecordingCurator) expectReports(t *testing.T, want []*ipb.UpdateNodeStatusRequest) {
+	t.Helper()
+
+	bo := backoff.NewExponentialBackOff()
+	bo.MaxElapsedTime = time.Second * 10
+	err := backoff.Retry(func() error {
+		f.mu.Lock()
+		defer f.mu.Unlock()
+
+		if diff := cmp.Diff(want, f.statusReports, protocmp.Transform()); diff != "" {
+			return fmt.Errorf("unexpected difference:\n%v", diff)
+		}
+		return nil
+	}, bo)
+	if err != nil {
+		t.Fatal(err)
+	}
+}
+
+// TestWorkerStatusPush ensures that the status push worker main loop behaves as
+// expected. It does not exercise the 'map' runnables.
+func TestWorkerStatusPush(t *testing.T) {
+	chans := workerStatusPushChannels{
+		address:       make(chan string),
+		nodeID:        make(chan string),
+		curatorClient: make(chan ipb.CuratorClient),
+		localCurator:  make(chan bool),
+	}
+
+	go supervisor.TestHarness(t, func(ctx context.Context) error {
+		supervisor.Signal(ctx, supervisor.SignalHealthy)
+		return workerStatusPushLoop(ctx, &chans)
+	})
+
+	// Build a loopback gRPC server served by the statusRecordingCurator and connect
+	// to it.
+	cur := &statusRecordingCurator{}
+	srv := grpc.NewServer()
+	defer srv.Stop()
+	ipb.RegisterCuratorServer(srv, cur)
+	lis := bufconn.Listen(1024 * 1024)
+	defer lis.Close()
+	go func() {
+		if err := srv.Serve(lis); err != nil {
+			t.Errorf("GRPC serve failed: %v", err)
+		}
+	}()
+	withLocalDialer := grpc.WithContextDialer(func(_ context.Context, _ string) (net.Conn, error) {
+		return lis.Dial()
+	})
+	cl, err := grpc.Dial("local", withLocalDialer, grpc.WithTransportCredentials(insecure.NewCredentials()))
+	if err != nil {
+		t.Fatalf("Dial failed: %v", err)
+	}
+	defer cl.Close()
+
+	// Actual test code starts here.
+
+	chans.curatorClient <- ipb.NewCuratorClient(cl)
+	cur.expectReports(t, nil)
+
+	// Provide enough data for the first status report to be submitted.
+	chans.nodeID <- "1234"
+	chans.address <- "192.0.2.10"
+	cur.expectReports(t, []*ipb.UpdateNodeStatusRequest{
+		{NodeId: "1234", Status: &cpb.NodeStatus{
+			ExternalAddress: "192.0.2.10",
+		}},
+	})
+
+	// A repeated address update should be ignored; a new address should be submitted.
+	chans.address <- "192.0.2.10"
+	chans.address <- "192.0.2.11"
+	cur.expectReports(t, []*ipb.UpdateNodeStatusRequest{
+		{NodeId: "1234", Status: &cpb.NodeStatus{
+			ExternalAddress: "192.0.2.10",
+		}},
+		{NodeId: "1234", Status: &cpb.NodeStatus{
+			ExternalAddress: "192.0.2.11",
+		}},
+	})
+
+	// Enabling and disabling local curator should work.
+	chans.localCurator <- true
+	chans.localCurator <- false
+	cur.expectReports(t, []*ipb.UpdateNodeStatusRequest{
+		{NodeId: "1234", Status: &cpb.NodeStatus{
+			ExternalAddress: "192.0.2.10",
+		}},
+		{NodeId: "1234", Status: &cpb.NodeStatus{
+			ExternalAddress: "192.0.2.11",
+		}},
+		{NodeId: "1234", Status: &cpb.NodeStatus{
+			ExternalAddress: "192.0.2.11",
+			RunningCurator: &cpb.NodeStatus_RunningCurator{
+				Port: int32(common.CuratorServicePort),
+			},
+		}},
+		{NodeId: "1234", Status: &cpb.NodeStatus{
+			ExternalAddress: "192.0.2.11",
+		}},
+	})
+}