blob: f00a7a9a26736ee09e6ba38b7cc5e5c96aa45405 [file] [log] [blame]
Tim Windelschmidt6d33a432025-02-04 14:34:25 +01001// Copyright The Monogon Project Authors.
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +02002// SPDX-License-Identifier: Apache-2.0
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +02003
Serge Bazanskif05e80a2021-10-12 11:53:34 +02004// Package consensus implements a runnable that manages an etcd instance which
5// forms part of a Metropolis etcd cluster. This cluster is a foundational
6// building block of Metropolis and its startup/management sequencing needs to
7// be as robust as possible.
Serge Bazanskicb883e22020-07-06 17:47:55 +02008//
Serge Bazanskif05e80a2021-10-12 11:53:34 +02009// Cluster Structure
Serge Bazanskicb883e22020-07-06 17:47:55 +020010//
Serge Bazanskif05e80a2021-10-12 11:53:34 +020011// Each etcd instance listens for two kinds of traffic:
Serge Bazanskicb883e22020-07-06 17:47:55 +020012//
// 1. Peer traffic over TLS on a TCP port of the node's main interface. This is
// where other etcd instances connect to exchange peer traffic, perform
// transactions and build quorum. The TLS credentials are stored in a PKI that
// is managed internally by the consensus runnable, with its state stored in
// etcd itself.
18//
19// 2. Client traffic over a local domain socket, with access control based on
20// standard Linux user/group permissions. Currently this allows any code running
21// as root on the host namespace full access to the etcd cluster.
22//
// This means that if code running on a node wishes to perform etcd
// transactions, it must also run an etcd instance. This colocation of all
// direct etcd access and the etcd instances themselves effectively delegates
// all Metropolis control plane functionality to whatever subset of nodes is
// running consensus and all code that connects to etcd directly (the Curator).
28//
// For example, if nodes foo and baz are parts of the control plane, but node
// worker is not:
31//
//   .---------------------.
//   | node-foo            |
//   |---------------------|
//   | .--------------------.
//   | | etcd               |<---etcd/TLS--.   (allocs.PortConsensus)
//   | '--------------------'              |
//   |     ^ Domain Socket |               |
//   |     | etcd/plain    |               |
//   | .--------------------.              |
//   | | curator            |<---gRPC/TLS----. (allocs.PortCuratorService)
//   | '--------------------'              | |
//   |     ^ Domain Socket |               | |
//   |     | gRPC/plain    |               | |
//   | .-----------------. |               | |
//   | | node logic      | |               | |
//   | '-----------------' |               | |
//   '---------------------'               | |
//                                         | |
//   .---------------------.               | |
//   | node-baz            |               | |
//   |---------------------|               | |
//   | .--------------------.              | |
//   | | etcd               |<-------------' |
//   | '--------------------'                |
//   |     ^ Domain Socket |                 |
//   |     | gRPC/plain    |                 |
//   | .--------------------.                |
//   | | curator            |<---gRPC/TLS----:
//   | '--------------------'                |
//   |    ...              |                 |
//   '---------------------'                 |
//                                           |
//   .---------------------.                 |
//   | node-worker         |                 |
//   |---------------------|                 |
//   | .-----------------. |                 |
//   | | node logic      |-------------------'
//   | '-----------------' |
//   '---------------------'
71//
72
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +020073package consensus
74
75import (
76 "context"
Serge Bazanskif05e80a2021-10-12 11:53:34 +020077 "crypto/x509"
78 "crypto/x509/pkix"
Lorenz Bruna6223792023-07-31 17:13:11 +020079 "errors"
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +020080 "fmt"
Serge Bazanskif05e80a2021-10-12 11:53:34 +020081 "math/big"
Serge Bazanskic1cb37c2023-03-16 17:54:33 +010082 "net"
83 "net/url"
Lorenz Bruna4ea9d02019-10-31 11:40:30 +010084 "time"
85
Jan Schär442cf682024-09-05 18:28:48 +020086 "go.etcd.io/etcd/api/v3/etcdserverpb"
Lorenz Brund13c1c62022-03-30 19:58:58 +020087 clientv3 "go.etcd.io/etcd/client/v3"
88 "go.etcd.io/etcd/server/v3/embed"
Hendrik Hofstadt8efe51e2020-02-28 12:53:41 +010089
Serge Bazanskia105db52021-04-12 19:57:46 +020090 "source.monogon.dev/metropolis/node/core/consensus/client"
Tim Windelschmidt9f21f532024-05-07 15:14:20 +020091 "source.monogon.dev/osbase/event"
92 "source.monogon.dev/osbase/event/memory"
93 "source.monogon.dev/osbase/logtree/unraw"
94 "source.monogon.dev/osbase/pki"
95 "source.monogon.dev/osbase/supervisor"
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +020096)
97
Serge Bazanskif05e80a2021-10-12 11:53:34 +020098var (
99 pkiNamespace = pki.Namespaced("/pki/")
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200100)
101
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200102func pkiCA() *pki.Certificate {
103 return &pki.Certificate{
104 Name: "CA",
105 Namespace: &pkiNamespace,
106 Issuer: pki.SelfSigned,
107 Template: x509.Certificate{
108 SerialNumber: big.NewInt(1),
109 Subject: pkix.Name{
110 CommonName: "Metropolis etcd CA Certificate",
111 },
112 IsCA: true,
113 KeyUsage: x509.KeyUsageCertSign | x509.KeyUsageCRLSign | x509.KeyUsageDigitalSignature,
114 ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth, x509.ExtKeyUsageServerAuth, x509.ExtKeyUsageOCSPSigning},
115 },
116 }
117}
118
// pkiPeerCertificate returns the x509 template for an etcd peer (member)
// certificate of the node with the given ID.
//
// The node ID is used as the certificate's common name and is always included
// in its DNS names, after any extraNames. The certificate is usable for both
// TLS client and server authentication.
//
// extraNames is never modified: the DNS names are assembled in a freshly
// allocated slice, so the returned certificate does not share memory with the
// caller's slice or with certificates returned by other calls.
func pkiPeerCertificate(nodeID string, extraNames []string) x509.Certificate {
	// Appending directly to extraNames would write into its backing array
	// whenever it has spare capacity, so build a new slice instead.
	dnsNames := make([]string, 0, len(extraNames)+1)
	dnsNames = append(dnsNames, extraNames...)
	dnsNames = append(dnsNames, nodeID)

	return x509.Certificate{
		Subject: pkix.Name{
			CommonName: nodeID,
		},
		KeyUsage: x509.KeyUsageDigitalSignature | x509.KeyUsageKeyEncipherment,
		ExtKeyUsage: []x509.ExtKeyUsage{
			x509.ExtKeyUsageClientAuth, x509.ExtKeyUsageServerAuth,
		},
		DNSNames: dnsNames,
	}
}
131
Jan Schär442cf682024-09-05 18:28:48 +0200132// GetEtcdMemberNodeId returns the node ID of an etcd member. It works even for
133// members which have not started, where member.Name is empty.
134func GetEtcdMemberNodeId(member *etcdserverpb.Member) string {
135 if member.Name != "" {
136 return member.Name
137 }
138 if len(member.PeerURLs) == 0 {
139 return ""
140 }
141 u, err := url.Parse(member.PeerURLs[0])
142 if err != nil {
143 return ""
144 }
145 nodeId, _, err := net.SplitHostPort(u.Host)
146 if err != nil {
147 return ""
148 }
149 return nodeId
150}
151
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200152// Service is the etcd cluster member service. See package-level documentation
153// for more information.
Serge Bazanskicb883e22020-07-06 17:47:55 +0200154type Service struct {
Serge Bazanskicb883e22020-07-06 17:47:55 +0200155 config *Config
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100156
Serge Bazanski37110c32023-03-01 13:57:27 +0000157 value memory.Value[*Status]
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200158 ca *pki.Certificate
Serge Bazanskicb883e22020-07-06 17:47:55 +0200159}
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200160
Serge Bazanskicb883e22020-07-06 17:47:55 +0200161func New(config Config) *Service {
162 return &Service{
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200163 config: &config,
164 }
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200165}
166
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200167// Run is a Supervisor runnable that starts the etcd member service. It will
168// become healthy once the member joins the cluster successfully.
169func (s *Service) Run(ctx context.Context) error {
170 // Always re-create CA to make sure we don't have PKI state from previous runs.
171 //
172 // TODO(q3k): make the PKI library immune to this misuse.
173 s.ca = pkiCA()
Lorenz Brun52f7f292020-06-24 16:42:02 +0200174
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200175 // Create log converter. This will ingest etcd logs and pipe them out to this
176 // runnable's leveled logging facilities.
Serge Bazanskic1cb37c2023-03-16 17:54:33 +0100177
178 // This is not where etcd will run, but where its log ingestion machinery lives.
179 // This ensures that the (annoying verbose) etcd logs are contained into just
180 // .etcd.
181 err := supervisor.Run(ctx, "etcd", func(ctx context.Context) error {
182 converter := unraw.Converter{
183 Parser: parseEtcdLogEntry,
184 MaximumLineLength: 8192,
185 LeveledLogger: supervisor.Logger(ctx),
186 }
Serge Bazanski5ad31442024-04-17 15:40:52 +0200187 pipe, err := converter.NamedPipeReader(s.config.Ephemeral.ServerLogsFIFO.FullPath())
Serge Bazanskic1cb37c2023-03-16 17:54:33 +0100188 if err != nil {
189 return fmt.Errorf("when creating pipe reader: %w", err)
190 }
191 if err := supervisor.Run(ctx, "piper", pipe); err != nil {
192 return fmt.Errorf("when starting log piper: %w", err)
193 }
194 supervisor.Signal(ctx, supervisor.SignalHealthy)
195 <-ctx.Done()
196 return ctx.Err()
197 })
Serge Bazanski50009e02021-07-07 14:35:27 +0200198 if err != nil {
Serge Bazanskic1cb37c2023-03-16 17:54:33 +0100199 return fmt.Errorf("when starting etcd logger: %w", err)
Serge Bazanski50009e02021-07-07 14:35:27 +0200200 }
201
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200202 // Create autopromoter, which will automatically promote all learners to full
203 // etcd members.
204 if err := supervisor.Run(ctx, "autopromoter", s.autopromoter); err != nil {
205 return fmt.Errorf("when starting autopromtoer: %w", err)
206 }
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200207
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200208 // Create selfupdater, which will perform a one-shot update of this member's
209 // peer address in etcd.
Mateusz Zalega619029b2022-05-05 17:18:26 +0200210 if err := supervisor.Run(ctx, "selfupdater", s.selfupdater); err != nil {
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200211 return fmt.Errorf("when starting selfupdater: %w", err)
212 }
213
214 // Prepare cluster PKI credentials.
215 ppki := s.config.Data.PeerPKI
216 jc := s.config.JoinCluster
217 if jc != nil {
Serge Bazanski97d68082022-06-22 13:15:21 +0200218 supervisor.Logger(ctx).Info("JoinCluster set, writing PPKI data to disk...")
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200219 // For nodes that join an existing cluster, or re-join it, always write whatever
220 // we've been given on startup.
221 if err := ppki.WriteAll(jc.NodeCertificate.Raw, s.config.NodePrivateKey, jc.CACertificate.Raw); err != nil {
222 return fmt.Errorf("when writing credentials for join: %w", err)
223 }
224 if err := s.config.Data.PeerCRL.Write(jc.InitialCRL.Raw, 0400); err != nil {
225 return fmt.Errorf("when writing CRL for join: %w", err)
226 }
227 } else {
228 // For other nodes, we should already have credentials from a previous join, or
229 // a previous bootstrap. If none exist, assume we need to bootstrap these
230 // credentials.
231 //
232 // TODO(q3k): once we have node join (ie. node restart from disk) flow, add a
233 // special configuration marker to prevent spurious bootstraps.
234 absent, err := ppki.AllAbsent()
235 if err != nil {
236 return fmt.Errorf("when checking for PKI file absence: %w", err)
237 }
238 if absent {
Serge Bazanski97d68082022-06-22 13:15:21 +0200239 supervisor.Logger(ctx).Info("PKI data absent, bootstrapping.")
Serge Bazanski5ad31442024-04-17 15:40:52 +0200240 if err := s.bootstrap(ctx); err != nil {
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200241 return fmt.Errorf("bootstrap failed: %w", err)
242 }
243 } else {
244 supervisor.Logger(ctx).Info("PKI data present, not bootstrapping.")
245 }
246 }
247
248 // Start etcd ...
Serge Bazanskic1cb37c2023-03-16 17:54:33 +0100249 supervisor.Logger(ctx).Infof("Starting etcd...")
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200250 cfg := s.config.build(true)
251 server, err := embed.StartEtcd(cfg)
252 if err != nil {
253 return fmt.Errorf("when starting etcd: %w", err)
254 }
255
256 // ... wait for server to be ready...
257 select {
258 case <-ctx.Done():
259 return ctx.Err()
260 case <-server.Server.ReadyNotify():
261 }
262
263 // ... build a client to its' socket...
264 cl, err := s.config.localClient()
265 if err != nil {
266 return fmt.Errorf("getting local client failed: %w", err)
267 }
268
269 // ... and wait until we're not a learner anymore.
270 for {
271 members, err := cl.MemberList(ctx)
272 if err != nil {
273 supervisor.Logger(ctx).Warningf("MemberList failed: %v", err)
274 time.Sleep(time.Second)
275 continue
276 }
277
278 isMember := false
279 for _, member := range members.Members {
Lorenz Brun62229cf2025-07-07 12:47:31 +0200280 if member.ID != uint64(server.Server.MemberID()) {
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200281 continue
282 }
283 if !member.IsLearner {
284 isMember = true
285 break
286 }
287 }
288 if isMember {
289 break
290 }
291 supervisor.Logger(ctx).Warningf("Still a learner, waiting...")
292 time.Sleep(time.Second)
293 }
294
295 // All done! Report status.
296 supervisor.Logger(ctx).Infof("etcd server ready")
297
298 st := &Status{
Lorenz Brun6211e4d2023-11-14 19:09:40 +0100299 localPeerURL: cfg.AdvertisePeerUrls[0].String(),
Lorenz Brun62229cf2025-07-07 12:47:31 +0200300 localMemberID: uint64(server.Server.MemberID()),
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200301 cl: cl,
302 ca: s.ca,
303 }
Serge Bazanski98a6ccc2023-06-20 13:09:12 +0200304 st2 := *st
305 s.value.Set(&st2)
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200306
307 // Wait until server dies for whatever reason, update status when that
308 // happens.
309 supervisor.Signal(ctx, supervisor.SignalHealthy)
310 select {
311 case err = <-server.Err():
312 err = fmt.Errorf("server returned error: %w", err)
313 case <-ctx.Done():
314 server.Close()
315 err = ctx.Err()
316 }
Serge Bazanski98a6ccc2023-06-20 13:09:12 +0200317
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200318 st.stopped = true
Serge Bazanski98a6ccc2023-06-20 13:09:12 +0200319 st3 := *st
320 s.value.Set(&st3)
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200321 return err
Serge Bazanskicb883e22020-07-06 17:47:55 +0200322}
323
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200324func clientFor(kv *clientv3.Client, parts ...string) (client.Namespaced, error) {
325 var err error
326 namespaced := client.NewLocal(kv)
327 for _, el := range parts {
328 namespaced, err = namespaced.Sub(el)
Serge Bazanskicb883e22020-07-06 17:47:55 +0200329 if err != nil {
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200330 return nil, fmt.Errorf("when getting sub client: %w", err)
Serge Bazanskicb883e22020-07-06 17:47:55 +0200331 }
Serge Bazanskicb883e22020-07-06 17:47:55 +0200332
Serge Bazanskicb883e22020-07-06 17:47:55 +0200333 }
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200334 return namespaced, nil
335}
Serge Bazanskicb883e22020-07-06 17:47:55 +0200336
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200337// bootstrap performs a procedure to resolve the following bootstrap problems:
338// in order to start an etcd server for consensus, we need it to serve over TLS.
339// However, these TLS certificates also need to be stored in etcd so that
340// further certificates can be issued for new nodes.
341//
342// This was previously solved by a using a special PKI/TLS management system that
343// could first create certificates and keys in memory, then only commit them to
344// etcd. However, this ended up being somewhat brittle in the face of startup
345// sequencing issues, so we're now going with a different approach.
346//
347// This function starts an etcd instance first without any PKI/TLS support,
348// without listening on any external port for peer traffic. Once the instance is
349// running, it uses the standard metropolis pki library to create all required
350// data directly in the running etcd instance. It then writes all required
351// startup data (node private key, member certificate, CA certificate) to disk,
352// so that a 'full' etcd instance can be started.
Serge Bazanski5ad31442024-04-17 15:40:52 +0200353func (s *Service) bootstrap(ctx context.Context) error {
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200354 supervisor.Logger(ctx).Infof("Bootstrapping PKI: starting etcd...")
Serge Bazanskicb883e22020-07-06 17:47:55 +0200355
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200356 cfg := s.config.build(false)
357 // This will make etcd create data directories and create a fully new cluster if
358 // needed. If we're restarting due to an error, the old cluster data will still
359 // exist.
360 cfg.ClusterState = "new"
Serge Bazanskicb883e22020-07-06 17:47:55 +0200361
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200362 // Start the bootstrap etcd instance...
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200363 server, err := embed.StartEtcd(cfg)
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100364 if err != nil {
Serge Bazanskib76b8d12023-03-16 00:46:56 +0100365 return fmt.Errorf("failed to start bootstrap etcd: %w", err)
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100366 }
Serge Bazanskib76b8d12023-03-16 00:46:56 +0100367 defer server.Close()
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100368
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200369 // ... wait for it to run ...
Serge Bazanskicb883e22020-07-06 17:47:55 +0200370 select {
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200371 case <-server.Server.ReadyNotify():
Serge Bazanskicb883e22020-07-06 17:47:55 +0200372 case <-ctx.Done():
Lorenz Bruna6223792023-07-31 17:13:11 +0200373 return errors.New("timed out waiting for etcd to become ready")
Lorenz Brun52f7f292020-06-24 16:42:02 +0200374 }
375
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200376 // ... create a client to it ...
377 cl, err := s.config.localClient()
Lorenz Brun52f7f292020-06-24 16:42:02 +0200378 if err != nil {
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200379 return fmt.Errorf("when getting bootstrap client: %w", err)
Serge Bazanskicb883e22020-07-06 17:47:55 +0200380 }
381
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200382 // ... and build PKI there. This is idempotent, so we will never override
383 // anything that's already in the cluster, instead just retrieve it.
384 supervisor.Logger(ctx).Infof("Bootstrapping PKI: etcd running, building PKI...")
385 clPKI, err := clientFor(cl, "namespaced", "etcd-pki")
386 if err != nil {
387 return fmt.Errorf("when getting pki client: %w", err)
Serge Bazanskicb883e22020-07-06 17:47:55 +0200388 }
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200389 defer clPKI.Close()
390 caCert, err := s.ca.Ensure(ctx, clPKI)
391 if err != nil {
392 return fmt.Errorf("failed to ensure CA certificate: %w", err)
Serge Bazanskicb883e22020-07-06 17:47:55 +0200393 }
394
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200395 // If we're running with a test overridden external address (eg. localhost), we
396 // need to also make that part of the member certificate.
397 var extraNames []string
398 if external := s.config.testOverrides.externalAddress; external != "" {
399 extraNames = []string{external}
400 }
401 memberTemplate := pki.Certificate{
Jan Schär39d9c242024-09-24 13:49:55 +0200402 Name: s.config.NodeID,
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200403 Namespace: &pkiNamespace,
404 Issuer: s.ca,
Jan Schär39d9c242024-09-24 13:49:55 +0200405 Template: pkiPeerCertificate(s.config.NodeID, extraNames),
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200406 Mode: pki.CertificateExternal,
407 PublicKey: s.config.nodePublicKey(),
408 }
409 memberCert, err := memberTemplate.Ensure(ctx, clPKI)
410 if err != nil {
411 return fmt.Errorf("failed to ensure member certificate: %w", err)
412 }
Serge Bazanskicb883e22020-07-06 17:47:55 +0200413
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200414 // Retrieve CRL.
415 crlW := s.ca.WatchCRL(clPKI)
416 crl, err := crlW.Get(ctx)
417 if err != nil {
418 return fmt.Errorf("failed to retrieve initial CRL: %w", err)
419 }
420
421 // We have everything we need. Write things to disk.
422 supervisor.Logger(ctx).Infof("Bootstrapping PKI: certificates issued, writing to disk...")
423
424 if err := s.config.Data.PeerPKI.WriteAll(memberCert, s.config.NodePrivateKey, caCert); err != nil {
425 return fmt.Errorf("failed to write bootstrapped certificates: %w", err)
426 }
427 if err := s.config.Data.PeerCRL.Write(crl.Raw, 0400); err != nil {
428 return fmt.Errorf("failed tow rite CRL: %w", err)
429 }
430
431 // Stop the server synchronously (blocking until it's fully shutdown), and
432 // return. The caller can now run the 'full' etcd instance with PKI.
433 supervisor.Logger(ctx).Infof("Bootstrapping PKI: done, stopping server...")
434 server.Close()
Serge Bazanskicb883e22020-07-06 17:47:55 +0200435 return ctx.Err()
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100436}
437
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200438// autopromoter is a runnable which repeatedly attempts to promote etcd learners
439// in the cluster to full followers. This is needed to bring any new cluster
440// members (which are always added as learners) to full membership and make them
441// part of the etcd quorum.
Serge Bazanskicb883e22020-07-06 17:47:55 +0200442func (s *Service) autopromoter(ctx context.Context) error {
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200443 autopromote := func(ctx context.Context, cl *clientv3.Client) {
444 // Only autopromote if our endpoint is a leader. This is a bargain bin version
445 // of leader election: it's simple and cheap, but not very reliable. The most
446 // obvious failure mode is that the instance we contacted isn't a leader by the
447 // time we promote a member, but that's fine - the promotion is idempotent. What
448 // we really use the 'leader election' here for isn't for consistency, but to
449 // prevent the cluster from being hammered by spurious leadership promotion
450 // requests from every etcd member.
451 status, err := cl.Status(ctx, cl.Endpoints()[0])
452 if err != nil {
453 supervisor.Logger(ctx).Warningf("Failed to get endpoint status: %v", err)
Jan Schärb9769672024-04-09 15:31:40 +0200454 return
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200455 }
456 if status.Leader != status.Header.MemberId {
Serge Bazanskicb883e22020-07-06 17:47:55 +0200457 return
458 }
459
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200460 members, err := cl.MemberList(ctx)
461 if err != nil {
462 supervisor.Logger(ctx).Warningf("Failed to list members: %v", err)
463 return
464 }
465 for _, member := range members.Members {
Serge Bazanskicb883e22020-07-06 17:47:55 +0200466 if !member.IsLearner {
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100467 continue
468 }
Jan Schär442cf682024-09-05 18:28:48 +0200469 if member.Name == "" {
470 // If the name is empty, the member has not started.
471 continue
472 }
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200473 // Always call PromoteMember since the metadata necessary to decide if we should
474 // is private. Luckily etcd already does consistency checks internally and will
475 // refuse to promote nodes that aren't connected or are still behind on
476 // transactions.
477 if _, err := cl.MemberPromote(ctx, member.ID); err != nil {
Serge Bazanskic7359672020-10-30 16:38:57 +0100478 supervisor.Logger(ctx).Infof("Failed to promote consensus node %s: %v", member.Name, err)
Serge Bazanskicb883e22020-07-06 17:47:55 +0200479 } else {
Serge Bazanskic7359672020-10-30 16:38:57 +0100480 supervisor.Logger(ctx).Infof("Promoted new consensus node %s", member.Name)
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100481 }
482 }
483 }
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100484
Serge Bazanski37110c32023-03-01 13:57:27 +0000485 w := s.value.Watch()
Serge Bazanskicb883e22020-07-06 17:47:55 +0200486 for {
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200487 st, err := w.Get(ctx)
488 if err != nil {
489 return fmt.Errorf("status get failed: %w", err)
Lorenz Brun52f7f292020-06-24 16:42:02 +0200490 }
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200491 t := time.NewTicker(5 * time.Second)
492 for {
493 autopromote(ctx, st.cl)
494 select {
495 case <-ctx.Done():
496 t.Stop()
497 return ctx.Err()
498 case <-t.C:
Serge Bazanskicb883e22020-07-06 17:47:55 +0200499 }
500 }
501 }
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200502}
503
Serge Bazanski37110c32023-03-01 13:57:27 +0000504func (s *Service) Watch() event.Watcher[*Status] {
505 return s.value.Watch()
506}
507
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200508// selfupdater is a runnable that performs a one-shot (once per Service Run,
509// thus once for each configuration) update of the node's Peer URL in etcd. This
510// is currently only really needed because the first node in the cluster
511// bootstraps itself without any peer URLs at first, and this allows it to then
512// add the peer URLs afterwards. Instead of a runnable, this might as well have
513// been part of the bootstarp logic, but making it a restartable runnable is
514// more robust.
515func (s *Service) selfupdater(ctx context.Context) error {
516 supervisor.Signal(ctx, supervisor.SignalHealthy)
Serge Bazanski37110c32023-03-01 13:57:27 +0000517 w := s.value.Watch()
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200518 for {
519 st, err := w.Get(ctx)
520 if err != nil {
521 return fmt.Errorf("failed to get status: %w", err)
522 }
523
Serge Bazanski5839e972021-11-16 15:46:19 +0100524 if st.localPeerURL != "" {
525 supervisor.Logger(ctx).Infof("Updating local peer URL...")
526 peerURL := st.localPeerURL
527 if _, err := st.cl.MemberUpdate(ctx, st.localMemberID, []string{peerURL}); err != nil {
528 supervisor.Logger(ctx).Warningf("failed to update member: %v", err)
529 time.Sleep(1 * time.Second)
530 continue
531 }
532 } else {
533 supervisor.Logger(ctx).Infof("No local peer URL, not updating.")
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200534 }
535
536 supervisor.Signal(ctx, supervisor.SignalDone)
537 return nil
Serge Bazanskia105db52021-04-12 19:57:46 +0200538 }
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200539}