core/internal/cluster: add new single-node cluster code

This adds a cluster library that consists of:
 - a Node object that can be loaded from and saved into etcd,
   representing a node of the cluster that can have different 'role
   tags' assigned to it
 - a cluster Manager, responsible for bringing up the local node into a
   cluster (by creating a new cluster, or by enrolling into or joining an
   existing one); see the usage sketch below
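
   A rough usage sketch (mirroring the core/cmd/init wiring in this change;
   error handling abridged):

     m := cluster.NewManager(root, networkSvc)
     if err := supervisor.Run(ctx, "enrolment", m.Run); err != nil {
         // handle startup error
     }
     if !m.WaitFinished() {
         // enrolment failed, bail out
     }
     node := m.Node()
     if kw := node.KubernetesWorker(); kw != nil {
         // this node carries the Kubernetes worker role tag; start the
         // services that implement it
     }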

This also gets wired into core/cmd/init, and as such completes a chunk
of The Refactor.

Test Plan: covered by the existing e2e tests.

X-Origin-Diff: phab/D590
GitOrigin-RevId: e88022164e4353249b29fc16849a02805f15dd49
diff --git a/core/cmd/init/BUILD.bazel b/core/cmd/init/BUILD.bazel
index 368470f..2f8103f 100644
--- a/core/cmd/init/BUILD.bazel
+++ b/core/cmd/init/BUILD.bazel
@@ -4,21 +4,30 @@
     name = "go_default_library",
     # keep
     srcs = [
+        "debug_service.go",
         "main.go",
         "switchroot.go",
     ] + select({
-        "//core:debug_build": ["debug_enabled.go"],
-        "//conditions:default": ["debug_disabled.go"],
+        "//core:debug_build": ["delve_enabled.go"],
+        "//conditions:default": ["delve_disabled.go"],
     }),
     importpath = "git.monogon.dev/source/nexantic.git/core/cmd/init",
     visibility = ["//visibility:private"],
     deps = [
-        "//core/internal/common:go_default_library",  # keep
+        "//core/internal/cluster:go_default_library",
+        "//core/internal/common:go_default_library",
         "//core/internal/common/supervisor:go_default_library",
+        "//core/internal/containerd:go_default_library",
+        "//core/internal/kubernetes:go_default_library",
+        "//core/internal/kubernetes/pki:go_default_library",
+        "//core/internal/localstorage:go_default_library",
+        "//core/internal/localstorage/declarative:go_default_library",
         "//core/internal/network:go_default_library",
         "//core/pkg/tpm:go_default_library",
-        "@de_cinfra_git_source_nexantic_git//core/internal/node:go_default_library",
-        "@de_cinfra_git_source_nexantic_git//core/internal/storage:go_default_library",
+        "//core/proto/api:go_default_library",
+        "@org_golang_google_grpc//:go_default_library",
+        "@org_golang_google_grpc//codes:go_default_library",
+        "@org_golang_google_grpc//status:go_default_library",
         "@org_golang_x_sys//unix:go_default_library",
         "@org_uber_go_zap//:go_default_library",
     ],
diff --git a/core/cmd/init/debug_service.go b/core/cmd/init/debug_service.go
new file mode 100644
index 0000000..e66b6d7
--- /dev/null
+++ b/core/cmd/init/debug_service.go
@@ -0,0 +1,96 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+	"context"
+	"math"
+
+	"git.monogon.dev/source/nexantic.git/core/internal/cluster"
+	"git.monogon.dev/source/nexantic.git/core/internal/containerd"
+	"git.monogon.dev/source/nexantic.git/core/internal/kubernetes"
+	apb "git.monogon.dev/source/nexantic.git/core/proto/api"
+
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+)
+
+// debugService implements the Smalltown node debug API.
+// TODO(q3k): this should probably be implemented somewhere else once we have a better
+// supervision introspection/status API.
+type debugService struct {
+	cluster    *cluster.Manager
+	kubernetes *kubernetes.Service
+	containerd *containerd.Service
+}
+
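+// GetDebugKubeconfig returns a kubeconfig for debugging purposes, delegating to the Kubernetes service.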
+func (s *debugService) GetDebugKubeconfig(ctx context.Context, req *apb.GetDebugKubeconfigRequest) (*apb.GetDebugKubeconfigResponse, error) {
+	return s.kubernetes.GetDebugKubeconfig(ctx, req)
+}
+
+// GetComponentLogs gets various logbuffers from binaries we call. This function just deals with the first path component,
+// delegating the rest to the service-specific handlers.
+func (s *debugService) GetComponentLogs(ctx context.Context, req *apb.GetComponentLogsRequest) (*apb.GetComponentLogsResponse, error) {
+	if len(req.ComponentPath) < 1 {
+		return nil, status.Error(codes.InvalidArgument, "component_path needs to contain at least one part")
+	}
+	linesToRead := int(req.TailLines)
+	if linesToRead == 0 {
+		linesToRead = math.MaxInt32
+	}
+	var lines []string
+	var err error
+	switch req.ComponentPath[0] {
+	case "containerd":
+		if len(req.ComponentPath) < 2 {
+			lines = s.containerd.Log.ReadLinesTruncated(linesToRead, "...")
+		} else if req.ComponentPath[1] == "runsc" {
+			lines = s.containerd.RunscLog.ReadLinesTruncated(linesToRead, "...")
+		}
+	case "kube":
+		if len(req.ComponentPath) < 2 {
+			return nil, status.Error(codes.NotFound, "component not found")
+		}
+		lines, err = s.kubernetes.GetComponentLogs(req.ComponentPath[1], linesToRead)
+		if err != nil {
+			return nil, status.Error(codes.NotFound, "component not found")
+		}
+	default:
+		return nil, status.Error(codes.NotFound, "component not found")
+	}
+	return &apb.GetComponentLogsResponse{Line: lines}, nil
+}
+
+// GetCondition checks for various conditions exposed by different services. Mostly intended for testing. If you need
+// to make sure something is available in an E2E test, consider adding a condition here.
+// TODO(q3k): since all conditions are now 'true' after the node lifecycle refactor, remove this call - or, start the
+// debug service earlier.
+func (s *debugService) GetCondition(ctx context.Context, req *apb.GetConditionRequest) (*apb.GetConditionResponse, error) {
+	var ok bool
+	switch req.Name {
+	case "IPAssigned":
+		ok = true
+	case "DataAvailable":
+		ok = true
+	default:
+		return nil, status.Errorf(codes.NotFound, "condition %v not found", req.Name)
+	}
+	return &apb.GetConditionResponse{
+		Ok: ok,
+	}, nil
+}
diff --git a/core/cmd/init/debug_disabled.go b/core/cmd/init/delve_disabled.go
similarity index 100%
rename from core/cmd/init/debug_disabled.go
rename to core/cmd/init/delve_disabled.go
diff --git a/core/cmd/init/debug_enabled.go b/core/cmd/init/delve_enabled.go
similarity index 100%
rename from core/cmd/init/debug_enabled.go
rename to core/cmd/init/delve_enabled.go
diff --git a/core/cmd/init/main.go b/core/cmd/init/main.go
index f5b09f8..0dc7d5e 100644
--- a/core/cmd/init/main.go
+++ b/core/cmd/init/main.go
@@ -19,23 +19,42 @@
 import (
 	"context"
 	"fmt"
+	"log"
+	"net"
 	"os"
 	"os/signal"
 	"runtime/debug"
 
-	"git.monogon.dev/source/nexantic.git/core/internal/common/supervisor"
-	"git.monogon.dev/source/nexantic.git/core/internal/network"
-	"git.monogon.dev/source/nexantic.git/core/internal/node"
-	"git.monogon.dev/source/nexantic.git/core/internal/storage"
-	"git.monogon.dev/source/nexantic.git/core/pkg/tpm"
-
 	"go.uber.org/zap"
 	"golang.org/x/sys/unix"
+	"google.golang.org/grpc"
+
+	"git.monogon.dev/source/nexantic.git/core/internal/cluster"
+	"git.monogon.dev/source/nexantic.git/core/internal/common"
+	"git.monogon.dev/source/nexantic.git/core/internal/common/supervisor"
+	"git.monogon.dev/source/nexantic.git/core/internal/containerd"
+	"git.monogon.dev/source/nexantic.git/core/internal/kubernetes"
+	"git.monogon.dev/source/nexantic.git/core/internal/kubernetes/pki"
+	"git.monogon.dev/source/nexantic.git/core/internal/localstorage"
+	"git.monogon.dev/source/nexantic.git/core/internal/localstorage/declarative"
+	"git.monogon.dev/source/nexantic.git/core/internal/network"
+	"git.monogon.dev/source/nexantic.git/core/pkg/tpm"
+	apb "git.monogon.dev/source/nexantic.git/core/proto/api"
 )
 
-const (
-	apiPort       = 7833
-	consensusPort = 7834
+var (
+	// kubernetesConfig is the static/global part of the Kubernetes service configuration. In the future, this might
+	// be configurable by loading it from the EnrolmentConfig. For now, it's static and the same across all clusters.
+	kubernetesConfig = kubernetes.Config{
+		ServiceIPRange: net.IPNet{ // TODO(q3k): Decide if configurable / final value
+			IP:   net.IP{192, 168, 188, 0},
+			Mask: net.IPMask{0xff, 0xff, 0xff, 0x00}, // /24, but Go stores it as a literal mask
+		},
+		ClusterNet: net.IPNet{
+			IP:   net.IP{10, 0, 0, 0},
+			Mask: net.IPMask{0xff, 0xff, 0x00, 0x00}, // /16
+		},
+	}
 )
 
 func main() {
@@ -76,70 +95,176 @@
 		logger.Panic("Failed to initialize TPM 2.0", zap.Error(err))
 	}
 
-	storageManager, err := storage.Initialize(logger.With(zap.String("component", "storage")))
-	if err != nil {
-		panic(fmt.Errorf("could not initialize storage: %w", err))
-	}
-
 	networkSvc := network.New(network.Config{})
-	if err != nil {
-		panic(err)
-	}
 
 	// This function initializes a headless Delve if this is a debug build or does nothing if it's not
 	initializeDebugger(networkSvc)
 
-	supervisor.New(context.Background(), logger, func(ctx context.Context) error {
+	// Prepare local storage.
+	root := &localstorage.Root{}
+	if err := declarative.PlaceFS(root, "/"); err != nil {
+		panic(fmt.Errorf("when placing root FS: %w", err))
+	}
+
+	// trapdoor is a channel used to signal to the init service that a very low-level, unrecoverable failure
+	// occurred. This causes a GURU MEDITATION ERROR visible to the end user.
+	trapdoor := make(chan struct{})
+
+	// Make a context for the supervisor. We cancel it when the trapdoor closes.
+	ctxS, ctxC := context.WithCancel(context.Background())
+
+	// Start root initialization code as a supervisor one-shot runnable. This means waiting for the network, starting
+	// the cluster manager, and then starting all services related to the node's roles.
+	// TODO(q3k): move this to a separate 'init' service.
+	supervisor.New(ctxS, logger, func(ctx context.Context) error {
+		logger := supervisor.Logger(ctx)
+
+		// Start storage and network - we need this to get anything else done.
+		if err := root.Start(ctx); err != nil {
+			return fmt.Errorf("cannot start root FS: %w", err)
+		}
 		if err := supervisor.Run(ctx, "network", networkSvc.Run); err != nil {
-			return err
+			return fmt.Errorf("when starting network: %w", err)
 		}
 
-		// TODO(q3k): convert node code to supervisor
-		nodeInstance, err := node.NewSmalltownNode(logger, networkSvc, storageManager)
+		// Wait for IP address from network.
+		ip, err := networkSvc.GetIP(ctx, true)
 		if err != nil {
-			panic(err)
+			return fmt.Errorf("when waiting for IP address: %w", err)
 		}
 
-		err = nodeInstance.Start(ctx)
+		// Start cluster manager. This kicks off cluster membership machinery, which will either start
+		// a new cluster, enroll into one or join one.
+		m := cluster.NewManager(root, networkSvc)
+		if err := supervisor.Run(ctx, "enrolment", m.Run); err != nil {
+			return fmt.Errorf("when starting enrolment: %w", err)
+		}
+
+		// Wait until the cluster manager settles.
+		success := m.WaitFinished()
+		if !success {
+			close(trapdoor)
+			return fmt.Errorf("enrolment failed, aborting")
+		}
+
+		// We are now in a cluster. We can thus access our 'node' object and start all services that
+		// we should be running.
+
+		node := m.Node()
+		if err := node.ConfigureLocalHostname(&root.Etc); err != nil {
+			close(trapdoor)
+			return fmt.Errorf("failed to set local hostname: %w", err)
+		}
+
+		logger.Info("Enrolment success, continuing startup.")
+		logger.Info(fmt.Sprintf("This node (%s) has roles:", node.String()))
+		if cm := node.ConsensusMember(); cm != nil {
+			// There's no need to start anything when we are a consensus member - the cluster
+			// manager does this for us if necessary (as creating/enrolling/joining a cluster is
+			// pretty tied into cluster lifecycle management).
+			logger.Info(" - etcd consensus member")
+		}
+		if kw := node.KubernetesWorker(); kw != nil {
+			logger.Info(" - kubernetes worker")
+		}
+
+		// If we're supposed to be a kubernetes worker, start kubernetes services and containerd.
+		// In the future, this might be split further into kubernetes control plane and data plane
+		// roles.
+		var containerdSvc *containerd.Service
+		var kubeSvc *kubernetes.Service
+		if kw := node.KubernetesWorker(); kw != nil {
+			logger.Info("Starting Kubernetes worker services...")
+
+			// Ensure Kubernetes PKI objects exist in etcd.
+			kpkiKV := m.ConsensusKV("cluster", "kpki")
+			kpki := pki.NewKubernetes(logger.Named("kpki"), kpkiKV)
+			if err := kpki.EnsureAll(ctx); err != nil {
+				return fmt.Errorf("failed to ensure kubernetes PKI present: %w", err)
+			}
+
+			containerdSvc = &containerd.Service{
+				EphemeralVolume: &root.Ephemeral.Containerd,
+			}
+			if err := supervisor.Run(ctx, "containerd", containerdSvc.Run); err != nil {
+				return fmt.Errorf("failed to start containerd service: %w", err)
+			}
+
+			kubernetesConfig.KPKI = kpki
+			kubernetesConfig.Root = root
+			kubernetesConfig.AdvertiseAddress = *ip
+			kubeSvc = kubernetes.New(kubernetesConfig)
+			if err := supervisor.Run(ctx, "kubernetes", kubeSvc.Run); err != nil {
+				return fmt.Errorf("failed to start kubernetes service: %w", err)
+			}
+
+		}
+
+		// Start the node debug service.
+		// TODO(q3k): this needs to be done in a smarter way once LogTree lands, and then a few things can be
+		// refactored to start this earlier, or this can be split up into multiple gRPC services on a single listener.
+		dbg := &debugService{
+			cluster:    m,
+			containerd: containerdSvc,
+			kubernetes: kubeSvc,
+		}
+		dbgSrv := grpc.NewServer()
+		apb.RegisterNodeDebugServiceServer(dbgSrv, dbg)
+		dbgLis, err := net.Listen("tcp", fmt.Sprintf(":%d", common.DebugServicePort))
 		if err != nil {
-			panic(err)
+			return fmt.Errorf("failed to listen on debug service: %w", err)
+		}
+		if err := supervisor.Run(ctx, "debug", supervisor.GRPCServer(dbgSrv, dbgLis, false)); err != nil {
+			return fmt.Errorf("failed to start debug service: %w", err)
 		}
 
 		supervisor.Signal(ctx, supervisor.SignalHealthy)
+		supervisor.Signal(ctx, supervisor.SignalDone)
+		return nil
+	})
 
-		// We're PID1, so orphaned processes get reparented to us to clean up
-		for {
-			select {
-			case <-ctx.Done():
-				return ctx.Err()
-			case sig := <-signalChannel:
-				switch sig {
-				case unix.SIGCHLD:
-					var status unix.WaitStatus
-					var rusage unix.Rusage
-					for {
-						res, err := unix.Wait4(-1, &status, unix.WNOHANG, &rusage)
-						if err != nil && err != unix.ECHILD {
-							logger.Error("Failed to wait on orphaned child", zap.Error(err))
-							break
-						}
-						if res <= 0 {
-							break
-						}
+	// We're PID1, so orphaned processes get reparented to us to clean up
+	for {
+		select {
+		case <-trapdoor:
+			// If the trapdoor got closed, we got stuck early enough in the boot process that we can't do anything about
+			// it. Display a generic error message until we handle error conditions better.
+			ctxC()
+			log.Printf("                  #########################")
+			log.Printf("                  # GURU MEDITATION ERROR #")
+			log.Printf("                  #########################")
+			log.Printf("")
+			log.Printf("Smalltown encountered an uncorrectable error and must be restarted.")
+			log.Printf("(Error condition: init trapdoor closed)")
+			log.Printf("")
+			select {}
+
+		case sig := <-signalChannel:
+			switch sig {
+			case unix.SIGCHLD:
+				var status unix.WaitStatus
+				var rusage unix.Rusage
+				for {
+					res, err := unix.Wait4(-1, &status, unix.WNOHANG, &rusage)
+					if err != nil && err != unix.ECHILD {
+						logger.Error("Failed to wait on orphaned child", zap.Error(err))
+						break
 					}
-				case unix.SIGURG:
-					// Go 1.14 introduced asynchronous preemption, which uses SIGURG.
-					// In order not to break backwards compatibility in the unlikely case
-					// of an application actually using SIGURG on its own, they're not filtering them.
-					// (https://github.com/golang/go/issues/37942)
-					logger.Debug("Ignoring SIGURG")
-				// TODO(lorenz): We can probably get more than just SIGCHLD as init, but I can't think
-				// of any others right now, just log them in case we hit any of them.
-				default:
-					logger.Warn("Got unexpected signal", zap.String("signal", sig.String()))
+					if res <= 0 {
+						break
+					}
 				}
+			case unix.SIGURG:
+				// Go 1.14 introduced asynchronous preemption, which uses SIGURG.
+				// In order not to break backwards compatibility in the unlikely case
+				// of an application actually using SIGURG on its own, the Go runtime does not filter these signals.
+				// (https://github.com/golang/go/issues/37942)
+				logger.Debug("Ignoring SIGURG")
+			// TODO(lorenz): We can probably get more than just SIGCHLD as init, but I can't think
+			// of any others right now, so just log them in case we hit any of them.
+			default:
+				logger.Warn("Got unexpected signal", zap.String("signal", sig.String()))
 			}
 		}
-	})
-	select {}
+	}
 }