blob: 5bf04161dc6b33c12b4dc50a870f3b55adb955b2 [file] [log] [blame]
// Copyright The Monogon Project Authors.
// SPDX-License-Identifier: Apache-2.0
3
Serge Bazanskif05e80a2021-10-12 11:53:34 +02004package consensus
5
6import (
7 "context"
8 "crypto/ed25519"
9 "crypto/x509"
10 "fmt"
11 "net"
12 "strconv"
13
Lorenz Brund13c1c62022-03-30 19:58:58 +020014 clientv3 "go.etcd.io/etcd/client/v3"
Serge Bazanskif05e80a2021-10-12 11:53:34 +020015
Jan Schär0f8ce4c2025-09-04 13:27:50 +020016 "source.monogon.dev/metropolis/node/allocs"
Serge Bazanskif05e80a2021-10-12 11:53:34 +020017 "source.monogon.dev/metropolis/node/core/consensus/client"
Tim Windelschmidt9f21f532024-05-07 15:14:20 +020018 "source.monogon.dev/osbase/event"
19 "source.monogon.dev/osbase/pki"
Serge Bazanskif05e80a2021-10-12 11:53:34 +020020)
21
Serge Bazanski5839e972021-11-16 15:46:19 +010022// ServiceHandle is implemented by Service and should be the type expected by
23// other code which relies on a Consensus instance. Ie., it's the downstream API
24// for a Consensus Service.
25type ServiceHandle interface {
26 // Watch returns a Event Value compatible Watcher for accessing the State of the
27 // consensus Service in a safe manner.
Serge Bazanski37110c32023-03-01 13:57:27 +000028 Watch() event.Watcher[*Status]
Serge Bazanski5839e972021-11-16 15:46:19 +010029}
30
Serge Bazanski37110c32023-03-01 13:57:27 +000031var FilterRunning = event.Filter(func(st *Status) bool {
32 return st.Running()
33})
Serge Bazanski5839e972021-11-16 15:46:19 +010034
Serge Bazanskif05e80a2021-10-12 11:53:34 +020035// Status of the consensus service. It represents either a running consensus
36// service to which a client can connect and on which management can be
37// performed, or a stopped service.
38type Status struct {
Serge Bazanski5839e972021-11-16 15:46:19 +010039 // localPeerURL and localMemberID are the expected public URL and etcd member ID
40 // of the etcd server wrapped by this consensus instance. If set, a sub-runnable
41 // of the consensus will ensure that the given memberID always has localPeerURL
42 // set as its peer URL.
43 //
44 // These will not be set when the Status has been generated by a
45 // testServiceHandle.
Serge Bazanskif05e80a2021-10-12 11:53:34 +020046 localPeerURL string
47 localMemberID uint64
Serge Bazanski5839e972021-11-16 15:46:19 +010048 // cl is the root etcd client to the underlying cluster.
49 cl *clientv3.Client
50 // ca is the PKI CA used to authenticate etcd members.
51 ca *pki.Certificate
52 // stopped is set to true if the underlying service has been stopped or hasn't
53 // yet been started.
54 stopped bool
Mateusz Zalegabb2edbe2022-06-08 11:57:09 +020055
56 // noClusterMemberManagement disables etcd cluster member management in
57 // UpdateNodeRoles. This is currently necessary in order to test the call,
58 // due to limitations of the test harness.
59 noClusterMemberManagement bool
Serge Bazanskif05e80a2021-10-12 11:53:34 +020060}
61
62// Running returns true if this status represents a running consensus service
63// which can be connected to or managed. These calls are not guaranteed to
64// succeed (as the server might have stopped in the meantime), but the caller
65// can use this value as a hint to whether attempts to access the consensus
66// service should be done.
67func (s *Status) Running() bool {
68 return !s.stopped
69}
70
71func (s *Status) pkiClient() (client.Namespaced, error) {
72 return clientFor(s.cl, "namespaced", "etcd-pki")
73}
74
Serge Bazanski5839e972021-11-16 15:46:19 +010075// CuratorClient returns a namespaced etcd client for use by the Curator.
76func (s *Status) CuratorClient() (client.Namespaced, error) {
77 return clientFor(s.cl, "namespaced", "curator")
78}
79
80// KubernetesClient returns a namespaced etcd client for use by Kubernetes.
81func (s *Status) KubernetesClient() (client.Namespaced, error) {
82 return clientFor(s.cl, "namespaced", "kubernetes")
83}
84
85// ClusterClient returns an etcd management API client, for use by downstream
86// clients that wish to perform maintenance operations on the etcd cluster (eg.
87// list/modify nodes, promote learners, ...).
88func (s *Status) ClusterClient() clientv3.Cluster {
89 return s.cl
Serge Bazanskif05e80a2021-10-12 11:53:34 +020090}
91
Jan Schär39d9c242024-09-24 13:49:55 +020092// AddNode creates a new consensus member corresponding to a given node ID
93// if one does not yet exist. The member will at first be marked as a
Serge Bazanskif05e80a2021-10-12 11:53:34 +020094// Learner, ensuring it does not take part in quorum until it has finished
95// catching up to the state of the etcd store. As it does, the autopromoter will
96// turn it into a 'full' node and it will start taking part in the quorum and be
97// able to perform all etcd operations.
Jan Schär39d9c242024-09-24 13:49:55 +020098func (s *Status) AddNode(ctx context.Context, nodeID string, pk ed25519.PublicKey, opts ...*AddNodeOption) (*JoinCluster, error) {
Serge Bazanskif05e80a2021-10-12 11:53:34 +020099 clPKI, err := s.pkiClient()
100 if err != nil {
101 return nil, err
102 }
103
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200104 var extraNames []string
105 name := nodeID
Jan Schär0f8ce4c2025-09-04 13:27:50 +0200106 port := int(allocs.PortConsensus)
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200107 for _, opt := range opts {
108 if opt.externalAddress != "" {
109 name = opt.externalAddress
110 extraNames = append(extraNames, name)
111 }
112 if opt.externalPort != 0 {
113 port = opt.externalPort
114 }
115 }
116
117 member := pki.Certificate{
118 Name: nodeID,
119 Namespace: &pkiNamespace,
120 Issuer: s.ca,
Jan Schär39d9c242024-09-24 13:49:55 +0200121 Template: pkiPeerCertificate(nodeID, extraNames),
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200122 Mode: pki.CertificateExternal,
123 PublicKey: pk,
124 }
125 caBytes, err := s.ca.Ensure(ctx, clPKI)
126 if err != nil {
127 return nil, fmt.Errorf("could not ensure CA certificate: %w", err)
128 }
129 memberBytes, err := member.Ensure(ctx, clPKI)
130 if err != nil {
131 return nil, fmt.Errorf("could not ensure member certificate: %w", err)
132 }
133 caCert, err := x509.ParseCertificate(caBytes)
134 if err != nil {
135 return nil, fmt.Errorf("could not parse CA certificate: %w", err)
136 }
137 memberCert, err := x509.ParseCertificate(memberBytes)
138 if err != nil {
139 return nil, fmt.Errorf("could not parse newly issued member certificate: %w", err)
140 }
141
142 members, err := s.cl.MemberList(ctx)
143 if err != nil {
144 return nil, fmt.Errorf("could not retrieve existing members: %w", err)
145 }
146
147 var existingNodes []ExistingNode
148 var newExists bool
149 for _, m := range members.Members {
Jan Schär442cf682024-09-05 18:28:48 +0200150 if GetEtcdMemberNodeId(m) == nodeID {
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200151 newExists = true
152 }
153 if m.IsLearner {
154 continue
155 }
156 if len(m.PeerURLs) < 1 {
157 continue
158 }
159 existingNodes = append(existingNodes, ExistingNode{
160 Name: m.Name,
161 URL: m.PeerURLs[0],
162 })
163 }
164
165 crlW := s.ca.WatchCRL(clPKI)
Serge Bazanski50f5ec72022-06-21 14:16:56 +0200166 defer crlW.Close()
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200167 crl, err := crlW.Get(ctx)
168 if err != nil {
169 return nil, fmt.Errorf("could not retrieve initial CRL: %w", err)
170 }
171
Mateusz Zalegabb2edbe2022-06-08 11:57:09 +0200172 if !newExists && !s.noClusterMemberManagement {
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200173 addr := fmt.Sprintf("https://%s", net.JoinHostPort(name, strconv.Itoa(port)))
174 if _, err := s.cl.MemberAddAsLearner(ctx, []string{addr}); err != nil {
175 return nil, fmt.Errorf("could not add new member as learner: %w", err)
176 }
177 }
178
179 return &JoinCluster{
180 CACertificate: caCert,
181 NodeCertificate: memberCert,
182 ExistingNodes: existingNodes,
183 InitialCRL: crl,
184 }, nil
185}
186
// AddNodeOption can be passed to AddNode to influence the behaviour of the
// function. Currently this is only used internally by tests.
type AddNodeOption struct {
	// externalAddress, if non-empty, replaces the node ID as the host part of
	// the peer URL of the member added by AddNode, and is added as an extra
	// name to the member's peer certificate.
	externalAddress string

	// externalPort, if non-zero, replaces the default consensus port in the
	// peer URL of the member added by AddNode.
	externalPort int
}
Jan Schärad8982f2024-09-17 13:56:34 +0200193
194// RemoveNode removes the etcd member with the given node ID, if it is currently
195// a member. Etcd fails this operation if it is not safe to perform.
196func (s *Status) RemoveNode(ctx context.Context, nodeID string) error {
197 members, err := s.cl.MemberList(ctx)
198 if err != nil {
199 return fmt.Errorf("could not retrieve existing members: %w", err)
200 }
201 for _, m := range members.Members {
202 if GetEtcdMemberNodeId(m) == nodeID {
203 _, err := s.cl.MemberRemove(ctx, m.ID)
204 if err != nil {
205 return fmt.Errorf("could not remove member: %w", err)
206 }
207 }
208 }
209 return nil
210}