package roleserve

import (
	"context"
	"crypto/ed25519"
	"crypto/x509"
	"fmt"
	"time"

	"source.monogon.dev/metropolis/node/core/consensus"
	"source.monogon.dev/metropolis/node/core/curator"
	"source.monogon.dev/metropolis/node/core/identity"
	"source.monogon.dev/metropolis/node/core/localstorage"
	"source.monogon.dev/metropolis/node/core/rpc/resolver"
	"source.monogon.dev/metropolis/pkg/event"
	"source.monogon.dev/metropolis/pkg/event/memory"
	"source.monogon.dev/metropolis/pkg/pki"
	"source.monogon.dev/metropolis/pkg/supervisor"
	cpb "source.monogon.dev/metropolis/proto/common"
)

// workerControlPlane is the Control Plane Worker, responsible for maintaining a
// locally running Control Plane (Consensus and Curator service pair) if needed.
//
// The Control Plane will run under the following conditions:
//  - This node has been started in BOOTSTRAP mode and bootstrapData was provided
//    by the cluster enrolment logic. In this case, the Control Plane Worker will
//    perform the required bootstrap steps, creating a local node with appropriate
//    roles, and will start Consensus and the Curator.
//  - This node has the ConsensusMember Node Role. This will be true for nodes
//    which are REGISTERing into the cluster, as well as for already running nodes
//    that have been assigned the role.
//
// In either case, localControlPlane will be updated to allow direct access to
// the now locally running control plane. For a bootstrapping node,
// curatorConnection is also populated, as this is the first time the rest of the
// node services can reach the newly minted cluster control plane.
type workerControlPlane struct {
	storageRoot *localstorage.Root

	// bootstrapData will be read.
	bootstrapData *memory.Value[*bootstrapData]
	// localRoles will be read.
	localRoles *memory.Value[*cpb.NodeRoles]
	// resolver will be read and used to populate curatorConnection when
	// bootstrapping consensus.
	resolver *resolver.Resolver

	// localControlPlane will be written.
	localControlPlane *memory.Value[*localControlPlane]
	// curatorConnection will be written.
	curatorConnection *memory.Value[*curatorConnection]
}
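
// A workerControlPlane is wired up by the roleserver, which owns the shared
// Event Values. A minimal sketch of that wiring (variable names and the
// runnable name below are illustrative assumptions, not the actual roleserver
// code):
//
//	wcp := &workerControlPlane{
//		storageRoot:       root,
//		bootstrapData:     &bootstrapDataValue,
//		localRoles:        &localRolesValue,
//		resolver:          res,
//		localControlPlane: &localControlPlaneValue,
//		curatorConnection: &curatorConnectionValue,
//	}
//	supervisor.Run(ctx, "controlplane", wcp.run)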

// controlPlaneStartup is used internally to provide a reduced (as in MapReduce)
// datum for the main Control Plane launcher, which is responsible for launching
// the Control Plane services, if they should run at all.
type controlPlaneStartup struct {
	// consensusConfig is set if the node should run the control plane, and will
	// contain the configuration of the Consensus service.
	consensusConfig *consensus.Config
	// bootstrap is set if this node should bootstrap consensus. It contains all
	// data required to perform this bootstrap step.
	bootstrap *bootstrapData
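	// existing is set if the node is already part of a cluster and carries the
	// existing curator connection, whose credentials are used to run Consensus
	// and the Curator.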
	existing *curatorConnection
}

// changed informs the Control Plane launcher whether two controlPlaneStartups
// differ to the point where a restart of the control plane should happen.
//
// Currently, this is only true when a node switches to/from having a Control
// Plane role.
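//
// For example (an illustrative sketch, not part of the production flow):
//
//	prev := &controlPlaneStartup{}
//	next := &controlPlaneStartup{consensusConfig: &consensus.Config{}}
//	prev.changed(next) // true: the Control Plane must be brought up.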
func (c *controlPlaneStartup) changed(o *controlPlaneStartup) bool {
	hasConsensusA := c.consensusConfig != nil
	hasConsensusB := o.consensusConfig != nil
	return hasConsensusA != hasConsensusB
}

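// run is the main runnable of the Control Plane Worker. It reduces
// bootstrapData, localRoles and curatorConnection into controlPlaneStartup
// values and launches (or skips) the local Control Plane accordingly,
// restarting it whenever the reduced configuration changes significantly.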
func (s *workerControlPlane) run(ctx context.Context) error {
	// Map/Reduce a *controlPlaneStartup from different data sources. This will then
	// populate an Event Value that the actual launcher will use to start the
	// Control Plane.
	//
	//  bootstrapData     -M-> bootstrapDataC     --.
	//  curatorConnection -M-> curatorConnectionC --+--R--> startupV
	//  NodeRoles         -M-> rolesC             --'
	//
	var startupV memory.Value[*controlPlaneStartup]

	// Channels are used as intermediaries between map stages and the final reduce,
	// which is okay as long as the entire tree restarts simultaneously (which we
	// ensure via RunGroup).
	bootstrapDataC := make(chan *bootstrapData)
	curatorConnectionC := make(chan *curatorConnection)
	rolesC := make(chan *cpb.NodeRoles)

	supervisor.RunGroup(ctx, map[string]supervisor.Runnable{
		// Plain conversion from Event Value to channel.
		"map-bootstrap-data": event.Pipe[*bootstrapData](s.bootstrapData, bootstrapDataC),
		// Plain conversion from Event Value to channel.
		"map-curator-connection": event.Pipe[*curatorConnection](s.curatorConnection, curatorConnectionC),
		// Plain conversion from Event Value to channel.
		"map-roles": event.Pipe[*cpb.NodeRoles](s.localRoles, rolesC),
		// Reduce the mapped channels above into a single controlPlaneStartup.
		"reduce-config": func(ctx context.Context) error {
			supervisor.Signal(ctx, supervisor.SignalHealthy)
			var lr *cpb.NodeRoles
			var cc *curatorConnection
			var bd *bootstrapData
			for {
				select {
				case <-ctx.Done():
					return ctx.Err()
				case lr = <-rolesC:
				case cc = <-curatorConnectionC:
				case bd = <-bootstrapDataC:
				}

				// If we have any bootstrap config ever, always use that.
				//
				// If there is a conflict between the two available configuration methods
				// (bootstrap and non-bootstrap), there effectively shouldn't be any
				// difference between the two and it shouldn't matter which one we pick. That
				// is because the bootstrap data is only effectively used to populate the
				// JoinCluster parameter of etcd, which in turn is only used when a node is
				// starting without any data present. And since we managed to get our own
				// node roles and that won the race against the bootstrap data, it means the
				// bootstrap was successful and we can now start without the bootstrap data.
				//
				// The only problem is when we remove a ConsensusMember from a node which
				// still has bootstrap data lingering from its first bootup. However, we
				// currently do not support removing consensus roles.
				//
				// TODO(q3k): support the above edge case. This can be done, for example, by
				// rewriting the reduction to wait for all data to be available and by
				// pre-populating all values to nil at startup, thereby allowing for priority
				// encoding and removing the above race condition.
				if bd != nil {
					supervisor.Logger(ctx).Infof("Using bootstrap data...")
					startupV.Set(&controlPlaneStartup{
						consensusConfig: &consensus.Config{
							Data:           &s.storageRoot.Data.Etcd,
							Ephemeral:      &s.storageRoot.Ephemeral.Consensus,
							NodePrivateKey: bd.nodePrivateKey,
						},
						bootstrap: bd,
					})
					continue
				}

				// Otherwise, try to interpret node roles if available.
				if lr != nil && cc != nil {
					supervisor.Logger(ctx).Infof("Using role assigned by cluster...")
					role := lr.ConsensusMember
					if role == nil {
						supervisor.Logger(ctx).Infof("Not a control plane node.")
						startupV.Set(&controlPlaneStartup{})
						continue
					}
					supervisor.Logger(ctx).Infof("Control plane node, building config...")

					// Parse X509 data from NodeRoles.
					caCert, err := x509.ParseCertificate(role.CaCertificate)
					if err != nil {
						supervisor.Logger(ctx).Errorf("Could not parse CA certificate: %v", err)
						continue
					}
					peerCert, err := x509.ParseCertificate(role.PeerCertificate)
					if err != nil {
						supervisor.Logger(ctx).Errorf("Could not parse peer certificate: %v", err)
						continue
					}
					crl, err := x509.ParseCRL(role.InitialCrl)
					if err != nil {
						supervisor.Logger(ctx).Errorf("Could not parse CRL: %v", err)
						continue
					}

					// Convert NodeRoles peers into consensus peers, and let the user know
					// which peers we're starting with.
					supervisor.Logger(ctx).Infof("Node role mandates cluster membership with initial peers:")
					for _, p := range role.Peers {
						supervisor.Logger(ctx).Infof(" - %s (%s)", p.Name, p.URL)
					}
					nodes := make([]consensus.ExistingNode, len(role.Peers))
					for i, p := range role.Peers {
						nodes[i].Name = p.Name
						nodes[i].URL = p.URL
					}

					// Build and submit the config to startupV.
					startupV.Set(&controlPlaneStartup{
						consensusConfig: &consensus.Config{
							Data:           &s.storageRoot.Data.Etcd,
							Ephemeral:      &s.storageRoot.Ephemeral.Consensus,
							NodePrivateKey: cc.credentials.TLSCredentials().PrivateKey.(ed25519.PrivateKey),
							JoinCluster: &consensus.JoinCluster{
								CACertificate:   caCert,
								NodeCertificate: peerCert,
								InitialCRL: &pki.CRL{
									Raw:  role.InitialCrl,
									List: crl,
								},
								ExistingNodes: nodes,
							},
						},
						existing: cc,
					})
				}
			}
		},
	})

	// Run main Control Plane launcher. This depends on a config being put to
	// startupV.
	supervisor.Run(ctx, "launcher", func(ctx context.Context) error {
		supervisor.Logger(ctx).Infof("Waiting for start data...")

		// Read config from startupV.
		w := startupV.Watch()
		defer w.Close()
		startup, err := w.Get(ctx)
		if err != nil {
			return err
		}

		// Start Control Plane if we have a config.
		if startup.consensusConfig == nil {
			supervisor.Logger(ctx).Infof("No consensus config, not starting up control plane.")
			s.localControlPlane.Set(nil)
		} else {
			supervisor.Logger(ctx).Infof("Got config, starting consensus and curator...")

			// Start consensus with config from startupV. This bootstraps the consensus
			// service if needed.
			con := consensus.New(*startup.consensusConfig)
			if err := supervisor.Run(ctx, "consensus", con.Run); err != nil {
				return fmt.Errorf("failed to start consensus service: %w", err)
			}

			// Prepare curator config, notably performing a bootstrap step if necessary. The
			// preparation will result in a set of node credentials that will be used to
			// fully continue bringing up this node.
			var creds *identity.NodeCredentials
			var caCert []byte
			if b := startup.bootstrap; b != nil {
				supervisor.Logger(ctx).Infof("Bootstrapping control plane. Waiting for consensus...")

				// Connect to etcd as curator to perform the bootstrap step.
				w := con.Watch()
				st, err := w.Get(ctx)
				if err != nil {
					return fmt.Errorf("while waiting for consensus for bootstrap: %w", err)
				}
				ckv, err := st.CuratorClient()
				if err != nil {
					return fmt.Errorf("when retrieving curator client for bootstrap: %w", err)
				}

				supervisor.Logger(ctx).Infof("Bootstrapping control plane. Performing bootstrap...")

				// Perform curator bootstrap step in etcd.
				//
				// This is all idempotent, so there's no harm in re-running this on every
				// curator startup.
				//
				// TODO(q3k): collapse the curator bootstrap shenanigans into a single function.
				npub := b.nodePrivateKey.Public().(ed25519.PublicKey)
				jpub := b.nodePrivateJoinKey.Public().(ed25519.PublicKey)

				n := curator.NewNodeForBootstrap(b.clusterUnlockKey, npub, jpub, b.nodeTPMUsage)

				// The first node always runs consensus.
				join, err := st.AddNode(ctx, npub)
				if err != nil {
					return fmt.Errorf("when retrieving node join data from consensus: %w", err)
				}

				n.EnableConsensusMember(join)
				n.EnableKubernetesController()

				var nodeCert []byte
				caCert, nodeCert, err = curator.BootstrapNodeFinish(ctx, ckv, &n, b.initialOwnerKey, b.initialClusterConfiguration)
				if err != nil {
					return fmt.Errorf("while bootstrapping node: %w", err)
				}
				// ... and build new credentials from the bootstrap step.
				creds, err = identity.NewNodeCredentials(b.nodePrivateKey, nodeCert, caCert)
				if err != nil {
					return fmt.Errorf("when creating bootstrap node credentials: %w", err)
				}

				if err = creds.Save(&s.storageRoot.Data.Node.Credentials); err != nil {
					return fmt.Errorf("while saving node credentials: %w", err)
				}
				sc, err := s.storageRoot.ESP.Metropolis.SealedConfiguration.Unseal(b.nodeTPMUsage)
				if err != nil {
					return fmt.Errorf("reading sealed configuration failed: %w", err)
				}
				sc.ClusterCa = caCert
				if err = s.storageRoot.ESP.Metropolis.SealedConfiguration.SealSecureBoot(sc, b.nodeTPMUsage); err != nil {
					return fmt.Errorf("writing sealed configuration failed: %w", err)
				}
				supervisor.Logger(ctx).Infof("Control plane bootstrap complete, starting curator...")
			} else {
				// Not bootstrapping, just starting consensus with credentials we already have.

				// First, run a few assertions. These cases should never happen given the
				// Map/Reduce logic above; ideally we would encode this in the type system.
				if startup.existing == nil {
					panic("no existing curator connection but not bootstrapping either")
				}
				if startup.existing.credentials == nil {
					panic("no existing.credentials but not bootstrapping either")
				}

				// Use the already existing credentials, and pass along the already known
				// curators (as we're not the only node, and we'd like downstream consumers
				// to be able to keep connecting to existing curators in case the local one
				// fails).
				creds = startup.existing.credentials
			}

			// Start curator.
			cur := curator.New(curator.Config{
				NodeCredentials: creds,
				Consensus:       con,
				LeaderTTL:       10 * time.Second,
			})
			if err := supervisor.Run(ctx, "curator", cur.Run); err != nil {
				return fmt.Errorf("failed to start curator: %w", err)
			}

			supervisor.Signal(ctx, supervisor.SignalHealthy)
			supervisor.Logger(ctx).Infof("Control plane running, submitting localControlPlane.")

			s.localControlPlane.Set(&localControlPlane{consensus: con, curator: cur})
			if startup.bootstrap != nil {
				// Feed curatorConnection if bootstrapping to continue the node bringup.
				s.curatorConnection.Set(newCuratorConnection(creds, s.resolver))
			}
		}

		// Restart everything if we get a significantly different config (i.e. a config
		// whose change would/should either bring up or tear down the Control Plane).
		for {
			nc, err := w.Get(ctx)
			if err != nil {
				return err
			}
			if nc.changed(startup) {
				supervisor.Logger(ctx).Infof("Configuration changed, restarting...")
				return fmt.Errorf("config changed, restarting")
			}
		}
	})

	supervisor.Signal(ctx, supervisor.SignalHealthy)
	<-ctx.Done()
	return ctx.Err()
}