blob: 94d84b2fcabe52940f654fd3c2b76bdbefddbb28 [file] [log] [blame]
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +02001// Copyright 2020 The Monogon Project Authors.
2//
3// SPDX-License-Identifier: Apache-2.0
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
Serge Bazanskicb883e22020-07-06 17:47:55 +020017// Package consensus implements a managed etcd cluster member service, with a self-hosted CA system for issuing peer
18// certificates. Currently each Smalltown node runs an etcd member, and connects to the etcd member locally over a unix
19// domain socket.
20//
21// The service supports two modes of startup:
22// - initializing a new cluster, by bootstrapping the CA in memory, starting a cluster, committing the CA to etcd
23// afterwards, and saving the new node's certificate to local storage
24// - joining an existing cluster, using certificates from local storage and loading the CA from etcd. This flow is also
25// used when the node joins a cluster for the first time (then the certificates required must be provisioned
26// externally before starting the consensus service).
27//
28// Regardless of how the etcd member service was started, the resulting running service is further managed and used
29// in the same way.
30//
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +020031package consensus
32
33import (
34 "context"
Lorenz Bruna4ea9d02019-10-31 11:40:30 +010035 "encoding/pem"
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +020036 "fmt"
Lorenz Brun52f7f292020-06-24 16:42:02 +020037 "net"
Lorenz Bruna4ea9d02019-10-31 11:40:30 +010038 "net/url"
Serge Bazanskicb883e22020-07-06 17:47:55 +020039 "sync"
Lorenz Bruna4ea9d02019-10-31 11:40:30 +010040 "time"
41
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +020042 "go.etcd.io/etcd/clientv3"
43 "go.etcd.io/etcd/clientv3/namespace"
44 "go.etcd.io/etcd/embed"
Lorenz Brunfc5dbc62020-05-28 12:18:07 +020045 "go.uber.org/atomic"
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +020046 "go.uber.org/zap"
Lorenz Brun60febd92020-05-07 14:08:18 +020047 "go.uber.org/zap/zapcore"
Hendrik Hofstadt8efe51e2020-02-28 12:53:41 +010048
Lorenz Brunfc5dbc62020-05-28 12:18:07 +020049 "git.monogon.dev/source/nexantic.git/core/internal/common"
Serge Bazanskicb883e22020-07-06 17:47:55 +020050 "git.monogon.dev/source/nexantic.git/core/internal/common/supervisor"
Hendrik Hofstadt8efe51e2020-02-28 12:53:41 +010051 "git.monogon.dev/source/nexantic.git/core/internal/consensus/ca"
Serge Bazanskicb883e22020-07-06 17:47:55 +020052 "git.monogon.dev/source/nexantic.git/core/internal/localstorage"
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +020053)
54
const (
	// DefaultClusterToken is the value set as the initial cluster token of the embedded etcd configuration.
	DefaultClusterToken = "SIGNOS"
	// DefaultLogger is the name of the logger implementation selected in the embedded etcd configuration.
	DefaultLogger = "zap"
)
59
// Service is the etcd cluster member service. Construct it with New and start it by running Run.
type Service struct {
	// The configuration with which the service was started. This is immutable.
	config *Config

	// stateMu guards state. This is locked internally on public methods of Service that require access to state. The
	// state might be recreated on service restart.
	stateMu sync.Mutex
	// state is the runtime state of the currently running member. It is nil until Run has been called.
	state *state
}
Lorenz Brun6e8f69c2019-11-18 10:44:24 +010070
// state is the runtime state of a running etcd member.
type state struct {
	// etcd is the embedded etcd server instance.
	etcd *embed.Etcd
	// ready is set once the member has fully started, ie. once Run has signaled itself healthy.
	ready atomic.Bool

	// ca is the peer certificate authority, either freshly generated (new cluster) or loaded from etcd.
	ca *ca.CA
	// cl is an etcd client that loops back to the locally running etcd server. This runs over the Client unix domain
	// socket that etcd starts.
	cl *clientv3.Client
}
Leopold Schabel68c58752019-11-14 21:00:59 +010081
// Config is the configuration of the etcd cluster member service, as passed to New.
type Config struct {
	// Data directory (persistent, encrypted storage) for etcd.
	Data *localstorage.DataEtcdDirectory
	// Ephemeral directory for etcd.
	Ephemeral *localstorage.EphemeralConsensusDirectory

	// Name is the name given to this etcd member. It is also used as the name in the first peer certificate issued
	// when a new cluster is started.
	// NOTE(review): this was previously documented as the cluster name shared by all members - confirm which is
	// intended.
	Name string
	// NewCluster selects whether the etcd member will start a new cluster and bootstrap a CA and the first member
	// certificate, or load existing PKI certificates from disk.
	NewCluster bool
	// InitialCluster sets the initial cluster peer URLs used when joining an existing cluster: if NewCluster is unset
	// and this is non-empty, the member is configured with cluster state "existing" and this value. It is ignored
	// when NewCluster is set, in which case the initial cluster is derived from Name and the advertised peer URL.
	InitialCluster string
	// ExternalHost is the address at which this cluster member is reachable to other cluster members. When
	// NewCluster is set this must be an IP address, as it is embedded into the issued peer certificate.
	ExternalHost string
	// ListenHost is the IP address or hostname at which this cluster member will listen.
	ListenHost string
	// Port is the port at which this cluster member will listen for other members. If zero, defaults to the global
	// Smalltown setting.
	Port int
}
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200104
Serge Bazanskicb883e22020-07-06 17:47:55 +0200105func New(config Config) *Service {
106 return &Service{
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200107 config: &config,
108 }
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200109}
110
Serge Bazanskicb883e22020-07-06 17:47:55 +0200111// configure transforms the service configuration into an embedded etcd configuration. This is pure and side effect
112// free.
113func (s *Service) configure(ctx context.Context) (*embed.Config, error) {
114 if err := s.config.Ephemeral.MkdirAll(0700); err != nil {
115 return nil, fmt.Errorf("failed to create ephemeral directory: %w", err)
116 }
117 if err := s.config.Data.MkdirAll(0700); err != nil {
118 return nil, fmt.Errorf("failed to create data directory: %w", err)
119 }
Lorenz Brun52f7f292020-06-24 16:42:02 +0200120
Serge Bazanskicb883e22020-07-06 17:47:55 +0200121 port := s.config.Port
122 if port == 0 {
123 port = common.ConsensusPort
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200124 }
125
126 cfg := embed.NewConfig()
127
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200128 cfg.Name = s.config.Name
Serge Bazanskicb883e22020-07-06 17:47:55 +0200129 cfg.Dir = s.config.Data.Data.FullPath()
130 cfg.InitialClusterToken = DefaultClusterToken
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200131
Serge Bazanskicb883e22020-07-06 17:47:55 +0200132 cfg.PeerTLSInfo.CertFile = s.config.Data.PeerPKI.Certificate.FullPath()
133 cfg.PeerTLSInfo.KeyFile = s.config.Data.PeerPKI.Key.FullPath()
134 cfg.PeerTLSInfo.TrustedCAFile = s.config.Data.PeerPKI.CACertificate.FullPath()
135 cfg.PeerTLSInfo.ClientCertAuth = true
136 cfg.PeerTLSInfo.CRLFile = s.config.Data.PeerCRL.FullPath()
137
138 cfg.LCUrls = []url.URL{{
139 Scheme: "unix",
140 Path: s.config.Ephemeral.ClientSocket.FullPath() + ":0",
141 }}
142 cfg.ACUrls = []url.URL{}
143 cfg.LPUrls = []url.URL{{
144 Scheme: "https",
145 Host: fmt.Sprintf("%s:%d", s.config.ListenHost, port),
146 }}
147 cfg.APUrls = []url.URL{{
148 Scheme: "https",
149 Host: fmt.Sprintf("%s:%d", s.config.ExternalHost, port),
150 }}
151
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200152 if s.config.NewCluster {
153 cfg.ClusterState = "new"
154 cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name)
155 } else if s.config.InitialCluster != "" {
156 cfg.ClusterState = "existing"
157 cfg.InitialCluster = s.config.InitialCluster
158 }
159
Serge Bazanskicb883e22020-07-06 17:47:55 +0200160 logger := supervisor.Logger(ctx)
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200161 cfg.Logger = DefaultLogger
Lorenz Brun60febd92020-05-07 14:08:18 +0200162 cfg.ZapLoggerBuilder = embed.NewZapCoreLoggerBuilder(
Serge Bazanskicb883e22020-07-06 17:47:55 +0200163 logger.With(zap.String("component", "etcd")).WithOptions(zap.IncreaseLevel(zapcore.WarnLevel)),
164 logger.Core(),
Lorenz Brun60febd92020-05-07 14:08:18 +0200165 nil,
166 )
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200167
Serge Bazanskicb883e22020-07-06 17:47:55 +0200168 return cfg, nil
169}
170
171// Run is a Supervisor runnable that starts the etcd member service. It will become healthy once the member joins the
172// cluster successfully.
173func (s *Service) Run(ctx context.Context) error {
174 st := &state{
175 ready: *atomic.NewBool(false),
176 }
177 s.stateMu.Lock()
178 s.state = st
179 s.stateMu.Unlock()
180
181 if s.config.NewCluster {
182 // Expect certificate to be absent from disk.
183 absent, err := s.config.Data.PeerPKI.AllAbsent()
184 if err != nil {
185 return fmt.Errorf("checking certificate existence: %w", err)
186 }
187 if !absent {
188 return fmt.Errorf("want new cluster, but certificates already exist on disk")
189 }
190
191 // Generate CA, keep in memory, write it down in etcd later.
192 st.ca, err = ca.New("Smalltown etcd peer Root CA")
193 if err != nil {
194 return fmt.Errorf("when creating new cluster's peer CA: %w", err)
195 }
196
197 ip := net.ParseIP(s.config.ExternalHost)
198 if ip == nil {
199 return fmt.Errorf("configued external host is not an IP address (got %q)", s.config.ExternalHost)
200 }
201
202 cert, key, err := st.ca.Issue(ctx, nil, s.config.Name, ip)
203 if err != nil {
204 return fmt.Errorf("when issuing new cluster's first certificate: %w", err)
205 }
206
207 if err := s.config.Data.PeerPKI.MkdirAll(0600); err != nil {
208 return fmt.Errorf("when creating PKI directory: %w", err)
209 }
210 if err := s.config.Data.PeerPKI.CACertificate.Write(pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: st.ca.CACertRaw}), 0600); err != nil {
211 return fmt.Errorf("when writing CA certificate to disk: %w", err)
212 }
213 if err := s.config.Data.PeerPKI.Certificate.Write(pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: cert}), 0600); err != nil {
214 return fmt.Errorf("when writing certificate to disk: %w", err)
215 }
216 if err := s.config.Data.PeerPKI.Key.Write(pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: key}), 0600); err != nil {
217 return fmt.Errorf("when writing certificate to disk: %w", err)
218 }
219 } else {
220 // Expect certificate to be present on disk.
221 present, err := s.config.Data.PeerPKI.AllExist()
222 if err != nil {
223 return fmt.Errorf("checking certificate existence: %w", err)
224 }
225 if !present {
226 return fmt.Errorf("want existing cluster, but certificate is missing from disk")
227 }
228 }
229
230 if err := s.config.Data.MkdirAll(0600); err != nil {
231 return fmt.Errorf("failed to create data directory; %w", err)
232 }
233
234 cfg, err := s.configure(ctx)
235 if err != nil {
236 return fmt.Errorf("when configuring etcd: %w", err)
237 }
238
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200239 server, err := embed.StartEtcd(cfg)
Serge Bazanskicb883e22020-07-06 17:47:55 +0200240 keep := false
241 defer func() {
242 if !keep && server != nil {
243 server.Close()
244 }
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200245 }()
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100246 if err != nil {
Serge Bazanskicb883e22020-07-06 17:47:55 +0200247 return fmt.Errorf("failed to start etcd: %w", err)
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100248 }
Serge Bazanskicb883e22020-07-06 17:47:55 +0200249 st.etcd = server
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100250
Serge Bazanskicb883e22020-07-06 17:47:55 +0200251 supervisor.Logger(ctx).Info("waiting for etcd...")
Leopold Schabel68c58752019-11-14 21:00:59 +0100252
Serge Bazanskicb883e22020-07-06 17:47:55 +0200253 okay := true
254 select {
255 case <-st.etcd.Server.ReadyNotify():
256 case <-ctx.Done():
257 okay = false
Lorenz Brun52f7f292020-06-24 16:42:02 +0200258 }
259
Serge Bazanskicb883e22020-07-06 17:47:55 +0200260 if !okay {
261 supervisor.Logger(ctx).Info("context done, aborting wait")
262 return ctx.Err()
Lorenz Brun52f7f292020-06-24 16:42:02 +0200263 }
264
Serge Bazanskicb883e22020-07-06 17:47:55 +0200265 socket := s.config.Ephemeral.ClientSocket.FullPath()
266 cl, err := clientv3.New(clientv3.Config{
267 Endpoints: []string{fmt.Sprintf("unix://%s:0", socket)},
268 DialTimeout: time.Second,
Lorenz Brun52f7f292020-06-24 16:42:02 +0200269 })
270 if err != nil {
Serge Bazanskicb883e22020-07-06 17:47:55 +0200271 return fmt.Errorf("failed to connect to new etcd instance: %w", err)
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100272 }
Serge Bazanskicb883e22020-07-06 17:47:55 +0200273 st.cl = cl
274
275 if s.config.NewCluster {
276 if st.ca == nil {
277 panic("peerCA has not been generated")
278 }
279
280 // Save new CA into etcd.
281 err = st.ca.Save(ctx, cl.KV)
282 if err != nil {
283 return fmt.Errorf("failed to save new CA to etcd: %w", err)
284 }
285 } else {
286 // Load existing CA from etcd.
287 st.ca, err = ca.Load(ctx, cl.KV)
288 if err != nil {
289 return fmt.Errorf("failed to load CA from etcd: %w", err)
290 }
291 }
292
293 // Start CRL watcher.
294 if err := supervisor.Run(ctx, "crl", s.watchCRL); err != nil {
295 return fmt.Errorf("failed to start CRL watcher: %w", err)
296 }
297 // Start autopromoter.
298 if err := supervisor.Run(ctx, "autopromoter", s.autopromoter); err != nil {
299 return fmt.Errorf("failed to start autopromoter: %w", err)
300 }
301
302 supervisor.Logger(ctx).Info("etcd is now ready")
303 keep = true
304 st.ready.Store(true)
305 supervisor.Signal(ctx, supervisor.SignalHealthy)
306
307 <-ctx.Done()
308 st.etcd.Close()
309 return ctx.Err()
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100310}
311
Serge Bazanskicb883e22020-07-06 17:47:55 +0200312// watchCRL is a sub-runnable of the etcd cluster member service that updates the on-local-storage CRL to match the
313// newest available version in etcd.
314func (s *Service) watchCRL(ctx context.Context) error {
315 s.stateMu.Lock()
316 cl := s.state.cl
317 ca := s.state.ca
318 s.stateMu.Unlock()
319
320 supervisor.Signal(ctx, supervisor.SignalHealthy)
321 for e := range ca.WaitCRLChange(ctx, cl.KV, cl.Watcher) {
322 if e.Err != nil {
323 return fmt.Errorf("watching CRL: %w", e.Err)
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100324 }
Serge Bazanskicb883e22020-07-06 17:47:55 +0200325
326 if err := s.config.Data.PeerCRL.Write(e.CRL, 0600); err != nil {
327 return fmt.Errorf("saving CRL: %w", err)
328 }
329 }
330
331 // unreachable
332 return nil
333}
334
335func (s *Service) autopromoter(ctx context.Context) error {
336 t := time.NewTicker(5 * time.Second)
337 defer t.Stop()
338
339 autopromote := func() {
340 s.stateMu.Lock()
341 st := s.state
342 s.stateMu.Unlock()
343
344 if st.etcd.Server.Leader() != st.etcd.Server.ID() {
345 return
346 }
347
348 for _, member := range st.etcd.Server.Cluster().Members() {
349 if !member.IsLearner {
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100350 continue
351 }
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100352
Serge Bazanskicb883e22020-07-06 17:47:55 +0200353 // We always call PromoteMember since the metadata necessary to decide if we should is private.
354 // Luckily etcd already does sanity checks internally and will refuse to promote nodes that aren't
355 // connected or are still behind on transactions.
356 if _, err := st.etcd.Server.PromoteMember(ctx, uint64(member.ID)); err != nil {
357 supervisor.Logger(ctx).Info("Failed to promote consensus node", zap.String("node", member.Name), zap.Error(err))
358 } else {
359 supervisor.Logger(ctx).Info("Promoted new consensus node", zap.String("node", member.Name))
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100360 }
361 }
362 }
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100363
Serge Bazanskicb883e22020-07-06 17:47:55 +0200364 for {
365 select {
366 case <-ctx.Done():
367 return ctx.Err()
368 case <-t.C:
369 autopromote()
Lorenz Brun52f7f292020-06-24 16:42:02 +0200370 }
371 }
372}
373
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200374// IsReady returns whether etcd is ready and synced
375func (s *Service) IsReady() bool {
Serge Bazanskicb883e22020-07-06 17:47:55 +0200376 s.stateMu.Lock()
377 defer s.stateMu.Unlock()
378 if s.state == nil {
379 return false
380 }
381 return s.state.ready.Load()
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200382}
383
Serge Bazanskicb883e22020-07-06 17:47:55 +0200384func (s *Service) WaitReady(ctx context.Context) error {
385 // TODO(q3k): reimplement the atomic ready flag as an event synchronization mechanism
386 if s.IsReady() {
387 return nil
388 }
389 t := time.NewTicker(100 * time.Millisecond)
390 defer t.Stop()
391 for {
392 select {
393 case <-ctx.Done():
394 return ctx.Err()
395 case <-t.C:
396 if s.IsReady() {
397 return nil
398 }
399 }
400 }
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200401}
402
Serge Bazanskicb883e22020-07-06 17:47:55 +0200403// KV returns and etcd KV client interface to the etcd member/cluster.
404func (s *Service) KV(module, space string) clientv3.KV {
405 s.stateMu.Lock()
406 defer s.stateMu.Unlock()
407 return namespace.NewKV(s.state.cl.KV, fmt.Sprintf("%s:%s", module, space))
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200408}
409
Serge Bazanskicb883e22020-07-06 17:47:55 +0200410func (s *Service) KVRoot() clientv3.KV {
411 s.stateMu.Lock()
412 defer s.stateMu.Unlock()
413 return s.state.cl.KV
414}
415
416func (s *Service) Cluster() clientv3.Cluster {
417 s.stateMu.Lock()
418 defer s.stateMu.Unlock()
419 return s.state.cl.Cluster
420}
421
422// MemberInfo returns information about this etcd cluster member: its ID and name. This will block until this
423// information is available (ie. the cluster status is Ready).
424func (s *Service) MemberInfo(ctx context.Context) (id uint64, name string, err error) {
425 if err = s.WaitReady(ctx); err != nil {
426 err = fmt.Errorf("when waiting for cluster readiness: %w", err)
427 return
428 }
429
430 s.stateMu.Lock()
431 defer s.stateMu.Unlock()
432 id = uint64(s.state.etcd.Server.ID())
433 name = s.config.Name
434 return
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200435}