blob: e2b15f8ab9319c78c62d93f777ab38bf3284f937 [file] [log] [blame]
Serge Bazanskif05e80a2021-10-12 11:53:34 +02001package consensus
2
3import (
4 "context"
5 "crypto/ed25519"
6 "crypto/x509"
7 "fmt"
8 "net"
9 "strconv"
10
Lorenz Brund13c1c62022-03-30 19:58:58 +020011 clientv3 "go.etcd.io/etcd/client/v3"
Serge Bazanskif05e80a2021-10-12 11:53:34 +020012
13 "source.monogon.dev/metropolis/node"
14 "source.monogon.dev/metropolis/node/core/consensus/client"
15 "source.monogon.dev/metropolis/node/core/identity"
16 "source.monogon.dev/metropolis/pkg/event"
17 "source.monogon.dev/metropolis/pkg/pki"
18)
19
Serge Bazanski5839e972021-11-16 15:46:19 +010020// ServiceHandle is implemented by Service and should be the type expected by
21// other code which relies on a Consensus instance. Ie., it's the downstream API
22// for a Consensus Service.
23type ServiceHandle interface {
24 // Watch returns a Event Value compatible Watcher for accessing the State of the
25 // consensus Service in a safe manner.
26 Watch() Watcher
27}
28
Serge Bazanskif05e80a2021-10-12 11:53:34 +020029// Watch returns a Event Value compatible Watcher for accessing the State of the
30// consensus Service in a safe manner.
31func (s *Service) Watch() Watcher {
32 return Watcher{s.value.Watch()}
33}
34
35type Watcher struct {
36 event.Watcher
37}
38
39func (w *Watcher) Get(ctx context.Context, opts ...event.GetOption) (*Status, error) {
40 v, err := w.Watcher.Get(ctx, opts...)
41 if err != nil {
42 return nil, err
43 }
44 return v.(*Status), nil
45}
46
Serge Bazanski5839e972021-11-16 15:46:19 +010047func (w *Watcher) GetRunning(ctx context.Context) (*Status, error) {
48 for {
49 st, err := w.Get(ctx)
50 if err != nil {
51 return nil, err
52 }
53 if st.Running() {
54 return st, nil
55 }
56 }
57}
58
Serge Bazanskif05e80a2021-10-12 11:53:34 +020059// Status of the consensus service. It represents either a running consensus
60// service to which a client can connect and on which management can be
61// performed, or a stopped service.
62type Status struct {
Serge Bazanski5839e972021-11-16 15:46:19 +010063 // localPeerURL and localMemberID are the expected public URL and etcd member ID
64 // of the etcd server wrapped by this consensus instance. If set, a sub-runnable
65 // of the consensus will ensure that the given memberID always has localPeerURL
66 // set as its peer URL.
67 //
68 // These will not be set when the Status has been generated by a
69 // testServiceHandle.
Serge Bazanskif05e80a2021-10-12 11:53:34 +020070 localPeerURL string
71 localMemberID uint64
Serge Bazanski5839e972021-11-16 15:46:19 +010072 // cl is the root etcd client to the underlying cluster.
73 cl *clientv3.Client
74 // ca is the PKI CA used to authenticate etcd members.
75 ca *pki.Certificate
76 // stopped is set to true if the underlying service has been stopped or hasn't
77 // yet been started.
78 stopped bool
Serge Bazanskif05e80a2021-10-12 11:53:34 +020079}
80
81// Running returns true if this status represents a running consensus service
82// which can be connected to or managed. These calls are not guaranteed to
83// succeed (as the server might have stopped in the meantime), but the caller
84// can use this value as a hint to whether attempts to access the consensus
85// service should be done.
86func (s *Status) Running() bool {
87 return !s.stopped
88}
89
90func (s *Status) pkiClient() (client.Namespaced, error) {
91 return clientFor(s.cl, "namespaced", "etcd-pki")
92}
93
Serge Bazanski5839e972021-11-16 15:46:19 +010094// CuratorClient returns a namespaced etcd client for use by the Curator.
95func (s *Status) CuratorClient() (client.Namespaced, error) {
96 return clientFor(s.cl, "namespaced", "curator")
97}
98
99// KubernetesClient returns a namespaced etcd client for use by Kubernetes.
100func (s *Status) KubernetesClient() (client.Namespaced, error) {
101 return clientFor(s.cl, "namespaced", "kubernetes")
102}
103
104// ClusterClient returns an etcd management API client, for use by downstream
105// clients that wish to perform maintenance operations on the etcd cluster (eg.
106// list/modify nodes, promote learners, ...).
107func (s *Status) ClusterClient() clientv3.Cluster {
108 return s.cl
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200109}
110
111// AddNode creates a new consensus member corresponding to a given Ed25519 node
112// public key if one does not yet exist. The member will at first be marked as a
113// Learner, ensuring it does not take part in quorum until it has finished
114// catching up to the state of the etcd store. As it does, the autopromoter will
115// turn it into a 'full' node and it will start taking part in the quorum and be
116// able to perform all etcd operations.
117func (s *Status) AddNode(ctx context.Context, pk ed25519.PublicKey, opts ...*AddNodeOption) (*JoinCluster, error) {
118 clPKI, err := s.pkiClient()
119 if err != nil {
120 return nil, err
121 }
122
123 nodeID := identity.NodeID(pk)
124 var extraNames []string
125 name := nodeID
126 port := int(node.ConsensusPort)
127 for _, opt := range opts {
128 if opt.externalAddress != "" {
129 name = opt.externalAddress
130 extraNames = append(extraNames, name)
131 }
132 if opt.externalPort != 0 {
133 port = opt.externalPort
134 }
135 }
136
137 member := pki.Certificate{
138 Name: nodeID,
139 Namespace: &pkiNamespace,
140 Issuer: s.ca,
141 Template: pkiPeerCertificate(pk, extraNames),
142 Mode: pki.CertificateExternal,
143 PublicKey: pk,
144 }
145 caBytes, err := s.ca.Ensure(ctx, clPKI)
146 if err != nil {
147 return nil, fmt.Errorf("could not ensure CA certificate: %w", err)
148 }
149 memberBytes, err := member.Ensure(ctx, clPKI)
150 if err != nil {
151 return nil, fmt.Errorf("could not ensure member certificate: %w", err)
152 }
153 caCert, err := x509.ParseCertificate(caBytes)
154 if err != nil {
155 return nil, fmt.Errorf("could not parse CA certificate: %w", err)
156 }
157 memberCert, err := x509.ParseCertificate(memberBytes)
158 if err != nil {
159 return nil, fmt.Errorf("could not parse newly issued member certificate: %w", err)
160 }
161
162 members, err := s.cl.MemberList(ctx)
163 if err != nil {
164 return nil, fmt.Errorf("could not retrieve existing members: %w", err)
165 }
166
167 var existingNodes []ExistingNode
168 var newExists bool
169 for _, m := range members.Members {
170 if m.Name == nodeID {
171 newExists = true
172 }
173 if m.IsLearner {
174 continue
175 }
176 if len(m.PeerURLs) < 1 {
177 continue
178 }
179 existingNodes = append(existingNodes, ExistingNode{
180 Name: m.Name,
181 URL: m.PeerURLs[0],
182 })
183 }
184
185 crlW := s.ca.WatchCRL(clPKI)
186 crl, err := crlW.Get(ctx)
187 if err != nil {
188 return nil, fmt.Errorf("could not retrieve initial CRL: %w", err)
189 }
190
191 if !newExists {
192 addr := fmt.Sprintf("https://%s", net.JoinHostPort(name, strconv.Itoa(port)))
193 if _, err := s.cl.MemberAddAsLearner(ctx, []string{addr}); err != nil {
194 return nil, fmt.Errorf("could not add new member as learner: %w", err)
195 }
196 }
197
198 return &JoinCluster{
199 CACertificate: caCert,
200 NodeCertificate: memberCert,
201 ExistingNodes: existingNodes,
202 InitialCRL: crl,
203 }, nil
204}
205
// AddNodeOption can be passed to AddNode to influence the behaviour of the
// function. Currently this is only used internally by tests.
type AddNodeOption struct {
	// externalAddress, if non-empty, replaces the node ID as the host part of
	// the new member's peer URL. It is also passed to pkiPeerCertificate as an
	// extra name for the member's certificate.
	externalAddress string
	// externalPort, if non-zero, replaces the default consensus port in the
	// new member's peer URL.
	externalPort int
}
211}