// Copyright 2020 The Monogon Project Authors.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cluster

import (
	"context"
	"crypto/ed25519"
	"crypto/rand"
	"encoding/hex"
	"errors"
	"fmt"
	"io/ioutil"
	"sync"

	"google.golang.org/protobuf/proto"

	"source.monogon.dev/metropolis/node/core/consensus"
	"source.monogon.dev/metropolis/node/core/localstorage"
	"source.monogon.dev/metropolis/node/core/network"
	"source.monogon.dev/metropolis/pkg/pki"
	"source.monogon.dev/metropolis/pkg/supervisor"
	apb "source.monogon.dev/metropolis/proto/api"
	ppb "source.monogon.dev/metropolis/proto/private"
)

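// managerResult is the final outcome of the Manager's run: the Node that was
// brought into a cluster, or the error that prevented that from happening.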
type managerResult struct {
	node *Node
	err  error
}

type state struct {
	mu sync.RWMutex

	// oneway is set once Run has been started, and guards the Manager
	// against being restarted.
	oneway       bool
	stateCluster ClusterState
	stateNode    ppb.Node_FSMState

	configuration *ppb.SealedConfiguration

	result  *managerResult
	waiters []chan *managerResult
}

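// setResult publishes the result of the Manager's run to all pending waiters
// and records it for future Wait calls. The caller must hold s.mu for
// writing.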
func (s *state) setResult(node *Node, err error) {
	s.result = &managerResult{
		node: node,
		err:  err,
	}
	for _, w := range s.waiters {
		go func(c chan *managerResult) {
			c <- s.result
		}(w)
	}
	s.waiters = nil
}

type Manager struct {
	storageRoot    *localstorage.Root
	networkService *network.Service

	state

	// consensus is the spawned etcd/consensus service, if the Manager brought
	// up a Node that should run one.
	consensus *consensus.Service
}

// NewManager creates a new cluster Manager. The given localstorage Root must
// be placed, but not yet started (and will be started as the Manager makes
// progress). The given network Service must already be running.
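//
// A minimal usage sketch (hypothetical caller; assumes a supervisor context
// in which the Manager's Run can be started as a runnable):
//
//	m := NewManager(root, networkSvc)
//	if err := supervisor.Run(ctx, "cluster-manager", m.Run); err != nil {
//		// handle startup error
//	}
//	node, err := m.Wait()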
func NewManager(storageRoot *localstorage.Root, networkService *network.Service) *Manager {
	return &Manager{
		storageRoot:    storageRoot,
		networkService: networkService,

		state: state{
			stateCluster: ClusterUnknown,
			stateNode:    ppb.Node_FSM_STATE_INVALID,
		},
	}
}

// lock returns the Manager's state under an exclusive lock, alongside a
// function that releases it.
func (m *Manager) lock() (*state, func()) {
	m.mu.Lock()
	return &m.state, m.mu.Unlock
}

// rlock returns the Manager's state under a shared (read) lock, alongside a
// function that releases it.
func (m *Manager) rlock() (*state, func()) {
	m.mu.RLock()
	return &m.state, m.mu.RUnlock
}

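// Wait blocks until the Manager has either brought a Node up into a cluster
// or failed to do so, and returns the resulting Node or error. It is safe to
// call from multiple goroutines.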
func (m *Manager) Wait() (*Node, error) {
	state, unlock := m.lock()

	if state.result != nil {
		unlock()
		return state.result.node, state.result.err
	}

	c := make(chan *managerResult)
	state.waiters = append(state.waiters, c)
	unlock()
	res := <-c
	return res.node, res.err
}

// Run is the runnable of the Manager, to be started using the Supervisor. It
// is one-shot, and should not be restarted.
func (m *Manager) Run(ctx context.Context) error {
	state, unlock := m.lock()
	if state.oneway {
		unlock()
		// TODO(q3k): restart the entire system if this happens
		return fmt.Errorf("cannot restart cluster manager")
	}
	state.oneway = true
	unlock()

	configuration, err := m.storageRoot.ESP.SealedConfiguration.Unseal()
	if err == nil {
		supervisor.Logger(ctx).Info("Sealed configuration present, attempting to join cluster")
		return m.join(ctx, configuration)
	}

	if !errors.Is(err, localstorage.ErrNoSealed) {
		return fmt.Errorf("unexpected sealed config error: %w", err)
	}

	supervisor.Logger(ctx).Info("No sealed configuration, looking for node parameters")

	params, err := m.nodeParams(ctx)
	if err != nil {
		return fmt.Errorf("no parameters available: %w", err)
	}

	switch inner := params.Cluster.(type) {
	case *apb.NodeParameters_ClusterBootstrap_:
		return m.bootstrap(ctx, inner.ClusterBootstrap)
	case *apb.NodeParameters_ClusterRegister_:
		return m.register(ctx, inner.ClusterRegister)
	default:
		return fmt.Errorf("node parameters misconfigured: neither cluster_bootstrap nor cluster_register set")
	}
}

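// bootstrap brings up a new cluster, with this node as its sole initial
// member. For reference, a NodeParameters message selecting this path could
// be built by enrolment tooling roughly as follows (hypothetical sketch,
// using the message types referenced in Run above):
//
//	params := &apb.NodeParameters{
//		Cluster: &apb.NodeParameters_ClusterBootstrap_{
//			ClusterBootstrap: &apb.NodeParameters_ClusterBootstrap{
//				OwnerPublicKey: ownerPublicKey,
//			},
//		},
//	}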
func (m *Manager) bootstrap(ctx context.Context, bootstrap *apb.NodeParameters_ClusterBootstrap) error {
	supervisor.Logger(ctx).Infof("Bootstrapping new cluster, owner public key: %s", hex.EncodeToString(bootstrap.OwnerPublicKey))
	state, unlock := m.lock()
	defer unlock()

	state.configuration = &ppb.SealedConfiguration{}

	// Mount new storage with generated CUK, and save LUK into sealed config proto.
	supervisor.Logger(ctx).Infof("Bootstrapping: mounting new storage...")
	cuk, err := m.storageRoot.Data.MountNew(state.configuration)
	if err != nil {
		return fmt.Errorf("could not make and mount data partition: %w", err)
	}

	pub, priv, err := ed25519.GenerateKey(rand.Reader)
	if err != nil {
		return fmt.Errorf("could not generate node keypair: %w", err)
	}
	supervisor.Logger(ctx).Infof("Bootstrapping: node public key: %s", hex.EncodeToString([]byte(pub)))

	node := Node{
		clusterUnlockKey: cuk,
		pubkey:           pub,
		state:            ppb.Node_FSM_STATE_UP,
		// TODO(q3k): make this configurable.
		consensusMember:  &NodeRoleConsensusMember{},
		kubernetesWorker: &NodeRoleKubernetesWorker{},
	}

	// Run worker to keep updating /ephemeral/hosts (and thus, /etc/hosts) with
	// our own IP address. This ensures that the node's ID always resolves to
	// its current external IP address.
	err = supervisor.Run(ctx, "hostsfile", func(ctx context.Context) error {
		supervisor.Signal(ctx, supervisor.SignalHealthy)
		watcher := m.networkService.Watch()
		for {
			status, err := watcher.Get(ctx)
			if err != nil {
				return err
			}
			err = node.ConfigureLocalHostname(ctx, &m.storageRoot.Ephemeral, status.ExternalAddress)
			if err != nil {
				return fmt.Errorf("could not configure hostname: %w", err)
			}
		}
	})
	if err != nil {
		return fmt.Errorf("when starting hostsfile worker: %w", err)
	}

	// Bring up consensus with this node as the only member.
	m.consensus = consensus.New(consensus.Config{
		Data:       &m.storageRoot.Data.Etcd,
		Ephemeral:  &m.storageRoot.Ephemeral.Consensus,
		NewCluster: true,
		Name:       node.ID(),
	})

	supervisor.Logger(ctx).Infof("Bootstrapping: starting consensus...")
	if err := supervisor.Run(ctx, "consensus", m.consensus.Run); err != nil {
		return fmt.Errorf("when starting consensus: %w", err)
	}

	supervisor.Logger(ctx).Info("Bootstrapping: waiting for consensus...")
	if err := m.consensus.WaitReady(ctx); err != nil {
		return fmt.Errorf("consensus service failed to become ready: %w", err)
	}
	supervisor.Logger(ctx).Info("Bootstrapping: consensus ready.")

	kv := m.consensus.KVRoot()
	node.KV = kv

	// Create Metropolis CA and this node's certificate.
	caCertBytes, _, err := PKICA.Ensure(ctx, kv)
	if err != nil {
		return fmt.Errorf("failed to create cluster CA: %w", err)
	}
	nodeCert := PKINamespace.New(PKICA, "", pki.Server([]string{node.ID()}, nil))
	nodeCert.UseExistingKey(priv)
	nodeCertBytes, _, err := nodeCert.Ensure(ctx, kv)
	if err != nil {
		return fmt.Errorf("failed to create node certificate: %w", err)
	}

	if err := m.storageRoot.Data.Node.Credentials.CACertificate.Write(caCertBytes, 0400); err != nil {
		return fmt.Errorf("failed to write CA certificate: %w", err)
	}
	if err := m.storageRoot.Data.Node.Credentials.Certificate.Write(nodeCertBytes, 0400); err != nil {
		return fmt.Errorf("failed to write node certificate: %w", err)
	}
	if err := m.storageRoot.Data.Node.Credentials.Key.Write(priv, 0400); err != nil {
		return fmt.Errorf("failed to write node private key: %w", err)
	}

	// Update our Node object in etcd.
	if err := node.Store(ctx, kv); err != nil {
		return fmt.Errorf("failed to store new node in etcd: %w", err)
	}

	state.setResult(&node, nil)

	supervisor.Signal(ctx, supervisor.SignalHealthy)
	supervisor.Signal(ctx, supervisor.SignalDone)
	return nil
}

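// register enrolls this node into an existing cluster. Currently
// unimplemented.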
func (m *Manager) register(ctx context.Context, reg *apb.NodeParameters_ClusterRegister) error {
	return fmt.Errorf("unimplemented")
}

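// nodeParamsFWCFG retrieves NodeParameters passed to this node via qemu's
// fw_cfg interface, as exposed at /sys/firmware/qemu_fw_cfg.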
func (m *Manager) nodeParamsFWCFG(ctx context.Context) (*apb.NodeParameters, error) {
	bytes, err := ioutil.ReadFile("/sys/firmware/qemu_fw_cfg/by_name/dev.monogon.metropolis/parameters.pb/raw")
	if err != nil {
		return nil, fmt.Errorf("could not read firmware enrolment file: %w", err)
	}

	config := apb.NodeParameters{}
	err = proto.Unmarshal(bytes, &config)
	if err != nil {
		return nil, fmt.Errorf("could not unmarshal: %w", err)
	}

	return &config, nil
}

func (m *Manager) nodeParams(ctx context.Context) (*apb.NodeParameters, error) {
	// Retrieve node parameters from qemu's fwcfg interface or ESP.
	// TODO(q3k): probably abstract this away and implement per platform/build/...
	paramsFWCFG, err := m.nodeParamsFWCFG(ctx)
	if err != nil {
		supervisor.Logger(ctx).Warningf("Could not retrieve node parameters from qemu fwcfg: %v", err)
		paramsFWCFG = nil
	} else {
		supervisor.Logger(ctx).Infof("Retrieved node parameters from qemu fwcfg")
	}
	paramsESP, err := m.storageRoot.ESP.NodeParameters.Unmarshal()
	if err != nil {
		supervisor.Logger(ctx).Warningf("Could not retrieve node parameters from ESP: %v", err)
		paramsESP = nil
	} else {
		supervisor.Logger(ctx).Infof("Retrieved node parameters from ESP")
	}
	if paramsFWCFG == nil && paramsESP == nil {
		return nil, fmt.Errorf("could not find node parameters in ESP or qemu fwcfg")
	}
	if paramsFWCFG != nil && paramsESP != nil {
		supervisor.Logger(ctx).Warningf("Node parameters found in both ESP and qemu fwcfg, using the latter")
		return paramsFWCFG, nil
	} else if paramsFWCFG != nil {
		return paramsFWCFG, nil
	} else {
		return paramsESP, nil
	}
}

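// join attempts to join a cluster that this node was previously a member of,
// based on its unsealed configuration. Currently unimplemented.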
func (m *Manager) join(ctx context.Context, cfg *ppb.SealedConfiguration) error {
	return fmt.Errorf("unimplemented")
}

// Node returns the Node that the Manager brought into a cluster, or nil if the
// Manager is not Running. This is safe to call from any goroutine.
func (m *Manager) Node() *Node {
	state, unlock := m.rlock()
	defer unlock()
	if state.result == nil {
		return nil
	}
	return state.result.node
}