// Copyright 2020 The Monogon Project Authors.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cluster

import (
	"context"
	"fmt"
	"io/ioutil"
	"os"
	"sync"
	"time"

	"github.com/cenkalti/backoff/v4"
	"go.etcd.io/etcd/clientv3"
	"google.golang.org/protobuf/proto"

	"source.monogon.dev/metropolis/node/core/consensus"
	"source.monogon.dev/metropolis/node/core/localstorage"
	"source.monogon.dev/metropolis/node/core/network"
	"source.monogon.dev/metropolis/pkg/supervisor"
	apb "source.monogon.dev/metropolis/proto/api"
)

// Manager is a finite state machine that joins this node (i.e. the Metropolis node running on a virtual/physical
// machine) into a Metropolis cluster (i.e. a group of nodes that act as a single control plane for Metropolis
// services). It does this by bringing up all required operating-system level components, including mounting the
// local filesystem, bringing up a consensus (etcd) server/client, ...
//
// The Manager runs as a single-shot Runnable. It will attempt to progress its state from the initial state (New) to
// either Running (meaning that the node is now part of a cluster) or Failed (meaning that the node couldn't become
// part of a cluster). It is not restartable, as it mutates quite a bit of implicit operating-system level state (like
// filesystem mounts). As such, it's difficult to recover reliably from failures, and since these failures indicate
// serious issues with the cluster configuration/state, a failure requires a full kernel reboot to retry (or to fix/
// reconfigure the node).
//
// Currently, the Manager only supports one flow for bringing up a Node: creating a new cluster. As such, it's
// missing the following flows:
// - joining a new node into an already running cluster
// - restarting a node into an already existing cluster
// - restarting a node into an already running cluster (i.e. a full reboot of the whole cluster)
//
type Manager struct {
	storageRoot    *localstorage.Root
	networkService *network.Service

	// stateLock locks all state* variables.
	stateLock sync.RWMutex
	// state is the FSM state of the Manager.
	state State
	// stateRunningNode is the Node that this Manager got from joining a cluster. It's only valid if the Manager is
	// Running.
	stateRunningNode *Node
	// stateWaiters is a list of channels that wish to be notified (by sending true or false) for when the Manager
	// reaches a final state (Running or Failed respectively).
	stateWaiters []chan bool

	// consensus is the spawned etcd/consensus service, if the Manager brought up a Node that should run one.
	consensus *consensus.Service
}

// NewManager creates a new cluster Manager. The given localstorage Root must be placed, but not yet started (and will
// be started as the Manager makes progress). The given network Service must already be running.
func NewManager(storageRoot *localstorage.Root, networkService *network.Service) *Manager {
	return &Manager{
		storageRoot:    storageRoot,
		networkService: networkService,
	}
}

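// A minimal usage sketch of the Manager lifecycle, for illustration only: the supervisor wiring and error handling
// below are assumptions about the calling code, not a verbatim copy of the node startup path.
//
//	m := NewManager(storageRoot, networkService)
//	if err := supervisor.Run(ctx, "enrolment", m.Run); err != nil {
//		return fmt.Errorf("could not start enrolment: %w", err)
//	}
//	if !m.WaitFinished() {
//		return fmt.Errorf("enrolment failed, node needs to be rebooted")
//	}
//	// The Manager is now Running, so Node() and the Consensus* accessors return non-nil values.
//	node := m.Node()
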
// State is the state of the Manager finite state machine.
type State int

const (
	// StateNew is the initial state of the Manager. In this state, the Manager decides how to go about joining or
	// creating a cluster.
	StateNew State = iota
	// StateCreatingCluster is when the Manager attempts to create a new cluster - this happens when a node is started
	// with no EnrolmentConfig.
	StateCreatingCluster
	// StateRunning is when the Manager successfully got the node to be part of a cluster. stateRunningNode is valid.
	StateRunning
	// StateFailed is when the Manager failed to get the node to be part of a cluster.
	StateFailed
)

func (s State) String() string {
	switch s {
	case StateNew:
		return "New"
	case StateCreatingCluster:
		return "CreatingCluster"
	case StateRunning:
		return "Running"
	case StateFailed:
		return "Failed"
	default:
		return "UNKNOWN"
	}
}

// allowedTransitions describes all allowed state transitions (map[From][]To).
var allowedTransitions = map[State][]State{
	StateNew:             {StateCreatingCluster},
	StateCreatingCluster: {StateRunning, StateFailed},
}

// allowed returns whether a transition from one state to another is allowed (i.e. is defined in allowedTransitions).
func (m *Manager) allowed(from, to State) bool {
	for _, allowed := range allowedTransitions[from] {
		if to == allowed {
			return true
		}
	}
	return false
}

// next moves the Manager finite state machine from its current state to `n`, or to Failed if the transition is not
// allowed.
func (m *Manager) next(ctx context.Context, n State) {
	m.stateLock.Lock()
	defer m.stateLock.Unlock()

	if !m.allowed(m.state, n) {
		supervisor.Logger(ctx).Errorf("Attempted invalid enrolment state transition, failing enrolment; from: %s, to: %s",
			m.state.String(), n.String())
		m.state = StateFailed
		return
	}

	supervisor.Logger(ctx).Infof("Enrolment state change; from: %s, to: %s", m.state.String(), n.String())

	m.state = n
}

// State returns the state of the Manager. It's safe to call this from any goroutine.
func (m *Manager) State() State {
	m.stateLock.RLock()
	defer m.stateLock.RUnlock()
	return m.state
}

// WaitFinished waits until the Manager FSM reaches Running or Failed, and returns true if the FSM is Running. It's
// safe to call this from any goroutine.
func (m *Manager) WaitFinished() (success bool) {
	m.stateLock.Lock()
	switch m.state {
	case StateFailed:
		m.stateLock.Unlock()
		return false
	case StateRunning:
		m.stateLock.Unlock()
		return true
	}

	C := make(chan bool)
	m.stateWaiters = append(m.stateWaiters, C)
	m.stateLock.Unlock()
	return <-C
}

// wakeWaiters wakes any WaitFinished waiters and lets them know about the current state of the Manager.
// The stateLock must already be held, and the state must have been set in the same critical section (otherwise
// this can cause a race condition).
func (m *Manager) wakeWaiters() {
	state := m.state
	waiters := m.stateWaiters
	m.stateWaiters = nil

	for _, waiter := range waiters {
		go func(w chan bool) {
			w <- state == StateRunning
		}(waiter)
	}
}

// Run is the runnable of the Manager, to be started using the Supervisor. It is one-shot, and should not be restarted.
func (m *Manager) Run(ctx context.Context) error {
	if state := m.State(); state != StateNew {
		supervisor.Logger(ctx).Errorf("Manager started with non-New state %s, failing", state.String())
		m.stateLock.Lock()
		m.state = StateFailed
		m.wakeWaiters()
		m.stateLock.Unlock()
		return nil
	}

	var err error
	bo := backoff.NewExponentialBackOff()
	for {
		done := false
		state := m.State()
		switch state {
		case StateNew:
			err = m.stateNew(ctx)
		case StateCreatingCluster:
			err = m.stateCreatingCluster(ctx)
		default:
			done = true
			break
		}

		if err != nil || done {
			break
		}

		if state == m.State() && !m.allowed(state, m.State()) {
			supervisor.Logger(ctx).Errorf("Enrolment got stuck at %s, failing", m.state.String())
			m.stateLock.Lock()
			m.state = StateFailed
			m.stateLock.Unlock()
		} else {
			bo.Reset()
		}
	}

	m.stateLock.Lock()
	state := m.state
	if state != StateRunning {
		supervisor.Logger(ctx).Errorf("Enrolment failed at %s: %v", m.state.String(), err)
	} else {
		supervisor.Logger(ctx).Info("Enrolment successful!")
	}
	m.wakeWaiters()
	m.stateLock.Unlock()

	supervisor.Signal(ctx, supervisor.SignalHealthy)
	supervisor.Signal(ctx, supervisor.SignalDone)
	return nil
}

// stateNew is called when a Manager is New. It makes the decision on how to join this node into a cluster.
func (m *Manager) stateNew(ctx context.Context) error {
	supervisor.Logger(ctx).Info("Starting enrolment process...")

	// STOPGAP when migrating to enrolment config and cluster lifecycle: always
	// expect NodeParameters with ClusterBootstrap.

	// Check for presence of NodeParameters on the ESP or in qemu firmware variables.
	var configRaw []byte
	configRaw, err := m.storageRoot.ESP.NodeParameters.Read()
	if err != nil && !os.IsNotExist(err) {
		return fmt.Errorf("could not read local enrolment file: %w", err)
	} else if err != nil {
		configRaw, err = ioutil.ReadFile("/sys/firmware/qemu_fw_cfg/by_name/dev.monogon.metropolis/parameters.pb/raw")
		if err != nil && !os.IsNotExist(err) {
			return fmt.Errorf("could not read firmware enrolment file: %w", err)
		}
	}

	if configRaw == nil {
		return fmt.Errorf("no enrolment config present")
	}

	parameters := &apb.NodeParameters{}
	if err := proto.Unmarshal(configRaw, parameters); err != nil {
		return fmt.Errorf("enrolment config could not be unmarshaled: %w", err)
	}

	switch parameters.Cluster.(type) {
	case *apb.NodeParameters_ClusterBootstrap_:
	default:
		return fmt.Errorf("enrolment config does not contain ClusterBootstrap")
	}

	m.next(ctx, StateCreatingCluster)
	return nil
}

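// For reference, a rough sketch of how a provisioning tool might produce the NodeParameters blob consumed above.
// This is an assumption based only on the generated proto types used in this file; the actual ClusterBootstrap
// fields (and how the blob gets onto the ESP or into qemu fw_cfg) are out of scope here.
//
//	params := &apb.NodeParameters{
//		Cluster: &apb.NodeParameters_ClusterBootstrap_{
//			ClusterBootstrap: &apb.NodeParameters_ClusterBootstrap{},
//		},
//	}
//	raw, err := proto.Marshal(params)
//	if err != nil {
//		// handle marshaling error
//	}
//	// raw is then written to the node's ESP as NodeParameters, or exposed via qemu fw_cfg.
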
// stateCreatingCluster is called when the Manager has decided to create a new cluster.
//
// The process to create a new cluster is as follows:
// - wait for an IP address
// - initialize a new data partition by generating local and cluster unlock keys (the local unlock key is saved to
//   the ESP, while the cluster unlock key is returned)
// - create a new node certificate and Node (with the newly generated cluster unlock key)
// - start up a new etcd cluster, with this node being the only member
// - save the new Node to the new etcd cluster (thereby saving the node's cluster unlock key to etcd)
func (m *Manager) stateCreatingCluster(ctx context.Context) error {
	logger := supervisor.Logger(ctx)
	logger.Info("Creating new cluster: waiting for IP address...")

	// STOPGAP: bad use of watcher (should be long-term)
	watcher := m.networkService.Watch()
	defer watcher.Close()
	data, err := watcher.Get(ctx)
	if err != nil {
		return fmt.Errorf("when getting IP address: %w", err)
	}
	ip := data.ExternalAddress
	logger.Infof("Creating new cluster: got IP address %s", ip.String())

	logger.Info("Creating new cluster: initializing storage...")
	cuk, err := m.storageRoot.Data.MountNew(&m.storageRoot.ESP.LocalUnlock)
	if err != nil {
		return fmt.Errorf("when making new data partition: %w", err)
	}
	logger.Info("Creating new cluster: storage initialized")

	// Create certificate for node.
	cert, err := m.storageRoot.Data.Node.EnsureSelfSigned(localstorage.CertificateForNode)
	if err != nil {
		return fmt.Errorf("failed to create new node certificate: %w", err)
	}

	node := NewNode(cuk, ip, *cert.Leaf)

	m.consensus = consensus.New(consensus.Config{
		Data:       &m.storageRoot.Data.Etcd,
		Ephemeral:  &m.storageRoot.Ephemeral.Consensus,
		NewCluster: true,
		Name:       node.ID(),
		// STOPGAP: this will not be used after the manager rewrite.
		ExternalHost: ip.String(),
	})
	if err := supervisor.Run(ctx, "consensus", m.consensus.Run); err != nil {
		return fmt.Errorf("when starting consensus: %w", err)
	}

	// TODO(q3k): make timeout configurable?
	ctxT, ctxC := context.WithTimeout(ctx, 5*time.Second)
	defer ctxC()

	supervisor.Logger(ctx).Info("Creating new cluster: waiting for consensus...")
	if err := m.consensus.WaitReady(ctxT); err != nil {
		return fmt.Errorf("consensus service failed to become ready: %w", err)
	}

	// Configure node to be a consensus member and kubernetes worker. In the future, different nodes will have
	// different roles, but for now they're all symmetrical.
	_, consensusName, err := m.consensus.MemberInfo(ctx)
	if err != nil {
		return fmt.Errorf("could not get consensus MemberInfo: %w", err)
	}
	if err := node.MakeConsensusMember(consensusName); err != nil {
		return fmt.Errorf("could not make new node into consensus member: %w", err)
	}
	if err := node.MakeKubernetesWorker(node.ID()); err != nil {
		return fmt.Errorf("could not make new node into kubernetes worker: %w", err)
	}

	// Save node into etcd.
	supervisor.Logger(ctx).Info("Creating new cluster: storing first node...")
	if err := node.Store(ctx, m.consensus.KV("cluster", "enrolment")); err != nil {
		return fmt.Errorf("could not save new node: %w", err)
	}

	m.stateLock.Lock()
	m.stateRunningNode = node
	m.stateLock.Unlock()

	m.next(ctx, StateRunning)
	return nil
}

// Node returns the Node that the Manager brought into a cluster, or nil if the Manager is not Running.
// This is safe to call from any goroutine.
func (m *Manager) Node() *Node {
	m.stateLock.Lock()
	defer m.stateLock.Unlock()
	if m.state != StateRunning {
		return nil
	}
	return m.stateRunningNode
}

// ConsensusKV returns a namespaced etcd KV client, or nil if the Manager is not Running.
// This is safe to call from any goroutine.
func (m *Manager) ConsensusKV(module, space string) clientv3.KV {
	m.stateLock.Lock()
	defer m.stateLock.Unlock()
	if m.state != StateRunning {
		return nil
	}
	if m.stateRunningNode.ConsensusMember() == nil {
		// TODO(q3k): in this case, we should return a client to etcd even though this
		// node is not a member of consensus. For now, all nodes are consensus members.
		return nil
	}
	return m.consensus.KV(module, space)
}

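// A short sketch of how a consumer might use the namespaced KV client once the Manager is Running. The module/space
// names and key below are made up for illustration; only the clientv3.KV Put/Get calls are real etcd client API.
//
//	kv := m.ConsensusKV("kubernetes", "pki")
//	if kv == nil {
//		return fmt.Errorf("cluster manager not running")
//	}
//	if _, err := kv.Put(ctx, "some-key", "some-value"); err != nil {
//		return fmt.Errorf("when writing to cluster etcd: %w", err)
//	}
//	resp, err := kv.Get(ctx, "some-key")
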
// ConsensusKVRoot returns a non-namespaced etcd KV client, or nil if the Manager is not Running.
// This is safe to call from any goroutine.
func (m *Manager) ConsensusKVRoot() clientv3.KV {
	m.stateLock.Lock()
	defer m.stateLock.Unlock()
	if m.state != StateRunning {
		return nil
	}
	if m.stateRunningNode.ConsensusMember() == nil {
		// TODO(q3k): in this case, we should return a client to etcd even though this
		// node is not a member of consensus. For now, all nodes are consensus members.
		return nil
	}
	return m.consensus.KVRoot()
}

// ConsensusCluster returns an etcd Cluster client, or nil if the Manager is not Running.
// This is safe to call from any goroutine.
func (m *Manager) ConsensusCluster() clientv3.Cluster {
	m.stateLock.Lock()
	defer m.stateLock.Unlock()
	if m.state != StateRunning {
		return nil
	}
	if m.stateRunningNode.ConsensusMember() == nil {
		// TODO(q3k): in this case, we should return a client to etcd even though this
		// node is not a member of consensus. For now, all nodes are consensus members.
		return nil
	}
	return m.consensus.Cluster()
}