metropolis/node/core/cluster: use curator

This refactors the cluster manager. It removes all etcd storage
functionality (which now lives in the curator) and otherwise dusts
things off slightly (some file renames, some comments to reflect the now
clarified and limited scope of the cluster manager).

Change-Id: Ic62d8402c0618fb5e0e65966b0d732a2cab564e0
Reviewed-on: https://review.monogon.dev/c/monogon/+/188
Reviewed-by: Lorenz Brun <lorenz@nexantic.com>
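
The intended consumption of the refactored manager by node startup code looks roughly like the sketch below. The wrapper function and its parameter names are illustrative only; the actual wiring lives in //metropolis/node/core/main.go. It shows the new surface provided by this change: NewManager, Run under the supervisor, Watch/GetHome, and ConsensusClient.

    // Sketch: how node startup code consumes the cluster manager.
    func startCluster(ctx context.Context, root *localstorage.Root, netSvc *network.Service) error {
    	m := cluster.NewManager(root, netSvc)
    	if err := supervisor.Run(ctx, "cluster-manager", m.Run); err != nil {
    		return fmt.Errorf("failed to start cluster manager: %w", err)
    	}
    	w := m.Watch()
    	// GetHome blocks until the cluster, as seen by this node, is HOME.
    	status, err := w.GetHome(ctx)
    	if err != nil {
    		return fmt.Errorf("waiting for cluster: %w", err)
    	}
    	kv, err := status.ConsensusClient(cluster.ConsensusUserKubernetesPKI)
    	if err != nil {
    		return fmt.Errorf("getting kubernetes PKI consensus client: %w", err)
    	}
    	_ = kv // handed to the Kubernetes PKI bootstrap, etc.
    	return nil
    }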
diff --git a/metropolis/node/core/cluster/BUILD.bazel b/metropolis/node/core/cluster/BUILD.bazel
index 948b9ae..fcf8aae 100644
--- a/metropolis/node/core/cluster/BUILD.bazel
+++ b/metropolis/node/core/cluster/BUILD.bazel
@@ -4,26 +4,24 @@
     name = "go_default_library",
     srcs = [
         "cluster.go",
-        "configuration.go",
-        "manager.go",
-        "manager_bootstrap.go",
-        "node.go",
+        "cluster_bootstrap.go",
+        "status.go",
+        "watcher.go",
     ],
     importpath = "source.monogon.dev/metropolis/node/core/cluster",
     visibility = ["//metropolis/node/core:__subpackages__"],
     deps = [
         "//metropolis/node/core/consensus:go_default_library",
         "//metropolis/node/core/consensus/client:go_default_library",
+        "//metropolis/node/core/curator:go_default_library",
         "//metropolis/node/core/localstorage:go_default_library",
         "//metropolis/node/core/network:go_default_library",
         "//metropolis/pkg/event:go_default_library",
         "//metropolis/pkg/event/memory:go_default_library",
-        "//metropolis/pkg/pki:go_default_library",
         "//metropolis/pkg/supervisor:go_default_library",
         "//metropolis/proto/api:go_default_library",
+        "//metropolis/proto/common:go_default_library",
         "//metropolis/proto/private:go_default_library",
-        "@io_etcd_go_etcd//clientv3:go_default_library",
         "@org_golang_google_protobuf//proto:go_default_library",
-        "@org_golang_x_sys//unix:go_default_library",
     ],
 )
diff --git a/metropolis/node/core/cluster/cluster.go b/metropolis/node/core/cluster/cluster.go
index b194f25..aa293b0 100644
--- a/metropolis/node/core/cluster/cluster.go
+++ b/metropolis/node/core/cluster/cluster.go
@@ -14,69 +14,165 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+// Package cluster implements low-level clustering logic, especially logic for
+// bootstrapping, registering into and joining a cluster. Its goal is to provide
+// the rest of the node code with the following:
+//  - Mounted plaintext storage.
+//  - Node credentials/identity.
+//  - A locally running etcd server if the node is supposed to run one, and a
+//    client connection to that etcd cluster if so.
+//  - The state of the cluster as seen by the node, to enable code to respond to
+//    node lifecycle changes.
 package cluster
 
 import (
+	"context"
+	"errors"
 	"fmt"
+	"io/ioutil"
+	"sync"
 
-	"source.monogon.dev/metropolis/pkg/pki"
+	"google.golang.org/protobuf/proto"
+
+	"source.monogon.dev/metropolis/node/core/consensus"
+	"source.monogon.dev/metropolis/node/core/localstorage"
+	"source.monogon.dev/metropolis/node/core/network"
+	"source.monogon.dev/metropolis/pkg/event/memory"
+	"source.monogon.dev/metropolis/pkg/supervisor"
+	apb "source.monogon.dev/metropolis/proto/api"
+	ppb "source.monogon.dev/metropolis/proto/private"
 )
 
-// ClusterState is the state of the cluster from the point of view of the
-// current node. Clients within the node code can watch this state to change
-// their behaviour as needed.
-type ClusterState int
+type state struct {
+	mu sync.RWMutex
 
-const (
-	// ClusterStateUnknown means the node has not yet determined the existence
-	// of a cluster it should join or start. This is a transient, initial state
-	// that should only manifest during boot.
-	ClusterUnknown ClusterState = iota
-	// ClusterForeign means the node is attempting to register into an already
-	// existing cluster with which it managed to make preliminary contact, but
-	// which the cluster has not yet fully productionized (eg. the node is
-	// still being hardware attested, or the operator needs to confirm the
-	// registration of this node).
-	ClusterForeign
-	// ClusterTrusted means the node is attempting to register into an already
-	// registered cluster, and has been trusted by it. The node is now
-	// attempting to finally commit into registering the cluster.
-	ClusterTrusted
-	// ClusterHome means the node is part of a cluster. This is the bulk of
-	// time in which this node will spend its time.
-	ClusterHome
-	// ClusterDisowning means the node has been disowned (ie., removed) by the
-	// cluster, and that it will not be ever part of any cluster again, and
-	// that it will be decommissioned by the operator.
-	ClusterDisowning
-	// ClusterSplit means that the node would usually be Home in a cluster, but
-	// has been split from the consensus of the cluster. This can happen for
-	// nodes running consensus when consensus is lost (eg. when there is no
-	// quorum or this node has been netsplit), and for other nodes if they have
-	// lost network connectivity to the consensus nodes. Clients should make
-	// their own decision what action to perform in this state, depending on
-	// the level of consistency required and whether it makes sense for the
-	// node to fence its services off.
-	ClusterSplit
-)
+	oneway bool
 
-func (s ClusterState) String() string {
-	switch s {
-	case ClusterForeign:
-		return "ClusterForeign"
-	case ClusterTrusted:
-		return "ClusterTrusted"
-	case ClusterHome:
-		return "ClusterHome"
-	case ClusterDisowning:
-		return "ClusterDisowning"
-	case ClusterSplit:
-		return "ClusterSplit"
-	}
-	return fmt.Sprintf("Invalid(%d)", s)
+	configuration *ppb.SealedConfiguration
 }
 
-var (
-	PKINamespace = pki.Namespaced("/cluster-pki/")
-	PKICA        = PKINamespace.New(pki.SelfSigned, "cluster-ca", pki.CA("Metropolis Cluster CA"))
-)
+type Manager struct {
+	storageRoot    *localstorage.Root
+	networkService *network.Service
+	status         memory.Value
+
+	state
+
+	// consensus is the spawned etcd/consensus service, if the Manager brought
+	// up a Node that should run one.
+	consensus *consensus.Service
+}
+
+// NewManager creates a new cluster Manager. The given localstorage Root must
+// be placed, but not yet started (and will be started as the Manager makes
+// progress). The given network Service must already be running.
+func NewManager(storageRoot *localstorage.Root, networkService *network.Service) *Manager {
+	return &Manager{
+		storageRoot:    storageRoot,
+		networkService: networkService,
+
+		state: state{},
+	}
+}
+
+func (m *Manager) lock() (*state, func()) {
+	m.mu.Lock()
+	return &m.state, m.mu.Unlock
+}
+
+func (m *Manager) rlock() (*state, func()) {
+	m.mu.RLock()
+	return &m.state, m.mu.RUnlock
+}
+
+// Run is the runnable of the Manager, to be started using the Supervisor. It
+// is one-shot, and should not be restarted.
+func (m *Manager) Run(ctx context.Context) error {
+	state, unlock := m.lock()
+	if state.oneway {
+		unlock()
+		// TODO(q3k): restart the entire system if this happens
+		return fmt.Errorf("cannot restart cluster manager")
+	}
+	state.oneway = true
+	unlock()
+
+	configuration, err := m.storageRoot.ESP.SealedConfiguration.Unseal()
+	if err == nil {
+		supervisor.Logger(ctx).Info("Sealed configuration present, attempting to join cluster")
+		return m.join(ctx, configuration)
+	}
+
+	if !errors.Is(err, localstorage.ErrNoSealed) {
+		return fmt.Errorf("unexpected sealed config error: %w", err)
+	}
+
+	supervisor.Logger(ctx).Info("No sealed configuration, looking for node parameters")
+
+	params, err := m.nodeParams(ctx)
+	if err != nil {
+		return fmt.Errorf("no parameters available: %w", err)
+	}
+
+	switch inner := params.Cluster.(type) {
+	case *apb.NodeParameters_ClusterBootstrap_:
+		return m.bootstrap(ctx, inner.ClusterBootstrap)
+	case *apb.NodeParameters_ClusterRegister_:
+		return m.register(ctx, inner.ClusterRegister)
+	default:
+		return fmt.Errorf("node parameters misconfigured: neither cluster_bootstrap nor cluster_register set")
+	}
+}
+
+func (m *Manager) register(ctx context.Context, bootstrap *apb.NodeParameters_ClusterRegister) error {
+	return fmt.Errorf("unimplemented")
+}
+
+func (m *Manager) nodeParamsFWCFG(ctx context.Context) (*apb.NodeParameters, error) {
+	bytes, err := ioutil.ReadFile("/sys/firmware/qemu_fw_cfg/by_name/dev.monogon.metropolis/parameters.pb/raw")
+	if err != nil {
+		return nil, fmt.Errorf("could not read firmware enrolment file: %w", err)
+	}
+
+	config := apb.NodeParameters{}
+	err = proto.Unmarshal(bytes, &config)
+	if err != nil {
+		return nil, fmt.Errorf("could not unmarshal: %v", err)
+	}
+
+	return &config, nil
+}
+
+func (m *Manager) nodeParams(ctx context.Context) (*apb.NodeParameters, error) {
+	// Retrieve node parameters from qemu's fwcfg interface or ESP.
+	// TODO(q3k): probably abstract this away and implement per platform/build/...
+	paramsFWCFG, err := m.nodeParamsFWCFG(ctx)
+	if err != nil {
+		supervisor.Logger(ctx).Warningf("Could not retrieve node parameters from qemu fwcfg: %v", err)
+		paramsFWCFG = nil
+	} else {
+		supervisor.Logger(ctx).Infof("Retrieved node parameters from qemu fwcfg")
+	}
+	paramsESP, err := m.storageRoot.ESP.NodeParameters.Unmarshal()
+	if err != nil {
+		supervisor.Logger(ctx).Warningf("Could not retrieve node parameters from ESP: %v", err)
+		paramsESP = nil
+	} else {
+		supervisor.Logger(ctx).Infof("Retrieved node parameters from ESP")
+	}
+	if paramsFWCFG == nil && paramsESP == nil {
+		return nil, fmt.Errorf("could not find node parameters in ESP or qemu fwcfg")
+	}
+	if paramsFWCFG != nil && paramsESP != nil {
+		supervisor.Logger(ctx).Warningf("Node parameters found in both ESP and qemu fwcfg, using the latter")
+		return paramsFWCFG, nil
+	} else if paramsFWCFG != nil {
+		return paramsFWCFG, nil
+	} else {
+		return paramsESP, nil
+	}
+}
+
+func (m *Manager) join(ctx context.Context, cfg *ppb.SealedConfiguration) error {
+	return fmt.Errorf("unimplemented")
+}
diff --git a/metropolis/node/core/cluster/manager_bootstrap.go b/metropolis/node/core/cluster/cluster_bootstrap.go
similarity index 67%
rename from metropolis/node/core/cluster/manager_bootstrap.go
rename to metropolis/node/core/cluster/cluster_bootstrap.go
index b80bed6..7eb8058 100644
--- a/metropolis/node/core/cluster/manager_bootstrap.go
+++ b/metropolis/node/core/cluster/cluster_bootstrap.go
@@ -24,9 +24,10 @@
 	"fmt"
 
 	"source.monogon.dev/metropolis/node/core/consensus"
-	"source.monogon.dev/metropolis/pkg/pki"
+	"source.monogon.dev/metropolis/node/core/curator"
 	"source.monogon.dev/metropolis/pkg/supervisor"
 	apb "source.monogon.dev/metropolis/proto/api"
+	cpb "source.monogon.dev/metropolis/proto/common"
 	ppb "source.monogon.dev/metropolis/proto/private"
 )
 
@@ -50,18 +51,12 @@
 	}
 	supervisor.Logger(ctx).Infof("Bootstrapping: node public key: %s", hex.EncodeToString([]byte(pub)))
 
-	node := Node{
-		clusterUnlockKey: cuk,
-		pubkey:           pub,
-		state:            ppb.Node_FSM_STATE_UP,
-		// TODO(q3k): make this configurable.
-		consensusMember:  &NodeRoleConsensusMember{},
-		kubernetesWorker: &NodeRoleKubernetesWorker{},
-	}
+	node := curator.NewNodeForBootstrap(cuk, pub)
 
 	// Run worker to keep updating /ephemeral/hosts (and thus, /etc/hosts) with
 	// our own IP address. This ensures that the node's ID always resolves to
 	// its current external IP address.
+	// TODO(q3k): move this out into roleserver.
 	supervisor.Run(ctx, "hostsfile", func(ctx context.Context) error {
 		supervisor.Signal(ctx, supervisor.SignalHealthy)
 		watcher := m.networkService.Watch()
@@ -96,51 +91,42 @@
 	}
 	supervisor.Logger(ctx).Info("Bootstrapping: consensus ready.")
 
-	nodesKV, err := m.consensus.Client().Sub("nodes")
-	if err != nil {
-		return fmt.Errorf("when retrieving nodes etcd subclient: %w", err)
-	}
-	pkiKV, err := m.consensus.Client().Sub("pki")
-	if err != nil {
-		return fmt.Errorf("when retrieving pki etcd subclient: %w", err)
-	}
-	applicationKV, err := m.consensus.Client().Sub("application")
+	metropolisKV, err := m.consensus.Client().Sub("metropolis")
 	if err != nil {
 		return fmt.Errorf("when retrieving application etcd subclient: %w", err)
 	}
 
-	// Create Metropolis CA and this node's certificate.
-	caCertBytes, _, err := PKICA.Ensure(ctx, pkiKV)
-	if err != nil {
-		return fmt.Errorf("failed to create cluster CA: %w", err)
-	}
-	nodeCert := PKINamespace.New(PKICA, "", pki.Server([]string{node.ID()}, nil))
-	nodeCert.UseExistingKey(priv)
-	nodeCertBytes, _, err := nodeCert.Ensure(ctx, pkiKV)
-	if err != nil {
-		return fmt.Errorf("failed to create node certificate: %w", err)
+	status := Status{
+		State:             cpb.ClusterState_CLUSTER_STATE_HOME,
+		hasLocalConsensus: true,
+		consensusClient:   metropolisKV,
+		// Credentials are set further down once created through a curator
+		// short-circuit bootstrap function.
+		Credentials: nil,
 	}
 
-	if err := m.storageRoot.Data.Node.Credentials.CACertificate.Write(caCertBytes, 0400); err != nil {
-		return fmt.Errorf("failed to write CA certificate: %w", err)
-	}
-	if err := m.storageRoot.Data.Node.Credentials.Certificate.Write(nodeCertBytes, 0400); err != nil {
-		return fmt.Errorf("failed to write node certificate: %w", err)
-	}
-	if err := m.storageRoot.Data.Node.Credentials.Key.Write(priv, 0400); err != nil {
-		return fmt.Errorf("failed to write node private key: %w", err)
+	// Short circuit curator into storing the new node.
+	ckv, err := status.ConsensusClient(ConsensusUserCurator)
+	if err != nil {
+		return fmt.Errorf("when retrieving consensus user for curator: %w", err)
 	}
 
-	// Update our Node object in etcd.
-	if err := node.Store(ctx, nodesKV); err != nil {
+	if err := node.BootstrapStore(ctx, ckv); err != nil {
 		return fmt.Errorf("failed to store new node in etcd: %w", err)
 	}
 
-	m.status.Set(Status{
-		State:           ClusterHome,
-		Node:            &node,
-		consensusClient: applicationKV,
-	})
+	// And short-circuit creating the curator CA and node certificate.
+	creds, err := curator.BootstrapNodeCredentials(ctx, ckv, priv, pub)
+	if err != nil {
+		return fmt.Errorf("failed to bootstrap node credentials: %w", err)
+	}
+
+	if err := creds.Save(&m.storageRoot.Data.Node.Credentials); err != nil {
+		return fmt.Errorf("failed to write node credentials: %w", err)
+	}
+
+	status.Credentials = creds
+	m.status.Set(status)
 
 	supervisor.Signal(ctx, supervisor.SignalHealthy)
 	supervisor.Signal(ctx, supervisor.SignalDone)
diff --git a/metropolis/node/core/cluster/configuration.go b/metropolis/node/core/cluster/configuration.go
deleted file mode 100644
index adb156d..0000000
--- a/metropolis/node/core/cluster/configuration.go
+++ /dev/null
@@ -1,19 +0,0 @@
-// Copyright 2020 The Monogon Project Authors.
-//
-// SPDX-License-Identifier: Apache-2.0
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package cluster
-
-
diff --git a/metropolis/node/core/cluster/manager.go b/metropolis/node/core/cluster/manager.go
deleted file mode 100644
index cc98d8e..0000000
--- a/metropolis/node/core/cluster/manager.go
+++ /dev/null
@@ -1,268 +0,0 @@
-// Copyright 2020 The Monogon Project Authors.
-//
-// SPDX-License-Identifier: Apache-2.0
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package cluster
-
-import (
-	"context"
-	"errors"
-	"fmt"
-	"io/ioutil"
-	"sync"
-
-	"google.golang.org/protobuf/proto"
-
-	"source.monogon.dev/metropolis/node/core/consensus"
-	"source.monogon.dev/metropolis/node/core/consensus/client"
-	"source.monogon.dev/metropolis/node/core/localstorage"
-	"source.monogon.dev/metropolis/node/core/network"
-	"source.monogon.dev/metropolis/pkg/event"
-	"source.monogon.dev/metropolis/pkg/event/memory"
-	"source.monogon.dev/metropolis/pkg/supervisor"
-	apb "source.monogon.dev/metropolis/proto/api"
-	ppb "source.monogon.dev/metropolis/proto/private"
-)
-
-// Status is returned to Cluster clients (ie., node code) on Manager.Watch/.Get.
-type Status struct {
-	// State is the current state of the cluster, as seen by the node.
-	State ClusterState
-	// Node is the configuration of this node in the cluster.
-	Node *Node
-
-	consensusClient client.Namespaced
-}
-
-// ConsensusUser is the to-level user of an etcd client in Metropolis node
-// code. These need to be defined ahead of time in an Go 'enum', and different
-// ConsensusUsers should not be shared by different codepaths.
-type ConsensusUser string
-
-const (
-	ConsensusUserKubernetesPKI ConsensusUser = "kubernetes-pki"
-	ConsensusUserCurator       ConsensusUser = "curator"
-)
-
-// ConsensusClient returns an etcd/consensus client for a given ConsensusUser.
-func (s *Status) ConsensusClient(user ConsensusUser) (client.Namespaced, error) {
-	// Ensure that we already are connected to etcd and are in a state in which we
-	// should be handing out cluster connectivity.
-	if s.consensusClient == nil {
-		return nil, fmt.Errorf("not connected")
-	}
-	switch s.State {
-	case ClusterHome:
-	case ClusterSplit:
-		return nil, fmt.Errorf("refusing connection with cluster state %v", s.State)
-	default:
-	}
-
-	// Ensure only defined 'applications' are used to prevent programmer error and
-	// casting to ConsensusUser from an arbitrary string.
-	switch user {
-	case ConsensusUserKubernetesPKI:
-	case ConsensusUserCurator:
-	default:
-		return nil, fmt.Errorf("unknown ConsensusUser %q", user)
-	}
-	client, err := s.consensusClient.Sub(string(user))
-	if err != nil {
-		return nil, fmt.Errorf("retrieving subclient failed: %w", err)
-	}
-	return client, nil
-}
-
-type state struct {
-	mu sync.RWMutex
-
-	oneway       bool
-	stateCluster ClusterState
-	stateNode    ppb.Node_FSMState
-
-	configuration *ppb.SealedConfiguration
-}
-
-type Watcher struct {
-	event.Watcher
-}
-
-func (w *Watcher) Get(ctx context.Context) (*Status, error) {
-	val, err := w.Watcher.Get(ctx)
-	if err != nil {
-		return nil, err
-	}
-	status := val.(Status)
-	return &status, err
-}
-
-// GetHome waits until the cluster, from the point of view of this node, is in
-// the ClusterHome state. This can be used to wait for the cluster manager to
-// 'settle', before clients start more node services.
-func (w *Watcher) GetHome(ctx context.Context) (*Status, error) {
-	for {
-		status, err := w.Get(ctx)
-		if err != nil {
-			return nil, err
-		}
-		switch status.State {
-		case ClusterHome:
-			return status, nil
-		case ClusterDisowning:
-			return nil, fmt.Errorf("the cluster has disowned this node")
-		}
-	}
-}
-
-func (m *Manager) Watch() Watcher {
-	return Watcher{
-		Watcher: m.status.Watch(),
-	}
-}
-
-type Manager struct {
-	storageRoot    *localstorage.Root
-	networkService *network.Service
-	status         memory.Value
-
-	state
-
-	// consensus is the spawned etcd/consensus service, if the Manager brought
-	// up a Node that should run one.
-	consensus *consensus.Service
-}
-
-// NewManager creates a new cluster Manager. The given localstorage Root must
-// be places, but not yet started (and will be started as the Manager makes
-// progress). The given network Service must already be running.
-func NewManager(storageRoot *localstorage.Root, networkService *network.Service) *Manager {
-	return &Manager{
-		storageRoot:    storageRoot,
-		networkService: networkService,
-
-		state: state{
-			stateCluster: ClusterUnknown,
-			stateNode:    ppb.Node_FSM_STATE_INVALID,
-		},
-	}
-}
-
-func (m *Manager) lock() (*state, func()) {
-	m.mu.Lock()
-	return &m.state, m.mu.Unlock
-}
-
-func (m *Manager) rlock() (*state, func()) {
-	m.mu.RLock()
-	return &m.state, m.mu.RUnlock
-}
-
-// Run is the runnable of the Manager, to be started using the Supervisor. It
-// is one-shot, and should not be restarted.
-func (m *Manager) Run(ctx context.Context) error {
-	state, unlock := m.lock()
-	if state.oneway {
-		unlock()
-		// TODO(q3k): restart the entire system if this happens
-		return fmt.Errorf("cannot restart cluster manager")
-	}
-	state.oneway = true
-	unlock()
-
-	configuration, err := m.storageRoot.ESP.SealedConfiguration.Unseal()
-	if err == nil {
-		supervisor.Logger(ctx).Info("Sealed configuration present. attempting to join cluster")
-		return m.join(ctx, configuration)
-	}
-
-	if !errors.Is(err, localstorage.ErrNoSealed) {
-		return fmt.Errorf("unexpected sealed config error: %w", err)
-	}
-
-	supervisor.Logger(ctx).Info("No sealed configuration, looking for node parameters")
-
-	params, err := m.nodeParams(ctx)
-	if err != nil {
-		return fmt.Errorf("no parameters available: %w", err)
-	}
-
-	switch inner := params.Cluster.(type) {
-	case *apb.NodeParameters_ClusterBootstrap_:
-		return m.bootstrap(ctx, inner.ClusterBootstrap)
-	case *apb.NodeParameters_ClusterRegister_:
-		return m.register(ctx, inner.ClusterRegister)
-	default:
-		return fmt.Errorf("node parameters misconfigured: neither cluster_bootstrap nor cluster_register set")
-	}
-}
-
-func (m *Manager) register(ctx context.Context, bootstrap *apb.NodeParameters_ClusterRegister) error {
-	return fmt.Errorf("unimplemented")
-}
-
-func (m *Manager) nodeParamsFWCFG(ctx context.Context) (*apb.NodeParameters, error) {
-	bytes, err := ioutil.ReadFile("/sys/firmware/qemu_fw_cfg/by_name/dev.monogon.metropolis/parameters.pb/raw")
-	if err != nil {
-		return nil, fmt.Errorf("could not read firmware enrolment file: %w", err)
-	}
-
-	config := apb.NodeParameters{}
-	err = proto.Unmarshal(bytes, &config)
-	if err != nil {
-		return nil, fmt.Errorf("could not unmarshal: %v", err)
-	}
-
-	return &config, nil
-}
-
-func (m *Manager) nodeParams(ctx context.Context) (*apb.NodeParameters, error) {
-	// Retrieve node parameters from qemu's fwcfg interface or ESP.
-	// TODO(q3k): probably abstract this away and implement per platform/build/...
-	paramsFWCFG, err := m.nodeParamsFWCFG(ctx)
-	if err != nil {
-		supervisor.Logger(ctx).Warningf("Could not retrieve node parameters from qemu fwcfg: %v", err)
-		paramsFWCFG = nil
-	} else {
-		supervisor.Logger(ctx).Infof("Retrieved node parameters from qemu fwcfg")
-	}
-	paramsESP, err := m.storageRoot.ESP.NodeParameters.Unmarshal()
-	if err != nil {
-		supervisor.Logger(ctx).Warningf("Could not retrieve node parameters from ESP: %v", err)
-		paramsESP = nil
-	} else {
-		supervisor.Logger(ctx).Infof("Retrieved node parameters from ESP")
-	}
-	if paramsFWCFG == nil && paramsESP == nil {
-		return nil, fmt.Errorf("could not find node parameters in ESP or qemu fwcfg")
-	}
-	if paramsFWCFG != nil && paramsESP != nil {
-		supervisor.Logger(ctx).Warningf("Node parameters found both in both ESP and qemu fwcfg, using the latter")
-		return paramsFWCFG, nil
-	} else if paramsFWCFG != nil {
-		return paramsFWCFG, nil
-	} else {
-		return paramsESP, nil
-	}
-}
-
-func (m *Manager) join(ctx context.Context, cfg *ppb.SealedConfiguration) error {
-	return fmt.Errorf("unimplemented")
-}
-
-// Node returns the Node that the Manager brought into a cluster, or nil if the
-// Manager is not Running.  This is safe to call from any goroutine.
-func (m *Manager) Node() *Node {
-	return nil
-}
diff --git a/metropolis/node/core/cluster/node.go b/metropolis/node/core/cluster/node.go
deleted file mode 100644
index 1d6e73d..0000000
--- a/metropolis/node/core/cluster/node.go
+++ /dev/null
@@ -1,207 +0,0 @@
-// Copyright 2020 The Monogon Project Authors.
-//
-// SPDX-License-Identifier: Apache-2.0
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package cluster
-
-import (
-	"context"
-	"encoding/hex"
-	"fmt"
-	"net"
-	"strings"
-
-	"go.etcd.io/etcd/clientv3"
-	"golang.org/x/sys/unix"
-	"google.golang.org/protobuf/proto"
-
-	"source.monogon.dev/metropolis/node/core/localstorage"
-	"source.monogon.dev/metropolis/pkg/supervisor"
-	ppb "source.monogon.dev/metropolis/proto/private"
-)
-
-// Node is a Metropolis cluster member. A node is a virtual or physical machine
-// running Metropolis. This object represents a node only as part of a cluster
-// - ie., this object will never be available outside of
-// //metropolis/node/core/cluster if the Node is not part of a Cluster.  Nodes
-// are inherently tied to their long term storage, which is etcd. As such,
-// methods on this object relate heavily to the Node's expected lifecycle on
-// etcd.
-type Node struct {
-	// clusterUnlockKey is half of the unlock key required to mount the node's
-	// data partition. It's stored in etcd, and will only be provided to the
-	// Node if it can prove its identity via an integrity mechanism (ie. via
-	// TPM), or when the Node was just created (as the key is generated locally
-	// by localstorage on first format/mount).  The other part of the unlock
-	// key is the LocalUnlockKey that's present on the node's ESP partition.
-	clusterUnlockKey []byte
-
-	pubkey []byte
-
-	state ppb.Node_FSMState
-
-	// A Node can have multiple Roles. Each Role is represented by the presence
-	// of NodeRole* structures in this structure, with a nil pointer
-	// representing the lack of a role.
-	consensusMember  *NodeRoleConsensusMember
-	kubernetesWorker *NodeRoleKubernetesWorker
-}
-
-// NodeRoleConsensusMember defines that the Node is a consensus (etcd) cluster
-// member.
-type NodeRoleConsensusMember struct {
-}
-
-// NodeRoleKubernetesWorker defines that the Node should be running the
-// Kubernetes control and data plane.
-type NodeRoleKubernetesWorker struct {
-}
-
-// ID returns the name of this node, which is `metropolis-{pubkeyHash}`. This
-// name should be the primary way to refer to Metropoils nodes within a
-// cluster, and is guaranteed to be unique by relying on cryptographic
-// randomness.
-func (n *Node) ID() string {
-	return fmt.Sprintf("metropolis-%s", n.IDBare())
-}
-
-// IDBare returns the `{pubkeyHash}` part of the node ID.
-func (n Node) IDBare() string {
-	return hex.EncodeToString(n.pubkey[:16])
-}
-
-func (n *Node) String() string {
-	return n.ID()
-}
-
-// ConsensusMember returns a copy of the NodeRoleConsensusMember struct if the
-// Node is a consensus member, otherwise nil.
-func (n *Node) ConsensusMember() *NodeRoleConsensusMember {
-	if n.consensusMember == nil {
-		return nil
-	}
-	cm := *n.consensusMember
-	return &cm
-}
-
-// KubernetesWorker returns a copy of the NodeRoleKubernetesWorker struct if
-// the Node is a kubernetes worker, otherwise nil.
-func (n *Node) KubernetesWorker() *NodeRoleKubernetesWorker {
-	if n.kubernetesWorker == nil {
-		return nil
-	}
-	kw := *n.kubernetesWorker
-	return &kw
-}
-
-// etcdPath builds the etcd path in which this node's protobuf-serialized state
-// is stored in etcd.
-func (n *Node) etcdPath() string {
-	return fmt.Sprintf("/nodes/%s", n.ID())
-}
-
-// proto serializes the Node object into protobuf, to be used for saving to
-// etcd.
-func (n *Node) proto() *ppb.Node {
-	msg := &ppb.Node{
-		ClusterUnlockKey: n.clusterUnlockKey,
-		PublicKey:        n.pubkey,
-		FsmState:         n.state,
-		Roles:            &ppb.Node_Roles{},
-	}
-	if n.consensusMember != nil {
-		msg.Roles.ConsensusMember = &ppb.Node_Roles_ConsensusMember{}
-	}
-	if n.kubernetesWorker != nil {
-		msg.Roles.KubernetesWorker = &ppb.Node_Roles_KubernetesWorker{}
-	}
-	return msg
-}
-
-// Store saves the Node into etcd. This should be called only once per Node
-// (ie. when the Node has been created).
-func (n *Node) Store(ctx context.Context, kv clientv3.KV) error {
-	// Currently the only flow to store a node to etcd is a write-once flow:
-	// once a node is created, it cannot be deleted or updated. In the future,
-	// flows to change cluster node roles might be introduced (ie. to promote
-	// nodes to consensus members, etc).
-	key := n.etcdPath()
-	msg := n.proto()
-	nodeRaw, err := proto.Marshal(msg)
-	if err != nil {
-		return fmt.Errorf("failed to marshal node: %w", err)
-	}
-
-	res, err := kv.Txn(ctx).If(
-		clientv3.Compare(clientv3.CreateRevision(key), "=", 0),
-	).Then(
-		clientv3.OpPut(key, string(nodeRaw)),
-	).Commit()
-	if err != nil {
-		return fmt.Errorf("failed to store node: %w", err)
-	}
-
-	if !res.Succeeded {
-		return fmt.Errorf("attempted to re-register node (unsupported flow)")
-	}
-	return nil
-}
-
-// MakeConsensusMember turns the node into a consensus member. This only
-// configures internal fields, and does not actually start any services.
-func (n *Node) MakeConsensusMember() error {
-	if n.consensusMember != nil {
-		return fmt.Errorf("node already is consensus member")
-	}
-	n.consensusMember = &NodeRoleConsensusMember{}
-	return nil
-}
-
-// MakeKubernetesWorker turns the node into a kubernetes worker. This only
-// configures internal fields, and does not actually start any services.
-func (n *Node) MakeKubernetesWorker() error {
-	if n.kubernetesWorker != nil {
-		return fmt.Errorf("node is already kubernetes worker")
-	}
-	n.kubernetesWorker = &NodeRoleKubernetesWorker{}
-	return nil
-}
-
-// ConfigureLocalHostname uses the node's ID as a hostname, and sets the
-// current hostname, and local files like hosts and machine-id accordingly.
-func (n *Node) ConfigureLocalHostname(ctx context.Context, ephemeral *localstorage.EphemeralDirectory, address net.IP) error {
-	if err := unix.Sethostname([]byte(n.ID())); err != nil {
-		return fmt.Errorf("failed to set runtime hostname: %w", err)
-	}
-	hosts := []string{
-		"127.0.0.1 localhost",
-		"::1 localhost",
-		fmt.Sprintf("%s %s", address.String(), n.ID()),
-	}
-	if err := ephemeral.Hosts.Write([]byte(strings.Join(hosts, "\n")), 0644); err != nil {
-		return fmt.Errorf("failed to write /ephemeral/hosts: %w", err)
-	}
-	if err := ephemeral.MachineID.Write([]byte(n.IDBare()), 0644); err != nil {
-		return fmt.Errorf("failed to write /ephemeral/machine-id: %w", err)
-	}
-
-	// Check that we are self-resolvable.
-	ip, err := net.ResolveIPAddr("ip", n.ID())
-	if err != nil {
-		return fmt.Errorf("failed to self-resolve: %w", err)
-	}
-	supervisor.Logger(ctx).Infof("This is node %s at %v", n.ID(), ip)
-	return nil
-}
diff --git a/metropolis/node/core/cluster/status.go b/metropolis/node/core/cluster/status.go
new file mode 100644
index 0000000..d82130b
--- /dev/null
+++ b/metropolis/node/core/cluster/status.go
@@ -0,0 +1,77 @@
+package cluster
+
+import (
+	"errors"
+	"fmt"
+
+	"source.monogon.dev/metropolis/node/core/consensus/client"
+	"source.monogon.dev/metropolis/node/core/curator"
+	cpb "source.monogon.dev/metropolis/proto/common"
+)
+
+var (
+	ErrNoLocalConsensus = errors.New("this node does not have direct access to etcd")
+)
+
+// Status is returned to Cluster clients (ie., node code) on Manager.Watch/.Get.
+type Status struct {
+	// State is the current state of the cluster, as seen by the node.
+	State cpb.ClusterState
+
+	// hasLocalConsensus is true if the local node is running a local consensus
+	// (etcd) server.
+	hasLocalConsensus bool
+	// consensusClient is an etcd client to the local consensus server if the node
+	// has such a server and the cluster state is HOME or SPLIT.
+	consensusClient client.Namespaced
+
+	// Credentials used for the node to authenticate to the Curator and other
+	// cluster services.
+	Credentials *curator.NodeCredentials
+}
+
+// ConsensusUser is the top-level user of an etcd client in Metropolis node
+// code. These need to be defined ahead of time in a Go 'enum', and different
+// ConsensusUsers should not be shared by different codepaths.
+type ConsensusUser string
+
+const (
+	ConsensusUserKubernetesPKI ConsensusUser = "kubernetes-pki"
+	ConsensusUserCurator       ConsensusUser = "curator"
+)
+
+// ConsensusClient returns an etcd/consensus client for a given ConsensusUser.
+// The node must be running a local consensus/etcd server.
+func (s *Status) ConsensusClient(user ConsensusUser) (client.Namespaced, error) {
+	if !s.hasLocalConsensus {
+		return nil, ErrNoLocalConsensus
+	}
+
+	// Ensure that we already are connected to etcd and are in a state in which we
+	// should be handing out cluster connectivity.
+	if s.consensusClient == nil {
+		return nil, fmt.Errorf("not connected")
+	}
+	switch s.State {
+	case cpb.ClusterState_CLUSTER_STATE_HOME:
+	case cpb.ClusterState_CLUSTER_STATE_SPLIT:
+		// The consensus client is resistant to being split off, and will serve
+		// as soon as the split is resolved.
+	default:
+		return nil, fmt.Errorf("refusing connection with cluster state %v", s.State)
+	}
+
+	// Ensure only defined 'applications' are used to prevent programmer error and
+	// casting to ConsensusUser from an arbitrary string.
+	switch user {
+	case ConsensusUserKubernetesPKI:
+	case ConsensusUserCurator:
+	default:
+		return nil, fmt.Errorf("unknown ConsensusUser %q", user)
+	}
+	client, err := s.consensusClient.Sub(string(user))
+	if err != nil {
+		return nil, fmt.Errorf("retrieving subclient failed: %w", err)
+	}
+	return client, nil
+}
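
Callers that may run on nodes without local consensus can distinguish that case from other failures via the new ErrNoLocalConsensus sentinel. A minimal sketch follows; the helper name and error strings are illustrative, and status is assumed to come from the manager's watcher:

    // Sketch: tolerate nodes that do not run a local etcd server.
    func curatorKV(status *cluster.Status) (client.Namespaced, error) {
    	kv, err := status.ConsensusClient(cluster.ConsensusUserCurator)
    	if errors.Is(err, cluster.ErrNoLocalConsensus) {
    		// Expected on nodes without local consensus; the caller must use
    		// some other path instead of a local etcd client.
    		return nil, err
    	}
    	if err != nil {
    		return nil, fmt.Errorf("getting curator consensus client: %w", err)
    	}
    	return kv, nil
    }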
diff --git a/metropolis/node/core/cluster/watcher.go b/metropolis/node/core/cluster/watcher.go
new file mode 100644
index 0000000..1251f2d
--- /dev/null
+++ b/metropolis/node/core/cluster/watcher.go
@@ -0,0 +1,46 @@
+package cluster
+
+import (
+	"context"
+	"fmt"
+
+	"source.monogon.dev/metropolis/pkg/event"
+	cpb "source.monogon.dev/metropolis/proto/common"
+)
+
+type Watcher struct {
+	event.Watcher
+}
+
+func (w *Watcher) Get(ctx context.Context) (*Status, error) {
+	val, err := w.Watcher.Get(ctx)
+	if err != nil {
+		return nil, err
+	}
+	status := val.(Status)
+	return &status, err
+}
+
+// GetHome waits until the cluster, from the point of view of this node, is in
+// the HOME state. This can be used to wait for the cluster manager to
+// 'settle' before clients start more node services.
+func (w *Watcher) GetHome(ctx context.Context) (*Status, error) {
+	for {
+		status, err := w.Get(ctx)
+		if err != nil {
+			return nil, err
+		}
+		switch status.State {
+		case cpb.ClusterState_CLUSTER_STATE_HOME:
+			return status, nil
+		case cpb.ClusterState_CLUSTER_STATE_DISOWNING:
+			return nil, fmt.Errorf("the cluster has disowned this node")
+		}
+	}
+}
+
+func (m *Manager) Watch() Watcher {
+	return Watcher{
+		Watcher: m.status.Watch(),
+	}
+}
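
Services that need to react to later state changes, rather than waiting for HOME once, can keep the watcher around and loop on Get. A rough sketch, with an illustrative function name and illustrative handling of the DISOWNING case:

    // Sketch: follow cluster state changes over time.
    func watchClusterState(ctx context.Context, m *cluster.Manager) error {
    	w := m.Watch()
    	for {
    		status, err := w.Get(ctx)
    		if err != nil {
    			return err // typically context cancellation
    		}
    		supervisor.Logger(ctx).Infof("Cluster state: %s", status.State)
    		if status.State == cpb.ClusterState_CLUSTER_STATE_DISOWNING {
    			return fmt.Errorf("this node has been disowned by the cluster")
    		}
    	}
    }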
diff --git a/metropolis/node/core/main.go b/metropolis/node/core/main.go
index d9f408e..6f13a3a 100644
--- a/metropolis/node/core/main.go
+++ b/metropolis/node/core/main.go
@@ -32,7 +32,6 @@
 
 	"golang.org/x/sys/unix"
 	"google.golang.org/grpc"
-
 	common "source.monogon.dev/metropolis/node"
 	"source.monogon.dev/metropolis/node/core/cluster"
 	"source.monogon.dev/metropolis/node/core/curator"
@@ -174,7 +173,7 @@
 		}
 		c := curator.New(curator.Config{
 			Etcd:   kv,
-			NodeID: status.Node.ID(),
+			NodeID: status.Credentials.ID(),
 			// TODO(q3k): make this configurable?
 			LeaderTTL: time.Second * 5,
 			Directory: &root.Ephemeral.Curator,
@@ -187,54 +186,35 @@
 		// start all services that we should be running.
 
 		logger.Info("Enrolment success, continuing startup.")
-		logger.Info(fmt.Sprintf("This node (%s) has roles:", status.Node.String()))
-		if cm := status.Node.ConsensusMember(); cm != nil {
-			// There's no need to start anything for when we are a consensus
-			// member - the cluster manager does this for us if necessary (as
-			// creating/enrolling/joining a cluster is pretty tied into cluster
-			// lifecycle management).
-			logger.Info(fmt.Sprintf(" - etcd consensus member"))
-		}
-		if kw := status.Node.KubernetesWorker(); kw != nil {
-			logger.Info(fmt.Sprintf(" - kubernetes worker"))
+
+		// HACK: always start k8s worker, this is currently being refactored
+		// and will get fixed in review/188.
+		logger.Info("Starting Kubernetes worker services...")
+
+		kkv, err := status.ConsensusClient(cluster.ConsensusUserKubernetesPKI)
+		if err != nil {
+			return fmt.Errorf("failed to retrieve consensus kubernetes PKI client: %w", err)
 		}
 
-		// If we're supposed to be a kubernetes worker, start kubernetes
-		// services and containerd.  In the future, this might be split further
-		// into kubernetes control plane and data plane roles.
-		// TODO(q3k): watch on cluster status updates to start/stop kubernetes
-		// service.
-		var containerdSvc *containerd.Service
-		var kubeSvc *kubernetes.Service
-		if kw := status.Node.KubernetesWorker(); kw != nil {
-			logger.Info("Starting Kubernetes worker services...")
+		// Ensure Kubernetes PKI objects exist in etcd.
+		kpki := pki.New(lt.MustLeveledFor("pki.kubernetes"), kkv)
+		if err := kpki.EnsureAll(ctx); err != nil {
+			return fmt.Errorf("failed to ensure kubernetes PKI present: %w", err)
+		}
 
-			kv, err := status.ConsensusClient(cluster.ConsensusUserKubernetesPKI)
-			if err != nil {
-				return fmt.Errorf("failed to retrieve consensus kubernetes PKI client: %w", err)
-			}
+		containerdSvc := &containerd.Service{
+			EphemeralVolume: &root.Ephemeral.Containerd,
+		}
+		if err := supervisor.Run(ctx, "containerd", containerdSvc.Run); err != nil {
+			return fmt.Errorf("failed to start containerd service: %w", err)
+		}
 
-			// Ensure Kubernetes PKI objects exist in etcd.
-			kpki := pki.New(lt.MustLeveledFor("pki.kubernetes"), kv)
-			if err := kpki.EnsureAll(ctx); err != nil {
-				return fmt.Errorf("failed to ensure kubernetes PKI present: %w", err)
-			}
-
-			containerdSvc = &containerd.Service{
-				EphemeralVolume: &root.Ephemeral.Containerd,
-			}
-			if err := supervisor.Run(ctx, "containerd", containerdSvc.Run); err != nil {
-				return fmt.Errorf("failed to start containerd service: %w", err)
-			}
-
-			kubernetesConfig.KPKI = kpki
-			kubernetesConfig.Root = root
-			kubernetesConfig.Network = networkSvc
-			kubeSvc = kubernetes.New(kubernetesConfig)
-			if err := supervisor.Run(ctx, "kubernetes", kubeSvc.Run); err != nil {
-				return fmt.Errorf("failed to start kubernetes service: %w", err)
-			}
-
+		kubernetesConfig.KPKI = kpki
+		kubernetesConfig.Root = root
+		kubernetesConfig.Network = networkSvc
+		kubeSvc := kubernetes.New(kubernetesConfig)
+		if err := supervisor.Run(ctx, "kubernetes", kubeSvc.Run); err != nil {
+			return fmt.Errorf("failed to start kubernetes service: %w", err)
 		}
 
 		// Start the node debug service.