// Copyright 2020 The Monogon Project Authors.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cluster

import (
	"context"
	"crypto/x509"
	"encoding/pem"
	"fmt"
	"io/ioutil"
	"os"
	"strings"
	"sync"
	"time"

	"github.com/cenkalti/backoff/v4"
	"github.com/golang/protobuf/proto"
	"go.etcd.io/etcd/clientv3"

	common "git.monogon.dev/source/nexantic.git/metropolis/node"
	"git.monogon.dev/source/nexantic.git/metropolis/node/common/supervisor"
	"git.monogon.dev/source/nexantic.git/metropolis/node/core/consensus"
	"git.monogon.dev/source/nexantic.git/metropolis/node/core/localstorage"
	"git.monogon.dev/source/nexantic.git/metropolis/node/core/localstorage/declarative"
	"git.monogon.dev/source/nexantic.git/metropolis/node/core/network"
	apb "git.monogon.dev/source/nexantic.git/metropolis/proto/api"
)

// Manager is a finite state machine that joins this node (ie. a Smalltown instance running on a virtual/physical
// machine) into a Smalltown cluster (ie. a group of nodes that act as a single control plane for Smalltown services).
// It does that by bringing up all required operating-system level components, including mounting the local filesystem,
// bringing up a consensus (etcd) server/client, ...
//
// The Manager runs as a single-shot Runnable. It will attempt to progress its state from the initial state (New) to
// either Running (meaning that the node is now part of a cluster) or Failed (meaning that the node couldn't become
// part of a cluster). It is not restartable, as it mutates quite a bit of implicit operating-system level state (like
// filesystem mounts). As such, it's difficult to recover reliably from failures, and since these failures indicate
// serious issues with the cluster configuration/state, a failure requires a full kernel reboot to retry (or to fix/
// reconfigure the node).
//
// Currently, the Manager only supports one flow for bringing up a Node: creating a new cluster. As such, it's
// missing the following flows:
// - joining a new node into an already running cluster
// - restarting a node into an already existing cluster
// - restarting a node into an already running cluster (ie. a full reboot of the whole cluster)
//
type Manager struct {
	storageRoot    *localstorage.Root
	networkService *network.Service

	// stateLock locks all state* variables.
	stateLock sync.RWMutex
	// state is the FSM state of the Manager.
	state State
	// stateRunningNode is the Node that this Manager got from joining a cluster. It's only valid if the Manager is
	// Running.
	stateRunningNode *Node
	// stateWaiters is a list of channels that wish to be notified (by sending true or false) for when the Manager
	// reaches a final state (Running or Failed respectively).
	stateWaiters []chan bool

	// goldenTicket is the Golden Ticket present in the enrolment config, if any.
	goldenTicket *apb.GoldenTicket

	// consensus is the spawned etcd/consensus service, if the Manager brought up a Node that should run one.
	consensus *consensus.Service
}

// NewManager creates a new cluster Manager. The given localstorage Root must be placed, but not yet started (it will
// be started as the Manager makes progress). The given network Service must already be running.
func NewManager(storageRoot *localstorage.Root, networkService *network.Service) *Manager {
	return &Manager{
		storageRoot:    storageRoot,
		networkService: networkService,
	}
}

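// The following is a minimal usage sketch (hypothetical, not called anywhere in Smalltown itself): it shows how a
// caller that already has a placed localstorage Root and a running network Service could start the Manager under the
// supervisor and block until enrolment reaches a final state.
func exampleRunManager(ctx context.Context, root *localstorage.Root, net *network.Service) (*Node, error) {
	m := NewManager(root, net)
	// Run the one-shot enrolment runnable under the supervisor.
	if err := supervisor.Run(ctx, "enrolment", m.Run); err != nil {
		return nil, fmt.Errorf("when starting enrolment: %w", err)
	}
	// Block until the Manager reaches Running or Failed.
	if !m.WaitFinished() {
		return nil, fmt.Errorf("enrolment failed")
	}
	// The node is now part of a cluster.
	return m.Node(), nil
}
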
// State is the state of the Manager finite state machine.
type State int

const (
	// StateNew is the initial state of the Manager. It decides how to go about joining or creating a cluster.
	StateNew State = iota
	// StateCreatingCluster is when the Manager attempts to create a new cluster - this happens when a node is started
	// with no EnrolmentConfig.
	StateCreatingCluster
	// StateCharlie is when the Manager uses the Golden Ticket debug/stopgap system to join an already
	// existing cluster. This mechanism will be removed before the first Smalltown release.
	StateCharlie
	// StateRunning is when the Manager successfully got the node to be part of a cluster. stateRunningNode is valid.
	StateRunning
	// StateFailed is when the Manager failed to get the node to be part of a cluster.
	StateFailed
)

func (s State) String() string {
	switch s {
	case StateNew:
		return "New"
	case StateCreatingCluster:
		return "CreatingCluster"
	case StateCharlie:
		return "Charlie"
	case StateRunning:
		return "Running"
	case StateFailed:
		return "Failed"
	default:
		return "UNKNOWN"
	}
}

// allowedTransitions describes all allowed state transitions (map[From][]To).
var allowedTransitions = map[State][]State{
	StateNew:             {StateCreatingCluster, StateCharlie},
	StateCreatingCluster: {StateRunning, StateFailed},
	StateCharlie:         {StateRunning, StateFailed},
}

// allowed returns whether a transition from a state to another state is allowed (ie. is defined in allowedTransitions).
func (m *Manager) allowed(from, to State) bool {
	for _, allowed := range allowedTransitions[from] {
		if to == allowed {
			return true
		}
	}
	return false
}

// next moves the Manager finite state machine from its current state to `n`, or to Failed if the transition is not
// allowed.
func (m *Manager) next(ctx context.Context, n State) {
	m.stateLock.Lock()
	defer m.stateLock.Unlock()

	if !m.allowed(m.state, n) {
		supervisor.Logger(ctx).Errorf("Attempted invalid enrolment state transition, failing enrolment; from: %s, to: %s",
			m.state.String(), n.String())
		m.state = StateFailed
		return
	}

	supervisor.Logger(ctx).Infof("Enrolment state change; from: %s, to: %s", m.state.String(), n.String())

	m.state = n
}

// State returns the state of the Manager. It's safe to call this from any goroutine.
func (m *Manager) State() State {
	m.stateLock.RLock()
	defer m.stateLock.RUnlock()
	return m.state
}

// WaitFinished waits until the Manager FSM reaches Running or Failed, and returns true if the FSM is Running. It's
// safe to call this from any goroutine.
func (m *Manager) WaitFinished() (success bool) {
	m.stateLock.Lock()
	switch m.state {
	case StateFailed:
		m.stateLock.Unlock()
		return false
	case StateRunning:
		m.stateLock.Unlock()
		return true
	}

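	// Not in a final state yet: register a channel that wakeWaiters will use to notify this caller once the Manager
	// reaches Running or Failed.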
	C := make(chan bool)
	m.stateWaiters = append(m.stateWaiters, C)
	m.stateLock.Unlock()
	return <-C
}

// wakeWaiters wakes any WaitFinished waiters and lets them know about the current state of the Manager.
// The stateLock must already have been taken, and the state must have been set in the same critical section (otherwise
// this can cause a race condition).
func (m *Manager) wakeWaiters() {
	state := m.state
	waiters := m.stateWaiters
	m.stateWaiters = nil

	for _, waiter := range waiters {
		go func(w chan bool) {
			w <- state == StateRunning
		}(waiter)
	}
}

// Run is the runnable of the Manager, to be started using the Supervisor. It is one-shot, and should not be restarted.
func (m *Manager) Run(ctx context.Context) error {
	if state := m.State(); state != StateNew {
		supervisor.Logger(ctx).Errorf("Manager started with non-New state %s, failing", state.String())
		m.stateLock.Lock()
		m.state = StateFailed
		m.wakeWaiters()
		m.stateLock.Unlock()
		return nil
	}

	var err error
	bo := backoff.NewExponentialBackOff()
	for {
		done := false
		state := m.State()
		switch state {
		case StateNew:
			err = m.stateNew(ctx)
		case StateCreatingCluster:
			err = m.stateCreatingCluster(ctx)
		case StateCharlie:
			err = m.stateCharlie(ctx)
		default:
			done = true
			break
		}

		if err != nil || done {
			break
		}

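		// A state handler that returned successfully should have advanced the FSM. If the state did not change (and
		// a self-transition is not allowed), the enrolment is considered stuck and is failed.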
		if state == m.State() && !m.allowed(state, m.State()) {
			supervisor.Logger(ctx).Errorf("Enrolment got stuck at %s, failing", m.state.String())
			m.stateLock.Lock()
			m.state = StateFailed
			m.stateLock.Unlock()
		} else {
			bo.Reset()
		}
	}

	m.stateLock.Lock()
	state := m.state
	if state != StateRunning {
		supervisor.Logger(ctx).Errorf("Enrolment failed at %s: %v", m.state.String(), err)
	} else {
		supervisor.Logger(ctx).Info("Enrolment successful!")
	}
	m.wakeWaiters()
	m.stateLock.Unlock()

	supervisor.Signal(ctx, supervisor.SignalHealthy)
	supervisor.Signal(ctx, supervisor.SignalDone)
	return nil
}

// stateNew is called when a Manager is New. It makes the decision on how to join this node into a cluster.
func (m *Manager) stateNew(ctx context.Context) error {
	supervisor.Logger(ctx).Info("Starting enrolment process...")

	// Check for presence of EnrolmentConfig on ESP or in qemu firmware variables.
	var configRaw []byte
	configRaw, err := m.storageRoot.ESP.Enrolment.Read()
	if err != nil && !os.IsNotExist(err) {
		return fmt.Errorf("could not read local enrolment file: %w", err)
	} else if err != nil {
		configRaw, err = ioutil.ReadFile("/sys/firmware/qemu_fw_cfg/by_name/com.nexantic.smalltown/enrolment.pb/raw")
		if err != nil && !os.IsNotExist(err) {
			return fmt.Errorf("could not read firmware enrolment file: %w", err)
		}
	}

	// If no enrolment file exists, we create a new cluster.
	if configRaw == nil {
		m.next(ctx, StateCreatingCluster)
		return nil
	}

	// Enrolment file exists, parse it.
	enrolmentConfig := apb.EnrolmentConfig{}
	if err := proto.Unmarshal(configRaw, &enrolmentConfig); err != nil {
		return fmt.Errorf("could not unmarshal local enrolment file: %w", err)
	}

	// If no join ticket exists, we can't do anything yet.
	if enrolmentConfig.GoldenTicket == nil {
		return fmt.Errorf("joining a cluster without a golden ticket not yet implemented")
	}

	m.goldenTicket = enrolmentConfig.GoldenTicket

	// Otherwise, we begin enrolling with the Golden Ticket.
	m.next(ctx, StateCharlie)
	return nil
}

// stateCreatingCluster is called when the Manager has decided to create a new cluster.
//
// The process to create a new cluster is as follows:
// - wait for an IP address
// - initialize a new data partition, by generating local and cluster unlock keys (the local unlock key is saved to
//   the ESP, while the cluster unlock key is returned)
// - create a new node certificate and Node (with the new cluster unlock key)
// - start up a new etcd cluster, with this node being the only member
// - save the new Node to the new etcd cluster (thereby saving the node's cluster unlock key to etcd)
func (m *Manager) stateCreatingCluster(ctx context.Context) error {
	logger := supervisor.Logger(ctx)
	logger.Info("Creating new cluster: waiting for IP address...")
	ip, err := m.networkService.GetIP(ctx, true)
	if err != nil {
		return fmt.Errorf("when getting IP address: %w", err)
	}
	logger.Infof("Creating new cluster: got IP address %s", ip.String())

	logger.Info("Creating new cluster: initializing storage...")
	cuk, err := m.storageRoot.Data.MountNew(&m.storageRoot.ESP.LocalUnlock)
	if err != nil {
		return fmt.Errorf("when making new data partition: %w", err)
	}
	logger.Info("Creating new cluster: storage initialized")

	// Create certificate for node.
	cert, err := m.storageRoot.Data.Node.EnsureSelfSigned(localstorage.CertificateForNode)
	if err != nil {
		return fmt.Errorf("failed to create new node certificate: %w", err)
	}

	node := NewNode(cuk, *ip, *cert.Leaf)

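	// Bring up etcd as a new, single-member cluster, with this node as its only initial peer.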
	m.consensus = consensus.New(consensus.Config{
		Data:           &m.storageRoot.Data.Etcd,
		Ephemeral:      &m.storageRoot.Ephemeral.Consensus,
		NewCluster:     true,
		Name:           node.ID(),
		InitialCluster: ip.String(),
		ExternalHost:   ip.String(),
		ListenHost:     ip.String(),
	})
	if err := supervisor.Run(ctx, "consensus", m.consensus.Run); err != nil {
		return fmt.Errorf("when starting consensus: %w", err)
	}

	// TODO(q3k): make timeout configurable?
	ctxT, ctxC := context.WithTimeout(ctx, 5*time.Second)
	defer ctxC()

	supervisor.Logger(ctx).Info("Creating new cluster: waiting for consensus...")
	if err := m.consensus.WaitReady(ctxT); err != nil {
		return fmt.Errorf("consensus service failed to become ready: %w", err)
	}

	// Configure node to be a consensus member and kubernetes worker. In the future, different nodes will have
	// different roles, but for now they're all symmetrical.
	_, consensusName, err := m.consensus.MemberInfo(ctx)
	if err != nil {
		return fmt.Errorf("could not get consensus MemberInfo: %w", err)
	}
	if err := node.MakeConsensusMember(consensusName); err != nil {
		return fmt.Errorf("could not make new node into consensus member: %w", err)
	}
	if err := node.MakeKubernetesWorker(node.ID()); err != nil {
		return fmt.Errorf("could not make new node into kubernetes worker: %w", err)
	}

	// Save node into etcd.
	supervisor.Logger(ctx).Info("Creating new cluster: storing first node...")
	if err := node.Store(ctx, m.consensus.KV("cluster", "enrolment")); err != nil {
		return fmt.Errorf("could not save new node: %w", err)
	}

	m.stateLock.Lock()
	m.stateRunningNode = node
	m.stateLock.Unlock()

	m.next(ctx, StateRunning)
	return nil
}

// stateCharlie is used to join an existing cluster via the GoldenTicket mechanism. This mechanism is temporarily
// implemented in Smalltown in order to allow for testing multi-node clusters without a TPM attestation flow implemented.
// The Golden Ticket contains a pregenerated node certificate, etcd certificate, and other data that any node can
// use to join the cluster.
// Since this flow is temporary, it has a slight impedance mismatch with methods exposed by localstorage, node, etc.,
// and the resulting sequencing is a bit odd:
// - the {node,etcd} certificates/keys are loaded (this already dictates the new node name, as the node name is based
//   off of the node public key)
// - local storage is initialized, and a local/cluster unlock keypair is generated
// - etcd keys are manually saved to localstorage (vs. being generated locally by a CA)
// - an etcd/consensus member is started, knowing that the remote member was already created when the golden ticket
//   was generated (vs. being created now by an RPC call, via a promote-node-to-etcd-member flow)
// - the node is then promoted to a consensus member and kubernetes worker, its cluster unlock key is set, and then it
//   is saved to etcd.
// As such, in this flow, we first create an etcd member (on golden ticket generation), and only then create a new
// Smalltown node (when the golden ticket is used).
func (m *Manager) stateCharlie(ctx context.Context) error {
	t := m.goldenTicket
	nodeCert, err := x509.ParseCertificate(t.NodeCert)
	if err != nil {
		return fmt.Errorf("parsing node certificate from ticket: %w", err)
	}

	supervisor.Logger(ctx).Info("Joining cluster: waiting for IP address...")
	ip, err := m.networkService.GetIP(ctx, true)
	if err != nil {
		return fmt.Errorf("when getting IP address: %w", err)
	}
	supervisor.Logger(ctx).Infof("Joining cluster: got IP address %s", ip.String())

	supervisor.Logger(ctx).Info("Joining cluster: initializing storage...")
	cuk, err := m.storageRoot.Data.MountNew(&m.storageRoot.ESP.LocalUnlock)
	if err != nil {
		return fmt.Errorf("when making new data partition: %w", err)
	}
	supervisor.Logger(ctx).Info("Joining cluster: storage initialized")
	node := NewNode(cuk, *ip, *nodeCert)

	// Save etcd PKI to disk.
	for _, f := range []struct {
		target    declarative.FilePlacement
		data      []byte
		blockType string
	}{
		{m.storageRoot.Data.Etcd.PeerPKI.Key, t.EtcdClientKey, "PRIVATE KEY"},
		{m.storageRoot.Data.Etcd.PeerPKI.Certificate, t.EtcdClientCert, "CERTIFICATE"},
		{m.storageRoot.Data.Etcd.PeerPKI.CACertificate, t.EtcdCaCert, "CERTIFICATE"},
	} {
		if err := f.target.Write(pem.EncodeToMemory(&pem.Block{Type: f.blockType, Bytes: f.data}), 0600); err != nil {
			return fmt.Errorf("when writing etcd PKI data: %w", err)
		}
	}
	if err := m.storageRoot.Data.Etcd.PeerCRL.Write(t.EtcdCrl, 0600); err != nil {
		return fmt.Errorf("when writing etcd CRL: %w", err)
	}

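	// https formats a golden ticket etcd peer as an etcd initial-cluster entry (name=https://address:port).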
	https := func(p *apb.GoldenTicket_EtcdPeer) string {
		return fmt.Sprintf("%s=https://%s:%d", p.Name, p.Address, common.ConsensusPort)
	}
	var initialCluster []string
	for _, p := range t.Peers {
		initialCluster = append(initialCluster, https(p))
	}
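	// The member pregenerated for this node (t.This) is part of the initial cluster, too.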
	initialCluster = append(initialCluster, https(t.This))

	supervisor.Logger(ctx).Infof("Joining cluster: starting etcd join, name: %s, initial_cluster: %s", node.ID(), strings.Join(initialCluster, ","))
	m.consensus = consensus.New(consensus.Config{
		Data:           &m.storageRoot.Data.Etcd,
		Ephemeral:      &m.storageRoot.Ephemeral.Consensus,
		Name:           node.ID(),
		InitialCluster: strings.Join(initialCluster, ","),
		ExternalHost:   ip.String(),
		ListenHost:     ip.String(),
	})

	if err := supervisor.Run(ctx, "consensus", m.consensus.Run); err != nil {
		return fmt.Errorf("when starting consensus: %w", err)
	}

	// TODO(q3k): make timeout configurable?
	ctxT, ctxC := context.WithTimeout(ctx, 5*time.Second)
	defer ctxC()

	supervisor.Logger(ctx).Info("Joining cluster: waiting for consensus...")
	if err := m.consensus.WaitReady(ctxT); err != nil {
		return fmt.Errorf("consensus service failed to become ready: %w", err)
	}

	// Configure node to be a consensus member and kubernetes worker. In the future, different nodes will have
	// different roles, but for now they're all symmetrical.
	_, consensusName, err := m.consensus.MemberInfo(ctx)
	if err != nil {
		return fmt.Errorf("could not get consensus MemberInfo: %w", err)
	}
	if err := node.MakeConsensusMember(consensusName); err != nil {
		return fmt.Errorf("could not make new node into consensus member: %w", err)
	}
	if err := node.MakeKubernetesWorker(node.ID()); err != nil {
		return fmt.Errorf("could not make new node into kubernetes worker: %w", err)
	}

	// Save node into etcd.
	supervisor.Logger(ctx).Info("Joining cluster: storing node...")
	if err := node.Store(ctx, m.consensus.KV("cluster", "enrolment")); err != nil {
		return fmt.Errorf("could not save new node: %w", err)
	}

	m.stateLock.Lock()
	m.stateRunningNode = node
	m.stateLock.Unlock()

	m.next(ctx, StateRunning)
	return nil
}

// Node returns the Node that the Manager brought into a cluster, or nil if the Manager is not Running.
// This is safe to call from any goroutine.
func (m *Manager) Node() *Node {
	m.stateLock.Lock()
	defer m.stateLock.Unlock()
	if m.state != StateRunning {
		return nil
	}
	return m.stateRunningNode
}

// ConsensusKV returns a namespaced etcd KV client, or nil if the Manager is not Running.
// This is safe to call from any goroutine.
func (m *Manager) ConsensusKV(module, space string) clientv3.KV {
	m.stateLock.Lock()
	defer m.stateLock.Unlock()
	if m.state != StateRunning {
		return nil
	}
	if m.stateRunningNode.ConsensusMember() == nil {
		// TODO(q3k): in this case, we should return a client to etcd even though this
		// node is not a member of consensus. For now, all nodes are consensus members.
		return nil
	}
	return m.consensus.KV(module, space)
}

// ConsensusKVRoot returns a non-namespaced etcd KV client, or nil if the Manager is not Running.
// This is safe to call from any goroutine.
func (m *Manager) ConsensusKVRoot() clientv3.KV {
	m.stateLock.Lock()
	defer m.stateLock.Unlock()
	if m.state != StateRunning {
		return nil
	}
	if m.stateRunningNode.ConsensusMember() == nil {
		// TODO(q3k): in this case, we should return a client to etcd even though this
		// node is not a member of consensus. For now, all nodes are consensus members.
		return nil
	}
	return m.consensus.KVRoot()
}

// ConsensusCluster returns an etcd Cluster client, or nil if the Manager is not Running.
// This is safe to call from any goroutine.
func (m *Manager) ConsensusCluster() clientv3.Cluster {
	m.stateLock.Lock()
	defer m.stateLock.Unlock()
	if m.state != StateRunning {
		return nil
	}
	if m.stateRunningNode.ConsensusMember() == nil {
		// TODO(q3k): in this case, we should return a client to etcd even though this
		// node is not a member of consensus. For now, all nodes are consensus members.
		return nil
	}
	return m.consensus.Cluster()
}