core -> metropolis
Smalltown is now called Metropolis!
This is the first commit in a series of cleanup commits that prepare us
for an open source release. This one just moves some Bazel packages around to
follow a stricter directory layout.
All of Metropolis now lives in `//metropolis`.
All of Metropolis Node code now lives in `//metropolis/node`.
All of the main /init now lives in `//m/n/core`.
All of the Kubernetes functionality/glue now lives in `//m/n/kubernetes`.
Next steps:
- hunt down all references to Smalltown and replace them appropriately
- narrow down visibility rules
- document new code organization
- move `//build/toolchain` to `//monogon/build/toolchain`
- do another cleanup pass between `//golibs` and
`//monogon/node/{core,common}`.
- remove `//delta` and `//anubis`
Fixes T799.
Test Plan: Just a very large refactor. CI should help us out here.
Bug: T799
X-Origin-Diff: phab/D667
GitOrigin-RevId: 6029b8d4edc42325d50042596b639e8b122d0ded
diff --git a/metropolis/node/core/cluster/manager.go b/metropolis/node/core/cluster/manager.go
new file mode 100644
index 0000000..6bb87f4
--- /dev/null
+++ b/metropolis/node/core/cluster/manager.go
@@ -0,0 +1,555 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package cluster
+
+import (
+ "context"
+ "crypto/x509"
+ "encoding/pem"
+ "fmt"
+ "io/ioutil"
+ "os"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/cenkalti/backoff/v4"
+ "github.com/golang/protobuf/proto"
+ "go.etcd.io/etcd/clientv3"
+
+ common "git.monogon.dev/source/nexantic.git/metropolis/node"
+ "git.monogon.dev/source/nexantic.git/metropolis/node/common/supervisor"
+ "git.monogon.dev/source/nexantic.git/metropolis/node/core/consensus"
+ "git.monogon.dev/source/nexantic.git/metropolis/node/core/localstorage"
+ "git.monogon.dev/source/nexantic.git/metropolis/node/core/localstorage/declarative"
+ "git.monogon.dev/source/nexantic.git/metropolis/node/core/network"
+ apb "git.monogon.dev/source/nexantic.git/metropolis/proto/api"
+)
+
+// Manager is a finite state machine that joins this node (ie. a Smalltown instance running on a virtual/physical machine)
+// into a Smalltown cluster (ie. a group of nodes that act as a single control plane for Smalltown services). It does that
+// by bringing up all required operating-system level components, including mounting the local filesystem, bringing up
+// a consensus (etcd) server/client, ...
+//
+// The Manager runs as a single-shot Runnable. It will attempt to progress its state from the initial state (New) to
+// either Running (meaning that the node is now part of a cluster), or Failed (meaning that the node couldn't become
+// part of a cluster). It is not restartable, as it mutates quite a bit of implicit operating-system level state (like
+// filesystem mounts). As such, it's difficult to recover reliably from failures, and since these failures indicate
+// some serious issues with the cluster configuration/state, a failure requires a full kernel reboot to retry (or fix/
+// reconfigure the node).
+//
+// Currently, the Manager only supports one flow for bringing up a Node: by creating a new cluster. As such, it's
+// missing the following flows:
+// - joining a new node into an already running cluster
+// - restarting a node into an already existing cluster (ie. full reboot of the whole cluster)
+// - restarting a node into an already running cluster
+//
+type Manager struct {
+ storageRoot *localstorage.Root
+ networkService *network.Service
+
+ // stateLock locks all state* variables.
+ stateLock sync.RWMutex
+ // state is the FSM state of the Manager.
+ state State
+ // stateRunningNode is the Node that this Manager got from joining a cluster. It's only valid if the Manager is
+ // Running.
+ stateRunningNode *Node
+ // stateWaiters is a list of channels to be notified (with true or false) when the Manager reaches a final
+ // state (Running or Failed, respectively).
+ stateWaiters []chan bool
+
+ // goldenTicket is the Golden Ticket present in the enrolment config, if any.
+ goldenTicket *apb.GoldenTicket
+
+ // consensus is the spawned etcd/consensus service, if the Manager brought up a Node that should run one.
+ consensus *consensus.Service
+}
+
+// NewManager creates a new cluster Manager. The given localstorage Root must be placed, but not yet started (and will
+// be started as the Manager makes progress). The given network Service must already be running.
+func NewManager(storageRoot *localstorage.Root, networkService *network.Service) *Manager {
+ return &Manager{
+ storageRoot: storageRoot,
+ networkService: networkService,
+ }
+}
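+
+// A minimal usage sketch (illustrative only; the actual wiring in the init code may differ): the Manager
+// is meant to be started as a supervisor runnable, then waited on before its results are consumed.
+//
+//   m := NewManager(storageRoot, networkService)
+//   if err := supervisor.Run(ctx, "enrolment", m.Run); err != nil {
+//       // handle supervisor error
+//   }
+//   if !m.WaitFinished() {
+//       // Enrolment failed; a kernel reboot is needed to retry.
+//   }
+//   node := m.Node() // non-nil, since the Manager is now Running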
+
+// State is the state of the Manager finite state machine.
+type State int
+
+const (
+ // StateNew is the initial state of the Manager. It decides how to go about joining or creating a cluster.
+ StateNew State = iota
+ // StateCreatingCluster is when the Manager attempts to create a new cluster - this happens when a node is started
+ // with no EnrolmentConfig.
+ StateCreatingCluster
+ // StateCharlie is when the Manager uses the Golden Ticket debug/stopgap system to join an already
+ // existing cluster. This mechanism will be removed before the first Smalltown release.
+ StateCharlie
+ // StateRunning is when the Manager successfully got the node to be part of a cluster. stateRunningNode is valid.
+ StateRunning
+ // StateFailed is when the Manager failed to get the node to be part of a cluster.
+ StateFailed
+)
+
+func (s State) String() string {
+ switch s {
+ case StateNew:
+ return "New"
+ case StateCreatingCluster:
+ return "CreatingCluster"
+ case StateCharlie:
+ return "Charlie"
+ case StateRunning:
+ return "Running"
+ case StateFailed:
+ return "Failed"
+ default:
+ return "UNKNOWN"
+ }
+}
+
+// allowedTransitions describes all allowed state transitions (map[From][]To).
+var allowedTransitions = map[State][]State{
+ StateNew: {StateCreatingCluster, StateCharlie},
+ StateCreatingCluster: {StateRunning, StateFailed},
+ StateCharlie: {StateRunning, StateFailed},
+}
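+
+// For reference, the transition graph defined above (Running and Failed are terminal; any transition not
+// listed here also ends in Failed):
+//
+//   New -> CreatingCluster -> Running | Failed
+//   New -> Charlie         -> Running | Failed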
+
+// allowed returns whether a transition from one state to another is allowed (ie. is defined in allowedTransitions).
+func (m *Manager) allowed(from, to State) bool {
+ for _, allowed := range allowedTransitions[from] {
+ if to == allowed {
+ return true
+ }
+ }
+ return false
+}
+
+// next moves the Manager finite state machine from its current state to `n`, or to Failed if the transition is not
+// allowed.
+func (m *Manager) next(ctx context.Context, n State) {
+ m.stateLock.Lock()
+ defer m.stateLock.Unlock()
+
+ if !m.allowed(m.state, n) {
+ supervisor.Logger(ctx).Errorf("Attempted invalid enrolment state transition, failing enrolment; from: %s, to: %s",
+ m.state.String(), n.String())
+ m.state = StateFailed
+ return
+ }
+
+ supervisor.Logger(ctx).Infof("Enrolment state change; from: %s, to: %s", m.state.String(), n.String())
+
+ m.state = n
+}
+
+// State returns the state of the Manager. It's safe to call this from any goroutine.
+func (m *Manager) State() State {
+ m.stateLock.RLock()
+ defer m.stateLock.RUnlock()
+ return m.state
+}
+
+// WaitFinished waits until the Manager FSM reaches Running or Failed, and returns true if the FSM is Running. It's
+// safe to call this from any goroutine.
+func (m *Manager) WaitFinished() (success bool) {
+ m.stateLock.Lock()
+ switch m.state {
+ case StateFailed:
+ m.stateLock.Unlock()
+ return false
+ case StateRunning:
+ m.stateLock.Unlock()
+ return true
+ }
+
+ C := make(chan bool)
+ m.stateWaiters = append(m.stateWaiters, C)
+ m.stateLock.Unlock()
+ return <-C
+}
+
+// wakeWaiters wakes any WaitFinished waiters and lets them know about the current state of the Manager.
+// The stateLock must already have been taken, and the state must have been set in the same critical section (otherwise
+// this can cause a race condition).
+func (m *Manager) wakeWaiters() {
+ state := m.state
+ waiters := m.stateWaiters
+ m.stateWaiters = nil
+
+ for _, waiter := range waiters {
+ go func(w chan bool) {
+ w <- state == StateRunning
+ }(waiter)
+ }
+}
+
+// Run is the runnable of the Manager, to be started using the Supervisor. It is one-shot, and should not be restarted.
+func (m *Manager) Run(ctx context.Context) error {
+ if state := m.State(); state != StateNew {
+ supervisor.Logger(ctx).Errorf("Manager started with non-New state %s, failing", state.String())
+ m.stateLock.Lock()
+ m.state = StateFailed
+ m.wakeWaiters()
+ m.stateLock.Unlock()
+ return nil
+ }
+
+ var err error
+ bo := backoff.NewExponentialBackOff()
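+ // Drive the FSM until it reaches a terminal state. Each iteration runs the handler for the current
+ // state; if a handler returns without advancing the state, enrolment is considered stuck and the
+ // Manager is failed.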
+ for {
+ done := false
+ state := m.State()
+ switch state {
+ case StateNew:
+ err = m.stateNew(ctx)
+ case StateCreatingCluster:
+ err = m.stateCreatingCluster(ctx)
+ case StateCharlie:
+ err = m.stateCharlie(ctx)
+ default:
+ done = true
+ break
+ }
+
+ if err != nil || done {
+ break
+ }
+
+ if state == m.State() && !m.allowed(state, m.State()) {
+ supervisor.Logger(ctx).Errorf("Enrolment got stuck at %s, failing", m.state.String())
+ m.stateLock.Lock()
+ m.state = StateFailed
+ m.stateLock.Unlock()
+ } else {
+ bo.Reset()
+ }
+ }
+
+ m.stateLock.Lock()
+ state := m.state
+ if state != StateRunning {
+ supervisor.Logger(ctx).Errorf("Enrolment failed at %s: %v", m.state.String(), err)
+ } else {
+ supervisor.Logger(ctx).Info("Enrolment successful!")
+ }
+ m.wakeWaiters()
+ m.stateLock.Unlock()
+
+ supervisor.Signal(ctx, supervisor.SignalHealthy)
+ supervisor.Signal(ctx, supervisor.SignalDone)
+ return nil
+}
+
+// stateNew is called when a Manager is New. It makes the decision on how to join this node into a cluster.
+func (m *Manager) stateNew(ctx context.Context) error {
+ supervisor.Logger(ctx).Info("Starting enrolment process...")
+
+ // Check for presence of EnrolmentConfig on ESP or in qemu firmware variables.
+ var configRaw []byte
+ configRaw, err := m.storageRoot.ESP.Enrolment.Read()
+ if err != nil && !os.IsNotExist(err) {
+ return fmt.Errorf("could not read local enrolment file: %w", err)
+ } else if err != nil {
+ configRaw, err = ioutil.ReadFile("/sys/firmware/qemu_fw_cfg/by_name/com.nexantic.smalltown/enrolment.pb/raw")
+ if err != nil && !os.IsNotExist(err) {
+ return fmt.Errorf("could not read firmware enrolment file: %w", err)
+ }
+ }
+
+ // If no enrolment file exists, we create a new cluster.
+ if configRaw == nil {
+ m.next(ctx, StateCreatingCluster)
+ return nil
+ }
+
+ // Enrolment file exists, parse it.
+
+ enrolmentConfig := apb.EnrolmentConfig{}
+ if err := proto.Unmarshal(configRaw, &enrolmentConfig); err != nil {
+ return fmt.Errorf("could not unmarshal local enrolment file: %w", err)
+ }
+
+ // If no join ticket exists, we can't do anything yet.
+ if enrolmentConfig.GoldenTicket == nil {
+ return fmt.Errorf("joining a cluster without a golden ticket not yet implemented")
+ }
+
+ m.goldenTicket = enrolmentConfig.GoldenTicket
+
+ // Otherwise, we begin enrolling with the Golden Ticket.
+ m.next(ctx, StateCharlie)
+ return nil
+}
+
+// stateCreatingCluster is called when the Manager has decided to create a new cluster.
+//
+// The process to create a new cluster is as follows:
+// - wait for IP address
+// - initialize new data partition, by generating local and cluster unlock keys (the local unlock key is saved to
+// the ESP, while the cluster unlock key is returned)
+// - create a new node certificate and Node (with the newly generated cluster unlock key)
+// - start up a new etcd cluster, with this node being the only member
+// - save the new Node to the new etcd cluster (thereby saving the node's cluster unlock key to etcd)
+func (m *Manager) stateCreatingCluster(ctx context.Context) error {
+ logger := supervisor.Logger(ctx)
+ logger.Info("Creating new cluster: waiting for IP address...")
+ ip, err := m.networkService.GetIP(ctx, true)
+ if err != nil {
+ return fmt.Errorf("when getting IP address: %w", err)
+ }
+ logger.Infof("Creating new cluster: got IP address %s", ip.String())
+
+ logger.Info("Creating new cluster: initializing storage...")
+ cuk, err := m.storageRoot.Data.MountNew(&m.storageRoot.ESP.LocalUnlock)
+ if err != nil {
+ return fmt.Errorf("when making new data partition: %w", err)
+ }
+ logger.Info("Creating new cluster: storage initialized")
+
+ // Create certificate for node.
+ cert, err := m.storageRoot.Data.Node.EnsureSelfSigned(localstorage.CertificateForNode)
+ if err != nil {
+ return fmt.Errorf("failed to create new node certificate: %w", err)
+ }
+
+ node := NewNode(cuk, *ip, *cert.Leaf)
+
+ m.consensus = consensus.New(consensus.Config{
+ Data: &m.storageRoot.Data.Etcd,
+ Ephemeral: &m.storageRoot.Ephemeral.Consensus,
+ NewCluster: true,
+ Name: node.ID(),
+ InitialCluster: ip.String(),
+ ExternalHost: ip.String(),
+ ListenHost: ip.String(),
+ })
+ if err := supervisor.Run(ctx, "consensus", m.consensus.Run); err != nil {
+ return fmt.Errorf("when starting consensus: %w", err)
+ }
+
+ // TODO(q3k): make timeout configurable?
+ ctxT, ctxC := context.WithTimeout(ctx, 5*time.Second)
+ defer ctxC()
+
+ supervisor.Logger(ctx).Info("Creating new cluster: waiting for consensus...")
+ if err := m.consensus.WaitReady(ctxT); err != nil {
+ return fmt.Errorf("consensus service failed to become ready: %w", err)
+ }
+
+ // Configure node to be a consensus member and kubernetes worker. In the future, different nodes will have
+ // different roles, but for now they're all symmetrical.
+ _, consensusName, err := m.consensus.MemberInfo(ctx)
+ if err != nil {
+ return fmt.Errorf("could not get consensus MemberInfo: %w", err)
+ }
+ if err := node.MakeConsensusMember(consensusName); err != nil {
+ return fmt.Errorf("could not make new node into consensus member: %w", err)
+ }
+ if err := node.MakeKubernetesWorker(node.ID()); err != nil {
+ return fmt.Errorf("could not make new node into kubernetes worker: %w", err)
+ }
+
+ // Save node into etcd.
+ supervisor.Logger(ctx).Info("Creating new cluster: storing first node...")
+ if err := node.Store(ctx, m.consensus.KV("cluster", "enrolment")); err != nil {
+ return fmt.Errorf("could not save new node: %w", err)
+ }
+
+ m.stateLock.Lock()
+ m.stateRunningNode = node
+ m.stateLock.Unlock()
+
+ m.next(ctx, StateRunning)
+ return nil
+}
+
+// stateCharlie is used to join an existing cluster via the GoldenTicket mechanism. This mechanism is temporarily
+// implemented in Smalltown in order to allow testing multi-node clusters before a TPM attestation flow is implemented.
+// The Golden Ticket contains a pregenerated node certificate, etcd certificate, and other data that any node can
+// use to join the cluster.
+// Since this flow is temporary, it has a slight impedance mismatch with methods exposed by localstorage, node, etc.,
+// and the resulting sequencing is a bit odd:
+// - the {node,etcd} certificates/keys are loaded (this already dictates the new node name, as the node name is based
+// off of the node public key)
+// - local storage is initialized, a local/cluster unlock keypair is generated
+// - etcd keys are manually saved to localstorage (vs. being generated locally by CA)
+// - an etcd/consensus member is started, knowing that the remote member was already generated when the golden ticket
+// was generated (vs. being created now by an RPC call, via a promote-node-to-etcd-member flow)
+// - the node is then promoted to a consensus member and kubernetes worker, its clusterunlock key is set, and then it
+// is saved to etcd.
+// As such, in this flow, we first create an etcd member (on goldenticket generation), and only then create a new Smalltown
+// node (when the goldenticket is used).
+func (m *Manager) stateCharlie(ctx context.Context) error {
+ t := m.goldenTicket
+ nodeCert, err := x509.ParseCertificate(t.NodeCert)
+ if err != nil {
+ return fmt.Errorf("parsing node certificate from ticket: %w", err)
+ }
+
+ supervisor.Logger(ctx).Info("Joining cluster: waiting for IP address...")
+ ip, err := m.networkService.GetIP(ctx, true)
+ if err != nil {
+ return fmt.Errorf("when getting IP address: %w", err)
+ }
+ supervisor.Logger(ctx).Info("Joining cluster: got IP address %s", ip.String())
+
+ supervisor.Logger(ctx).Info("Joining cluster: initializing storage...")
+ cuk, err := m.storageRoot.Data.MountNew(&m.storageRoot.ESP.LocalUnlock)
+ if err != nil {
+ return fmt.Errorf("when making new data partition: %w", err)
+ }
+ supervisor.Logger(ctx).Info("Joining cluster: storage initialized")
+ node := NewNode(cuk, *ip, *nodeCert)
+
+ // Save etcd PKI to disk.
+ for _, f := range []struct {
+ target declarative.FilePlacement
+ data []byte
+ blockType string
+ }{
+ {m.storageRoot.Data.Etcd.PeerPKI.Key, t.EtcdClientKey, "PRIVATE KEY"},
+ {m.storageRoot.Data.Etcd.PeerPKI.Certificate, t.EtcdClientCert, "CERTIFICATE"},
+ {m.storageRoot.Data.Etcd.PeerPKI.CACertificate, t.EtcdCaCert, "CERTIFICATE"},
+ } {
+ if err := f.target.Write(pem.EncodeToMemory(&pem.Block{Type: f.blockType, Bytes: f.data}), 0600); err != nil {
+ return fmt.Errorf("when writing etcd PKI data: %w", err)
+ }
+ }
+ if err := m.storageRoot.Data.Etcd.PeerCRL.Write(t.EtcdCrl, 0600); err != nil {
+ return fmt.Errorf("when writing etcd CRL: %w", err)
+ }
+
+ https := func(p *apb.GoldenTicket_EtcdPeer) string {
+ return fmt.Sprintf("%s=https://%s:%d", p.Name, p.Address, common.ConsensusPort)
+ }
+ var initialCluster []string
+ for _, p := range t.Peers {
+ initialCluster = append(initialCluster, https(p))
+ }
+ initialCluster = append(initialCluster, https(t.This))
+
+ supervisor.Logger(ctx).Infof("Joining cluster: starting etcd join, name: %s, initial_cluster: %s", node.ID(), strings.Join(initialCluster, ","))
+ m.consensus = consensus.New(consensus.Config{
+ Data: &m.storageRoot.Data.Etcd,
+ Ephemeral: &m.storageRoot.Ephemeral.Consensus,
+ Name: node.ID(),
+ InitialCluster: strings.Join(initialCluster, ","),
+ ExternalHost: ip.String(),
+ ListenHost: ip.String(),
+ })
+
+ if err := supervisor.Run(ctx, "consensus", m.consensus.Run); err != nil {
+ return fmt.Errorf("when starting consensus: %w", err)
+ }
+
+ // TODO(q3k): make timeout configurable?
+ ctxT, ctxC := context.WithTimeout(ctx, 5*time.Second)
+ defer ctxC()
+
+ supervisor.Logger(ctx).Info("Joining cluster: waiting for consensus...")
+ if err := m.consensus.WaitReady(ctxT); err != nil {
+ return fmt.Errorf("consensus service failed to become ready: %w", err)
+ }
+
+ // Configure node to be a consensus member and kubernetes worker. In the future, different nodes will have
+ // different roles, but for now they're all symmetrical.
+ _, consensusName, err := m.consensus.MemberInfo(ctx)
+ if err != nil {
+ return fmt.Errorf("could not get consensus MemberInfo: %w", err)
+ }
+ if err := node.MakeConsensusMember(consensusName); err != nil {
+ return fmt.Errorf("could not make new node into consensus member: %w", err)
+ }
+ if err := node.MakeKubernetesWorker(node.ID()); err != nil {
+ return fmt.Errorf("could not make new node into kubernetes worker: %w", err)
+ }
+
+ // Save node into etcd.
+ supervisor.Logger(ctx).Info("Creating new cluster: storing first node...")
+ if err := node.Store(ctx, m.consensus.KV("cluster", "enrolment")); err != nil {
+ return fmt.Errorf("could not save new node: %w", err)
+ }
+
+ m.stateLock.Lock()
+ m.stateRunningNode = node
+ m.stateLock.Unlock()
+
+ m.next(ctx, StateRunning)
+ return nil
+}
+
+// Node returns the Node that the Manager brought into a cluster, or nil if the Manager is not Running.
+// This is safe to call from any goroutine.
+func (m *Manager) Node() *Node {
+ m.stateLock.Lock()
+ defer m.stateLock.Unlock()
+ if m.state != StateRunning {
+ return nil
+ }
+ return m.stateRunningNode
+}
+
+// ConsensusKV returns a namespaced etcd KV client, or nil if the Manager is not Running.
+// This is safe to call from any goroutine.
+func (m *Manager) ConsensusKV(module, space string) clientv3.KV {
+ m.stateLock.Lock()
+ defer m.stateLock.Unlock()
+ if m.state != StateRunning {
+ return nil
+ }
+ if m.stateRunningNode.ConsensusMember() == nil {
+ // TODO(q3k): in this case, we should return a client to etcd even though this
+ // node is not a member of consensus. For now, all nodes are consensus members.
+ return nil
+ }
+ return m.consensus.KV(module, space)
+}
+
+// ConsensusKVRoot returns a non-namespaced etcd KV client, or nil if the Manager is not Running.
+// This is safe to call from any goroutine.
+func (m *Manager) ConsensusKVRoot() clientv3.KV {
+ m.stateLock.Lock()
+ defer m.stateLock.Unlock()
+ if m.state != StateRunning {
+ return nil
+ }
+ if m.stateRunningNode.ConsensusMember() == nil {
+ // TODO(q3k): in this case, we should return a client to etcd even though this
+ // node is not a member of consensus. For now, all nodes are consensus members.
+ return nil
+ }
+ return m.consensus.KVRoot()
+}
+
+// ConsensusCluster returns an etcd Cluster client, or nil if the Manager is not Running.
+// This is safe to call from any goroutine.
+func (m *Manager) ConsensusCluster() clientv3.Cluster {
+ m.stateLock.Lock()
+ defer m.stateLock.Unlock()
+ if m.state != StateRunning {
+ return nil
+ }
+ if m.stateRunningNode.ConsensusMember() == nil {
+ // TODO(q3k): in this case, we should return a client to etcd even though this
+ // node is not a member of consensus. For now, all nodes are consensus members.
+ return nil
+ }
+ return m.consensus.Cluster()
+}