blob: 8da53d6ec82c92fc4d50efcf10e260eac48d7f62 [file] [log] [blame]
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +02001// Copyright 2020 The Monogon Project Authors.
2//
3// SPDX-License-Identifier: Apache-2.0
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
Serge Bazanskif05e80a2021-10-12 11:53:34 +020017// Package consensus implements a runnable that manages an etcd instance which
18// forms part of a Metropolis etcd cluster. This cluster is a foundational
19// building block of Metropolis and its startup/management sequencing needs to
20// be as robust as possible.
Serge Bazanskicb883e22020-07-06 17:47:55 +020021//
Serge Bazanskif05e80a2021-10-12 11:53:34 +020022// Cluster Structure
Serge Bazanskicb883e22020-07-06 17:47:55 +020023//
Serge Bazanskif05e80a2021-10-12 11:53:34 +020024// Each etcd instance listens for two kinds of traffic:
Serge Bazanskicb883e22020-07-06 17:47:55 +020025//
Serge Bazanskif05e80a2021-10-12 11:53:34 +020026// 1. Peer traffic over TLS on a TCP port of the node's main interface. This is
// where other etcd instances connect to exchange peer traffic, perform
28// transactions and build quorum. The TLS credentials are stored in a PKI that
29// is managed internally by the consensus runnable, with its state stored in
30// etcd itself.
31//
32// 2. Client traffic over a local domain socket, with access control based on
33// standard Linux user/group permissions. Currently this allows any code running
34// as root on the host namespace full access to the etcd cluster.
35//
36// This means that if code running on a node wishes to perform etcd
// transactions, it must also run an etcd instance. This colocation of all
// direct etcd access and the etcd instances themselves effectively delegates all
// Metropolis control plane functionality to whatever subset of nodes is running
// consensus and all code that connects to etcd directly (the Curator).
41//
// For example, if nodes foo and baz are part of the control plane, but node
// worker is not:
44//
45// .---------------------.
46// | node-foo |
47// |---------------------|
48// | .--------------------.
49// | | etcd |<---etcd/TLS--. (node.ConsensusPort)
50// | '--------------------' |
51// | ^ Domain Socket | |
52// | | etcd/plain | |
53// | .--------------------. |
54// | | curator |<---gRPC/TLS----. (node.CuratorServicePort)
55// | '--------------------' | |
56// | ^ Domain Socket | | |
57// | | gRPC/plain | | |
58// | .-----------------. | | |
59// | | node logic | | | |
60// | '-----------------' | | |
61// '---------------------' | |
62// | |
63// .---------------------. | |
64// | node-baz | | |
65// |---------------------| | |
66// | .--------------------. | |
67// | | etcd |<-------------' |
68// | '--------------------' |
69// | ^ Domain Socket | |
70// | | gRPC/plain | |
71// | .--------------------. |
72// | | curator |<---gRPC/TLS----:
73// | '--------------------' |
74// | ... | |
75// '---------------------' |
76// |
77// .---------------------. |
78// | node-worker | |
79// |---------------------| |
80// | .-----------------. | |
81// | | node logic |-------------------'
82// | '-----------------' |
83// '---------------------'
84//
85
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +020086package consensus
87
88import (
89 "context"
Serge Bazanskif05e80a2021-10-12 11:53:34 +020090 "crypto/ed25519"
91 "crypto/x509"
92 "crypto/x509/pkix"
Lorenz Bruna6223792023-07-31 17:13:11 +020093 "errors"
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +020094 "fmt"
Serge Bazanskif05e80a2021-10-12 11:53:34 +020095 "math/big"
Serge Bazanskic1cb37c2023-03-16 17:54:33 +010096 "net"
97 "net/url"
Lorenz Bruna4ea9d02019-10-31 11:40:30 +010098 "time"
99
Jan Schär442cf682024-09-05 18:28:48 +0200100 "go.etcd.io/etcd/api/v3/etcdserverpb"
Lorenz Brund13c1c62022-03-30 19:58:58 +0200101 clientv3 "go.etcd.io/etcd/client/v3"
102 "go.etcd.io/etcd/server/v3/embed"
Hendrik Hofstadt8efe51e2020-02-28 12:53:41 +0100103
Serge Bazanskia105db52021-04-12 19:57:46 +0200104 "source.monogon.dev/metropolis/node/core/consensus/client"
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200105 "source.monogon.dev/metropolis/node/core/identity"
Tim Windelschmidt9f21f532024-05-07 15:14:20 +0200106 "source.monogon.dev/osbase/event"
107 "source.monogon.dev/osbase/event/memory"
108 "source.monogon.dev/osbase/logtree/unraw"
109 "source.monogon.dev/osbase/pki"
110 "source.monogon.dev/osbase/supervisor"
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200111)
112
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200113var (
114 pkiNamespace = pki.Namespaced("/pki/")
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200115)
116
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200117func pkiCA() *pki.Certificate {
118 return &pki.Certificate{
119 Name: "CA",
120 Namespace: &pkiNamespace,
121 Issuer: pki.SelfSigned,
122 Template: x509.Certificate{
123 SerialNumber: big.NewInt(1),
124 Subject: pkix.Name{
125 CommonName: "Metropolis etcd CA Certificate",
126 },
127 IsCA: true,
128 KeyUsage: x509.KeyUsageCertSign | x509.KeyUsageCRLSign | x509.KeyUsageDigitalSignature,
129 ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth, x509.ExtKeyUsageServerAuth, x509.ExtKeyUsageOCSPSigning},
130 },
131 }
132}
133
134func pkiPeerCertificate(pubkey ed25519.PublicKey, extraNames []string) x509.Certificate {
135 return x509.Certificate{
136 Subject: pkix.Name{
137 CommonName: identity.NodeID(pubkey),
138 },
139 KeyUsage: x509.KeyUsageDigitalSignature | x509.KeyUsageKeyEncipherment,
140 ExtKeyUsage: []x509.ExtKeyUsage{
141 x509.ExtKeyUsageClientAuth, x509.ExtKeyUsageServerAuth,
142 },
143 DNSNames: append(extraNames, identity.NodeID(pubkey)),
144 }
145}
146
Jan Schär442cf682024-09-05 18:28:48 +0200147// GetEtcdMemberNodeId returns the node ID of an etcd member. It works even for
148// members which have not started, where member.Name is empty.
149func GetEtcdMemberNodeId(member *etcdserverpb.Member) string {
150 if member.Name != "" {
151 return member.Name
152 }
153 if len(member.PeerURLs) == 0 {
154 return ""
155 }
156 u, err := url.Parse(member.PeerURLs[0])
157 if err != nil {
158 return ""
159 }
160 nodeId, _, err := net.SplitHostPort(u.Host)
161 if err != nil {
162 return ""
163 }
164 return nodeId
165}
166
// Service is the etcd cluster member service. See package-level documentation
// for more information.
type Service struct {
	// config is the configuration this Service was created with (see New).
	config *Config

	// value holds the latest Status published by Run. It is observed via
	// Watch, and internally by the autopromoter and selfupdater runnables.
	value memory.Value[*Status]
	// ca is the etcd cluster CA. Run re-creates it on every start.
	ca *pki.Certificate
}
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200175
Serge Bazanskicb883e22020-07-06 17:47:55 +0200176func New(config Config) *Service {
177 return &Service{
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200178 config: &config,
179 }
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200180}
181
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200182// Run is a Supervisor runnable that starts the etcd member service. It will
183// become healthy once the member joins the cluster successfully.
184func (s *Service) Run(ctx context.Context) error {
185 // Always re-create CA to make sure we don't have PKI state from previous runs.
186 //
187 // TODO(q3k): make the PKI library immune to this misuse.
188 s.ca = pkiCA()
Lorenz Brun52f7f292020-06-24 16:42:02 +0200189
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200190 // Create log converter. This will ingest etcd logs and pipe them out to this
191 // runnable's leveled logging facilities.
Serge Bazanskic1cb37c2023-03-16 17:54:33 +0100192
193 // This is not where etcd will run, but where its log ingestion machinery lives.
194 // This ensures that the (annoying verbose) etcd logs are contained into just
195 // .etcd.
196 err := supervisor.Run(ctx, "etcd", func(ctx context.Context) error {
197 converter := unraw.Converter{
198 Parser: parseEtcdLogEntry,
199 MaximumLineLength: 8192,
200 LeveledLogger: supervisor.Logger(ctx),
201 }
Serge Bazanski5ad31442024-04-17 15:40:52 +0200202 pipe, err := converter.NamedPipeReader(s.config.Ephemeral.ServerLogsFIFO.FullPath())
Serge Bazanskic1cb37c2023-03-16 17:54:33 +0100203 if err != nil {
204 return fmt.Errorf("when creating pipe reader: %w", err)
205 }
206 if err := supervisor.Run(ctx, "piper", pipe); err != nil {
207 return fmt.Errorf("when starting log piper: %w", err)
208 }
209 supervisor.Signal(ctx, supervisor.SignalHealthy)
210 <-ctx.Done()
211 return ctx.Err()
212 })
Serge Bazanski50009e02021-07-07 14:35:27 +0200213 if err != nil {
Serge Bazanskic1cb37c2023-03-16 17:54:33 +0100214 return fmt.Errorf("when starting etcd logger: %w", err)
Serge Bazanski50009e02021-07-07 14:35:27 +0200215 }
216
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200217 // Create autopromoter, which will automatically promote all learners to full
218 // etcd members.
219 if err := supervisor.Run(ctx, "autopromoter", s.autopromoter); err != nil {
220 return fmt.Errorf("when starting autopromtoer: %w", err)
221 }
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200222
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200223 // Create selfupdater, which will perform a one-shot update of this member's
224 // peer address in etcd.
Mateusz Zalega619029b2022-05-05 17:18:26 +0200225 if err := supervisor.Run(ctx, "selfupdater", s.selfupdater); err != nil {
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200226 return fmt.Errorf("when starting selfupdater: %w", err)
227 }
228
229 // Prepare cluster PKI credentials.
230 ppki := s.config.Data.PeerPKI
231 jc := s.config.JoinCluster
232 if jc != nil {
Serge Bazanski97d68082022-06-22 13:15:21 +0200233 supervisor.Logger(ctx).Info("JoinCluster set, writing PPKI data to disk...")
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200234 // For nodes that join an existing cluster, or re-join it, always write whatever
235 // we've been given on startup.
236 if err := ppki.WriteAll(jc.NodeCertificate.Raw, s.config.NodePrivateKey, jc.CACertificate.Raw); err != nil {
237 return fmt.Errorf("when writing credentials for join: %w", err)
238 }
239 if err := s.config.Data.PeerCRL.Write(jc.InitialCRL.Raw, 0400); err != nil {
240 return fmt.Errorf("when writing CRL for join: %w", err)
241 }
242 } else {
243 // For other nodes, we should already have credentials from a previous join, or
244 // a previous bootstrap. If none exist, assume we need to bootstrap these
245 // credentials.
246 //
247 // TODO(q3k): once we have node join (ie. node restart from disk) flow, add a
248 // special configuration marker to prevent spurious bootstraps.
249 absent, err := ppki.AllAbsent()
250 if err != nil {
251 return fmt.Errorf("when checking for PKI file absence: %w", err)
252 }
253 if absent {
Serge Bazanski97d68082022-06-22 13:15:21 +0200254 supervisor.Logger(ctx).Info("PKI data absent, bootstrapping.")
Serge Bazanski5ad31442024-04-17 15:40:52 +0200255 if err := s.bootstrap(ctx); err != nil {
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200256 return fmt.Errorf("bootstrap failed: %w", err)
257 }
258 } else {
259 supervisor.Logger(ctx).Info("PKI data present, not bootstrapping.")
260 }
261 }
262
Serge Bazanskic1cb37c2023-03-16 17:54:33 +0100263 // If we're joining a cluster, make sure that our peers are actually DNS
264 // resolvable. This prevents us from immediately failing due to transient DNS
265 // issues.
266 if jc := s.config.JoinCluster; jc != nil {
267 supervisor.Logger(ctx).Infof("Waiting for initial peers to be DNS resolvable...")
268 startLogging := time.Now().Add(5 * time.Second)
269 for {
270 allOkay := true
271 shouldLog := time.Now().After(startLogging)
272 for _, node := range jc.ExistingNodes {
Tim Windelschmidtd5cabde2024-04-19 02:56:46 +0200273 u, err := url.Parse(node.URL)
Serge Bazanskic1cb37c2023-03-16 17:54:33 +0100274 if err != nil {
275 // Just pretend this node is up. If the URL is really bad, etcd will complain
276 // more clearly than us. This shouldn't happen, anyway.
Tim Windelschmidtd5cabde2024-04-19 02:56:46 +0200277 continue
Serge Bazanskic1cb37c2023-03-16 17:54:33 +0100278 }
279 host := u.Hostname()
Tim Windelschmidtd5cabde2024-04-19 02:56:46 +0200280 if _, err := net.LookupIP(host); err == nil {
Serge Bazanskic1cb37c2023-03-16 17:54:33 +0100281 continue
282 }
283 if shouldLog {
284 supervisor.Logger(ctx).Errorf("Still can't resolve peer %s (%s): %v", node.Name, host, err)
285 }
286 allOkay = false
287 }
288 if allOkay {
289 supervisor.Logger(ctx).Infof("All peers resolvable, continuing startup.")
290 break
291 }
292
293 time.Sleep(100 * time.Millisecond)
294 if shouldLog {
295 startLogging = time.Now().Add(5 * time.Second)
296 }
297 }
298 }
299
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200300 // Start etcd ...
Serge Bazanskic1cb37c2023-03-16 17:54:33 +0100301 supervisor.Logger(ctx).Infof("Starting etcd...")
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200302 cfg := s.config.build(true)
303 server, err := embed.StartEtcd(cfg)
304 if err != nil {
305 return fmt.Errorf("when starting etcd: %w", err)
306 }
307
308 // ... wait for server to be ready...
309 select {
310 case <-ctx.Done():
311 return ctx.Err()
312 case <-server.Server.ReadyNotify():
313 }
314
315 // ... build a client to its' socket...
316 cl, err := s.config.localClient()
317 if err != nil {
318 return fmt.Errorf("getting local client failed: %w", err)
319 }
320
321 // ... and wait until we're not a learner anymore.
322 for {
323 members, err := cl.MemberList(ctx)
324 if err != nil {
325 supervisor.Logger(ctx).Warningf("MemberList failed: %v", err)
326 time.Sleep(time.Second)
327 continue
328 }
329
330 isMember := false
331 for _, member := range members.Members {
332 if member.ID != uint64(server.Server.ID()) {
333 continue
334 }
335 if !member.IsLearner {
336 isMember = true
337 break
338 }
339 }
340 if isMember {
341 break
342 }
343 supervisor.Logger(ctx).Warningf("Still a learner, waiting...")
344 time.Sleep(time.Second)
345 }
346
347 // All done! Report status.
348 supervisor.Logger(ctx).Infof("etcd server ready")
349
350 st := &Status{
Lorenz Brun6211e4d2023-11-14 19:09:40 +0100351 localPeerURL: cfg.AdvertisePeerUrls[0].String(),
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200352 localMemberID: uint64(server.Server.ID()),
353 cl: cl,
354 ca: s.ca,
355 }
Serge Bazanski98a6ccc2023-06-20 13:09:12 +0200356 st2 := *st
357 s.value.Set(&st2)
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200358
359 // Wait until server dies for whatever reason, update status when that
360 // happens.
361 supervisor.Signal(ctx, supervisor.SignalHealthy)
362 select {
363 case err = <-server.Err():
364 err = fmt.Errorf("server returned error: %w", err)
365 case <-ctx.Done():
366 server.Close()
367 err = ctx.Err()
368 }
Serge Bazanski98a6ccc2023-06-20 13:09:12 +0200369
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200370 st.stopped = true
Serge Bazanski98a6ccc2023-06-20 13:09:12 +0200371 st3 := *st
372 s.value.Set(&st3)
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200373 return err
Serge Bazanskicb883e22020-07-06 17:47:55 +0200374}
375
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200376func clientFor(kv *clientv3.Client, parts ...string) (client.Namespaced, error) {
377 var err error
378 namespaced := client.NewLocal(kv)
379 for _, el := range parts {
380 namespaced, err = namespaced.Sub(el)
Serge Bazanskicb883e22020-07-06 17:47:55 +0200381 if err != nil {
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200382 return nil, fmt.Errorf("when getting sub client: %w", err)
Serge Bazanskicb883e22020-07-06 17:47:55 +0200383 }
Serge Bazanskicb883e22020-07-06 17:47:55 +0200384
Serge Bazanskicb883e22020-07-06 17:47:55 +0200385 }
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200386 return namespaced, nil
387}
Serge Bazanskicb883e22020-07-06 17:47:55 +0200388
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200389// bootstrap performs a procedure to resolve the following bootstrap problems:
390// in order to start an etcd server for consensus, we need it to serve over TLS.
391// However, these TLS certificates also need to be stored in etcd so that
392// further certificates can be issued for new nodes.
393//
394// This was previously solved by a using a special PKI/TLS management system that
395// could first create certificates and keys in memory, then only commit them to
396// etcd. However, this ended up being somewhat brittle in the face of startup
397// sequencing issues, so we're now going with a different approach.
398//
399// This function starts an etcd instance first without any PKI/TLS support,
400// without listening on any external port for peer traffic. Once the instance is
401// running, it uses the standard metropolis pki library to create all required
402// data directly in the running etcd instance. It then writes all required
403// startup data (node private key, member certificate, CA certificate) to disk,
404// so that a 'full' etcd instance can be started.
Serge Bazanski5ad31442024-04-17 15:40:52 +0200405func (s *Service) bootstrap(ctx context.Context) error {
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200406 supervisor.Logger(ctx).Infof("Bootstrapping PKI: starting etcd...")
Serge Bazanskicb883e22020-07-06 17:47:55 +0200407
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200408 cfg := s.config.build(false)
409 // This will make etcd create data directories and create a fully new cluster if
410 // needed. If we're restarting due to an error, the old cluster data will still
411 // exist.
412 cfg.ClusterState = "new"
Serge Bazanskicb883e22020-07-06 17:47:55 +0200413
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200414 // Start the bootstrap etcd instance...
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200415 server, err := embed.StartEtcd(cfg)
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100416 if err != nil {
Serge Bazanskib76b8d12023-03-16 00:46:56 +0100417 return fmt.Errorf("failed to start bootstrap etcd: %w", err)
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100418 }
Serge Bazanskib76b8d12023-03-16 00:46:56 +0100419 defer server.Close()
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100420
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200421 // ... wait for it to run ...
Serge Bazanskicb883e22020-07-06 17:47:55 +0200422 select {
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200423 case <-server.Server.ReadyNotify():
Serge Bazanskicb883e22020-07-06 17:47:55 +0200424 case <-ctx.Done():
Lorenz Bruna6223792023-07-31 17:13:11 +0200425 return errors.New("timed out waiting for etcd to become ready")
Lorenz Brun52f7f292020-06-24 16:42:02 +0200426 }
427
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200428 // ... create a client to it ...
429 cl, err := s.config.localClient()
Lorenz Brun52f7f292020-06-24 16:42:02 +0200430 if err != nil {
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200431 return fmt.Errorf("when getting bootstrap client: %w", err)
Serge Bazanskicb883e22020-07-06 17:47:55 +0200432 }
433
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200434 // ... and build PKI there. This is idempotent, so we will never override
435 // anything that's already in the cluster, instead just retrieve it.
436 supervisor.Logger(ctx).Infof("Bootstrapping PKI: etcd running, building PKI...")
437 clPKI, err := clientFor(cl, "namespaced", "etcd-pki")
438 if err != nil {
439 return fmt.Errorf("when getting pki client: %w", err)
Serge Bazanskicb883e22020-07-06 17:47:55 +0200440 }
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200441 defer clPKI.Close()
442 caCert, err := s.ca.Ensure(ctx, clPKI)
443 if err != nil {
444 return fmt.Errorf("failed to ensure CA certificate: %w", err)
Serge Bazanskicb883e22020-07-06 17:47:55 +0200445 }
446
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200447 // If we're running with a test overridden external address (eg. localhost), we
448 // need to also make that part of the member certificate.
449 var extraNames []string
450 if external := s.config.testOverrides.externalAddress; external != "" {
451 extraNames = []string{external}
452 }
453 memberTemplate := pki.Certificate{
454 Name: identity.NodeID(s.config.nodePublicKey()),
455 Namespace: &pkiNamespace,
456 Issuer: s.ca,
457 Template: pkiPeerCertificate(s.config.nodePublicKey(), extraNames),
458 Mode: pki.CertificateExternal,
459 PublicKey: s.config.nodePublicKey(),
460 }
461 memberCert, err := memberTemplate.Ensure(ctx, clPKI)
462 if err != nil {
463 return fmt.Errorf("failed to ensure member certificate: %w", err)
464 }
Serge Bazanskicb883e22020-07-06 17:47:55 +0200465
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200466 // Retrieve CRL.
467 crlW := s.ca.WatchCRL(clPKI)
468 crl, err := crlW.Get(ctx)
469 if err != nil {
470 return fmt.Errorf("failed to retrieve initial CRL: %w", err)
471 }
472
473 // We have everything we need. Write things to disk.
474 supervisor.Logger(ctx).Infof("Bootstrapping PKI: certificates issued, writing to disk...")
475
476 if err := s.config.Data.PeerPKI.WriteAll(memberCert, s.config.NodePrivateKey, caCert); err != nil {
477 return fmt.Errorf("failed to write bootstrapped certificates: %w", err)
478 }
479 if err := s.config.Data.PeerCRL.Write(crl.Raw, 0400); err != nil {
480 return fmt.Errorf("failed tow rite CRL: %w", err)
481 }
482
483 // Stop the server synchronously (blocking until it's fully shutdown), and
484 // return. The caller can now run the 'full' etcd instance with PKI.
485 supervisor.Logger(ctx).Infof("Bootstrapping PKI: done, stopping server...")
486 server.Close()
Serge Bazanskicb883e22020-07-06 17:47:55 +0200487 return ctx.Err()
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100488}
489
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200490// autopromoter is a runnable which repeatedly attempts to promote etcd learners
491// in the cluster to full followers. This is needed to bring any new cluster
492// members (which are always added as learners) to full membership and make them
493// part of the etcd quorum.
Serge Bazanskicb883e22020-07-06 17:47:55 +0200494func (s *Service) autopromoter(ctx context.Context) error {
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200495 autopromote := func(ctx context.Context, cl *clientv3.Client) {
496 // Only autopromote if our endpoint is a leader. This is a bargain bin version
497 // of leader election: it's simple and cheap, but not very reliable. The most
498 // obvious failure mode is that the instance we contacted isn't a leader by the
499 // time we promote a member, but that's fine - the promotion is idempotent. What
500 // we really use the 'leader election' here for isn't for consistency, but to
501 // prevent the cluster from being hammered by spurious leadership promotion
502 // requests from every etcd member.
503 status, err := cl.Status(ctx, cl.Endpoints()[0])
504 if err != nil {
505 supervisor.Logger(ctx).Warningf("Failed to get endpoint status: %v", err)
Jan Schärb9769672024-04-09 15:31:40 +0200506 return
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200507 }
508 if status.Leader != status.Header.MemberId {
Serge Bazanskicb883e22020-07-06 17:47:55 +0200509 return
510 }
511
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200512 members, err := cl.MemberList(ctx)
513 if err != nil {
514 supervisor.Logger(ctx).Warningf("Failed to list members: %v", err)
515 return
516 }
517 for _, member := range members.Members {
Serge Bazanskicb883e22020-07-06 17:47:55 +0200518 if !member.IsLearner {
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100519 continue
520 }
Jan Schär442cf682024-09-05 18:28:48 +0200521 if member.Name == "" {
522 // If the name is empty, the member has not started.
523 continue
524 }
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200525 // Always call PromoteMember since the metadata necessary to decide if we should
526 // is private. Luckily etcd already does consistency checks internally and will
527 // refuse to promote nodes that aren't connected or are still behind on
528 // transactions.
529 if _, err := cl.MemberPromote(ctx, member.ID); err != nil {
Serge Bazanskic7359672020-10-30 16:38:57 +0100530 supervisor.Logger(ctx).Infof("Failed to promote consensus node %s: %v", member.Name, err)
Serge Bazanskicb883e22020-07-06 17:47:55 +0200531 } else {
Serge Bazanskic7359672020-10-30 16:38:57 +0100532 supervisor.Logger(ctx).Infof("Promoted new consensus node %s", member.Name)
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100533 }
534 }
535 }
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100536
Serge Bazanski37110c32023-03-01 13:57:27 +0000537 w := s.value.Watch()
Serge Bazanskicb883e22020-07-06 17:47:55 +0200538 for {
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200539 st, err := w.Get(ctx)
540 if err != nil {
541 return fmt.Errorf("status get failed: %w", err)
Lorenz Brun52f7f292020-06-24 16:42:02 +0200542 }
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200543 t := time.NewTicker(5 * time.Second)
544 for {
545 autopromote(ctx, st.cl)
546 select {
547 case <-ctx.Done():
548 t.Stop()
549 return ctx.Err()
550 case <-t.C:
Serge Bazanskicb883e22020-07-06 17:47:55 +0200551 }
552 }
553 }
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200554}
555
Serge Bazanski37110c32023-03-01 13:57:27 +0000556func (s *Service) Watch() event.Watcher[*Status] {
557 return s.value.Watch()
558}
559
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200560// selfupdater is a runnable that performs a one-shot (once per Service Run,
561// thus once for each configuration) update of the node's Peer URL in etcd. This
562// is currently only really needed because the first node in the cluster
563// bootstraps itself without any peer URLs at first, and this allows it to then
564// add the peer URLs afterwards. Instead of a runnable, this might as well have
565// been part of the bootstarp logic, but making it a restartable runnable is
566// more robust.
567func (s *Service) selfupdater(ctx context.Context) error {
568 supervisor.Signal(ctx, supervisor.SignalHealthy)
Serge Bazanski37110c32023-03-01 13:57:27 +0000569 w := s.value.Watch()
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200570 for {
571 st, err := w.Get(ctx)
572 if err != nil {
573 return fmt.Errorf("failed to get status: %w", err)
574 }
575
Serge Bazanski5839e972021-11-16 15:46:19 +0100576 if st.localPeerURL != "" {
577 supervisor.Logger(ctx).Infof("Updating local peer URL...")
578 peerURL := st.localPeerURL
579 if _, err := st.cl.MemberUpdate(ctx, st.localMemberID, []string{peerURL}); err != nil {
580 supervisor.Logger(ctx).Warningf("failed to update member: %v", err)
581 time.Sleep(1 * time.Second)
582 continue
583 }
584 } else {
585 supervisor.Logger(ctx).Infof("No local peer URL, not updating.")
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200586 }
587
588 supervisor.Signal(ctx, supervisor.SignalDone)
589 return nil
Serge Bazanskia105db52021-04-12 19:57:46 +0200590 }
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200591}