core/internal/cluster: add new single-node cluster code
This adds a cluster library that consists of:
- a Node object that can be loaded from and saved into etcd,
representing a node of the cluster that can have different 'role
tags' assigned to it
- a cluster Manager that is responsible for bringing the local node up
into a cluster (by creating a new cluster, enrolling into one, or
joining one); see the usage sketch below
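
The Manager is used roughly as follows (a sketch based on the
core/cmd/init wiring in this change; error handling elided):

    m := cluster.NewManager(root, networkSvc)
    // Run the enrolment machinery as a supervised runnable.
    supervisor.Run(ctx, "enrolment", m.Run)
    // Block until the manager has created, enrolled into or joined a
    // cluster, or irrecoverably failed to do so.
    if !m.WaitFinished() {
        // Unrecoverable - this node cannot become part of a cluster.
    }
    // From here on, the local Node object and its role tags are
    // available.
    node := m.Node()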
This also gets wired into core/cmd/init and, as such, completes a chunk
of The Refactor.
Test Plan: no new tests; this should be covered by the existing e2e tests.
X-Origin-Diff: phab/D590
GitOrigin-RevId: e88022164e4353249b29fc16849a02805f15dd49
diff --git a/core/cmd/init/BUILD.bazel b/core/cmd/init/BUILD.bazel
index 368470f..2f8103f 100644
--- a/core/cmd/init/BUILD.bazel
+++ b/core/cmd/init/BUILD.bazel
@@ -4,21 +4,30 @@
name = "go_default_library",
# keep
srcs = [
+ "debug_service.go",
"main.go",
"switchroot.go",
] + select({
- "//core:debug_build": ["debug_enabled.go"],
- "//conditions:default": ["debug_disabled.go"],
+ "//core:debug_build": ["delve_enabled.go"],
+ "//conditions:default": ["delve_disabled.go"],
}),
importpath = "git.monogon.dev/source/nexantic.git/core/cmd/init",
visibility = ["//visibility:private"],
deps = [
- "//core/internal/common:go_default_library", # keep
+ "//core/internal/cluster:go_default_library",
+ "//core/internal/common:go_default_library",
"//core/internal/common/supervisor:go_default_library",
+ "//core/internal/containerd:go_default_library",
+ "//core/internal/kubernetes:go_default_library",
+ "//core/internal/kubernetes/pki:go_default_library",
+ "//core/internal/localstorage:go_default_library",
+ "//core/internal/localstorage/declarative:go_default_library",
"//core/internal/network:go_default_library",
"//core/pkg/tpm:go_default_library",
- "@de_cinfra_git_source_nexantic_git//core/internal/node:go_default_library",
- "@de_cinfra_git_source_nexantic_git//core/internal/storage:go_default_library",
+ "//core/proto/api:go_default_library",
+ "@org_golang_google_grpc//:go_default_library",
+ "@org_golang_google_grpc//codes:go_default_library",
+ "@org_golang_google_grpc//status:go_default_library",
"@org_golang_x_sys//unix:go_default_library",
"@org_uber_go_zap//:go_default_library",
],
diff --git a/core/cmd/init/debug_service.go b/core/cmd/init/debug_service.go
new file mode 100644
index 0000000..e66b6d7
--- /dev/null
+++ b/core/cmd/init/debug_service.go
@@ -0,0 +1,98 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+ "context"
+ "math"
+
+ "git.monogon.dev/source/nexantic.git/core/internal/cluster"
+ "git.monogon.dev/source/nexantic.git/core/internal/containerd"
+ "git.monogon.dev/source/nexantic.git/core/internal/kubernetes"
+ apb "git.monogon.dev/source/nexantic.git/core/proto/api"
+
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+)
+
+// debugService implements the Smalltown node debug API.
+// TODO(q3k): this should probably be implemented somewhere else once we have a better
+// supervision introspection/status API.
+type debugService struct {
+ cluster *cluster.Manager
+ kubernetes *kubernetes.Service
+ containerd *containerd.Service
+}
+
+func (s *debugService) GetDebugKubeconfig(ctx context.Context, req *apb.GetDebugKubeconfigRequest) (*apb.GetDebugKubeconfigResponse, error) {
+ return s.kubernetes.GetDebugKubeconfig(ctx, req)
+}
+
+// GetComponentLogs gets various logbuffers from binaries we call. This function just deals with the first path component,
+// delegating the rest to the service-specific handlers.
+func (s *debugService) GetComponentLogs(ctx context.Context, req *apb.GetComponentLogsRequest) (*apb.GetComponentLogsResponse, error) {
+ if len(req.ComponentPath) < 1 {
+ return nil, status.Error(codes.InvalidArgument, "component_path needs to contain at least one part")
+ }
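+ // A tail_lines value of zero means no limit - read as many lines as the buffer holds.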
+ linesToRead := int(req.TailLines)
+ if linesToRead == 0 {
+ linesToRead = math.MaxInt32
+ }
+ var lines []string
+ var err error
+ switch req.ComponentPath[0] {
+ case "containerd":
+ if len(req.ComponentPath) < 2 {
+ lines = s.containerd.Log.ReadLinesTruncated(linesToRead, "...")
+ } else if req.ComponentPath[1] == "runsc" {
+ lines = s.containerd.RunscLog.ReadLinesTruncated(linesToRead, "...")
+ } else {
+ return nil, status.Error(codes.NotFound, "component not found")
+ }
+ case "kube":
+ if len(req.ComponentPath) < 2 {
+ return nil, status.Error(codes.NotFound, "component not found")
+ }
+ lines, err = s.kubernetes.GetComponentLogs(req.ComponentPath[1], linesToRead)
+ if err != nil {
+ return nil, status.Error(codes.NotFound, "component not found")
+ }
+ default:
+ return nil, status.Error(codes.NotFound, "component not found")
+ }
+ return &apb.GetComponentLogsResponse{Line: lines}, nil
+}
+
+// GetCondition checks for various conditions exposed by different services. Mostly intended for testing. If you need
+// to make sure something is available in an E2E test, consider adding a condition here.
+// TODO(q3k): since all conditions are now 'true' after the node lifecycle refactor, remove this call - or start the
+// debug service earlier.
+func (s *debugService) GetCondition(ctx context.Context, req *apb.GetConditionRequest) (*apb.GetConditionResponse, error) {
+ var ok bool
+ switch req.Name {
+ case "IPAssigned":
+ ok = true
+ case "DataAvailable":
+ ok = true
+ default:
+ return nil, status.Errorf(codes.NotFound, "condition %v not found", req.Name)
+ }
+ return &apb.GetConditionResponse{
+ Ok: ok,
+ }, nil
+}
diff --git a/core/cmd/init/debug_disabled.go b/core/cmd/init/delve_disabled.go
similarity index 100%
rename from core/cmd/init/debug_disabled.go
rename to core/cmd/init/delve_disabled.go
diff --git a/core/cmd/init/debug_enabled.go b/core/cmd/init/delve_enabled.go
similarity index 100%
rename from core/cmd/init/debug_enabled.go
rename to core/cmd/init/delve_enabled.go
diff --git a/core/cmd/init/main.go b/core/cmd/init/main.go
index f5b09f8..0dc7d5e 100644
--- a/core/cmd/init/main.go
+++ b/core/cmd/init/main.go
@@ -19,23 +19,42 @@
import (
"context"
"fmt"
+ "log"
+ "net"
"os"
"os/signal"
"runtime/debug"
- "git.monogon.dev/source/nexantic.git/core/internal/common/supervisor"
- "git.monogon.dev/source/nexantic.git/core/internal/network"
- "git.monogon.dev/source/nexantic.git/core/internal/node"
- "git.monogon.dev/source/nexantic.git/core/internal/storage"
- "git.monogon.dev/source/nexantic.git/core/pkg/tpm"
-
"go.uber.org/zap"
"golang.org/x/sys/unix"
+ "google.golang.org/grpc"
+
+ "git.monogon.dev/source/nexantic.git/core/internal/cluster"
+ "git.monogon.dev/source/nexantic.git/core/internal/common"
+ "git.monogon.dev/source/nexantic.git/core/internal/common/supervisor"
+ "git.monogon.dev/source/nexantic.git/core/internal/containerd"
+ "git.monogon.dev/source/nexantic.git/core/internal/kubernetes"
+ "git.monogon.dev/source/nexantic.git/core/internal/kubernetes/pki"
+ "git.monogon.dev/source/nexantic.git/core/internal/localstorage"
+ "git.monogon.dev/source/nexantic.git/core/internal/localstorage/declarative"
+ "git.monogon.dev/source/nexantic.git/core/internal/network"
+ "git.monogon.dev/source/nexantic.git/core/pkg/tpm"
+ apb "git.monogon.dev/source/nexantic.git/core/proto/api"
)
-const (
- apiPort = 7833
- consensusPort = 7834
+var (
+ // kubernetesConfig is the static/global part of the Kubernetes service configuration. In the future, this might
+ // be configurable by loading it from the EnrolmentConfig. For now, it's static and the same across all clusters.
+ kubernetesConfig = kubernetes.Config{
+ ServiceIPRange: net.IPNet{ // TODO(q3k): Decide if configurable / final value
+ IP: net.IP{192, 168, 188, 0},
+ Mask: net.IPMask{0xff, 0xff, 0xff, 0x00}, // /24, but Go stores it as a literal mask
+ },
+ ClusterNet: net.IPNet{
+ IP: net.IP{10, 0, 0, 0},
+ Mask: net.IPMask{0xff, 0xff, 0x00, 0x00}, // /16
+ },
+ }
)
func main() {
@@ -76,70 +95,178 @@
logger.Panic("Failed to initialize TPM 2.0", zap.Error(err))
}
- storageManager, err := storage.Initialize(logger.With(zap.String("component", "storage")))
- if err != nil {
- panic(fmt.Errorf("could not initialize storage: %w", err))
- }
-
networkSvc := network.New(network.Config{})
- if err != nil {
- panic(err)
- }
// This function initializes a headless Delve if this is a debug build or does nothing if it's not
initializeDebugger(networkSvc)
- supervisor.New(context.Background(), logger, func(ctx context.Context) error {
+ // Prepare local storage.
+ root := &localstorage.Root{}
+ if err := declarative.PlaceFS(root, "/"); err != nil {
+ panic(fmt.Errorf("when placing root FS: %w", err))
+ }
+
+ // trapdoor is a channel used to signal to the init service that a very low-level, unrecoverable failure
+ // occurred. This causes a GURU MEDITATION ERROR visible to the end user.
+ trapdoor := make(chan struct{})
+
+ // Make context for supervisor. We cancel it when we reach the trapdoor.
+ ctxS, ctxC := context.WithCancel(context.Background())
+
+ // Start root initialization code as a supervisor one-shot runnable. This means waiting for the network, starting
+ // the cluster manager, and then starting all services related to the node's roles.
+ // TODO(q3k): move this to a separate 'init' service.
+ supervisor.New(ctxS, logger, func(ctx context.Context) error {
+ logger := supervisor.Logger(ctx)
+
+ // Start storage and network - we need this to get anything else done.
+ if err := root.Start(ctx); err != nil {
+ return fmt.Errorf("cannot start root FS: %w", err)
+ }
if err := supervisor.Run(ctx, "network", networkSvc.Run); err != nil {
- return err
+ return fmt.Errorf("when starting network: %w", err)
}
- // TODO(q3k): convert node code to supervisor
- nodeInstance, err := node.NewSmalltownNode(logger, networkSvc, storageManager)
+ // Wait for IP address from network.
+ ip, err := networkSvc.GetIP(ctx, true)
if err != nil {
- panic(err)
+ return fmt.Errorf("when waiting for IP address: %w", err)
}
- err = nodeInstance.Start(ctx)
+ // Start cluster manager. This kicks off cluster membership machinery, which will either start
+ // a new cluster, enroll into one or join one.
+ m := cluster.NewManager(root, networkSvc)
+ if err := supervisor.Run(ctx, "enrolment", m.Run); err != nil {
+ return fmt.Errorf("when starting enrolment: %w", err)
+ }
+
+ // Wait until the cluster manager settles.
+ success := m.WaitFinished()
+ if !success {
+ close(trapdoor)
+ return fmt.Errorf("enrolment failed, aborting")
+ }
+
+ // We are now in a cluster. We can thus access our 'node' object and start all services that
+ // we should be running.
+
+ node := m.Node()
+ if err := node.ConfigureLocalHostname(&root.Etc); err != nil {
+ close(trapdoor)
+ return fmt.Errorf("failed to set local hostname: %w", err)
+ }
+
+ logger.Info("Enrolment success, continuing startup.")
+ logger.Info(fmt.Sprintf("This node (%s) has roles:", node.String()))
+ if cm := node.ConsensusMember(); cm != nil {
+ // There's no need to start anything when we are a consensus member - the cluster
+ // manager does this for us if necessary (as creating/enrolling/joining a cluster is
+ // pretty tied into cluster lifecycle management).
+ logger.Info(" - etcd consensus member")
+ }
+ if kw := node.KubernetesWorker(); kw != nil {
+ logger.Info(" - kubernetes worker")
+ }
+
+ // If we're supposed to be a kubernetes worker, start kubernetes services and containerd.
+ // In the future, this might be split further into kubernetes control plane and data plane
+ // roles.
+ var containerdSvc *containerd.Service
+ var kubeSvc *kubernetes.Service
+ if kw := node.KubernetesWorker(); kw != nil {
+ logger.Info("Starting Kubernetes worker services...")
+
+ // Ensure Kubernetes PKI objects exist in etcd.
+ kpkiKV := m.ConsensusKV("cluster", "kpki")
+ kpki := pki.NewKubernetes(logger.Named("kpki"), kpkiKV)
+ if err := kpki.EnsureAll(ctx); err != nil {
+ return fmt.Errorf("failed to ensure kubernetes PKI present: %w", err)
+ }
+
+ containerdSvc = &containerd.Service{
+ EphemeralVolume: &root.Ephemeral.Containerd,
+ }
+ if err := supervisor.Run(ctx, "containerd", containerdSvc.Run); err != nil {
+ return fmt.Errorf("failed to start containerd service: %w", err)
+ }
+
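+ // Fill in the per-node, runtime-determined parts of the otherwise static Kubernetes configuration.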
+ kubernetesConfig.KPKI = kpki
+ kubernetesConfig.Root = root
+ kubernetesConfig.AdvertiseAddress = *ip
+ kubeSvc = kubernetes.New(kubernetesConfig)
+ if err := supervisor.Run(ctx, "kubernetes", kubeSvc.Run); err != nil {
+ return fmt.Errorf("failed to start kubernetes service: %w", err)
+ }
+
+ }
+
+ // Start the node debug service.
+ // TODO(q3k): this needs to be done in a smarter way once LogTree lands, and then a few things can be
+ // refactored to start this earlier, or this can be split up into multiple gRPC services on a single listener.
+ dbg := &debugService{
+ cluster: m,
+ containerd: containerdSvc,
+ kubernetes: kubeSvc,
+ }
+ dbgSrv := grpc.NewServer()
+ apb.RegisterNodeDebugServiceServer(dbgSrv, dbg)
+ dbgLis, err := net.Listen("tcp", fmt.Sprintf(":%d", common.DebugServicePort))
if err != nil {
- panic(err)
+ return fmt.Errorf("failed to listen on debug service: %w", err)
+ }
+ if err := supervisor.Run(ctx, "debug", supervisor.GRPCServer(dbgSrv, dbgLis, false)); err != nil {
+ return fmt.Errorf("failed to start debug service: %w", err)
}
supervisor.Signal(ctx, supervisor.SignalHealthy)
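+ // Also signal Done: this runnable is a one-shot, and a clean return must not trigger a restart.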
+ supervisor.Signal(ctx, supervisor.SignalDone)
+ return nil
+ })
- // We're PID1, so orphaned processes get reparented to us to clean up
- for {
- select {
- case <-ctx.Done():
- return ctx.Err()
- case sig := <-signalChannel:
- switch sig {
- case unix.SIGCHLD:
- var status unix.WaitStatus
- var rusage unix.Rusage
- for {
- res, err := unix.Wait4(-1, &status, unix.WNOHANG, &rusage)
- if err != nil && err != unix.ECHILD {
- logger.Error("Failed to wait on orphaned child", zap.Error(err))
- break
- }
- if res <= 0 {
- break
- }
+ // We're PID1, so orphaned processes get reparented to us to clean up
+ for {
+ select {
+ case <-trapdoor:
+ // If the trapdoor got closed, we got stuck early enough in the boot process that we can't do anything about
+ // it. Display a generic error message until we handle error conditions better.
+ ctxC()
+ log.Printf(" ########################")
+ log.Printf(" # GURU MEDIATION ERROR #")
+ log.Printf(" ########################")
+ log.Printf("")
+ log.Printf("Smalltown encountered an uncorrectable error and must be restarted.")
+ log.Printf("(Error condition: init trapdoor closed)")
+ log.Printf("")
+ select {}
+
+ case sig := <-signalChannel:
+ switch sig {
+ case unix.SIGCHLD:
+ var status unix.WaitStatus
+ var rusage unix.Rusage
+ for {
+ res, err := unix.Wait4(-1, &status, unix.WNOHANG, &rusage)
+ if err != nil && err != unix.ECHILD {
+ logger.Error("Failed to wait on orphaned child", zap.Error(err))
+ break
}
- case unix.SIGURG:
- // Go 1.14 introduced asynchronous preemption, which uses SIGURG.
- // In order not to break backwards compatibility in the unlikely case
- // of an application actually using SIGURG on its own, they're not filtering them.
- // (https://github.com/golang/go/issues/37942)
- logger.Debug("Ignoring SIGURG")
- // TODO(lorenz): We can probably get more than just SIGCHLD as init, but I can't think
- // of any others right now, just log them in case we hit any of them.
- default:
- logger.Warn("Got unexpected signal", zap.String("signal", sig.String()))
+ if res <= 0 {
+ break
+ }
}
+ case unix.SIGURG:
+ // Go 1.14 introduced asynchronous preemption, which uses SIGURG.
+ // In order not to break backwards compatibility in the unlikely case
// of an application actually using SIGURG on its own, the Go runtime does not filter them.
+ // (https://github.com/golang/go/issues/37942)
+ logger.Debug("Ignoring SIGURG")
+ // TODO(lorenz): We can probably get more than just SIGCHLD as init, but I can't think
+ // of any others right now, just log them in case we hit any of them.
+ default:
+ logger.Warn("Got unexpected signal", zap.String("signal", sig.String()))
}
}
- })
- select {}
+ }
}