blob: 40988e50bfe5f08d5ad8c8be0c70f0d4c444fc3b [file] [log] [blame]
Serge Bazanskif05e80a2021-10-12 11:53:34 +02001package consensus
2
3import (
4 "context"
5 "crypto/ed25519"
6 "crypto/x509"
7 "fmt"
8 "net"
9 "strconv"
10
Lorenz Brund13c1c62022-03-30 19:58:58 +020011 clientv3 "go.etcd.io/etcd/client/v3"
Serge Bazanskif05e80a2021-10-12 11:53:34 +020012
13 "source.monogon.dev/metropolis/node"
14 "source.monogon.dev/metropolis/node/core/consensus/client"
Tim Windelschmidt9f21f532024-05-07 15:14:20 +020015 "source.monogon.dev/osbase/event"
16 "source.monogon.dev/osbase/pki"
Serge Bazanskif05e80a2021-10-12 11:53:34 +020017)
18
Serge Bazanski5839e972021-11-16 15:46:19 +010019// ServiceHandle is implemented by Service and should be the type expected by
20// other code which relies on a Consensus instance. Ie., it's the downstream API
21// for a Consensus Service.
22type ServiceHandle interface {
23 // Watch returns a Event Value compatible Watcher for accessing the State of the
24 // consensus Service in a safe manner.
Serge Bazanski37110c32023-03-01 13:57:27 +000025 Watch() event.Watcher[*Status]
Serge Bazanski5839e972021-11-16 15:46:19 +010026}
27
Serge Bazanski37110c32023-03-01 13:57:27 +000028var FilterRunning = event.Filter(func(st *Status) bool {
29 return st.Running()
30})
Serge Bazanski5839e972021-11-16 15:46:19 +010031
Serge Bazanskif05e80a2021-10-12 11:53:34 +020032// Status of the consensus service. It represents either a running consensus
33// service to which a client can connect and on which management can be
34// performed, or a stopped service.
35type Status struct {
Serge Bazanski5839e972021-11-16 15:46:19 +010036 // localPeerURL and localMemberID are the expected public URL and etcd member ID
37 // of the etcd server wrapped by this consensus instance. If set, a sub-runnable
38 // of the consensus will ensure that the given memberID always has localPeerURL
39 // set as its peer URL.
40 //
41 // These will not be set when the Status has been generated by a
42 // testServiceHandle.
Serge Bazanskif05e80a2021-10-12 11:53:34 +020043 localPeerURL string
44 localMemberID uint64
Serge Bazanski5839e972021-11-16 15:46:19 +010045 // cl is the root etcd client to the underlying cluster.
46 cl *clientv3.Client
47 // ca is the PKI CA used to authenticate etcd members.
48 ca *pki.Certificate
49 // stopped is set to true if the underlying service has been stopped or hasn't
50 // yet been started.
51 stopped bool
Mateusz Zalegabb2edbe2022-06-08 11:57:09 +020052
53 // noClusterMemberManagement disables etcd cluster member management in
54 // UpdateNodeRoles. This is currently necessary in order to test the call,
55 // due to limitations of the test harness.
56 noClusterMemberManagement bool
Serge Bazanskif05e80a2021-10-12 11:53:34 +020057}
58
59// Running returns true if this status represents a running consensus service
60// which can be connected to or managed. These calls are not guaranteed to
61// succeed (as the server might have stopped in the meantime), but the caller
62// can use this value as a hint to whether attempts to access the consensus
63// service should be done.
64func (s *Status) Running() bool {
65 return !s.stopped
66}
67
68func (s *Status) pkiClient() (client.Namespaced, error) {
69 return clientFor(s.cl, "namespaced", "etcd-pki")
70}
71
Serge Bazanski5839e972021-11-16 15:46:19 +010072// CuratorClient returns a namespaced etcd client for use by the Curator.
73func (s *Status) CuratorClient() (client.Namespaced, error) {
74 return clientFor(s.cl, "namespaced", "curator")
75}
76
77// KubernetesClient returns a namespaced etcd client for use by Kubernetes.
78func (s *Status) KubernetesClient() (client.Namespaced, error) {
79 return clientFor(s.cl, "namespaced", "kubernetes")
80}
81
82// ClusterClient returns an etcd management API client, for use by downstream
83// clients that wish to perform maintenance operations on the etcd cluster (eg.
84// list/modify nodes, promote learners, ...).
85func (s *Status) ClusterClient() clientv3.Cluster {
86 return s.cl
Serge Bazanskif05e80a2021-10-12 11:53:34 +020087}
88
Jan Schär39d9c242024-09-24 13:49:55 +020089// AddNode creates a new consensus member corresponding to a given node ID
90// if one does not yet exist. The member will at first be marked as a
Serge Bazanskif05e80a2021-10-12 11:53:34 +020091// Learner, ensuring it does not take part in quorum until it has finished
92// catching up to the state of the etcd store. As it does, the autopromoter will
93// turn it into a 'full' node and it will start taking part in the quorum and be
94// able to perform all etcd operations.
Jan Schär39d9c242024-09-24 13:49:55 +020095func (s *Status) AddNode(ctx context.Context, nodeID string, pk ed25519.PublicKey, opts ...*AddNodeOption) (*JoinCluster, error) {
Serge Bazanskif05e80a2021-10-12 11:53:34 +020096 clPKI, err := s.pkiClient()
97 if err != nil {
98 return nil, err
99 }
100
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200101 var extraNames []string
102 name := nodeID
103 port := int(node.ConsensusPort)
104 for _, opt := range opts {
105 if opt.externalAddress != "" {
106 name = opt.externalAddress
107 extraNames = append(extraNames, name)
108 }
109 if opt.externalPort != 0 {
110 port = opt.externalPort
111 }
112 }
113
114 member := pki.Certificate{
115 Name: nodeID,
116 Namespace: &pkiNamespace,
117 Issuer: s.ca,
Jan Schär39d9c242024-09-24 13:49:55 +0200118 Template: pkiPeerCertificate(nodeID, extraNames),
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200119 Mode: pki.CertificateExternal,
120 PublicKey: pk,
121 }
122 caBytes, err := s.ca.Ensure(ctx, clPKI)
123 if err != nil {
124 return nil, fmt.Errorf("could not ensure CA certificate: %w", err)
125 }
126 memberBytes, err := member.Ensure(ctx, clPKI)
127 if err != nil {
128 return nil, fmt.Errorf("could not ensure member certificate: %w", err)
129 }
130 caCert, err := x509.ParseCertificate(caBytes)
131 if err != nil {
132 return nil, fmt.Errorf("could not parse CA certificate: %w", err)
133 }
134 memberCert, err := x509.ParseCertificate(memberBytes)
135 if err != nil {
136 return nil, fmt.Errorf("could not parse newly issued member certificate: %w", err)
137 }
138
139 members, err := s.cl.MemberList(ctx)
140 if err != nil {
141 return nil, fmt.Errorf("could not retrieve existing members: %w", err)
142 }
143
144 var existingNodes []ExistingNode
145 var newExists bool
146 for _, m := range members.Members {
Jan Schär442cf682024-09-05 18:28:48 +0200147 if GetEtcdMemberNodeId(m) == nodeID {
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200148 newExists = true
149 }
150 if m.IsLearner {
151 continue
152 }
153 if len(m.PeerURLs) < 1 {
154 continue
155 }
156 existingNodes = append(existingNodes, ExistingNode{
157 Name: m.Name,
158 URL: m.PeerURLs[0],
159 })
160 }
161
162 crlW := s.ca.WatchCRL(clPKI)
Serge Bazanski50f5ec72022-06-21 14:16:56 +0200163 defer crlW.Close()
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200164 crl, err := crlW.Get(ctx)
165 if err != nil {
166 return nil, fmt.Errorf("could not retrieve initial CRL: %w", err)
167 }
168
Mateusz Zalegabb2edbe2022-06-08 11:57:09 +0200169 if !newExists && !s.noClusterMemberManagement {
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200170 addr := fmt.Sprintf("https://%s", net.JoinHostPort(name, strconv.Itoa(port)))
171 if _, err := s.cl.MemberAddAsLearner(ctx, []string{addr}); err != nil {
172 return nil, fmt.Errorf("could not add new member as learner: %w", err)
173 }
174 }
175
176 return &JoinCluster{
177 CACertificate: caCert,
178 NodeCertificate: memberCert,
179 ExistingNodes: existingNodes,
180 InitialCRL: crl,
181 }, nil
182}
183
// AddNodeOption adjusts how AddNode registers a node. At present only tests
// use it, internally.
type AddNodeOption struct {
	// externalAddress, when non-empty, replaces the node ID as the host of the
	// member's advertised peer URL. It is also passed to pkiPeerCertificate as
	// an extra name.
	externalAddress string
	// externalPort, when non-zero, replaces node.ConsensusPort in the
	// advertised peer URL.
	externalPort int
}
Jan Schärad8982f2024-09-17 13:56:34 +0200190
191// RemoveNode removes the etcd member with the given node ID, if it is currently
192// a member. Etcd fails this operation if it is not safe to perform.
193func (s *Status) RemoveNode(ctx context.Context, nodeID string) error {
194 members, err := s.cl.MemberList(ctx)
195 if err != nil {
196 return fmt.Errorf("could not retrieve existing members: %w", err)
197 }
198 for _, m := range members.Members {
199 if GetEtcdMemberNodeId(m) == nodeID {
200 _, err := s.cl.MemberRemove(ctx, m.ID)
201 if err != nil {
202 return fmt.Errorf("could not remove member: %w", err)
203 }
204 }
205 }
206 return nil
207}