m/n/core/cluster: migrate to events and etcd namespaced client
This moves the cluster manager's status into a local event variable.
Watchers (like the node startup code) can now use this to get updates on
the state of the node and its cluster membership in a way that's more
abstracted from a sequential startup. This will permit us to move a lot
of the startup code into code common across different node lifecycle
paths.
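
As a rough sketch (not part of this diff; a *cluster.Manager m and a
context ctx are assumed to be in scope), node startup code consuming the
new API could look like:

    w := m.Watch()
    // Block until this node considers itself a member of a healthy cluster.
    status, err := w.GetHome(ctx)
    if err != nil {
        return err
    }
    // Hand a namespaced etcd client to the Kubernetes PKI code.
    kv, err := status.ConsensusClient(cluster.ConsensusUserKubernetesPKI)
    if err != nil {
        return err
    }
    _ = kv // pass to the Kubernetes PKI subsystem here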
Test Plan: Refactor, exercised by e2e.
X-Origin-Diff: phab/D757
GitOrigin-RevId: 31a3600ad2aab90a1e7f84d741e7ea40a0422724
diff --git a/metropolis/node/core/cluster/manager.go b/metropolis/node/core/cluster/manager.go
index 65af212..874d3ae 100644
--- a/metropolis/node/core/cluster/manager.go
+++ b/metropolis/node/core/cluster/manager.go
@@ -26,16 +26,60 @@
"google.golang.org/protobuf/proto"
"source.monogon.dev/metropolis/node/core/consensus"
+ "source.monogon.dev/metropolis/node/core/consensus/client"
"source.monogon.dev/metropolis/node/core/localstorage"
"source.monogon.dev/metropolis/node/core/network"
+ "source.monogon.dev/metropolis/pkg/event"
"source.monogon.dev/metropolis/pkg/supervisor"
apb "source.monogon.dev/metropolis/proto/api"
ppb "source.monogon.dev/metropolis/proto/private"
)
-type managerResult struct {
- node *Node
- err error
+// Status is returned to Cluster clients (i.e., node code) on Manager.Watch/.Get.
+type Status struct {
+ // State is the current state of the cluster, as seen by the node.
+ State ClusterState
+ // Node is the configuration of this node in the cluster.
+ Node *Node
+
+ consensusClient client.Namespaced
+}
+
+// ConsensusUser is the top-level user of an etcd client in Metropolis node
+// code. These need to be defined ahead of time in a Go 'enum', and a
+// ConsensusUser should not be shared across different codepaths.
+type ConsensusUser string
+
+const (
+ ConsensusUserKubernetesPKI ConsensusUser = "kubernetes-pki"
+)
+
+// ConsensusClient returns an etcd/consensus client for a given ConsensusUser.
+func (s *Status) ConsensusClient(user ConsensusUser) (client.Namespaced, error) {
+ // Ensure that we already are connected to etcd and are in a state in which we
+ // should be handing out cluster connectivity.
+ if s.consensusClient == nil {
+ return nil, fmt.Errorf("not connected")
+ }
+ switch s.State {
+ case ClusterHome:
+ case ClusterSplit:
+ return nil, fmt.Errorf("refusing connection with cluster state %v", s.State)
+ default:
+ }
+
+ // Ensure only defined 'applications' are used, to prevent programmer error
+ // and ad-hoc casting of arbitrary strings to ConsensusUser.
+ switch user {
+ case ConsensusUserKubernetesPKI:
+ default:
+ return nil, fmt.Errorf("unknown ConsensusUser %q", user)
+ }
+ client, err := s.consensusClient.Sub(string(user))
+ if err != nil {
+ return nil, fmt.Errorf("retrieving subclient failed: %w", err)
+ }
+ return client, nil
}
type state struct {
@@ -46,27 +90,49 @@
stateNode ppb.Node_FSMState
configuration *ppb.SealedConfiguration
-
- result *managerResult
- waiters []chan *managerResult
}
-func (s *state) setResult(node *Node, err error) {
- s.result = &managerResult{
- node: node,
- err: err,
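+// Watcher allows cluster manager clients to observe the cluster Status as it
+// changes over time. It wraps a generic event.Watcher with Status-typed
+// accessors.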
+type Watcher struct {
+ event.Watcher
+}
+
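+// Get retrieves a Status from the underlying event.Watcher, converting the
+// untyped event value into a typed Status.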
+func (w *Watcher) Get(ctx context.Context) (*Status, error) {
+ val, err := w.Watcher.Get(ctx)
+ if err != nil {
+ return nil, err
}
- for _, w := range s.waiters {
- go func(c chan *managerResult) {
- c <- s.result
- }(w)
+ status := val.(Status)
+ return &status, err
+}
+
+// GetHome waits until the cluster, from the point of view of this node, is in
+// the ClusterHome state. This can be used to wait for the cluster manager to
+// 'settle', before clients start more node services.
+func (w *Watcher) GetHome(ctx context.Context) (*Status, error) {
+ for {
+ status, err := w.Get(ctx)
+ if err != nil {
+ return nil, err
+ }
+ switch status.State {
+ case ClusterHome:
+ return status, nil
+ case ClusterDisowning:
+ return nil, fmt.Errorf("the cluster has disowned this node")
+ }
}
- s.waiters = nil
+}
+
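+// Watch returns a Watcher for the cluster Status as seen by this node.
+// Clients can use it to wait for a given state, eg. via GetHome.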
+func (m *Manager) Watch() Watcher {
+ return Watcher{
+ Watcher: m.status.Watch(),
+ }
}
type Manager struct {
storageRoot *localstorage.Root
networkService *network.Service
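+ // status is the current Status of the cluster (and of this node within
+ // it), distributed to Watchers created via Watch.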
+ status event.MemoryValue
state
@@ -100,21 +166,6 @@
return &m.state, m.mu.RUnlock
}
-func (m *Manager) Wait() (*Node, error) {
- state, unlock := m.lock()
-
- if state.result != nil {
- unlock()
- return state.result.node, state.result.err
- }
-
- c := make(chan *managerResult)
- state.waiters = append(state.waiters, c)
- unlock()
- res := <-c
- return res.node, res.err
-}
-
// Run is the runnable of the Manager, to be started using the Supervisor. It
// is one-shot, and should not be restarted.
func (m *Manager) Run(ctx context.Context) error {
@@ -194,7 +245,7 @@
return nil, fmt.Errorf("could not find node parameters in ESP or qemu fwcfg")
}
if paramsFWCFG != nil && paramsESP != nil {
- supervisor.Logger(ctx).Warningf("Node parameters found both inboth ESP and qemu fwcfg, using the latter")
+ supervisor.Logger(ctx).Warningf("Node parameters found both in both ESP and qemu fwcfg, using the latter")
return paramsFWCFG, nil
} else if paramsFWCFG != nil {
return paramsFWCFG, nil