blob: f5ee94944d48e71accf43345f0ca6ef3ccc1edc1 [file] [log] [blame]
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +02001// Copyright 2020 The Monogon Project Authors.
2//
3// SPDX-License-Identifier: Apache-2.0
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
// Package consensus manages the embedded etcd cluster.
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +020018package consensus
19
20import (
Lorenz Bruna4ea9d02019-10-31 11:40:30 +010021 "bytes"
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +020022 "context"
Lorenz Bruna4ea9d02019-10-31 11:40:30 +010023 "crypto/x509"
24 "encoding/hex"
25 "encoding/pem"
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +020026 "fmt"
Leopold Schabel68c58752019-11-14 21:00:59 +010027 "git.monogon.dev/source/nexantic.git/core/internal/common/service"
Lorenz Bruna4ea9d02019-10-31 11:40:30 +010028 "io/ioutil"
29 "math/rand"
30 "net/url"
31 "os"
32 "path"
33 "path/filepath"
34 "strings"
35 "time"
36
37 "git.monogon.dev/source/nexantic.git/core/generated/api"
Lorenz Bruna4ea9d02019-10-31 11:40:30 +010038 "git.monogon.dev/source/nexantic.git/core/internal/consensus/ca"
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +020039 "github.com/pkg/errors"
40 "go.etcd.io/etcd/clientv3"
41 "go.etcd.io/etcd/clientv3/namespace"
42 "go.etcd.io/etcd/embed"
43 "go.etcd.io/etcd/etcdserver/api/membership"
44 "go.etcd.io/etcd/pkg/types"
45 "go.etcd.io/etcd/proxy/grpcproxy/adapter"
46 "go.uber.org/zap"
Lorenz Bruna4ea9d02019-10-31 11:40:30 +010047 "golang.org/x/sys/unix"
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +020048)
49
// DefaultClusterToken is the cluster token used for the embedded etcd
// configuration and for members created through AddMember.
const DefaultClusterToken = "SIGNOS"

// DefaultLogger is the logger name assigned to the embedded etcd configuration.
const DefaultLogger = "zap"
54
// File names, relative to Config.DataDir, of the TLS material used by the
// embedded etcd server.
const (
	// CAPath is the PEM-encoded CA certificate.
	CAPath = "ca.pem"
	// CertPath is the PEM-encoded node certificate.
	CertPath = "cert.pem"
	// KeyPath is the PEM-encoded private key belonging to CertPath.
	KeyPath = "cert-key.pem"
	// CRLPath is the certificate revocation list handed to etcd.
	CRLPath = "ca-crl.der"
	// CRLSwapPath is the temporary file an updated CRL is written to before
	// it is renamed over CRLPath.
	CRLSwapPath = "ca-crl.der.swp"
)
62
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +020063type (
64 Service struct {
Leopold Schabel68c58752019-11-14 21:00:59 +010065 *service.BaseService
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +020066
Leopold Schabel68c58752019-11-14 21:00:59 +010067 etcd *embed.Etcd
68 kv clientv3.KV
69 ready bool
70
71 // bootstrapCA and bootstrapCert cache the etcd cluster CA data during bootstrap.
72 bootstrapCA *ca.CA
73 bootstrapCert []byte
74
Lorenz Bruna4ea9d02019-10-31 11:40:30 +010075 watchCRLTicker *time.Ticker
76 lastCRL []byte
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +020077
78 config *Config
79 }
80
81 Config struct {
82 Name string
83 DataDir string
84 InitialCluster string
85 NewCluster bool
Leopold Schabel68c58752019-11-14 21:00:59 +010086 ExternalHost string
87 ListenHost string
88 ListenPort uint16
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +020089 }
90
91 Member struct {
92 ID uint64
93 Name string
94 Address string
95 Synced bool
96 }
97)
98
99func NewConsensusService(config Config, logger *zap.Logger) (*Service, error) {
100 consensusServer := &Service{
101 config: &config,
102 }
Leopold Schabel68c58752019-11-14 21:00:59 +0100103 consensusServer.BaseService = service.NewBaseService("consensus", logger, consensusServer)
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200104
105 return consensusServer, nil
106}
107
108func (s *Service) OnStart() error {
Leopold Schabel68c58752019-11-14 21:00:59 +0100109 // See: https://godoc.org/github.com/coreos/etcd/embed#Config
110
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200111 if s.config == nil {
112 return errors.New("config for consensus is nil")
113 }
114
115 cfg := embed.NewConfig()
116
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100117 cfg.PeerTLSInfo.CertFile = filepath.Join(s.config.DataDir, CertPath)
118 cfg.PeerTLSInfo.KeyFile = filepath.Join(s.config.DataDir, KeyPath)
119 cfg.PeerTLSInfo.TrustedCAFile = filepath.Join(s.config.DataDir, CAPath)
120 cfg.PeerTLSInfo.ClientCertAuth = true
121 cfg.PeerTLSInfo.CRLFile = filepath.Join(s.config.DataDir, CRLPath)
122
123 lastCRL, err := ioutil.ReadFile(cfg.PeerTLSInfo.CRLFile)
124 if err != nil {
125 return fmt.Errorf("failed to read etcd CRL: %w", err)
126 }
127 s.lastCRL = lastCRL
128
Leopold Schabel68c58752019-11-14 21:00:59 +0100129 // Reset Listen Client URLs because we don't want to expose any client
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200130 cfg.LCUrls = nil
131
Leopold Schabel68c58752019-11-14 21:00:59 +0100132 // Advertise Peer URLs
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100133 apURL, err := url.Parse(fmt.Sprintf("https://%s:%d", s.config.ExternalHost, s.config.ListenPort))
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200134 if err != nil {
Leopold Schabel68c58752019-11-14 21:00:59 +0100135 return fmt.Errorf("invalid external_host or listen_port: %w", err)
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200136 }
137
Leopold Schabel68c58752019-11-14 21:00:59 +0100138 // Listen Peer URLs
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100139 lpURL, err := url.Parse(fmt.Sprintf("https://%s:%d", s.config.ListenHost, s.config.ListenPort))
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200140 if err != nil {
Leopold Schabel68c58752019-11-14 21:00:59 +0100141 return fmt.Errorf("invalid listen_host or listen_port: %w", err)
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200142 }
143 cfg.APUrls = []url.URL{*apURL}
144 cfg.LPUrls = []url.URL{*lpURL}
145 cfg.ACUrls = []url.URL{}
146
147 cfg.Dir = s.config.DataDir
148 cfg.InitialClusterToken = DefaultClusterToken
149 cfg.Name = s.config.Name
150
151 // Only relevant if creating or joining a cluster; otherwise settings will be ignored
152 if s.config.NewCluster {
153 cfg.ClusterState = "new"
154 cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name)
155 } else if s.config.InitialCluster != "" {
156 cfg.ClusterState = "existing"
157 cfg.InitialCluster = s.config.InitialCluster
158 }
159
160 cfg.Logger = DefaultLogger
161
162 server, err := embed.StartEtcd(cfg)
163 if err != nil {
164 return err
165 }
166 s.etcd = server
167
168 // Override the logger
169 //*server.GetLogger() = *s.Logger.With(zap.String("component", "etcd"))
Leopold Schabel68c58752019-11-14 21:00:59 +0100170 // TODO(leo): can we uncomment this?
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200171
172 go func() {
173 s.Logger.Info("waiting for etcd to become ready")
174 <-s.etcd.Server.ReadyNotify()
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200175 s.Logger.Info("etcd is now ready")
176 }()
177
178 // Inject kv client
179 s.kv = clientv3.NewKVFromKVClient(adapter.KvServerToKvClient(s.etcd.Server), nil)
180
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100181 // Start CRL watcher
182 go s.watchCRL()
183
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200184 return nil
185}
186
Leopold Schabel68c58752019-11-14 21:00:59 +0100187// WriteCertificateFiles writes the given node certificate data to local storage
188// such that it can be used by the embedded etcd server.
189// Unfortunately, we cannot pass the certificates directly to etcd.
190func (s *Service) WriteCertificateFiles(certs *api.ConsensusCertificates) error {
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100191 if err := ioutil.WriteFile(filepath.Join(s.config.DataDir, CRLPath), certs.Crl, 0600); err != nil {
192 return err
193 }
194 if err := ioutil.WriteFile(filepath.Join(s.config.DataDir, CertPath),
195 pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: certs.Cert}), 0600); err != nil {
196 return err
197 }
198 if err := ioutil.WriteFile(filepath.Join(s.config.DataDir, KeyPath),
199 pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: certs.Key}), 0600); err != nil {
200 return err
201 }
202 if err := ioutil.WriteFile(filepath.Join(s.config.DataDir, CAPath),
203 pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: certs.Ca}), 0600); err != nil {
204 return err
205 }
206 return nil
207}
208
Leopold Schabel68c58752019-11-14 21:00:59 +0100209// PrecreateCA generates the etcd cluster certificate authority and writes it to local storage.
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100210func (s *Service) PrecreateCA() error {
211 // Provision an etcd CA
212 etcdRootCA, err := ca.New("Smalltown etcd Root CA")
213 if err != nil {
214 return err
215 }
216 cert, privkey, err := etcdRootCA.IssueCertificate(s.config.ExternalHost)
217 if err != nil {
218 return fmt.Errorf("failed to self-issue a certificate: %w", err)
219 }
220 if err := os.MkdirAll(s.config.DataDir, 0700); err != nil {
221 return fmt.Errorf("failed to create consensus data dir: %w", err)
222 }
223 // Preserve certificate for later injection
224 s.bootstrapCert = cert
Leopold Schabel68c58752019-11-14 21:00:59 +0100225 if err := s.WriteCertificateFiles(&api.ConsensusCertificates{
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100226 Ca: etcdRootCA.CACertRaw,
227 Crl: etcdRootCA.CRLRaw,
228 Cert: cert,
229 Key: privkey,
230 }); err != nil {
231 return fmt.Errorf("failed to setup certificates: %w", err)
232 }
233 s.bootstrapCA = etcdRootCA
234 return nil
235}
236
// Keys under which the etcd CA material is stored inside etcd itself.
const (
	// caPathEtcd holds the CA certificate.
	caPathEtcd = "/etcd-ca/ca.der"
	// caKeyPathEtcd holds the CA private key.
	caKeyPathEtcd = "/etcd-ca/ca-key.der"
	// crlPathEtcd holds the current certificate revocation list.
	crlPathEtcd = "/etcd-ca/crl.der"

	// certPrefixEtcd is the prefix below which every certificate issued by
	// the etcd CA is stored, keyed by its hex-encoded serial number.
	certPrefixEtcd = "/etcd-ca/certs"
)
245
Leopold Schabel68c58752019-11-14 21:00:59 +0100246// InjectCA copies the CA from data cached during PrecreateCA to etcd.
247// Requires a previous call to PrecreateCA.
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100248func (s *Service) InjectCA() error {
Leopold Schabel68c58752019-11-14 21:00:59 +0100249 if s.bootstrapCA == nil || s.bootstrapCert == nil {
250 panic("bootstrapCA or bootstrapCert are nil - missing PrecreateCA call?")
251 }
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100252 if _, err := s.kv.Put(context.Background(), caPathEtcd, string(s.bootstrapCA.CACertRaw)); err != nil {
253 return err
254 }
Leopold Schabel68c58752019-11-14 21:00:59 +0100255 // TODO(lorenz): Should be wrapped by the master key
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100256 if _, err := s.kv.Put(context.Background(), caKeyPathEtcd, string([]byte(*s.bootstrapCA.PrivateKey))); err != nil {
257 return err
258 }
259 if _, err := s.kv.Put(context.Background(), crlPathEtcd, string(s.bootstrapCA.CRLRaw)); err != nil {
260 return err
261 }
262 certVal, err := x509.ParseCertificate(s.bootstrapCert)
263 if err != nil {
264 return err
265 }
266 serial := hex.EncodeToString(certVal.SerialNumber.Bytes())
267 if _, err := s.kv.Put(context.Background(), path.Join(certPrefixEtcd, serial), string(s.bootstrapCert)); err != nil {
268 return fmt.Errorf("failed to persist certificate: %w", err)
269 }
270 // Clear out bootstrap CA after injecting
271 s.bootstrapCA = nil
272 s.bootstrapCert = []byte{}
273 return nil
274}
275
276func (s *Service) etcdGetSingle(path string) ([]byte, int64, error) {
277 res, err := s.kv.Get(context.Background(), path)
278 if err != nil {
279 return nil, -1, fmt.Errorf("failed to get key from etcd: %w", err)
280 }
281 if len(res.Kvs) != 1 {
Leopold Schabel68c58752019-11-14 21:00:59 +0100282 return nil, -1, errors.New("key not available or multiple keys returned")
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100283 }
284 return res.Kvs[0].Value, res.Kvs[0].ModRevision, nil
285}
286
Leopold Schabel68c58752019-11-14 21:00:59 +0100287func (s *Service) getCAFromEtcd() (*ca.CA, int64, error) {
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100288 // TODO: Technically this could be done in a single request, but it's more logic
289 caCert, _, err := s.etcdGetSingle(caPathEtcd)
290 if err != nil {
291 return nil, -1, fmt.Errorf("failed to get CA certificate from etcd: %w", err)
292 }
293 caKey, _, err := s.etcdGetSingle(caKeyPathEtcd)
294 if err != nil {
295 return nil, -1, fmt.Errorf("failed to get CA key from etcd: %w", err)
296 }
297 // TODO: Unwrap CA key once wrapping is implemented
298 crl, crlRevision, err := s.etcdGetSingle(crlPathEtcd)
299 if err != nil {
300 return nil, -1, fmt.Errorf("failed to get CRL from etcd: %w", err)
301 }
302 idCA, err := ca.FromCertificates(caCert, caKey, crl)
303 if err != nil {
304 return nil, -1, fmt.Errorf("failed to take CA online: %w", err)
305 }
306 return idCA, crlRevision, nil
307}
308
309func (s *Service) IssueCertificate(hostname string) (*api.ConsensusCertificates, error) {
Leopold Schabel68c58752019-11-14 21:00:59 +0100310 idCA, _, err := s.getCAFromEtcd()
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100311 if err != nil {
312 return nil, err
313 }
314 cert, key, err := idCA.IssueCertificate(hostname)
315 if err != nil {
316 return nil, fmt.Errorf("failed to issue certificate: %w", err)
317 }
318 certVal, err := x509.ParseCertificate(cert)
319 if err != nil {
320 return nil, err
321 }
322 serial := hex.EncodeToString(certVal.SerialNumber.Bytes())
323 if _, err := s.kv.Put(context.Background(), path.Join(certPrefixEtcd, serial), string(cert)); err != nil {
Leopold Schabel68c58752019-11-14 21:00:59 +0100324 // We issued a certificate, but failed to persist it. Return an error and forget it ever happened.
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100325 return nil, fmt.Errorf("failed to persist certificate: %w", err)
326 }
327 return &api.ConsensusCertificates{
328 Ca: idCA.CACertRaw,
329 Cert: cert,
330 Crl: idCA.CRLRaw,
331 Key: key,
332 }, nil
333}
334
335func (s *Service) RevokeCertificate(hostname string) error {
336 rand.Seed(time.Now().UnixNano())
337 for {
Leopold Schabel68c58752019-11-14 21:00:59 +0100338 idCA, crlRevision, err := s.getCAFromEtcd()
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100339 if err != nil {
340 return err
341 }
342 allIssuedCerts, err := s.kv.Get(context.Background(), certPrefixEtcd, clientv3.WithPrefix())
343 for _, cert := range allIssuedCerts.Kvs {
344 certVal, err := x509.ParseCertificate(cert.Value)
345 if err != nil {
346 s.Logger.Error("Failed to parse previously issued certificate, this is a security risk", zap.Error(err))
347 continue
348 }
349 for _, dnsName := range certVal.DNSNames {
350 if dnsName == hostname {
351 // Revoke this
352 if err := idCA.Revoke(certVal.SerialNumber); err != nil {
353 // We need to fail if any single revocation fails otherwise outer applications
354 // have no chance of calling this safely
355 return err
356 }
357 }
358 }
359 }
Leopold Schabel68c58752019-11-14 21:00:59 +0100360 // TODO(leo): this needs a test
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100361 cmp := clientv3.Compare(clientv3.ModRevision(crlPathEtcd), "=", crlRevision)
362 op := clientv3.OpPut(crlPathEtcd, string(idCA.CRLRaw))
363 res, err := s.kv.Txn(context.Background()).If(cmp).Then(op).Commit()
364 if err != nil {
365 return fmt.Errorf("failed to persist new CRL in etcd: %w", err)
366 }
367 if res.Succeeded { // Transaction has succeeded
368 break
369 }
370 // Sleep a random duration between 0 and 300ms to reduce serialization failures
371 time.Sleep(time.Duration(rand.Intn(300)) * time.Millisecond)
372 }
373 return nil
374}
375
376func (s *Service) watchCRL() {
Leopold Schabel68c58752019-11-14 21:00:59 +0100377 // TODO(lorenz): Change etcd client to WatchableKV and make this an actual watch
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100378 // This needs changes in more places, so leaving it now
379 s.watchCRLTicker = time.NewTicker(30 * time.Second)
380 for range s.watchCRLTicker.C {
381 crl, _, err := s.etcdGetSingle(crlPathEtcd)
382 if err != nil {
383 s.Logger.Warn("Failed to check for new CRL", zap.Error(err))
384 continue
385 }
386 // This is cryptographic material but not secret, so no constant time compare necessary here
387 if !bytes.Equal(crl, s.lastCRL) {
388 if err := ioutil.WriteFile(filepath.Join(s.config.DataDir, CRLSwapPath), crl, 0600); err != nil {
389 s.Logger.Warn("Failed to write updated CRL", zap.Error(err))
390 }
391 // This uses unix.Rename to guarantee a particular atomic update behavior
392 if err := unix.Rename(filepath.Join(s.config.DataDir, CRLSwapPath), filepath.Join(s.config.DataDir, CRLPath)); err != nil {
393 s.Logger.Warn("Failed to atomically swap updated CRL", zap.Error(err))
394 }
395 }
396 }
397}
398
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200399func (s *Service) OnStop() error {
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100400 s.watchCRLTicker.Stop()
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200401 s.etcd.Close()
402
403 return nil
404}
405
406// IsProvisioned returns whether the node has been setup before and etcd has a data directory
407func (s *Service) IsProvisioned() bool {
408 _, err := os.Stat(s.config.DataDir)
409
410 return !os.IsNotExist(err)
411}
412
413// IsReady returns whether etcd is ready and synced
414func (s *Service) IsReady() bool {
415 return s.ready
416}
417
418// AddMember adds a new etcd member to the cluster
419func (s *Service) AddMember(ctx context.Context, name string, url string) (uint64, error) {
420 urls, err := types.NewURLs([]string{url})
421 if err != nil {
422 return 0, err
423 }
424
425 member := membership.NewMember(name, urls, DefaultClusterToken, nil)
426
427 _, err = s.etcd.Server.AddMember(ctx, *member)
428 if err != nil {
429 return 0, err
430 }
431
432 return uint64(member.ID), nil
433}
434
435// RemoveMember removes a member from the etcd cluster
436func (s *Service) RemoveMember(ctx context.Context, id uint64) error {
437 _, err := s.etcd.Server.RemoveMember(ctx, id)
438 return err
439}
440
441// Health returns the current cluster health
442func (s *Service) Health() {
443}
444
445// GetConfig returns the current consensus config
446func (s *Service) GetConfig() Config {
447 return *s.config
448}
449
450// SetConfig sets the consensus config. Changes are only applied when the service is restarted.
451func (s *Service) SetConfig(config Config) {
452 s.config = &config
453}
454
455// GetInitialClusterString returns the InitialCluster string that can be used to bootstrap a consensus node
456func (s *Service) GetInitialClusterString() string {
457 members := s.etcd.Server.Cluster().Members()
458 clusterString := strings.Builder{}
459
460 for i, m := range members {
461 if i != 0 {
462 clusterString.WriteString(",")
463 }
464 clusterString.WriteString(m.Name)
465 clusterString.WriteString("=")
466 clusterString.WriteString(m.PickPeerURL())
467 }
468
469 return clusterString.String()
470}
471
472// GetNodes returns a list of consensus nodes
473func (s *Service) GetNodes() []Member {
474 members := s.etcd.Server.Cluster().Members()
475 cMembers := make([]Member, len(members))
476 for i, m := range members {
477 cMembers[i] = Member{
478 ID: uint64(m.ID),
479 Name: m.Name,
480 Address: m.PickPeerURL(),
481 Synced: !m.IsLearner,
482 }
483 }
484
485 return cMembers
486}
487
488func (s *Service) GetStore(module, space string) clientv3.KV {
489 return namespace.NewKV(s.kv, fmt.Sprintf("%s:%s", module, space))
490}