m/n/core/cluster: rewrite bootstrap to conform to cluster lifecycle DD
This removes the existing cluster/manager code and reimplements it from
scratch, finally implementing the cluster lifecycle design document for
cluster bootstrap.
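For reference, a rough sketch of how the new Manager is expected to be driven
by the node core (illustrative only; exact wiring may differ):

  m := cluster.NewManager(storageRoot, networkService)
  if err := supervisor.Run(ctx, "cluster-manager", m.Run); err != nil {
    return fmt.Errorf("when starting cluster manager: %w", err)
  }
  // Wait blocks until the manager has brought the node into a cluster.
  node, err := m.Wait()
  if err != nil {
    // The manager is one-shot; recovering from a failure requires a reboot.
    return fmt.Errorf("cluster manager failed: %w", err)
  }
  // node.ID(), node.KV etc. are now available to downstream services.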
Test Plan:
E2E tests should cover this. We could unit test the manager, but that would
require a significant amount of DI work; it's unclear whether that's worth it.
X-Origin-Diff: phab/D735
GitOrigin-RevId: b00c97b0a102a21605d16086df82a6ece6eb7f4d
diff --git a/metropolis/node/core/cluster/BUILD.bazel b/metropolis/node/core/cluster/BUILD.bazel
index 4490e1f..b240d16 100644
--- a/metropolis/node/core/cluster/BUILD.bazel
+++ b/metropolis/node/core/cluster/BUILD.bazel
@@ -3,6 +3,8 @@
go_library(
name = "go_default_library",
srcs = [
+ "cluster.go",
+ "configuration.go",
"manager.go",
"node.go",
],
@@ -12,11 +14,10 @@
"//metropolis/node/core/consensus:go_default_library",
"//metropolis/node/core/localstorage:go_default_library",
"//metropolis/node/core/network:go_default_library",
+ "//metropolis/pkg/pki:go_default_library",
"//metropolis/pkg/supervisor:go_default_library",
"//metropolis/proto/api:go_default_library",
- "//metropolis/proto/internal:go_default_library",
- "@com_github_cenkalti_backoff_v4//:go_default_library",
- "@com_github_golang_protobuf//proto:go_default_library",
+ "//metropolis/proto/private:go_default_library",
"@io_etcd_go_etcd//clientv3:go_default_library",
"@org_golang_google_protobuf//proto:go_default_library",
"@org_golang_x_sys//unix:go_default_library",
diff --git a/metropolis/node/core/cluster/cluster.go b/metropolis/node/core/cluster/cluster.go
new file mode 100644
index 0000000..1277c2a
--- /dev/null
+++ b/metropolis/node/core/cluster/cluster.go
@@ -0,0 +1,63 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package cluster
+
+import (
+ "fmt"
+
+ "source.monogon.dev/metropolis/pkg/pki"
+)
+
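+// ClusterState is the state of the cluster from the point of view of this
+// node, as defined in the cluster lifecycle design document.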
+type ClusterState int
+
+const (
+ ClusterUnknown ClusterState = iota
+ ClusterForeign
+ ClusterTrusted
+ ClusterHome
+ ClusterDisowning
+)
+
+type Cluster struct {
+ State ClusterState
+}
+
+func (s ClusterState) String() string {
+ switch s {
+ case ClusterForeign:
+ return "ClusterForeign"
+ case ClusterTrusted:
+ return "ClusterTrusted"
+ case ClusterHome:
+ return "ClusterHome"
+ case ClusterDisowning:
+ return "ClusterDisowning"
+ }
+ return fmt.Sprintf("Invalid(%d)", s)
+}
+
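+// clusterStateTransitions describes all allowed cluster state transitions
+// (map[From][]To).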
+var clusterStateTransitions = map[ClusterState][]ClusterState{
+ ClusterUnknown: {ClusterForeign, ClusterHome, ClusterDisowning},
+ ClusterForeign: {ClusterTrusted},
+ ClusterTrusted: {ClusterHome},
+ ClusterHome: {ClusterHome, ClusterDisowning},
+}
+
+var (
+ PKINamespace = pki.Namespaced("/cluster-pki/")
+ PKICA = PKINamespace.New(pki.SelfSigned, "cluster-ca", pki.CA("Metropolis Cluster CA"))
+)
diff --git a/metropolis/node/core/cluster/configuration.go b/metropolis/node/core/cluster/configuration.go
new file mode 100644
index 0000000..adb156d
--- /dev/null
+++ b/metropolis/node/core/cluster/configuration.go
@@ -0,0 +1,19 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package cluster
+
+
diff --git a/metropolis/node/core/cluster/manager.go b/metropolis/node/core/cluster/manager.go
index d7ffe5a..10d699c 100644
--- a/metropolis/node/core/cluster/manager.go
+++ b/metropolis/node/core/cluster/manager.go
@@ -18,408 +18,304 @@
import (
"context"
+ "crypto/ed25519"
+ "crypto/rand"
+ "encoding/hex"
+ "errors"
"fmt"
"io/ioutil"
- "os"
"sync"
- "time"
- "github.com/cenkalti/backoff/v4"
- "go.etcd.io/etcd/clientv3"
"google.golang.org/protobuf/proto"
"source.monogon.dev/metropolis/node/core/consensus"
"source.monogon.dev/metropolis/node/core/localstorage"
"source.monogon.dev/metropolis/node/core/network"
+ "source.monogon.dev/metropolis/pkg/pki"
"source.monogon.dev/metropolis/pkg/supervisor"
apb "source.monogon.dev/metropolis/proto/api"
+ ppb "source.monogon.dev/metropolis/proto/private"
)
-// Manager is a finite state machine that joins this node (ie., Metropolis node running on a virtual/physical machine)
-// into a Metropolis cluster (ie. group of nodes that act as a single control plane for Metropolis services). It does
-// this by bringing up all required operating-system level components, including mounting the local filesystem, bringing
-// up a consensus (etcd) server/client, ...
-//
-// The Manager runs as a single-shot Runnable. It will attempt to progress its state from the initial state (New) to
-// either Running (meaning that the node is now part of a cluster), or Failed (meaning that the node couldn't become
-// part of a cluster). It is not restartable, as it mutates quite a bit of implicit operating-system level state (like
-// filesystem mounts). As such, it's difficult to recover reliably from failures, and since these failures indicate
-// some high issues with the cluster configuration/state, a failure requires a full kernel reboot to retry (or fix/
-// reconfigure the node).
-//
-// Currently, the Manager only supports one flow for bringing up a Node: by creating a new cluster. As such, it's
-// missing the following flows:
-// - joining a new node into an already running cluster
-// - restarting a node into an already existing cluster
-// - restarting a node into an already running cluster (ie. full reboot of whole cluster)
-//
+type managerResult struct {
+ node *Node
+ err error
+}
+
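+// state is the Manager's internal state, protected by mu.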
+type state struct {
+ mu sync.RWMutex
+
+ oneway bool
+ stateCluster ClusterState
+ stateNode ppb.Node_FSMState
+
+ configuration *ppb.SealedConfiguration
+
+ result *managerResult
+ waiters []chan *managerResult
+}
+
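+// setResult records the final result of the Manager's work and wakes up any
+// pending Wait() callers.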
+func (s *state) setResult(node *Node, err error) {
+ s.result = &managerResult{
+ node: node,
+ err: err,
+ }
+ for _, w := range s.waiters {
+ go func(c chan *managerResult) {
+ c <- s.result
+ }(w)
+ }
+ s.waiters = nil
+}
+
type Manager struct {
storageRoot *localstorage.Root
networkService *network.Service
- // stateLock locks all state* variables.
- stateLock sync.RWMutex
- // state is the FSM state of the Manager.
- state State
- // stateRunningNode is the Node that this Manager got from joining a cluster. It's only valid if the Manager is
- // Running.
- stateRunningNode *Node
- // stateWaiters is a list of channels that wish to be notified (by sending true or false) for when the Manager
- // reaches a final state (Running or Failed respectively).
- stateWaiters []chan bool
+ state
- // consensus is the spawned etcd/consensus service, if the Manager brought up a Node that should run one.
+ // consensus is the spawned etcd/consensus service, if the Manager brought
+ // up a Node that should run one.
consensus *consensus.Service
}
-// NewManager creates a new cluster Manager. The given localstorage Root must be places, but not yet started (and will
-// be started as the Manager makes progress). The given network Service must already be running.
+// NewManager creates a new cluster Manager. The given localstorage Root must
+// be placed, but not yet started (and will be started as the Manager makes
+// progress). The given network Service must already be running.
func NewManager(storageRoot *localstorage.Root, networkService *network.Service) *Manager {
return &Manager{
storageRoot: storageRoot,
networkService: networkService,
+
+ state: state{
+ stateCluster: ClusterUnknown,
+ stateNode: ppb.Node_FSM_STATE_INVALID,
+ },
}
}
-// State is the state of the Manager finite state machine.
-type State int
-
-const (
- // StateNew is the initial state of the Manager. It decides how to go about joining or creating a cluster.
- StateNew State = iota
- // StateCreatingCluster is when the Manager attempts to create a new cluster - this happens when a node is started
- // with no EnrolmentConfig.
- StateCreatingCluster
- // StateRunning is when the Manager successfully got the node to be part of a cluster. stateRunningNode is valid.
- StateRunning
- // StateFailed is when the Manager failed to ge the node to be part of a cluster.
- StateFailed
-)
-
-func (s State) String() string {
- switch s {
- case StateNew:
- return "New"
- case StateCreatingCluster:
- return "CreatingCluster"
- case StateRunning:
- return "Running"
- case StateFailed:
- return "Failed"
- default:
- return "UNKNOWN"
- }
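+// lock locks the Manager's state and returns it alongside an unlock function.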
+func (m *Manager) lock() (*state, func()) {
+ m.mu.Lock()
+ return &m.state, m.mu.Unlock
}
-// allowedTransition describes all allowed state transitions (map[From][]To).
-var allowedTransitions = map[State][]State{
- StateNew: {StateCreatingCluster},
- StateCreatingCluster: {StateRunning, StateFailed},
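+// rlock read-locks the Manager's state and returns it alongside an unlock
+// function.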
+func (m *Manager) rlock() (*state, func()) {
+ m.mu.RLock()
+ return &m.state, m.mu.RUnlock
}
-// allowed returns whether a transition from a state to another state is allowed (ie. is defined in allowedTransitions).
-func (m *Manager) allowed(from, to State) bool {
- for _, allowed := range allowedTransitions[from] {
- if to == allowed {
- return true
- }
- }
- return false
-}
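+// Wait blocks until the Manager has brought this node into a cluster,
+// returning the resulting Node, or the error that occurred along the way. It
+// is safe to call from any goroutine.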
+func (m *Manager) Wait() (*Node, error) {
+ state, unlock := m.lock()
-// next moves the Manager finite state machine from its current state to `n`, or to Failed if the transition is not
-// allowed.
-func (m *Manager) next(ctx context.Context, n State) {
- m.stateLock.Lock()
- defer m.stateLock.Unlock()
-
- if !m.allowed(m.state, n) {
- supervisor.Logger(ctx).Errorf("Attempted invalid enrolment state transition, failing enrolment; from: %s, to: %s",
- m.state.String(), n.String())
- m.state = StateFailed
- return
+ if state.result != nil {
+ unlock()
+ return state.result.node, state.result.err
}
- supervisor.Logger(ctx).Infof("Enrolment state change; from: %s, to: %s", m.state.String(), n.String())
-
- m.state = n
+ c := make(chan *managerResult)
+ state.waiters = append(state.waiters, c)
+ unlock()
+ res := <-c
+ return res.node, res.err
}
-// State returns the state of the Manager. It's safe to call this from any goroutine.
-func (m *Manager) State() State {
- m.stateLock.RLock()
- defer m.stateLock.RUnlock()
- return m.state
-}
-
-// WaitFinished waits until the Manager FSM reaches Running or Failed, and returns true if the FSM is Running. It's
-// safe to call this from any goroutine.
-func (m *Manager) WaitFinished() (success bool) {
- m.stateLock.Lock()
- switch m.state {
- case StateFailed:
- m.stateLock.Unlock()
- return false
- case StateRunning:
- m.stateLock.Unlock()
- return true
- }
-
- C := make(chan bool)
- m.stateWaiters = append(m.stateWaiters, C)
- m.stateLock.Unlock()
- return <-C
-}
-
-// wakeWaiters wakes any WaitFinished waiters and lets them know about the current state of the Manager.
-// The stateLock must already been taken, and the state must have been set in the same critical section (otherwise
-// this can cause a race condition).
-func (m *Manager) wakeWaiters() {
- state := m.state
- waiters := m.stateWaiters
- m.stateWaiters = nil
-
- for _, waiter := range waiters {
- go func(w chan bool) {
- w <- state == StateRunning
- }(waiter)
- }
-}
-
-// Run is the runnable of the Manager, to be started using the Supervisor. It is one-shot, and should not be restarted.
+// Run is the runnable of the Manager, to be started using the Supervisor. It
+// is one-shot, and should not be restarted.
func (m *Manager) Run(ctx context.Context) error {
- if state := m.State(); state != StateNew {
- supervisor.Logger(ctx).Errorf("Manager started with non-New state %s, failing", state.String())
- m.stateLock.Lock()
- m.state = StateFailed
- m.wakeWaiters()
- m.stateLock.Unlock()
- return nil
+ state, unlock := m.lock()
+ if state.oneway {
+ unlock()
+ // TODO(q3k): restart the entire system if this happens
+ return fmt.Errorf("cannot restart cluster manager")
+ }
+ state.oneway = true
+ unlock()
+
+ configuration, err := m.storageRoot.ESP.SealedConfiguration.Unseal()
+ if err == nil {
+ supervisor.Logger(ctx).Info("Sealed configuration present. attempting to join cluster")
+ return m.join(ctx, configuration)
}
- var err error
- bo := backoff.NewExponentialBackOff()
- for {
- done := false
- state := m.State()
- switch state {
- case StateNew:
- err = m.stateNew(ctx)
- case StateCreatingCluster:
- err = m.stateCreatingCluster(ctx)
- default:
- done = true
- break
- }
-
- if err != nil || done {
- break
- }
-
- if state == m.State() && !m.allowed(state, m.State()) {
- supervisor.Logger(ctx).Errorf("Enrolment got stuck at %s, failing", m.state.String())
- m.stateLock.Lock()
- m.state = StateFailed
- m.stateLock.Unlock()
- } else {
- bo.Reset()
- }
+ if !errors.Is(err, localstorage.ErrNoSealed) {
+ return fmt.Errorf("unexpected sealed config error: %w", err)
}
- m.stateLock.Lock()
- state := m.state
- if state != StateRunning {
- supervisor.Logger(ctx).Errorf("Enrolment failed at %s: %v", m.state.String(), err)
- } else {
- supervisor.Logger(ctx).Info("Enrolment successful!")
+ supervisor.Logger(ctx).Info("No sealed configuration, looking for node parameters")
+
+ params, err := m.nodeParams(ctx)
+ if err != nil {
+ return fmt.Errorf("no parameters available: %w", err)
}
- m.wakeWaiters()
- m.stateLock.Unlock()
+
+ switch inner := params.Cluster.(type) {
+ case *apb.NodeParameters_ClusterBootstrap_:
+ return m.bootstrap(ctx, inner.ClusterBootstrap)
+ case *apb.NodeParameters_ClusterRegister_:
+ return m.register(ctx, inner.ClusterRegister)
+ default:
+ return fmt.Errorf("node parameters misconfigured: neither cluster_bootstrap nor cluster_register set")
+ }
+}
+
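+// bootstrap creates a new cluster with this node as its sole member, as
+// described by the given ClusterBootstrap node parameters.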
+func (m *Manager) bootstrap(ctx context.Context, bootstrap *apb.NodeParameters_ClusterBootstrap) error {
+ supervisor.Logger(ctx).Infof("Bootstrapping new cluster, owner public key: %s", hex.EncodeToString(bootstrap.OwnerPublicKey))
+ state, unlock := m.lock()
+ defer unlock()
+
+ state.configuration = &ppb.SealedConfiguration{}
+
+ // Mount new storage with generated CUK, and save LUK into sealed config proto.
+ supervisor.Logger(ctx).Infof("Bootstrapping: mounting new storage...")
+ cuk, err := m.storageRoot.Data.MountNew(state.configuration)
+ if err != nil {
+ return fmt.Errorf("could not make and mount data partition: %w", err)
+ }
+
+ pub, priv, err := ed25519.GenerateKey(rand.Reader)
+ if err != nil {
+ return fmt.Errorf("could not generate node keypair: %w", err)
+ }
+ supervisor.Logger(ctx).Infof("Bootstrapping: node public key: %s", hex.EncodeToString([]byte(pub)))
+
+ node := Node{
+ clusterUnlockKey: cuk,
+ pubkey: pub,
+ state: ppb.Node_FSM_STATE_UP,
+ // TODO(q3k): make this configurable.
+ consensusMember: &NodeRoleConsensusMember{},
+ kubernetesWorker: &NodeRoleKubernetesWorker{},
+ }
+
+ // Run worker to keep updating /ephemeral/hosts (and thus, /etc/hosts) with
+ // our own IP address. This ensures that the node's ID always resolves to
+ // its current external IP address.
+ supervisor.Run(ctx, "hostsfile", func(ctx context.Context) error {
+ supervisor.Signal(ctx, supervisor.SignalHealthy)
+ watcher := m.networkService.Watch()
+ for {
+ status, err := watcher.Get(ctx)
+ if err != nil {
+ return err
+ }
+ err = node.ConfigureLocalHostname(ctx, &m.storageRoot.Ephemeral, status.ExternalAddress)
+ if err != nil {
+ return fmt.Errorf("could not configure hostname: %w", err)
+ }
+ }
+ })
+
+ // Bring up consensus with this node as the only member.
+ m.consensus = consensus.New(consensus.Config{
+ Data: &m.storageRoot.Data.Etcd,
+ Ephemeral: &m.storageRoot.Ephemeral.Consensus,
+ NewCluster: true,
+ Name: node.ID(),
+ })
+
+ supervisor.Logger(ctx).Infof("Bootstrapping: starting consensus...")
+ if err := supervisor.Run(ctx, "consensus", m.consensus.Run); err != nil {
+ return fmt.Errorf("when starting consensus: %w", err)
+ }
+
+ supervisor.Logger(ctx).Info("Bootstrapping: waiting for consensus...")
+ if err := m.consensus.WaitReady(ctx); err != nil {
+ return fmt.Errorf("consensus service failed to become ready: %w", err)
+ }
+ supervisor.Logger(ctx).Info("Bootstrapping: consensus ready.")
+
+ kv := m.consensus.KVRoot()
+ node.KV = kv
+
+ // Create Metropolis CA and this node's certificate.
+ caCertBytes, _, err := PKICA.Ensure(ctx, kv)
+ if err != nil {
+ return fmt.Errorf("failed to create cluster CA: %w", err)
+ }
+ nodeCert := PKINamespace.New(PKICA, "", pki.Server([]string{node.ID()}, nil))
+ nodeCert.UseExistingKey(priv)
+ nodeCertBytes, _, err := nodeCert.Ensure(ctx, kv)
+ if err != nil {
+ return fmt.Errorf("failed to create node certificate: %w", err)
+ }
+
+ if err := m.storageRoot.Data.Node.Credentials.CACertificate.Write(caCertBytes, 0400); err != nil {
+ return fmt.Errorf("failed to write CA certificate: %w", err)
+ }
+ if err := m.storageRoot.Data.Node.Credentials.Certificate.Write(nodeCertBytes, 0400); err != nil {
+ return fmt.Errorf("failed to write node certificate: %w", err)
+ }
+ if err := m.storageRoot.Data.Node.Credentials.Key.Write(priv, 0400); err != nil {
+ return fmt.Errorf("failed to write node private key: %w", err)
+ }
+
+ // Update our Node object in etcd.
+ if err := node.Store(ctx, kv); err != nil {
+ return fmt.Errorf("failed to store new node in etcd: %w", err)
+ }
+
+ state.setResult(&node, nil)
supervisor.Signal(ctx, supervisor.SignalHealthy)
supervisor.Signal(ctx, supervisor.SignalDone)
return nil
}
-// stateNew is called when a Manager is New. It makes the decision on how to join this node into a cluster.
-func (m *Manager) stateNew(ctx context.Context) error {
- supervisor.Logger(ctx).Info("Starting enrolment process...")
-
- // STOPGAP when migrating to enrolment config and cluster lifecycle: always
- // expect NodeParameters with ClusterBootstrap.
-
- // Check for presence of EnrolmentConfig on ESP or in qemu firmware variables.
- var configRaw []byte
- configRaw, err := m.storageRoot.ESP.NodeParameters.Read()
- if err != nil && !os.IsNotExist(err) {
- return fmt.Errorf("could not read local enrolment file: %w", err)
- } else if err != nil {
- configRaw, err = ioutil.ReadFile("/sys/firmware/qemu_fw_cfg/by_name/dev.monogon.metropolis/parameters.pb/raw")
- if err != nil && !os.IsNotExist(err) {
- return fmt.Errorf("could not read firmware enrolment file: %w", err)
- }
- }
-
- if configRaw == nil {
- return fmt.Errorf("no enrolment config present")
- }
-
- parameters := &apb.NodeParameters{}
- if err := proto.Unmarshal(configRaw, parameters); err != nil {
- return fmt.Errorf("enrolment config could not get unmarshaled: %w", err)
- }
-
- switch parameters.Cluster.(type) {
- case *apb.NodeParameters_ClusterBootstrap_:
- default:
- return fmt.Errorf("enrolment config has no ClusterBootstrap: %w", err)
- }
-
- m.next(ctx, StateCreatingCluster)
- return nil
+func (m *Manager) register(ctx context.Context, bootstrap *apb.NodeParameters_ClusterRegister) error {
+ return fmt.Errorf("unimplemented")
}
-// stateCreatingCluster is called when the Manager has decided to create a new cluster.
-//
-// The process to create a new cluster is as follows:
-// - wait for IP address
-// - initialize new data partition, by generating local and cluster unlock keys (the local unlock key is saved to
-// the ESP, while the cluster unlock key is returned)
-// - create a new node certificate and Node (with new given cluster unlock key)
-// - start up a new etcd cluster, with this node being the only member
-// - save the new Node to the new etcd cluster (thereby saving the node's cluster unlock key to etcd)
-func (m *Manager) stateCreatingCluster(ctx context.Context) error {
- logger := supervisor.Logger(ctx)
- logger.Info("Creating new cluster: waiting for IP address...")
-
- // STOPGAP: bad use of watcher (should be long-term)
- watcher := m.networkService.Watch()
- defer watcher.Close()
- data, err := watcher.Get(ctx)
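+// nodeParamsFWCFG attempts to retrieve NodeParameters from the qemu fwcfg
+// interface.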
+func (m *Manager) nodeParamsFWCFG(ctx context.Context) (*apb.NodeParameters, error) {
+ bytes, err := ioutil.ReadFile("/sys/firmware/qemu_fw_cfg/by_name/dev.monogon.metropolis/parameters.pb/raw")
if err != nil {
- return fmt.Errorf("when getting IP address: %w", err)
+ return nil, fmt.Errorf("could not read firmware enrolment file: %w", err)
}
- ip := data.ExternalAddress
- logger.Infof("Creating new cluster: got IP address %s", ip.String())
- logger.Info("Creating new cluster: initializing storage...")
- cuk, err := m.storageRoot.Data.MountNew(&m.storageRoot.ESP.LocalUnlock)
+ config := apb.NodeParameters{}
+ err = proto.Unmarshal(bytes, &config)
if err != nil {
- return fmt.Errorf("when making new data partition: %w", err)
- }
- logger.Info("Creating new cluster: storage initialized")
-
- // Create certificate for node.
- cert, err := m.storageRoot.Data.Node.EnsureSelfSigned(localstorage.CertificateForNode)
- if err != nil {
- return fmt.Errorf("failed to create new node certificate: %w", err)
+ return nil, fmt.Errorf("could not unmarshal: %v", err)
}
- node := NewNode(cuk, ip, *cert.Leaf)
-
- m.consensus = consensus.New(consensus.Config{
- Data: &m.storageRoot.Data.Etcd,
- Ephemeral: &m.storageRoot.Ephemeral.Consensus,
- NewCluster: true,
- Name: node.ID(),
- // STOPGAP: this will not be used after the manager rewrite.
- ExternalHost: ip.String(),
- })
- if err := supervisor.Run(ctx, "consensus", m.consensus.Run); err != nil {
- return fmt.Errorf("when starting consensus: %w", err)
- }
-
- // TODO(q3k): make timeout configurable?
- ctxT, ctxC := context.WithTimeout(ctx, 5*time.Second)
- defer ctxC()
-
- supervisor.Logger(ctx).Info("Creating new cluster: waiting for consensus...")
- if err := m.consensus.WaitReady(ctxT); err != nil {
- return fmt.Errorf("consensus service failed to become ready: %w", err)
- }
-
- // Configure node to be a consensus member and kubernetes worker. In the future, different nodes will have
- // different roles, but for now they're all symmetrical.
- _, consensusName, err := m.consensus.MemberInfo(ctx)
- if err != nil {
- return fmt.Errorf("could not get consensus MemberInfo: %w", err)
- }
- if err := node.MakeConsensusMember(consensusName); err != nil {
- return fmt.Errorf("could not make new node into consensus member: %w", err)
- }
- if err := node.MakeKubernetesWorker(node.ID()); err != nil {
- return fmt.Errorf("could not make new node into kubernetes worker: %w", err)
- }
-
- // Save node into etcd.
- supervisor.Logger(ctx).Info("Creating new cluster: storing first node...")
- if err := node.Store(ctx, m.consensus.KV("cluster", "enrolment")); err != nil {
- return fmt.Errorf("could not save new node: %w", err)
- }
-
- m.stateLock.Lock()
- m.stateRunningNode = node
- m.stateLock.Unlock()
-
- m.next(ctx, StateRunning)
- return nil
+ return &config, nil
}
-// Node returns the Node that the Manager brought into a cluster, or nil if the Manager is not Running.
-// This is safe to call from any goroutine.
+func (m *Manager) nodeParams(ctx context.Context) (*apb.NodeParameters, error) {
+ // Retrieve node parameters from qemu's fwcfg interface or ESP.
+ // TODO(q3k): probably abstract this away and implement per platform/build/...
+ paramsFWCFG, err := m.nodeParamsFWCFG(ctx)
+ if err != nil {
+ supervisor.Logger(ctx).Warningf("Could not retrieve node parameters from qemu fwcfg: %v", err)
+ paramsFWCFG = nil
+ } else {
+ supervisor.Logger(ctx).Infof("Retrieved node parameters from qemu fwcfg")
+ }
+ paramsESP, err := m.storageRoot.ESP.NodeParameters.Unmarshal()
+ if err != nil {
+ supervisor.Logger(ctx).Warningf("Could not retrieve node parameters from ESP: %v", err)
+ paramsESP = nil
+ } else {
+ supervisor.Logger(ctx).Infof("Retrieved node parameters from ESP")
+ }
+ if paramsFWCFG == nil && paramsESP == nil {
+ return nil, fmt.Errorf("could not find node parameters in ESP or qemu fwcfg")
+ }
+ if paramsFWCFG != nil && paramsESP != nil {
+ supervisor.Logger(ctx).Warningf("Node parameters found both inboth ESP and qemu fwcfg, using the latter")
+ return paramsFWCFG, nil
+ } else if paramsFWCFG != nil {
+ return paramsFWCFG, nil
+ } else {
+ return paramsESP, nil
+ }
+}
+
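+// join makes this node (re)join an existing cluster based on its sealed
+// configuration. Currently unimplemented.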
+func (m *Manager) join(ctx context.Context, cfg *ppb.SealedConfiguration) error {
+ return fmt.Errorf("unimplemented")
+}
+
+// Node returns the Node that the Manager brought into a cluster, or nil if the
+// Manager is not Running. This is safe to call from any goroutine.
func (m *Manager) Node() *Node {
- m.stateLock.Lock()
- defer m.stateLock.Unlock()
- if m.state != StateRunning {
- return nil
- }
- return m.stateRunningNode
-}
-
-// ConsensusKV returns a namespaced etcd KV client, or nil if the Manager is not Running.
-// This is safe to call from any goroutine.
-func (m *Manager) ConsensusKV(module, space string) clientv3.KV {
- m.stateLock.Lock()
- defer m.stateLock.Unlock()
- if m.state != StateRunning {
- return nil
- }
- if m.stateRunningNode.ConsensusMember() == nil {
- // TODO(q3k): in this case, we should return a client to etcd even though this
- // node is not a member of consensus. For now, all nodes are consensus members.
- return nil
- }
- return m.consensus.KV(module, space)
-}
-
-// ConsensusKVRoot returns a non-namespaced etcd KV client, or nil if the Manager is not Running.
-// This is safe to call from any goroutine.
-func (m *Manager) ConsensusKVRoot() clientv3.KV {
- m.stateLock.Lock()
- defer m.stateLock.Unlock()
- if m.state != StateRunning {
- return nil
- }
- if m.stateRunningNode.ConsensusMember() == nil {
- // TODO(q3k): in this case, we should return a client to etcd even though this
- // node is not a member of consensus. For now, all nodes are consensus members.
- return nil
- }
- return m.consensus.KVRoot()
-}
-
-// ConsensusCluster returns an etcd Cluster client, or nil if the Manager is not Running.
-// This is safe to call from any goroutine.
-func (m *Manager) ConsensusCluster() clientv3.Cluster {
- m.stateLock.Lock()
- defer m.stateLock.Unlock()
- if m.state != StateRunning {
- return nil
- }
- if m.stateRunningNode.ConsensusMember() == nil {
- // TODO(q3k): in this case, we should return a client to etcd even though this
- // node is not a member of consensus. For now, all nodes are consensus members.
- return nil
- }
- return m.consensus.Cluster()
+ return nil
}
diff --git a/metropolis/node/core/cluster/node.go b/metropolis/node/core/cluster/node.go
index 7d26213..a38da8b 100644
--- a/metropolis/node/core/cluster/node.go
+++ b/metropolis/node/core/cluster/node.go
@@ -18,92 +18,80 @@
import (
"context"
- "crypto/ed25519"
- "crypto/x509"
"encoding/hex"
"fmt"
"net"
+ "strings"
- "github.com/golang/protobuf/proto"
"go.etcd.io/etcd/clientv3"
"golang.org/x/sys/unix"
+ "google.golang.org/protobuf/proto"
"source.monogon.dev/metropolis/node/core/localstorage"
- ipb "source.monogon.dev/metropolis/proto/internal"
+ "source.monogon.dev/metropolis/pkg/supervisor"
+ ppb "source.monogon.dev/metropolis/proto/private"
)
-// Node is a Metropolis cluster member. A node is a virtual or physical machine running Metropolis. This object
-// represents a node only as part of a cluster - ie., this object will never be available outside of
-// //metropolis/node/core/cluster if the Node is not part of a Cluster.
-// Nodes are inherently tied to their long term storage, which is etcd. As such, methods on this object relate heavily
-// to the Node's expected lifecycle on etcd.
+// Node is a Metropolis cluster member. A node is a virtual or physical machine
+// running Metropolis. This object represents a node only as part of a cluster
+// - ie., this object will never be available outside of
+// //metropolis/node/core/cluster if the Node is not part of a Cluster. Nodes
+// are inherently tied to their long term storage, which is etcd. As such,
+// methods on this object relate heavily to the Node's expected lifecycle on
+// etcd.
type Node struct {
- // clusterUnlockKey is half of the unlock key required to mount the node's data partition. It's stored in etcd, and
- // will only be provided to the Node if it can prove its identity via an integrity mechanism (ie. via TPM), or when
- // the Node was just created (as the key is generated locally by localstorage on first format/mount).
- // The other part of the unlock key is the LocalUnlockKey that's present on the node's ESP partition.
+ // clusterUnlockKey is half of the unlock key required to mount the node's
+ // data partition. It's stored in etcd, and will only be provided to the
+ // Node if it can prove its identity via an integrity mechanism (ie. via
+ // TPM), or when the Node was just created (as the key is generated locally
+ // by localstorage on first format/mount). The other part of the unlock
+ // key is the LocalUnlockKey that's present on the node's ESP partition.
clusterUnlockKey []byte
- // certificate is the node's TLS certificate, used to authenticate Metropolis gRPC calls/services (but not
- // consensus/etcd). The certificate for a node is permanent (and never expires). It's self-signed by the node on
- // startup, and contains the node's IP address in its SAN. Callers/services should check directly against the
- // expected certificate, and not against a CA.
- certificate x509.Certificate
- // address is the management IP address of the node. The management IP address of a node is permanent.
- address net.IP
- // A Node can have multiple Roles. Each Role is represented by the presence of NodeRole* structures in this
- // structure, with a nil pointer representing the lack of a role.
+ pubkey []byte
+ state ppb.Node_FSMState
+
+ // A Node can have multiple Roles. Each Role is represented by the presence
+ // of NodeRole* structures in this structure, with a nil pointer
+ // representing the lack of a role.
consensusMember *NodeRoleConsensusMember
kubernetesWorker *NodeRoleKubernetesWorker
+
+ // At runtime, this represents an etcd client to the consensus cluster. This
+ // is used by applications (like Kubernetes).
+ KV clientv3.KV
}
-// NewNode creates a new Node. This is only called when a New node is supposed to be created as part of a cluster,
-// otherwise it should be loaded from Etcd.
-func NewNode(cuk []byte, address net.IP, certificate x509.Certificate) *Node {
- if certificate.Raw == nil {
- panic("new node must contain raw certificate")
- }
- return &Node{
- clusterUnlockKey: cuk,
- certificate: certificate,
- address: address,
- }
-}
-
-// NodeRoleConsensusMember defines that the Node is a consensus (etcd) cluster member.
+// NodeRoleConsensusMember defines that the Node is a consensus (etcd) cluster
+// member.
type NodeRoleConsensusMember struct {
- // etcdMember is the name of the node in Kubernetes. This is for now usually the same as the ID() of the Node.
- etcdMemberName string
}
-// NodeRoleKubernetesWorker defines that the Node should be running the Kubernetes control and data plane.
+// NodeRoleKubernetesWorker defines that the Node should be running the
+// Kubernetes control and data plane.
type NodeRoleKubernetesWorker struct {
- // nodeName is the name of the node in Kubernetes. This is for now usually the same as the ID() of the Node.
- nodeName string
}
-// ID returns the name of this node, which is `metropolis-{pubkeyHash}`. This name should be the primary way to refer to
-// Metropoils nodes within a cluster, and is guaranteed to be unique by relying on cryptographic randomness.
+// ID returns the name of this node, which is `metropolis-{pubkeyHash}`. This
+// name should be the primary way to refer to Metropolis nodes within a
+// cluster, and is guaranteed to be unique by relying on cryptographic
+// randomness.
func (n *Node) ID() string {
return fmt.Sprintf("metropolis-%s", n.IDBare())
}
// IDBare returns the `{pubkeyHash}` part of the node ID.
func (n Node) IDBare() string {
- pubKey, ok := n.certificate.PublicKey.(ed25519.PublicKey)
- if !ok {
- panic("node has non-ed25519 public key")
- }
- return hex.EncodeToString(pubKey[:16])
+ return hex.EncodeToString(n.pubkey[:16])
}
func (n *Node) String() string {
return n.ID()
}
-// ConsensusMember returns a copy of the NodeRoleConsensusMember struct if the Node is a consensus member, otherwise
-// nil.
+// ConsensusMember returns a copy of the NodeRoleConsensusMember struct if the
+// Node is a consensus member, otherwise nil.
func (n *Node) ConsensusMember() *NodeRoleConsensusMember {
if n.consensusMember == nil {
return nil
@@ -112,8 +100,8 @@
return &cm
}
-// KubernetesWorker returns a copy of the NodeRoleKubernetesWorker struct if the Node is a kubernetes worker, otherwise
-// nil.
+// KubernetesWorker returns a copy of the NodeRoleKubernetesWorker struct if
+// the Node is a kubernetes worker, otherwise nil.
func (n *Node) KubernetesWorker() *NodeRoleKubernetesWorker {
if n.kubernetesWorker == nil {
return nil
@@ -122,37 +110,37 @@
return &kw
}
-// etcdPath builds the etcd path in which this node's protobuf-serialized state is stored in etcd.
+// etcdPath builds the etcd path in which this node's protobuf-serialized state
+// is stored in etcd.
func (n *Node) etcdPath() string {
return fmt.Sprintf("/nodes/%s", n.ID())
}
-// proto serializes the Node object into protobuf, to be used for saving to etcd.
-func (n *Node) proto() *ipb.Node {
- msg := &ipb.Node{
- Certificate: n.certificate.Raw,
+// proto serializes the Node object into protobuf, to be used for saving to
+// etcd.
+func (n *Node) proto() *ppb.Node {
+ msg := &ppb.Node{
ClusterUnlockKey: n.clusterUnlockKey,
- Address: n.address.String(),
- Roles: &ipb.Node_Roles{},
+ PublicKey: n.pubkey,
+ FsmState: n.state,
+ Roles: &ppb.Node_Roles{},
}
if n.consensusMember != nil {
- msg.Roles.ConsensusMember = &ipb.Node_Roles_ConsensusMember{
- EtcdMemberName: n.consensusMember.etcdMemberName,
- }
+ msg.Roles.ConsensusMember = &ppb.Node_Roles_ConsensusMember{}
}
if n.kubernetesWorker != nil {
- msg.Roles.KubernetesWorker = &ipb.Node_Roles_KubernetesWorker{
- NodeName: n.kubernetesWorker.nodeName,
- }
+ msg.Roles.KubernetesWorker = &ppb.Node_Roles_KubernetesWorker{}
}
return msg
}
-// Store saves the Node into etcd. This should be called only once per Node (ie. when the Node has been created).
+// Store saves the Node into etcd. This should be called only once per Node
+// (ie. when the Node has been created).
func (n *Node) Store(ctx context.Context, kv clientv3.KV) error {
- // Currently the only flow to store a node to etcd is a write-once flow: once a node is created, it cannot be
- // deleted or updated. In the future, flows to change cluster node roles might be introduced (ie. to promote nodes
- // to consensus members, etc).
+ // Currently the only flow to store a node to etcd is a write-once flow:
+ // once a node is created, it cannot be deleted or updated. In the future,
+ // flows to change cluster node roles might be introduced (ie. to promote
+ // nodes to consensus members, etc).
key := n.etcdPath()
msg := n.proto()
nodeRaw, err := proto.Marshal(msg)
@@ -175,45 +163,49 @@
return nil
}
-// MakeConsensusMember turns the node into a consensus member with a given name. This only configures internal fields,
-// and does not actually start any services.
-func (n *Node) MakeConsensusMember(etcdMemberName string) error {
+// MakeConsensusMember turns the node into a consensus member. This only
+// configures internal fields, and does not actually start any services.
+func (n *Node) MakeConsensusMember() error {
if n.consensusMember != nil {
return fmt.Errorf("node already is consensus member")
}
- n.consensusMember = &NodeRoleConsensusMember{
- etcdMemberName: etcdMemberName,
- }
+ n.consensusMember = &NodeRoleConsensusMember{}
return nil
}
-// MakeKubernetesWorker turns the node into a kubernetes worker with a given name. This only configures internal fields,
-// and does not actually start any services.
-func (n *Node) MakeKubernetesWorker(name string) error {
+// MakeKubernetesWorker turns the node into a kubernetes worker. This only
+// configures internal fields, and does not actually start any services.
+func (n *Node) MakeKubernetesWorker() error {
if n.kubernetesWorker != nil {
return fmt.Errorf("node is already kubernetes worker")
}
- n.kubernetesWorker = &NodeRoleKubernetesWorker{
- nodeName: name,
- }
+ n.kubernetesWorker = &NodeRoleKubernetesWorker{}
return nil
}
-func (n *Node) Address() net.IP {
- return n.address
-}
-
-// ConfigureLocalHostname uses the node's ID as a hostname, and sets the current hostname, and local files like hosts
-// and machine-id accordingly.
-func (n *Node) ConfigureLocalHostname(etc *localstorage.EphemeralDirectory) error {
+// ConfigureLocalHostname uses the node's ID as a hostname, setting the system
+// hostname and local files like hosts and machine-id accordingly.
+func (n *Node) ConfigureLocalHostname(ctx context.Context, ephemeral *localstorage.EphemeralDirectory, address net.IP) error {
if err := unix.Sethostname([]byte(n.ID())); err != nil {
return fmt.Errorf("failed to set runtime hostname: %w", err)
}
- if err := etc.Hosts.Write([]byte(fmt.Sprintf("%s %s", "127.0.0.1", n.ID())), 0644); err != nil {
- return fmt.Errorf("failed to write /etc/hosts: %w", err)
+ hosts := []string{
+ "127.0.0.1 localhost",
+ "::1 localhost",
+ fmt.Sprintf("%s %s", address.String(), n.ID()),
}
- if err := etc.MachineID.Write([]byte(n.IDBare()), 0644); err != nil {
- return fmt.Errorf("failed to write /etc/machine-id: %w", err)
+ if err := ephemeral.Hosts.Write([]byte(strings.Join(hosts, "\n")), 0644); err != nil {
+ return fmt.Errorf("failed to write /ephemeral/hosts: %w", err)
}
+ if err := ephemeral.MachineID.Write([]byte(n.IDBare()), 0644); err != nil {
+ return fmt.Errorf("failed to write /ephemeral/machine-id: %w", err)
+ }
+
+ // Check that we are self-resolvable.
+ ip, err := net.ResolveIPAddr("ip", n.ID())
+ if err != nil {
+ return fmt.Errorf("failed to self-resolve: %w", err)
+ }
+ supervisor.Logger(ctx).Infof("This is node %s at %v", n.ID(), ip)
return nil
}