blob: 992d0ac856d42bda564b78ebc2939e1880fb2a78 [file] [log] [blame]
Serge Bazanskif05e80a2021-10-12 11:53:34 +02001package consensus
2
3import (
4 "context"
5 "crypto/ed25519"
6 "crypto/x509"
7 "fmt"
8 "net"
9 "strconv"
10
Lorenz Brund13c1c62022-03-30 19:58:58 +020011 clientv3 "go.etcd.io/etcd/client/v3"
Serge Bazanskif05e80a2021-10-12 11:53:34 +020012
13 "source.monogon.dev/metropolis/node"
14 "source.monogon.dev/metropolis/node/core/consensus/client"
15 "source.monogon.dev/metropolis/node/core/identity"
16 "source.monogon.dev/metropolis/pkg/event"
17 "source.monogon.dev/metropolis/pkg/pki"
18)
19
Serge Bazanski5839e972021-11-16 15:46:19 +010020// ServiceHandle is implemented by Service and should be the type expected by
21// other code which relies on a Consensus instance. Ie., it's the downstream API
22// for a Consensus Service.
23type ServiceHandle interface {
24 // Watch returns a Event Value compatible Watcher for accessing the State of the
25 // consensus Service in a safe manner.
26 Watch() Watcher
27}
28
Serge Bazanskif05e80a2021-10-12 11:53:34 +020029// Watch returns a Event Value compatible Watcher for accessing the State of the
30// consensus Service in a safe manner.
31func (s *Service) Watch() Watcher {
32 return Watcher{s.value.Watch()}
33}
34
35type Watcher struct {
36 event.Watcher
37}
38
39func (w *Watcher) Get(ctx context.Context, opts ...event.GetOption) (*Status, error) {
40 v, err := w.Watcher.Get(ctx, opts...)
41 if err != nil {
42 return nil, err
43 }
44 return v.(*Status), nil
45}
46
Serge Bazanski5839e972021-11-16 15:46:19 +010047func (w *Watcher) GetRunning(ctx context.Context) (*Status, error) {
48 for {
49 st, err := w.Get(ctx)
50 if err != nil {
51 return nil, err
52 }
53 if st.Running() {
54 return st, nil
55 }
56 }
57}
58
Serge Bazanskif05e80a2021-10-12 11:53:34 +020059// Status of the consensus service. It represents either a running consensus
60// service to which a client can connect and on which management can be
61// performed, or a stopped service.
62type Status struct {
Serge Bazanski5839e972021-11-16 15:46:19 +010063 // localPeerURL and localMemberID are the expected public URL and etcd member ID
64 // of the etcd server wrapped by this consensus instance. If set, a sub-runnable
65 // of the consensus will ensure that the given memberID always has localPeerURL
66 // set as its peer URL.
67 //
68 // These will not be set when the Status has been generated by a
69 // testServiceHandle.
Serge Bazanskif05e80a2021-10-12 11:53:34 +020070 localPeerURL string
71 localMemberID uint64
Serge Bazanski5839e972021-11-16 15:46:19 +010072 // cl is the root etcd client to the underlying cluster.
73 cl *clientv3.Client
74 // ca is the PKI CA used to authenticate etcd members.
75 ca *pki.Certificate
76 // stopped is set to true if the underlying service has been stopped or hasn't
77 // yet been started.
78 stopped bool
Mateusz Zalegabb2edbe2022-06-08 11:57:09 +020079
80 // noClusterMemberManagement disables etcd cluster member management in
81 // UpdateNodeRoles. This is currently necessary in order to test the call,
82 // due to limitations of the test harness.
83 noClusterMemberManagement bool
Serge Bazanskif05e80a2021-10-12 11:53:34 +020084}
85
86// Running returns true if this status represents a running consensus service
87// which can be connected to or managed. These calls are not guaranteed to
88// succeed (as the server might have stopped in the meantime), but the caller
89// can use this value as a hint to whether attempts to access the consensus
90// service should be done.
91func (s *Status) Running() bool {
92 return !s.stopped
93}
94
95func (s *Status) pkiClient() (client.Namespaced, error) {
96 return clientFor(s.cl, "namespaced", "etcd-pki")
97}
98
Serge Bazanski5839e972021-11-16 15:46:19 +010099// CuratorClient returns a namespaced etcd client for use by the Curator.
100func (s *Status) CuratorClient() (client.Namespaced, error) {
101 return clientFor(s.cl, "namespaced", "curator")
102}
103
104// KubernetesClient returns a namespaced etcd client for use by Kubernetes.
105func (s *Status) KubernetesClient() (client.Namespaced, error) {
106 return clientFor(s.cl, "namespaced", "kubernetes")
107}
108
109// ClusterClient returns an etcd management API client, for use by downstream
110// clients that wish to perform maintenance operations on the etcd cluster (eg.
111// list/modify nodes, promote learners, ...).
112func (s *Status) ClusterClient() clientv3.Cluster {
113 return s.cl
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200114}
115
116// AddNode creates a new consensus member corresponding to a given Ed25519 node
117// public key if one does not yet exist. The member will at first be marked as a
118// Learner, ensuring it does not take part in quorum until it has finished
119// catching up to the state of the etcd store. As it does, the autopromoter will
120// turn it into a 'full' node and it will start taking part in the quorum and be
121// able to perform all etcd operations.
122func (s *Status) AddNode(ctx context.Context, pk ed25519.PublicKey, opts ...*AddNodeOption) (*JoinCluster, error) {
123 clPKI, err := s.pkiClient()
124 if err != nil {
125 return nil, err
126 }
127
128 nodeID := identity.NodeID(pk)
129 var extraNames []string
130 name := nodeID
131 port := int(node.ConsensusPort)
132 for _, opt := range opts {
133 if opt.externalAddress != "" {
134 name = opt.externalAddress
135 extraNames = append(extraNames, name)
136 }
137 if opt.externalPort != 0 {
138 port = opt.externalPort
139 }
140 }
141
142 member := pki.Certificate{
143 Name: nodeID,
144 Namespace: &pkiNamespace,
145 Issuer: s.ca,
146 Template: pkiPeerCertificate(pk, extraNames),
147 Mode: pki.CertificateExternal,
148 PublicKey: pk,
149 }
150 caBytes, err := s.ca.Ensure(ctx, clPKI)
151 if err != nil {
152 return nil, fmt.Errorf("could not ensure CA certificate: %w", err)
153 }
154 memberBytes, err := member.Ensure(ctx, clPKI)
155 if err != nil {
156 return nil, fmt.Errorf("could not ensure member certificate: %w", err)
157 }
158 caCert, err := x509.ParseCertificate(caBytes)
159 if err != nil {
160 return nil, fmt.Errorf("could not parse CA certificate: %w", err)
161 }
162 memberCert, err := x509.ParseCertificate(memberBytes)
163 if err != nil {
164 return nil, fmt.Errorf("could not parse newly issued member certificate: %w", err)
165 }
166
167 members, err := s.cl.MemberList(ctx)
168 if err != nil {
169 return nil, fmt.Errorf("could not retrieve existing members: %w", err)
170 }
171
172 var existingNodes []ExistingNode
173 var newExists bool
174 for _, m := range members.Members {
175 if m.Name == nodeID {
176 newExists = true
177 }
178 if m.IsLearner {
179 continue
180 }
181 if len(m.PeerURLs) < 1 {
182 continue
183 }
184 existingNodes = append(existingNodes, ExistingNode{
185 Name: m.Name,
186 URL: m.PeerURLs[0],
187 })
188 }
189
190 crlW := s.ca.WatchCRL(clPKI)
Serge Bazanski50f5ec72022-06-21 14:16:56 +0200191 defer crlW.Close()
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200192 crl, err := crlW.Get(ctx)
193 if err != nil {
194 return nil, fmt.Errorf("could not retrieve initial CRL: %w", err)
195 }
196
Mateusz Zalegabb2edbe2022-06-08 11:57:09 +0200197 if !newExists && !s.noClusterMemberManagement {
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200198 addr := fmt.Sprintf("https://%s", net.JoinHostPort(name, strconv.Itoa(port)))
199 if _, err := s.cl.MemberAddAsLearner(ctx, []string{addr}); err != nil {
200 return nil, fmt.Errorf("could not add new member as learner: %w", err)
201 }
202 }
203
204 return &JoinCluster{
205 CACertificate: caCert,
206 NodeCertificate: memberCert,
207 ExistingNodes: existingNodes,
208 InitialCRL: crl,
209 }, nil
210}
211
// AddNodeOption can be passed to AddNode to influence the behaviour of the
// function. Currently this is only used internally by tests.
type AddNodeOption struct {
	// externalAddress, if non-empty, replaces the node ID as the host part of
	// the peer URL registered for the new member, and is passed as an extra
	// name when building the member's certificate template.
	externalAddress string
	// externalPort, if non-zero, replaces the default consensus port in the
	// peer URL registered for the new member.
	externalPort int
}