blob: 33a352ac090350df5e1975b0a0b2fca8afbf9cb7 [file] [log] [blame]
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +02001// Copyright 2020 The Monogon Project Authors.
2//
3// SPDX-License-Identifier: Apache-2.0
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
// Package consensus implements a managed etcd cluster member service, with a self-hosted CA system for issuing peer
// certificates. Currently each Metropolis node runs an etcd member, and connects to the etcd member locally over a
// Unix domain socket.
//
// The service supports two modes of startup:
//   - initializing a new cluster, by bootstrapping the CA in memory, issuing the new node's certificate and saving it
//     to local storage, starting the cluster, and committing the CA to etcd afterwards
//   - joining an existing cluster, using certificates from local storage and loading the CA from etcd. This flow is also
//     used when the node joins a cluster for the first time (then the certificates required must be provisioned
//     externally before starting the consensus service).
//
// Regardless of how the etcd member service was started, the resulting running service is further managed and used
// in the same way.
//
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +020031package consensus
32
import (
	"context"
	"encoding/pem"
	"fmt"
	"net"
	"net/url"
	"sync"
	"time"

	"go.etcd.io/etcd/clientv3"
	"go.etcd.io/etcd/clientv3/namespace"
	"go.etcd.io/etcd/embed"
	"go.uber.org/atomic"

	node "source.monogon.dev/metropolis/node"
	"source.monogon.dev/metropolis/node/core/consensus/ca"
	"source.monogon.dev/metropolis/node/core/localstorage"
	"source.monogon.dev/metropolis/pkg/supervisor"
)
51
const (
	// DefaultClusterToken is the initial cluster token passed to etcd when
	// configuring a member.
	DefaultClusterToken = "METROPOLIS"
	// DefaultLogger selects the logger implementation used by the embedded
	// etcd server.
	DefaultLogger = "zap"
)
56
Serge Bazanskicb883e22020-07-06 17:47:55 +020057// Service is the etcd cluster member service.
58type Service struct {
59 // The configuration with which the service was started. This is immutable.
60 config *Config
Lorenz Bruna4ea9d02019-10-31 11:40:30 +010061
Serge Bazanskicb883e22020-07-06 17:47:55 +020062 // stateMu guards state. This is locked internally on public methods of Service that require access to state. The
63 // state might be recreated on service restart.
64 stateMu sync.Mutex
65 state *state
66}
Lorenz Brun6e8f69c2019-11-18 10:44:24 +010067
Serge Bazanskicb883e22020-07-06 17:47:55 +020068// state is the runtime state of a running etcd member.
69type state struct {
70 etcd *embed.Etcd
71 ready atomic.Bool
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +020072
Serge Bazanskicb883e22020-07-06 17:47:55 +020073 ca *ca.CA
74 // cl is an etcd client that loops back to the localy running etcd server. This runs over the Client unix domain
75 // socket that etcd starts.
76 cl *clientv3.Client
77}
Leopold Schabel68c58752019-11-14 21:00:59 +010078
Serge Bazanskicb883e22020-07-06 17:47:55 +020079type Config struct {
80 // Data directory (persistent, encrypted storage) for etcd.
81 Data *localstorage.DataEtcdDirectory
82 // Ephemeral directory for etcd.
83 Ephemeral *localstorage.EphemeralConsensusDirectory
Leopold Schabel68c58752019-11-14 21:00:59 +010084
Serge Bazanskicb883e22020-07-06 17:47:55 +020085 // Name is the cluster name. This must be the same amongst all etcd members within one cluster.
86 Name string
87 // NewCluster selects whether the etcd member will start a new cluster and bootstrap a CA and the first member
88 // certificate, or load existing PKI certificates from disk.
89 NewCluster bool
Serge Bazanskicb883e22020-07-06 17:47:55 +020090 // Port is the port at which this cluster member will listen for other members. If zero, defaults to the global
Serge Bazanski662b5b32020-12-21 13:49:00 +010091 // Metropolis setting.
Serge Bazanskicb883e22020-07-06 17:47:55 +020092 Port int
Serge Bazanski34fe8c62021-03-16 13:20:09 +010093
Serge Bazanski42e61c62021-03-18 15:07:18 +010094 // externalHost is used by tests to override the address at which etcd should listen for peer connections.
95 externalHost string
Serge Bazanskicb883e22020-07-06 17:47:55 +020096}
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +020097
Serge Bazanskicb883e22020-07-06 17:47:55 +020098func New(config Config) *Service {
99 return &Service{
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200100 config: &config,
101 }
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200102}
103
Serge Bazanskicb883e22020-07-06 17:47:55 +0200104// configure transforms the service configuration into an embedded etcd configuration. This is pure and side effect
105// free.
106func (s *Service) configure(ctx context.Context) (*embed.Config, error) {
107 if err := s.config.Ephemeral.MkdirAll(0700); err != nil {
108 return nil, fmt.Errorf("failed to create ephemeral directory: %w", err)
109 }
110 if err := s.config.Data.MkdirAll(0700); err != nil {
111 return nil, fmt.Errorf("failed to create data directory: %w", err)
112 }
Lorenz Brun52f7f292020-06-24 16:42:02 +0200113
Serge Bazanski34fe8c62021-03-16 13:20:09 +0100114 if s.config.Name == "" {
115 return nil, fmt.Errorf("Name not set")
116 }
Serge Bazanskicb883e22020-07-06 17:47:55 +0200117 port := s.config.Port
118 if port == 0 {
Serge Bazanski549b72b2021-01-07 14:54:19 +0100119 port = node.ConsensusPort
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200120 }
121
122 cfg := embed.NewConfig()
123
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200124 cfg.Name = s.config.Name
Serge Bazanskicb883e22020-07-06 17:47:55 +0200125 cfg.Dir = s.config.Data.Data.FullPath()
126 cfg.InitialClusterToken = DefaultClusterToken
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200127
Serge Bazanskicb883e22020-07-06 17:47:55 +0200128 cfg.PeerTLSInfo.CertFile = s.config.Data.PeerPKI.Certificate.FullPath()
129 cfg.PeerTLSInfo.KeyFile = s.config.Data.PeerPKI.Key.FullPath()
130 cfg.PeerTLSInfo.TrustedCAFile = s.config.Data.PeerPKI.CACertificate.FullPath()
131 cfg.PeerTLSInfo.ClientCertAuth = true
132 cfg.PeerTLSInfo.CRLFile = s.config.Data.PeerCRL.FullPath()
133
134 cfg.LCUrls = []url.URL{{
135 Scheme: "unix",
136 Path: s.config.Ephemeral.ClientSocket.FullPath() + ":0",
137 }}
138 cfg.ACUrls = []url.URL{}
139 cfg.LPUrls = []url.URL{{
140 Scheme: "https",
Serge Bazanski34fe8c62021-03-16 13:20:09 +0100141 Host: fmt.Sprintf("[::]:%d", port),
Serge Bazanskicb883e22020-07-06 17:47:55 +0200142 }}
Serge Bazanski34fe8c62021-03-16 13:20:09 +0100143
144 // Always listen on the address pointed to by our name - unless running in
145 // tests, where we can't control our hostname easily.
Serge Bazanski42e61c62021-03-18 15:07:18 +0100146 externalHost := fmt.Sprintf("%s:%d", s.config.Name, port)
147 if s.config.externalHost != "" {
148 externalHost = fmt.Sprintf("%s:%d", s.config.externalHost, port)
Serge Bazanski34fe8c62021-03-16 13:20:09 +0100149 }
Serge Bazanskicb883e22020-07-06 17:47:55 +0200150 cfg.APUrls = []url.URL{{
151 Scheme: "https",
Serge Bazanski42e61c62021-03-18 15:07:18 +0100152 Host: externalHost,
Serge Bazanskicb883e22020-07-06 17:47:55 +0200153 }}
154
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200155 if s.config.NewCluster {
156 cfg.ClusterState = "new"
157 cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name)
Serge Bazanski34fe8c62021-03-16 13:20:09 +0100158 } else {
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200159 cfg.ClusterState = "existing"
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200160 }
161
Serge Bazanskic7359672020-10-30 16:38:57 +0100162 // TODO(q3k): pipe logs from etcd to supervisor.RawLogger via a file.
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200163 cfg.Logger = DefaultLogger
Serge Bazanskic7359672020-10-30 16:38:57 +0100164 cfg.LogOutputs = []string{"stderr"}
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200165
Serge Bazanskicb883e22020-07-06 17:47:55 +0200166 return cfg, nil
167}
168
169// Run is a Supervisor runnable that starts the etcd member service. It will become healthy once the member joins the
170// cluster successfully.
171func (s *Service) Run(ctx context.Context) error {
172 st := &state{
173 ready: *atomic.NewBool(false),
174 }
175 s.stateMu.Lock()
176 s.state = st
177 s.stateMu.Unlock()
178
179 if s.config.NewCluster {
Serge Bazanski3ea1a3a2021-03-16 13:17:33 +0100180 // Create certificate if absent. It can only be present if we attempt
181 // to re-start the service in NewCluster after a failure. This can
182 // happen if etcd crashed or failed to start up before (eg. because of
183 // networking not having settled yet).
Serge Bazanskicb883e22020-07-06 17:47:55 +0200184 absent, err := s.config.Data.PeerPKI.AllAbsent()
185 if err != nil {
186 return fmt.Errorf("checking certificate existence: %w", err)
187 }
Serge Bazanskicb883e22020-07-06 17:47:55 +0200188
Serge Bazanski3ea1a3a2021-03-16 13:17:33 +0100189 if absent {
190 // Generate CA, keep in memory, write it down in etcd later.
191 st.ca, err = ca.New("Metropolis etcd peer Root CA")
192 if err != nil {
193 return fmt.Errorf("when creating new cluster's peer CA: %w", err)
194 }
Serge Bazanskicb883e22020-07-06 17:47:55 +0200195
Serge Bazanski34fe8c62021-03-16 13:20:09 +0100196 cert, key, err := st.ca.Issue(ctx, nil, s.config.Name)
Serge Bazanski3ea1a3a2021-03-16 13:17:33 +0100197 if err != nil {
198 return fmt.Errorf("when issuing new cluster's first certificate: %w", err)
199 }
Serge Bazanskicb883e22020-07-06 17:47:55 +0200200
Serge Bazanski3ea1a3a2021-03-16 13:17:33 +0100201 if err := s.config.Data.PeerPKI.MkdirAll(0700); err != nil {
202 return fmt.Errorf("when creating PKI directory: %w", err)
203 }
204 if err := s.config.Data.PeerPKI.CACertificate.Write(pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: st.ca.CACertRaw}), 0600); err != nil {
205 return fmt.Errorf("when writing CA certificate to disk: %w", err)
206 }
207 if err := s.config.Data.PeerPKI.Certificate.Write(pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: cert}), 0600); err != nil {
208 return fmt.Errorf("when writing certificate to disk: %w", err)
209 }
210 if err := s.config.Data.PeerPKI.Key.Write(pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: key}), 0600); err != nil {
211 return fmt.Errorf("when writing certificate to disk: %w", err)
212 }
Serge Bazanskicb883e22020-07-06 17:47:55 +0200213 }
214 }
215
Serge Bazanski3ea1a3a2021-03-16 13:17:33 +0100216 // Expect certificate to be present on disk.
217 present, err := s.config.Data.PeerPKI.AllExist()
218 if err != nil {
219 return fmt.Errorf("checking certificate existence: %w", err)
220 }
221 if !present {
222 return fmt.Errorf("etcd starting without fully ready certificates - aborted NewCluster or corrupted local storage?")
Serge Bazanskicb883e22020-07-06 17:47:55 +0200223 }
224
225 cfg, err := s.configure(ctx)
226 if err != nil {
227 return fmt.Errorf("when configuring etcd: %w", err)
228 }
229
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200230 server, err := embed.StartEtcd(cfg)
Serge Bazanskicb883e22020-07-06 17:47:55 +0200231 keep := false
232 defer func() {
233 if !keep && server != nil {
234 server.Close()
235 }
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200236 }()
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100237 if err != nil {
Serge Bazanskicb883e22020-07-06 17:47:55 +0200238 return fmt.Errorf("failed to start etcd: %w", err)
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100239 }
Serge Bazanskicb883e22020-07-06 17:47:55 +0200240 st.etcd = server
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100241
Serge Bazanskicb883e22020-07-06 17:47:55 +0200242 okay := true
243 select {
244 case <-st.etcd.Server.ReadyNotify():
245 case <-ctx.Done():
246 okay = false
Lorenz Brun52f7f292020-06-24 16:42:02 +0200247 }
248
Serge Bazanskicb883e22020-07-06 17:47:55 +0200249 if !okay {
250 supervisor.Logger(ctx).Info("context done, aborting wait")
251 return ctx.Err()
Lorenz Brun52f7f292020-06-24 16:42:02 +0200252 }
253
Serge Bazanskicb883e22020-07-06 17:47:55 +0200254 socket := s.config.Ephemeral.ClientSocket.FullPath()
255 cl, err := clientv3.New(clientv3.Config{
256 Endpoints: []string{fmt.Sprintf("unix://%s:0", socket)},
257 DialTimeout: time.Second,
Lorenz Brun52f7f292020-06-24 16:42:02 +0200258 })
259 if err != nil {
Serge Bazanskicb883e22020-07-06 17:47:55 +0200260 return fmt.Errorf("failed to connect to new etcd instance: %w", err)
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100261 }
Serge Bazanskicb883e22020-07-06 17:47:55 +0200262 st.cl = cl
263
264 if s.config.NewCluster {
265 if st.ca == nil {
266 panic("peerCA has not been generated")
267 }
268
269 // Save new CA into etcd.
270 err = st.ca.Save(ctx, cl.KV)
271 if err != nil {
272 return fmt.Errorf("failed to save new CA to etcd: %w", err)
273 }
274 } else {
275 // Load existing CA from etcd.
276 st.ca, err = ca.Load(ctx, cl.KV)
277 if err != nil {
278 return fmt.Errorf("failed to load CA from etcd: %w", err)
279 }
280 }
281
282 // Start CRL watcher.
283 if err := supervisor.Run(ctx, "crl", s.watchCRL); err != nil {
284 return fmt.Errorf("failed to start CRL watcher: %w", err)
285 }
286 // Start autopromoter.
287 if err := supervisor.Run(ctx, "autopromoter", s.autopromoter); err != nil {
288 return fmt.Errorf("failed to start autopromoter: %w", err)
289 }
290
291 supervisor.Logger(ctx).Info("etcd is now ready")
292 keep = true
293 st.ready.Store(true)
294 supervisor.Signal(ctx, supervisor.SignalHealthy)
295
296 <-ctx.Done()
297 st.etcd.Close()
298 return ctx.Err()
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100299}
300
// watchCRL is a sub-runnable of the etcd cluster member service. It keeps the
// peer CRL in local storage (the file etcd is configured to use for peer TLS)
// in sync with the newest version available in etcd, rewriting the file on
// every reported change.
func (s *Service) watchCRL(ctx context.Context) error {
	s.stateMu.Lock()
	client, peerCA := s.state.cl, s.state.ca
	s.stateMu.Unlock()

	supervisor.Signal(ctx, supervisor.SignalHealthy)
	changes := peerCA.WaitCRLChange(ctx, client.KV, client.Watcher)
	for change := range changes {
		if change.Err != nil {
			return fmt.Errorf("watching CRL: %w", change.Err)
		}
		if err := s.config.Data.PeerCRL.Write(change.CRL, 0600); err != nil {
			return fmt.Errorf("saving CRL: %w", err)
		}
	}

	// Only reached if the change channel gets closed.
	return nil
}
323
324func (s *Service) autopromoter(ctx context.Context) error {
325 t := time.NewTicker(5 * time.Second)
326 defer t.Stop()
327
328 autopromote := func() {
329 s.stateMu.Lock()
330 st := s.state
331 s.stateMu.Unlock()
332
333 if st.etcd.Server.Leader() != st.etcd.Server.ID() {
334 return
335 }
336
337 for _, member := range st.etcd.Server.Cluster().Members() {
338 if !member.IsLearner {
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100339 continue
340 }
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100341
Serge Bazanskicb883e22020-07-06 17:47:55 +0200342 // We always call PromoteMember since the metadata necessary to decide if we should is private.
343 // Luckily etcd already does sanity checks internally and will refuse to promote nodes that aren't
344 // connected or are still behind on transactions.
345 if _, err := st.etcd.Server.PromoteMember(ctx, uint64(member.ID)); err != nil {
Serge Bazanskic7359672020-10-30 16:38:57 +0100346 supervisor.Logger(ctx).Infof("Failed to promote consensus node %s: %v", member.Name, err)
Serge Bazanskicb883e22020-07-06 17:47:55 +0200347 } else {
Serge Bazanskic7359672020-10-30 16:38:57 +0100348 supervisor.Logger(ctx).Infof("Promoted new consensus node %s", member.Name)
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100349 }
350 }
351 }
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100352
Serge Bazanskicb883e22020-07-06 17:47:55 +0200353 for {
354 select {
355 case <-ctx.Done():
356 return ctx.Err()
357 case <-t.C:
358 autopromote()
Lorenz Brun52f7f292020-06-24 16:42:02 +0200359 }
360 }
361}
362
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200363// IsReady returns whether etcd is ready and synced
364func (s *Service) IsReady() bool {
Serge Bazanskicb883e22020-07-06 17:47:55 +0200365 s.stateMu.Lock()
366 defer s.stateMu.Unlock()
367 if s.state == nil {
368 return false
369 }
370 return s.state.ready.Load()
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200371}
372
Serge Bazanskicb883e22020-07-06 17:47:55 +0200373func (s *Service) WaitReady(ctx context.Context) error {
374 // TODO(q3k): reimplement the atomic ready flag as an event synchronization mechanism
375 if s.IsReady() {
376 return nil
377 }
378 t := time.NewTicker(100 * time.Millisecond)
379 defer t.Stop()
380 for {
381 select {
382 case <-ctx.Done():
383 return ctx.Err()
384 case <-t.C:
385 if s.IsReady() {
386 return nil
387 }
388 }
389 }
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200390}
391
Serge Bazanskicb883e22020-07-06 17:47:55 +0200392// KV returns and etcd KV client interface to the etcd member/cluster.
393func (s *Service) KV(module, space string) clientv3.KV {
394 s.stateMu.Lock()
395 defer s.stateMu.Unlock()
396 return namespace.NewKV(s.state.cl.KV, fmt.Sprintf("%s:%s", module, space))
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200397}
398
Serge Bazanskicb883e22020-07-06 17:47:55 +0200399func (s *Service) KVRoot() clientv3.KV {
400 s.stateMu.Lock()
401 defer s.stateMu.Unlock()
402 return s.state.cl.KV
403}
404
405func (s *Service) Cluster() clientv3.Cluster {
406 s.stateMu.Lock()
407 defer s.stateMu.Unlock()
408 return s.state.cl.Cluster
409}
410
// MemberInfo returns information about this etcd cluster member: its ID and
// name. It blocks until this information is available (ie. the service is
// ready), and returns an error if ctx is canceled before that.
func (s *Service) MemberInfo(ctx context.Context) (id uint64, name string, err error) {
	if waitErr := s.WaitReady(ctx); waitErr != nil {
		return 0, "", fmt.Errorf("when waiting for cluster readiness: %w", waitErr)
	}

	s.stateMu.Lock()
	defer s.stateMu.Unlock()
	return uint64(s.state.etcd.Server.ID()), s.config.Name, nil
}