blob: 5a0beadaeedfc0ed22770cadbd5c9693ab0e066d [file] [log] [blame]
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +02001// Copyright 2020 The Monogon Project Authors.
2//
3// SPDX-License-Identifier: Apache-2.0
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
// Package consensus manages the embedded etcd cluster.
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +020018package consensus
19
20import (
Lorenz Bruna4ea9d02019-10-31 11:40:30 +010021 "bytes"
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +020022 "context"
Lorenz Bruna4ea9d02019-10-31 11:40:30 +010023 "crypto/x509"
24 "encoding/hex"
25 "encoding/pem"
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +020026 "fmt"
Lorenz Bruna4ea9d02019-10-31 11:40:30 +010027 "io/ioutil"
28 "math/rand"
29 "net/url"
30 "os"
31 "path"
32 "path/filepath"
33 "strings"
34 "time"
35
Lorenz Brunaa6b7342019-12-12 02:55:02 +010036 "git.monogon.dev/source/nexantic.git/core/internal/common"
Lorenz Brun6e8f69c2019-11-18 10:44:24 +010037 "git.monogon.dev/source/nexantic.git/core/internal/common/service"
38
Lorenz Bruna4ea9d02019-10-31 11:40:30 +010039 "git.monogon.dev/source/nexantic.git/core/generated/api"
Lorenz Brun6e8f69c2019-11-18 10:44:24 +010040
Lorenz Bruna4ea9d02019-10-31 11:40:30 +010041 "git.monogon.dev/source/nexantic.git/core/internal/consensus/ca"
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +020042 "github.com/pkg/errors"
43 "go.etcd.io/etcd/clientv3"
44 "go.etcd.io/etcd/clientv3/namespace"
45 "go.etcd.io/etcd/embed"
46 "go.etcd.io/etcd/etcdserver/api/membership"
47 "go.etcd.io/etcd/pkg/types"
48 "go.etcd.io/etcd/proxy/grpcproxy/adapter"
49 "go.uber.org/zap"
Lorenz Bruna4ea9d02019-10-31 11:40:30 +010050 "golang.org/x/sys/unix"
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +020051)
52
// DefaultClusterToken is the etcd initial cluster token used by this service;
// it is also passed to membership.NewMember when adding members.
const DefaultClusterToken = "SIGNOS"

// DefaultLogger names the logger implementation handed to the embedded etcd
// configuration.
const DefaultLogger = "zap"
57
// File names, relative to the consensus data directory, of the TLS material
// used by the embedded etcd server.
const (
	// CAPath holds the PEM-encoded CA certificate.
	CAPath = "ca.pem"
	// CertPath holds the PEM-encoded node certificate.
	CertPath = "cert.pem"
	// KeyPath holds the PEM-encoded private key of the node certificate.
	KeyPath = "cert-key.pem"
	// CRLPath holds the certificate revocation list as written by
	// WriteCertificateFiles (raw bytes, not PEM-wrapped).
	CRLPath = "ca-crl.der"
	// CRLSwapPath is the temporary file an updated CRL is written to before
	// it is renamed over CRLPath.
	CRLSwapPath = "ca-crl.der.swp"
)
65
// LocalListenerURL is the client listener URL (a UNIX socket under /consensus)
// on which the embedded etcd is exposed to local processes.
const LocalListenerURL = "unix:///consensus/listener.sock:0"
69
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +020070type (
71 Service struct {
Leopold Schabel68c58752019-11-14 21:00:59 +010072 *service.BaseService
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +020073
Leopold Schabel68c58752019-11-14 21:00:59 +010074 etcd *embed.Etcd
75 kv clientv3.KV
76 ready bool
77
78 // bootstrapCA and bootstrapCert cache the etcd cluster CA data during bootstrap.
79 bootstrapCA *ca.CA
80 bootstrapCert []byte
81
Lorenz Bruna4ea9d02019-10-31 11:40:30 +010082 watchCRLTicker *time.Ticker
83 lastCRL []byte
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +020084
85 config *Config
86 }
87
88 Config struct {
89 Name string
90 DataDir string
91 InitialCluster string
92 NewCluster bool
Leopold Schabel68c58752019-11-14 21:00:59 +010093 ExternalHost string
94 ListenHost string
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +020095 }
96
97 Member struct {
98 ID uint64
99 Name string
100 Address string
101 Synced bool
102 }
103)
104
105func NewConsensusService(config Config, logger *zap.Logger) (*Service, error) {
106 consensusServer := &Service{
107 config: &config,
108 }
Leopold Schabel68c58752019-11-14 21:00:59 +0100109 consensusServer.BaseService = service.NewBaseService("consensus", logger, consensusServer)
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200110
111 return consensusServer, nil
112}
113
114func (s *Service) OnStart() error {
Leopold Schabel68c58752019-11-14 21:00:59 +0100115 // See: https://godoc.org/github.com/coreos/etcd/embed#Config
116
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200117 if s.config == nil {
118 return errors.New("config for consensus is nil")
119 }
120
121 cfg := embed.NewConfig()
122
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100123 cfg.PeerTLSInfo.CertFile = filepath.Join(s.config.DataDir, CertPath)
124 cfg.PeerTLSInfo.KeyFile = filepath.Join(s.config.DataDir, KeyPath)
125 cfg.PeerTLSInfo.TrustedCAFile = filepath.Join(s.config.DataDir, CAPath)
126 cfg.PeerTLSInfo.ClientCertAuth = true
127 cfg.PeerTLSInfo.CRLFile = filepath.Join(s.config.DataDir, CRLPath)
128
129 lastCRL, err := ioutil.ReadFile(cfg.PeerTLSInfo.CRLFile)
130 if err != nil {
131 return fmt.Errorf("failed to read etcd CRL: %w", err)
132 }
133 s.lastCRL = lastCRL
134
Lorenz Brun6e8f69c2019-11-18 10:44:24 +0100135 // Expose etcd to local processes
136 if err := os.MkdirAll("/consensus", 0700); err != nil {
137 return fmt.Errorf("Failed to create consensus runtime state directory: %w", err)
138 }
139 listenerURL, err := url.Parse(LocalListenerURL)
140 if err != nil {
141 panic(err)
142 }
143 cfg.LCUrls = []url.URL{*listenerURL}
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200144
Leopold Schabel68c58752019-11-14 21:00:59 +0100145 // Advertise Peer URLs
Lorenz Brunaa6b7342019-12-12 02:55:02 +0100146 apURL, err := url.Parse(fmt.Sprintf("https://%s:%d", s.config.ExternalHost, common.ConsensusPort))
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200147 if err != nil {
Leopold Schabel68c58752019-11-14 21:00:59 +0100148 return fmt.Errorf("invalid external_host or listen_port: %w", err)
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200149 }
150
Leopold Schabel68c58752019-11-14 21:00:59 +0100151 // Listen Peer URLs
Lorenz Brunaa6b7342019-12-12 02:55:02 +0100152 lpURL, err := url.Parse(fmt.Sprintf("https://%s:%d", s.config.ListenHost, common.ConsensusPort))
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200153 if err != nil {
Leopold Schabel68c58752019-11-14 21:00:59 +0100154 return fmt.Errorf("invalid listen_host or listen_port: %w", err)
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200155 }
156 cfg.APUrls = []url.URL{*apURL}
157 cfg.LPUrls = []url.URL{*lpURL}
158 cfg.ACUrls = []url.URL{}
159
160 cfg.Dir = s.config.DataDir
161 cfg.InitialClusterToken = DefaultClusterToken
162 cfg.Name = s.config.Name
163
164 // Only relevant if creating or joining a cluster; otherwise settings will be ignored
165 if s.config.NewCluster {
166 cfg.ClusterState = "new"
167 cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name)
168 } else if s.config.InitialCluster != "" {
169 cfg.ClusterState = "existing"
170 cfg.InitialCluster = s.config.InitialCluster
171 }
172
173 cfg.Logger = DefaultLogger
174
175 server, err := embed.StartEtcd(cfg)
176 if err != nil {
177 return err
178 }
179 s.etcd = server
180
181 // Override the logger
182 //*server.GetLogger() = *s.Logger.With(zap.String("component", "etcd"))
Leopold Schabel68c58752019-11-14 21:00:59 +0100183 // TODO(leo): can we uncomment this?
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200184
185 go func() {
186 s.Logger.Info("waiting for etcd to become ready")
187 <-s.etcd.Server.ReadyNotify()
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200188 s.Logger.Info("etcd is now ready")
189 }()
190
191 // Inject kv client
192 s.kv = clientv3.NewKVFromKVClient(adapter.KvServerToKvClient(s.etcd.Server), nil)
193
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100194 // Start CRL watcher
195 go s.watchCRL()
196
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200197 return nil
198}
199
Leopold Schabel68c58752019-11-14 21:00:59 +0100200// WriteCertificateFiles writes the given node certificate data to local storage
201// such that it can be used by the embedded etcd server.
202// Unfortunately, we cannot pass the certificates directly to etcd.
203func (s *Service) WriteCertificateFiles(certs *api.ConsensusCertificates) error {
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100204 if err := ioutil.WriteFile(filepath.Join(s.config.DataDir, CRLPath), certs.Crl, 0600); err != nil {
205 return err
206 }
207 if err := ioutil.WriteFile(filepath.Join(s.config.DataDir, CertPath),
208 pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: certs.Cert}), 0600); err != nil {
209 return err
210 }
211 if err := ioutil.WriteFile(filepath.Join(s.config.DataDir, KeyPath),
212 pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: certs.Key}), 0600); err != nil {
213 return err
214 }
215 if err := ioutil.WriteFile(filepath.Join(s.config.DataDir, CAPath),
216 pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: certs.Ca}), 0600); err != nil {
217 return err
218 }
219 return nil
220}
221
Leopold Schabel68c58752019-11-14 21:00:59 +0100222// PrecreateCA generates the etcd cluster certificate authority and writes it to local storage.
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100223func (s *Service) PrecreateCA() error {
224 // Provision an etcd CA
225 etcdRootCA, err := ca.New("Smalltown etcd Root CA")
226 if err != nil {
227 return err
228 }
229 cert, privkey, err := etcdRootCA.IssueCertificate(s.config.ExternalHost)
230 if err != nil {
231 return fmt.Errorf("failed to self-issue a certificate: %w", err)
232 }
233 if err := os.MkdirAll(s.config.DataDir, 0700); err != nil {
234 return fmt.Errorf("failed to create consensus data dir: %w", err)
235 }
236 // Preserve certificate for later injection
237 s.bootstrapCert = cert
Leopold Schabel68c58752019-11-14 21:00:59 +0100238 if err := s.WriteCertificateFiles(&api.ConsensusCertificates{
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100239 Ca: etcdRootCA.CACertRaw,
240 Crl: etcdRootCA.CRLRaw,
241 Cert: cert,
242 Key: privkey,
243 }); err != nil {
244 return fmt.Errorf("failed to setup certificates: %w", err)
245 }
246 s.bootstrapCA = etcdRootCA
247 return nil
248}
249
// Keys under which the etcd CA state is stored inside etcd itself.
const (
	// caPathEtcd stores the CA certificate.
	caPathEtcd = "/etcd-ca/ca.der"
	// caKeyPathEtcd stores the CA private key.
	caKeyPathEtcd = "/etcd-ca/ca-key.der"
	// crlPathEtcd stores the current certificate revocation list.
	crlPathEtcd = "/etcd-ca/crl.der"

	// This prefix stores the individual certs the etcd CA has issued.
	certPrefixEtcd = "/etcd-ca/certs"
)
258
Leopold Schabel68c58752019-11-14 21:00:59 +0100259// InjectCA copies the CA from data cached during PrecreateCA to etcd.
260// Requires a previous call to PrecreateCA.
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100261func (s *Service) InjectCA() error {
Leopold Schabel68c58752019-11-14 21:00:59 +0100262 if s.bootstrapCA == nil || s.bootstrapCert == nil {
263 panic("bootstrapCA or bootstrapCert are nil - missing PrecreateCA call?")
264 }
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100265 if _, err := s.kv.Put(context.Background(), caPathEtcd, string(s.bootstrapCA.CACertRaw)); err != nil {
266 return err
267 }
Leopold Schabel68c58752019-11-14 21:00:59 +0100268 // TODO(lorenz): Should be wrapped by the master key
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100269 if _, err := s.kv.Put(context.Background(), caKeyPathEtcd, string([]byte(*s.bootstrapCA.PrivateKey))); err != nil {
270 return err
271 }
272 if _, err := s.kv.Put(context.Background(), crlPathEtcd, string(s.bootstrapCA.CRLRaw)); err != nil {
273 return err
274 }
275 certVal, err := x509.ParseCertificate(s.bootstrapCert)
276 if err != nil {
277 return err
278 }
279 serial := hex.EncodeToString(certVal.SerialNumber.Bytes())
280 if _, err := s.kv.Put(context.Background(), path.Join(certPrefixEtcd, serial), string(s.bootstrapCert)); err != nil {
281 return fmt.Errorf("failed to persist certificate: %w", err)
282 }
283 // Clear out bootstrap CA after injecting
284 s.bootstrapCA = nil
285 s.bootstrapCert = []byte{}
286 return nil
287}
288
289func (s *Service) etcdGetSingle(path string) ([]byte, int64, error) {
290 res, err := s.kv.Get(context.Background(), path)
291 if err != nil {
292 return nil, -1, fmt.Errorf("failed to get key from etcd: %w", err)
293 }
294 if len(res.Kvs) != 1 {
Leopold Schabel68c58752019-11-14 21:00:59 +0100295 return nil, -1, errors.New("key not available or multiple keys returned")
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100296 }
297 return res.Kvs[0].Value, res.Kvs[0].ModRevision, nil
298}
299
Leopold Schabel68c58752019-11-14 21:00:59 +0100300func (s *Service) getCAFromEtcd() (*ca.CA, int64, error) {
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100301 // TODO: Technically this could be done in a single request, but it's more logic
302 caCert, _, err := s.etcdGetSingle(caPathEtcd)
303 if err != nil {
304 return nil, -1, fmt.Errorf("failed to get CA certificate from etcd: %w", err)
305 }
306 caKey, _, err := s.etcdGetSingle(caKeyPathEtcd)
307 if err != nil {
308 return nil, -1, fmt.Errorf("failed to get CA key from etcd: %w", err)
309 }
310 // TODO: Unwrap CA key once wrapping is implemented
311 crl, crlRevision, err := s.etcdGetSingle(crlPathEtcd)
312 if err != nil {
313 return nil, -1, fmt.Errorf("failed to get CRL from etcd: %w", err)
314 }
315 idCA, err := ca.FromCertificates(caCert, caKey, crl)
316 if err != nil {
317 return nil, -1, fmt.Errorf("failed to take CA online: %w", err)
318 }
319 return idCA, crlRevision, nil
320}
321
322func (s *Service) IssueCertificate(hostname string) (*api.ConsensusCertificates, error) {
Leopold Schabel68c58752019-11-14 21:00:59 +0100323 idCA, _, err := s.getCAFromEtcd()
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100324 if err != nil {
325 return nil, err
326 }
327 cert, key, err := idCA.IssueCertificate(hostname)
328 if err != nil {
329 return nil, fmt.Errorf("failed to issue certificate: %w", err)
330 }
331 certVal, err := x509.ParseCertificate(cert)
332 if err != nil {
333 return nil, err
334 }
335 serial := hex.EncodeToString(certVal.SerialNumber.Bytes())
336 if _, err := s.kv.Put(context.Background(), path.Join(certPrefixEtcd, serial), string(cert)); err != nil {
Leopold Schabel68c58752019-11-14 21:00:59 +0100337 // We issued a certificate, but failed to persist it. Return an error and forget it ever happened.
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100338 return nil, fmt.Errorf("failed to persist certificate: %w", err)
339 }
340 return &api.ConsensusCertificates{
341 Ca: idCA.CACertRaw,
342 Cert: cert,
343 Crl: idCA.CRLRaw,
344 Key: key,
345 }, nil
346}
347
348func (s *Service) RevokeCertificate(hostname string) error {
349 rand.Seed(time.Now().UnixNano())
350 for {
Leopold Schabel68c58752019-11-14 21:00:59 +0100351 idCA, crlRevision, err := s.getCAFromEtcd()
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100352 if err != nil {
353 return err
354 }
355 allIssuedCerts, err := s.kv.Get(context.Background(), certPrefixEtcd, clientv3.WithPrefix())
356 for _, cert := range allIssuedCerts.Kvs {
357 certVal, err := x509.ParseCertificate(cert.Value)
358 if err != nil {
359 s.Logger.Error("Failed to parse previously issued certificate, this is a security risk", zap.Error(err))
360 continue
361 }
362 for _, dnsName := range certVal.DNSNames {
363 if dnsName == hostname {
364 // Revoke this
365 if err := idCA.Revoke(certVal.SerialNumber); err != nil {
366 // We need to fail if any single revocation fails otherwise outer applications
367 // have no chance of calling this safely
368 return err
369 }
370 }
371 }
372 }
Leopold Schabel68c58752019-11-14 21:00:59 +0100373 // TODO(leo): this needs a test
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100374 cmp := clientv3.Compare(clientv3.ModRevision(crlPathEtcd), "=", crlRevision)
375 op := clientv3.OpPut(crlPathEtcd, string(idCA.CRLRaw))
376 res, err := s.kv.Txn(context.Background()).If(cmp).Then(op).Commit()
377 if err != nil {
378 return fmt.Errorf("failed to persist new CRL in etcd: %w", err)
379 }
380 if res.Succeeded { // Transaction has succeeded
381 break
382 }
383 // Sleep a random duration between 0 and 300ms to reduce serialization failures
384 time.Sleep(time.Duration(rand.Intn(300)) * time.Millisecond)
385 }
386 return nil
387}
388
389func (s *Service) watchCRL() {
Leopold Schabel68c58752019-11-14 21:00:59 +0100390 // TODO(lorenz): Change etcd client to WatchableKV and make this an actual watch
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100391 // This needs changes in more places, so leaving it now
392 s.watchCRLTicker = time.NewTicker(30 * time.Second)
393 for range s.watchCRLTicker.C {
394 crl, _, err := s.etcdGetSingle(crlPathEtcd)
395 if err != nil {
396 s.Logger.Warn("Failed to check for new CRL", zap.Error(err))
397 continue
398 }
399 // This is cryptographic material but not secret, so no constant time compare necessary here
400 if !bytes.Equal(crl, s.lastCRL) {
401 if err := ioutil.WriteFile(filepath.Join(s.config.DataDir, CRLSwapPath), crl, 0600); err != nil {
402 s.Logger.Warn("Failed to write updated CRL", zap.Error(err))
403 }
404 // This uses unix.Rename to guarantee a particular atomic update behavior
405 if err := unix.Rename(filepath.Join(s.config.DataDir, CRLSwapPath), filepath.Join(s.config.DataDir, CRLPath)); err != nil {
406 s.Logger.Warn("Failed to atomically swap updated CRL", zap.Error(err))
407 }
408 }
409 }
410}
411
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200412func (s *Service) OnStop() error {
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100413 s.watchCRLTicker.Stop()
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200414 s.etcd.Close()
415
416 return nil
417}
418
419// IsProvisioned returns whether the node has been setup before and etcd has a data directory
420func (s *Service) IsProvisioned() bool {
421 _, err := os.Stat(s.config.DataDir)
422
423 return !os.IsNotExist(err)
424}
425
426// IsReady returns whether etcd is ready and synced
427func (s *Service) IsReady() bool {
428 return s.ready
429}
430
431// AddMember adds a new etcd member to the cluster
432func (s *Service) AddMember(ctx context.Context, name string, url string) (uint64, error) {
433 urls, err := types.NewURLs([]string{url})
434 if err != nil {
435 return 0, err
436 }
437
438 member := membership.NewMember(name, urls, DefaultClusterToken, nil)
439
440 _, err = s.etcd.Server.AddMember(ctx, *member)
441 if err != nil {
442 return 0, err
443 }
444
445 return uint64(member.ID), nil
446}
447
448// RemoveMember removes a member from the etcd cluster
449func (s *Service) RemoveMember(ctx context.Context, id uint64) error {
450 _, err := s.etcd.Server.RemoveMember(ctx, id)
451 return err
452}
453
454// Health returns the current cluster health
455func (s *Service) Health() {
456}
457
458// GetConfig returns the current consensus config
459func (s *Service) GetConfig() Config {
460 return *s.config
461}
462
463// SetConfig sets the consensus config. Changes are only applied when the service is restarted.
464func (s *Service) SetConfig(config Config) {
465 s.config = &config
466}
467
468// GetInitialClusterString returns the InitialCluster string that can be used to bootstrap a consensus node
469func (s *Service) GetInitialClusterString() string {
470 members := s.etcd.Server.Cluster().Members()
471 clusterString := strings.Builder{}
472
473 for i, m := range members {
474 if i != 0 {
475 clusterString.WriteString(",")
476 }
477 clusterString.WriteString(m.Name)
478 clusterString.WriteString("=")
479 clusterString.WriteString(m.PickPeerURL())
480 }
481
482 return clusterString.String()
483}
484
485// GetNodes returns a list of consensus nodes
486func (s *Service) GetNodes() []Member {
487 members := s.etcd.Server.Cluster().Members()
488 cMembers := make([]Member, len(members))
489 for i, m := range members {
490 cMembers[i] = Member{
491 ID: uint64(m.ID),
492 Name: m.Name,
493 Address: m.PickPeerURL(),
494 Synced: !m.IsLearner,
495 }
496 }
497
498 return cMembers
499}
500
501func (s *Service) GetStore(module, space string) clientv3.KV {
502 return namespace.NewKV(s.kv, fmt.Sprintf("%s:%s", module, space))
503}