m/n/core/cluster: rewrite bootstrap to conform to cluster lifecycle DD

This removes the existing cluster/manager code and reimplements it from
scratch, finally implementing the cluster lifecycle design document for
cluster bootstrap.
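
At a glance, the new Run flow looks roughly like the following (a simplified
sketch of the code in the diff below; error handling details and the one-way
restart guard are elided):

    // Sketch only; see manager.go in the diff for the full logic.
    func (m *Manager) Run(ctx context.Context) error {
        // A sealed configuration on the ESP means this node was already part
        // of a cluster: rejoin it.
        if cfg, err := m.storageRoot.ESP.SealedConfiguration.Unseal(); err == nil {
            return m.join(ctx, cfg)
        }
        // Otherwise, retrieve NodeParameters (from the ESP or qemu fwcfg) and
        // either bootstrap a new cluster or register into an existing one.
        params, err := m.nodeParams(ctx)
        if err != nil {
            return fmt.Errorf("no parameters available: %w", err)
        }
        switch inner := params.Cluster.(type) {
        case *apb.NodeParameters_ClusterBootstrap_:
            return m.bootstrap(ctx, inner.ClusterBootstrap)
        case *apb.NodeParameters_ClusterRegister_:
            // Registration is not implemented yet in this change.
            return m.register(ctx, inner.ClusterRegister)
        default:
            return fmt.Errorf("node parameters misconfigured")
        }
    }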

Test Plan:
E2E tests should cover this. We could unit test the manager, but that would
require a ton of DI work; not sure it's worth it. See the sketch below for the
surface such a test would have to drive.
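
For context on the DI question: a unit test (or any caller) would exercise the
manager roughly through the surface below, with localstorage.Root and
network.Service being the dependencies that would need faking. This is a
sketch only; the runnable name and surrounding plumbing are assumptions, not
part of this change:

    // Hypothetical caller; variable and runnable names are assumptions.
    m := cluster.NewManager(storageRoot, networkService)
    if err := supervisor.Run(ctx, "cluster-manager", m.Run); err != nil {
        return err
    }
    // Wait blocks until the node has bootstrapped/joined a cluster (or failed
    // to do so), returning the resulting Node.
    node, err := m.Wait()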

X-Origin-Diff: phab/D735
GitOrigin-RevId: b00c97b0a102a21605d16086df82a6ece6eb7f4d
diff --git a/metropolis/node/core/cluster/manager.go b/metropolis/node/core/cluster/manager.go
index d7ffe5a..10d699c 100644
--- a/metropolis/node/core/cluster/manager.go
+++ b/metropolis/node/core/cluster/manager.go
@@ -18,408 +18,304 @@
 
 import (
 	"context"
+	"crypto/ed25519"
+	"crypto/rand"
+	"encoding/hex"
+	"errors"
 	"fmt"
 	"io/ioutil"
-	"os"
 	"sync"
-	"time"
 
-	"github.com/cenkalti/backoff/v4"
-	"go.etcd.io/etcd/clientv3"
 	"google.golang.org/protobuf/proto"
 
 	"source.monogon.dev/metropolis/node/core/consensus"
 	"source.monogon.dev/metropolis/node/core/localstorage"
 	"source.monogon.dev/metropolis/node/core/network"
+	"source.monogon.dev/metropolis/pkg/pki"
 	"source.monogon.dev/metropolis/pkg/supervisor"
 	apb "source.monogon.dev/metropolis/proto/api"
+	ppb "source.monogon.dev/metropolis/proto/private"
 )
 
-// Manager is a finite state machine that joins this node (ie., Metropolis node running on a virtual/physical machine)
-// into a Metropolis cluster (ie. group of nodes that act as a single control plane for Metropolis services). It does
-// this by bringing up all required operating-system level components, including mounting the local filesystem, bringing
-// up a consensus (etcd) server/client, ...
-//
-// The Manager runs as a single-shot Runnable. It will attempt to progress its state from the initial state (New) to
-// either Running (meaning that the node is now part of a cluster), or Failed (meaning that the node couldn't become
-// part of a cluster). It is not restartable, as it mutates quite a bit of implicit operating-system level state (like
-// filesystem mounts).  As such, it's difficult to recover reliably from failures, and since these failures indicate
-// some high issues with the cluster configuration/state, a failure requires a full kernel reboot to retry (or fix/
-// reconfigure the node).
-//
-// Currently, the Manager only supports one flow for bringing up a Node: by creating a new cluster. As such, it's
-// missing the following flows:
-//  - joining a new node into an already running cluster
-//  - restarting a node into an already existing cluster
-//  - restarting a node into an already running cluster (ie. full reboot of whole cluster)
-//
+type managerResult struct {
+	node *Node
+	err  error
+}
+
+type state struct {
+	mu sync.RWMutex
+
+	oneway       bool
+	stateCluster ClusterState
+	stateNode    ppb.Node_FSMState
+
+	configuration *ppb.SealedConfiguration
+
+	result  *managerResult
+	waiters []chan *managerResult
+}
+
+func (s *state) setResult(node *Node, err error) {
+	s.result = &managerResult{
+		node: node,
+		err:  err,
+	}
+	for _, w := range s.waiters {
+		go func(c chan *managerResult) {
+			c <- s.result
+		}(w)
+	}
+	s.waiters = nil
+}
+
 type Manager struct {
 	storageRoot    *localstorage.Root
 	networkService *network.Service
 
-	// stateLock locks all state* variables.
-	stateLock sync.RWMutex
-	// state is the FSM state of the Manager.
-	state State
-	// stateRunningNode is the Node that this Manager got from joining a cluster. It's only valid if the Manager is
-	// Running.
-	stateRunningNode *Node
-	// stateWaiters is a list of channels that wish to be notified (by sending true or false) for when the Manager
-	// reaches a final state (Running or Failed respectively).
-	stateWaiters []chan bool
+	state
 
-	// consensus is the spawned etcd/consensus service, if the Manager brought up a Node that should run one.
+	// consensus is the spawned etcd/consensus service, if the Manager brought
+	// up a Node that should run one.
 	consensus *consensus.Service
 }
 
-// NewManager creates a new cluster Manager. The given localstorage Root must be places, but not yet started (and will
-// be started as the Manager makes progress). The given network Service must already be running.
+// NewManager creates a new cluster Manager. The given localstorage Root must
+// be placed, but not yet started (and will be started as the Manager makes
+// progress). The given network Service must already be running.
 func NewManager(storageRoot *localstorage.Root, networkService *network.Service) *Manager {
 	return &Manager{
 		storageRoot:    storageRoot,
 		networkService: networkService,
+
+		state: state{
+			stateCluster: ClusterUnknown,
+			stateNode:    ppb.Node_FSM_STATE_INVALID,
+		},
 	}
 }
 
-// State is the state of the Manager finite state machine.
-type State int
-
-const (
-	// StateNew is the initial state of the Manager. It decides how to go about joining or creating a cluster.
-	StateNew State = iota
-	// StateCreatingCluster is when the Manager attempts to create a new cluster - this happens when a node is started
-	// with no EnrolmentConfig.
-	StateCreatingCluster
-	// StateRunning is when the Manager successfully got the node to be part of a cluster. stateRunningNode is valid.
-	StateRunning
-	// StateFailed is when the Manager failed to ge the node to be part of a cluster.
-	StateFailed
-)
-
-func (s State) String() string {
-	switch s {
-	case StateNew:
-		return "New"
-	case StateCreatingCluster:
-		return "CreatingCluster"
-	case StateRunning:
-		return "Running"
-	case StateFailed:
-		return "Failed"
-	default:
-		return "UNKNOWN"
-	}
+func (m *Manager) lock() (*state, func()) {
+	m.mu.Lock()
+	return &m.state, m.mu.Unlock
 }
 
-// allowedTransition describes all allowed state transitions (map[From][]To).
-var allowedTransitions = map[State][]State{
-	StateNew:             {StateCreatingCluster},
-	StateCreatingCluster: {StateRunning, StateFailed},
+func (m *Manager) rlock() (*state, func()) {
+	m.mu.RLock()
+	return &m.state, m.mu.RUnlock
 }
 
-// allowed returns whether a transition from a state to another state is allowed (ie. is defined in allowedTransitions).
-func (m *Manager) allowed(from, to State) bool {
-	for _, allowed := range allowedTransitions[from] {
-		if to == allowed {
-			return true
-		}
-	}
-	return false
-}
+func (m *Manager) Wait() (*Node, error) {
+	state, unlock := m.lock()
 
-// next moves the Manager finite state machine from its current state to `n`, or to Failed if the transition is not
-// allowed.
-func (m *Manager) next(ctx context.Context, n State) {
-	m.stateLock.Lock()
-	defer m.stateLock.Unlock()
-
-	if !m.allowed(m.state, n) {
-		supervisor.Logger(ctx).Errorf("Attempted invalid enrolment state transition, failing enrolment; from: %s, to: %s",
-			m.state.String(), n.String())
-		m.state = StateFailed
-		return
+	if state.result != nil {
+		unlock()
+		return state.result.node, state.result.err
 	}
 
-	supervisor.Logger(ctx).Infof("Enrolment state change; from: %s, to: %s", m.state.String(), n.String())
-
-	m.state = n
+	c := make(chan *managerResult)
+	state.waiters = append(state.waiters, c)
+	unlock()
+	res := <-c
+	return res.node, res.err
 }
 
-// State returns the state of the Manager. It's safe to call this from any goroutine.
-func (m *Manager) State() State {
-	m.stateLock.RLock()
-	defer m.stateLock.RUnlock()
-	return m.state
-}
-
-// WaitFinished waits until the Manager FSM reaches Running or Failed, and returns true if the FSM is Running. It's
-// safe to call this from any goroutine.
-func (m *Manager) WaitFinished() (success bool) {
-	m.stateLock.Lock()
-	switch m.state {
-	case StateFailed:
-		m.stateLock.Unlock()
-		return false
-	case StateRunning:
-		m.stateLock.Unlock()
-		return true
-	}
-
-	C := make(chan bool)
-	m.stateWaiters = append(m.stateWaiters, C)
-	m.stateLock.Unlock()
-	return <-C
-}
-
-// wakeWaiters wakes any WaitFinished waiters and lets them know about the current state of the Manager.
-// The stateLock must already been taken, and the state must have been set in the same critical section (otherwise
-// this can cause a race condition).
-func (m *Manager) wakeWaiters() {
-	state := m.state
-	waiters := m.stateWaiters
-	m.stateWaiters = nil
-
-	for _, waiter := range waiters {
-		go func(w chan bool) {
-			w <- state == StateRunning
-		}(waiter)
-	}
-}
-
-// Run is the runnable of the Manager, to be started using the Supervisor. It is one-shot, and should not be restarted.
+// Run is the runnable of the Manager, to be started using the Supervisor. It
+// is one-shot, and should not be restarted.
 func (m *Manager) Run(ctx context.Context) error {
-	if state := m.State(); state != StateNew {
-		supervisor.Logger(ctx).Errorf("Manager started with non-New state %s, failing", state.String())
-		m.stateLock.Lock()
-		m.state = StateFailed
-		m.wakeWaiters()
-		m.stateLock.Unlock()
-		return nil
+	state, unlock := m.lock()
+	if state.oneway {
+		unlock()
+		// TODO(q3k): restart the entire system if this happens
+		return fmt.Errorf("cannot restart cluster manager")
+	}
+	state.oneway = true
+	unlock()
+
+	configuration, err := m.storageRoot.ESP.SealedConfiguration.Unseal()
+	if err == nil {
+		supervisor.Logger(ctx).Info("Sealed configuration present, attempting to join cluster")
+		return m.join(ctx, configuration)
 	}
 
-	var err error
-	bo := backoff.NewExponentialBackOff()
-	for {
-		done := false
-		state := m.State()
-		switch state {
-		case StateNew:
-			err = m.stateNew(ctx)
-		case StateCreatingCluster:
-			err = m.stateCreatingCluster(ctx)
-		default:
-			done = true
-			break
-		}
-
-		if err != nil || done {
-			break
-		}
-
-		if state == m.State() && !m.allowed(state, m.State()) {
-			supervisor.Logger(ctx).Errorf("Enrolment got stuck at %s, failing", m.state.String())
-			m.stateLock.Lock()
-			m.state = StateFailed
-			m.stateLock.Unlock()
-		} else {
-			bo.Reset()
-		}
+	if !errors.Is(err, localstorage.ErrNoSealed) {
+		return fmt.Errorf("unexpected sealed config error: %w", err)
 	}
 
-	m.stateLock.Lock()
-	state := m.state
-	if state != StateRunning {
-		supervisor.Logger(ctx).Errorf("Enrolment failed at %s: %v", m.state.String(), err)
-	} else {
-		supervisor.Logger(ctx).Info("Enrolment successful!")
+	supervisor.Logger(ctx).Info("No sealed configuration, looking for node parameters")
+
+	params, err := m.nodeParams(ctx)
+	if err != nil {
+		return fmt.Errorf("no parameters available: %w", err)
 	}
-	m.wakeWaiters()
-	m.stateLock.Unlock()
+
+	switch inner := params.Cluster.(type) {
+	case *apb.NodeParameters_ClusterBootstrap_:
+		return m.bootstrap(ctx, inner.ClusterBootstrap)
+	case *apb.NodeParameters_ClusterRegister_:
+		return m.register(ctx, inner.ClusterRegister)
+	default:
+		return fmt.Errorf("node parameters misconfigured: neither cluster_bootstrap nor cluster_register set")
+	}
+}
+
+func (m *Manager) bootstrap(ctx context.Context, bootstrap *apb.NodeParameters_ClusterBootstrap) error {
+	supervisor.Logger(ctx).Infof("Bootstrapping new cluster, owner public key: %s", hex.EncodeToString(bootstrap.OwnerPublicKey))
+	state, unlock := m.lock()
+	defer unlock()
+
+	state.configuration = &ppb.SealedConfiguration{}
+
+	// Mount new storage with a generated CUK, and save the local unlock key
+	// into the sealed configuration proto.
+	supervisor.Logger(ctx).Infof("Bootstrapping: mounting new storage...")
+	cuk, err := m.storageRoot.Data.MountNew(state.configuration)
+	if err != nil {
+		return fmt.Errorf("could not make and mount data partition: %w", err)
+	}
+
+	pub, priv, err := ed25519.GenerateKey(rand.Reader)
+	if err != nil {
+		return fmt.Errorf("could not generate node keypair: %w", err)
+	}
+	supervisor.Logger(ctx).Infof("Bootstrapping: node public key: %s", hex.EncodeToString([]byte(pub)))
+
+	node := Node{
+		clusterUnlockKey: cuk,
+		pubkey:           pub,
+		state:            ppb.Node_FSM_STATE_UP,
+		// TODO(q3k): make this configurable.
+		consensusMember:  &NodeRoleConsensusMember{},
+		kubernetesWorker: &NodeRoleKubernetesWorker{},
+	}
+
+	// Run worker to keep updating /ephemeral/hosts (and thus, /etc/hosts) with
+	// our own IP address. This ensures that the node's ID always resolves to
+	// its current external IP address.
+	supervisor.Run(ctx, "hostsfile", func(ctx context.Context) error {
+		supervisor.Signal(ctx, supervisor.SignalHealthy)
+		watcher := m.networkService.Watch()
+		for {
+			status, err := watcher.Get(ctx)
+			if err != nil {
+				return err
+			}
+			err = node.ConfigureLocalHostname(ctx, &m.storageRoot.Ephemeral, status.ExternalAddress)
+			if err != nil {
+				return fmt.Errorf("could not configure hostname: %w", err)
+			}
+		}
+	})
+
+	// Bring up consensus with this node as the only member.
+	m.consensus = consensus.New(consensus.Config{
+		Data:       &m.storageRoot.Data.Etcd,
+		Ephemeral:  &m.storageRoot.Ephemeral.Consensus,
+		NewCluster: true,
+		Name:       node.ID(),
+	})
+
+	supervisor.Logger(ctx).Infof("Bootstrapping: starting consensus...")
+	if err := supervisor.Run(ctx, "consensus", m.consensus.Run); err != nil {
+		return fmt.Errorf("when starting consensus: %w", err)
+	}
+
+	supervisor.Logger(ctx).Info("Bootstrapping: waiting for consensus...")
+	if err := m.consensus.WaitReady(ctx); err != nil {
+		return fmt.Errorf("consensus service failed to become ready: %w", err)
+	}
+	supervisor.Logger(ctx).Info("Bootstrapping: consensus ready.")
+
+	kv := m.consensus.KVRoot()
+	node.KV = kv
+
+	// Create Metropolis CA and this node's certificate.
+	caCertBytes, _, err := PKICA.Ensure(ctx, kv)
+	if err != nil {
+		return fmt.Errorf("failed to create cluster CA: %w", err)
+	}
+	nodeCert := PKINamespace.New(PKICA, "", pki.Server([]string{node.ID()}, nil))
+	nodeCert.UseExistingKey(priv)
+	nodeCertBytes, _, err := nodeCert.Ensure(ctx, kv)
+	if err != nil {
+		return fmt.Errorf("failed to create node certificate: %w", err)
+	}
+
+	if err := m.storageRoot.Data.Node.Credentials.CACertificate.Write(caCertBytes, 0400); err != nil {
+		return fmt.Errorf("failed to write CA certificate: %w", err)
+	}
+	if err := m.storageRoot.Data.Node.Credentials.Certificate.Write(nodeCertBytes, 0400); err != nil {
+		return fmt.Errorf("failed to write node certificate: %w", err)
+	}
+	if err := m.storageRoot.Data.Node.Credentials.Key.Write(priv, 0400); err != nil {
+		return fmt.Errorf("failed to write node private key: %w", err)
+	}
+
+	// Store our new Node object in etcd.
+	if err := node.Store(ctx, kv); err != nil {
+		return fmt.Errorf("failed to store new node in etcd: %w", err)
+	}
+
+	state.setResult(&node, nil)
 
 	supervisor.Signal(ctx, supervisor.SignalHealthy)
 	supervisor.Signal(ctx, supervisor.SignalDone)
 	return nil
 }
 
-// stateNew is called when a Manager is New. It makes the decision on how to join this node into a cluster.
-func (m *Manager) stateNew(ctx context.Context) error {
-	supervisor.Logger(ctx).Info("Starting enrolment process...")
-
-	// STOPGAP when migrating to enrolment config and cluster lifecycle: always
-	// expect NodeParameters with ClusterBootstrap.
-
-	// Check for presence of EnrolmentConfig on ESP or in qemu firmware variables.
-	var configRaw []byte
-	configRaw, err := m.storageRoot.ESP.NodeParameters.Read()
-	if err != nil && !os.IsNotExist(err) {
-		return fmt.Errorf("could not read local enrolment file: %w", err)
-	} else if err != nil {
-		configRaw, err = ioutil.ReadFile("/sys/firmware/qemu_fw_cfg/by_name/dev.monogon.metropolis/parameters.pb/raw")
-		if err != nil && !os.IsNotExist(err) {
-			return fmt.Errorf("could not read firmware enrolment file: %w", err)
-		}
-	}
-
-	if configRaw == nil {
-		return fmt.Errorf("no enrolment config present")
-	}
-
-	parameters := &apb.NodeParameters{}
-	if err := proto.Unmarshal(configRaw, parameters); err != nil {
-		return fmt.Errorf("enrolment config could not get unmarshaled: %w", err)
-	}
-
-	switch parameters.Cluster.(type) {
-	case *apb.NodeParameters_ClusterBootstrap_:
-	default:
-		return fmt.Errorf("enrolment config has no ClusterBootstrap: %w", err)
-	}
-
-	m.next(ctx, StateCreatingCluster)
-	return nil
+func (m *Manager) register(ctx context.Context, bootstrap *apb.NodeParameters_ClusterRegister) error {
+	return fmt.Errorf("unimplemented")
 }
 
-// stateCreatingCluster is called when the Manager has decided to create a new cluster.
-//
-// The process to create a new cluster is as follows:
-//   - wait for IP address
-//   - initialize new data partition, by generating local and cluster unlock keys (the local unlock key is saved to
-//     the ESP, while the cluster unlock key is returned)
-//   - create a new node certificate and Node (with new given cluster unlock key)
-//   - start up a new etcd cluster, with this node being the only member
-//   - save the new Node to the new etcd cluster (thereby saving the node's cluster unlock key to etcd)
-func (m *Manager) stateCreatingCluster(ctx context.Context) error {
-	logger := supervisor.Logger(ctx)
-	logger.Info("Creating new cluster: waiting for IP address...")
-
-	// STOPGAP: bad use of watcher (should be long-term)
-	watcher := m.networkService.Watch()
-	defer watcher.Close()
-	data, err := watcher.Get(ctx)
+func (m *Manager) nodeParamsFWCFG(ctx context.Context) (*apb.NodeParameters, error) {
+	bytes, err := ioutil.ReadFile("/sys/firmware/qemu_fw_cfg/by_name/dev.monogon.metropolis/parameters.pb/raw")
 	if err != nil {
-		return fmt.Errorf("when getting IP address: %w", err)
+		return nil, fmt.Errorf("could not read firmware enrolment file: %w", err)
 	}
-	ip := data.ExternalAddress
-	logger.Infof("Creating new cluster: got IP address %s", ip.String())
 
-	logger.Info("Creating new cluster: initializing storage...")
-	cuk, err := m.storageRoot.Data.MountNew(&m.storageRoot.ESP.LocalUnlock)
+	config := apb.NodeParameters{}
+	err = proto.Unmarshal(bytes, &config)
 	if err != nil {
-		return fmt.Errorf("when making new data partition: %w", err)
-	}
-	logger.Info("Creating new cluster: storage initialized")
-
-	// Create certificate for node.
-	cert, err := m.storageRoot.Data.Node.EnsureSelfSigned(localstorage.CertificateForNode)
-	if err != nil {
-		return fmt.Errorf("failed to create new node certificate: %w", err)
+		return nil, fmt.Errorf("could not unmarshal: %w", err)
 	}
 
-	node := NewNode(cuk, ip, *cert.Leaf)
-
-	m.consensus = consensus.New(consensus.Config{
-		Data:       &m.storageRoot.Data.Etcd,
-		Ephemeral:  &m.storageRoot.Ephemeral.Consensus,
-		NewCluster: true,
-		Name:       node.ID(),
-		// STOPGAP: this will not be used after the manager rewrite.
-		ExternalHost: ip.String(),
-	})
-	if err := supervisor.Run(ctx, "consensus", m.consensus.Run); err != nil {
-		return fmt.Errorf("when starting consensus: %w", err)
-	}
-
-	// TODO(q3k): make timeout configurable?
-	ctxT, ctxC := context.WithTimeout(ctx, 5*time.Second)
-	defer ctxC()
-
-	supervisor.Logger(ctx).Info("Creating new cluster: waiting for consensus...")
-	if err := m.consensus.WaitReady(ctxT); err != nil {
-		return fmt.Errorf("consensus service failed to become ready: %w", err)
-	}
-
-	// Configure node to be a consensus member and kubernetes worker. In the future, different nodes will have
-	// different roles, but for now they're all symmetrical.
-	_, consensusName, err := m.consensus.MemberInfo(ctx)
-	if err != nil {
-		return fmt.Errorf("could not get consensus MemberInfo: %w", err)
-	}
-	if err := node.MakeConsensusMember(consensusName); err != nil {
-		return fmt.Errorf("could not make new node into consensus member: %w", err)
-	}
-	if err := node.MakeKubernetesWorker(node.ID()); err != nil {
-		return fmt.Errorf("could not make new node into kubernetes worker: %w", err)
-	}
-
-	// Save node into etcd.
-	supervisor.Logger(ctx).Info("Creating new cluster: storing first node...")
-	if err := node.Store(ctx, m.consensus.KV("cluster", "enrolment")); err != nil {
-		return fmt.Errorf("could not save new node: %w", err)
-	}
-
-	m.stateLock.Lock()
-	m.stateRunningNode = node
-	m.stateLock.Unlock()
-
-	m.next(ctx, StateRunning)
-	return nil
+	return &config, nil
 }
 
-// Node returns the Node that the Manager brought into a cluster, or nil if the Manager is not Running.
-// This is safe to call from any goroutine.
+func (m *Manager) nodeParams(ctx context.Context) (*apb.NodeParameters, error) {
+	// Retrieve node parameters from qemu's fwcfg interface or ESP.
+	// TODO(q3k): probably abstract this away and implement per platform/build/...
+	paramsFWCFG, err := m.nodeParamsFWCFG(ctx)
+	if err != nil {
+		supervisor.Logger(ctx).Warningf("Could not retrieve node parameters from qemu fwcfg: %v", err)
+		paramsFWCFG = nil
+	} else {
+		supervisor.Logger(ctx).Infof("Retrieved node parameters from qemu fwcfg")
+	}
+	paramsESP, err := m.storageRoot.ESP.NodeParameters.Unmarshal()
+	if err != nil {
+		supervisor.Logger(ctx).Warningf("Could not retrieve node parameters from ESP: %v", err)
+		paramsESP = nil
+	} else {
+		supervisor.Logger(ctx).Infof("Retrieved node parameters from ESP")
+	}
+	if paramsFWCFG == nil && paramsESP == nil {
+		return nil, fmt.Errorf("could not find node parameters in ESP or qemu fwcfg")
+	}
+	if paramsFWCFG != nil && paramsESP != nil {
+		supervisor.Logger(ctx).Warningf("Node parameters found in both ESP and qemu fwcfg, using the latter")
+		return paramsFWCFG, nil
+	} else if paramsFWCFG != nil {
+		return paramsFWCFG, nil
+	} else {
+		return paramsESP, nil
+	}
+}
+
+func (m *Manager) join(ctx context.Context, cfg *ppb.SealedConfiguration) error {
+	return fmt.Errorf("unimplemented")
+}
+
+// Node returns the Node that the Manager brought into a cluster, or nil if the
+// Manager is not Running.  This is safe to call from any goroutine.
 func (m *Manager) Node() *Node {
-	m.stateLock.Lock()
-	defer m.stateLock.Unlock()
-	if m.state != StateRunning {
-		return nil
-	}
-	return m.stateRunningNode
-}
-
-// ConsensusKV returns a namespaced etcd KV client, or nil if the Manager is not Running.
-// This is safe to call from any goroutine.
-func (m *Manager) ConsensusKV(module, space string) clientv3.KV {
-	m.stateLock.Lock()
-	defer m.stateLock.Unlock()
-	if m.state != StateRunning {
-		return nil
-	}
-	if m.stateRunningNode.ConsensusMember() == nil {
-		// TODO(q3k): in this case, we should return a client to etcd even though this
-		// node is not a member of consensus. For now, all nodes are consensus members.
-		return nil
-	}
-	return m.consensus.KV(module, space)
-}
-
-// ConsensusKVRoot returns a non-namespaced etcd KV client, or nil if the Manager is not Running.
-// This is safe to call from any goroutine.
-func (m *Manager) ConsensusKVRoot() clientv3.KV {
-	m.stateLock.Lock()
-	defer m.stateLock.Unlock()
-	if m.state != StateRunning {
-		return nil
-	}
-	if m.stateRunningNode.ConsensusMember() == nil {
-		// TODO(q3k): in this case, we should return a client to etcd even though this
-		// node is not a member of consensus. For now, all nodes are consensus members.
-		return nil
-	}
-	return m.consensus.KVRoot()
-}
-
-// ConsensusCluster returns an etcd Cluster client, or nil if the Manager is not Running.
-// This is safe to call from any goroutine.
-func (m *Manager) ConsensusCluster() clientv3.Cluster {
-	m.stateLock.Lock()
-	defer m.stateLock.Unlock()
-	if m.state != StateRunning {
-		return nil
-	}
-	if m.stateRunningNode.ConsensusMember() == nil {
-		// TODO(q3k): in this case, we should return a client to etcd even though this
-		// node is not a member of consensus. For now, all nodes are consensus members.
-		return nil
-	}
-	return m.consensus.Cluster()
+	return nil
 }