metropolis/node/core: use curator

This finally switches over the node startup code to use the full Cluster
Manager / Curator / Role Server chain to bring up the node.
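
Instead of starting containerd and the Kubernetes services directly from
main.go, the startup runnable now brings up the curator and hands off to the
new role service (roleserve), which connects to the curator and runs all
node-specific role code. The Kubernetes PKI bootstrap stays in main.go as a
stopgap, since every node still has direct etcd access.

The shape of the new startup sequence, abbreviated from the diff below
(error handling and the trapdoor channel omitted):

    c := curator.New(curator.Config{
        Etcd:      ckv, // namespaced etcd client for the curator
        NodeID:    status.Credentials.ID(),
        LeaderTTL: time.Second * 5,
        Directory: &root.Ephemeral.Curator,
    })
    supervisor.Run(ctx, "curator", c.Run)

    rs := roleserve.New(roleserve.Config{
        CuratorDial: c.DialCluster,
        StorageRoot: root,
        Network:     networkSvc,
        KPKI:        kpki, // Kubernetes PKI is still bootstrapped from main.go
        NodeID:      status.Credentials.ID(),
    })
    supervisor.Run(ctx, "role", rs.Run)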

Change-Id: Iaf6173671aed107a67b4201d9d1ad8bb33baa90f
Reviewed-on: https://review.monogon.dev/c/monogon/+/189
Reviewed-by: Lorenz Brun <lorenz@nexantic.com>
diff --git a/metropolis/node/core/main.go b/metropolis/node/core/main.go
index 6f13a3a..566e65d 100644
--- a/metropolis/node/core/main.go
+++ b/metropolis/node/core/main.go
@@ -38,8 +38,7 @@
 	"source.monogon.dev/metropolis/node/core/localstorage"
 	"source.monogon.dev/metropolis/node/core/localstorage/declarative"
 	"source.monogon.dev/metropolis/node/core/network"
-	"source.monogon.dev/metropolis/node/kubernetes"
-	"source.monogon.dev/metropolis/node/kubernetes/containerd"
+	"source.monogon.dev/metropolis/node/core/roleserve"
 	"source.monogon.dev/metropolis/node/kubernetes/pki"
 	"source.monogon.dev/metropolis/pkg/logtree"
 	"source.monogon.dev/metropolis/pkg/supervisor"
@@ -47,23 +46,6 @@
 	apb "source.monogon.dev/metropolis/proto/api"
 )
 
-var (
-	// kubernetesConfig is the static/global part of the Kubernetes service
-	// configuration. In the future, this might be configurable by loading it
-	// from the EnrolmentConfig. Fow now, it's static and same across all
-	// clusters.
-	kubernetesConfig = kubernetes.Config{
-		ServiceIPRange: net.IPNet{ // TODO(q3k): Decide if configurable / final value
-			IP:   net.IP{10, 0, 255, 1},
-			Mask: net.IPMask{0xff, 0xff, 0xff, 0x00}, // /24, but Go stores as a literal mask
-		},
-		ClusterNet: net.IPNet{
-			IP:   net.IP{10, 0, 0, 0},
-			Mask: net.IPMask{0xff, 0xff, 0x00, 0x00}, // /16
-		},
-	}
-)
-
 func main() {
 	defer func() {
 		if r := recover(); r != nil {
@@ -141,8 +123,6 @@
 	// starting all services related to the node's roles.
 	// TODO(q3k): move this to a separate 'init' service.
 	supervisor.New(ctxS, func(ctx context.Context) error {
-		logger := supervisor.Logger(ctx)
-
 		// Start storage and network - we need this to get anything else done.
 		if err := root.Start(ctx); err != nil {
 			return fmt.Errorf("cannot start root FS: %w", err)
@@ -166,63 +146,77 @@
 			return fmt.Errorf("new couldn't find home in new cluster, aborting: %w", err)
 		}
 
-		// Start cluster curator.
-		kv, err := status.ConsensusClient(cluster.ConsensusUserCurator)
+		// Here starts some hairy stopgap code. In the future, not all nodes will have
+		// direct access to etcd (ie. the ability to retrieve an etcd client via
+		// status.ConsensusClient).
+		// However, we are not ready to implement this yet, as that would require
+		// moving more logic into the curator (eg. some of the Kubernetes PKI logic).
+		//
+		// For now, we keep Kubernetes PKI initialization logic here, and just assume
+		// that every node will have direct access to etcd.
+
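+		// Note: every fatal initialization error from here on also closes the
+		// trapdoor channel before returning, so the failure is visible outside of
+		// this runnable.
+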
+		// Retrieve namespaced etcd KV clients for the two main direct etcd users:
+		// - Curator
+		// - Kubernetes PKI
+		ckv, err := status.ConsensusClient(cluster.ConsensusUserCurator)
 		if err != nil {
+			close(trapdoor)
 			return fmt.Errorf("failed to retrieve consensus curator client: %w", err)
 		}
+		kkv, err := status.ConsensusClient(cluster.ConsensusUserKubernetesPKI)
+		if err != nil {
+			close(trapdoor)
+			return fmt.Errorf("failed to retrieve consensus kubernetes PKI client: %w", err)
+		}
+
+		// Start cluster curator. The cluster curator is responsible for lifecycle
+		// management of the cluster.
+		// In the future, this will only be started on nodes that run etcd.
 		c := curator.New(curator.Config{
-			Etcd:   kv,
+			Etcd:   ckv,
 			NodeID: status.Credentials.ID(),
 			// TODO(q3k): make this configurable?
 			LeaderTTL: time.Second * 5,
 			Directory: &root.Ephemeral.Curator,
 		})
 		if err := supervisor.Run(ctx, "curator", c.Run); err != nil {
+			close(trapdoor)
 			return fmt.Errorf("when starting curator: %w", err)
 		}
 
 		// We are now in a cluster. We can thus access our 'node' object and
 		// start all services that we should be running.
-
-		logger.Info("Enrolment success, continuing startup.")
+		supervisor.Logger(ctx).Info("Enrolment success, continuing startup.")
 
-		// HACK: always start k8s worker, this is being currently refactored
-		// and will get fixed in review/188.
-		logger.Info("Starting Kubernetes worker services...")
-
-		kkv, err := status.ConsensusClient(cluster.ConsensusUserKubernetesPKI)
-		if err != nil {
-			return fmt.Errorf("failed to retrieve consensus kubernetes PKI client: %w", err)
-		}
-
-		// Ensure Kubernetes PKI objects exist in etcd.
+		// Ensure Kubernetes PKI objects exist in etcd. In the future, this logic will
+		// be implemented in the curator.
 		kpki := pki.New(lt.MustLeveledFor("pki.kubernetes"), kkv)
 		if err := kpki.EnsureAll(ctx); err != nil {
+			close(trapdoor)
 			return fmt.Errorf("failed to ensure kubernetes PKI present: %w", err)
 		}
 
-		containerdSvc := &containerd.Service{
-			EphemeralVolume: &root.Ephemeral.Containerd,
-		}
-		if err := supervisor.Run(ctx, "containerd", containerdSvc.Run); err != nil {
-			return fmt.Errorf("failed to start containerd service: %w", err)
-		}
-
-		kubernetesConfig.KPKI = kpki
-		kubernetesConfig.Root = root
-		kubernetesConfig.Network = networkSvc
-		kubeSvc := kubernetes.New(kubernetesConfig)
-		if err := supervisor.Run(ctx, "kubernetes", kubeSvc.Run); err != nil {
-			return fmt.Errorf("failed to start kubernetes service: %w", err)
+		// Start the role service. The role service connects to the curator and runs
+		// all node-specific role code (eg. Kubernetes services).
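+		// It is handed the local storage root, the network service and the cluster's
+		// Kubernetes PKI, and dials the cluster through the locally running curator
+		// (c.DialCluster).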
+		supervisor.Logger(ctx).Infof("Starting role service...")
+		rs := roleserve.New(roleserve.Config{
+			CuratorDial: c.DialCluster,
+			StorageRoot: root,
+			Network:     networkSvc,
+			KPKI:        kpki,
+			NodeID:      status.Credentials.ID(),
+		})
+		if err := supervisor.Run(ctx, "role", rs.Run); err != nil {
+			close(trapdoor)
+			return fmt.Errorf("failed to start role service: %w", err)
 		}
 
 		// Start the node debug service.
+		supervisor.Logger(ctx).Infof("Starting debug service...")
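+		// The debug service now reaches Kubernetes (and other role-specific state)
+		// through the role service instead of holding a direct handle to the
+		// Kubernetes service.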
 		dbg := &debugService{
-			cluster:    m,
-			logtree:    lt,
-			kubernetes: kubeSvc,
-			traceLock:  make(chan struct{}, 1),
+			roleserve: rs,
+			logtree:   lt,
+			traceLock: make(chan struct{}, 1),
 		}
 		dbgSrv := grpc.NewServer()
 		apb.RegisterNodeDebugServiceServer(dbgSrv, dbg)