m/n/core/cluster: migrate to events and etcd namespaced client
This moves the cluster manager's status into a local event variable.
Watchers (like the node startup code) can now use it to get updates on
the state of the node and its cluster membership in a way that's more
abstracted from a sequential startup. This will permit us to move a lot
of the startup code into code common across different node lifecycle
paths.
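
As a rough sketch of the consumer side this enables (a minimal example;
the Manager/Status type names and the import path are assumptions based
on the diff below, not verified against the cluster package):

    package main

    import (
        "context"
        "fmt"

        // Import path assumed; adjust to the repository's module path.
        "source.monogon.dev/metropolis/node/core/cluster"
    )

    // waitForClusterHome blocks until the cluster manager's status event
    // variable reports that this node has found a home in a cluster, then
    // returns the resulting status for further startup steps.
    func waitForClusterHome(ctx context.Context, m *cluster.Manager) (*cluster.Status, error) {
        // Watch returns a watcher on the manager's status event variable.
        watcher := m.Watch()
        // GetHome blocks until the node is part of a cluster (or ctx is
        // cancelled), instead of relying on a strictly sequential startup.
        status, err := watcher.GetHome(ctx)
        if err != nil {
            return nil, fmt.Errorf("node couldn't find home in new cluster: %w", err)
        }
        return status, nil
    }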
Test Plan: Refactor, exercised by e2e.
X-Origin-Diff: phab/D757
GitOrigin-RevId: 31a3600ad2aab90a1e7f84d741e7ea40a0422724
diff --git a/metropolis/node/core/main.go b/metropolis/node/core/main.go
index 442102f..4051663 100644
--- a/metropolis/node/core/main.go
+++ b/metropolis/node/core/main.go
@@ -150,38 +150,45 @@
return fmt.Errorf("when starting enrolment: %w", err)
}
- // Wait until the cluster manager settles.
- node, err := m.Wait()
+ // Wait until the node finds a home in the new cluster.
+ watcher := m.Watch()
+ status, err := watcher.GetHome(ctx)
if err != nil {
close(trapdoor)
- return fmt.Errorf("enrolment failed, aborting: %w", err)
+ return fmt.Errorf("new couldn't find home in new cluster, aborting: %w", err)
}
// We are now in a cluster. We can thus access our 'node' object and start all services that
// we should be running.
logger.Info("Enrolment success, continuing startup.")
- logger.Info(fmt.Sprintf("This node (%s) has roles:", node.String()))
- if cm := node.ConsensusMember(); cm != nil {
+ logger.Info(fmt.Sprintf("This node (%s) has roles:", status.Node.String()))
+ if cm := status.Node.ConsensusMember(); cm != nil {
// There's no need to start anything for when we are a consensus member - the cluster
// manager does this for us if necessary (as creating/enrolling/joining a cluster is
// pretty tied into cluster lifecycle management).
logger.Info(fmt.Sprintf(" - etcd consensus member"))
}
- if kw := node.KubernetesWorker(); kw != nil {
+ if kw := status.Node.KubernetesWorker(); kw != nil {
logger.Info(fmt.Sprintf(" - kubernetes worker"))
}
// If we're supposed to be a kubernetes worker, start kubernetes services and containerd.
// In the future, this might be split further into kubernetes control plane and data plane
// roles.
+ // TODO(q3k): watch on cluster status updates to start/stop kubernetes service.
var containerdSvc *containerd.Service
var kubeSvc *kubernetes.Service
- if kw := node.KubernetesWorker(); kw != nil {
+ if kw := status.Node.KubernetesWorker(); kw != nil {
logger.Info("Starting Kubernetes worker services...")
+ kv, err := status.ConsensusClient(cluster.ConsensusUserKubernetesPKI)
+ if err != nil {
+ return fmt.Errorf("failed to retrieve consensus kubernetes PKI client: %w", err)
+ }
+
// Ensure Kubernetes PKI objects exist in etcd.
- kpki := pki.New(lt.MustLeveledFor("pki.kubernetes"), node.KV)
+ kpki := pki.New(lt.MustLeveledFor("pki.kubernetes"), kv)
if err := kpki.EnsureAll(ctx); err != nil {
return fmt.Errorf("failed to ensure kubernetes PKI present: %w", err)
}