m/n/core/mgmt: implement node-local management service

Change-Id: I1e8a8ff46d1172e00f2d991ae3cc3af1929b6e4e
Reviewed-on: https://review.monogon.dev/c/monogon/+/1428
Tested-by: Jenkins CI
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
diff --git a/metropolis/node/core/mgmt/BUILD.bazel b/metropolis/node/core/mgmt/BUILD.bazel
new file mode 100644
index 0000000..41a25d2
--- /dev/null
+++ b/metropolis/node/core/mgmt/BUILD.bazel
@@ -0,0 +1,16 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library")
+
+go_library(
+    name = "mgmt",
+    srcs = ["mgmt.go"],
+    importpath = "source.monogon.dev/metropolis/node/core/mgmt",
+    visibility = ["//visibility:public"],
+    deps = [
+        "//metropolis/node",
+        "//metropolis/node/core/identity",
+        "//metropolis/node/core/rpc",
+        "//metropolis/pkg/supervisor",
+        "//metropolis/proto/api",
+        "@org_golang_google_grpc//:go_default_library",
+    ],
+)
diff --git a/metropolis/node/core/mgmt/mgmt.go b/metropolis/node/core/mgmt/mgmt.go
new file mode 100644
index 0000000..5fa12a0
--- /dev/null
+++ b/metropolis/node/core/mgmt/mgmt.go
@@ -0,0 +1,46 @@
+// Package mgmt implements the node-local management service, a.k.a.
+// metropolis.proto.api.NodeManagement.
+package mgmt
+
+import (
+	"context"
+	"fmt"
+	"net"
+
+	"google.golang.org/grpc"
+
+	"source.monogon.dev/metropolis/node"
+	"source.monogon.dev/metropolis/node/core/identity"
+	"source.monogon.dev/metropolis/node/core/rpc"
+	"source.monogon.dev/metropolis/pkg/supervisor"
+
+	apb "source.monogon.dev/metropolis/proto/api"
+)
+
+type Service struct {
+	NodeCredentials *identity.NodeCredentials
+}
+
+// Run serves NodeManagement over gRPC on TCP port node.NodeManagement, using
+// server options derived from s.NodeCredentials. It returns early on setup
+// errors, otherwise blocks until ctx is done and returns ctx.Err().
+func (s *Service) Run(ctx context.Context) error {
+	sec := rpc.ServerSecurity{NodeCredentials: s.NodeCredentials}
+	opts := sec.GRPCOptions(supervisor.MustSubLogger(ctx, "rpc"))
+	lis, err := net.Listen("tcp", fmt.Sprintf(":%d", node.NodeManagement))
+	if err != nil {
+		return fmt.Errorf("failed to listen on node management socket: %w", err)
+	}
+	defer lis.Close()
+
+	srv := grpc.NewServer(opts...)
+	apb.RegisterNodeManagementServer(srv, s)
+
+	runnable := supervisor.GRPCServer(srv, lis, false)
+	if err := supervisor.Run(ctx, "server", runnable); err != nil {
+		return fmt.Errorf("could not run server: %w", err)
+	}
+	supervisor.Signal(ctx, supervisor.SignalHealthy)
+	<-ctx.Done()
+	return ctx.Err()
+}
diff --git a/metropolis/node/core/roleserve/BUILD.bazel b/metropolis/node/core/roleserve/BUILD.bazel
index 5980563..c2dada4 100644
--- a/metropolis/node/core/roleserve/BUILD.bazel
+++ b/metropolis/node/core/roleserve/BUILD.bazel
@@ -11,6 +11,7 @@
         "worker_controlplane.go",
         "worker_heartbeat.go",
         "worker_kubernetes.go",
+        "worker_nodemgmt.go",
         "worker_rolefetch.go",
         "worker_statuspush.go",
     ],
@@ -23,6 +24,7 @@
         "//metropolis/node/core/curator/proto/api",
         "//metropolis/node/core/identity",
         "//metropolis/node/core/localstorage",
+        "//metropolis/node/core/mgmt",
         "//metropolis/node/core/network",
         "//metropolis/node/core/rpc",
         "//metropolis/node/core/rpc/resolver",
diff --git a/metropolis/node/core/roleserve/roleserve.go b/metropolis/node/core/roleserve/roleserve.go
index 0d0997d..8f9bb47 100644
--- a/metropolis/node/core/roleserve/roleserve.go
+++ b/metropolis/node/core/roleserve/roleserve.go
@@ -85,6 +85,7 @@
 	heartbeat    *workerHeartbeat
 	kubernetes   *workerKubernetes
 	rolefetch    *workerRoleFetch
+	nodeMgmt     *workerNodeMgmt
 }
 
 // New creates a Role Server services from a Config.
@@ -129,6 +130,10 @@
 		localRoles: &s.localRoles,
 	}
 
+	s.nodeMgmt = &workerNodeMgmt{
+		clusterMembership: &s.ClusterMembership,
+	}
+
 	return s
 }
 
@@ -187,6 +192,7 @@
 	supervisor.Run(ctx, "statuspush", s.statusPush.run)
 	supervisor.Run(ctx, "heartbeat", s.heartbeat.run)
 	supervisor.Run(ctx, "rolefetch", s.rolefetch.run)
+	supervisor.Run(ctx, "nodemgmt", s.nodeMgmt.run)
 	supervisor.Signal(ctx, supervisor.SignalHealthy)
 
 	<-ctx.Done()
diff --git a/metropolis/node/core/roleserve/worker_nodemgmt.go b/metropolis/node/core/roleserve/worker_nodemgmt.go
new file mode 100644
index 0000000..889f0eb
--- /dev/null
+++ b/metropolis/node/core/roleserve/worker_nodemgmt.go
@@ -0,0 +1,29 @@
+package roleserve
+
+import (
+	"context"
+
+	"source.monogon.dev/metropolis/node/core/mgmt"
+	"source.monogon.dev/metropolis/pkg/event/memory"
+	"source.monogon.dev/metropolis/pkg/supervisor"
+)
+
+type workerNodeMgmt struct {
+	clusterMembership *memory.Value[*ClusterMembership]
+}
+
+// run waits for a cluster membership accepted by FilterHome(), then runs the
+// node-local management service with that membership's credentials. It
+// returns the Get error if the wait fails, otherwise the result of Service.Run.
+func (s *workerNodeMgmt) run(ctx context.Context) error {
+	watcher := s.clusterMembership.Watch()
+	defer watcher.Close()
+	logger := supervisor.Logger(ctx)
+	logger.Infof("Waiting for cluster membership...")
+	membership, err := watcher.Get(ctx, FilterHome())
+	if err != nil {
+		return err
+	}
+	logger.Infof("Got cluster membership, starting...")
+	return (&mgmt.Service{NodeCredentials: membership.credentials}).Run(ctx)
+}
diff --git a/metropolis/node/ports.go b/metropolis/node/ports.go
index f4ffa0b..440f127 100644
--- a/metropolis/node/ports.go
+++ b/metropolis/node/ports.go
@@ -34,6 +34,9 @@
 	// WireGuardPort is the UDP port on which the Wireguard Kubernetes network
 	// overlay listens for incoming peer traffic.
 	WireGuardPort Port = 7838
+	// NodeManagement is the TCP port on which the node-local management service
+	// serves gRPC traffic for NodeManagement.
+	NodeManagement Port = 7839
 	// KubernetesAPIPort is the TCP port on which the Kubernetes API is
 	// exposed.
 	KubernetesAPIPort Port = 6443
@@ -59,6 +62,8 @@
 		return "debug"
 	case WireGuardPort:
 		return "wireguard"
+	case NodeManagement:
+		return "node-mgmt"
 	case KubernetesAPIPort:
 		return "kubernetes-api"
 	case KubernetesWorkerLocalAPIPort: