package roleserve

import (
	"bytes"
	"context"
	"crypto/ed25519"
	"crypto/x509"
	"fmt"
	"time"

	"golang.org/x/sys/unix"
	"google.golang.org/protobuf/proto"

	"source.monogon.dev/metropolis/node/core/consensus"
	"source.monogon.dev/metropolis/node/core/curator"
	"source.monogon.dev/metropolis/node/core/identity"
	"source.monogon.dev/metropolis/node/core/localstorage"
	"source.monogon.dev/metropolis/node/core/rpc/resolver"
	"source.monogon.dev/metropolis/pkg/event/memory"
	"source.monogon.dev/metropolis/pkg/pki"
	"source.monogon.dev/metropolis/pkg/supervisor"
	cpb "source.monogon.dev/metropolis/proto/common"
	ppb "source.monogon.dev/metropolis/proto/private"
)

// workerControlPlane is the Control Plane Worker, responsible for maintaining a
// locally running Control Plane (Consensus and Curator service pair) if needed.
//
// The Control Plane will run under the following conditions:
//   - This node has been started in BOOTSTRAP mode and bootstrapData was provided
//     by the cluster enrolment logic. In this case, the Control Plane Worker will
//     perform the required bootstrap steps, creating a local node with appropriate
//     roles, and will start Consensus and the Curator.
//   - This node has the ConsensusMember Node Role. This will be true for nodes
//     which are REGISTERing into the cluster, as well as already running nodes that
//     have been assigned the role.
//
// In either case, ClusterMembership will be updated to allow connecting to the
// newly locally running control plane. For nodes that are bootstrapping the
// cluster, this will be the first time the rest of the node can reach the
// Curator. For other cases, this will be the new, preferred way to reach
// consensus, without having to rely on external Control Plane nodes.
type workerControlPlane struct {
	storageRoot *localstorage.Root

	// bootstrapData will be read.
	bootstrapData *bootstrapDataValue
	// clusterMembership will be read and written.
	clusterMembership *ClusterMembershipValue
	// localRoles will be read.
	localRoles *localRolesValue
	// resolver will be read and used to populate ClusterMembership.
	resolver *resolver.Resolver
}

// controlPlaneStartup is used internally to provide a reduced (as in MapReduce)
// datum for the main Control Plane launcher responsible for launching the
// Control Plane Services, if at all.
type controlPlaneStartup struct {
	// consensusConfig is set if the node should run the control plane, and will
	// contain the configuration of the Consensus service.
	consensusConfig *consensus.Config
	// bootstrap is set if this node should bootstrap consensus. It contains all
	// data required to perform this bootstrap step.
	bootstrap *bootstrapData

	// existingMembership is the ClusterMembership that the node already had
	// available before deciding to run the Control Plane. This will be used to
	// carry over existing data from the membership into the new membership as
	// affected by starting the control plane.
	existingMembership *ClusterMembership
}

// changed informs the Control Plane launcher whether two different
// controlPlaneStartups differ to the point where a restart of the control plane
// should happen.
//
// Currently this is only true when a node switches to/from having a Control
// Plane role.
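//
// For example (illustrative only):
//
//	a := &controlPlaneStartup{consensusConfig: &consensus.Config{}}
//	b := &controlPlaneStartup{}
//	a.changed(b) // true: the control plane would have to be brought up or torn down
//	a.changed(a) // false: no restart needed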
func (c *controlPlaneStartup) changed(o *controlPlaneStartup) bool {
	hasConsensusA := c.consensusConfig != nil
	hasConsensusB := o.consensusConfig != nil
	if hasConsensusA != hasConsensusB {
		return true
	}

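	// Deeper differences (e.g. a different ExistingNodes list inside
	// consensusConfig) intentionally do not trigger a restart.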
	return false
}

func (s *workerControlPlane) run(ctx context.Context) error {
	// Map/Reduce a *controlPlaneStartup from different data sources. This will then
	// populate an Event Value that the actual launcher will use to start the
	// Control Plane.
	//
	//   bootstrapData     -M-> bootstrapDataC ------.
	//                                               |
	//   ClusterMembership -M-> clusterMembershipC --R---> startupV
	//                                               |
	//   NodeRoles         -M-> rolesC --------------'
	//
	var startupV memory.Value

	// Channels are used as intermediaries between map stages and the final reduce,
	// which is okay as long as the entire tree restarts simultaneously (which we
	// ensure via RunGroup).
	bootstrapDataC := make(chan *bootstrapData)
	clusterMembershipC := make(chan *ClusterMembership)
	rolesC := make(chan *cpb.NodeRoles)

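	// Note: these channels are unbuffered, so each map stage below blocks until
	// reduce-config has consumed its most recent value.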
	supervisor.RunGroup(ctx, map[string]supervisor.Runnable{
		// Plain conversion from Event Value to channel.
		"map-bootstrap-data": func(ctx context.Context) error {
			w := s.bootstrapData.Watch()
			defer w.Close()
			for {
				v, err := w.Get(ctx)
				if err != nil {
					return err
				}
				bootstrapDataC <- v
			}
		},
		// Plain conversion from Event Value to channel.
		"map-cluster-membership": func(ctx context.Context) error {
			supervisor.Signal(ctx, supervisor.SignalHealthy)
			w := s.clusterMembership.Watch()
			defer w.Close()
			for {
				v, err := w.GetHome(ctx)
				if err != nil {
					return err
				}
				clusterMembershipC <- v
			}
		},
		// Plain conversion from Event Value to channel.
		"map-roles": func(ctx context.Context) error {
			supervisor.Signal(ctx, supervisor.SignalHealthy)
			w := s.localRoles.Watch()
			defer w.Close()
			for {
				v, err := w.Get(ctx)
				if err != nil {
					return err
				}
				rolesC <- v
			}
		},
		// Reduce: provide config from bootstrap data, clusterMembership and roles.
		"reduce-config": func(ctx context.Context) error {
			supervisor.Signal(ctx, supervisor.SignalHealthy)
			var lr *cpb.NodeRoles
			var cm *ClusterMembership
			var bd *bootstrapData
			for {
				select {
				case <-ctx.Done():
					return ctx.Err()
				case lr = <-rolesC:
				case cm = <-clusterMembershipC:
				case bd = <-bootstrapDataC:
				}

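				// Only one of lr/cm/bd is updated per iteration; the others keep
				// whatever value was last received (nil until their first update).
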
				// If we have any bootstrap config ever, always use that.
				//
				// If there is a conflict between two available configuration methods (bootstrap
				// and non-bootstrap) there effectively shouldn't be any difference between the
				// two and it shouldn't matter which one we pick. That is because the bootstrap
				// data is only effectively used to populate the JoinCluster parameter of etcd,
				// which in turn is only used when a node is starting without any data present.
				// And since we managed to get our own node roles and that won the race against
				// bootstrap data, it means the bootstrap was successful and we can now start
				// without the bootstrap data.
				//
				// The only problem is when we remove a ConsensusMember from a node which still
				// has BootstrapData lingering from first bootup. However, we currently do not
				// support removing consensus roles (or any roles for that matter).
				//
				// TODO(q3k): support the above edge case. This can be done, for example, by
				// rewriting the reduction to wait for all data to be available and by
				// pre-populating all values to be nil at startup, thereby allowing for priority
				// encoding and removing the above race condition.
				if bd != nil {
					supervisor.Logger(ctx).Infof("Using bootstrap data...")
					startupV.Set(&controlPlaneStartup{
						consensusConfig: &consensus.Config{
							Data:           &s.storageRoot.Data.Etcd,
							Ephemeral:      &s.storageRoot.Ephemeral.Consensus,
							NodePrivateKey: bd.nodePrivateKey,
						},
						bootstrap: bd,
					})
					continue
				}

				// Otherwise, try to interpret node roles if available.
				if lr != nil && cm != nil {
					supervisor.Logger(ctx).Infof("Using role assigned by cluster...")
					role := lr.ConsensusMember
					if role == nil {
						supervisor.Logger(ctx).Infof("Not a control plane node.")
						startupV.Set(&controlPlaneStartup{})
						continue
					}
					supervisor.Logger(ctx).Infof("Control plane node, building config...")

					// Parse X509 data from NodeRoles.
					caCert, err := x509.ParseCertificate(role.CaCertificate)
					if err != nil {
						supervisor.Logger(ctx).Errorf("Could not parse CA certificate: %v", err)
						continue
					}
					peerCert, err := x509.ParseCertificate(role.PeerCertificate)
					if err != nil {
						supervisor.Logger(ctx).Errorf("Could not parse peer certificate: %v", err)
						continue
					}
					crl, err := x509.ParseCRL(role.InitialCrl)
					if err != nil {
						supervisor.Logger(ctx).Errorf("Could not parse CRL: %v", err)
						continue
					}

					// Convert NodeRoles peers into consensus peers. Let the user know what peers
					// we're starting with.
					supervisor.Logger(ctx).Infof("Node role mandates cluster membership with initial peers:")
					for _, p := range role.Peers {
						supervisor.Logger(ctx).Infof(" - %s (%s)", p.Name, p.URL)
					}
					nodes := make([]consensus.ExistingNode, len(role.Peers))
					for i, p := range role.Peers {
						nodes[i].Name = p.Name
						nodes[i].URL = p.URL
					}

					// Build and submit config to startupV.
					startupV.Set(&controlPlaneStartup{
						consensusConfig: &consensus.Config{
							Data:           &s.storageRoot.Data.Etcd,
							Ephemeral:      &s.storageRoot.Ephemeral.Consensus,
							NodePrivateKey: cm.credentials.TLSCredentials().PrivateKey.(ed25519.PrivateKey),
							JoinCluster: &consensus.JoinCluster{
								CACertificate:   caCert,
								NodeCertificate: peerCert,
								InitialCRL: &pki.CRL{
									Raw:  role.InitialCrl,
									List: crl,
								},
								ExistingNodes: nodes,
							},
						},
						existingMembership: cm,
					})
				}
			}
		},
	})

	// Run main Control Plane launcher. This depends on a config being put to
	// startupV.
	supervisor.Run(ctx, "launcher", func(ctx context.Context) error {
		supervisor.Logger(ctx).Infof("Waiting for start data...")

		// Read config from startupV.
		w := startupV.Watch()
		defer w.Close()
		startupI, err := w.Get(ctx)
		if err != nil {
			return err
		}
		startup := startupI.(*controlPlaneStartup)

		// Start Control Plane if we have a config.
		if startup.consensusConfig == nil {
			supervisor.Logger(ctx).Infof("No consensus config, not starting up control plane.")
		} else {
			supervisor.Logger(ctx).Infof("Got config, starting consensus and curator...")

			// Start consensus with config from startupV. This bootstraps the consensus
			// service if needed.
			con := consensus.New(*startup.consensusConfig)
			if err := supervisor.Run(ctx, "consensus", con.Run); err != nil {
				return fmt.Errorf("failed to start consensus service: %w", err)
			}

			// Prepare curator config, notably performing a bootstrap step if necessary. The
			// preparation will result in a set of node credentials to run the curator with
			// and a previously used cluster directory to be passed over to the new
			// ClusterMembership, if any.
			var creds *identity.NodeCredentials
			var caCert []byte
			var directory *cpb.ClusterDirectory
			if b := startup.bootstrap; b != nil {
				supervisor.Logger(ctx).Infof("Bootstrapping control plane. Waiting for consensus...")

				// Connect to etcd as curator to perform the bootstrap step.
				w := con.Watch()
				st, err := w.Get(ctx)
				if err != nil {
					return fmt.Errorf("while waiting for consensus for bootstrap: %w", err)
				}
				ckv, err := st.CuratorClient()
				if err != nil {
					return fmt.Errorf("when retrieving curator client for bootstrap: %w", err)
				}

				supervisor.Logger(ctx).Infof("Bootstrapping control plane. Performing bootstrap...")

				// Perform curator bootstrap step in etcd.
				//
				// This is all idempotent, so there's no harm in re-running this on every
				// curator startup.
				//
				// TODO(q3k): collapse the curator bootstrap shenanigans into a single function.
				npub := b.nodePrivateKey.Public().(ed25519.PublicKey)
				jpub := b.nodePrivateJoinKey.Public().(ed25519.PublicKey)

				n := curator.NewNodeForBootstrap(b.clusterUnlockKey, npub, jpub)

				// The first node always runs consensus.
				join, err := st.AddNode(ctx, npub)
				if err != nil {
					return fmt.Errorf("when retrieving node join data from consensus: %w", err)
				}

				n.EnableConsensusMember(join)
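				// The bootstrap node is also made the initial Kubernetes controller.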
				n.EnableKubernetesController()

				var nodeCert []byte
				caCert, nodeCert, err = curator.BootstrapNodeFinish(ctx, ckv, &n, b.initialOwnerKey)
				if err != nil {
					return fmt.Errorf("while bootstrapping node: %w", err)
				}
				// ... and build new credentials from bootstrap step.
				creds, err = identity.NewNodeCredentials(b.nodePrivateKey, nodeCert, caCert)
				if err != nil {
					return fmt.Errorf("when creating bootstrap node credentials: %w", err)
				}
				supervisor.Logger(ctx).Infof("Control plane bootstrap complete, starting curator...")
			} else {
				// Not bootstrapping, just starting consensus with credentials we already have.

				// First, run a few assertions. These should never fire given the Map/Reduce
				// logic above; ideally we would encode this in the type system.
				if startup.existingMembership == nil {
					panic("no existingMembership but not bootstrapping either")
				}
				if startup.existingMembership.credentials == nil {
					panic("no existingMembership.credentials but not bootstrapping either")
				}
				if startup.existingMembership.remoteCurators == nil {
					panic("no existingMembership.remoteCurators but not bootstrapping either")
				}

				// Use already existing credentials, and pass over already known curators (as
				// we're not the only node, and we'd like downstream consumers to be able to
				// keep connecting to existing curators in case the local one fails).
				creds = startup.existingMembership.credentials
				directory = startup.existingMembership.remoteCurators
			}

			// Ensure this node is present in the cluster directory.
			if directory == nil {
				directory = &cpb.ClusterDirectory{}
			}
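			// The curator now runs locally, so a loopback address is sufficient
			// for this node's own directory entry added below.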
			missing := true
			for _, n := range directory.Nodes {
				if bytes.Equal(n.PublicKey, creds.PublicKey()) {
					missing = false
					break
				}
			}
			if missing {
				directory.Nodes = append(directory.Nodes, &cpb.ClusterDirectory_Node{
					PublicKey: creds.PublicKey(),
					Addresses: []*cpb.ClusterDirectory_Node_Address{
						{
							Host: "127.0.0.1",
						},
					},
				})
			}

			// Save this node's credentials, cluster directory and configuration as
			// part of the control plane bootstrap process.
			if b := startup.bootstrap; b != nil && caCert != nil {
				if err = creds.Save(&s.storageRoot.Data.Node.Credentials); err != nil {
					return fmt.Errorf("while saving node credentials: %w", err)
				}

				cdirRaw, err := proto.Marshal(directory)
				if err != nil {
					return fmt.Errorf("couldn't marshal ClusterDirectory: %w", err)
				}
				if err = s.storageRoot.ESP.Metropolis.ClusterDirectory.Write(cdirRaw, 0644); err != nil {
					return err
				}

				sc := ppb.SealedConfiguration{
					NodeUnlockKey: b.nodeUnlockKey,
					JoinKey:       b.nodePrivateJoinKey,
					ClusterCa:     caCert,
				}
				if err = s.storageRoot.ESP.Metropolis.SealedConfiguration.SealSecureBoot(&sc); err != nil {
					return err
				}

				supervisor.Logger(ctx).Infof("Saved bootstrapped node's credentials.")
				unix.Sync()
			}

			// Start curator.
			cur := curator.New(curator.Config{
				NodeCredentials: creds,
				Consensus:       con,
				LeaderTTL:       10 * time.Second,
			})
			if err := supervisor.Run(ctx, "curator", cur.Run); err != nil {
				return fmt.Errorf("failed to start curator: %w", err)
			}

			supervisor.Signal(ctx, supervisor.SignalHealthy)
			supervisor.Logger(ctx).Infof("Control plane running, submitting clusterMembership.")

			// We now have a locally running ControlPlane. Reflect that in a new
			// ClusterMembership.
			s.clusterMembership.set(&ClusterMembership{
				localConsensus: con,
				localCurator:   cur,
				credentials:    creds,
				remoteCurators: directory,
				pubkey:         creds.PublicKey(),
				resolver:       s.resolver,
			})
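			// Consumers watching clusterMembership elsewhere in the roleserver will
			// now see the locally running control plane and can reach the local
			// Curator through it.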
		}

		// Restart everything if we get a significantly different config (i.e., a config
		// whose change would/should either turn up or tear down the Control Plane).
		//
		// Not restarting on every single change prevents us from going into a
		// ClusterMembership -> ClusterDirectory -> ClusterMembership thrashing loop.
		for {
			ncI, err := w.Get(ctx)
			if err != nil {
				return err
			}
			nc := ncI.(*controlPlaneStartup)
			if nc.changed(startup) {
				supervisor.Logger(ctx).Infof("Configuration changed, restarting...")
				return fmt.Errorf("config changed, restarting")
			}
		}
	})

	supervisor.Signal(ctx, supervisor.SignalHealthy)
	<-ctx.Done()
	return ctx.Err()
}