// Copyright 2020 The Monogon Project Authors.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cluster

import (
	"context"
	"crypto/x509"
	"encoding/pem"
	"fmt"
	"io/ioutil"
	"os"
	"strings"
	"sync"
	"time"

	apb "git.monogon.dev/source/nexantic.git/core/proto/api"

	"github.com/cenkalti/backoff/v4"
	"github.com/golang/protobuf/proto"
	"go.etcd.io/etcd/clientv3"
	"go.uber.org/zap"

	"git.monogon.dev/source/nexantic.git/core/internal/common"
	"git.monogon.dev/source/nexantic.git/core/internal/common/supervisor"
	"git.monogon.dev/source/nexantic.git/core/internal/consensus"
	"git.monogon.dev/source/nexantic.git/core/internal/localstorage"
	"git.monogon.dev/source/nexantic.git/core/internal/localstorage/declarative"
	"git.monogon.dev/source/nexantic.git/core/internal/network"
)

// Manager is a finite state machine that joins this node (i.e., a Smalltown instance running on a virtual/physical
// machine) into a Smalltown cluster (i.e., a group of nodes that act as a single control plane for Smalltown
// services). It does that by bringing up all required operating-system level components, including mounting the
// local filesystem, bringing up a consensus (etcd) server/client, ...
//
// The Manager runs as a single-shot Runnable. It will attempt to progress its state from the initial state (New) to
// either Running (meaning that the node is now part of a cluster), or Failed (meaning that the node couldn't become
// part of a cluster). It is not restartable, as it mutates quite a bit of implicit operating-system level state (like
// filesystem mounts). As such, it's difficult to recover reliably from failures, and since these failures indicate
// serious issues with the cluster configuration/state, a failure requires a full kernel reboot to retry (or to fix/
// reconfigure the node).
//
// Currently, the Manager only supports one flow for bringing up a Node: creating a new cluster. As such, it's
// missing the following flows:
//  - joining a new node into an already running cluster
//  - restarting a node into an already existing cluster
//  - restarting a node into an already running cluster (i.e., a full reboot of the whole cluster)
//
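// A minimal lifecycle sketch (illustrative only; the surrounding startup wiring and variable names are assumptions,
// not taken from this package):
//
//   m := NewManager(storageRoot, networkService)
//   if err := supervisor.Run(ctx, "enrolment", m.Run); err != nil {
//       // handle supervisor error
//   }
//   if !m.WaitFinished() {
//       // enrolment failed; the node needs a reboot and/or reconfiguration
//   }
//   node := m.Node() // non-nil once the Manager is Running
//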
type Manager struct {
	storageRoot    *localstorage.Root
	networkService *network.Service

	// stateLock locks all state* variables.
	stateLock sync.RWMutex
	// state is the FSM state of the Manager.
	state State
	// stateRunningNode is the Node that this Manager got from joining a cluster. It's only valid if the Manager is
	// Running.
	stateRunningNode *Node
	// stateWaiters is a list of channels to be notified (with true or false, respectively) when the Manager reaches
	// a final state (Running or Failed).
	stateWaiters []chan bool

	// goldenTicket is the Golden Ticket present in the enrolment config, if any.
	goldenTicket *apb.GoldenTicket

	// consensus is the spawned etcd/consensus service, if the Manager brought up a Node that should run one.
	consensus *consensus.Service
}

// NewManager creates a new cluster Manager. The given localstorage Root must be placed, but not yet started (it will
// be started as the Manager makes progress). The given network Service must already be running.
func NewManager(storageRoot *localstorage.Root, networkService *network.Service) *Manager {
	return &Manager{
		storageRoot:    storageRoot,
		networkService: networkService,
	}
}

// State is the state of the Manager finite state machine.
type State int

const (
	// StateNew is the initial state of the Manager. It decides how to go about joining or creating a cluster.
	StateNew State = iota
	// StateCreatingCluster is when the Manager attempts to create a new cluster - this happens when a node is started
	// with no EnrolmentConfig.
	StateCreatingCluster
	// StateCharlie is when the Manager uses the Golden Ticket debug/stopgap system to join an already
	// existing cluster. This mechanism will be removed before the first Smalltown release.
	StateCharlie
	// StateRunning is when the Manager successfully got the node to be part of a cluster. stateRunningNode is valid.
	StateRunning
	// StateFailed is when the Manager failed to get the node to be part of a cluster.
	StateFailed
)

func (s State) String() string {
	switch s {
	case StateNew:
		return "New"
	case StateCreatingCluster:
		return "CreatingCluster"
	case StateCharlie:
		return "Charlie"
	case StateRunning:
		return "Running"
	case StateFailed:
		return "Failed"
	default:
		return "UNKNOWN"
	}
}

// allowedTransitions describes all allowed state transitions (map[From][]To).
var allowedTransitions = map[State][]State{
	StateNew:             {StateCreatingCluster, StateCharlie},
	StateCreatingCluster: {StateRunning, StateFailed},
	StateCharlie:         {StateRunning, StateFailed},
}

// allowed returns whether a transition from a state to another state is allowed (i.e., is defined in
// allowedTransitions).
func (m *Manager) allowed(from, to State) bool {
	for _, allowed := range allowedTransitions[from] {
		if to == allowed {
			return true
		}
	}
	return false
}

// next moves the Manager finite state machine from its current state to `n`, or to Failed if the transition is not
// allowed.
func (m *Manager) next(ctx context.Context, n State) {
	m.stateLock.Lock()
	defer m.stateLock.Unlock()

	if !m.allowed(m.state, n) {
		supervisor.Logger(ctx).Error("Attempted invalid enrolment state transition, failing enrolment",
			zap.String("from", m.state.String()), zap.String("to", n.String()))
		m.state = StateFailed
		return
	}

	supervisor.Logger(ctx).Info("Enrolment state change",
		zap.String("from", m.state.String()), zap.String("to", n.String()))

	m.state = n
}

// State returns the state of the Manager. It's safe to call this from any goroutine.
func (m *Manager) State() State {
	m.stateLock.RLock()
	defer m.stateLock.RUnlock()
	return m.state
}

// WaitFinished waits until the Manager FSM reaches Running or Failed, and returns true if the FSM is Running. It's
// safe to call this from any goroutine.
func (m *Manager) WaitFinished() (success bool) {
	m.stateLock.Lock()
	switch m.state {
	case StateFailed:
		m.stateLock.Unlock()
		return false
	case StateRunning:
		m.stateLock.Unlock()
		return true
	}

	C := make(chan bool)
	m.stateWaiters = append(m.stateWaiters, C)
	m.stateLock.Unlock()
	return <-C
}

// wakeWaiters wakes any WaitFinished waiters and lets them know about the current state of the Manager.
// The stateLock must already be taken, and the state must have been set in the same critical section (otherwise
// this can cause a race condition).
func (m *Manager) wakeWaiters() {
	state := m.state
	waiters := m.stateWaiters
	m.stateWaiters = nil

	for _, waiter := range waiters {
		go func(w chan bool) {
			w <- state == StateRunning
		}(waiter)
	}
}

// Run is the runnable of the Manager, to be started using the Supervisor. It is one-shot, and should not be restarted.
func (m *Manager) Run(ctx context.Context) error {
	if state := m.State(); state != StateNew {
		supervisor.Logger(ctx).Error("Manager started with non-New state, failing", zap.String("state", state.String()))
		m.stateLock.Lock()
		m.state = StateFailed
		m.wakeWaiters()
		m.stateLock.Unlock()
		return nil
	}

	var err error
	bo := backoff.NewExponentialBackOff()
	for {
		done := false
		state := m.State()
		switch state {
		case StateNew:
			err = m.stateNew(ctx)
		case StateCreatingCluster:
			err = m.stateCreatingCluster(ctx)
		case StateCharlie:
			err = m.stateCharlie(ctx)
		default:
			done = true
			break
		}

		if err != nil || done {
			break
		}

		if state == m.State() && !m.allowed(state, m.State()) {
			supervisor.Logger(ctx).Error("Enrolment got stuck, failing", zap.String("state", m.state.String()))
			m.stateLock.Lock()
			m.state = StateFailed
			m.stateLock.Unlock()
		} else {
			bo.Reset()
		}
	}

	m.stateLock.Lock()
	state := m.state
	if state != StateRunning {
		supervisor.Logger(ctx).Error("Enrolment failed", zap.Error(err), zap.String("state", m.state.String()))
	} else {
		supervisor.Logger(ctx).Info("Enrolment successful!")
	}
	m.wakeWaiters()
	m.stateLock.Unlock()

	supervisor.Signal(ctx, supervisor.SignalHealthy)
	supervisor.Signal(ctx, supervisor.SignalDone)
	return nil
}

// stateNew is called when a Manager is New. It makes the decision on how to join this node into a cluster.
func (m *Manager) stateNew(ctx context.Context) error {
	supervisor.Logger(ctx).Info("Starting enrolment process...")

	// Check for presence of EnrolmentConfig on ESP or in qemu firmware variables.
	var configRaw []byte
	configRaw, err := m.storageRoot.ESP.Enrolment.Read()
	if err != nil && !os.IsNotExist(err) {
		return fmt.Errorf("could not read local enrolment file: %w", err)
	} else if err != nil {
		configRaw, err = ioutil.ReadFile("/sys/firmware/qemu_fw_cfg/by_name/com.nexantic.smalltown/enrolment.pb/raw")
		if err != nil && !os.IsNotExist(err) {
			return fmt.Errorf("could not read firmware enrolment file: %w", err)
		}
	}

	// If no enrolment file exists, we create a new cluster.
	if configRaw == nil {
		m.next(ctx, StateCreatingCluster)
		return nil
	}

	// Enrolment file exists, parse it.

	enrolmentConfig := apb.EnrolmentConfig{}
	if err := proto.Unmarshal(configRaw, &enrolmentConfig); err != nil {
		return fmt.Errorf("could not unmarshal local enrolment file: %w", err)
	}

	// If no join ticket exists, we can't do anything yet.
	if enrolmentConfig.GoldenTicket == nil {
		return fmt.Errorf("joining a cluster without a golden ticket not yet implemented")
	}

	m.goldenTicket = enrolmentConfig.GoldenTicket

	// Otherwise, we begin enrolling with the Golden Ticket.
	m.next(ctx, StateCharlie)
	return nil
}

// stateCreatingCluster is called when the Manager has decided to create a new cluster.
//
// The process to create a new cluster is as follows:
//  - wait for IP address
//  - initialize new data partition, by generating local and cluster unlock keys (the local unlock key is saved to
//    the ESP, while the cluster unlock key is returned)
//  - create a new node certificate and Node (with the newly generated cluster unlock key)
//  - start up a new etcd cluster, with this node being the only member
//  - save the new Node to the new etcd cluster (thereby saving the node's cluster unlock key to etcd)
func (m *Manager) stateCreatingCluster(ctx context.Context) error {
	logger := supervisor.Logger(ctx)
	logger.Info("Creating new cluster: waiting for IP address...")
	ip, err := m.networkService.GetIP(ctx, true)
	if err != nil {
		return fmt.Errorf("when getting IP address: %w", err)
	}
	logger.Info("Creating new cluster: got IP address", zap.String("address", ip.String()))

	logger.Info("Creating new cluster: initializing storage...")
	cuk, err := m.storageRoot.Data.MountNew(&m.storageRoot.ESP.LocalUnlock)
	if err != nil {
		return fmt.Errorf("when making new data partition: %w", err)
	}
	logger.Info("Creating new cluster: storage initialized")

	// Create certificate for node.
	cert, err := m.storageRoot.Data.Node.EnsureSelfSigned(localstorage.CertificateForNode)
	if err != nil {
		return fmt.Errorf("failed to create new node certificate: %w", err)
	}

	node := NewNode(cuk, *ip, *cert.Leaf)

	m.consensus = consensus.New(consensus.Config{
		Data:           &m.storageRoot.Data.Etcd,
		Ephemeral:      &m.storageRoot.Ephemeral.Consensus,
		NewCluster:     true,
		Name:           node.ID(),
		InitialCluster: ip.String(),
		ExternalHost:   ip.String(),
		ListenHost:     ip.String(),
	})
	if err := supervisor.Run(ctx, "consensus", m.consensus.Run); err != nil {
		return fmt.Errorf("when starting consensus: %w", err)
	}

	// TODO(q3k): make timeout configurable?
	ctxT, ctxC := context.WithTimeout(ctx, 5*time.Second)
	defer ctxC()

	supervisor.Logger(ctx).Info("Creating new cluster: waiting for consensus...")
	if err := m.consensus.WaitReady(ctxT); err != nil {
		return fmt.Errorf("consensus service failed to become ready: %w", err)
	}

	// Configure node to be a consensus member and kubernetes worker. In the future, different nodes will have
	// different roles, but for now they're all symmetrical.
	_, consensusName, err := m.consensus.MemberInfo(ctx)
	if err != nil {
		return fmt.Errorf("could not get consensus MemberInfo: %w", err)
	}
	if err := node.MakeConsensusMember(consensusName); err != nil {
		return fmt.Errorf("could not make new node into consensus member: %w", err)
	}
	if err := node.MakeKubernetesWorker(node.ID()); err != nil {
		return fmt.Errorf("could not make new node into kubernetes worker: %w", err)
	}

	// Save node into etcd.
	supervisor.Logger(ctx).Info("Creating new cluster: storing first node...")
	if err := node.Store(ctx, m.consensus.KV("cluster", "enrolment")); err != nil {
		return fmt.Errorf("could not save new node: %w", err)
	}

	m.stateLock.Lock()
	m.stateRunningNode = node
	m.stateLock.Unlock()

	m.next(ctx, StateRunning)
	return nil
}

// stateCharlie is used to join an existing cluster via the GoldenTicket mechanism. This mechanism is implemented
// temporarily in Smalltown to allow testing multi-node clusters before a TPM attestation flow is implemented.
// The Golden Ticket contains a pregenerated node certificate, etcd certificate, and other data that any node can
// use to join the cluster.
// Since this flow is temporary, it has a slight impedance mismatch with methods exposed by localstorage, node, etc.,
// and the resulting sequencing is a bit odd:
//  - the {node,etcd} certificates/keys are loaded (this already dictates the new node name, as the node name is based
//    on the node's public key)
//  - local storage is initialized, and a local/cluster unlock key pair is generated
//  - the etcd keys are manually saved to localstorage (vs. being generated locally by a CA)
//  - an etcd/consensus member is started, knowing that the remote member was already created when the golden ticket
//    was generated (vs. being created now by an RPC call, via a promote-node-to-etcd-member flow)
//  - the node is then promoted to a consensus member and kubernetes worker, its cluster unlock key is set, and then it
//    is saved to etcd.
// As such, in this flow, we first create an etcd member (on golden ticket generation), and only then create a new
// Smalltown node (when the golden ticket is used).
func (m *Manager) stateCharlie(ctx context.Context) error {
	t := m.goldenTicket
	nodeCert, err := x509.ParseCertificate(t.NodeCert)
	if err != nil {
		return fmt.Errorf("parsing node certificate from ticket: %w", err)
	}

	supervisor.Logger(ctx).Info("Joining cluster: waiting for IP address...")
	ip, err := m.networkService.GetIP(ctx, true)
	if err != nil {
		return fmt.Errorf("when getting IP address: %w", err)
	}
	supervisor.Logger(ctx).Info("Joining cluster: got IP address", zap.String("address", ip.String()))

	supervisor.Logger(ctx).Info("Joining cluster: initializing storage...")
	cuk, err := m.storageRoot.Data.MountNew(&m.storageRoot.ESP.LocalUnlock)
	if err != nil {
		return fmt.Errorf("when making new data partition: %w", err)
	}
	supervisor.Logger(ctx).Info("Joining cluster: storage initialized")
	node := NewNode(cuk, *ip, *nodeCert)

	// Save etcd PKI to disk.
	for _, f := range []struct {
		target    declarative.FilePlacement
		data      []byte
		blockType string
	}{
		{m.storageRoot.Data.Etcd.PeerPKI.Key, t.EtcdClientKey, "PRIVATE KEY"},
		{m.storageRoot.Data.Etcd.PeerPKI.Certificate, t.EtcdClientCert, "CERTIFICATE"},
		{m.storageRoot.Data.Etcd.PeerPKI.CACertificate, t.EtcdCaCert, "CERTIFICATE"},
	} {
		if err := f.target.Write(pem.EncodeToMemory(&pem.Block{Type: f.blockType, Bytes: f.data}), 0600); err != nil {
			return fmt.Errorf("when writing etcd PKI data: %w", err)
		}
	}
	if err := m.storageRoot.Data.Etcd.PeerCRL.Write(t.EtcdCrl, 0600); err != nil {
		return fmt.Errorf("when writing etcd CRL: %w", err)
	}

	https := func(p *apb.GoldenTicket_EtcdPeer) string {
		return fmt.Sprintf("%s=https://%s:%d", p.Name, p.Address, common.ConsensusPort)
	}
	var initialCluster []string
	for _, p := range t.Peers {
		initialCluster = append(initialCluster, https(p))
	}
	initialCluster = append(initialCluster, https(t.This))

	supervisor.Logger(ctx).Info("Joining cluster: starting etcd join...",
		zap.String("initial_cluster", strings.Join(initialCluster, ",")), zap.String("name", node.ID()))
	m.consensus = consensus.New(consensus.Config{
		Data:           &m.storageRoot.Data.Etcd,
		Ephemeral:      &m.storageRoot.Ephemeral.Consensus,
		Name:           node.ID(),
		InitialCluster: strings.Join(initialCluster, ","),
		ExternalHost:   ip.String(),
		ListenHost:     ip.String(),
	})

	if err := supervisor.Run(ctx, "consensus", m.consensus.Run); err != nil {
		return fmt.Errorf("when starting consensus: %w", err)
	}

	// TODO(q3k): make timeout configurable?
	ctxT, ctxC := context.WithTimeout(ctx, 5*time.Second)
	defer ctxC()

	supervisor.Logger(ctx).Info("Joining cluster: waiting for consensus...")
	if err := m.consensus.WaitReady(ctxT); err != nil {
		return fmt.Errorf("consensus service failed to become ready: %w", err)
	}

	// Configure node to be a consensus member and kubernetes worker. In the future, different nodes will have
	// different roles, but for now they're all symmetrical.
	_, consensusName, err := m.consensus.MemberInfo(ctx)
	if err != nil {
		return fmt.Errorf("could not get consensus MemberInfo: %w", err)
	}
	if err := node.MakeConsensusMember(consensusName); err != nil {
		return fmt.Errorf("could not make new node into consensus member: %w", err)
	}
	if err := node.MakeKubernetesWorker(node.ID()); err != nil {
		return fmt.Errorf("could not make new node into kubernetes worker: %w", err)
	}

	// Save node into etcd.
	supervisor.Logger(ctx).Info("Joining cluster: storing node...")
	if err := node.Store(ctx, m.consensus.KV("cluster", "enrolment")); err != nil {
		return fmt.Errorf("could not save new node: %w", err)
	}

	m.stateLock.Lock()
	m.stateRunningNode = node
	m.stateLock.Unlock()

	m.next(ctx, StateRunning)
	return nil
}

// Node returns the Node that the Manager brought into a cluster, or nil if the Manager is not Running.
// This is safe to call from any goroutine.
func (m *Manager) Node() *Node {
	m.stateLock.Lock()
	defer m.stateLock.Unlock()
	if m.state != StateRunning {
		return nil
	}
	return m.stateRunningNode
}

// ConsensusKV returns a namespaced etcd KV client, or nil if the Manager is not Running.
// This is safe to call from any goroutine.
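//
// Callers must handle a nil return value, since the Manager might not be Running yet, or this node might not be a
// consensus member. A minimal usage sketch (the "kubernetes"/"pki" namespace names and the key/value written here
// are hypothetical, for illustration only):
//
//   if kv := m.ConsensusKV("kubernetes", "pki"); kv != nil {
//       if _, err := kv.Put(ctx, "some-key", "some-value"); err != nil {
//           // handle etcd error
//       }
//   }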
func (m *Manager) ConsensusKV(module, space string) clientv3.KV {
	m.stateLock.Lock()
	defer m.stateLock.Unlock()
	if m.state != StateRunning {
		return nil
	}
	if m.stateRunningNode.ConsensusMember() == nil {
		// TODO(q3k): in this case, we should return a client to etcd even though this
		// node is not a member of consensus. For now, all nodes are consensus members.
		return nil
	}
	return m.consensus.KV(module, space)
}

// ConsensusKVRoot returns a non-namespaced etcd KV client, or nil if the Manager is not Running.
// This is safe to call from any goroutine.
func (m *Manager) ConsensusKVRoot() clientv3.KV {
	m.stateLock.Lock()
	defer m.stateLock.Unlock()
	if m.state != StateRunning {
		return nil
	}
	if m.stateRunningNode.ConsensusMember() == nil {
		// TODO(q3k): in this case, we should return a client to etcd even though this
		// node is not a member of consensus. For now, all nodes are consensus members.
		return nil
	}
	return m.consensus.KVRoot()
}

// ConsensusCluster returns an etcd Cluster client, or nil if the Manager is not Running.
// This is safe to call from any goroutine.
func (m *Manager) ConsensusCluster() clientv3.Cluster {
	m.stateLock.Lock()
	defer m.stateLock.Unlock()
	if m.state != StateRunning {
		return nil
	}
	if m.stateRunningNode.ConsensusMember() == nil {
		// TODO(q3k): in this case, we should return a client to etcd even though this
		// node is not a member of consensus. For now, all nodes are consensus members.
		return nil
	}
	return m.consensus.Cluster()
559}