blob: 2bbe1ae50d799194bff63c91c52234c4f3deedc3 [file] [log] [blame]
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +02001// Copyright 2020 The Monogon Project Authors.
2//
3// SPDX-License-Identifier: Apache-2.0
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
Serge Bazanskicb883e22020-07-06 17:47:55 +020017// Package consensus implements a managed etcd cluster member service, with a self-hosted CA system for issuing peer
Serge Bazanski662b5b32020-12-21 13:49:00 +010018// certificates. Currently each Metropolis node runs an etcd member, and connects to the etcd member locally over a
Serge Bazanskicb883e22020-07-06 17:47:55 +020019// domain socket.
20//
21// The service supports two modes of startup:
22// - initializing a new cluster, by bootstrapping the CA in memory, starting a cluster, committing the CA to etcd
23// afterwards, and saving the new node's certificate to local storage
24// - joining an existing cluster, using certificates from local storage and loading the CA from etcd. This flow is also
25// used when the node joins a cluster for the first time (then the certificates required must be provisioned
26// externally before starting the consensus service).
27//
28// Regardless of how the etcd member service was started, the resulting running service is further managed and used
29// in the same way.
30//
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +020031package consensus
32
import (
	"context"
	"encoding/pem"
	"fmt"
	"net"
	"net/url"
	"strconv"
	"sync"
	"time"

	"go.etcd.io/etcd/clientv3"
	"go.etcd.io/etcd/clientv3/namespace"
	"go.etcd.io/etcd/embed"
	"go.uber.org/atomic"

	node "source.monogon.dev/metropolis/node"
	"source.monogon.dev/metropolis/node/core/consensus/ca"
	"source.monogon.dev/metropolis/node/core/localstorage"
	"source.monogon.dev/metropolis/pkg/supervisor"
)
51
// Fixed settings handed to every embedded etcd member started by this package.
const (
	// DefaultClusterToken is used as etcd's initial cluster token for every member.
	DefaultClusterToken = "METROPOLIS"
	// DefaultLogger selects the logger implementation etcd is configured with.
	DefaultLogger = "zap"
)
56
Serge Bazanskicb883e22020-07-06 17:47:55 +020057// Service is the etcd cluster member service.
58type Service struct {
59 // The configuration with which the service was started. This is immutable.
60 config *Config
Lorenz Bruna4ea9d02019-10-31 11:40:30 +010061
Serge Bazanskicb883e22020-07-06 17:47:55 +020062 // stateMu guards state. This is locked internally on public methods of Service that require access to state. The
63 // state might be recreated on service restart.
64 stateMu sync.Mutex
65 state *state
66}
Lorenz Brun6e8f69c2019-11-18 10:44:24 +010067
Serge Bazanskicb883e22020-07-06 17:47:55 +020068// state is the runtime state of a running etcd member.
69type state struct {
70 etcd *embed.Etcd
71 ready atomic.Bool
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +020072
Serge Bazanskicb883e22020-07-06 17:47:55 +020073 ca *ca.CA
74 // cl is an etcd client that loops back to the localy running etcd server. This runs over the Client unix domain
75 // socket that etcd starts.
76 cl *clientv3.Client
77}
Leopold Schabel68c58752019-11-14 21:00:59 +010078
Serge Bazanskicb883e22020-07-06 17:47:55 +020079type Config struct {
80 // Data directory (persistent, encrypted storage) for etcd.
81 Data *localstorage.DataEtcdDirectory
82 // Ephemeral directory for etcd.
83 Ephemeral *localstorage.EphemeralConsensusDirectory
Leopold Schabel68c58752019-11-14 21:00:59 +010084
Serge Bazanskicb883e22020-07-06 17:47:55 +020085 // Name is the cluster name. This must be the same amongst all etcd members within one cluster.
86 Name string
87 // NewCluster selects whether the etcd member will start a new cluster and bootstrap a CA and the first member
88 // certificate, or load existing PKI certificates from disk.
89 NewCluster bool
Serge Bazanskicb883e22020-07-06 17:47:55 +020090 // Port is the port at which this cluster member will listen for other members. If zero, defaults to the global
Serge Bazanski662b5b32020-12-21 13:49:00 +010091 // Metropolis setting.
Serge Bazanskicb883e22020-07-06 17:47:55 +020092 Port int
Serge Bazanski34fe8c62021-03-16 13:20:09 +010093
94 // ExternalHost is used by tests to override the address at which etcd should listen for peer connections.
95 // TODO(q3k): make this unexported once the new cluster manager logic lands.
96 ExternalHost string
Serge Bazanskicb883e22020-07-06 17:47:55 +020097}
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +020098
Serge Bazanskicb883e22020-07-06 17:47:55 +020099func New(config Config) *Service {
100 return &Service{
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200101 config: &config,
102 }
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200103}
104
Serge Bazanskicb883e22020-07-06 17:47:55 +0200105// configure transforms the service configuration into an embedded etcd configuration. This is pure and side effect
106// free.
107func (s *Service) configure(ctx context.Context) (*embed.Config, error) {
108 if err := s.config.Ephemeral.MkdirAll(0700); err != nil {
109 return nil, fmt.Errorf("failed to create ephemeral directory: %w", err)
110 }
111 if err := s.config.Data.MkdirAll(0700); err != nil {
112 return nil, fmt.Errorf("failed to create data directory: %w", err)
113 }
Lorenz Brun52f7f292020-06-24 16:42:02 +0200114
Serge Bazanski34fe8c62021-03-16 13:20:09 +0100115 if s.config.Name == "" {
116 return nil, fmt.Errorf("Name not set")
117 }
Serge Bazanskicb883e22020-07-06 17:47:55 +0200118 port := s.config.Port
119 if port == 0 {
Serge Bazanski549b72b2021-01-07 14:54:19 +0100120 port = node.ConsensusPort
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200121 }
122
123 cfg := embed.NewConfig()
124
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200125 cfg.Name = s.config.Name
Serge Bazanskicb883e22020-07-06 17:47:55 +0200126 cfg.Dir = s.config.Data.Data.FullPath()
127 cfg.InitialClusterToken = DefaultClusterToken
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200128
Serge Bazanskicb883e22020-07-06 17:47:55 +0200129 cfg.PeerTLSInfo.CertFile = s.config.Data.PeerPKI.Certificate.FullPath()
130 cfg.PeerTLSInfo.KeyFile = s.config.Data.PeerPKI.Key.FullPath()
131 cfg.PeerTLSInfo.TrustedCAFile = s.config.Data.PeerPKI.CACertificate.FullPath()
132 cfg.PeerTLSInfo.ClientCertAuth = true
133 cfg.PeerTLSInfo.CRLFile = s.config.Data.PeerCRL.FullPath()
134
135 cfg.LCUrls = []url.URL{{
136 Scheme: "unix",
137 Path: s.config.Ephemeral.ClientSocket.FullPath() + ":0",
138 }}
139 cfg.ACUrls = []url.URL{}
140 cfg.LPUrls = []url.URL{{
141 Scheme: "https",
Serge Bazanski34fe8c62021-03-16 13:20:09 +0100142 Host: fmt.Sprintf("[::]:%d", port),
Serge Bazanskicb883e22020-07-06 17:47:55 +0200143 }}
Serge Bazanski34fe8c62021-03-16 13:20:09 +0100144
145 // Always listen on the address pointed to by our name - unless running in
146 // tests, where we can't control our hostname easily.
147 ExternalHost := fmt.Sprintf("%s:%d", s.config.Name, port)
148 if s.config.ExternalHost != "" {
149 ExternalHost = fmt.Sprintf("%s:%d", s.config.ExternalHost, port)
150 }
Serge Bazanskicb883e22020-07-06 17:47:55 +0200151 cfg.APUrls = []url.URL{{
152 Scheme: "https",
Serge Bazanski34fe8c62021-03-16 13:20:09 +0100153 Host: ExternalHost,
Serge Bazanskicb883e22020-07-06 17:47:55 +0200154 }}
155
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200156 if s.config.NewCluster {
157 cfg.ClusterState = "new"
158 cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name)
Serge Bazanski34fe8c62021-03-16 13:20:09 +0100159 } else {
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200160 cfg.ClusterState = "existing"
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200161 }
162
Serge Bazanskic7359672020-10-30 16:38:57 +0100163 // TODO(q3k): pipe logs from etcd to supervisor.RawLogger via a file.
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200164 cfg.Logger = DefaultLogger
Serge Bazanskic7359672020-10-30 16:38:57 +0100165 cfg.LogOutputs = []string{"stderr"}
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200166
Serge Bazanskicb883e22020-07-06 17:47:55 +0200167 return cfg, nil
168}
169
170// Run is a Supervisor runnable that starts the etcd member service. It will become healthy once the member joins the
171// cluster successfully.
172func (s *Service) Run(ctx context.Context) error {
173 st := &state{
174 ready: *atomic.NewBool(false),
175 }
176 s.stateMu.Lock()
177 s.state = st
178 s.stateMu.Unlock()
179
180 if s.config.NewCluster {
Serge Bazanski3ea1a3a2021-03-16 13:17:33 +0100181 // Create certificate if absent. It can only be present if we attempt
182 // to re-start the service in NewCluster after a failure. This can
183 // happen if etcd crashed or failed to start up before (eg. because of
184 // networking not having settled yet).
Serge Bazanskicb883e22020-07-06 17:47:55 +0200185 absent, err := s.config.Data.PeerPKI.AllAbsent()
186 if err != nil {
187 return fmt.Errorf("checking certificate existence: %w", err)
188 }
Serge Bazanskicb883e22020-07-06 17:47:55 +0200189
Serge Bazanski3ea1a3a2021-03-16 13:17:33 +0100190 if absent {
191 // Generate CA, keep in memory, write it down in etcd later.
192 st.ca, err = ca.New("Metropolis etcd peer Root CA")
193 if err != nil {
194 return fmt.Errorf("when creating new cluster's peer CA: %w", err)
195 }
Serge Bazanskicb883e22020-07-06 17:47:55 +0200196
Serge Bazanski34fe8c62021-03-16 13:20:09 +0100197 cert, key, err := st.ca.Issue(ctx, nil, s.config.Name)
Serge Bazanski3ea1a3a2021-03-16 13:17:33 +0100198 if err != nil {
199 return fmt.Errorf("when issuing new cluster's first certificate: %w", err)
200 }
Serge Bazanskicb883e22020-07-06 17:47:55 +0200201
Serge Bazanski3ea1a3a2021-03-16 13:17:33 +0100202 if err := s.config.Data.PeerPKI.MkdirAll(0700); err != nil {
203 return fmt.Errorf("when creating PKI directory: %w", err)
204 }
205 if err := s.config.Data.PeerPKI.CACertificate.Write(pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: st.ca.CACertRaw}), 0600); err != nil {
206 return fmt.Errorf("when writing CA certificate to disk: %w", err)
207 }
208 if err := s.config.Data.PeerPKI.Certificate.Write(pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: cert}), 0600); err != nil {
209 return fmt.Errorf("when writing certificate to disk: %w", err)
210 }
211 if err := s.config.Data.PeerPKI.Key.Write(pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: key}), 0600); err != nil {
212 return fmt.Errorf("when writing certificate to disk: %w", err)
213 }
Serge Bazanskicb883e22020-07-06 17:47:55 +0200214 }
215 }
216
Serge Bazanski3ea1a3a2021-03-16 13:17:33 +0100217 // Expect certificate to be present on disk.
218 present, err := s.config.Data.PeerPKI.AllExist()
219 if err != nil {
220 return fmt.Errorf("checking certificate existence: %w", err)
221 }
222 if !present {
223 return fmt.Errorf("etcd starting without fully ready certificates - aborted NewCluster or corrupted local storage?")
Serge Bazanskicb883e22020-07-06 17:47:55 +0200224 }
225
226 cfg, err := s.configure(ctx)
227 if err != nil {
228 return fmt.Errorf("when configuring etcd: %w", err)
229 }
230
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200231 server, err := embed.StartEtcd(cfg)
Serge Bazanskicb883e22020-07-06 17:47:55 +0200232 keep := false
233 defer func() {
234 if !keep && server != nil {
235 server.Close()
236 }
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200237 }()
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100238 if err != nil {
Serge Bazanskicb883e22020-07-06 17:47:55 +0200239 return fmt.Errorf("failed to start etcd: %w", err)
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100240 }
Serge Bazanskicb883e22020-07-06 17:47:55 +0200241 st.etcd = server
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100242
Serge Bazanskicb883e22020-07-06 17:47:55 +0200243 okay := true
244 select {
245 case <-st.etcd.Server.ReadyNotify():
246 case <-ctx.Done():
247 okay = false
Lorenz Brun52f7f292020-06-24 16:42:02 +0200248 }
249
Serge Bazanskicb883e22020-07-06 17:47:55 +0200250 if !okay {
251 supervisor.Logger(ctx).Info("context done, aborting wait")
252 return ctx.Err()
Lorenz Brun52f7f292020-06-24 16:42:02 +0200253 }
254
Serge Bazanskicb883e22020-07-06 17:47:55 +0200255 socket := s.config.Ephemeral.ClientSocket.FullPath()
256 cl, err := clientv3.New(clientv3.Config{
257 Endpoints: []string{fmt.Sprintf("unix://%s:0", socket)},
258 DialTimeout: time.Second,
Lorenz Brun52f7f292020-06-24 16:42:02 +0200259 })
260 if err != nil {
Serge Bazanskicb883e22020-07-06 17:47:55 +0200261 return fmt.Errorf("failed to connect to new etcd instance: %w", err)
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100262 }
Serge Bazanskicb883e22020-07-06 17:47:55 +0200263 st.cl = cl
264
265 if s.config.NewCluster {
266 if st.ca == nil {
267 panic("peerCA has not been generated")
268 }
269
270 // Save new CA into etcd.
271 err = st.ca.Save(ctx, cl.KV)
272 if err != nil {
273 return fmt.Errorf("failed to save new CA to etcd: %w", err)
274 }
275 } else {
276 // Load existing CA from etcd.
277 st.ca, err = ca.Load(ctx, cl.KV)
278 if err != nil {
279 return fmt.Errorf("failed to load CA from etcd: %w", err)
280 }
281 }
282
283 // Start CRL watcher.
284 if err := supervisor.Run(ctx, "crl", s.watchCRL); err != nil {
285 return fmt.Errorf("failed to start CRL watcher: %w", err)
286 }
287 // Start autopromoter.
288 if err := supervisor.Run(ctx, "autopromoter", s.autopromoter); err != nil {
289 return fmt.Errorf("failed to start autopromoter: %w", err)
290 }
291
292 supervisor.Logger(ctx).Info("etcd is now ready")
293 keep = true
294 st.ready.Store(true)
295 supervisor.Signal(ctx, supervisor.SignalHealthy)
296
297 <-ctx.Done()
298 st.etcd.Close()
299 return ctx.Err()
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100300}
301
Serge Bazanskicb883e22020-07-06 17:47:55 +0200302// watchCRL is a sub-runnable of the etcd cluster member service that updates the on-local-storage CRL to match the
303// newest available version in etcd.
304func (s *Service) watchCRL(ctx context.Context) error {
305 s.stateMu.Lock()
306 cl := s.state.cl
307 ca := s.state.ca
308 s.stateMu.Unlock()
309
310 supervisor.Signal(ctx, supervisor.SignalHealthy)
311 for e := range ca.WaitCRLChange(ctx, cl.KV, cl.Watcher) {
312 if e.Err != nil {
313 return fmt.Errorf("watching CRL: %w", e.Err)
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100314 }
Serge Bazanskicb883e22020-07-06 17:47:55 +0200315
316 if err := s.config.Data.PeerCRL.Write(e.CRL, 0600); err != nil {
317 return fmt.Errorf("saving CRL: %w", err)
318 }
319 }
320
321 // unreachable
322 return nil
323}
324
325func (s *Service) autopromoter(ctx context.Context) error {
326 t := time.NewTicker(5 * time.Second)
327 defer t.Stop()
328
329 autopromote := func() {
330 s.stateMu.Lock()
331 st := s.state
332 s.stateMu.Unlock()
333
334 if st.etcd.Server.Leader() != st.etcd.Server.ID() {
335 return
336 }
337
338 for _, member := range st.etcd.Server.Cluster().Members() {
339 if !member.IsLearner {
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100340 continue
341 }
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100342
Serge Bazanskicb883e22020-07-06 17:47:55 +0200343 // We always call PromoteMember since the metadata necessary to decide if we should is private.
344 // Luckily etcd already does sanity checks internally and will refuse to promote nodes that aren't
345 // connected or are still behind on transactions.
346 if _, err := st.etcd.Server.PromoteMember(ctx, uint64(member.ID)); err != nil {
Serge Bazanskic7359672020-10-30 16:38:57 +0100347 supervisor.Logger(ctx).Infof("Failed to promote consensus node %s: %v", member.Name, err)
Serge Bazanskicb883e22020-07-06 17:47:55 +0200348 } else {
Serge Bazanskic7359672020-10-30 16:38:57 +0100349 supervisor.Logger(ctx).Infof("Promoted new consensus node %s", member.Name)
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100350 }
351 }
352 }
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100353
Serge Bazanskicb883e22020-07-06 17:47:55 +0200354 for {
355 select {
356 case <-ctx.Done():
357 return ctx.Err()
358 case <-t.C:
359 autopromote()
Lorenz Brun52f7f292020-06-24 16:42:02 +0200360 }
361 }
362}
363
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200364// IsReady returns whether etcd is ready and synced
365func (s *Service) IsReady() bool {
Serge Bazanskicb883e22020-07-06 17:47:55 +0200366 s.stateMu.Lock()
367 defer s.stateMu.Unlock()
368 if s.state == nil {
369 return false
370 }
371 return s.state.ready.Load()
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200372}
373
Serge Bazanskicb883e22020-07-06 17:47:55 +0200374func (s *Service) WaitReady(ctx context.Context) error {
375 // TODO(q3k): reimplement the atomic ready flag as an event synchronization mechanism
376 if s.IsReady() {
377 return nil
378 }
379 t := time.NewTicker(100 * time.Millisecond)
380 defer t.Stop()
381 for {
382 select {
383 case <-ctx.Done():
384 return ctx.Err()
385 case <-t.C:
386 if s.IsReady() {
387 return nil
388 }
389 }
390 }
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200391}
392
Serge Bazanskicb883e22020-07-06 17:47:55 +0200393// KV returns and etcd KV client interface to the etcd member/cluster.
394func (s *Service) KV(module, space string) clientv3.KV {
395 s.stateMu.Lock()
396 defer s.stateMu.Unlock()
397 return namespace.NewKV(s.state.cl.KV, fmt.Sprintf("%s:%s", module, space))
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200398}
399
Serge Bazanskicb883e22020-07-06 17:47:55 +0200400func (s *Service) KVRoot() clientv3.KV {
401 s.stateMu.Lock()
402 defer s.stateMu.Unlock()
403 return s.state.cl.KV
404}
405
406func (s *Service) Cluster() clientv3.Cluster {
407 s.stateMu.Lock()
408 defer s.stateMu.Unlock()
409 return s.state.cl.Cluster
410}
411
412// MemberInfo returns information about this etcd cluster member: its ID and name. This will block until this
413// information is available (ie. the cluster status is Ready).
414func (s *Service) MemberInfo(ctx context.Context) (id uint64, name string, err error) {
415 if err = s.WaitReady(ctx); err != nil {
416 err = fmt.Errorf("when waiting for cluster readiness: %w", err)
417 return
418 }
419
420 s.stateMu.Lock()
421 defer s.stateMu.Unlock()
422 id = uint64(s.state.etcd.Server.ID())
423 name = s.config.Name
424 return
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200425}