core/internal/cluster: add new single-node cluster code
This adds a cluster library that consists of:
- a Node object that can be loaded from and saved into etcd,
representing a node of the cluster that can have different 'role
tags' assigned to it
- a cluster Manager that is responsible for bringing up the local node
into a cluster (by creating a new cluster, enrolling into one, or
joining one)
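
Rough usage sketch, mirroring how core/cmd/init wires the Manager up
further down in this diff (error handling abbreviated):

    // Start the manager as a supervisor runnable; it is one-shot and not restartable.
    m := cluster.NewManager(root, networkSvc)
    if err := supervisor.Run(ctx, "enrolment", m.Run); err != nil {
        return fmt.Errorf("when starting enrolment: %w", err)
    }
    // Block until the manager reaches Running or Failed.
    if !m.WaitFinished() {
        return fmt.Errorf("enrolment failed")
    }
    // Node() and the namespaced etcd clients are only non-nil once Running.
    node := m.Node()
    if kw := node.KubernetesWorker(); kw != nil {
        kv := m.ConsensusKV("cluster", "kpki")
        _ = kv // start Kubernetes worker services against this namespaced KV
    }
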
This also gets wired into core/cmd/init, and as such completes a chunk
of The Refactor.
Test Plan: covered by existing e2e tests.
X-Origin-Diff: phab/D590
GitOrigin-RevId: e88022164e4353249b29fc16849a02805f15dd49
diff --git a/core/cmd/init/BUILD.bazel b/core/cmd/init/BUILD.bazel
index 368470f..2f8103f 100644
--- a/core/cmd/init/BUILD.bazel
+++ b/core/cmd/init/BUILD.bazel
@@ -4,21 +4,30 @@
name = "go_default_library",
# keep
srcs = [
+ "debug_service.go",
"main.go",
"switchroot.go",
] + select({
- "//core:debug_build": ["debug_enabled.go"],
- "//conditions:default": ["debug_disabled.go"],
+ "//core:debug_build": ["delve_enabled.go"],
+ "//conditions:default": ["delve_disabled.go"],
}),
importpath = "git.monogon.dev/source/nexantic.git/core/cmd/init",
visibility = ["//visibility:private"],
deps = [
- "//core/internal/common:go_default_library", # keep
+ "//core/internal/cluster:go_default_library",
+ "//core/internal/common:go_default_library",
"//core/internal/common/supervisor:go_default_library",
+ "//core/internal/containerd:go_default_library",
+ "//core/internal/kubernetes:go_default_library",
+ "//core/internal/kubernetes/pki:go_default_library",
+ "//core/internal/localstorage:go_default_library",
+ "//core/internal/localstorage/declarative:go_default_library",
"//core/internal/network:go_default_library",
"//core/pkg/tpm:go_default_library",
- "@de_cinfra_git_source_nexantic_git//core/internal/node:go_default_library",
- "@de_cinfra_git_source_nexantic_git//core/internal/storage:go_default_library",
+ "//core/proto/api:go_default_library",
+ "@org_golang_google_grpc//:go_default_library",
+ "@org_golang_google_grpc//codes:go_default_library",
+ "@org_golang_google_grpc//status:go_default_library",
"@org_golang_x_sys//unix:go_default_library",
"@org_uber_go_zap//:go_default_library",
],
diff --git a/core/cmd/init/debug_service.go b/core/cmd/init/debug_service.go
new file mode 100644
index 0000000..e66b6d7
--- /dev/null
+++ b/core/cmd/init/debug_service.go
@@ -0,0 +1,95 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+ "context"
+ "math"
+
+ "git.monogon.dev/source/nexantic.git/core/internal/cluster"
+ "git.monogon.dev/source/nexantic.git/core/internal/containerd"
+ "git.monogon.dev/source/nexantic.git/core/internal/kubernetes"
+ apb "git.monogon.dev/source/nexantic.git/core/proto/api"
+
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+)
+
+// debugService implements the Smalltown node debug API.
+// TODO(q3k): this should probably be implemented somewhere else once we have a better
+// supervision introspection/status API.
+type debugService struct {
+ cluster *cluster.Manager
+ kubernetes *kubernetes.Service
+ containerd *containerd.Service
+}
+
+func (s *debugService) GetDebugKubeconfig(ctx context.Context, req *apb.GetDebugKubeconfigRequest) (*apb.GetDebugKubeconfigResponse, error) {
+ return s.kubernetes.GetDebugKubeconfig(ctx, req)
+}
+
+// GetComponentLogs gets various logbuffers from binaries we call. This function just deals with the first path component,
+// delegating the rest to the service-specific handlers.
+func (s *debugService) GetComponentLogs(ctx context.Context, req *apb.GetComponentLogsRequest) (*apb.GetComponentLogsResponse, error) {
+ if len(req.ComponentPath) < 1 {
+ return nil, status.Error(codes.InvalidArgument, "component_path needs to contain at least one part")
+ }
+ linesToRead := int(req.TailLines)
+ if linesToRead == 0 {
+ linesToRead = math.MaxInt32
+ }
+ var lines []string
+ var err error
+ switch req.ComponentPath[0] {
+ case "containerd":
+ if len(req.ComponentPath) < 2 {
+ lines = s.containerd.Log.ReadLinesTruncated(linesToRead, "...")
+ } else if req.ComponentPath[1] == "runsc" {
+ lines = s.containerd.RunscLog.ReadLinesTruncated(linesToRead, "...")
+ }
+ case "kube":
+ if len(req.ComponentPath) < 2 {
+ return nil, status.Error(codes.NotFound, "Component not found")
+ }
+ lines, err = s.kubernetes.GetComponentLogs(req.ComponentPath[1], linesToRead)
+ if err != nil {
+ return nil, status.Error(codes.NotFound, "Component not found")
+ }
+ default:
+ return nil, status.Error(codes.NotFound, "component not found")
+ }
+ return &apb.GetComponentLogsResponse{Line: lines}, nil
+}
+
+// GetCondition checks for various conditions exposed by different services. Mostly intended for testing. If you need
+// to make sure something is available in an E2E test, consider adding a condition here.
+// TODO(q3k): since all conditions are now 'true' after the node lifecycle refactor, remove this call - or, start the
+// debug service earlier.
+func (s *debugService) GetCondition(ctx context.Context, req *apb.GetConditionRequest) (*apb.GetConditionResponse, error) {
+ var ok bool
+ switch req.Name {
+ case "IPAssigned":
+ ok = true
+ case "DataAvailable":
+ ok = true
+ default:
+ return nil, status.Errorf(codes.NotFound, "condition %v not found", req.Name)
+ }
+ return &apb.GetConditionResponse{
+ Ok: ok,
+ }, nil
+}
diff --git a/core/cmd/init/debug_disabled.go b/core/cmd/init/delve_disabled.go
similarity index 100%
rename from core/cmd/init/debug_disabled.go
rename to core/cmd/init/delve_disabled.go
diff --git a/core/cmd/init/debug_enabled.go b/core/cmd/init/delve_enabled.go
similarity index 100%
rename from core/cmd/init/debug_enabled.go
rename to core/cmd/init/delve_enabled.go
diff --git a/core/cmd/init/main.go b/core/cmd/init/main.go
index f5b09f8..0dc7d5e 100644
--- a/core/cmd/init/main.go
+++ b/core/cmd/init/main.go
@@ -19,23 +19,42 @@
import (
"context"
"fmt"
+ "log"
+ "net"
"os"
"os/signal"
"runtime/debug"
- "git.monogon.dev/source/nexantic.git/core/internal/common/supervisor"
- "git.monogon.dev/source/nexantic.git/core/internal/network"
- "git.monogon.dev/source/nexantic.git/core/internal/node"
- "git.monogon.dev/source/nexantic.git/core/internal/storage"
- "git.monogon.dev/source/nexantic.git/core/pkg/tpm"
-
"go.uber.org/zap"
"golang.org/x/sys/unix"
+ "google.golang.org/grpc"
+
+ "git.monogon.dev/source/nexantic.git/core/internal/cluster"
+ "git.monogon.dev/source/nexantic.git/core/internal/common"
+ "git.monogon.dev/source/nexantic.git/core/internal/common/supervisor"
+ "git.monogon.dev/source/nexantic.git/core/internal/containerd"
+ "git.monogon.dev/source/nexantic.git/core/internal/kubernetes"
+ "git.monogon.dev/source/nexantic.git/core/internal/kubernetes/pki"
+ "git.monogon.dev/source/nexantic.git/core/internal/localstorage"
+ "git.monogon.dev/source/nexantic.git/core/internal/localstorage/declarative"
+ "git.monogon.dev/source/nexantic.git/core/internal/network"
+ "git.monogon.dev/source/nexantic.git/core/pkg/tpm"
+ apb "git.monogon.dev/source/nexantic.git/core/proto/api"
)
-const (
- apiPort = 7833
- consensusPort = 7834
+var (
+ // kubernetesConfig is the static/global part of the Kubernetes service configuration. In the future, this might
+ // be configurable by loading it from the EnrolmentConfig. For now, it's static and the same across all clusters.
+ kubernetesConfig = kubernetes.Config{
+ ServiceIPRange: net.IPNet{ // TODO(q3k): Decide if configurable / final value
+ IP: net.IP{192, 168, 188, 0},
+ Mask: net.IPMask{0xff, 0xff, 0xff, 0x00}, // /24, but Go stores as a literal mask
+ },
+ ClusterNet: net.IPNet{
+ IP: net.IP{10, 0, 0, 0},
+ Mask: net.IPMask{0xff, 0xff, 0x00, 0x00}, // /16
+ },
+ }
)
func main() {
@@ -76,70 +95,176 @@
logger.Panic("Failed to initialize TPM 2.0", zap.Error(err))
}
- storageManager, err := storage.Initialize(logger.With(zap.String("component", "storage")))
- if err != nil {
- panic(fmt.Errorf("could not initialize storage: %w", err))
- }
-
networkSvc := network.New(network.Config{})
- if err != nil {
- panic(err)
- }
// This function initializes a headless Delve if this is a debug build or does nothing if it's not
initializeDebugger(networkSvc)
- supervisor.New(context.Background(), logger, func(ctx context.Context) error {
+ // Prepare local storage.
+ root := &localstorage.Root{}
+ if err := declarative.PlaceFS(root, "/"); err != nil {
+ panic(fmt.Errorf("when placing root FS: %w", err))
+ }
+
+ // trapdoor is a channel used to signal to the init service that a very low-level, unrecoverable failure
+ // occurred. This causes a GURU MEDITATION ERROR visible to the end user.
+ trapdoor := make(chan struct{})
+
+ // Make context for supervisor. We cancel it when we reach the trapdoor.
+ ctxS, ctxC := context.WithCancel(context.Background())
+
+ // Start root initialization code as a supervisor one-shot runnable. This means waiting for the network, starting
+ // the cluster manager, and then starting all services related to the node's roles.
+ // TODO(q3k): move this to a separate 'init' service.
+ supervisor.New(ctxS, logger, func(ctx context.Context) error {
+ logger := supervisor.Logger(ctx)
+
+ // Start storage and network - we need this to get anything else done.
+ if err := root.Start(ctx); err != nil {
+ return fmt.Errorf("cannot start root FS: %w", err)
+ }
if err := supervisor.Run(ctx, "network", networkSvc.Run); err != nil {
- return err
+ return fmt.Errorf("when starting network: %w", err)
}
- // TODO(q3k): convert node code to supervisor
- nodeInstance, err := node.NewSmalltownNode(logger, networkSvc, storageManager)
+ // Wait for IP address from network.
+ ip, err := networkSvc.GetIP(ctx, true)
if err != nil {
- panic(err)
+ return fmt.Errorf("when waiting for IP address: %w", err)
}
- err = nodeInstance.Start(ctx)
+ // Start cluster manager. This kicks off cluster membership machinery, which will either start
+ // a new cluster, enroll into one or join one.
+ m := cluster.NewManager(root, networkSvc)
+ if err := supervisor.Run(ctx, "enrolment", m.Run); err != nil {
+ return fmt.Errorf("when starting enrolment: %w", err)
+ }
+
+ // Wait until the cluster manager settles.
+ success := m.WaitFinished()
+ if !success {
+ close(trapdoor)
+ return fmt.Errorf("enrolment failed, aborting")
+ }
+
+ // We are now in a cluster. We can thus access our 'node' object and start all services that
+ // we should be running.
+
+ node := m.Node()
+ if err := node.ConfigureLocalHostname(&root.Etc); err != nil {
+ close(trapdoor)
+ return fmt.Errorf("failed to set local hostname: %w", err)
+ }
+
+ logger.Info("Enrolment success, continuing startup.")
+ logger.Info(fmt.Sprintf("This node (%s) has roles:", node.String()))
+ if cm := node.ConsensusMember(); cm != nil {
+ // There's no need to start anything when we are a consensus member - the cluster
+ // manager does this for us if necessary (as creating/enrolling/joining a cluster is
+ // pretty tied into cluster lifecycle management).
+ logger.Info(fmt.Sprintf(" - etcd consensus member"))
+ }
+ if kw := node.KubernetesWorker(); kw != nil {
+ logger.Info(fmt.Sprintf(" - kubernetes worker"))
+ }
+
+ // If we're supposed to be a kubernetes worker, start kubernetes services and containerd.
+ // In the future, this might be split further into kubernetes control plane and data plane
+ // roles.
+ var containerdSvc *containerd.Service
+ var kubeSvc *kubernetes.Service
+ if kw := node.KubernetesWorker(); kw != nil {
+ logger.Info("Starting Kubernetes worker services...")
+
+ // Ensure Kubernetes PKI objects exist in etcd.
+ kpkiKV := m.ConsensusKV("cluster", "kpki")
+ kpki := pki.NewKubernetes(logger.Named("kpki"), kpkiKV)
+ if err := kpki.EnsureAll(ctx); err != nil {
+ return fmt.Errorf("failed to ensure kubernetes PKI present: %w", err)
+ }
+
+ containerdSvc = &containerd.Service{
+ EphemeralVolume: &root.Ephemeral.Containerd,
+ }
+ if err := supervisor.Run(ctx, "containerd", containerdSvc.Run); err != nil {
+ return fmt.Errorf("failed to start containerd service: %w", err)
+ }
+
+ kubernetesConfig.KPKI = kpki
+ kubernetesConfig.Root = root
+ kubernetesConfig.AdvertiseAddress = *ip
+ kubeSvc = kubernetes.New(kubernetesConfig)
+ if err := supervisor.Run(ctx, "kubernetes", kubeSvc.Run); err != nil {
+ return fmt.Errorf("failed to start kubernetes service: %w", err)
+ }
+
+ }
+
+ // Start the node debug service.
+ // TODO(q3k): this needs to be done in a smarter way once LogTree lands, and then a few things can be
+ // refactored to start this earlier, or this can be split up into multiple gRPC services on a single listener.
+ dbg := &debugService{
+ cluster: m,
+ containerd: containerdSvc,
+ kubernetes: kubeSvc,
+ }
+ dbgSrv := grpc.NewServer()
+ apb.RegisterNodeDebugServiceServer(dbgSrv, dbg)
+ dbgLis, err := net.Listen("tcp", fmt.Sprintf(":%d", common.DebugServicePort))
if err != nil {
- panic(err)
+ return fmt.Errorf("failed to listen on debug service: %w", err)
+ }
+ if err := supervisor.Run(ctx, "debug", supervisor.GRPCServer(dbgSrv, dbgLis, false)); err != nil {
+ return fmt.Errorf("failed to start debug service: %w", err)
}
supervisor.Signal(ctx, supervisor.SignalHealthy)
+ supervisor.Signal(ctx, supervisor.SignalDone)
+ return nil
+ })
- // We're PID1, so orphaned processes get reparented to us to clean up
- for {
- select {
- case <-ctx.Done():
- return ctx.Err()
- case sig := <-signalChannel:
- switch sig {
- case unix.SIGCHLD:
- var status unix.WaitStatus
- var rusage unix.Rusage
- for {
- res, err := unix.Wait4(-1, &status, unix.WNOHANG, &rusage)
- if err != nil && err != unix.ECHILD {
- logger.Error("Failed to wait on orphaned child", zap.Error(err))
- break
- }
- if res <= 0 {
- break
- }
+ // We're PID1, so orphaned processes get reparented to us to clean up
+ for {
+ select {
+ case <-trapdoor:
+ // If the trapdoor got closed, we got stuck early enough in the boot process that we can't do anything about
+ // it. Display a generic error message until we handle error conditions better.
+ ctxC()
+ log.Printf(" ########################")
+ log.Printf(" # GURU MEDIATION ERROR #")
+ log.Printf(" ########################")
+ log.Printf("")
+ log.Printf("Smalltown encountered an uncorrectable error and must be restarted.")
+ log.Printf("(Error condition: init trapdoor closed)")
+ log.Printf("")
+ select {}
+
+ case sig := <-signalChannel:
+ switch sig {
+ case unix.SIGCHLD:
+ var status unix.WaitStatus
+ var rusage unix.Rusage
+ for {
+ res, err := unix.Wait4(-1, &status, unix.WNOHANG, &rusage)
+ if err != nil && err != unix.ECHILD {
+ logger.Error("Failed to wait on orphaned child", zap.Error(err))
+ break
}
- case unix.SIGURG:
- // Go 1.14 introduced asynchronous preemption, which uses SIGURG.
- // In order not to break backwards compatibility in the unlikely case
- // of an application actually using SIGURG on its own, they're not filtering them.
- // (https://github.com/golang/go/issues/37942)
- logger.Debug("Ignoring SIGURG")
- // TODO(lorenz): We can probably get more than just SIGCHLD as init, but I can't think
- // of any others right now, just log them in case we hit any of them.
- default:
- logger.Warn("Got unexpected signal", zap.String("signal", sig.String()))
+ if res <= 0 {
+ break
+ }
}
+ case unix.SIGURG:
+ // Go 1.14 introduced asynchronous preemption, which uses SIGURG.
+ // In order not to break backwards compatibility in the unlikely case of an application
+ // actually using SIGURG on its own, the Go runtime does not filter it out.
+ // (https://github.com/golang/go/issues/37942)
+ logger.Debug("Ignoring SIGURG")
+ // TODO(lorenz): We can probably get more than just SIGCHLD as init, but I can't think
+ // of any others right now, just log them in case we hit any of them.
+ default:
+ logger.Warn("Got unexpected signal", zap.String("signal", sig.String()))
}
}
- })
- select {}
+ }
}
diff --git a/core/cmd/launch-multi2/BUILD.bazel b/core/cmd/launch-multi2/BUILD.bazel
index 87f4c88..867838a 100644
--- a/core/cmd/launch-multi2/BUILD.bazel
+++ b/core/cmd/launch-multi2/BUILD.bazel
@@ -8,7 +8,6 @@
deps = [
"//core/internal/common:go_default_library",
"//core/internal/launch:go_default_library",
- "//core/proto/api:go_default_library",
"@com_github_grpc_ecosystem_go_grpc_middleware//retry:go_default_library",
"@org_golang_google_grpc//:go_default_library",
],
diff --git a/core/cmd/launch-multi2/main.go b/core/cmd/launch-multi2/main.go
index f8c9035..2a38cef 100644
--- a/core/cmd/launch-multi2/main.go
+++ b/core/cmd/launch-multi2/main.go
@@ -29,7 +29,6 @@
"git.monogon.dev/source/nexantic.git/core/internal/common"
"git.monogon.dev/source/nexantic.git/core/internal/launch"
- apb "git.monogon.dev/source/nexantic.git/core/proto/api"
)
func main() {
@@ -73,16 +72,9 @@
panic(err)
}
defer conn.Close()
- cmc := api.NewClusterManagementClient(conn)
- res, err := cmc.NewEnrolmentConfig(context.Background(), &api.NewEnrolmentConfigRequest{
- Name: "test",
- }, grpcretry.WithMax(10))
- if err != nil {
- log.Fatalf("Failed to get enrolment config: %v", err)
- }
- if err := launch.Launch(ctx, launch.Options{ConnectToSocket: vm1, EnrolmentConfig: res.EnrolmentConfig, SerialPort: os.Stdout}); err != nil {
- log.Fatalf("Failed to launch vm1: %v", err)
- }
+ // TODO(D591): this gets implemented there.
+ _ = vm1
+ panic("unimplemented")
}()
if err := launch.RunMicroVM(ctx, &launch.MicroVMOptions{
SerialPort: os.Stdout,
diff --git a/core/internal/cluster/BUILD.bazel b/core/internal/cluster/BUILD.bazel
new file mode 100644
index 0000000..99a6eac
--- /dev/null
+++ b/core/internal/cluster/BUILD.bazel
@@ -0,0 +1,23 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library")
+
+go_library(
+ name = "go_default_library",
+ srcs = [
+ "manager.go",
+ "node.go",
+ ],
+ importpath = "git.monogon.dev/source/nexantic.git/core/internal/cluster",
+ visibility = ["//core:__subpackages__"],
+ deps = [
+ "//core/internal/common/supervisor:go_default_library",
+ "//core/internal/consensus:go_default_library",
+ "//core/internal/localstorage:go_default_library",
+ "//core/internal/network:go_default_library",
+ "//core/proto/internal:go_default_library",
+ "@com_github_cenkalti_backoff_v4//:go_default_library",
+ "@com_github_golang_protobuf//proto:go_default_library",
+ "@io_etcd_go_etcd//clientv3:go_default_library",
+ "@org_golang_x_sys//unix:go_default_library",
+ "@org_uber_go_zap//:go_default_library",
+ ],
+)
diff --git a/core/internal/cluster/manager.go b/core/internal/cluster/manager.go
new file mode 100644
index 0000000..7e5af9f
--- /dev/null
+++ b/core/internal/cluster/manager.go
@@ -0,0 +1,409 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package cluster
+
+import (
+ "context"
+ "fmt"
+ "io/ioutil"
+ "os"
+ "sync"
+ "time"
+
+ "github.com/cenkalti/backoff/v4"
+ "go.etcd.io/etcd/clientv3"
+ "go.uber.org/zap"
+
+ "git.monogon.dev/source/nexantic.git/core/internal/common/supervisor"
+ "git.monogon.dev/source/nexantic.git/core/internal/consensus"
+ "git.monogon.dev/source/nexantic.git/core/internal/localstorage"
+ "git.monogon.dev/source/nexantic.git/core/internal/network"
+)
+
+// Manager is a finite state machine that joins this node (ie., Smalltown instance running on a virtual/physical machine)
+// into a Smalltown cluster (ie. group of nodes that act as a single control plane for Smalltown services). It does that
+// by bringing up all required operating-system level components, including mounting the local filesystem, bringing up
+// a consensus (etcd) server/client, ...
+//
+// The Manager runs as a single-shot Runnable. It will attempt to progress its state from the initial state (New) to
+// either Running (meaning that the node is now part of a cluster), or Failed (meaning that the node couldn't become
+// part of a cluster). It is not restartable, as it mutates quite a bit of implicit operating-system level state (like
+// filesystem mounts). As such, it's difficult to recover reliably from failures, and since these failures indicate
+// serious issues with the cluster configuration/state, a failure requires a full kernel reboot to retry (or to fix/
+// reconfigure the node).
+//
+// Currently, the Manager only supports one flow for bringing up a Node: by creating a new cluster. As such, it's
+// missing the following flows:
+// - joining a new node into an already running cluster
+// - restarting a node into an already existing cluster
+// - restarting a node into an already running cluster (ie. full reboot of whole cluster)
+//
+type Manager struct {
+ storageRoot *localstorage.Root
+ networkService *network.Service
+
+ // stateLock locks all state* variables.
+ stateLock sync.RWMutex
+ // state is the FSM state of the Manager.
+ state State
+ // stateRunningNode is the Node that this Manager got from joining a cluster. It's only valid if the Manager is
+ // Running.
+ stateRunningNode *Node
+ // stateWaiters is a list of channels that wish to be notified (by sending true or false) for when the Manager
+ // reaches a final state (Running or Failed respectively).
+ stateWaiters []chan bool
+
+ // consensus is the spawned etcd/consensus service, if the Manager brought up a Node that should run one.
+ consensus *consensus.Service
+}
+
+// NewManager creates a new cluster Manager. The given localstorage Root must be placed, but not yet started (and will
+// be started as the Manager makes progress). The given network Service must already be running.
+func NewManager(storageRoot *localstorage.Root, networkService *network.Service) *Manager {
+ return &Manager{
+ storageRoot: storageRoot,
+ networkService: networkService,
+ }
+}
+
+// State is the state of the Manager finite state machine.
+type State int
+
+const (
+ // StateNew is the initial state of the Manager. It decides how to go about joining or creating a cluster.
+ StateNew State = iota
+ // StateCreatingCluster is when the Manager attempts to create a new cluster - this happens when a node is started
+ // with no EnrolmentConfig.
+ StateCreatingCluster
+ // StateRunning is when the Manager successfully got the node to be part of a cluster. stateRunningNode is valid.
+ StateRunning
+ // StateFailed is when the Manager failed to get the node to be part of a cluster.
+ StateFailed
+)
+
+func (s State) String() string {
+ switch s {
+ case StateNew:
+ return "New"
+ case StateCreatingCluster:
+ return "CreatingCluster"
+ case StateRunning:
+ return "Running"
+ case StateFailed:
+ return "Failed"
+ default:
+ return "UNKNOWN"
+ }
+}
+
+// allowedTransitions describes all allowed state transitions (map[From][]To).
+var allowedTransitions = map[State][]State{
+ StateNew: {StateCreatingCluster},
+ StateCreatingCluster: {StateRunning, StateFailed},
+}
+
+// allowed returns whether a transition from a state to another state is allowed (ie. is defined in allowedTransitions).
+func (m *Manager) allowed(from, to State) bool {
+ for _, allowed := range allowedTransitions[from] {
+ if to == allowed {
+ return true
+ }
+ }
+ return false
+}
+
+// next moves the Manager finite state machine from its current state to `n`, or to Failed if the transition is not
+// allowed.
+func (m *Manager) next(ctx context.Context, n State) {
+ m.stateLock.Lock()
+ defer m.stateLock.Unlock()
+
+ if !m.allowed(m.state, n) {
+ supervisor.Logger(ctx).Error("Attempted invalid enrolment state transition, failing enrolment",
+ zap.String("from", m.state.String()), zap.String("to", m.state.String()))
+ m.state = StateFailed
+ return
+ }
+
+ supervisor.Logger(ctx).Info("Enrolment state change",
+ zap.String("from", m.state.String()), zap.String("to", n.String()))
+
+ m.state = n
+}
+
+// State returns the state of the Manager. It's safe to call this from any goroutine.
+func (m *Manager) State() State {
+ m.stateLock.RLock()
+ defer m.stateLock.RUnlock()
+ return m.state
+}
+
+// WaitFinished waits until the Manager FSM reaches Running or Failed, and returns true if the FSM is Running. It's
+// safe to call this from any goroutine.
+func (m *Manager) WaitFinished() (success bool) {
+ m.stateLock.Lock()
+ switch m.state {
+ case StateFailed:
+ m.stateLock.Unlock()
+ return false
+ case StateRunning:
+ m.stateLock.Unlock()
+ return true
+ }
+
+ C := make(chan bool)
+ m.stateWaiters = append(m.stateWaiters, C)
+ m.stateLock.Unlock()
+ return <-C
+}
+
+// wakeWaiters wakes any WaitFinished waiters and lets them know about the current state of the Manager.
+// The stateLock must already be taken, and the state must have been set in the same critical section (otherwise
+// this can cause a race condition).
+func (m *Manager) wakeWaiters() {
+ state := m.state
+ waiters := m.stateWaiters
+ m.stateWaiters = nil
+
+ for _, waiter := range waiters {
+ go func(w chan bool) {
+ w <- state == StateRunning
+ }(waiter)
+ }
+}
+
+// Run is the runnable of the Manager, to be started using the Supervisor. It is one-shot, and should not be restarted.
+func (m *Manager) Run(ctx context.Context) error {
+ if state := m.State(); state != StateNew {
+ supervisor.Logger(ctx).Error("Manager started with non-New state, failing", zap.String("state", state.String()))
+ m.stateLock.Lock()
+ m.state = StateFailed
+ m.wakeWaiters()
+ m.stateLock.Unlock()
+ return nil
+ }
+
+ var err error
+ bo := backoff.NewExponentialBackOff()
+ for {
+ done := false
+ state := m.State()
+ switch state {
+ case StateNew:
+ err = m.stateNew(ctx)
+ case StateCreatingCluster:
+ err = m.stateCreatingCluster(ctx)
+ default:
+ done = true
+ break
+ }
+
+ if err != nil || done {
+ break
+ }
+
+ if state == m.State() && !m.allowed(state, m.State()) {
+ supervisor.Logger(ctx).Error("Enrolment got stuck, failing", zap.String("state", m.state.String()))
+ m.stateLock.Lock()
+ m.state = StateFailed
+ m.stateLock.Unlock()
+ } else {
+ bo.Reset()
+ }
+ }
+
+ m.stateLock.Lock()
+ state := m.state
+ if state != StateRunning {
+ supervisor.Logger(ctx).Error("Enrolment failed", zap.Error(err), zap.String("state", m.state.String()))
+ } else {
+ supervisor.Logger(ctx).Info("Enrolment successful!")
+ }
+ m.wakeWaiters()
+ m.stateLock.Unlock()
+
+ supervisor.Signal(ctx, supervisor.SignalHealthy)
+ supervisor.Signal(ctx, supervisor.SignalDone)
+ return nil
+}
+
+// stateNew is called when a Manager is New. It makes the decision on how to join this node into a cluster.
+func (m *Manager) stateNew(ctx context.Context) error {
+ supervisor.Logger(ctx).Info("Starting enrolment process...")
+
+ // Check for presence of EnrolmentConfig on ESP or in qemu firmware variables.
+ var configRaw []byte
+ configRaw, err := m.storageRoot.ESP.Enrolment.Read()
+ if err != nil && !os.IsNotExist(err) {
+ return fmt.Errorf("could not read local enrolment file: %w", err)
+ } else if err != nil {
+ configRaw, err = ioutil.ReadFile("/sys/firmware/qemu_fw_cfg/by_name/com.nexantic.smalltown/enrolment.pb/raw")
+ if err != nil && !os.IsNotExist(err) {
+ return fmt.Errorf("could not read firmware enrolment file: %w", err)
+ }
+ }
+
+ // If no enrolment file exists, we create a new cluster.
+ if configRaw == nil {
+ m.next(ctx, StateCreatingCluster)
+ return nil
+ }
+
+ // Enrolment file exists, this is not yet implemented (need to enroll into or join existing cluster).
+ return fmt.Errorf("unimplemented join/enroll")
+}
+
+// stateCreatingCluster is called when the Manager has decided to create a new cluster.
+//
+// The process to create a new cluster is as follows:
+// - wait for IP address
+// - initialize new data partition, by generating local and cluster unlock keys (the local unlock key is saved to
+// the ESP, while the cluster unlock key is returned)
+// - create a new node certificate and Node (with new given cluster unlock key)
+// - start up a new etcd cluster, with this node being the only member
+// - save the new Node to the new etcd cluster (thereby saving the node's cluster unlock key to etcd)
+func (m *Manager) stateCreatingCluster(ctx context.Context) error {
+ logger := supervisor.Logger(ctx)
+ logger.Info("Creating new cluster: waiting for IP address...")
+ ip, err := m.networkService.GetIP(ctx, true)
+ if err != nil {
+ return fmt.Errorf("when getting IP address: %w", err)
+ }
+ logger.Info("Creating new cluster: got IP address", zap.String("address", ip.String()))
+
+ logger.Info("Creating new cluster: initializing storage...")
+ cuk, err := m.storageRoot.Data.MountNew(&m.storageRoot.ESP.LocalUnlock)
+ if err != nil {
+ return fmt.Errorf("when making new data partition: %w", err)
+ }
+ logger.Info("Creating new cluster: storage initialized")
+
+ // Create certificate for node.
+ cert, err := m.storageRoot.Data.Node.EnsureSelfSigned(localstorage.CertificateForNode)
+ if err != nil {
+ return fmt.Errorf("failed to create new node certificate: %w", err)
+ }
+
+ node := NewNode(cuk, *ip, *cert.Leaf)
+
+ m.consensus = consensus.New(consensus.Config{
+ Data: &m.storageRoot.Data.Etcd,
+ Ephemeral: &m.storageRoot.Ephemeral.Consensus,
+ NewCluster: true,
+ Name: node.ID(),
+ InitialCluster: ip.String(),
+ ExternalHost: ip.String(),
+ ListenHost: ip.String(),
+ })
+ if err := supervisor.Run(ctx, "consensus", m.consensus.Run); err != nil {
+ return fmt.Errorf("when starting consensus: %w", err)
+ }
+
+ // TODO(q3k): make timeout configurable?
+ ctxT, ctxC := context.WithTimeout(ctx, 5*time.Second)
+ defer ctxC()
+
+ supervisor.Logger(ctx).Info("Creating new cluster: waiting for consensus...")
+ if err := m.consensus.WaitReady(ctxT); err != nil {
+ return fmt.Errorf("consensus service failed to become ready: %w", err)
+ }
+
+ // Configure node to be a consensus member and kubernetes worker. In the future, different nodes will have
+ // different roles, but for now they're all symmetrical.
+ _, consensusName, err := m.consensus.MemberInfo(ctx)
+ if err != nil {
+ return fmt.Errorf("could not get consensus MemberInfo: %w", err)
+ }
+ if err := node.MakeConsensusMember(consensusName); err != nil {
+ return fmt.Errorf("could not make new node into consensus member: %w", err)
+ }
+ if err := node.MakeKubernetesWorker(node.ID()); err != nil {
+ return fmt.Errorf("could not make new node into kubernetes worker: %w", err)
+ }
+
+ // Save node into etcd.
+ supervisor.Logger(ctx).Info("Creating new cluster: storing first node...")
+ if err := node.Store(ctx, m.consensus.KV("cluster", "enrolment")); err != nil {
+ return fmt.Errorf("could not save new node: %w", err)
+ }
+
+ m.stateLock.Lock()
+ m.stateRunningNode = node
+ m.stateLock.Unlock()
+
+ m.next(ctx, StateRunning)
+ return nil
+}
+
+// Node returns the Node that the Manager brought into a cluster, or nil if the Manager is not Running.
+// This is safe to call from any goroutine.
+func (m *Manager) Node() *Node {
+ m.stateLock.Lock()
+ defer m.stateLock.Unlock()
+ if m.state != StateRunning {
+ return nil
+ }
+ return m.stateRunningNode
+}
+
+// ConsensusKV returns a namespaced etcd KV client, or nil if the Manager is not Running.
+// This is safe to call from any goroutine.
+func (m *Manager) ConsensusKV(module, space string) clientv3.KV {
+ m.stateLock.Lock()
+ defer m.stateLock.Unlock()
+ if m.state != StateRunning {
+ return nil
+ }
+ if m.stateRunningNode.ConsensusMember() == nil {
+ // TODO(q3k): in this case, we should return a client to etcd even though this
+ // node is not a member of consensus. For now, all nodes are consensus members.
+ return nil
+ }
+ return m.consensus.KV(module, space)
+}
+
+// ConsensusKVRoot returns a non-namespaced etcd KV client, or nil if the Manager is not Running.
+// This is safe to call from any goroutine.
+func (m *Manager) ConsensusKVRoot() clientv3.KV {
+ m.stateLock.Lock()
+ defer m.stateLock.Unlock()
+ if m.state != StateRunning {
+ return nil
+ }
+ if m.stateRunningNode.ConsensusMember() == nil {
+ // TODO(q3k): in this case, we should return a client to etcd even though this
+ // node is not a member of consensus. For now, all nodes are consensus members.
+ return nil
+ }
+ return m.consensus.KVRoot()
+}
+
+// ConsensusCluster returns an etcd Cluster client, or nil if the Manager is not Running.
+// This is safe to call from any goroutine.
+func (m *Manager) ConsensusCluster() clientv3.Cluster {
+ m.stateLock.Lock()
+ defer m.stateLock.Unlock()
+ if m.state != StateRunning {
+ return nil
+ }
+ if m.stateRunningNode.ConsensusMember() == nil {
+ // TODO(q3k): in this case, we should return a client to etcd even though this
+ // node is not a member of consensus. For now, all nodes are consensus members.
+ return nil
+ }
+ return m.consensus.Cluster()
+}
diff --git a/core/internal/cluster/node.go b/core/internal/cluster/node.go
new file mode 100644
index 0000000..841529d
--- /dev/null
+++ b/core/internal/cluster/node.go
@@ -0,0 +1,219 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package cluster
+
+import (
+ "context"
+ "crypto/ed25519"
+ "crypto/x509"
+ "encoding/hex"
+ "fmt"
+ "net"
+
+ "github.com/golang/protobuf/proto"
+ "go.etcd.io/etcd/clientv3"
+ "golang.org/x/sys/unix"
+
+ "git.monogon.dev/source/nexantic.git/core/internal/localstorage"
+ ipb "git.monogon.dev/source/nexantic.git/core/proto/internal"
+)
+
+// Node is a Smalltown cluster member. A node is a virtual or physical machine running Smalltown. This object represents a
+// node only as part of a Cluster - ie., this object will never be available outside of //core/internal/cluster if the
+// Node is not part of a Cluster.
+// Nodes are inherently tied to their long term storage, which is etcd. As such, methods on this object relate heavily
+// to the Node's expected lifecycle on etcd.
+type Node struct {
+ // clusterUnlockKey is half of the unlock key required to mount the node's data partition. It's stored in etcd, and
+ // will only be provided to the Node if it can prove its identity via an integrity mechanism (ie. via TPM), or when
+ // the Node was just created (as the key is generated locally by localstorage on first format/mount).
+ // The other part of the unlock key is the LocalUnlockKey that's present on the node's ESP partition.
+ clusterUnlockKey []byte
+ // certificate is the node's TLS certificate, used to authenticate Smalltown gRPC calls/services (but not
+ // consensus/etcd). The certificate for a node is permanent (and never expires). It's self-signed by the node on
+ // startup, and contains the node's IP address in its SAN. Callers/services should check directly against the
+ // expected certificate, and not against a CA.
+ certificate x509.Certificate
+ // address is the management IP address of the node. The management IP address of a node is permanent.
+ address net.IP
+
+ // A Node can have multiple Roles. Each Role is represented by the presence of NodeRole* structures in this
+ // structure, with a nil pointer representing the lack of a role.
+
+ consensusMember *NodeRoleConsensusMember
+ kubernetesWorker *NodeRoleKubernetesWorker
+}
+
+// NewNode creates a new Node. This is only called when a new node is supposed to be created as part of a cluster;
+// otherwise it should be loaded from etcd.
+func NewNode(cuk []byte, address net.IP, certificate x509.Certificate) *Node {
+ if certificate.Raw == nil {
+ panic("new node must contain raw certificate")
+ }
+ return &Node{
+ clusterUnlockKey: cuk,
+ certificate: certificate,
+ address: address,
+ }
+}
+
+// NodeRoleConsensusMember defines that the Node is a consensus (etcd) cluster member.
+type NodeRoleConsensusMember struct {
+ // etcdMemberName is the name of this node as an etcd cluster member. This is for now usually the same as the ID() of the Node.
+ etcdMemberName string
+}
+
+// NodeRoleKubernetesWorker defines that the Node should be running the Kubernetes control and data plane.
+type NodeRoleKubernetesWorker struct {
+ // nodeName is the name of the node in Kubernetes. This is for now usually the same as the ID() of the Node.
+ nodeName string
+}
+
+// ID returns the name of this node, which is `smalltown-` followed by a hex prefix of the node's public key. This
+// name should be the primary way to refer to Smalltown nodes within a cluster, and is guaranteed to be unique by
+// relying on the cryptographic randomness of the key.
+func (n *Node) ID() string {
+ return fmt.Sprintf("smalltown-%s", n.IDBare())
+}
+
+// IDBare returns the un-prefixed part of the node ID: a hex encoding of the first 16 bytes of the node's public key.
+func (n Node) IDBare() string {
+ pubKey, ok := n.certificate.PublicKey.(ed25519.PublicKey)
+ if !ok {
+ panic("node has non-ed25519 public key")
+ }
+ return hex.EncodeToString(pubKey[:16])
+}
+
+func (n *Node) String() string {
+ return n.ID()
+}
+
+// ConsensusMember returns a copy of the NodeRoleConsensusMember struct if the Node is a consensus member, otherwise
+// nil.
+func (n *Node) ConsensusMember() *NodeRoleConsensusMember {
+ if n.consensusMember == nil {
+ return nil
+ }
+ cm := *n.consensusMember
+ return &cm
+}
+
+// KubernetesWorker returns a copy of the NodeRoleKubernetesWorker struct if the Node is a kubernetes worker, otherwise
+// nil.
+func (n *Node) KubernetesWorker() *NodeRoleKubernetesWorker {
+ if n.kubernetesWorker == nil {
+ return nil
+ }
+ kw := *n.kubernetesWorker
+ return &kw
+}
+
+// etcdPath builds the etcd path under which this node's protobuf-serialized state is stored.
+func (n *Node) etcdPath() string {
+ return fmt.Sprintf("/nodes/%s", n.ID())
+}
+
+// proto serializes the Node object into protobuf, to be used for saving to etcd.
+func (n *Node) proto() *ipb.Node {
+ msg := &ipb.Node{
+ Certificate: n.certificate.Raw,
+ ClusterUnlockKey: n.clusterUnlockKey,
+ Address: n.address.String(),
+ Roles: &ipb.Node_Roles{},
+ }
+ if n.consensusMember != nil {
+ msg.Roles.ConsensusMember = &ipb.Node_Roles_ConsensusMember{
+ EtcdMemberName: n.consensusMember.etcdMemberName,
+ }
+ }
+ if n.kubernetesWorker != nil {
+ msg.Roles.KubernetesWorker = &ipb.Node_Roles_KubernetesWorker{
+ NodeName: n.kubernetesWorker.nodeName,
+ }
+ }
+ return msg
+}
+
+// Store saves the Node into etcd. This should be called only once per Node (ie. when the Node has been created).
+func (n *Node) Store(ctx context.Context, kv clientv3.KV) error {
+ // Currently the only flow to store a node to etcd is a write-once flow: once a node is created, it cannot be
+ // deleted or updated. In the future, flows to change cluster node roles might be introduced (ie. to promote nodes
+ // to consensus members, etc).
+ key := n.etcdPath()
+ msg := n.proto()
+ nodeRaw, err := proto.Marshal(msg)
+ if err != nil {
+ return fmt.Errorf("failed to marshal node: %w", err)
+ }
+
+ res, err := kv.Txn(ctx).If(
+ clientv3.Compare(clientv3.CreateRevision(key), "=", 0),
+ ).Then(
+ clientv3.OpPut(key, string(nodeRaw)),
+ ).Commit()
+ if err != nil {
+ return fmt.Errorf("failed to store node: %w", err)
+ }
+
+ if !res.Succeeded {
+ return fmt.Errorf("attempted to re-register node (unsupported flow)")
+ }
+ return nil
+}
+
+// MakeConsensusMember turns the node into a consensus member with a given name. This only configures internal fields,
+// and does not actually start any services.
+func (n *Node) MakeConsensusMember(etcdMemberName string) error {
+ if n.consensusMember != nil {
+ return fmt.Errorf("node already is consensus member")
+ }
+ n.consensusMember = &NodeRoleConsensusMember{
+ etcdMemberName: etcdMemberName,
+ }
+ return nil
+}
+
+// MakeKubernetesWorker turns the node into a kubernetes worker with a given name. This only configures internal fields,
+// and does not actually start any services.
+func (n *Node) MakeKubernetesWorker(name string) error {
+ if n.kubernetesWorker != nil {
+ return fmt.Errorf("node is already kubernetes worker")
+ }
+ n.kubernetesWorker = &NodeRoleKubernetesWorker{
+ nodeName: name,
+ }
+ return nil
+}
+
+func (n *Node) Address() net.IP {
+ return n.address
+}
+
+// ConfigureLocalHostname uses the node's ID as a hostname, setting the runtime hostname and local files like
+// /etc/hosts and /etc/machine-id accordingly.
+func (n *Node) ConfigureLocalHostname(etc *localstorage.EtcDirectory) error {
+ if err := unix.Sethostname([]byte(n.ID())); err != nil {
+ return fmt.Errorf("failed to set runtime hostname: %w", err)
+ }
+ if err := etc.Hosts.Write([]byte(fmt.Sprintf("%s %s", "127.0.0.1", n.ID())), 0644); err != nil {
+ return fmt.Errorf("failed to write /etc/hosts: %w", err)
+ }
+ if err := etc.MachineID.Write([]byte(n.IDBare()), 0644); err != nil {
+ return fmt.Errorf("failed to write /etc/machine-id: %w", err)
+ }
+ return nil
+}
diff --git a/core/internal/kubernetes/pki/certificate.go b/core/internal/kubernetes/pki/certificate.go
index e0dea0d..6bd50f9 100644
--- a/core/internal/kubernetes/pki/certificate.go
+++ b/core/internal/kubernetes/pki/certificate.go
@@ -106,6 +106,12 @@
// ensure returns a DER-encoded x509 certificate and internally encoded bare ed25519 key for a given Certificate,
// in memory (if volatile), loading it from etcd, or creating and saving it on etcd if needed.
+// This function is safe to call in parallel from multiple etcd clients (including across machines), but it will error
+// in case a concurrent certificate generation happens. These errors are, however, safe to retry - as long as all the
+// certificate creators (ie., Smalltown nodes) run the same version of this code.
+// TODO(q3k): in the future, this should be handled better - especially as we introduce new certificates, or worse,
+// change the issuance chain. As a stopgap measure, an explicit per-certificate or even global lock can be implemented.
+// And, even before that, we can handle concurrency errors in a smarter way.
func (c *Certificate) ensure(ctx context.Context, kv clientv3.KV) (cert, key []byte, err error) {
if c.name == "" {
// Volatile certificate - generate.
@@ -149,14 +155,21 @@
return
}
- // Save to etcd in transaction. This ensures that no partial writes happen.
- _, err = kv.Txn(ctx).
+ // Save to etcd in transaction. This ensures that no partial writes happen, and that we haven't been raced to the
+ // save.
+ res, err := kv.Txn(ctx).
+ If(
+ clientv3.Compare(clientv3.CreateRevision(certPath), "=", 0),
+ clientv3.Compare(clientv3.CreateRevision(keyPath), "=", 0),
+ ).
Then(
clientv3.OpPut(certPath, string(cert)),
clientv3.OpPut(keyPath, string(key)),
).Commit()
if err != nil {
err = fmt.Errorf("failed to write newly issued certificate: %w", err)
+ } else if !res.Succeeded {
+ err = fmt.Errorf("certificate issuance transaction failed: concurrent write")
}
return
diff --git a/core/tests/e2e/main_test.go b/core/tests/e2e/main_test.go
index ae14a9a..224aa15 100644
--- a/core/tests/e2e/main_test.go
+++ b/core/tests/e2e/main_test.go
@@ -47,7 +47,7 @@
globalTestTimeout = 600 * time.Second
// Timeouts for individual end-to-end tests of different sizes.
- smallTestTimeout = 30 * time.Second
+ smallTestTimeout = 60 * time.Second
largeTestTimeout = 120 * time.Second
)