blob: 269ff7eb0befe7f5c128455448e9c5ca97365b9f [file] [log] [blame]
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +02001// Copyright 2020 The Monogon Project Authors.
2//
3// SPDX-License-Identifier: Apache-2.0
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
Serge Bazanskicb883e22020-07-06 17:47:55 +020017// Package consensus implements a managed etcd cluster member service, with a self-hosted CA system for issuing peer
Serge Bazanski662b5b32020-12-21 13:49:00 +010018// certificates. Currently each Metropolis node runs an etcd member, and connects to the etcd member locally over a
Serge Bazanskicb883e22020-07-06 17:47:55 +020019// domain socket.
20//
21// The service supports two modes of startup:
22// - initializing a new cluster, by bootstrapping the CA in memory, starting a cluster, committing the CA to etcd
23// afterwards, and saving the new node's certificate to local storage
24// - joining an existing cluster, using certificates from local storage and loading the CA from etcd. This flow is also
25// used when the node joins a cluster for the first time (then the certificates required must be provisioned
26// externally before starting the consensus service).
27//
28// Regardless of how the etcd member service was started, the resulting running service is further managed and used
29// in the same way.
30//
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +020031package consensus
32
33import (
34 "context"
Lorenz Bruna4ea9d02019-10-31 11:40:30 +010035 "encoding/pem"
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +020036 "fmt"
Lorenz Brun52f7f292020-06-24 16:42:02 +020037 "net"
Lorenz Bruna4ea9d02019-10-31 11:40:30 +010038 "net/url"
Serge Bazanskicb883e22020-07-06 17:47:55 +020039 "sync"
Lorenz Bruna4ea9d02019-10-31 11:40:30 +010040 "time"
41
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +020042 "go.etcd.io/etcd/clientv3"
43 "go.etcd.io/etcd/clientv3/namespace"
44 "go.etcd.io/etcd/embed"
Lorenz Brunfc5dbc62020-05-28 12:18:07 +020045 "go.uber.org/atomic"
Hendrik Hofstadt8efe51e2020-02-28 12:53:41 +010046
Serge Bazanski31370b02021-01-07 16:31:14 +010047 node "source.monogon.dev/metropolis/node"
48 "source.monogon.dev/metropolis/node/core/consensus/ca"
49 "source.monogon.dev/metropolis/node/core/localstorage"
50 "source.monogon.dev/metropolis/pkg/supervisor"
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +020051)
52
53const (
Serge Bazanski662b5b32020-12-21 13:49:00 +010054 DefaultClusterToken = "METROPOLIS"
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +020055 DefaultLogger = "zap"
56)
57
Serge Bazanskicb883e22020-07-06 17:47:55 +020058// Service is the etcd cluster member service.
59type Service struct {
60 // The configuration with which the service was started. This is immutable.
61 config *Config
Lorenz Bruna4ea9d02019-10-31 11:40:30 +010062
Serge Bazanskicb883e22020-07-06 17:47:55 +020063 // stateMu guards state. This is locked internally on public methods of Service that require access to state. The
64 // state might be recreated on service restart.
65 stateMu sync.Mutex
66 state *state
67}
Lorenz Brun6e8f69c2019-11-18 10:44:24 +010068
Serge Bazanskicb883e22020-07-06 17:47:55 +020069// state is the runtime state of a running etcd member.
70type state struct {
71 etcd *embed.Etcd
72 ready atomic.Bool
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +020073
Serge Bazanskicb883e22020-07-06 17:47:55 +020074 ca *ca.CA
75 // cl is an etcd client that loops back to the localy running etcd server. This runs over the Client unix domain
76 // socket that etcd starts.
77 cl *clientv3.Client
78}
Leopold Schabel68c58752019-11-14 21:00:59 +010079
Serge Bazanskicb883e22020-07-06 17:47:55 +020080type Config struct {
81 // Data directory (persistent, encrypted storage) for etcd.
82 Data *localstorage.DataEtcdDirectory
83 // Ephemeral directory for etcd.
84 Ephemeral *localstorage.EphemeralConsensusDirectory
Leopold Schabel68c58752019-11-14 21:00:59 +010085
Serge Bazanskicb883e22020-07-06 17:47:55 +020086 // Name is the cluster name. This must be the same amongst all etcd members within one cluster.
87 Name string
88 // NewCluster selects whether the etcd member will start a new cluster and bootstrap a CA and the first member
89 // certificate, or load existing PKI certificates from disk.
90 NewCluster bool
91 // InitialCluster sets the initial cluster peer URLs when NewCluster is set, and is ignored otherwise. Usually this
92 // will be just the new, single server, and more members will be added later.
93 InitialCluster string
94 // ExternalHost is the IP address or hostname at which this cluster member is reachable to other cluster members.
95 ExternalHost string
96 // ListenHost is the IP address or hostname at which this cluster member will listen.
97 ListenHost string
98 // Port is the port at which this cluster member will listen for other members. If zero, defaults to the global
Serge Bazanski662b5b32020-12-21 13:49:00 +010099 // Metropolis setting.
Serge Bazanskicb883e22020-07-06 17:47:55 +0200100 Port int
101}
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200102
Serge Bazanskicb883e22020-07-06 17:47:55 +0200103func New(config Config) *Service {
104 return &Service{
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200105 config: &config,
106 }
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200107}
108
Serge Bazanskicb883e22020-07-06 17:47:55 +0200109// configure transforms the service configuration into an embedded etcd configuration. This is pure and side effect
110// free.
111func (s *Service) configure(ctx context.Context) (*embed.Config, error) {
112 if err := s.config.Ephemeral.MkdirAll(0700); err != nil {
113 return nil, fmt.Errorf("failed to create ephemeral directory: %w", err)
114 }
115 if err := s.config.Data.MkdirAll(0700); err != nil {
116 return nil, fmt.Errorf("failed to create data directory: %w", err)
117 }
Lorenz Brun52f7f292020-06-24 16:42:02 +0200118
Serge Bazanskicb883e22020-07-06 17:47:55 +0200119 port := s.config.Port
120 if port == 0 {
Serge Bazanski549b72b2021-01-07 14:54:19 +0100121 port = node.ConsensusPort
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200122 }
123
124 cfg := embed.NewConfig()
125
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200126 cfg.Name = s.config.Name
Serge Bazanskicb883e22020-07-06 17:47:55 +0200127 cfg.Dir = s.config.Data.Data.FullPath()
128 cfg.InitialClusterToken = DefaultClusterToken
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200129
Serge Bazanskicb883e22020-07-06 17:47:55 +0200130 cfg.PeerTLSInfo.CertFile = s.config.Data.PeerPKI.Certificate.FullPath()
131 cfg.PeerTLSInfo.KeyFile = s.config.Data.PeerPKI.Key.FullPath()
132 cfg.PeerTLSInfo.TrustedCAFile = s.config.Data.PeerPKI.CACertificate.FullPath()
133 cfg.PeerTLSInfo.ClientCertAuth = true
134 cfg.PeerTLSInfo.CRLFile = s.config.Data.PeerCRL.FullPath()
135
136 cfg.LCUrls = []url.URL{{
137 Scheme: "unix",
138 Path: s.config.Ephemeral.ClientSocket.FullPath() + ":0",
139 }}
140 cfg.ACUrls = []url.URL{}
141 cfg.LPUrls = []url.URL{{
142 Scheme: "https",
143 Host: fmt.Sprintf("%s:%d", s.config.ListenHost, port),
144 }}
145 cfg.APUrls = []url.URL{{
146 Scheme: "https",
147 Host: fmt.Sprintf("%s:%d", s.config.ExternalHost, port),
148 }}
149
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200150 if s.config.NewCluster {
151 cfg.ClusterState = "new"
152 cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name)
153 } else if s.config.InitialCluster != "" {
154 cfg.ClusterState = "existing"
155 cfg.InitialCluster = s.config.InitialCluster
156 }
157
Serge Bazanskic7359672020-10-30 16:38:57 +0100158 // TODO(q3k): pipe logs from etcd to supervisor.RawLogger via a file.
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200159 cfg.Logger = DefaultLogger
Serge Bazanskic7359672020-10-30 16:38:57 +0100160 cfg.LogOutputs = []string{"stderr"}
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200161
Serge Bazanskicb883e22020-07-06 17:47:55 +0200162 return cfg, nil
163}
164
165// Run is a Supervisor runnable that starts the etcd member service. It will become healthy once the member joins the
166// cluster successfully.
167func (s *Service) Run(ctx context.Context) error {
168 st := &state{
169 ready: *atomic.NewBool(false),
170 }
171 s.stateMu.Lock()
172 s.state = st
173 s.stateMu.Unlock()
174
175 if s.config.NewCluster {
Serge Bazanski3ea1a3a2021-03-16 13:17:33 +0100176 // Create certificate if absent. It can only be present if we attempt
177 // to re-start the service in NewCluster after a failure. This can
178 // happen if etcd crashed or failed to start up before (eg. because of
179 // networking not having settled yet).
Serge Bazanskicb883e22020-07-06 17:47:55 +0200180 absent, err := s.config.Data.PeerPKI.AllAbsent()
181 if err != nil {
182 return fmt.Errorf("checking certificate existence: %w", err)
183 }
Serge Bazanskicb883e22020-07-06 17:47:55 +0200184
Serge Bazanski3ea1a3a2021-03-16 13:17:33 +0100185 if absent {
186 // Generate CA, keep in memory, write it down in etcd later.
187 st.ca, err = ca.New("Metropolis etcd peer Root CA")
188 if err != nil {
189 return fmt.Errorf("when creating new cluster's peer CA: %w", err)
190 }
Serge Bazanskicb883e22020-07-06 17:47:55 +0200191
Serge Bazanski3ea1a3a2021-03-16 13:17:33 +0100192 ip := net.ParseIP(s.config.ExternalHost)
193 if ip == nil {
194 return fmt.Errorf("configued external host is not an IP address (got %q)", s.config.ExternalHost)
195 }
Serge Bazanskicb883e22020-07-06 17:47:55 +0200196
Serge Bazanski3ea1a3a2021-03-16 13:17:33 +0100197 cert, key, err := st.ca.Issue(ctx, nil, s.config.Name, ip)
198 if err != nil {
199 return fmt.Errorf("when issuing new cluster's first certificate: %w", err)
200 }
Serge Bazanskicb883e22020-07-06 17:47:55 +0200201
Serge Bazanski3ea1a3a2021-03-16 13:17:33 +0100202 if err := s.config.Data.PeerPKI.MkdirAll(0700); err != nil {
203 return fmt.Errorf("when creating PKI directory: %w", err)
204 }
205 if err := s.config.Data.PeerPKI.CACertificate.Write(pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: st.ca.CACertRaw}), 0600); err != nil {
206 return fmt.Errorf("when writing CA certificate to disk: %w", err)
207 }
208 if err := s.config.Data.PeerPKI.Certificate.Write(pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: cert}), 0600); err != nil {
209 return fmt.Errorf("when writing certificate to disk: %w", err)
210 }
211 if err := s.config.Data.PeerPKI.Key.Write(pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: key}), 0600); err != nil {
212 return fmt.Errorf("when writing certificate to disk: %w", err)
213 }
Serge Bazanskicb883e22020-07-06 17:47:55 +0200214 }
215 }
216
Serge Bazanski3ea1a3a2021-03-16 13:17:33 +0100217 // Expect certificate to be present on disk.
218 present, err := s.config.Data.PeerPKI.AllExist()
219 if err != nil {
220 return fmt.Errorf("checking certificate existence: %w", err)
221 }
222 if !present {
223 return fmt.Errorf("etcd starting without fully ready certificates - aborted NewCluster or corrupted local storage?")
Serge Bazanskicb883e22020-07-06 17:47:55 +0200224 }
225
226 cfg, err := s.configure(ctx)
227 if err != nil {
228 return fmt.Errorf("when configuring etcd: %w", err)
229 }
230
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200231 server, err := embed.StartEtcd(cfg)
Serge Bazanskicb883e22020-07-06 17:47:55 +0200232 keep := false
233 defer func() {
234 if !keep && server != nil {
235 server.Close()
236 }
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200237 }()
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100238 if err != nil {
Serge Bazanskicb883e22020-07-06 17:47:55 +0200239 return fmt.Errorf("failed to start etcd: %w", err)
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100240 }
Serge Bazanskicb883e22020-07-06 17:47:55 +0200241 st.etcd = server
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100242
Serge Bazanskicb883e22020-07-06 17:47:55 +0200243 supervisor.Logger(ctx).Info("waiting for etcd...")
Leopold Schabel68c58752019-11-14 21:00:59 +0100244
Serge Bazanskicb883e22020-07-06 17:47:55 +0200245 okay := true
246 select {
247 case <-st.etcd.Server.ReadyNotify():
248 case <-ctx.Done():
249 okay = false
Lorenz Brun52f7f292020-06-24 16:42:02 +0200250 }
251
Serge Bazanskicb883e22020-07-06 17:47:55 +0200252 if !okay {
253 supervisor.Logger(ctx).Info("context done, aborting wait")
254 return ctx.Err()
Lorenz Brun52f7f292020-06-24 16:42:02 +0200255 }
256
Serge Bazanskicb883e22020-07-06 17:47:55 +0200257 socket := s.config.Ephemeral.ClientSocket.FullPath()
258 cl, err := clientv3.New(clientv3.Config{
259 Endpoints: []string{fmt.Sprintf("unix://%s:0", socket)},
260 DialTimeout: time.Second,
Lorenz Brun52f7f292020-06-24 16:42:02 +0200261 })
262 if err != nil {
Serge Bazanskicb883e22020-07-06 17:47:55 +0200263 return fmt.Errorf("failed to connect to new etcd instance: %w", err)
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100264 }
Serge Bazanskicb883e22020-07-06 17:47:55 +0200265 st.cl = cl
266
267 if s.config.NewCluster {
268 if st.ca == nil {
269 panic("peerCA has not been generated")
270 }
271
272 // Save new CA into etcd.
273 err = st.ca.Save(ctx, cl.KV)
274 if err != nil {
275 return fmt.Errorf("failed to save new CA to etcd: %w", err)
276 }
277 } else {
278 // Load existing CA from etcd.
279 st.ca, err = ca.Load(ctx, cl.KV)
280 if err != nil {
281 return fmt.Errorf("failed to load CA from etcd: %w", err)
282 }
283 }
284
285 // Start CRL watcher.
286 if err := supervisor.Run(ctx, "crl", s.watchCRL); err != nil {
287 return fmt.Errorf("failed to start CRL watcher: %w", err)
288 }
289 // Start autopromoter.
290 if err := supervisor.Run(ctx, "autopromoter", s.autopromoter); err != nil {
291 return fmt.Errorf("failed to start autopromoter: %w", err)
292 }
293
294 supervisor.Logger(ctx).Info("etcd is now ready")
295 keep = true
296 st.ready.Store(true)
297 supervisor.Signal(ctx, supervisor.SignalHealthy)
298
299 <-ctx.Done()
300 st.etcd.Close()
301 return ctx.Err()
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100302}
303
Serge Bazanskicb883e22020-07-06 17:47:55 +0200304// watchCRL is a sub-runnable of the etcd cluster member service that updates the on-local-storage CRL to match the
305// newest available version in etcd.
306func (s *Service) watchCRL(ctx context.Context) error {
307 s.stateMu.Lock()
308 cl := s.state.cl
309 ca := s.state.ca
310 s.stateMu.Unlock()
311
312 supervisor.Signal(ctx, supervisor.SignalHealthy)
313 for e := range ca.WaitCRLChange(ctx, cl.KV, cl.Watcher) {
314 if e.Err != nil {
315 return fmt.Errorf("watching CRL: %w", e.Err)
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100316 }
Serge Bazanskicb883e22020-07-06 17:47:55 +0200317
318 if err := s.config.Data.PeerCRL.Write(e.CRL, 0600); err != nil {
319 return fmt.Errorf("saving CRL: %w", err)
320 }
321 }
322
323 // unreachable
324 return nil
325}
326
327func (s *Service) autopromoter(ctx context.Context) error {
328 t := time.NewTicker(5 * time.Second)
329 defer t.Stop()
330
331 autopromote := func() {
332 s.stateMu.Lock()
333 st := s.state
334 s.stateMu.Unlock()
335
336 if st.etcd.Server.Leader() != st.etcd.Server.ID() {
337 return
338 }
339
340 for _, member := range st.etcd.Server.Cluster().Members() {
341 if !member.IsLearner {
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100342 continue
343 }
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100344
Serge Bazanskicb883e22020-07-06 17:47:55 +0200345 // We always call PromoteMember since the metadata necessary to decide if we should is private.
346 // Luckily etcd already does sanity checks internally and will refuse to promote nodes that aren't
347 // connected or are still behind on transactions.
348 if _, err := st.etcd.Server.PromoteMember(ctx, uint64(member.ID)); err != nil {
Serge Bazanskic7359672020-10-30 16:38:57 +0100349 supervisor.Logger(ctx).Infof("Failed to promote consensus node %s: %v", member.Name, err)
Serge Bazanskicb883e22020-07-06 17:47:55 +0200350 } else {
Serge Bazanskic7359672020-10-30 16:38:57 +0100351 supervisor.Logger(ctx).Infof("Promoted new consensus node %s", member.Name)
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100352 }
353 }
354 }
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100355
Serge Bazanskicb883e22020-07-06 17:47:55 +0200356 for {
357 select {
358 case <-ctx.Done():
359 return ctx.Err()
360 case <-t.C:
361 autopromote()
Lorenz Brun52f7f292020-06-24 16:42:02 +0200362 }
363 }
364}
365
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200366// IsReady returns whether etcd is ready and synced
367func (s *Service) IsReady() bool {
Serge Bazanskicb883e22020-07-06 17:47:55 +0200368 s.stateMu.Lock()
369 defer s.stateMu.Unlock()
370 if s.state == nil {
371 return false
372 }
373 return s.state.ready.Load()
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200374}
375
Serge Bazanskicb883e22020-07-06 17:47:55 +0200376func (s *Service) WaitReady(ctx context.Context) error {
377 // TODO(q3k): reimplement the atomic ready flag as an event synchronization mechanism
378 if s.IsReady() {
379 return nil
380 }
381 t := time.NewTicker(100 * time.Millisecond)
382 defer t.Stop()
383 for {
384 select {
385 case <-ctx.Done():
386 return ctx.Err()
387 case <-t.C:
388 if s.IsReady() {
389 return nil
390 }
391 }
392 }
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200393}
394
Serge Bazanskicb883e22020-07-06 17:47:55 +0200395// KV returns and etcd KV client interface to the etcd member/cluster.
396func (s *Service) KV(module, space string) clientv3.KV {
397 s.stateMu.Lock()
398 defer s.stateMu.Unlock()
399 return namespace.NewKV(s.state.cl.KV, fmt.Sprintf("%s:%s", module, space))
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200400}
401
Serge Bazanskicb883e22020-07-06 17:47:55 +0200402func (s *Service) KVRoot() clientv3.KV {
403 s.stateMu.Lock()
404 defer s.stateMu.Unlock()
405 return s.state.cl.KV
406}
407
408func (s *Service) Cluster() clientv3.Cluster {
409 s.stateMu.Lock()
410 defer s.stateMu.Unlock()
411 return s.state.cl.Cluster
412}
413
414// MemberInfo returns information about this etcd cluster member: its ID and name. This will block until this
415// information is available (ie. the cluster status is Ready).
416func (s *Service) MemberInfo(ctx context.Context) (id uint64, name string, err error) {
417 if err = s.WaitReady(ctx); err != nil {
418 err = fmt.Errorf("when waiting for cluster readiness: %w", err)
419 return
420 }
421
422 s.stateMu.Lock()
423 defer s.stateMu.Unlock()
424 id = uint64(s.state.etcd.Server.ID())
425 name = s.config.Name
426 return
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200427}