core/internal/cluster: implement multi-node clusters with 'golden ticket'.

As we have fully ripped out all traces of the node management service
and integrity checks, we implement a stopgap system that allows us to
continue developing multi-node clusters. This mechanism is enrolment
using 'golden tickets': protobuf messages that can be generated via the
debug service on an existing cluster and set on a new node's
EnrolmentConfig to enrol that node into the cluster.
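
For illustration, roughly how a ticket ends up inside an enrolment
config; the canonical message definitions live in core/proto/api, and
everything below beyond the field names actually read by the cluster
manager is a hypothetical sketch:

  import (
      apb "git.monogon.dev/source/nexantic.git/core/proto/api"
      "github.com/golang/protobuf/proto"
  )

  // makeEnrolmentConfig is a hypothetical helper: it wraps credentials
  // pregenerated off-node (all DER-encoded) into an EnrolmentConfig that
  // a new node can boot with.
  func makeEnrolmentConfig(nodeCert, etcdCert, etcdKey, etcdCA, etcdCRL []byte,
      peers []*apb.GoldenTicket_EtcdPeer, this *apb.GoldenTicket_EtcdPeer) ([]byte, error) {
      return proto.Marshal(&apb.EnrolmentConfig{
          GoldenTicket: &apb.GoldenTicket{
              NodeCert:       nodeCert,
              EtcdClientCert: etcdCert,
              EtcdClientKey:  etcdKey,
              EtcdCaCert:     etcdCA,
              EtcdCrl:        etcdCRL,
              Peers:          peers, // existing etcd members
              This:           this,  // member pregenerated for the new node
          },
      })
  }

The resulting bytes are what stateNew unmarshals into apb.EnrolmentConfig
on the new node.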

As this is a stopgap measure (waiting for better cluster lifecycle
design), this is somewhat poorly implemented, with known issues:
 - odd enrolment flow that creates all certificates off-node and results
   in some code duplication in the cluster manager and node debug
   service
 - (more) assumptions that every node is both a kubernetes and etcd
   member.
 - absolutely no protection against consensus loss due to even-sized
   etcd membership or repeated issuance of certificates (see the quorum
   note below)
 - dependence on knowing the IP address of the new node ahead of time,
   which is not something that our test harness supports well (or that
   we want to rely on at all)
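
Quorum note: etcd only makes progress while floor(n/2)+1 members are
healthy, so growing 1 -> 2 members via a golden ticket actually reduces
availability (either member failing halts consensus), and a 4-member
cluster tolerates no more failures than a 3-member one.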

Test Plan: part of existing multi-node tests

X-Origin-Diff: phab/D591
GitOrigin-RevId: 8f099e6ef37f8d47fb2272a3a14b25ed480e377a
diff --git a/core/internal/cluster/BUILD.bazel b/core/internal/cluster/BUILD.bazel
index 99a6eac..3efdab0 100644
--- a/core/internal/cluster/BUILD.bazel
+++ b/core/internal/cluster/BUILD.bazel
@@ -9,10 +9,13 @@
     importpath = "git.monogon.dev/source/nexantic.git/core/internal/cluster",
     visibility = ["//core:__subpackages__"],
     deps = [
+        "//core/internal/common:go_default_library",
         "//core/internal/common/supervisor:go_default_library",
         "//core/internal/consensus:go_default_library",
         "//core/internal/localstorage:go_default_library",
+        "//core/internal/localstorage/declarative:go_default_library",
         "//core/internal/network:go_default_library",
+        "//core/proto/api:go_default_library",
         "//core/proto/internal:go_default_library",
         "@com_github_cenkalti_backoff_v4//:go_default_library",
         "@com_github_golang_protobuf//proto:go_default_library",
diff --git a/core/internal/cluster/manager.go b/core/internal/cluster/manager.go
index 7e5af9f..dfdab36 100644
--- a/core/internal/cluster/manager.go
+++ b/core/internal/cluster/manager.go
@@ -18,19 +18,27 @@
 
 import (
 	"context"
+	"crypto/x509"
+	"encoding/pem"
 	"fmt"
 	"io/ioutil"
 	"os"
+	"strings"
 	"sync"
 	"time"
 
+	apb "git.monogon.dev/source/nexantic.git/core/proto/api"
+
 	"github.com/cenkalti/backoff/v4"
+	"github.com/golang/protobuf/proto"
 	"go.etcd.io/etcd/clientv3"
 	"go.uber.org/zap"
 
+	"git.monogon.dev/source/nexantic.git/core/internal/common"
 	"git.monogon.dev/source/nexantic.git/core/internal/common/supervisor"
 	"git.monogon.dev/source/nexantic.git/core/internal/consensus"
 	"git.monogon.dev/source/nexantic.git/core/internal/localstorage"
+	"git.monogon.dev/source/nexantic.git/core/internal/localstorage/declarative"
 	"git.monogon.dev/source/nexantic.git/core/internal/network"
 )
 
@@ -67,6 +75,9 @@
 	// reaches a final state (Running or Failed respectively).
 	stateWaiters []chan bool
 
+	// goldenTicket is the Golden Ticket present in the enrolment config, if any.
+	goldenTicket *apb.GoldenTicket
+
 	// consensus is the spawned etcd/consensus service, if the Manager brought up a Node that should run one.
 	consensus *consensus.Service
 }
@@ -89,6 +100,9 @@
 	// StateCreatingCluster is when the Manager attempts to create a new cluster - this happens when a node is started
 	// with no EnrolmentConfig.
 	StateCreatingCluster
+	// StateCharlie is when the Manager uses the Golden Ticket debug/stopgap system to join an already
+	// existing cluster. This mechanism will be removed before the first Smalltown release.
+	StateCharlie
 	// StateRunning is when the Manager successfully got the node to be part of a cluster. stateRunningNode is valid.
 	StateRunning
 	// StateFailed is when the Manager failed to get the node to be part of a cluster.
@@ -101,6 +115,8 @@
 		return "New"
 	case StateCreatingCluster:
 		return "CreatingCluster"
+	case StateCharlie:
+		return "Charlie"
 	case StateRunning:
 		return "Running"
 	case StateFailed:
@@ -112,8 +128,9 @@
 
 // allowedTransition describes all allowed state transitions (map[From][]To).
 var allowedTransitions = map[State][]State{
-	StateNew:             {StateCreatingCluster},
+	StateNew:             {StateCreatingCluster, StateCharlie},
 	StateCreatingCluster: {StateRunning, StateFailed},
+	StateCharlie:         {StateRunning, StateFailed},
 }
 
 // allowed returns whether a transition from a state to another state is allowed (ie. is defined in allowedTransitions).
@@ -207,6 +224,8 @@
 			err = m.stateNew(ctx)
 		case StateCreatingCluster:
 			err = m.stateCreatingCluster(ctx)
+		case StateCharlie:
+			err = m.stateCharlie(ctx)
 		default:
 			done = true
 			break
@@ -263,8 +282,21 @@
 		return nil
 	}
 
-	// Enrolment file exists, this is not yet implemented (need to enroll into or join existing cluster).
-	return fmt.Errorf("unimplemented join/enroll")
+	// Enrolment file exists, parse it.
+
+	enrolmentConfig := apb.EnrolmentConfig{}
+	if err := proto.Unmarshal(configRaw, &enrolmentConfig); err != nil {
+		return fmt.Errorf("could not unmarshal local enrolment file: %w", err)
+	}
+
+	// If no join ticket exists, we can't do anything yet.
+	if enrolmentConfig.GoldenTicket == nil {
+		return fmt.Errorf("joining a cluster without a golden ticket is not yet implemented")
+	}
+
+	// Otherwise, we begin enrolling with the Golden Ticket.
+	m.next(ctx, StateCharlie)
+	return nil
 }
 
 // stateCreatingCluster is called when the Manager has decided to create a new cluster.
@@ -349,6 +381,122 @@
 	return nil
 }
 
+// stateCharlie is used to join an existing cluster via the GoldenTicket mechanism. This mechanism is temporarily
+// implemented in Smalltown to allow testing multi-node clusters before a TPM attestation flow exists.
+// The Golden Ticket contains a pregenerated node certificate, etcd certificate, and other data that any node can
+// use to join the cluster.
+// Since this flow is temporary, it has a slight impedance mismatch with methods exposed by localstorage, node, etc.,
+// and the resulting sequencing is a bit odd:
+//  - the {node,etcd} certificates/keys are loaded (this already dictates the new node name, as the node name is based
+//    off of the node public key)
+//  - local storage is initialized, a local/cluster unlock keypair is generated
+//  - etcd keys are manually saved to localstorage (vs. being generated locally by a CA)
+//  - an etcd/consensus member is started, knowing that the remote member was already registered when the golden ticket
+//    was generated (vs. being created now by an RPC call, via a promote-node-to-etcd-member flow)
+//  - the node is then promoted to a consensus member and kubernetes worker, its clusterunlock key is set, and then it
+//    is saved to etcd.
+// As such, in this flow, we first create an etcd member (on goldenticket generation), and only then create a new Smalltown
+// node (when the goldenticket is used).
+func (m *Manager) stateCharlie(ctx context.Context) error {
+	t := m.goldenTicket
+	nodeCert, err := x509.ParseCertificate(t.NodeCert)
+	if err != nil {
+		return fmt.Errorf("parsing node certificate from ticket: %w", err)
+	}
+
+	supervisor.Logger(ctx).Info("Joining cluster: waiting for IP address...")
+	ip, err := m.networkService.GetIP(ctx, true)
+	if err != nil {
+		return fmt.Errorf("when getting IP address: %w", err)
+	}
+	supervisor.Logger(ctx).Info("Joining cluster: got IP address", zap.String("address", ip.String()))
+
+	supervisor.Logger(ctx).Info("Joining cluster: initializing storage...")
+	cuk, err := m.storageRoot.Data.MountNew(&m.storageRoot.ESP.LocalUnlock)
+	if err != nil {
+		return fmt.Errorf("when making new data partition: %w", err)
+	}
+	supervisor.Logger(ctx).Info("Joining cluster: storage initialized")
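+	// The node ID (and thus the etcd member name) is derived from the pregenerated certificate's
+	// public key, so the name was already fixed when the golden ticket was issued.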
+	node := NewNode(cuk, *ip, *nodeCert)
+
+	// Save etcd PKI to disk.
+	for _, f := range []struct {
+		target    declarative.FilePlacement
+		data      []byte
+		blockType string
+	}{
+		{m.storageRoot.Data.Etcd.PeerPKI.Key, t.EtcdClientKey, "PRIVATE KEY"},
+		{m.storageRoot.Data.Etcd.PeerPKI.Certificate, t.EtcdClientCert, "CERTIFICATE"},
+		{m.storageRoot.Data.Etcd.PeerPKI.CACertificate, t.EtcdCaCert, "CERTIFICATE"},
+	} {
+		if err := f.target.Write(pem.EncodeToMemory(&pem.Block{Type: f.blockType, Bytes: f.data}), 0600); err != nil {
+			return fmt.Errorf("when writing etcd PKI data: %w", err)
+		}
+	}
+	if err := m.storageRoot.Data.Etcd.PeerCRL.Write(t.EtcdCrl, 0600); err != nil {
+		return fmt.Errorf("when writing etcd CRL: %w", err)
+	}
+
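+	// Assemble etcd's initial cluster list: every existing peer from the ticket plus the member
+	// that was pregenerated for this node when the ticket was issued.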
+	https := func(p *apb.GoldenTicket_EtcdPeer) string {
+		return fmt.Sprintf("%s=https://%s:%d", p.Name, p.Address, common.ConsensusPort)
+	}
+	var initialCluster []string
+	for _, p := range t.Peers {
+		initialCluster = append(initialCluster, https(p))
+	}
+	initialCluster = append(initialCluster, https(t.This))
+
+	supervisor.Logger(ctx).Info("Joining cluster: starting etcd join...",
+		zap.String("initial_cluster", strings.Join(initialCluster, ",")), zap.String("name", node.ID()))
+	m.consensus = consensus.New(consensus.Config{
+		Data:           &m.storageRoot.Data.Etcd,
+		Ephemeral:      &m.storageRoot.Ephemeral.Consensus,
+		Name:           node.ID(),
+		InitialCluster: strings.Join(initialCluster, ","),
+		ExternalHost:   ip.String(),
+		ListenHost:     ip.String(),
+	})
+
+	if err := supervisor.Run(ctx, "consensus", m.consensus.Run); err != nil {
+		return fmt.Errorf("when starting consensus: %w", err)
+	}
+
+	// TODO(q3k): make timeout configurable?
+	ctxT, ctxC := context.WithTimeout(ctx, 5*time.Second)
+	defer ctxC()
+
+	supervisor.Logger(ctx).Info("Joining cluster: waiting for consensus...")
+	if err := m.consensus.WaitReady(ctxT); err != nil {
+		return fmt.Errorf("consensus service failed to become ready: %w", err)
+	}
+
+	// Configure node to be a consensus member and kubernetes worker. In the future, different nodes will have
+	// different roles, but for now they're all symmetrical.
+	_, consensusName, err := m.consensus.MemberInfo(ctx)
+	if err != nil {
+		return fmt.Errorf("could not get consensus MemberInfo: %w", err)
+	}
+	if err := node.MakeConsensusMember(consensusName); err != nil {
+		return fmt.Errorf("could not make new node into consensus member: %w", err)
+	}
+	if err := node.MakeKubernetesWorker(node.ID()); err != nil {
+		return fmt.Errorf("could not make new node into kubernetes worker: %w", err)
+	}
+
+	// Save node into etcd.
+	supervisor.Logger(ctx).Info("Joining cluster: storing node...")
+	if err := node.Store(ctx, m.consensus.KV("cluster", "enrolment")); err != nil {
+		return fmt.Errorf("could not save new node: %w", err)
+	}
+
+	m.stateLock.Lock()
+	m.stateRunningNode = node
+	m.stateLock.Unlock()
+
+	m.next(ctx, StateRunning)
+	return nil
+}
+
 // Node returns the Node that the Manager brought into a cluster, or nil if the Manager is not Running.
 // This is safe to call from any goroutine.
 func (m *Manager) Node() *Node {