// Copyright 2020 The Monogon Project Authors.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cluster

import (
	"context"
	"crypto/x509"
	"encoding/pem"
	"fmt"
	"io/ioutil"
	"os"
	"strings"
	"sync"
	"time"

	apb "git.monogon.dev/source/nexantic.git/core/proto/api"

	"github.com/cenkalti/backoff/v4"
	"github.com/golang/protobuf/proto"
	"go.etcd.io/etcd/clientv3"
	"go.uber.org/zap"

	"git.monogon.dev/source/nexantic.git/core/internal/common"
	"git.monogon.dev/source/nexantic.git/core/internal/common/supervisor"
	"git.monogon.dev/source/nexantic.git/core/internal/consensus"
	"git.monogon.dev/source/nexantic.git/core/internal/localstorage"
	"git.monogon.dev/source/nexantic.git/core/internal/localstorage/declarative"
	"git.monogon.dev/source/nexantic.git/core/internal/network"
)

// Manager is a finite state machine that joins this node (ie. a Smalltown instance running on a virtual/physical
// machine) into a Smalltown cluster (ie. a group of nodes that act as a single control plane for Smalltown services).
// It does that by bringing up all required operating-system level components, including mounting the local
// filesystem, bringing up a consensus (etcd) server/client, ...
//
// The Manager runs as a single-shot Runnable. It will attempt to progress its state from the initial state (New) to
// either Running (meaning that the node is now part of a cluster), or Failed (meaning that the node couldn't become
// part of a cluster). It is not restartable, as it mutates quite a bit of implicit operating-system level state (like
// filesystem mounts). As such, it's difficult to recover reliably from failures, and since these failures indicate
// serious issues with the cluster configuration/state, a failure requires a full kernel reboot to retry (or to fix/
// reconfigure the node).
//
// Currently, the Manager only supports one flow for bringing up a Node: creating a new cluster. As such, it's missing
// the following flows:
// - joining a new node into an already running cluster
// - restarting a node into an already existing cluster
// - restarting a node into an already running cluster (ie. a full reboot of the whole cluster)
//
type Manager struct {
	storageRoot    *localstorage.Root
	networkService *network.Service

	// stateLock locks all state* variables.
	stateLock sync.RWMutex
	// state is the FSM state of the Manager.
	state State
	// stateRunningNode is the Node that this Manager got from joining a cluster. It's only valid if the Manager is
	// Running.
	stateRunningNode *Node
	// stateWaiters is a list of channels that wish to be notified (by sending true or false) when the Manager
	// reaches a final state (Running or Failed respectively).
	stateWaiters []chan bool

	// goldenTicket is the Golden Ticket present in the enrolment config, if any.
	goldenTicket *apb.GoldenTicket

	// consensus is the spawned etcd/consensus service, if the Manager brought up a Node that should run one.
	consensus *consensus.Service
}

// NewManager creates a new cluster Manager. The given localstorage Root must be placed, but not yet started (it will
// be started as the Manager makes progress). The given network Service must already be running.
func NewManager(storageRoot *localstorage.Root, networkService *network.Service) *Manager {
	return &Manager{
		storageRoot:    storageRoot,
		networkService: networkService,
	}
}
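
// A minimal usage sketch for the Manager lifecycle, assuming a supervisor context, a placed localstorage Root and an
// already-running network Service; the runnable name and surrounding variables are illustrative, not part of this
// package:
//
//	m := cluster.NewManager(storageRoot, networkService)
//	if err := supervisor.Run(ctx, "cluster-manager", m.Run); err != nil {
//		// handle supervisor error
//	}
//	if m.WaitFinished() {
//		// The node is now part of a cluster.
//		node := m.Node()
//		_ = node
//	}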

// State is the state of the Manager finite state machine.
type State int

const (
	// StateNew is the initial state of the Manager. It decides how to go about joining or creating a cluster.
	StateNew State = iota
	// StateCreatingCluster is when the Manager attempts to create a new cluster - this happens when a node is started
	// with no EnrolmentConfig.
	StateCreatingCluster
	// StateCharlie is when the Manager uses the Golden Ticket debug/stopgap system to join an already
	// existing cluster. This mechanism will be removed before the first Smalltown release.
	StateCharlie
	// StateRunning is when the Manager successfully got the node to be part of a cluster. stateRunningNode is valid.
	StateRunning
	// StateFailed is when the Manager failed to get the node to be part of a cluster.
	StateFailed
)

func (s State) String() string {
	switch s {
	case StateNew:
		return "New"
	case StateCreatingCluster:
		return "CreatingCluster"
	case StateCharlie:
		return "Charlie"
	case StateRunning:
		return "Running"
	case StateFailed:
		return "Failed"
	default:
		return "UNKNOWN"
	}
}

// allowedTransitions describes all allowed state transitions (map[From][]To).
var allowedTransitions = map[State][]State{
	StateNew:             {StateCreatingCluster, StateCharlie},
	StateCreatingCluster: {StateRunning, StateFailed},
	StateCharlie:         {StateRunning, StateFailed},
}

// allowed returns whether a transition from a state to another state is allowed (ie. is defined in allowedTransitions).
func (m *Manager) allowed(from, to State) bool {
	for _, allowed := range allowedTransitions[from] {
		if to == allowed {
			return true
		}
	}
	return false
}

// next moves the Manager finite state machine from its current state to `n`, or to Failed if the transition is not
// allowed.
func (m *Manager) next(ctx context.Context, n State) {
	m.stateLock.Lock()
	defer m.stateLock.Unlock()

	if !m.allowed(m.state, n) {
		supervisor.Logger(ctx).Error("Attempted invalid enrolment state transition, failing enrolment",
			zap.String("from", m.state.String()), zap.String("to", n.String()))
		m.state = StateFailed
		return
	}

	supervisor.Logger(ctx).Info("Enrolment state change",
		zap.String("from", m.state.String()), zap.String("to", n.String()))

	m.state = n
}

// State returns the state of the Manager. It's safe to call this from any goroutine.
func (m *Manager) State() State {
	m.stateLock.RLock()
	defer m.stateLock.RUnlock()
	return m.state
}

// WaitFinished waits until the Manager FSM reaches Running or Failed, and returns true if the FSM is Running. It's
// safe to call this from any goroutine.
func (m *Manager) WaitFinished() (success bool) {
	m.stateLock.Lock()
	switch m.state {
	case StateFailed:
		m.stateLock.Unlock()
		return false
	case StateRunning:
		m.stateLock.Unlock()
		return true
	}

	C := make(chan bool)
	m.stateWaiters = append(m.stateWaiters, C)
	m.stateLock.Unlock()
	return <-C
}

// wakeWaiters wakes any WaitFinished waiters and lets them know about the current state of the Manager.
// The stateLock must already have been taken, and the state must have been set in the same critical section (otherwise
// this can cause a race condition).
func (m *Manager) wakeWaiters() {
	state := m.state
	waiters := m.stateWaiters
	m.stateWaiters = nil

	for _, waiter := range waiters {
		go func(w chan bool) {
			w <- state == StateRunning
		}(waiter)
	}
}

// Run is the runnable of the Manager, to be started using the Supervisor. It is one-shot, and should not be restarted.
func (m *Manager) Run(ctx context.Context) error {
	if state := m.State(); state != StateNew {
		supervisor.Logger(ctx).Error("Manager started with non-New state, failing", zap.String("state", state.String()))
		m.stateLock.Lock()
		m.state = StateFailed
		m.wakeWaiters()
		m.stateLock.Unlock()
		return nil
	}

	var err error
	bo := backoff.NewExponentialBackOff()
	for {
		done := false
		state := m.State()
		switch state {
		case StateNew:
			err = m.stateNew(ctx)
		case StateCreatingCluster:
			err = m.stateCreatingCluster(ctx)
		case StateCharlie:
			err = m.stateCharlie(ctx)
		default:
			done = true
		}

		if err != nil || done {
			break
		}

		if state == m.State() && !m.allowed(state, m.State()) {
			supervisor.Logger(ctx).Error("Enrolment got stuck, failing", zap.String("state", m.state.String()))
			m.stateLock.Lock()
			m.state = StateFailed
			m.stateLock.Unlock()
		} else {
			bo.Reset()
		}
	}

	m.stateLock.Lock()
	state := m.state
	if state != StateRunning {
		supervisor.Logger(ctx).Error("Enrolment failed", zap.Error(err), zap.String("state", m.state.String()))
	} else {
		supervisor.Logger(ctx).Info("Enrolment successful!")
	}
	m.wakeWaiters()
	m.stateLock.Unlock()

	supervisor.Signal(ctx, supervisor.SignalHealthy)
	supervisor.Signal(ctx, supervisor.SignalDone)
	return nil
}

// stateNew is called when a Manager is New. It makes the decision on how to join this node into a cluster.
func (m *Manager) stateNew(ctx context.Context) error {
	supervisor.Logger(ctx).Info("Starting enrolment process...")

	// Check for presence of EnrolmentConfig on ESP or in qemu firmware variables.
	var configRaw []byte
	configRaw, err := m.storageRoot.ESP.Enrolment.Read()
	if err != nil && !os.IsNotExist(err) {
		return fmt.Errorf("could not read local enrolment file: %w", err)
	} else if err != nil {
		configRaw, err = ioutil.ReadFile("/sys/firmware/qemu_fw_cfg/by_name/com.nexantic.smalltown/enrolment.pb/raw")
		if err != nil && !os.IsNotExist(err) {
			return fmt.Errorf("could not read firmware enrolment file: %w", err)
		}
	}

	// If no enrolment file exists, we create a new cluster.
	if configRaw == nil {
		m.next(ctx, StateCreatingCluster)
		return nil
	}

	// Enrolment file exists, parse it.
	enrolmentConfig := apb.EnrolmentConfig{}
	if err := proto.Unmarshal(configRaw, &enrolmentConfig); err != nil {
		return fmt.Errorf("could not unmarshal local enrolment file: %w", err)
	}

	// If no join ticket exists, we can't do anything yet.
	if enrolmentConfig.GoldenTicket == nil {
		return fmt.Errorf("joining a cluster without a golden ticket not yet implemented")
	}

	// Otherwise, stash the Golden Ticket and begin enrolling with it in stateCharlie.
	m.goldenTicket = enrolmentConfig.GoldenTicket
	m.next(ctx, StateCharlie)
	return nil
}

// stateCreatingCluster is called when the Manager has decided to create a new cluster.
//
// The process to create a new cluster is as follows:
// - wait for IP address
// - initialize new data partition, by generating local and cluster unlock keys (the local unlock key is saved to
//   the ESP, while the cluster unlock key is returned)
// - create a new node certificate and Node (with the new cluster unlock key)
// - start up a new etcd cluster, with this node being the only member
// - save the new Node to the new etcd cluster (thereby saving the node's cluster unlock key to etcd)
func (m *Manager) stateCreatingCluster(ctx context.Context) error {
	logger := supervisor.Logger(ctx)
	logger.Info("Creating new cluster: waiting for IP address...")
	ip, err := m.networkService.GetIP(ctx, true)
	if err != nil {
		return fmt.Errorf("when getting IP address: %w", err)
	}
	logger.Info("Creating new cluster: got IP address", zap.String("address", ip.String()))

	logger.Info("Creating new cluster: initializing storage...")
	cuk, err := m.storageRoot.Data.MountNew(&m.storageRoot.ESP.LocalUnlock)
	if err != nil {
		return fmt.Errorf("when making new data partition: %w", err)
	}
	logger.Info("Creating new cluster: storage initialized")

	// Create certificate for node.
	cert, err := m.storageRoot.Data.Node.EnsureSelfSigned(localstorage.CertificateForNode)
	if err != nil {
		return fmt.Errorf("failed to create new node certificate: %w", err)
	}

	node := NewNode(cuk, *ip, *cert.Leaf)

	m.consensus = consensus.New(consensus.Config{
		Data:           &m.storageRoot.Data.Etcd,
		Ephemeral:      &m.storageRoot.Ephemeral.Consensus,
		NewCluster:     true,
		Name:           node.ID(),
		InitialCluster: ip.String(),
		ExternalHost:   ip.String(),
		ListenHost:     ip.String(),
	})
	if err := supervisor.Run(ctx, "consensus", m.consensus.Run); err != nil {
		return fmt.Errorf("when starting consensus: %w", err)
	}

	// TODO(q3k): make timeout configurable?
	ctxT, ctxC := context.WithTimeout(ctx, 5*time.Second)
	defer ctxC()

	supervisor.Logger(ctx).Info("Creating new cluster: waiting for consensus...")
	if err := m.consensus.WaitReady(ctxT); err != nil {
		return fmt.Errorf("consensus service failed to become ready: %w", err)
	}

	// Configure node to be a consensus member and kubernetes worker. In the future, different nodes will have
	// different roles, but for now they're all symmetrical.
	_, consensusName, err := m.consensus.MemberInfo(ctx)
	if err != nil {
		return fmt.Errorf("could not get consensus MemberInfo: %w", err)
	}
	if err := node.MakeConsensusMember(consensusName); err != nil {
		return fmt.Errorf("could not make new node into consensus member: %w", err)
	}
	if err := node.MakeKubernetesWorker(node.ID()); err != nil {
		return fmt.Errorf("could not make new node into kubernetes worker: %w", err)
	}

	// Save node into etcd.
	supervisor.Logger(ctx).Info("Creating new cluster: storing first node...")
	if err := node.Store(ctx, m.consensus.KV("cluster", "enrolment")); err != nil {
		return fmt.Errorf("could not save new node: %w", err)
	}

	m.stateLock.Lock()
	m.stateRunningNode = node
	m.stateLock.Unlock()

	m.next(ctx, StateRunning)
	return nil
}

// stateCharlie is used to join an existing cluster via the GoldenTicket mechanism. This mechanism is temporarily
// implemented in Smalltown in order to allow for testing multi-node clusters without a TPM attestation flow implemented.
// The Golden Ticket contains a pregenerated node certificate, etcd certificate, and other data that any node can
// use to join the cluster.
// Since this flow is temporary, it has a slight impedance mismatch with methods exposed by localstorage, node, etc.,
// and the resulting sequencing is a bit odd:
// - the {node,etcd} certificates/keys are loaded (this already dictates the new node name, as the node name is based
//   off of the node public key)
// - local storage is initialized, and a local/cluster unlock keypair is generated
// - the etcd keys are manually saved to localstorage (vs. being generated locally by a CA)
// - an etcd/consensus member is started, knowing that the remote member was already created when the golden ticket
//   was generated (vs. being created now by an RPC call, via a promote-node-to-etcd-member flow)
// - the node is then promoted to a consensus member and kubernetes worker, its cluster unlock key is set, and then it
//   is saved to etcd.
// As such, in this flow, we first create an etcd member (on goldenticket generation), and only then create a new
// Smalltown node (when the goldenticket is used).
func (m *Manager) stateCharlie(ctx context.Context) error {
	t := m.goldenTicket
	nodeCert, err := x509.ParseCertificate(t.NodeCert)
	if err != nil {
		return fmt.Errorf("parsing node certificate from ticket: %w", err)
	}

	supervisor.Logger(ctx).Info("Joining cluster: waiting for IP address...")
	ip, err := m.networkService.GetIP(ctx, true)
	if err != nil {
		return fmt.Errorf("when getting IP address: %w", err)
	}
	supervisor.Logger(ctx).Info("Joining cluster: got IP address", zap.String("address", ip.String()))

	supervisor.Logger(ctx).Info("Joining cluster: initializing storage...")
	cuk, err := m.storageRoot.Data.MountNew(&m.storageRoot.ESP.LocalUnlock)
	if err != nil {
		return fmt.Errorf("when making new data partition: %w", err)
	}
	supervisor.Logger(ctx).Info("Joining cluster: storage initialized")
	node := NewNode(cuk, *ip, *nodeCert)

	// Save etcd PKI to disk.
	for _, f := range []struct {
		target    declarative.FilePlacement
		data      []byte
		blockType string
	}{
		{m.storageRoot.Data.Etcd.PeerPKI.Key, t.EtcdClientKey, "PRIVATE KEY"},
		{m.storageRoot.Data.Etcd.PeerPKI.Certificate, t.EtcdClientCert, "CERTIFICATE"},
		{m.storageRoot.Data.Etcd.PeerPKI.CACertificate, t.EtcdCaCert, "CERTIFICATE"},
	} {
		if err := f.target.Write(pem.EncodeToMemory(&pem.Block{Type: f.blockType, Bytes: f.data}), 0600); err != nil {
			return fmt.Errorf("when writing etcd PKI data: %w", err)
		}
	}
	if err := m.storageRoot.Data.Etcd.PeerCRL.Write(t.EtcdCrl, 0600); err != nil {
		return fmt.Errorf("when writing etcd CRL: %w", err)
	}

	https := func(p *apb.GoldenTicket_EtcdPeer) string {
		return fmt.Sprintf("%s=https://%s:%d", p.Name, p.Address, common.ConsensusPort)
	}
	var initialCluster []string
	for _, p := range t.Peers {
		initialCluster = append(initialCluster, https(p))
	}
	initialCluster = append(initialCluster, https(t.This))

	supervisor.Logger(ctx).Info("Joining cluster: starting etcd join...",
		zap.String("initial_cluster", strings.Join(initialCluster, ",")), zap.String("name", node.ID()))
	m.consensus = consensus.New(consensus.Config{
		Data:           &m.storageRoot.Data.Etcd,
		Ephemeral:      &m.storageRoot.Ephemeral.Consensus,
		Name:           node.ID(),
		InitialCluster: strings.Join(initialCluster, ","),
		ExternalHost:   ip.String(),
		ListenHost:     ip.String(),
	})

	if err := supervisor.Run(ctx, "consensus", m.consensus.Run); err != nil {
		return fmt.Errorf("when starting consensus: %w", err)
	}

	// TODO(q3k): make timeout configurable?
	ctxT, ctxC := context.WithTimeout(ctx, 5*time.Second)
	defer ctxC()

	supervisor.Logger(ctx).Info("Joining cluster: waiting for consensus...")
	if err := m.consensus.WaitReady(ctxT); err != nil {
		return fmt.Errorf("consensus service failed to become ready: %w", err)
	}

	// Configure node to be a consensus member and kubernetes worker. In the future, different nodes will have
	// different roles, but for now they're all symmetrical.
	_, consensusName, err := m.consensus.MemberInfo(ctx)
	if err != nil {
		return fmt.Errorf("could not get consensus MemberInfo: %w", err)
	}
	if err := node.MakeConsensusMember(consensusName); err != nil {
		return fmt.Errorf("could not make new node into consensus member: %w", err)
	}
	if err := node.MakeKubernetesWorker(node.ID()); err != nil {
		return fmt.Errorf("could not make new node into kubernetes worker: %w", err)
	}

	// Save node into etcd.
	supervisor.Logger(ctx).Info("Joining cluster: storing node...")
	if err := node.Store(ctx, m.consensus.KV("cluster", "enrolment")); err != nil {
		return fmt.Errorf("could not save new node: %w", err)
	}

	m.stateLock.Lock()
	m.stateRunningNode = node
	m.stateLock.Unlock()

	m.next(ctx, StateRunning)
	return nil
}

// Node returns the Node that the Manager brought into a cluster, or nil if the Manager is not Running.
// This is safe to call from any goroutine.
func (m *Manager) Node() *Node {
	m.stateLock.Lock()
	defer m.stateLock.Unlock()
	if m.state != StateRunning {
		return nil
	}
	return m.stateRunningNode
}

// ConsensusKV returns a namespaced etcd KV client, or nil if the Manager is not Running.
// This is safe to call from any goroutine.
func (m *Manager) ConsensusKV(module, space string) clientv3.KV {
	m.stateLock.Lock()
	defer m.stateLock.Unlock()
	if m.state != StateRunning {
		return nil
	}
	if m.stateRunningNode.ConsensusMember() == nil {
		// TODO(q3k): in this case, we should return a client to etcd even though this
		// node is not a member of consensus. For now, all nodes are consensus members.
		return nil
	}
	return m.consensus.KV(module, space)
}
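
// A brief usage sketch for the namespaced KV accessor; the "kubernetes"/"pki" namespace and the key/value shown here
// are illustrative assumptions, not defined by this package:
//
//	if kv := m.ConsensusKV("kubernetes", "pki"); kv != nil {
//		if _, err := kv.Put(ctx, "some-key", "some-value"); err != nil {
//			// handle etcd error
//		}
//	}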

// ConsensusKVRoot returns a non-namespaced etcd KV client, or nil if the Manager is not Running.
// This is safe to call from any goroutine.
func (m *Manager) ConsensusKVRoot() clientv3.KV {
	m.stateLock.Lock()
	defer m.stateLock.Unlock()
	if m.state != StateRunning {
		return nil
	}
	if m.stateRunningNode.ConsensusMember() == nil {
		// TODO(q3k): in this case, we should return a client to etcd even though this
		// node is not a member of consensus. For now, all nodes are consensus members.
		return nil
	}
	return m.consensus.KVRoot()
}

// ConsensusCluster returns an etcd Cluster client, or nil if the Manager is not Running.
// This is safe to call from any goroutine.
func (m *Manager) ConsensusCluster() clientv3.Cluster {
	m.stateLock.Lock()
	defer m.stateLock.Unlock()
	if m.state != StateRunning {
		return nil
	}
	if m.stateRunningNode.ConsensusMember() == nil {
		// TODO(q3k): in this case, we should return a client to etcd even though this
		// node is not a member of consensus. For now, all nodes are consensus members.
		return nil
	}
	return m.consensus.Cluster()
}
557}