blob: b570c69fdbf2c498d81371e7ad9f59252ff1c1e2 [file] [log] [blame]
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +02001// Copyright 2020 The Monogon Project Authors.
2//
3// SPDX-License-Identifier: Apache-2.0
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
Serge Bazanskif05e80a2021-10-12 11:53:34 +020017// Package consensus implements a runnable that manages an etcd instance which
18// forms part of a Metropolis etcd cluster. This cluster is a foundational
19// building block of Metropolis and its startup/management sequencing needs to
20// be as robust as possible.
Serge Bazanskicb883e22020-07-06 17:47:55 +020021//
Serge Bazanskif05e80a2021-10-12 11:53:34 +020022// Cluster Structure
Serge Bazanskicb883e22020-07-06 17:47:55 +020023//
Serge Bazanskif05e80a2021-10-12 11:53:34 +020024// Each etcd instance listens for two kinds of traffic:
Serge Bazanskicb883e22020-07-06 17:47:55 +020025//
Serge Bazanskif05e80a2021-10-12 11:53:34 +020026// 1. Peer traffic over TLS on a TCP port of the node's main interface. This is
// where other etcd instances connect to exchange peer traffic, perform
28// transactions and build quorum. The TLS credentials are stored in a PKI that
29// is managed internally by the consensus runnable, with its state stored in
30// etcd itself.
31//
32// 2. Client traffic over a local domain socket, with access control based on
33// standard Linux user/group permissions. Currently this allows any code running
34// as root on the host namespace full access to the etcd cluster.
35//
36// This means that if code running on a node wishes to perform etcd
37// transactions, it must also run an etcd instance. This colocation of all
// direct etcd access and the etcd instances themselves effectively delegates all
// Metropolis control plane functionality to whatever subset of nodes is running
// consensus and all code that connects to etcd directly (the Curator).
41//
// For example, if nodes foo and baz are part of the control plane, but node
43// worker is not:
44//
45// .---------------------.
46// | node-foo |
47// |---------------------|
48// | .--------------------.
49// | | etcd |<---etcd/TLS--. (node.ConsensusPort)
50// | '--------------------' |
51// | ^ Domain Socket | |
52// | | etcd/plain | |
53// | .--------------------. |
54// | | curator |<---gRPC/TLS----. (node.CuratorServicePort)
55// | '--------------------' | |
56// | ^ Domain Socket | | |
57// | | gRPC/plain | | |
58// | .-----------------. | | |
59// | | node logic | | | |
60// | '-----------------' | | |
61// '---------------------' | |
62// | |
63// .---------------------. | |
64// | node-baz | | |
65// |---------------------| | |
66// | .--------------------. | |
67// | | etcd |<-------------' |
68// | '--------------------' |
69// | ^ Domain Socket | |
// | | etcd/plain | |
71// | .--------------------. |
72// | | curator |<---gRPC/TLS----:
73// | '--------------------' |
74// | ... | |
75// '---------------------' |
76// |
77// .---------------------. |
78// | node-worker | |
79// |---------------------| |
80// | .-----------------. | |
81// | | node logic |-------------------'
82// | '-----------------' |
83// '---------------------'
84//
85
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +020086package consensus
87
88import (
89 "context"
Serge Bazanskif05e80a2021-10-12 11:53:34 +020090 "crypto/ed25519"
91 "crypto/x509"
92 "crypto/x509/pkix"
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +020093 "fmt"
Serge Bazanskif05e80a2021-10-12 11:53:34 +020094 "math/big"
Lorenz Bruna4ea9d02019-10-31 11:40:30 +010095 "time"
96
Lorenz Brund13c1c62022-03-30 19:58:58 +020097 clientv3 "go.etcd.io/etcd/client/v3"
98 "go.etcd.io/etcd/server/v3/embed"
Hendrik Hofstadt8efe51e2020-02-28 12:53:41 +010099
Serge Bazanskia105db52021-04-12 19:57:46 +0200100 "source.monogon.dev/metropolis/node/core/consensus/client"
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200101 "source.monogon.dev/metropolis/node/core/identity"
102 "source.monogon.dev/metropolis/pkg/event/memory"
Serge Bazanski50009e02021-07-07 14:35:27 +0200103 "source.monogon.dev/metropolis/pkg/logtree/unraw"
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200104 "source.monogon.dev/metropolis/pkg/pki"
Serge Bazanski31370b02021-01-07 16:31:14 +0100105 "source.monogon.dev/metropolis/pkg/supervisor"
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200106)
107
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200108var (
109 pkiNamespace = pki.Namespaced("/pki/")
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200110)
111
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200112func pkiCA() *pki.Certificate {
113 return &pki.Certificate{
114 Name: "CA",
115 Namespace: &pkiNamespace,
116 Issuer: pki.SelfSigned,
117 Template: x509.Certificate{
118 SerialNumber: big.NewInt(1),
119 Subject: pkix.Name{
120 CommonName: "Metropolis etcd CA Certificate",
121 },
122 IsCA: true,
123 KeyUsage: x509.KeyUsageCertSign | x509.KeyUsageCRLSign | x509.KeyUsageDigitalSignature,
124 ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth, x509.ExtKeyUsageServerAuth, x509.ExtKeyUsageOCSPSigning},
125 },
126 }
127}
128
129func pkiPeerCertificate(pubkey ed25519.PublicKey, extraNames []string) x509.Certificate {
130 return x509.Certificate{
131 Subject: pkix.Name{
132 CommonName: identity.NodeID(pubkey),
133 },
134 KeyUsage: x509.KeyUsageDigitalSignature | x509.KeyUsageKeyEncipherment,
135 ExtKeyUsage: []x509.ExtKeyUsage{
136 x509.ExtKeyUsageClientAuth, x509.ExtKeyUsageServerAuth,
137 },
138 DNSNames: append(extraNames, identity.NodeID(pubkey)),
139 }
140}
141
142// Service is the etcd cluster member service. See package-level documentation
143// for more information.
Serge Bazanskicb883e22020-07-06 17:47:55 +0200144type Service struct {
Serge Bazanskicb883e22020-07-06 17:47:55 +0200145 config *Config
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100146
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200147 value memory.Value
148 ca *pki.Certificate
Serge Bazanskicb883e22020-07-06 17:47:55 +0200149}
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200150
Serge Bazanskicb883e22020-07-06 17:47:55 +0200151func New(config Config) *Service {
152 return &Service{
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200153 config: &config,
154 }
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200155}
156
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200157// Run is a Supervisor runnable that starts the etcd member service. It will
158// become healthy once the member joins the cluster successfully.
159func (s *Service) Run(ctx context.Context) error {
160 // Always re-create CA to make sure we don't have PKI state from previous runs.
161 //
162 // TODO(q3k): make the PKI library immune to this misuse.
163 s.ca = pkiCA()
Lorenz Brun52f7f292020-06-24 16:42:02 +0200164
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200165 // Create log converter. This will ingest etcd logs and pipe them out to this
166 // runnable's leveled logging facilities.
167 //
168 // TODO(q3k): add support for streaming to a sub-logger in the tree to get
169 // cleaner logs.
Serge Bazanski50009e02021-07-07 14:35:27 +0200170 converter := unraw.Converter{
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200171 Parser: parseEtcdLogEntry,
Serge Bazanski50009e02021-07-07 14:35:27 +0200172 MaximumLineLength: 8192,
173 LeveledLogger: supervisor.Logger(ctx),
174 }
175 fifoPath := s.config.Ephemeral.ServerLogsFIFO.FullPath()
176 pipe, err := converter.NamedPipeReader(fifoPath)
177 if err != nil {
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200178 return fmt.Errorf("when creating pipe reader: %w", err)
Serge Bazanski50009e02021-07-07 14:35:27 +0200179 }
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200180 if err := supervisor.Run(ctx, "piper", pipe); err != nil {
181 return fmt.Errorf("when starting log piper: %w", err)
Serge Bazanski50009e02021-07-07 14:35:27 +0200182 }
183
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200184 // Create autopromoter, which will automatically promote all learners to full
185 // etcd members.
186 if err := supervisor.Run(ctx, "autopromoter", s.autopromoter); err != nil {
187 return fmt.Errorf("when starting autopromtoer: %w", err)
188 }
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200189
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200190 // Create selfupdater, which will perform a one-shot update of this member's
191 // peer address in etcd.
Mateusz Zalega619029b2022-05-05 17:18:26 +0200192 if err := supervisor.Run(ctx, "selfupdater", s.selfupdater); err != nil {
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200193 return fmt.Errorf("when starting selfupdater: %w", err)
194 }
195
196 // Prepare cluster PKI credentials.
197 ppki := s.config.Data.PeerPKI
198 jc := s.config.JoinCluster
199 if jc != nil {
Serge Bazanski97d68082022-06-22 13:15:21 +0200200 supervisor.Logger(ctx).Info("JoinCluster set, writing PPKI data to disk...")
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200201 // For nodes that join an existing cluster, or re-join it, always write whatever
202 // we've been given on startup.
203 if err := ppki.WriteAll(jc.NodeCertificate.Raw, s.config.NodePrivateKey, jc.CACertificate.Raw); err != nil {
204 return fmt.Errorf("when writing credentials for join: %w", err)
205 }
206 if err := s.config.Data.PeerCRL.Write(jc.InitialCRL.Raw, 0400); err != nil {
207 return fmt.Errorf("when writing CRL for join: %w", err)
208 }
209 } else {
210 // For other nodes, we should already have credentials from a previous join, or
211 // a previous bootstrap. If none exist, assume we need to bootstrap these
212 // credentials.
213 //
214 // TODO(q3k): once we have node join (ie. node restart from disk) flow, add a
215 // special configuration marker to prevent spurious bootstraps.
216 absent, err := ppki.AllAbsent()
217 if err != nil {
218 return fmt.Errorf("when checking for PKI file absence: %w", err)
219 }
220 if absent {
Serge Bazanski97d68082022-06-22 13:15:21 +0200221 supervisor.Logger(ctx).Info("PKI data absent, bootstrapping.")
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200222 if err := s.bootstrap(ctx, fifoPath); err != nil {
223 return fmt.Errorf("bootstrap failed: %w", err)
224 }
225 } else {
226 supervisor.Logger(ctx).Info("PKI data present, not bootstrapping.")
227 }
228 }
229
230 // Start etcd ...
231 cfg := s.config.build(true)
232 server, err := embed.StartEtcd(cfg)
233 if err != nil {
234 return fmt.Errorf("when starting etcd: %w", err)
235 }
236
237 // ... wait for server to be ready...
238 select {
239 case <-ctx.Done():
240 return ctx.Err()
241 case <-server.Server.ReadyNotify():
242 }
243
244 // ... build a client to its' socket...
245 cl, err := s.config.localClient()
246 if err != nil {
247 return fmt.Errorf("getting local client failed: %w", err)
248 }
249
250 // ... and wait until we're not a learner anymore.
251 for {
252 members, err := cl.MemberList(ctx)
253 if err != nil {
254 supervisor.Logger(ctx).Warningf("MemberList failed: %v", err)
255 time.Sleep(time.Second)
256 continue
257 }
258
259 isMember := false
260 for _, member := range members.Members {
261 if member.ID != uint64(server.Server.ID()) {
262 continue
263 }
264 if !member.IsLearner {
265 isMember = true
266 break
267 }
268 }
269 if isMember {
270 break
271 }
272 supervisor.Logger(ctx).Warningf("Still a learner, waiting...")
273 time.Sleep(time.Second)
274 }
275
276 // All done! Report status.
277 supervisor.Logger(ctx).Infof("etcd server ready")
278
279 st := &Status{
280 localPeerURL: cfg.APUrls[0].String(),
281 localMemberID: uint64(server.Server.ID()),
282 cl: cl,
283 ca: s.ca,
284 }
285 s.value.Set(st)
286
287 // Wait until server dies for whatever reason, update status when that
288 // happens.
289 supervisor.Signal(ctx, supervisor.SignalHealthy)
290 select {
291 case err = <-server.Err():
292 err = fmt.Errorf("server returned error: %w", err)
293 case <-ctx.Done():
294 server.Close()
295 err = ctx.Err()
296 }
297 st.stopped = true
298 s.value.Set(st)
299 return err
Serge Bazanskicb883e22020-07-06 17:47:55 +0200300}
301
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200302func clientFor(kv *clientv3.Client, parts ...string) (client.Namespaced, error) {
303 var err error
304 namespaced := client.NewLocal(kv)
305 for _, el := range parts {
306 namespaced, err = namespaced.Sub(el)
Serge Bazanskicb883e22020-07-06 17:47:55 +0200307 if err != nil {
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200308 return nil, fmt.Errorf("when getting sub client: %w", err)
Serge Bazanskicb883e22020-07-06 17:47:55 +0200309 }
Serge Bazanskicb883e22020-07-06 17:47:55 +0200310
Serge Bazanskicb883e22020-07-06 17:47:55 +0200311 }
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200312 return namespaced, nil
313}
Serge Bazanskicb883e22020-07-06 17:47:55 +0200314
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200315// bootstrap performs a procedure to resolve the following bootstrap problems:
316// in order to start an etcd server for consensus, we need it to serve over TLS.
317// However, these TLS certificates also need to be stored in etcd so that
318// further certificates can be issued for new nodes.
319//
320// This was previously solved by a using a special PKI/TLS management system that
321// could first create certificates and keys in memory, then only commit them to
322// etcd. However, this ended up being somewhat brittle in the face of startup
323// sequencing issues, so we're now going with a different approach.
324//
325// This function starts an etcd instance first without any PKI/TLS support,
326// without listening on any external port for peer traffic. Once the instance is
327// running, it uses the standard metropolis pki library to create all required
328// data directly in the running etcd instance. It then writes all required
329// startup data (node private key, member certificate, CA certificate) to disk,
330// so that a 'full' etcd instance can be started.
331func (s *Service) bootstrap(ctx context.Context, fifoPath string) error {
332 supervisor.Logger(ctx).Infof("Bootstrapping PKI: starting etcd...")
Serge Bazanskicb883e22020-07-06 17:47:55 +0200333
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200334 cfg := s.config.build(false)
335 // This will make etcd create data directories and create a fully new cluster if
336 // needed. If we're restarting due to an error, the old cluster data will still
337 // exist.
338 cfg.ClusterState = "new"
Serge Bazanskicb883e22020-07-06 17:47:55 +0200339
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200340 // Start the bootstrap etcd instance...
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200341 server, err := embed.StartEtcd(cfg)
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100342 if err != nil {
Serge Bazanskicb883e22020-07-06 17:47:55 +0200343 return fmt.Errorf("failed to start etcd: %w", err)
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100344 }
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100345
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200346 // ... wait for it to run ...
Serge Bazanskicb883e22020-07-06 17:47:55 +0200347 select {
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200348 case <-server.Server.ReadyNotify():
Serge Bazanskicb883e22020-07-06 17:47:55 +0200349 case <-ctx.Done():
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200350 return fmt.Errorf("when waiting for bootstrap etcd: %w", err)
Lorenz Brun52f7f292020-06-24 16:42:02 +0200351 }
352
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200353 // ... create a client to it ...
354 cl, err := s.config.localClient()
Lorenz Brun52f7f292020-06-24 16:42:02 +0200355 if err != nil {
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200356 return fmt.Errorf("when getting bootstrap client: %w", err)
Serge Bazanskicb883e22020-07-06 17:47:55 +0200357 }
358
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200359 // ... and build PKI there. This is idempotent, so we will never override
360 // anything that's already in the cluster, instead just retrieve it.
361 supervisor.Logger(ctx).Infof("Bootstrapping PKI: etcd running, building PKI...")
362 clPKI, err := clientFor(cl, "namespaced", "etcd-pki")
363 if err != nil {
364 return fmt.Errorf("when getting pki client: %w", err)
Serge Bazanskicb883e22020-07-06 17:47:55 +0200365 }
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200366 defer clPKI.Close()
367 caCert, err := s.ca.Ensure(ctx, clPKI)
368 if err != nil {
369 return fmt.Errorf("failed to ensure CA certificate: %w", err)
Serge Bazanskicb883e22020-07-06 17:47:55 +0200370 }
371
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200372 // If we're running with a test overridden external address (eg. localhost), we
373 // need to also make that part of the member certificate.
374 var extraNames []string
375 if external := s.config.testOverrides.externalAddress; external != "" {
376 extraNames = []string{external}
377 }
378 memberTemplate := pki.Certificate{
379 Name: identity.NodeID(s.config.nodePublicKey()),
380 Namespace: &pkiNamespace,
381 Issuer: s.ca,
382 Template: pkiPeerCertificate(s.config.nodePublicKey(), extraNames),
383 Mode: pki.CertificateExternal,
384 PublicKey: s.config.nodePublicKey(),
385 }
386 memberCert, err := memberTemplate.Ensure(ctx, clPKI)
387 if err != nil {
388 return fmt.Errorf("failed to ensure member certificate: %w", err)
389 }
Serge Bazanskicb883e22020-07-06 17:47:55 +0200390
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200391 // Retrieve CRL.
392 crlW := s.ca.WatchCRL(clPKI)
393 crl, err := crlW.Get(ctx)
394 if err != nil {
395 return fmt.Errorf("failed to retrieve initial CRL: %w", err)
396 }
397
398 // We have everything we need. Write things to disk.
399 supervisor.Logger(ctx).Infof("Bootstrapping PKI: certificates issued, writing to disk...")
400
401 if err := s.config.Data.PeerPKI.WriteAll(memberCert, s.config.NodePrivateKey, caCert); err != nil {
402 return fmt.Errorf("failed to write bootstrapped certificates: %w", err)
403 }
404 if err := s.config.Data.PeerCRL.Write(crl.Raw, 0400); err != nil {
405 return fmt.Errorf("failed tow rite CRL: %w", err)
406 }
407
408 // Stop the server synchronously (blocking until it's fully shutdown), and
409 // return. The caller can now run the 'full' etcd instance with PKI.
410 supervisor.Logger(ctx).Infof("Bootstrapping PKI: done, stopping server...")
411 server.Close()
Serge Bazanskicb883e22020-07-06 17:47:55 +0200412 return ctx.Err()
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100413}
414
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200415// autopromoter is a runnable which repeatedly attempts to promote etcd learners
416// in the cluster to full followers. This is needed to bring any new cluster
417// members (which are always added as learners) to full membership and make them
418// part of the etcd quorum.
Serge Bazanskicb883e22020-07-06 17:47:55 +0200419func (s *Service) autopromoter(ctx context.Context) error {
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200420 autopromote := func(ctx context.Context, cl *clientv3.Client) {
421 // Only autopromote if our endpoint is a leader. This is a bargain bin version
422 // of leader election: it's simple and cheap, but not very reliable. The most
423 // obvious failure mode is that the instance we contacted isn't a leader by the
424 // time we promote a member, but that's fine - the promotion is idempotent. What
425 // we really use the 'leader election' here for isn't for consistency, but to
426 // prevent the cluster from being hammered by spurious leadership promotion
427 // requests from every etcd member.
428 status, err := cl.Status(ctx, cl.Endpoints()[0])
429 if err != nil {
430 supervisor.Logger(ctx).Warningf("Failed to get endpoint status: %v", err)
431 }
432 if status.Leader != status.Header.MemberId {
Serge Bazanskicb883e22020-07-06 17:47:55 +0200433 return
434 }
435
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200436 members, err := cl.MemberList(ctx)
437 if err != nil {
438 supervisor.Logger(ctx).Warningf("Failed to list members: %v", err)
439 return
440 }
441 for _, member := range members.Members {
Serge Bazanskicb883e22020-07-06 17:47:55 +0200442 if !member.IsLearner {
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100443 continue
444 }
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200445 // Always call PromoteMember since the metadata necessary to decide if we should
446 // is private. Luckily etcd already does consistency checks internally and will
447 // refuse to promote nodes that aren't connected or are still behind on
448 // transactions.
449 if _, err := cl.MemberPromote(ctx, member.ID); err != nil {
Serge Bazanskic7359672020-10-30 16:38:57 +0100450 supervisor.Logger(ctx).Infof("Failed to promote consensus node %s: %v", member.Name, err)
Serge Bazanskicb883e22020-07-06 17:47:55 +0200451 } else {
Serge Bazanskic7359672020-10-30 16:38:57 +0100452 supervisor.Logger(ctx).Infof("Promoted new consensus node %s", member.Name)
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100453 }
454 }
455 }
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100456
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200457 w := s.Watch()
Serge Bazanskicb883e22020-07-06 17:47:55 +0200458 for {
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200459 st, err := w.Get(ctx)
460 if err != nil {
461 return fmt.Errorf("status get failed: %w", err)
Lorenz Brun52f7f292020-06-24 16:42:02 +0200462 }
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200463 t := time.NewTicker(5 * time.Second)
464 for {
465 autopromote(ctx, st.cl)
466 select {
467 case <-ctx.Done():
468 t.Stop()
469 return ctx.Err()
470 case <-t.C:
Serge Bazanskicb883e22020-07-06 17:47:55 +0200471 }
472 }
473 }
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200474}
475
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200476// selfupdater is a runnable that performs a one-shot (once per Service Run,
477// thus once for each configuration) update of the node's Peer URL in etcd. This
478// is currently only really needed because the first node in the cluster
479// bootstraps itself without any peer URLs at first, and this allows it to then
480// add the peer URLs afterwards. Instead of a runnable, this might as well have
481// been part of the bootstarp logic, but making it a restartable runnable is
482// more robust.
483func (s *Service) selfupdater(ctx context.Context) error {
484 supervisor.Signal(ctx, supervisor.SignalHealthy)
485 w := s.Watch()
486 for {
487 st, err := w.Get(ctx)
488 if err != nil {
489 return fmt.Errorf("failed to get status: %w", err)
490 }
491
Serge Bazanski5839e972021-11-16 15:46:19 +0100492 if st.localPeerURL != "" {
493 supervisor.Logger(ctx).Infof("Updating local peer URL...")
494 peerURL := st.localPeerURL
495 if _, err := st.cl.MemberUpdate(ctx, st.localMemberID, []string{peerURL}); err != nil {
496 supervisor.Logger(ctx).Warningf("failed to update member: %v", err)
497 time.Sleep(1 * time.Second)
498 continue
499 }
500 } else {
501 supervisor.Logger(ctx).Infof("No local peer URL, not updating.")
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200502 }
503
504 supervisor.Signal(ctx, supervisor.SignalDone)
505 return nil
Serge Bazanskia105db52021-04-12 19:57:46 +0200506 }
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200507}