m/n/core/cluster: migrate to events and etcd namespaced client
This moves the status of the cluster manager into a local event
variable. Watchers (like the node startup code) can now use it to get
updates on the state of the node and its cluster membership in a way
that's more abstracted from a sequential startup. This will permit us
to move a lot of the startup code into code common across different
node lifecycle paths.
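For reference, downstream consumers now follow roughly the pattern
below. This is a sketch mirroring the main.go changes in this diff; the
waitForCluster helper name and the import list are illustrative only,
not part of the change:

    // Assumes the usual imports: context, fmt,
    // "source.monogon.dev/metropolis/node/core/cluster",
    // "source.monogon.dev/metropolis/node/core/consensus/client".

    // waitForCluster blocks until this node is Home in a cluster, then
    // hands out a namespaced etcd client for the Kubernetes PKI user.
    func waitForCluster(ctx context.Context, m *cluster.Manager) (client.Namespaced, error) {
            w := m.Watch()
            status, err := w.GetHome(ctx)
            if err != nil {
                    return nil, fmt.Errorf("node couldn't find home in a cluster: %w", err)
            }
            // ConsensusClient refuses to hand out a client unless the
            // cluster state allows it (eg. not ClusterSplit).
            return status.ConsensusClient(cluster.ConsensusUserKubernetesPKI)
    }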
Test Plan: Refactor, exercised by e2e.
X-Origin-Diff: phab/D757
GitOrigin-RevId: 31a3600ad2aab90a1e7f84d741e7ea40a0422724
diff --git a/metropolis/node/core/cluster/BUILD.bazel b/metropolis/node/core/cluster/BUILD.bazel
index c6e01e5..a3e2593 100644
--- a/metropolis/node/core/cluster/BUILD.bazel
+++ b/metropolis/node/core/cluster/BUILD.bazel
@@ -13,8 +13,10 @@
visibility = ["//metropolis/node/core:__subpackages__"],
deps = [
"//metropolis/node/core/consensus:go_default_library",
+ "//metropolis/node/core/consensus/client:go_default_library",
"//metropolis/node/core/localstorage:go_default_library",
"//metropolis/node/core/network:go_default_library",
+ "//metropolis/pkg/event:go_default_library",
"//metropolis/pkg/pki:go_default_library",
"//metropolis/pkg/supervisor:go_default_library",
"//metropolis/proto/api:go_default_library",
diff --git a/metropolis/node/core/cluster/cluster.go b/metropolis/node/core/cluster/cluster.go
index 1277c2a..b194f25 100644
--- a/metropolis/node/core/cluster/cluster.go
+++ b/metropolis/node/core/cluster/cluster.go
@@ -22,20 +22,44 @@
"source.monogon.dev/metropolis/pkg/pki"
)
+// ClusterState is the state of the cluster from the point of view of the
+// current node. Clients within the node code can watch this state to change
+// their behaviour as needed.
type ClusterState int
const (
+ // ClusterUnknown means the node has not yet determined the existence
+ // of a cluster it should join or start. This is a transient, initial state
+ // that should only manifest during boot.
ClusterUnknown ClusterState = iota
+ // ClusterForeign means the node is attempting to register into an already
+ // existing cluster with which it has made preliminary contact, but which
+ // has not yet fully productionized the node (eg. the node is still being
+ // hardware attested, or the operator needs to confirm the registration of
+ // this node).
ClusterForeign
+ // ClusterTrusted means the node is attempting to register into an already
+ // existing cluster and has been trusted by it. The node is now attempting
+ // to finally commit to registering into the cluster.
ClusterTrusted
+ // ClusterHome means the node is part of a cluster. This is the state in
+ // which the node will spend the bulk of its time.
ClusterHome
+ // ClusterDisowning means the node has been disowned (ie., removed) by the
+ // cluster, that it will never be part of any cluster again, and that it
+ // will be decommissioned by the operator.
ClusterDisowning
+ // ClusterSplit means that the node would usually be Home in a cluster, but
+ // has been split from the consensus of the cluster. This can happen for
+ // nodes running consensus when consensus is lost (eg. when there is no
+ // quorum or this node has been netsplit), and for other nodes if they have
+ // lost network connectivity to the consensus nodes. Clients should make
+ // their own decision about what action to perform in this state, depending
+ // on the level of consistency required and whether it makes sense for the
+ // node to fence its services off.
+ ClusterSplit
)
-type Cluster struct {
- State ClusterState
-}
-
func (s ClusterState) String() string {
switch s {
case ClusterForeign:
@@ -46,17 +70,12 @@
return "ClusterHome"
case ClusterDisowning:
return "ClusterDisowning"
+ case ClusterSplit:
+ return "ClusterSplit"
}
return fmt.Sprintf("Invalid(%d)", s)
}
-var clusterStateTransitions = map[ClusterState][]ClusterState{
- ClusterUnknown: {ClusterForeign, ClusterHome, ClusterDisowning},
- ClusterForeign: {ClusterTrusted},
- ClusterTrusted: {ClusterHome},
- ClusterHome: {ClusterHome, ClusterDisowning},
-}
-
var (
PKINamespace = pki.Namespaced("/cluster-pki/")
PKICA = PKINamespace.New(pki.SelfSigned, "cluster-ca", pki.CA("Metropolis Cluster CA"))
diff --git a/metropolis/node/core/cluster/manager.go b/metropolis/node/core/cluster/manager.go
index 65af212..874d3ae 100644
--- a/metropolis/node/core/cluster/manager.go
+++ b/metropolis/node/core/cluster/manager.go
@@ -26,16 +26,60 @@
"google.golang.org/protobuf/proto"
"source.monogon.dev/metropolis/node/core/consensus"
+ "source.monogon.dev/metropolis/node/core/consensus/client"
"source.monogon.dev/metropolis/node/core/localstorage"
"source.monogon.dev/metropolis/node/core/network"
+ "source.monogon.dev/metropolis/pkg/event"
"source.monogon.dev/metropolis/pkg/supervisor"
apb "source.monogon.dev/metropolis/proto/api"
ppb "source.monogon.dev/metropolis/proto/private"
)
-type managerResult struct {
- node *Node
- err error
+// Status is returned to Cluster clients (ie., node code) via Manager.Watch and Watcher.Get.
+type Status struct {
+ // State is the current state of the cluster, as seen by the node.
+ State ClusterState
+ // Node is the configuration of this node in the cluster.
+ Node *Node
+
+ consensusClient client.Namespaced
+}
+
+// ConsensusUser is the top-level user of an etcd client in Metropolis node
+// code. These need to be defined ahead of time in a Go 'enum', and different
+// ConsensusUsers should not be shared by different codepaths.
+type ConsensusUser string
+
+const (
+ ConsensusUserKubernetesPKI ConsensusUser = "kubernetes-pki"
+)
+
+// ConsensusClient returns an etcd/consensus client for a given ConsensusUser.
+func (s *Status) ConsensusClient(user ConsensusUser) (client.Namespaced, error) {
+ // Ensure that we already are connected to etcd and are in a state in which we
+ // should be handing out cluster connectivity.
+ if s.consensusClient == nil {
+ return nil, fmt.Errorf("not connected")
+ }
+ switch s.State {
+ case ClusterHome:
+ case ClusterSplit:
+ return nil, fmt.Errorf("refusing connection with cluster state %v", s.State)
+ default:
+ }
+
+ // Ensure only defined 'applications' are used to prevent programmer error and
+ // casting to ConsensusUser from an arbitrary string.
+ switch user {
+ case ConsensusUserKubernetesPKI:
+ default:
+ return nil, fmt.Errorf("unknown ConsensusUser %q", user)
+ }
+ client, err := s.consensusClient.Sub(string(user))
+ if err != nil {
+ return nil, fmt.Errorf("retrieving subclient failed: %w", err)
+ }
+ return client, nil
}
type state struct {
@@ -46,27 +90,49 @@
stateNode ppb.Node_FSMState
configuration *ppb.SealedConfiguration
-
- result *managerResult
- waiters []chan *managerResult
}
-func (s *state) setResult(node *Node, err error) {
- s.result = &managerResult{
- node: node,
- err: err,
+type Watcher struct {
+ event.Watcher
+}
+
+func (w *Watcher) Get(ctx context.Context) (*Status, error) {
+ val, err := w.Watcher.Get(ctx)
+ if err != nil {
+ return nil, err
}
- for _, w := range s.waiters {
- go func(c chan *managerResult) {
- c <- s.result
- }(w)
+ status := val.(Status)
+ return &status, err
+}
+
+// GetHome waits until the cluster, from the point of view of this node, is in
+// the ClusterHome state. This can be used to wait for the cluster manager to
+// 'settle', before clients start more node services.
+func (w *Watcher) GetHome(ctx context.Context) (*Status, error) {
+ for {
+ status, err := w.Get(ctx)
+ if err != nil {
+ return nil, err
+ }
+ switch status.State {
+ case ClusterHome:
+ return status, nil
+ case ClusterDisowning:
+ return nil, fmt.Errorf("the cluster has disowned this node")
+ }
}
- s.waiters = nil
+}
+
+func (m *Manager) Watch() Watcher {
+ return Watcher{
+ Watcher: m.status.Watch(),
+ }
}
type Manager struct {
storageRoot *localstorage.Root
networkService *network.Service
+ status event.MemoryValue
state
@@ -100,21 +166,6 @@
return &m.state, m.mu.RUnlock
}
-func (m *Manager) Wait() (*Node, error) {
- state, unlock := m.lock()
-
- if state.result != nil {
- unlock()
- return state.result.node, state.result.err
- }
-
- c := make(chan *managerResult)
- state.waiters = append(state.waiters, c)
- unlock()
- res := <-c
- return res.node, res.err
-}
-
// Run is the runnable of the Manager, to be started using the Supervisor. It
// is one-shot, and should not be restarted.
func (m *Manager) Run(ctx context.Context) error {
@@ -194,7 +245,7 @@
return nil, fmt.Errorf("could not find node parameters in ESP or qemu fwcfg")
}
if paramsFWCFG != nil && paramsESP != nil {
- supervisor.Logger(ctx).Warningf("Node parameters found both inboth ESP and qemu fwcfg, using the latter")
+ supervisor.Logger(ctx).Warningf("Node parameters found in both ESP and qemu fwcfg, using the latter")
return paramsFWCFG, nil
} else if paramsFWCFG != nil {
return paramsFWCFG, nil
diff --git a/metropolis/node/core/cluster/manager_bootstrap.go b/metropolis/node/core/cluster/manager_bootstrap.go
index e6f94a2..b80bed6 100644
--- a/metropolis/node/core/cluster/manager_bootstrap.go
+++ b/metropolis/node/core/cluster/manager_bootstrap.go
@@ -96,17 +96,27 @@
}
supervisor.Logger(ctx).Info("Bootstrapping: consensus ready.")
- kv := m.consensus.KVRoot()
- node.KV = kv
+ nodesKV, err := m.consensus.Client().Sub("nodes")
+ if err != nil {
+ return fmt.Errorf("when retrieving nodes etcd subclient: %w", err)
+ }
+ pkiKV, err := m.consensus.Client().Sub("pki")
+ if err != nil {
+ return fmt.Errorf("when retrieving pki etcd subclient: %w", err)
+ }
+ applicationKV, err := m.consensus.Client().Sub("application")
+ if err != nil {
+ return fmt.Errorf("when retrieving application etcd subclient: %w", err)
+ }
// Create Metropolis CA and this node's certificate.
- caCertBytes, _, err := PKICA.Ensure(ctx, kv)
+ caCertBytes, _, err := PKICA.Ensure(ctx, pkiKV)
if err != nil {
return fmt.Errorf("failed to create cluster CA: %w", err)
}
nodeCert := PKINamespace.New(PKICA, "", pki.Server([]string{node.ID()}, nil))
nodeCert.UseExistingKey(priv)
- nodeCertBytes, _, err := nodeCert.Ensure(ctx, kv)
+ nodeCertBytes, _, err := nodeCert.Ensure(ctx, pkiKV)
if err != nil {
return fmt.Errorf("failed to create node certificate: %w", err)
}
@@ -121,15 +131,18 @@
return fmt.Errorf("failed to write node private key: %w", err)
}
- // Update our Node obejct in etcd.
- if err := node.Store(ctx, kv); err != nil {
+ // Update our Node object in etcd.
+ if err := node.Store(ctx, nodesKV); err != nil {
return fmt.Errorf("failed to store new node in etcd: %w", err)
}
- state.setResult(&node, nil)
+ m.status.Set(Status{
+ State: ClusterHome,
+ Node: &node,
+ consensusClient: applicationKV,
+ })
supervisor.Signal(ctx, supervisor.SignalHealthy)
supervisor.Signal(ctx, supervisor.SignalDone)
return nil
}
-
diff --git a/metropolis/node/core/cluster/node.go b/metropolis/node/core/cluster/node.go
index a38da8b..1d6e73d 100644
--- a/metropolis/node/core/cluster/node.go
+++ b/metropolis/node/core/cluster/node.go
@@ -57,10 +57,6 @@
// representing the lack of a role.
consensusMember *NodeRoleConsensusMember
kubernetesWorker *NodeRoleKubernetesWorker
-
- // At runtime, this represents an etcd client to the consensus cluster. This
- // is used by applications (like Kubernetes).
- KV clientv3.KV
}
// NodeRoleConsensusMember defines that the Node is a consensus (etcd) cluster
diff --git a/metropolis/node/core/consensus/BUILD.bazel b/metropolis/node/core/consensus/BUILD.bazel
index 669985a..dd3de3c 100644
--- a/metropolis/node/core/consensus/BUILD.bazel
+++ b/metropolis/node/core/consensus/BUILD.bazel
@@ -8,10 +8,10 @@
deps = [
"//metropolis/node:go_default_library",
"//metropolis/node/core/consensus/ca:go_default_library",
+ "//metropolis/node/core/consensus/client:go_default_library",
"//metropolis/node/core/localstorage:go_default_library",
"//metropolis/pkg/supervisor:go_default_library",
"@io_etcd_go_etcd//clientv3:go_default_library",
- "@io_etcd_go_etcd//clientv3/namespace:go_default_library",
"@io_etcd_go_etcd//embed:go_default_library",
"@org_uber_go_atomic//:go_default_library",
],
diff --git a/metropolis/node/core/consensus/consensus.go b/metropolis/node/core/consensus/consensus.go
index 33a352a..683db19 100644
--- a/metropolis/node/core/consensus/consensus.go
+++ b/metropolis/node/core/consensus/consensus.go
@@ -39,12 +39,12 @@
"time"
"go.etcd.io/etcd/clientv3"
- "go.etcd.io/etcd/clientv3/namespace"
"go.etcd.io/etcd/embed"
"go.uber.org/atomic"
node "source.monogon.dev/metropolis/node"
"source.monogon.dev/metropolis/node/core/consensus/ca"
+ "source.monogon.dev/metropolis/node/core/consensus/client"
"source.monogon.dev/metropolis/node/core/localstorage"
"source.monogon.dev/metropolis/pkg/supervisor"
)
@@ -389,17 +389,20 @@
}
}
-// KV returns and etcd KV client interface to the etcd member/cluster.
-func (s *Service) KV(module, space string) clientv3.KV {
+func (s *Service) Client() client.Namespaced {
s.stateMu.Lock()
defer s.stateMu.Unlock()
- return namespace.NewKV(s.state.cl.KV, fmt.Sprintf("%s:%s", module, space))
-}
-
-func (s *Service) KVRoot() clientv3.KV {
- s.stateMu.Lock()
- defer s.stateMu.Unlock()
- return s.state.cl.KV
+ // 'namespaced' is the root of all namespaced clients within the etcd K/V
+ // store, with further paths in a colon-separated format, eg.:
+ // namespaced:example/
+ // namespaced:foo:bar:baz/
+ client, err := client.NewLocal(s.state.cl).Sub("namespaced")
+ if err != nil {
+ // This error can only happen due to a malformed path, which is
+ // constant. Thus, this is a programming error and we panic.
+ panic(fmt.Errorf("Could not get consensus etcd client: %v", err))
+ }
+ return client
}
func (s *Service) Cluster() clientv3.Cluster {
diff --git a/metropolis/node/core/consensus/consensus_test.go b/metropolis/node/core/consensus/consensus_test.go
index 8e26535..2671432 100644
--- a/metropolis/node/core/consensus/consensus_test.go
+++ b/metropolis/node/core/consensus/consensus_test.go
@@ -96,7 +96,7 @@
supervisor.New(b.ctx, etcd.Run)
waitEtcd(t, etcd)
- kv := etcd.KV("foo", "bar")
+ kv := etcd.Client()
if _, err := kv.Put(b.ctx, "/foo", "bar"); err != nil {
t.Fatalf("test key creation failed: %v", err)
}
@@ -162,7 +162,7 @@
ctx, ctxC := context.WithCancel(b.ctx)
supervisor.New(ctx, etcd.Run)
waitEtcd(t, etcd)
- kv := etcd.KV("foo", "bar")
+ kv := etcd.Client()
if new {
if _, err := kv.Put(b.ctx, "/foo", "bar"); err != nil {
t.Fatalf("test key creation failed: %v", err)
diff --git a/metropolis/node/core/main.go b/metropolis/node/core/main.go
index 442102f..4051663 100644
--- a/metropolis/node/core/main.go
+++ b/metropolis/node/core/main.go
@@ -150,38 +150,45 @@
return fmt.Errorf("when starting enrolment: %w", err)
}
- // Wait until the cluster manager settles.
- node, err := m.Wait()
+ // Wait until the node finds a home in the new cluster.
+ watcher := m.Watch()
+ status, err := watcher.GetHome(ctx)
if err != nil {
close(trapdoor)
- return fmt.Errorf("enrolment failed, aborting: %w", err)
+ return fmt.Errorf("new couldn't find home in new cluster, aborting: %w", err)
}
// We are now in a cluster. We can thus access our 'node' object and start all services that
// we should be running.
logger.Info("Enrolment success, continuing startup.")
- logger.Info(fmt.Sprintf("This node (%s) has roles:", node.String()))
- if cm := node.ConsensusMember(); cm != nil {
+ logger.Info(fmt.Sprintf("This node (%s) has roles:", status.Node.String()))
+ if cm := status.Node.ConsensusMember(); cm != nil {
// There's no need to start anything for when we are a consensus member - the cluster
// manager does this for us if necessary (as creating/enrolling/joining a cluster is
// pretty tied into cluster lifecycle management).
logger.Info(fmt.Sprintf(" - etcd consensus member"))
}
- if kw := node.KubernetesWorker(); kw != nil {
+ if kw := status.Node.KubernetesWorker(); kw != nil {
logger.Info(fmt.Sprintf(" - kubernetes worker"))
}
// If we're supposed to be a kubernetes worker, start kubernetes services and containerd.
// In the future, this might be split further into kubernetes control plane and data plane
// roles.
+ // TODO(q3k): watch on cluster status updates to start/stop kubernetes service.
var containerdSvc *containerd.Service
var kubeSvc *kubernetes.Service
- if kw := node.KubernetesWorker(); kw != nil {
+ if kw := status.Node.KubernetesWorker(); kw != nil {
logger.Info("Starting Kubernetes worker services...")
+ kv, err := status.ConsensusClient(cluster.ConsensusUserKubernetesPKI)
+ if err != nil {
+ return fmt.Errorf("failed to retrieve consensus kubernetes PKI client: %w", err)
+ }
+
// Ensure Kubernetes PKI objects exist in etcd.
- kpki := pki.New(lt.MustLeveledFor("pki.kubernetes"), node.KV)
+ kpki := pki.New(lt.MustLeveledFor("pki.kubernetes"), kv)
if err := kpki.EnsureAll(ctx); err != nil {
return fmt.Errorf("failed to ensure kubernetes PKI present: %w", err)
}