blob: f8861a7ce374422f1d4a57becf9348d0699d30db [file] [log] [blame]
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +02001// Copyright 2020 The Monogon Project Authors.
2//
3// SPDX-License-Identifier: Apache-2.0
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
Serge Bazanskif05e80a2021-10-12 11:53:34 +020017// Package consensus implements a runnable that manages an etcd instance which
18// forms part of a Metropolis etcd cluster. This cluster is a foundational
19// building block of Metropolis and its startup/management sequencing needs to
20// be as robust as possible.
Serge Bazanskicb883e22020-07-06 17:47:55 +020021//
Serge Bazanskif05e80a2021-10-12 11:53:34 +020022// Cluster Structure
Serge Bazanskicb883e22020-07-06 17:47:55 +020023//
Serge Bazanskif05e80a2021-10-12 11:53:34 +020024// Each etcd instance listens for two kinds of traffic:
Serge Bazanskicb883e22020-07-06 17:47:55 +020025//
Serge Bazanskif05e80a2021-10-12 11:53:34 +020026// 1. Peer traffic over TLS on a TCP port of the node's main interface. This is
// where other etcd instances connect to exchange peer traffic, perform
28// transactions and build quorum. The TLS credentials are stored in a PKI that
29// is managed internally by the consensus runnable, with its state stored in
30// etcd itself.
31//
32// 2. Client traffic over a local domain socket, with access control based on
33// standard Linux user/group permissions. Currently this allows any code running
34// as root on the host namespace full access to the etcd cluster.
35//
// This means that if code running on a node wishes to perform etcd
// transactions, it must also run an etcd instance. This colocation of all
// direct etcd access and the etcd instances themselves effectively delegates
// all Metropolis control plane functionality to whatever subset of nodes is
// running consensus and all code that connects to etcd directly (the Curator).
41//
// For example, if nodes foo and baz are parts of the control plane, but node
// worker is not:
44//
//   .---------------------.
//   | node-foo            |
//   |---------------------|
//   | .--------------------.
//   | | etcd               |<---etcd/TLS--.   (node.ConsensusPort)
//   | '--------------------'              |
//   |     ^ Domain Socket |               |
//   |     | etcd/plain    |               |
//   | .--------------------.              |
//   | | curator            |<---gRPC/TLS----. (node.CuratorServicePort)
//   | '--------------------'              | |
//   |     ^ Domain Socket |               | |
//   |     | gRPC/plain    |               | |
//   | .-----------------. |               | |
//   | | node logic      | |               | |
//   | '-----------------' |               | |
//   '---------------------'               | |
//                                         | |
//   .---------------------.               | |
//   | node-baz            |               | |
//   |---------------------|               | |
//   | .--------------------.              | |
//   | | etcd               |<-------------' |
//   | '--------------------'                |
//   |     ^ Domain Socket |                 |
//   |     | etcd/plain    |                 |
//   | .--------------------.                |
//   | | curator            |<---gRPC/TLS----:
//   | '--------------------'                |
//   |    ...              |                 |
//   '---------------------'                 |
//                                           |
//   .---------------------.                 |
//   | node-worker         |                 |
//   |---------------------|                 |
//   | .-----------------. |                 |
//   | | node logic      |-------------------'
//   | '-----------------' |
//   '---------------------'
84//
85
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +020086package consensus
87
88import (
89 "context"
Serge Bazanskif05e80a2021-10-12 11:53:34 +020090 "crypto/x509"
91 "crypto/x509/pkix"
Lorenz Bruna6223792023-07-31 17:13:11 +020092 "errors"
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +020093 "fmt"
Serge Bazanskif05e80a2021-10-12 11:53:34 +020094 "math/big"
Serge Bazanskic1cb37c2023-03-16 17:54:33 +010095 "net"
96 "net/url"
Lorenz Bruna4ea9d02019-10-31 11:40:30 +010097 "time"
98
Jan Schär442cf682024-09-05 18:28:48 +020099 "go.etcd.io/etcd/api/v3/etcdserverpb"
Lorenz Brund13c1c62022-03-30 19:58:58 +0200100 clientv3 "go.etcd.io/etcd/client/v3"
101 "go.etcd.io/etcd/server/v3/embed"
Hendrik Hofstadt8efe51e2020-02-28 12:53:41 +0100102
Serge Bazanskia105db52021-04-12 19:57:46 +0200103 "source.monogon.dev/metropolis/node/core/consensus/client"
Tim Windelschmidt9f21f532024-05-07 15:14:20 +0200104 "source.monogon.dev/osbase/event"
105 "source.monogon.dev/osbase/event/memory"
106 "source.monogon.dev/osbase/logtree/unraw"
107 "source.monogon.dev/osbase/pki"
108 "source.monogon.dev/osbase/supervisor"
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200109)
110
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200111var (
112 pkiNamespace = pki.Namespaced("/pki/")
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200113)
114
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200115func pkiCA() *pki.Certificate {
116 return &pki.Certificate{
117 Name: "CA",
118 Namespace: &pkiNamespace,
119 Issuer: pki.SelfSigned,
120 Template: x509.Certificate{
121 SerialNumber: big.NewInt(1),
122 Subject: pkix.Name{
123 CommonName: "Metropolis etcd CA Certificate",
124 },
125 IsCA: true,
126 KeyUsage: x509.KeyUsageCertSign | x509.KeyUsageCRLSign | x509.KeyUsageDigitalSignature,
127 ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth, x509.ExtKeyUsageServerAuth, x509.ExtKeyUsageOCSPSigning},
128 },
129 }
130}
131
// pkiPeerCertificate returns the x509 template for an etcd peer certificate
// of the node identified by nodeID.
//
// The certificate's CommonName is nodeID, it is usable for both TLS client and
// server authentication, and its DNS SANs are extraNames followed by nodeID.
//
// extraNames is never modified and the returned DNSNames slice does not share
// memory with it, so callers may keep using (and appending to) their slice.
func pkiPeerCertificate(nodeID string, extraNames []string) x509.Certificate {
	// Build the SAN list in a fresh slice. Appending directly to extraNames
	// could write into the caller's backing array if it has spare capacity.
	names := make([]string, 0, len(extraNames)+1)
	names = append(names, extraNames...)
	names = append(names, nodeID)

	return x509.Certificate{
		Subject: pkix.Name{
			CommonName: nodeID,
		},
		KeyUsage: x509.KeyUsageDigitalSignature | x509.KeyUsageKeyEncipherment,
		ExtKeyUsage: []x509.ExtKeyUsage{
			x509.ExtKeyUsageClientAuth, x509.ExtKeyUsageServerAuth,
		},
		DNSNames: names,
	}
}
144
Jan Schär442cf682024-09-05 18:28:48 +0200145// GetEtcdMemberNodeId returns the node ID of an etcd member. It works even for
146// members which have not started, where member.Name is empty.
147func GetEtcdMemberNodeId(member *etcdserverpb.Member) string {
148 if member.Name != "" {
149 return member.Name
150 }
151 if len(member.PeerURLs) == 0 {
152 return ""
153 }
154 u, err := url.Parse(member.PeerURLs[0])
155 if err != nil {
156 return ""
157 }
158 nodeId, _, err := net.SplitHostPort(u.Host)
159 if err != nil {
160 return ""
161 }
162 return nodeId
163}
164
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200165// Service is the etcd cluster member service. See package-level documentation
166// for more information.
Serge Bazanskicb883e22020-07-06 17:47:55 +0200167type Service struct {
Serge Bazanskicb883e22020-07-06 17:47:55 +0200168 config *Config
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100169
Serge Bazanski37110c32023-03-01 13:57:27 +0000170 value memory.Value[*Status]
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200171 ca *pki.Certificate
Serge Bazanskicb883e22020-07-06 17:47:55 +0200172}
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200173
Serge Bazanskicb883e22020-07-06 17:47:55 +0200174func New(config Config) *Service {
175 return &Service{
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200176 config: &config,
177 }
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200178}
179
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200180// Run is a Supervisor runnable that starts the etcd member service. It will
181// become healthy once the member joins the cluster successfully.
182func (s *Service) Run(ctx context.Context) error {
183 // Always re-create CA to make sure we don't have PKI state from previous runs.
184 //
185 // TODO(q3k): make the PKI library immune to this misuse.
186 s.ca = pkiCA()
Lorenz Brun52f7f292020-06-24 16:42:02 +0200187
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200188 // Create log converter. This will ingest etcd logs and pipe them out to this
189 // runnable's leveled logging facilities.
Serge Bazanskic1cb37c2023-03-16 17:54:33 +0100190
191 // This is not where etcd will run, but where its log ingestion machinery lives.
192 // This ensures that the (annoying verbose) etcd logs are contained into just
193 // .etcd.
194 err := supervisor.Run(ctx, "etcd", func(ctx context.Context) error {
195 converter := unraw.Converter{
196 Parser: parseEtcdLogEntry,
197 MaximumLineLength: 8192,
198 LeveledLogger: supervisor.Logger(ctx),
199 }
Serge Bazanski5ad31442024-04-17 15:40:52 +0200200 pipe, err := converter.NamedPipeReader(s.config.Ephemeral.ServerLogsFIFO.FullPath())
Serge Bazanskic1cb37c2023-03-16 17:54:33 +0100201 if err != nil {
202 return fmt.Errorf("when creating pipe reader: %w", err)
203 }
204 if err := supervisor.Run(ctx, "piper", pipe); err != nil {
205 return fmt.Errorf("when starting log piper: %w", err)
206 }
207 supervisor.Signal(ctx, supervisor.SignalHealthy)
208 <-ctx.Done()
209 return ctx.Err()
210 })
Serge Bazanski50009e02021-07-07 14:35:27 +0200211 if err != nil {
Serge Bazanskic1cb37c2023-03-16 17:54:33 +0100212 return fmt.Errorf("when starting etcd logger: %w", err)
Serge Bazanski50009e02021-07-07 14:35:27 +0200213 }
214
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200215 // Create autopromoter, which will automatically promote all learners to full
216 // etcd members.
217 if err := supervisor.Run(ctx, "autopromoter", s.autopromoter); err != nil {
218 return fmt.Errorf("when starting autopromtoer: %w", err)
219 }
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200220
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200221 // Create selfupdater, which will perform a one-shot update of this member's
222 // peer address in etcd.
Mateusz Zalega619029b2022-05-05 17:18:26 +0200223 if err := supervisor.Run(ctx, "selfupdater", s.selfupdater); err != nil {
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200224 return fmt.Errorf("when starting selfupdater: %w", err)
225 }
226
227 // Prepare cluster PKI credentials.
228 ppki := s.config.Data.PeerPKI
229 jc := s.config.JoinCluster
230 if jc != nil {
Serge Bazanski97d68082022-06-22 13:15:21 +0200231 supervisor.Logger(ctx).Info("JoinCluster set, writing PPKI data to disk...")
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200232 // For nodes that join an existing cluster, or re-join it, always write whatever
233 // we've been given on startup.
234 if err := ppki.WriteAll(jc.NodeCertificate.Raw, s.config.NodePrivateKey, jc.CACertificate.Raw); err != nil {
235 return fmt.Errorf("when writing credentials for join: %w", err)
236 }
237 if err := s.config.Data.PeerCRL.Write(jc.InitialCRL.Raw, 0400); err != nil {
238 return fmt.Errorf("when writing CRL for join: %w", err)
239 }
240 } else {
241 // For other nodes, we should already have credentials from a previous join, or
242 // a previous bootstrap. If none exist, assume we need to bootstrap these
243 // credentials.
244 //
245 // TODO(q3k): once we have node join (ie. node restart from disk) flow, add a
246 // special configuration marker to prevent spurious bootstraps.
247 absent, err := ppki.AllAbsent()
248 if err != nil {
249 return fmt.Errorf("when checking for PKI file absence: %w", err)
250 }
251 if absent {
Serge Bazanski97d68082022-06-22 13:15:21 +0200252 supervisor.Logger(ctx).Info("PKI data absent, bootstrapping.")
Serge Bazanski5ad31442024-04-17 15:40:52 +0200253 if err := s.bootstrap(ctx); err != nil {
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200254 return fmt.Errorf("bootstrap failed: %w", err)
255 }
256 } else {
257 supervisor.Logger(ctx).Info("PKI data present, not bootstrapping.")
258 }
259 }
260
Serge Bazanskic1cb37c2023-03-16 17:54:33 +0100261 // If we're joining a cluster, make sure that our peers are actually DNS
262 // resolvable. This prevents us from immediately failing due to transient DNS
263 // issues.
264 if jc := s.config.JoinCluster; jc != nil {
265 supervisor.Logger(ctx).Infof("Waiting for initial peers to be DNS resolvable...")
266 startLogging := time.Now().Add(5 * time.Second)
267 for {
268 allOkay := true
269 shouldLog := time.Now().After(startLogging)
270 for _, node := range jc.ExistingNodes {
Tim Windelschmidtd5cabde2024-04-19 02:56:46 +0200271 u, err := url.Parse(node.URL)
Serge Bazanskic1cb37c2023-03-16 17:54:33 +0100272 if err != nil {
273 // Just pretend this node is up. If the URL is really bad, etcd will complain
274 // more clearly than us. This shouldn't happen, anyway.
Tim Windelschmidtd5cabde2024-04-19 02:56:46 +0200275 continue
Serge Bazanskic1cb37c2023-03-16 17:54:33 +0100276 }
277 host := u.Hostname()
Tim Windelschmidtd5cabde2024-04-19 02:56:46 +0200278 if _, err := net.LookupIP(host); err == nil {
Serge Bazanskic1cb37c2023-03-16 17:54:33 +0100279 continue
280 }
281 if shouldLog {
282 supervisor.Logger(ctx).Errorf("Still can't resolve peer %s (%s): %v", node.Name, host, err)
283 }
284 allOkay = false
285 }
286 if allOkay {
287 supervisor.Logger(ctx).Infof("All peers resolvable, continuing startup.")
288 break
289 }
290
291 time.Sleep(100 * time.Millisecond)
292 if shouldLog {
293 startLogging = time.Now().Add(5 * time.Second)
294 }
295 }
296 }
297
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200298 // Start etcd ...
Serge Bazanskic1cb37c2023-03-16 17:54:33 +0100299 supervisor.Logger(ctx).Infof("Starting etcd...")
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200300 cfg := s.config.build(true)
301 server, err := embed.StartEtcd(cfg)
302 if err != nil {
303 return fmt.Errorf("when starting etcd: %w", err)
304 }
305
306 // ... wait for server to be ready...
307 select {
308 case <-ctx.Done():
309 return ctx.Err()
310 case <-server.Server.ReadyNotify():
311 }
312
313 // ... build a client to its' socket...
314 cl, err := s.config.localClient()
315 if err != nil {
316 return fmt.Errorf("getting local client failed: %w", err)
317 }
318
319 // ... and wait until we're not a learner anymore.
320 for {
321 members, err := cl.MemberList(ctx)
322 if err != nil {
323 supervisor.Logger(ctx).Warningf("MemberList failed: %v", err)
324 time.Sleep(time.Second)
325 continue
326 }
327
328 isMember := false
329 for _, member := range members.Members {
330 if member.ID != uint64(server.Server.ID()) {
331 continue
332 }
333 if !member.IsLearner {
334 isMember = true
335 break
336 }
337 }
338 if isMember {
339 break
340 }
341 supervisor.Logger(ctx).Warningf("Still a learner, waiting...")
342 time.Sleep(time.Second)
343 }
344
345 // All done! Report status.
346 supervisor.Logger(ctx).Infof("etcd server ready")
347
348 st := &Status{
Lorenz Brun6211e4d2023-11-14 19:09:40 +0100349 localPeerURL: cfg.AdvertisePeerUrls[0].String(),
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200350 localMemberID: uint64(server.Server.ID()),
351 cl: cl,
352 ca: s.ca,
353 }
Serge Bazanski98a6ccc2023-06-20 13:09:12 +0200354 st2 := *st
355 s.value.Set(&st2)
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200356
357 // Wait until server dies for whatever reason, update status when that
358 // happens.
359 supervisor.Signal(ctx, supervisor.SignalHealthy)
360 select {
361 case err = <-server.Err():
362 err = fmt.Errorf("server returned error: %w", err)
363 case <-ctx.Done():
364 server.Close()
365 err = ctx.Err()
366 }
Serge Bazanski98a6ccc2023-06-20 13:09:12 +0200367
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200368 st.stopped = true
Serge Bazanski98a6ccc2023-06-20 13:09:12 +0200369 st3 := *st
370 s.value.Set(&st3)
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200371 return err
Serge Bazanskicb883e22020-07-06 17:47:55 +0200372}
373
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200374func clientFor(kv *clientv3.Client, parts ...string) (client.Namespaced, error) {
375 var err error
376 namespaced := client.NewLocal(kv)
377 for _, el := range parts {
378 namespaced, err = namespaced.Sub(el)
Serge Bazanskicb883e22020-07-06 17:47:55 +0200379 if err != nil {
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200380 return nil, fmt.Errorf("when getting sub client: %w", err)
Serge Bazanskicb883e22020-07-06 17:47:55 +0200381 }
Serge Bazanskicb883e22020-07-06 17:47:55 +0200382
Serge Bazanskicb883e22020-07-06 17:47:55 +0200383 }
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200384 return namespaced, nil
385}
Serge Bazanskicb883e22020-07-06 17:47:55 +0200386
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200387// bootstrap performs a procedure to resolve the following bootstrap problems:
388// in order to start an etcd server for consensus, we need it to serve over TLS.
389// However, these TLS certificates also need to be stored in etcd so that
390// further certificates can be issued for new nodes.
391//
392// This was previously solved by a using a special PKI/TLS management system that
393// could first create certificates and keys in memory, then only commit them to
394// etcd. However, this ended up being somewhat brittle in the face of startup
395// sequencing issues, so we're now going with a different approach.
396//
397// This function starts an etcd instance first without any PKI/TLS support,
398// without listening on any external port for peer traffic. Once the instance is
399// running, it uses the standard metropolis pki library to create all required
400// data directly in the running etcd instance. It then writes all required
401// startup data (node private key, member certificate, CA certificate) to disk,
402// so that a 'full' etcd instance can be started.
Serge Bazanski5ad31442024-04-17 15:40:52 +0200403func (s *Service) bootstrap(ctx context.Context) error {
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200404 supervisor.Logger(ctx).Infof("Bootstrapping PKI: starting etcd...")
Serge Bazanskicb883e22020-07-06 17:47:55 +0200405
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200406 cfg := s.config.build(false)
407 // This will make etcd create data directories and create a fully new cluster if
408 // needed. If we're restarting due to an error, the old cluster data will still
409 // exist.
410 cfg.ClusterState = "new"
Serge Bazanskicb883e22020-07-06 17:47:55 +0200411
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200412 // Start the bootstrap etcd instance...
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200413 server, err := embed.StartEtcd(cfg)
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100414 if err != nil {
Serge Bazanskib76b8d12023-03-16 00:46:56 +0100415 return fmt.Errorf("failed to start bootstrap etcd: %w", err)
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100416 }
Serge Bazanskib76b8d12023-03-16 00:46:56 +0100417 defer server.Close()
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100418
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200419 // ... wait for it to run ...
Serge Bazanskicb883e22020-07-06 17:47:55 +0200420 select {
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200421 case <-server.Server.ReadyNotify():
Serge Bazanskicb883e22020-07-06 17:47:55 +0200422 case <-ctx.Done():
Lorenz Bruna6223792023-07-31 17:13:11 +0200423 return errors.New("timed out waiting for etcd to become ready")
Lorenz Brun52f7f292020-06-24 16:42:02 +0200424 }
425
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200426 // ... create a client to it ...
427 cl, err := s.config.localClient()
Lorenz Brun52f7f292020-06-24 16:42:02 +0200428 if err != nil {
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200429 return fmt.Errorf("when getting bootstrap client: %w", err)
Serge Bazanskicb883e22020-07-06 17:47:55 +0200430 }
431
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200432 // ... and build PKI there. This is idempotent, so we will never override
433 // anything that's already in the cluster, instead just retrieve it.
434 supervisor.Logger(ctx).Infof("Bootstrapping PKI: etcd running, building PKI...")
435 clPKI, err := clientFor(cl, "namespaced", "etcd-pki")
436 if err != nil {
437 return fmt.Errorf("when getting pki client: %w", err)
Serge Bazanskicb883e22020-07-06 17:47:55 +0200438 }
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200439 defer clPKI.Close()
440 caCert, err := s.ca.Ensure(ctx, clPKI)
441 if err != nil {
442 return fmt.Errorf("failed to ensure CA certificate: %w", err)
Serge Bazanskicb883e22020-07-06 17:47:55 +0200443 }
444
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200445 // If we're running with a test overridden external address (eg. localhost), we
446 // need to also make that part of the member certificate.
447 var extraNames []string
448 if external := s.config.testOverrides.externalAddress; external != "" {
449 extraNames = []string{external}
450 }
451 memberTemplate := pki.Certificate{
Jan Schär39d9c242024-09-24 13:49:55 +0200452 Name: s.config.NodeID,
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200453 Namespace: &pkiNamespace,
454 Issuer: s.ca,
Jan Schär39d9c242024-09-24 13:49:55 +0200455 Template: pkiPeerCertificate(s.config.NodeID, extraNames),
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200456 Mode: pki.CertificateExternal,
457 PublicKey: s.config.nodePublicKey(),
458 }
459 memberCert, err := memberTemplate.Ensure(ctx, clPKI)
460 if err != nil {
461 return fmt.Errorf("failed to ensure member certificate: %w", err)
462 }
Serge Bazanskicb883e22020-07-06 17:47:55 +0200463
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200464 // Retrieve CRL.
465 crlW := s.ca.WatchCRL(clPKI)
466 crl, err := crlW.Get(ctx)
467 if err != nil {
468 return fmt.Errorf("failed to retrieve initial CRL: %w", err)
469 }
470
471 // We have everything we need. Write things to disk.
472 supervisor.Logger(ctx).Infof("Bootstrapping PKI: certificates issued, writing to disk...")
473
474 if err := s.config.Data.PeerPKI.WriteAll(memberCert, s.config.NodePrivateKey, caCert); err != nil {
475 return fmt.Errorf("failed to write bootstrapped certificates: %w", err)
476 }
477 if err := s.config.Data.PeerCRL.Write(crl.Raw, 0400); err != nil {
478 return fmt.Errorf("failed tow rite CRL: %w", err)
479 }
480
481 // Stop the server synchronously (blocking until it's fully shutdown), and
482 // return. The caller can now run the 'full' etcd instance with PKI.
483 supervisor.Logger(ctx).Infof("Bootstrapping PKI: done, stopping server...")
484 server.Close()
Serge Bazanskicb883e22020-07-06 17:47:55 +0200485 return ctx.Err()
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100486}
487
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200488// autopromoter is a runnable which repeatedly attempts to promote etcd learners
489// in the cluster to full followers. This is needed to bring any new cluster
490// members (which are always added as learners) to full membership and make them
491// part of the etcd quorum.
Serge Bazanskicb883e22020-07-06 17:47:55 +0200492func (s *Service) autopromoter(ctx context.Context) error {
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200493 autopromote := func(ctx context.Context, cl *clientv3.Client) {
494 // Only autopromote if our endpoint is a leader. This is a bargain bin version
495 // of leader election: it's simple and cheap, but not very reliable. The most
496 // obvious failure mode is that the instance we contacted isn't a leader by the
497 // time we promote a member, but that's fine - the promotion is idempotent. What
498 // we really use the 'leader election' here for isn't for consistency, but to
499 // prevent the cluster from being hammered by spurious leadership promotion
500 // requests from every etcd member.
501 status, err := cl.Status(ctx, cl.Endpoints()[0])
502 if err != nil {
503 supervisor.Logger(ctx).Warningf("Failed to get endpoint status: %v", err)
Jan Schärb9769672024-04-09 15:31:40 +0200504 return
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200505 }
506 if status.Leader != status.Header.MemberId {
Serge Bazanskicb883e22020-07-06 17:47:55 +0200507 return
508 }
509
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200510 members, err := cl.MemberList(ctx)
511 if err != nil {
512 supervisor.Logger(ctx).Warningf("Failed to list members: %v", err)
513 return
514 }
515 for _, member := range members.Members {
Serge Bazanskicb883e22020-07-06 17:47:55 +0200516 if !member.IsLearner {
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100517 continue
518 }
Jan Schär442cf682024-09-05 18:28:48 +0200519 if member.Name == "" {
520 // If the name is empty, the member has not started.
521 continue
522 }
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200523 // Always call PromoteMember since the metadata necessary to decide if we should
524 // is private. Luckily etcd already does consistency checks internally and will
525 // refuse to promote nodes that aren't connected or are still behind on
526 // transactions.
527 if _, err := cl.MemberPromote(ctx, member.ID); err != nil {
Serge Bazanskic7359672020-10-30 16:38:57 +0100528 supervisor.Logger(ctx).Infof("Failed to promote consensus node %s: %v", member.Name, err)
Serge Bazanskicb883e22020-07-06 17:47:55 +0200529 } else {
Serge Bazanskic7359672020-10-30 16:38:57 +0100530 supervisor.Logger(ctx).Infof("Promoted new consensus node %s", member.Name)
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100531 }
532 }
533 }
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100534
Serge Bazanski37110c32023-03-01 13:57:27 +0000535 w := s.value.Watch()
Serge Bazanskicb883e22020-07-06 17:47:55 +0200536 for {
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200537 st, err := w.Get(ctx)
538 if err != nil {
539 return fmt.Errorf("status get failed: %w", err)
Lorenz Brun52f7f292020-06-24 16:42:02 +0200540 }
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200541 t := time.NewTicker(5 * time.Second)
542 for {
543 autopromote(ctx, st.cl)
544 select {
545 case <-ctx.Done():
546 t.Stop()
547 return ctx.Err()
548 case <-t.C:
Serge Bazanskicb883e22020-07-06 17:47:55 +0200549 }
550 }
551 }
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200552}
553
Serge Bazanski37110c32023-03-01 13:57:27 +0000554func (s *Service) Watch() event.Watcher[*Status] {
555 return s.value.Watch()
556}
557
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200558// selfupdater is a runnable that performs a one-shot (once per Service Run,
559// thus once for each configuration) update of the node's Peer URL in etcd. This
560// is currently only really needed because the first node in the cluster
561// bootstraps itself without any peer URLs at first, and this allows it to then
562// add the peer URLs afterwards. Instead of a runnable, this might as well have
563// been part of the bootstarp logic, but making it a restartable runnable is
564// more robust.
565func (s *Service) selfupdater(ctx context.Context) error {
566 supervisor.Signal(ctx, supervisor.SignalHealthy)
Serge Bazanski37110c32023-03-01 13:57:27 +0000567 w := s.value.Watch()
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200568 for {
569 st, err := w.Get(ctx)
570 if err != nil {
571 return fmt.Errorf("failed to get status: %w", err)
572 }
573
Serge Bazanski5839e972021-11-16 15:46:19 +0100574 if st.localPeerURL != "" {
575 supervisor.Logger(ctx).Infof("Updating local peer URL...")
576 peerURL := st.localPeerURL
577 if _, err := st.cl.MemberUpdate(ctx, st.localMemberID, []string{peerURL}); err != nil {
578 supervisor.Logger(ctx).Warningf("failed to update member: %v", err)
579 time.Sleep(1 * time.Second)
580 continue
581 }
582 } else {
583 supervisor.Logger(ctx).Infof("No local peer URL, not updating.")
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200584 }
585
586 supervisor.Signal(ctx, supervisor.SignalDone)
587 return nil
Serge Bazanskia105db52021-04-12 19:57:46 +0200588 }
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200589}