blob: 17df8f953aa650166a71b68557b57c46ef5e854e [file] [log] [blame]
Serge Bazanskif05e80a2021-10-12 11:53:34 +02001package consensus
2
3import (
4 "context"
5 "crypto/ed25519"
6 "crypto/x509"
7 "fmt"
8 "net"
9 "strconv"
10
Lorenz Brund13c1c62022-03-30 19:58:58 +020011 clientv3 "go.etcd.io/etcd/client/v3"
Serge Bazanskif05e80a2021-10-12 11:53:34 +020012
13 "source.monogon.dev/metropolis/node"
14 "source.monogon.dev/metropolis/node/core/consensus/client"
15 "source.monogon.dev/metropolis/node/core/identity"
16 "source.monogon.dev/metropolis/pkg/event"
17 "source.monogon.dev/metropolis/pkg/pki"
18)
19
// ServiceHandle is implemented by Service and should be the type expected by
// other code which relies on a Consensus instance. I.e., it's the downstream
// API for a Consensus Service.
type ServiceHandle interface {
	// Watch returns an Event Value compatible Watcher for accessing the State of
	// the consensus Service in a safe manner. The watched value is a *Status.
	Watch() event.Watcher[*Status]
}
28
Serge Bazanski37110c32023-03-01 13:57:27 +000029var FilterRunning = event.Filter(func(st *Status) bool {
30 return st.Running()
31})
Serge Bazanski5839e972021-11-16 15:46:19 +010032
// Status of the consensus service. It represents either a running consensus
// service to which a client can connect and on which management can be
// performed, or a stopped service.
type Status struct {
	// localPeerURL and localMemberID are the expected public URL and etcd member
	// ID of the etcd server wrapped by this consensus instance. If set, a
	// sub-runnable of the consensus will ensure that the given memberID always
	// has localPeerURL set as its peer URL.
	//
	// These will not be set when the Status has been generated by a
	// testServiceHandle.
	localPeerURL  string
	localMemberID uint64
	// cl is the root etcd client to the underlying cluster. It backs all of the
	// namespaced clients handed out by this Status, as well as ClusterClient.
	cl *clientv3.Client
	// ca is the PKI CA used to authenticate etcd members. AddNode uses it as
	// the issuer of newly created member certificates.
	ca *pki.Certificate
	// stopped is set to true if the underlying service has been stopped or
	// hasn't yet been started. Running reports the negation of this field.
	stopped bool

	// noClusterMemberManagement disables etcd cluster member management in
	// UpdateNodeRoles. This is currently necessary in order to test the call,
	// due to limitations of the test harness.
	//
	// In this file the flag is also consulted by AddNode, which skips adding
	// the new member as a learner when it is set.
	noClusterMemberManagement bool
}
59
60// Running returns true if this status represents a running consensus service
61// which can be connected to or managed. These calls are not guaranteed to
62// succeed (as the server might have stopped in the meantime), but the caller
63// can use this value as a hint to whether attempts to access the consensus
64// service should be done.
65func (s *Status) Running() bool {
66 return !s.stopped
67}
68
69func (s *Status) pkiClient() (client.Namespaced, error) {
70 return clientFor(s.cl, "namespaced", "etcd-pki")
71}
72
Serge Bazanski5839e972021-11-16 15:46:19 +010073// CuratorClient returns a namespaced etcd client for use by the Curator.
74func (s *Status) CuratorClient() (client.Namespaced, error) {
75 return clientFor(s.cl, "namespaced", "curator")
76}
77
78// KubernetesClient returns a namespaced etcd client for use by Kubernetes.
79func (s *Status) KubernetesClient() (client.Namespaced, error) {
80 return clientFor(s.cl, "namespaced", "kubernetes")
81}
82
83// ClusterClient returns an etcd management API client, for use by downstream
84// clients that wish to perform maintenance operations on the etcd cluster (eg.
85// list/modify nodes, promote learners, ...).
86func (s *Status) ClusterClient() clientv3.Cluster {
87 return s.cl
Serge Bazanskif05e80a2021-10-12 11:53:34 +020088}
89
90// AddNode creates a new consensus member corresponding to a given Ed25519 node
91// public key if one does not yet exist. The member will at first be marked as a
92// Learner, ensuring it does not take part in quorum until it has finished
93// catching up to the state of the etcd store. As it does, the autopromoter will
94// turn it into a 'full' node and it will start taking part in the quorum and be
95// able to perform all etcd operations.
96func (s *Status) AddNode(ctx context.Context, pk ed25519.PublicKey, opts ...*AddNodeOption) (*JoinCluster, error) {
97 clPKI, err := s.pkiClient()
98 if err != nil {
99 return nil, err
100 }
101
102 nodeID := identity.NodeID(pk)
103 var extraNames []string
104 name := nodeID
105 port := int(node.ConsensusPort)
106 for _, opt := range opts {
107 if opt.externalAddress != "" {
108 name = opt.externalAddress
109 extraNames = append(extraNames, name)
110 }
111 if opt.externalPort != 0 {
112 port = opt.externalPort
113 }
114 }
115
116 member := pki.Certificate{
117 Name: nodeID,
118 Namespace: &pkiNamespace,
119 Issuer: s.ca,
120 Template: pkiPeerCertificate(pk, extraNames),
121 Mode: pki.CertificateExternal,
122 PublicKey: pk,
123 }
124 caBytes, err := s.ca.Ensure(ctx, clPKI)
125 if err != nil {
126 return nil, fmt.Errorf("could not ensure CA certificate: %w", err)
127 }
128 memberBytes, err := member.Ensure(ctx, clPKI)
129 if err != nil {
130 return nil, fmt.Errorf("could not ensure member certificate: %w", err)
131 }
132 caCert, err := x509.ParseCertificate(caBytes)
133 if err != nil {
134 return nil, fmt.Errorf("could not parse CA certificate: %w", err)
135 }
136 memberCert, err := x509.ParseCertificate(memberBytes)
137 if err != nil {
138 return nil, fmt.Errorf("could not parse newly issued member certificate: %w", err)
139 }
140
141 members, err := s.cl.MemberList(ctx)
142 if err != nil {
143 return nil, fmt.Errorf("could not retrieve existing members: %w", err)
144 }
145
146 var existingNodes []ExistingNode
147 var newExists bool
148 for _, m := range members.Members {
149 if m.Name == nodeID {
150 newExists = true
151 }
152 if m.IsLearner {
153 continue
154 }
155 if len(m.PeerURLs) < 1 {
156 continue
157 }
158 existingNodes = append(existingNodes, ExistingNode{
159 Name: m.Name,
160 URL: m.PeerURLs[0],
161 })
162 }
163
164 crlW := s.ca.WatchCRL(clPKI)
Serge Bazanski50f5ec72022-06-21 14:16:56 +0200165 defer crlW.Close()
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200166 crl, err := crlW.Get(ctx)
167 if err != nil {
168 return nil, fmt.Errorf("could not retrieve initial CRL: %w", err)
169 }
170
Mateusz Zalegabb2edbe2022-06-08 11:57:09 +0200171 if !newExists && !s.noClusterMemberManagement {
Serge Bazanskif05e80a2021-10-12 11:53:34 +0200172 addr := fmt.Sprintf("https://%s", net.JoinHostPort(name, strconv.Itoa(port)))
173 if _, err := s.cl.MemberAddAsLearner(ctx, []string{addr}); err != nil {
174 return nil, fmt.Errorf("could not add new member as learner: %w", err)
175 }
176 }
177
178 return &JoinCluster{
179 CACertificate: caCert,
180 NodeCertificate: memberCert,
181 ExistingNodes: existingNodes,
182 InitialCRL: crl,
183 }, nil
184}
185
// AddNodeOption can be passed to AddNode to influence the behaviour of the
// function. Currently this is only used internally by tests.
type AddNodeOption struct {
	// externalAddress, if non-empty, replaces the node ID as the host of the
	// new member's peer URL and is added as an extra name to the member
	// certificate template.
	externalAddress string
	// externalPort, if non-zero, replaces node.ConsensusPort as the port of the
	// new member's peer URL.
	externalPort int
}