m/n/core/mgmt: implement node-local management service
Change-Id: I1e8a8ff46d1172e00f2d991ae3cc3af1929b6e4e
Reviewed-on: https://review.monogon.dev/c/monogon/+/1428
Tested-by: Jenkins CI
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
diff --git a/metropolis/node/core/mgmt/BUILD.bazel b/metropolis/node/core/mgmt/BUILD.bazel
new file mode 100644
index 0000000..41a25d2
--- /dev/null
+++ b/metropolis/node/core/mgmt/BUILD.bazel
@@ -0,0 +1,16 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library")
+
+go_library(
+ name = "mgmt",
+ srcs = ["mgmt.go"],
+ importpath = "source.monogon.dev/metropolis/node/core/mgmt",
+ visibility = ["//visibility:public"],
+ deps = [
+ "//metropolis/node",
+ "//metropolis/node/core/identity",
+ "//metropolis/node/core/rpc",
+ "//metropolis/pkg/supervisor",
+ "//metropolis/proto/api",
+ "@org_golang_google_grpc//:go_default_library",
+ ],
+)
diff --git a/metropolis/node/core/mgmt/mgmt.go b/metropolis/node/core/mgmt/mgmt.go
new file mode 100644
index 0000000..5fa12a0
--- /dev/null
+++ b/metropolis/node/core/mgmt/mgmt.go
@@ -0,0 +1,46 @@
+// Package mgmt implements the node-local management service, a.k.a.
+// metropolis.proto.api.NodeManagement.
+package mgmt
+
+import (
+ "context"
+ "fmt"
+ "net"
+
+ "google.golang.org/grpc"
+
+ "source.monogon.dev/metropolis/node"
+ "source.monogon.dev/metropolis/node/core/identity"
+ "source.monogon.dev/metropolis/node/core/rpc"
+ "source.monogon.dev/metropolis/pkg/supervisor"
+
+ apb "source.monogon.dev/metropolis/proto/api"
+)
+
+// Service implements the node-local management gRPC service
+// (metropolis.proto.api.NodeManagement).
+type Service struct {
+	// NodeCredentials is passed to rpc.ServerSecurity to build gRPC options.
+	NodeCredentials *identity.NodeCredentials
+}
+
+// Run listens on TCP port node.NodeManagement, starts the gRPC server under
+// the supervisor, signals healthy, then blocks until ctx is done.
+func (s *Service) Run(ctx context.Context) error {
+	sec := rpc.ServerSecurity{NodeCredentials: s.NodeCredentials}
+	opts := sec.GRPCOptions(supervisor.MustSubLogger(ctx, "rpc"))
+	lis, err := net.Listen("tcp", fmt.Sprintf(":%d", node.NodeManagement))
+	if err != nil {
+		return fmt.Errorf("failed to listen on node management socket: %w", err)
+	}
+	defer lis.Close()
+
+	srv := grpc.NewServer(opts...)
+	apb.RegisterNodeManagementServer(srv, s)
+	if err := supervisor.Run(ctx, "server", supervisor.GRPCServer(srv, lis, false)); err != nil {
+		return fmt.Errorf("could not run server: %w", err)
+	}
+	supervisor.Signal(ctx, supervisor.SignalHealthy)
+	<-ctx.Done()
+	return ctx.Err()
+}
diff --git a/metropolis/node/core/roleserve/BUILD.bazel b/metropolis/node/core/roleserve/BUILD.bazel
index 5980563..c2dada4 100644
--- a/metropolis/node/core/roleserve/BUILD.bazel
+++ b/metropolis/node/core/roleserve/BUILD.bazel
@@ -11,6 +11,7 @@
"worker_controlplane.go",
"worker_heartbeat.go",
"worker_kubernetes.go",
+ "worker_nodemgmt.go",
"worker_rolefetch.go",
"worker_statuspush.go",
],
@@ -23,6 +24,7 @@
"//metropolis/node/core/curator/proto/api",
"//metropolis/node/core/identity",
"//metropolis/node/core/localstorage",
+ "//metropolis/node/core/mgmt",
"//metropolis/node/core/network",
"//metropolis/node/core/rpc",
"//metropolis/node/core/rpc/resolver",
diff --git a/metropolis/node/core/roleserve/roleserve.go b/metropolis/node/core/roleserve/roleserve.go
index 0d0997d..8f9bb47 100644
--- a/metropolis/node/core/roleserve/roleserve.go
+++ b/metropolis/node/core/roleserve/roleserve.go
@@ -85,6 +85,7 @@
heartbeat *workerHeartbeat
kubernetes *workerKubernetes
rolefetch *workerRoleFetch
+ nodeMgmt *workerNodeMgmt
}
// New creates a Role Server services from a Config.
@@ -129,6 +130,10 @@
localRoles: &s.localRoles,
}
+ s.nodeMgmt = &workerNodeMgmt{
+ clusterMembership: &s.ClusterMembership,
+ }
+
return s
}
@@ -187,6 +192,7 @@
supervisor.Run(ctx, "statuspush", s.statusPush.run)
supervisor.Run(ctx, "heartbeat", s.heartbeat.run)
supervisor.Run(ctx, "rolefetch", s.rolefetch.run)
+ supervisor.Run(ctx, "nodemgmt", s.nodeMgmt.run)
supervisor.Signal(ctx, supervisor.SignalHealthy)
<-ctx.Done()
diff --git a/metropolis/node/core/roleserve/worker_nodemgmt.go b/metropolis/node/core/roleserve/worker_nodemgmt.go
new file mode 100644
index 0000000..889f0eb
--- /dev/null
+++ b/metropolis/node/core/roleserve/worker_nodemgmt.go
@@ -0,0 +1,29 @@
+package roleserve
+
+import (
+ "context"
+
+ "source.monogon.dev/metropolis/node/core/mgmt"
+ "source.monogon.dev/metropolis/pkg/event/memory"
+ "source.monogon.dev/metropolis/pkg/supervisor"
+)
+
+// workerNodeMgmt runs mgmt.Service once a cluster membership is available.
+type workerNodeMgmt struct {
+	clusterMembership *memory.Value[*ClusterMembership]
+}
+
+// run waits on the clusterMembership watcher (filtered by FilterHome()) and
+// then runs mgmt.Service with the credentials of the membership it received.
+func (s *workerNodeMgmt) run(ctx context.Context) error {
+	watcher := s.clusterMembership.Watch()
+	defer watcher.Close()
+	supervisor.Logger(ctx).Infof("Waiting for cluster membership...")
+	membership, err := watcher.Get(ctx, FilterHome())
+	if err != nil {
+		return err
+	}
+	supervisor.Logger(ctx).Infof("Got cluster membership, starting...")
+	svc := mgmt.Service{NodeCredentials: membership.credentials}
+	return svc.Run(ctx)
+}
diff --git a/metropolis/node/ports.go b/metropolis/node/ports.go
index f4ffa0b..440f127 100644
--- a/metropolis/node/ports.go
+++ b/metropolis/node/ports.go
@@ -34,6 +34,9 @@
// WireGuardPort is the UDP port on which the Wireguard Kubernetes network
// overlay listens for incoming peer traffic.
WireGuardPort Port = 7838
+ // NodeManagement is the TCP port on which the node-local management service
+ // serves gRPC traffic for NodeManagement.
+ NodeManagement Port = 7839
// KubernetesAPIPort is the TCP port on which the Kubernetes API is
// exposed.
KubernetesAPIPort Port = 6443
@@ -59,6 +62,8 @@
return "debug"
case WireGuardPort:
return "wireguard"
+ case NodeManagement:
+ return "node-mgmt"
case KubernetesAPIPort:
return "kubernetes-api"
case KubernetesWorkerLocalAPIPort: