blob: 04b13fafec86d4b3f65304d224563482348d7e89 [file] [log] [blame]
Serge Bazanskif05e80a2021-10-12 11:53:34 +02001package consensus
2
3import (
4 "context"
5 "crypto/ed25519"
6 "crypto/x509"
7 "fmt"
8 "net"
9 "strconv"
10
11 "go.etcd.io/etcd/clientv3"
12
13 "source.monogon.dev/metropolis/node"
14 "source.monogon.dev/metropolis/node/core/consensus/client"
15 "source.monogon.dev/metropolis/node/core/identity"
16 "source.monogon.dev/metropolis/pkg/event"
17 "source.monogon.dev/metropolis/pkg/pki"
18)
19
20// Watch returns a Event Value compatible Watcher for accessing the State of the
21// consensus Service in a safe manner.
22func (s *Service) Watch() Watcher {
23 return Watcher{s.value.Watch()}
24}
25
26type Watcher struct {
27 event.Watcher
28}
29
30func (w *Watcher) Get(ctx context.Context, opts ...event.GetOption) (*Status, error) {
31 v, err := w.Watcher.Get(ctx, opts...)
32 if err != nil {
33 return nil, err
34 }
35 return v.(*Status), nil
36}
37
38// Status of the consensus service. It represents either a running consensus
39// service to which a client can connect and on which management can be
40// performed, or a stopped service.
41type Status struct {
42 localPeerURL string
43 localMemberID uint64
44 cl *clientv3.Client
45 ca *pki.Certificate
46 stopped bool
47}
48
49// Running returns true if this status represents a running consensus service
50// which can be connected to or managed. These calls are not guaranteed to
51// succeed (as the server might have stopped in the meantime), but the caller
52// can use this value as a hint to whether attempts to access the consensus
53// service should be done.
54func (s *Status) Running() bool {
55 return !s.stopped
56}
57
58func (s *Status) pkiClient() (client.Namespaced, error) {
59 return clientFor(s.cl, "namespaced", "etcd-pki")
60}
61
62// MetropolisClient returns a namespaced etcd client for use by the rest of the
63// metropolis code (thtough the cluster bootstrap code). This method is
64// deprecated, and will be replaced with more granular clients as the cluster
65// bootstrap code gets refactored.
66func (s *Status) MetropolisClient() (client.Namespaced, error) {
67 return clientFor(s.cl, "namespaced", "metropolis")
68}
69
70// AddNode creates a new consensus member corresponding to a given Ed25519 node
71// public key if one does not yet exist. The member will at first be marked as a
72// Learner, ensuring it does not take part in quorum until it has finished
73// catching up to the state of the etcd store. As it does, the autopromoter will
74// turn it into a 'full' node and it will start taking part in the quorum and be
75// able to perform all etcd operations.
76func (s *Status) AddNode(ctx context.Context, pk ed25519.PublicKey, opts ...*AddNodeOption) (*JoinCluster, error) {
77 clPKI, err := s.pkiClient()
78 if err != nil {
79 return nil, err
80 }
81
82 nodeID := identity.NodeID(pk)
83 var extraNames []string
84 name := nodeID
85 port := int(node.ConsensusPort)
86 for _, opt := range opts {
87 if opt.externalAddress != "" {
88 name = opt.externalAddress
89 extraNames = append(extraNames, name)
90 }
91 if opt.externalPort != 0 {
92 port = opt.externalPort
93 }
94 }
95
96 member := pki.Certificate{
97 Name: nodeID,
98 Namespace: &pkiNamespace,
99 Issuer: s.ca,
100 Template: pkiPeerCertificate(pk, extraNames),
101 Mode: pki.CertificateExternal,
102 PublicKey: pk,
103 }
104 caBytes, err := s.ca.Ensure(ctx, clPKI)
105 if err != nil {
106 return nil, fmt.Errorf("could not ensure CA certificate: %w", err)
107 }
108 memberBytes, err := member.Ensure(ctx, clPKI)
109 if err != nil {
110 return nil, fmt.Errorf("could not ensure member certificate: %w", err)
111 }
112 caCert, err := x509.ParseCertificate(caBytes)
113 if err != nil {
114 return nil, fmt.Errorf("could not parse CA certificate: %w", err)
115 }
116 memberCert, err := x509.ParseCertificate(memberBytes)
117 if err != nil {
118 return nil, fmt.Errorf("could not parse newly issued member certificate: %w", err)
119 }
120
121 members, err := s.cl.MemberList(ctx)
122 if err != nil {
123 return nil, fmt.Errorf("could not retrieve existing members: %w", err)
124 }
125
126 var existingNodes []ExistingNode
127 var newExists bool
128 for _, m := range members.Members {
129 if m.Name == nodeID {
130 newExists = true
131 }
132 if m.IsLearner {
133 continue
134 }
135 if len(m.PeerURLs) < 1 {
136 continue
137 }
138 existingNodes = append(existingNodes, ExistingNode{
139 Name: m.Name,
140 URL: m.PeerURLs[0],
141 })
142 }
143
144 crlW := s.ca.WatchCRL(clPKI)
145 crl, err := crlW.Get(ctx)
146 if err != nil {
147 return nil, fmt.Errorf("could not retrieve initial CRL: %w", err)
148 }
149
150 if !newExists {
151 addr := fmt.Sprintf("https://%s", net.JoinHostPort(name, strconv.Itoa(port)))
152 if _, err := s.cl.MemberAddAsLearner(ctx, []string{addr}); err != nil {
153 return nil, fmt.Errorf("could not add new member as learner: %w", err)
154 }
155 }
156
157 return &JoinCluster{
158 CACertificate: caCert,
159 NodeCertificate: memberCert,
160 ExistingNodes: existingNodes,
161 InitialCRL: crl,
162 }, nil
163}
164
// AddNodeOption can be passed to AddNode to influence the behaviour of the
// function. Currently this is only used internally by tests.
type AddNodeOption struct {
	// externalAddress, if non-empty, replaces the node ID as the host part
	// of the new member's peer URL and is passed as an extra name when
	// building the member's peer certificate template.
	externalAddress string
	// externalPort, if non-zero, replaces node.ConsensusPort as the port of
	// the new member's peer URL.
	externalPort int
}