blob: f0a2e2242e02394e446f76c6e7a6415c67b2b4cc [file] [log] [blame]
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +02001// Copyright 2020 The Monogon Project Authors.
2//
3// SPDX-License-Identifier: Apache-2.0
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
// Package consensus implements a managed etcd cluster member service, with a self-hosted CA system for issuing peer
// certificates. Currently each Metropolis node runs an etcd member, and connects to the etcd member locally over a
// domain socket.
//
// The service supports two modes of startup:
//   - initializing a new cluster, by bootstrapping the CA in memory, starting a cluster, committing the CA to etcd
//     afterwards, and saving the new node's certificate to local storage
//   - joining an existing cluster, using certificates from local storage and loading the CA from etcd. This flow is
//     also used when the node joins a cluster for the first time (in that case the required certificates must be
//     provisioned externally before starting the consensus service).
//
// Regardless of how the etcd member service was started, the resulting running service is further managed and used
// in the same way.
//
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +020031package consensus
32
33import (
34 "context"
Lorenz Bruna4ea9d02019-10-31 11:40:30 +010035 "encoding/pem"
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +020036 "fmt"
Lorenz Brun52f7f292020-06-24 16:42:02 +020037 "net"
Lorenz Bruna4ea9d02019-10-31 11:40:30 +010038 "net/url"
Serge Bazanskicb883e22020-07-06 17:47:55 +020039 "sync"
Lorenz Bruna4ea9d02019-10-31 11:40:30 +010040 "time"
41
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +020042 "go.etcd.io/etcd/clientv3"
43 "go.etcd.io/etcd/clientv3/namespace"
44 "go.etcd.io/etcd/embed"
Lorenz Brunfc5dbc62020-05-28 12:18:07 +020045 "go.uber.org/atomic"
Hendrik Hofstadt8efe51e2020-02-28 12:53:41 +010046
Serge Bazanski31370b02021-01-07 16:31:14 +010047 node "source.monogon.dev/metropolis/node"
48 "source.monogon.dev/metropolis/node/core/consensus/ca"
49 "source.monogon.dev/metropolis/node/core/localstorage"
50 "source.monogon.dev/metropolis/pkg/supervisor"
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +020051)
52
const (
	// DefaultClusterToken is the etcd initial cluster token used by every member started by this service.
	DefaultClusterToken = "METROPOLIS"
	// DefaultLogger is the name of the etcd logger implementation configured for the embedded member.
	DefaultLogger = "zap"
)
57
Serge Bazanskicb883e22020-07-06 17:47:55 +020058// Service is the etcd cluster member service.
59type Service struct {
60 // The configuration with which the service was started. This is immutable.
61 config *Config
Lorenz Bruna4ea9d02019-10-31 11:40:30 +010062
Serge Bazanskicb883e22020-07-06 17:47:55 +020063 // stateMu guards state. This is locked internally on public methods of Service that require access to state. The
64 // state might be recreated on service restart.
65 stateMu sync.Mutex
66 state *state
67}
Lorenz Brun6e8f69c2019-11-18 10:44:24 +010068
Serge Bazanskicb883e22020-07-06 17:47:55 +020069// state is the runtime state of a running etcd member.
70type state struct {
71 etcd *embed.Etcd
72 ready atomic.Bool
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +020073
Serge Bazanskicb883e22020-07-06 17:47:55 +020074 ca *ca.CA
75 // cl is an etcd client that loops back to the localy running etcd server. This runs over the Client unix domain
76 // socket that etcd starts.
77 cl *clientv3.Client
78}
Leopold Schabel68c58752019-11-14 21:00:59 +010079
Serge Bazanskicb883e22020-07-06 17:47:55 +020080type Config struct {
81 // Data directory (persistent, encrypted storage) for etcd.
82 Data *localstorage.DataEtcdDirectory
83 // Ephemeral directory for etcd.
84 Ephemeral *localstorage.EphemeralConsensusDirectory
Leopold Schabel68c58752019-11-14 21:00:59 +010085
Serge Bazanskicb883e22020-07-06 17:47:55 +020086 // Name is the cluster name. This must be the same amongst all etcd members within one cluster.
87 Name string
88 // NewCluster selects whether the etcd member will start a new cluster and bootstrap a CA and the first member
89 // certificate, or load existing PKI certificates from disk.
90 NewCluster bool
91 // InitialCluster sets the initial cluster peer URLs when NewCluster is set, and is ignored otherwise. Usually this
92 // will be just the new, single server, and more members will be added later.
93 InitialCluster string
94 // ExternalHost is the IP address or hostname at which this cluster member is reachable to other cluster members.
95 ExternalHost string
96 // ListenHost is the IP address or hostname at which this cluster member will listen.
97 ListenHost string
98 // Port is the port at which this cluster member will listen for other members. If zero, defaults to the global
Serge Bazanski662b5b32020-12-21 13:49:00 +010099 // Metropolis setting.
Serge Bazanskicb883e22020-07-06 17:47:55 +0200100 Port int
101}
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200102
Serge Bazanskicb883e22020-07-06 17:47:55 +0200103func New(config Config) *Service {
104 return &Service{
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200105 config: &config,
106 }
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200107}
108
Serge Bazanskicb883e22020-07-06 17:47:55 +0200109// configure transforms the service configuration into an embedded etcd configuration. This is pure and side effect
110// free.
111func (s *Service) configure(ctx context.Context) (*embed.Config, error) {
112 if err := s.config.Ephemeral.MkdirAll(0700); err != nil {
113 return nil, fmt.Errorf("failed to create ephemeral directory: %w", err)
114 }
115 if err := s.config.Data.MkdirAll(0700); err != nil {
116 return nil, fmt.Errorf("failed to create data directory: %w", err)
117 }
Lorenz Brun52f7f292020-06-24 16:42:02 +0200118
Serge Bazanskicb883e22020-07-06 17:47:55 +0200119 port := s.config.Port
120 if port == 0 {
Serge Bazanski549b72b2021-01-07 14:54:19 +0100121 port = node.ConsensusPort
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200122 }
123
124 cfg := embed.NewConfig()
125
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200126 cfg.Name = s.config.Name
Serge Bazanskicb883e22020-07-06 17:47:55 +0200127 cfg.Dir = s.config.Data.Data.FullPath()
128 cfg.InitialClusterToken = DefaultClusterToken
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200129
Serge Bazanskicb883e22020-07-06 17:47:55 +0200130 cfg.PeerTLSInfo.CertFile = s.config.Data.PeerPKI.Certificate.FullPath()
131 cfg.PeerTLSInfo.KeyFile = s.config.Data.PeerPKI.Key.FullPath()
132 cfg.PeerTLSInfo.TrustedCAFile = s.config.Data.PeerPKI.CACertificate.FullPath()
133 cfg.PeerTLSInfo.ClientCertAuth = true
134 cfg.PeerTLSInfo.CRLFile = s.config.Data.PeerCRL.FullPath()
135
136 cfg.LCUrls = []url.URL{{
137 Scheme: "unix",
138 Path: s.config.Ephemeral.ClientSocket.FullPath() + ":0",
139 }}
140 cfg.ACUrls = []url.URL{}
141 cfg.LPUrls = []url.URL{{
142 Scheme: "https",
143 Host: fmt.Sprintf("%s:%d", s.config.ListenHost, port),
144 }}
145 cfg.APUrls = []url.URL{{
146 Scheme: "https",
147 Host: fmt.Sprintf("%s:%d", s.config.ExternalHost, port),
148 }}
149
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200150 if s.config.NewCluster {
151 cfg.ClusterState = "new"
152 cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name)
153 } else if s.config.InitialCluster != "" {
154 cfg.ClusterState = "existing"
155 cfg.InitialCluster = s.config.InitialCluster
156 }
157
Serge Bazanskic7359672020-10-30 16:38:57 +0100158 // TODO(q3k): pipe logs from etcd to supervisor.RawLogger via a file.
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200159 cfg.Logger = DefaultLogger
Serge Bazanskic7359672020-10-30 16:38:57 +0100160 cfg.LogOutputs = []string{"stderr"}
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200161
Serge Bazanskicb883e22020-07-06 17:47:55 +0200162 return cfg, nil
163}
164
165// Run is a Supervisor runnable that starts the etcd member service. It will become healthy once the member joins the
166// cluster successfully.
167func (s *Service) Run(ctx context.Context) error {
168 st := &state{
169 ready: *atomic.NewBool(false),
170 }
171 s.stateMu.Lock()
172 s.state = st
173 s.stateMu.Unlock()
174
175 if s.config.NewCluster {
176 // Expect certificate to be absent from disk.
177 absent, err := s.config.Data.PeerPKI.AllAbsent()
178 if err != nil {
179 return fmt.Errorf("checking certificate existence: %w", err)
180 }
181 if !absent {
182 return fmt.Errorf("want new cluster, but certificates already exist on disk")
183 }
184
185 // Generate CA, keep in memory, write it down in etcd later.
Serge Bazanski662b5b32020-12-21 13:49:00 +0100186 st.ca, err = ca.New("Metropolis etcd peer Root CA")
Serge Bazanskicb883e22020-07-06 17:47:55 +0200187 if err != nil {
188 return fmt.Errorf("when creating new cluster's peer CA: %w", err)
189 }
190
191 ip := net.ParseIP(s.config.ExternalHost)
192 if ip == nil {
193 return fmt.Errorf("configued external host is not an IP address (got %q)", s.config.ExternalHost)
194 }
195
196 cert, key, err := st.ca.Issue(ctx, nil, s.config.Name, ip)
197 if err != nil {
198 return fmt.Errorf("when issuing new cluster's first certificate: %w", err)
199 }
200
Serge Bazanskif12bedf2021-01-15 16:58:50 +0100201 if err := s.config.Data.PeerPKI.MkdirAll(0700); err != nil {
Serge Bazanskicb883e22020-07-06 17:47:55 +0200202 return fmt.Errorf("when creating PKI directory: %w", err)
203 }
204 if err := s.config.Data.PeerPKI.CACertificate.Write(pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: st.ca.CACertRaw}), 0600); err != nil {
205 return fmt.Errorf("when writing CA certificate to disk: %w", err)
206 }
207 if err := s.config.Data.PeerPKI.Certificate.Write(pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: cert}), 0600); err != nil {
208 return fmt.Errorf("when writing certificate to disk: %w", err)
209 }
210 if err := s.config.Data.PeerPKI.Key.Write(pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: key}), 0600); err != nil {
211 return fmt.Errorf("when writing certificate to disk: %w", err)
212 }
213 } else {
214 // Expect certificate to be present on disk.
215 present, err := s.config.Data.PeerPKI.AllExist()
216 if err != nil {
217 return fmt.Errorf("checking certificate existence: %w", err)
218 }
219 if !present {
220 return fmt.Errorf("want existing cluster, but certificate is missing from disk")
221 }
222 }
223
Serge Bazanskif12bedf2021-01-15 16:58:50 +0100224 if err := s.config.Data.MkdirAll(0700); err != nil {
Serge Bazanskicb883e22020-07-06 17:47:55 +0200225 return fmt.Errorf("failed to create data directory; %w", err)
226 }
227
228 cfg, err := s.configure(ctx)
229 if err != nil {
230 return fmt.Errorf("when configuring etcd: %w", err)
231 }
232
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200233 server, err := embed.StartEtcd(cfg)
Serge Bazanskicb883e22020-07-06 17:47:55 +0200234 keep := false
235 defer func() {
236 if !keep && server != nil {
237 server.Close()
238 }
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200239 }()
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100240 if err != nil {
Serge Bazanskicb883e22020-07-06 17:47:55 +0200241 return fmt.Errorf("failed to start etcd: %w", err)
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100242 }
Serge Bazanskicb883e22020-07-06 17:47:55 +0200243 st.etcd = server
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100244
Serge Bazanskicb883e22020-07-06 17:47:55 +0200245 supervisor.Logger(ctx).Info("waiting for etcd...")
Leopold Schabel68c58752019-11-14 21:00:59 +0100246
Serge Bazanskicb883e22020-07-06 17:47:55 +0200247 okay := true
248 select {
249 case <-st.etcd.Server.ReadyNotify():
250 case <-ctx.Done():
251 okay = false
Lorenz Brun52f7f292020-06-24 16:42:02 +0200252 }
253
Serge Bazanskicb883e22020-07-06 17:47:55 +0200254 if !okay {
255 supervisor.Logger(ctx).Info("context done, aborting wait")
256 return ctx.Err()
Lorenz Brun52f7f292020-06-24 16:42:02 +0200257 }
258
Serge Bazanskicb883e22020-07-06 17:47:55 +0200259 socket := s.config.Ephemeral.ClientSocket.FullPath()
260 cl, err := clientv3.New(clientv3.Config{
261 Endpoints: []string{fmt.Sprintf("unix://%s:0", socket)},
262 DialTimeout: time.Second,
Lorenz Brun52f7f292020-06-24 16:42:02 +0200263 })
264 if err != nil {
Serge Bazanskicb883e22020-07-06 17:47:55 +0200265 return fmt.Errorf("failed to connect to new etcd instance: %w", err)
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100266 }
Serge Bazanskicb883e22020-07-06 17:47:55 +0200267 st.cl = cl
268
269 if s.config.NewCluster {
270 if st.ca == nil {
271 panic("peerCA has not been generated")
272 }
273
274 // Save new CA into etcd.
275 err = st.ca.Save(ctx, cl.KV)
276 if err != nil {
277 return fmt.Errorf("failed to save new CA to etcd: %w", err)
278 }
279 } else {
280 // Load existing CA from etcd.
281 st.ca, err = ca.Load(ctx, cl.KV)
282 if err != nil {
283 return fmt.Errorf("failed to load CA from etcd: %w", err)
284 }
285 }
286
287 // Start CRL watcher.
288 if err := supervisor.Run(ctx, "crl", s.watchCRL); err != nil {
289 return fmt.Errorf("failed to start CRL watcher: %w", err)
290 }
291 // Start autopromoter.
292 if err := supervisor.Run(ctx, "autopromoter", s.autopromoter); err != nil {
293 return fmt.Errorf("failed to start autopromoter: %w", err)
294 }
295
296 supervisor.Logger(ctx).Info("etcd is now ready")
297 keep = true
298 st.ready.Store(true)
299 supervisor.Signal(ctx, supervisor.SignalHealthy)
300
301 <-ctx.Done()
302 st.etcd.Close()
303 return ctx.Err()
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100304}
305
Serge Bazanskicb883e22020-07-06 17:47:55 +0200306// watchCRL is a sub-runnable of the etcd cluster member service that updates the on-local-storage CRL to match the
307// newest available version in etcd.
308func (s *Service) watchCRL(ctx context.Context) error {
309 s.stateMu.Lock()
310 cl := s.state.cl
311 ca := s.state.ca
312 s.stateMu.Unlock()
313
314 supervisor.Signal(ctx, supervisor.SignalHealthy)
315 for e := range ca.WaitCRLChange(ctx, cl.KV, cl.Watcher) {
316 if e.Err != nil {
317 return fmt.Errorf("watching CRL: %w", e.Err)
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100318 }
Serge Bazanskicb883e22020-07-06 17:47:55 +0200319
320 if err := s.config.Data.PeerCRL.Write(e.CRL, 0600); err != nil {
321 return fmt.Errorf("saving CRL: %w", err)
322 }
323 }
324
325 // unreachable
326 return nil
327}
328
329func (s *Service) autopromoter(ctx context.Context) error {
330 t := time.NewTicker(5 * time.Second)
331 defer t.Stop()
332
333 autopromote := func() {
334 s.stateMu.Lock()
335 st := s.state
336 s.stateMu.Unlock()
337
338 if st.etcd.Server.Leader() != st.etcd.Server.ID() {
339 return
340 }
341
342 for _, member := range st.etcd.Server.Cluster().Members() {
343 if !member.IsLearner {
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100344 continue
345 }
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100346
Serge Bazanskicb883e22020-07-06 17:47:55 +0200347 // We always call PromoteMember since the metadata necessary to decide if we should is private.
348 // Luckily etcd already does sanity checks internally and will refuse to promote nodes that aren't
349 // connected or are still behind on transactions.
350 if _, err := st.etcd.Server.PromoteMember(ctx, uint64(member.ID)); err != nil {
Serge Bazanskic7359672020-10-30 16:38:57 +0100351 supervisor.Logger(ctx).Infof("Failed to promote consensus node %s: %v", member.Name, err)
Serge Bazanskicb883e22020-07-06 17:47:55 +0200352 } else {
Serge Bazanskic7359672020-10-30 16:38:57 +0100353 supervisor.Logger(ctx).Infof("Promoted new consensus node %s", member.Name)
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100354 }
355 }
356 }
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100357
Serge Bazanskicb883e22020-07-06 17:47:55 +0200358 for {
359 select {
360 case <-ctx.Done():
361 return ctx.Err()
362 case <-t.C:
363 autopromote()
Lorenz Brun52f7f292020-06-24 16:42:02 +0200364 }
365 }
366}
367
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200368// IsReady returns whether etcd is ready and synced
369func (s *Service) IsReady() bool {
Serge Bazanskicb883e22020-07-06 17:47:55 +0200370 s.stateMu.Lock()
371 defer s.stateMu.Unlock()
372 if s.state == nil {
373 return false
374 }
375 return s.state.ready.Load()
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200376}
377
Serge Bazanskicb883e22020-07-06 17:47:55 +0200378func (s *Service) WaitReady(ctx context.Context) error {
379 // TODO(q3k): reimplement the atomic ready flag as an event synchronization mechanism
380 if s.IsReady() {
381 return nil
382 }
383 t := time.NewTicker(100 * time.Millisecond)
384 defer t.Stop()
385 for {
386 select {
387 case <-ctx.Done():
388 return ctx.Err()
389 case <-t.C:
390 if s.IsReady() {
391 return nil
392 }
393 }
394 }
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200395}
396
Serge Bazanskicb883e22020-07-06 17:47:55 +0200397// KV returns and etcd KV client interface to the etcd member/cluster.
398func (s *Service) KV(module, space string) clientv3.KV {
399 s.stateMu.Lock()
400 defer s.stateMu.Unlock()
401 return namespace.NewKV(s.state.cl.KV, fmt.Sprintf("%s:%s", module, space))
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200402}
403
Serge Bazanskicb883e22020-07-06 17:47:55 +0200404func (s *Service) KVRoot() clientv3.KV {
405 s.stateMu.Lock()
406 defer s.stateMu.Unlock()
407 return s.state.cl.KV
408}
409
410func (s *Service) Cluster() clientv3.Cluster {
411 s.stateMu.Lock()
412 defer s.stateMu.Unlock()
413 return s.state.cl.Cluster
414}
415
416// MemberInfo returns information about this etcd cluster member: its ID and name. This will block until this
417// information is available (ie. the cluster status is Ready).
418func (s *Service) MemberInfo(ctx context.Context) (id uint64, name string, err error) {
419 if err = s.WaitReady(ctx); err != nil {
420 err = fmt.Errorf("when waiting for cluster readiness: %w", err)
421 return
422 }
423
424 s.stateMu.Lock()
425 defer s.stateMu.Unlock()
426 id = uint64(s.state.etcd.Server.ID())
427 name = s.config.Name
428 return
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200429}