blob: d87a506e9f51a66ee140779add1d1bbfd1cb48f2 [file] [log] [blame]
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +02001// Copyright 2020 The Monogon Project Authors.
2//
3// SPDX-License-Identifier: Apache-2.0
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17package consensus
18
19import (
Lorenz Bruna4ea9d02019-10-31 11:40:30 +010020 "bytes"
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +020021 "context"
Lorenz Bruna4ea9d02019-10-31 11:40:30 +010022 "crypto/x509"
23 "encoding/hex"
24 "encoding/pem"
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +020025 "fmt"
Lorenz Bruna4ea9d02019-10-31 11:40:30 +010026 "io/ioutil"
27 "math/rand"
28 "net/url"
29 "os"
30 "path"
31 "path/filepath"
32 "strings"
33 "time"
34
35 "git.monogon.dev/source/nexantic.git/core/generated/api"
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +020036 "git.monogon.dev/source/nexantic.git/core/internal/common"
Lorenz Bruna4ea9d02019-10-31 11:40:30 +010037 "git.monogon.dev/source/nexantic.git/core/internal/consensus/ca"
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +020038 "github.com/pkg/errors"
39 "go.etcd.io/etcd/clientv3"
40 "go.etcd.io/etcd/clientv3/namespace"
41 "go.etcd.io/etcd/embed"
42 "go.etcd.io/etcd/etcdserver/api/membership"
43 "go.etcd.io/etcd/pkg/types"
44 "go.etcd.io/etcd/proxy/grpcproxy/adapter"
45 "go.uber.org/zap"
Lorenz Bruna4ea9d02019-10-31 11:40:30 +010046 "golang.org/x/sys/unix"
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +020047)
48
// Defaults applied to the embedded etcd configuration.
const (
	// DefaultClusterToken is used as etcd's initial cluster token and is also
	// passed to membership.NewMember when adding members.
	DefaultClusterToken = "SIGNOS"
	// DefaultLogger names the logger implementation handed to embedded etcd.
	DefaultLogger = "zap"
)
53
// File names, relative to Config.DataDir, of the TLS material used for etcd
// peer connections.
const (
	// CAPath holds the PEM-encoded CA certificate.
	CAPath = "ca.pem"
	// CertPath holds the PEM-encoded node certificate.
	CertPath = "cert.pem"
	// KeyPath holds the PEM-encoded private key.
	KeyPath = "cert-key.pem"
	// CRLPath holds the certificate revocation list handed to etcd.
	CRLPath = "ca-crl.der"
	// CRLSwapPath is the temporary file an updated CRL is written to before
	// it is renamed over CRLPath.
	CRLSwapPath = "ca-crl.der.swp"
)
61
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +020062type (
63 Service struct {
64 *common.BaseService
65
Lorenz Bruna4ea9d02019-10-31 11:40:30 +010066 etcd *embed.Etcd
67 kv clientv3.KV
68 ready bool
69 bootstrapCA *ca.CA
70 bootstrapCert []byte
71 watchCRLTicker *time.Ticker
72 lastCRL []byte
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +020073
74 config *Config
75 }
76
77 Config struct {
78 Name string
79 DataDir string
80 InitialCluster string
81 NewCluster bool
82
83 ExternalHost string
84 ListenHost string
85 ListenPort uint16
86 }
87
88 Member struct {
89 ID uint64
90 Name string
91 Address string
92 Synced bool
93 }
94)
95
96func NewConsensusService(config Config, logger *zap.Logger) (*Service, error) {
97 consensusServer := &Service{
98 config: &config,
99 }
100 consensusServer.BaseService = common.NewBaseService("consensus", logger, consensusServer)
101
102 return consensusServer, nil
103}
104
105func (s *Service) OnStart() error {
106 if s.config == nil {
107 return errors.New("config for consensus is nil")
108 }
109
110 cfg := embed.NewConfig()
111
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100112 cfg.PeerTLSInfo.CertFile = filepath.Join(s.config.DataDir, CertPath)
113 cfg.PeerTLSInfo.KeyFile = filepath.Join(s.config.DataDir, KeyPath)
114 cfg.PeerTLSInfo.TrustedCAFile = filepath.Join(s.config.DataDir, CAPath)
115 cfg.PeerTLSInfo.ClientCertAuth = true
116 cfg.PeerTLSInfo.CRLFile = filepath.Join(s.config.DataDir, CRLPath)
117
118 lastCRL, err := ioutil.ReadFile(cfg.PeerTLSInfo.CRLFile)
119 if err != nil {
120 return fmt.Errorf("failed to read etcd CRL: %w", err)
121 }
122 s.lastCRL = lastCRL
123
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200124 // Reset LCUrls because we don't want to expose any client
125 cfg.LCUrls = nil
126
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100127 apURL, err := url.Parse(fmt.Sprintf("https://%s:%d", s.config.ExternalHost, s.config.ListenPort))
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200128 if err != nil {
129 return errors.Wrap(err, "invalid external_host or listen_port")
130 }
131
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100132 lpURL, err := url.Parse(fmt.Sprintf("https://%s:%d", s.config.ListenHost, s.config.ListenPort))
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200133 if err != nil {
134 return errors.Wrap(err, "invalid listen_host or listen_port")
135 }
136 cfg.APUrls = []url.URL{*apURL}
137 cfg.LPUrls = []url.URL{*lpURL}
138 cfg.ACUrls = []url.URL{}
139
140 cfg.Dir = s.config.DataDir
141 cfg.InitialClusterToken = DefaultClusterToken
142 cfg.Name = s.config.Name
143
144 // Only relevant if creating or joining a cluster; otherwise settings will be ignored
145 if s.config.NewCluster {
146 cfg.ClusterState = "new"
147 cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name)
148 } else if s.config.InitialCluster != "" {
149 cfg.ClusterState = "existing"
150 cfg.InitialCluster = s.config.InitialCluster
151 }
152
153 cfg.Logger = DefaultLogger
154
155 server, err := embed.StartEtcd(cfg)
156 if err != nil {
157 return err
158 }
159 s.etcd = server
160
161 // Override the logger
162 //*server.GetLogger() = *s.Logger.With(zap.String("component", "etcd"))
163
164 go func() {
165 s.Logger.Info("waiting for etcd to become ready")
166 <-s.etcd.Server.ReadyNotify()
167 s.ready = true
168 s.Logger.Info("etcd is now ready")
169 }()
170
171 // Inject kv client
172 s.kv = clientv3.NewKVFromKVClient(adapter.KvServerToKvClient(s.etcd.Server), nil)
173
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100174 // Start CRL watcher
175 go s.watchCRL()
176
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200177 return nil
178}
179
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100180func (s *Service) SetupCertificates(certs *api.ConsensusCertificates) error {
181 if err := ioutil.WriteFile(filepath.Join(s.config.DataDir, CRLPath), certs.Crl, 0600); err != nil {
182 return err
183 }
184 if err := ioutil.WriteFile(filepath.Join(s.config.DataDir, CertPath),
185 pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: certs.Cert}), 0600); err != nil {
186 return err
187 }
188 if err := ioutil.WriteFile(filepath.Join(s.config.DataDir, KeyPath),
189 pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: certs.Key}), 0600); err != nil {
190 return err
191 }
192 if err := ioutil.WriteFile(filepath.Join(s.config.DataDir, CAPath),
193 pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: certs.Ca}), 0600); err != nil {
194 return err
195 }
196 return nil
197}
198
199func (s *Service) PrecreateCA() error {
200 // Provision an etcd CA
201 etcdRootCA, err := ca.New("Smalltown etcd Root CA")
202 if err != nil {
203 return err
204 }
205 cert, privkey, err := etcdRootCA.IssueCertificate(s.config.ExternalHost)
206 if err != nil {
207 return fmt.Errorf("failed to self-issue a certificate: %w", err)
208 }
209 if err := os.MkdirAll(s.config.DataDir, 0700); err != nil {
210 return fmt.Errorf("failed to create consensus data dir: %w", err)
211 }
212 // Preserve certificate for later injection
213 s.bootstrapCert = cert
214 if err := s.SetupCertificates(&api.ConsensusCertificates{
215 Ca: etcdRootCA.CACertRaw,
216 Crl: etcdRootCA.CRLRaw,
217 Cert: cert,
218 Key: privkey,
219 }); err != nil {
220 return fmt.Errorf("failed to setup certificates: %w", err)
221 }
222 s.bootstrapCA = etcdRootCA
223 return nil
224}
225
// Keys under which the etcd CA material is stored inside etcd itself.
const (
	// caPathEtcd holds the CA certificate.
	caPathEtcd = "/etcd-ca/ca.der"
	// caKeyPathEtcd holds the CA private key.
	caKeyPathEtcd = "/etcd-ca/ca-key.der"
	// crlPathEtcd holds the current certificate revocation list.
	crlPathEtcd = "/etcd-ca/crl.der"
	// certPrefixEtcd is the prefix under which every issued certificate is
	// stored, keyed by its hex-encoded serial number.
	certPrefixEtcd = "/etcd-ca/certs"
)
232
233func (s *Service) InjectCA() error {
234 if _, err := s.kv.Put(context.Background(), caPathEtcd, string(s.bootstrapCA.CACertRaw)); err != nil {
235 return err
236 }
237 // TODO: Should be wrapped by the master key
238 if _, err := s.kv.Put(context.Background(), caKeyPathEtcd, string([]byte(*s.bootstrapCA.PrivateKey))); err != nil {
239 return err
240 }
241 if _, err := s.kv.Put(context.Background(), crlPathEtcd, string(s.bootstrapCA.CRLRaw)); err != nil {
242 return err
243 }
244 certVal, err := x509.ParseCertificate(s.bootstrapCert)
245 if err != nil {
246 return err
247 }
248 serial := hex.EncodeToString(certVal.SerialNumber.Bytes())
249 if _, err := s.kv.Put(context.Background(), path.Join(certPrefixEtcd, serial), string(s.bootstrapCert)); err != nil {
250 return fmt.Errorf("failed to persist certificate: %w", err)
251 }
252 // Clear out bootstrap CA after injecting
253 s.bootstrapCA = nil
254 s.bootstrapCert = []byte{}
255 return nil
256}
257
258func (s *Service) etcdGetSingle(path string) ([]byte, int64, error) {
259 res, err := s.kv.Get(context.Background(), path)
260 if err != nil {
261 return nil, -1, fmt.Errorf("failed to get key from etcd: %w", err)
262 }
263 if len(res.Kvs) != 1 {
264 return nil, -1, errors.New("key not available")
265 }
266 return res.Kvs[0].Value, res.Kvs[0].ModRevision, nil
267}
268
269func (s *Service) takeCAOnline() (*ca.CA, int64, error) {
270 // TODO: Technically this could be done in a single request, but it's more logic
271 caCert, _, err := s.etcdGetSingle(caPathEtcd)
272 if err != nil {
273 return nil, -1, fmt.Errorf("failed to get CA certificate from etcd: %w", err)
274 }
275 caKey, _, err := s.etcdGetSingle(caKeyPathEtcd)
276 if err != nil {
277 return nil, -1, fmt.Errorf("failed to get CA key from etcd: %w", err)
278 }
279 // TODO: Unwrap CA key once wrapping is implemented
280 crl, crlRevision, err := s.etcdGetSingle(crlPathEtcd)
281 if err != nil {
282 return nil, -1, fmt.Errorf("failed to get CRL from etcd: %w", err)
283 }
284 idCA, err := ca.FromCertificates(caCert, caKey, crl)
285 if err != nil {
286 return nil, -1, fmt.Errorf("failed to take CA online: %w", err)
287 }
288 return idCA, crlRevision, nil
289}
290
291func (s *Service) IssueCertificate(hostname string) (*api.ConsensusCertificates, error) {
292 idCA, _, err := s.takeCAOnline()
293 if err != nil {
294 return nil, err
295 }
296 cert, key, err := idCA.IssueCertificate(hostname)
297 if err != nil {
298 return nil, fmt.Errorf("failed to issue certificate: %w", err)
299 }
300 certVal, err := x509.ParseCertificate(cert)
301 if err != nil {
302 return nil, err
303 }
304 serial := hex.EncodeToString(certVal.SerialNumber.Bytes())
305 if _, err := s.kv.Put(context.Background(), path.Join(certPrefixEtcd, serial), string(cert)); err != nil {
306 return nil, fmt.Errorf("failed to persist certificate: %w", err)
307 }
308 return &api.ConsensusCertificates{
309 Ca: idCA.CACertRaw,
310 Cert: cert,
311 Crl: idCA.CRLRaw,
312 Key: key,
313 }, nil
314}
315
316func (s *Service) RevokeCertificate(hostname string) error {
317 rand.Seed(time.Now().UnixNano())
318 for {
319 idCA, crlRevision, err := s.takeCAOnline()
320 if err != nil {
321 return err
322 }
323 allIssuedCerts, err := s.kv.Get(context.Background(), certPrefixEtcd, clientv3.WithPrefix())
324 for _, cert := range allIssuedCerts.Kvs {
325 certVal, err := x509.ParseCertificate(cert.Value)
326 if err != nil {
327 s.Logger.Error("Failed to parse previously issued certificate, this is a security risk", zap.Error(err))
328 continue
329 }
330 for _, dnsName := range certVal.DNSNames {
331 if dnsName == hostname {
332 // Revoke this
333 if err := idCA.Revoke(certVal.SerialNumber); err != nil {
334 // We need to fail if any single revocation fails otherwise outer applications
335 // have no chance of calling this safely
336 return err
337 }
338 }
339 }
340 }
341 cmp := clientv3.Compare(clientv3.ModRevision(crlPathEtcd), "=", crlRevision)
342 op := clientv3.OpPut(crlPathEtcd, string(idCA.CRLRaw))
343 res, err := s.kv.Txn(context.Background()).If(cmp).Then(op).Commit()
344 if err != nil {
345 return fmt.Errorf("failed to persist new CRL in etcd: %w", err)
346 }
347 if res.Succeeded { // Transaction has succeeded
348 break
349 }
350 // Sleep a random duration between 0 and 300ms to reduce serialization failures
351 time.Sleep(time.Duration(rand.Intn(300)) * time.Millisecond)
352 }
353 return nil
354}
355
356func (s *Service) watchCRL() {
357 // TODO: Change etcd client to WatchableKV and make this an actual watch
358 // This needs changes in more places, so leaving it now
359 s.watchCRLTicker = time.NewTicker(30 * time.Second)
360 for range s.watchCRLTicker.C {
361 crl, _, err := s.etcdGetSingle(crlPathEtcd)
362 if err != nil {
363 s.Logger.Warn("Failed to check for new CRL", zap.Error(err))
364 continue
365 }
366 // This is cryptographic material but not secret, so no constant time compare necessary here
367 if !bytes.Equal(crl, s.lastCRL) {
368 if err := ioutil.WriteFile(filepath.Join(s.config.DataDir, CRLSwapPath), crl, 0600); err != nil {
369 s.Logger.Warn("Failed to write updated CRL", zap.Error(err))
370 }
371 // This uses unix.Rename to guarantee a particular atomic update behavior
372 if err := unix.Rename(filepath.Join(s.config.DataDir, CRLSwapPath), filepath.Join(s.config.DataDir, CRLPath)); err != nil {
373 s.Logger.Warn("Failed to atomically swap updated CRL", zap.Error(err))
374 }
375 }
376 }
377}
378
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200379func (s *Service) OnStop() error {
Lorenz Bruna4ea9d02019-10-31 11:40:30 +0100380 s.watchCRLTicker.Stop()
Hendrik Hofstadt0d7c91e2019-10-23 21:44:47 +0200381 s.etcd.Close()
382
383 return nil
384}
385
386// IsProvisioned returns whether the node has been setup before and etcd has a data directory
387func (s *Service) IsProvisioned() bool {
388 _, err := os.Stat(s.config.DataDir)
389
390 return !os.IsNotExist(err)
391}
392
393// IsReady returns whether etcd is ready and synced
394func (s *Service) IsReady() bool {
395 return s.ready
396}
397
398// AddMember adds a new etcd member to the cluster
399func (s *Service) AddMember(ctx context.Context, name string, url string) (uint64, error) {
400 urls, err := types.NewURLs([]string{url})
401 if err != nil {
402 return 0, err
403 }
404
405 member := membership.NewMember(name, urls, DefaultClusterToken, nil)
406
407 _, err = s.etcd.Server.AddMember(ctx, *member)
408 if err != nil {
409 return 0, err
410 }
411
412 return uint64(member.ID), nil
413}
414
415// RemoveMember removes a member from the etcd cluster
416func (s *Service) RemoveMember(ctx context.Context, id uint64) error {
417 _, err := s.etcd.Server.RemoveMember(ctx, id)
418 return err
419}
420
421// Health returns the current cluster health
422func (s *Service) Health() {
423}
424
425// GetConfig returns the current consensus config
426func (s *Service) GetConfig() Config {
427 return *s.config
428}
429
430// SetConfig sets the consensus config. Changes are only applied when the service is restarted.
431func (s *Service) SetConfig(config Config) {
432 s.config = &config
433}
434
435// GetInitialClusterString returns the InitialCluster string that can be used to bootstrap a consensus node
436func (s *Service) GetInitialClusterString() string {
437 members := s.etcd.Server.Cluster().Members()
438 clusterString := strings.Builder{}
439
440 for i, m := range members {
441 if i != 0 {
442 clusterString.WriteString(",")
443 }
444 clusterString.WriteString(m.Name)
445 clusterString.WriteString("=")
446 clusterString.WriteString(m.PickPeerURL())
447 }
448
449 return clusterString.String()
450}
451
452// GetNodes returns a list of consensus nodes
453func (s *Service) GetNodes() []Member {
454 members := s.etcd.Server.Cluster().Members()
455 cMembers := make([]Member, len(members))
456 for i, m := range members {
457 cMembers[i] = Member{
458 ID: uint64(m.ID),
459 Name: m.Name,
460 Address: m.PickPeerURL(),
461 Synced: !m.IsLearner,
462 }
463 }
464
465 return cMembers
466}
467
468func (s *Service) GetStore(module, space string) clientv3.KV {
469 return namespace.NewKV(s.kv, fmt.Sprintf("%s:%s", module, space))
470}