// Copyright 2020 The Monogon Project Authors.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package cluster

import (
	"context"
	"crypto/x509"
	"encoding/pem"
	"fmt"
	"io/ioutil"
	"os"
	"strings"
	"sync"
	"time"

	apb "git.monogon.dev/source/nexantic.git/core/proto/api"

	"github.com/cenkalti/backoff/v4"
	"github.com/golang/protobuf/proto"
	"go.etcd.io/etcd/clientv3"

	"git.monogon.dev/source/nexantic.git/core/internal/common"
	"git.monogon.dev/source/nexantic.git/core/internal/common/supervisor"
	"git.monogon.dev/source/nexantic.git/core/internal/consensus"
	"git.monogon.dev/source/nexantic.git/core/internal/localstorage"
	"git.monogon.dev/source/nexantic.git/core/internal/localstorage/declarative"
	"git.monogon.dev/source/nexantic.git/core/internal/network"
)

// Manager is a finite state machine that joins this node (ie. a Smalltown instance running on a virtual/physical
// machine) into a Smalltown cluster (ie. a group of nodes that act as a single control plane for Smalltown services).
// It does that by bringing up all required operating-system level components, including mounting the local
// filesystem, bringing up a consensus (etcd) server/client, ...
//
// The Manager runs as a single-shot Runnable. It will attempt to progress its state from the initial state (New) to
// either Running (meaning that the node is now part of a cluster), or Failed (meaning that the node couldn't become
// part of a cluster). It is not restartable, as it mutates quite a bit of implicit operating-system level state (like
// filesystem mounts). As such, it's difficult to recover reliably from failures, and since these failures indicate
// serious issues with the cluster configuration/state, a failure requires a full kernel reboot to retry (or to fix/
// reconfigure the node).
//
// Currently, the Manager only supports one flow for bringing up a Node: by creating a new cluster. As such, it's
// missing the following flows:
// - joining a new node into an already running cluster
// - restarting a node into an already existing cluster
// - restarting a node into an already running cluster (ie. full reboot of the whole cluster)
//
type Manager struct {
	storageRoot    *localstorage.Root
	networkService *network.Service

	// stateLock locks all state* variables.
	stateLock sync.RWMutex
	// state is the FSM state of the Manager.
	state State
	// stateRunningNode is the Node that this Manager got from joining a cluster. It's only valid if the Manager is
	// Running.
	stateRunningNode *Node
	// stateWaiters is a list of channels that wish to be notified (by sending true or false) when the Manager
	// reaches a final state (Running or Failed respectively).
	stateWaiters []chan bool

	// goldenTicket is the Golden Ticket present in the enrolment config, if any.
	goldenTicket *apb.GoldenTicket

	// consensus is the spawned etcd/consensus service, if the Manager brought up a Node that should run one.
	consensus *consensus.Service
}

// NewManager creates a new cluster Manager. The given localstorage Root must be placed, but not yet started (and will
// be started as the Manager makes progress). The given network Service must already be running.
func NewManager(storageRoot *localstorage.Root, networkService *network.Service) *Manager {
	return &Manager{
		storageRoot:    storageRoot,
		networkService: networkService,
	}
}
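
// A minimal usage sketch (illustrative only: the supervisor wiring, runnable name and surrounding variables below
// are assumptions, not part of this package's API contract):
//
//	m := cluster.NewManager(storageRoot, networkService)
//	if err := supervisor.Run(ctx, "cluster-manager", m.Run); err != nil {
//		// handle error
//	}
//	if m.WaitFinished() {
//		// The node is now part of a cluster; m.Node() is valid.
//	}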

// State is the state of the Manager finite state machine.
type State int

const (
	// StateNew is the initial state of the Manager. It decides how to go about joining or creating a cluster.
	StateNew State = iota
	// StateCreatingCluster is when the Manager attempts to create a new cluster - this happens when a node is started
	// with no EnrolmentConfig.
	StateCreatingCluster
	// StateCharlie is when the Manager uses the Golden Ticket debug/stopgap system to join an already
	// existing cluster. This mechanism will be removed before the first Smalltown release.
	StateCharlie
	// StateRunning is when the Manager successfully got the node to be part of a cluster. stateRunningNode is valid.
	StateRunning
	// StateFailed is when the Manager failed to get the node to be part of a cluster.
	StateFailed
)

func (s State) String() string {
	switch s {
	case StateNew:
		return "New"
	case StateCreatingCluster:
		return "CreatingCluster"
	case StateCharlie:
		return "Charlie"
	case StateRunning:
		return "Running"
	case StateFailed:
		return "Failed"
	default:
		return "UNKNOWN"
	}
}

// allowedTransitions describes all allowed state transitions (map[From][]To).
var allowedTransitions = map[State][]State{
	StateNew:             {StateCreatingCluster, StateCharlie},
	StateCreatingCluster: {StateRunning, StateFailed},
	StateCharlie:         {StateRunning, StateFailed},
}

// allowed returns whether a transition from a state to another state is allowed (ie. is defined in allowedTransitions).
func (m *Manager) allowed(from, to State) bool {
	for _, allowed := range allowedTransitions[from] {
		if to == allowed {
			return true
		}
	}
	return false
}
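
// For example, allowed(StateNew, StateCharlie) and allowed(StateCreatingCluster, StateRunning) return true, while
// allowed(StateRunning, StateNew) returns false, as no transitions out of StateRunning are defined.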

// next moves the Manager finite state machine from its current state to `n`, or to Failed if the transition is not
// allowed.
func (m *Manager) next(ctx context.Context, n State) {
	m.stateLock.Lock()
	defer m.stateLock.Unlock()

	if !m.allowed(m.state, n) {
		supervisor.Logger(ctx).Errorf("Attempted invalid enrolment state transition, failing enrolment; from: %s, to: %s",
			m.state.String(), n.String())
		m.state = StateFailed
		return
	}

	supervisor.Logger(ctx).Infof("Enrolment state change; from: %s, to: %s", m.state.String(), n.String())

	m.state = n
}

// State returns the state of the Manager. It's safe to call this from any goroutine.
func (m *Manager) State() State {
	m.stateLock.RLock()
	defer m.stateLock.RUnlock()
	return m.state
}

// WaitFinished waits until the Manager FSM reaches Running or Failed, and returns true if the FSM is Running. It's
// safe to call this from any goroutine.
func (m *Manager) WaitFinished() (success bool) {
	m.stateLock.Lock()
	switch m.state {
	case StateFailed:
		m.stateLock.Unlock()
		return false
	case StateRunning:
		m.stateLock.Unlock()
		return true
	}

	C := make(chan bool)
	m.stateWaiters = append(m.stateWaiters, C)
	m.stateLock.Unlock()
	return <-C
}
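
// For example (illustrative only), a component that depends on cluster membership can block on enrolment from its
// own goroutine:
//
//	go func() {
//		if !m.WaitFinished() {
//			// Enrolment failed; per the Manager contract, the node needs a reboot to retry.
//			return
//		}
//		// The Manager is Running; m.Node() and the Consensus* accessors are usable.
//	}()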

// wakeWaiters wakes any WaitFinished waiters and lets them know about the current state of the Manager.
// The stateLock must already be taken, and the state must have been set in the same critical section (otherwise
// this can cause a race condition).
func (m *Manager) wakeWaiters() {
	state := m.state
	waiters := m.stateWaiters
	m.stateWaiters = nil

	for _, waiter := range waiters {
		go func(w chan bool) {
			w <- state == StateRunning
		}(waiter)
	}
}

// Run is the runnable of the Manager, to be started using the Supervisor. It is one-shot, and should not be restarted.
func (m *Manager) Run(ctx context.Context) error {
	if state := m.State(); state != StateNew {
		supervisor.Logger(ctx).Errorf("Manager started with non-New state %s, failing", state.String())
		m.stateLock.Lock()
		m.state = StateFailed
		m.wakeWaiters()
		m.stateLock.Unlock()
		return nil
	}

	var err error
	bo := backoff.NewExponentialBackOff()
	for {
		done := false
		state := m.State()
		switch state {
		case StateNew:
			err = m.stateNew(ctx)
		case StateCreatingCluster:
			err = m.stateCreatingCluster(ctx)
		case StateCharlie:
			err = m.stateCharlie(ctx)
		default:
			done = true
		}

		if err != nil || done {
			break
		}

		if state == m.State() && !m.allowed(state, m.State()) {
			supervisor.Logger(ctx).Errorf("Enrolment got stuck at %s, failing", m.state.String())
			m.stateLock.Lock()
			m.state = StateFailed
			m.stateLock.Unlock()
		} else {
			bo.Reset()
		}
	}

	m.stateLock.Lock()
	state := m.state
	if state != StateRunning {
		supervisor.Logger(ctx).Errorf("Enrolment failed at %s: %v", m.state.String(), err)
	} else {
		supervisor.Logger(ctx).Info("Enrolment successful!")
	}
	m.wakeWaiters()
	m.stateLock.Unlock()

	supervisor.Signal(ctx, supervisor.SignalHealthy)
	supervisor.Signal(ctx, supervisor.SignalDone)
	return nil
}

// stateNew is called when a Manager is New. It makes the decision on how to join this node into a cluster.
func (m *Manager) stateNew(ctx context.Context) error {
	supervisor.Logger(ctx).Info("Starting enrolment process...")

	// Check for presence of EnrolmentConfig on ESP or in qemu firmware variables.
	var configRaw []byte
	configRaw, err := m.storageRoot.ESP.Enrolment.Read()
	if err != nil && !os.IsNotExist(err) {
		return fmt.Errorf("could not read local enrolment file: %w", err)
	} else if err != nil {
		configRaw, err = ioutil.ReadFile("/sys/firmware/qemu_fw_cfg/by_name/com.nexantic.smalltown/enrolment.pb/raw")
		if err != nil && !os.IsNotExist(err) {
			return fmt.Errorf("could not read firmware enrolment file: %w", err)
		}
	}

	// If no enrolment file exists, we create a new cluster.
	if configRaw == nil {
		m.next(ctx, StateCreatingCluster)
		return nil
	}

	// Enrolment file exists, parse it.
	enrolmentConfig := apb.EnrolmentConfig{}
	if err := proto.Unmarshal(configRaw, &enrolmentConfig); err != nil {
		return fmt.Errorf("could not unmarshal local enrolment file: %w", err)
	}

	// If no Golden Ticket exists, we can't do anything yet.
	if enrolmentConfig.GoldenTicket == nil {
		return fmt.Errorf("joining a cluster without a golden ticket not yet implemented")
	}

	m.goldenTicket = enrolmentConfig.GoldenTicket

	// Otherwise, we begin enrolling with the Golden Ticket.
	m.next(ctx, StateCharlie)
	return nil
}
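
// For reference, a sketch of how such an enrolment file could be produced (illustrative only; only the GoldenTicket
// field is assumed here, nothing else about EnrolmentConfig or its delivery mechanism is implied by this package):
//
//	ec := apb.EnrolmentConfig{GoldenTicket: ticket}
//	raw, err := proto.Marshal(&ec)
//	// raw can then be placed on the ESP or exposed via qemu fw_cfg as com.nexantic.smalltown/enrolment.pb.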

// stateCreatingCluster is called when the Manager has decided to create a new cluster.
//
// The process to create a new cluster is as follows:
// - wait for IP address
// - initialize a new data partition by generating local and cluster unlock keys (the local unlock key is saved to
//   the ESP, while the cluster unlock key is returned)
// - create a new node certificate and Node (with the newly generated cluster unlock key)
// - start up a new etcd cluster, with this node being the only member
// - save the new Node to the new etcd cluster (thereby saving the node's cluster unlock key to etcd)
func (m *Manager) stateCreatingCluster(ctx context.Context) error {
	logger := supervisor.Logger(ctx)
	logger.Info("Creating new cluster: waiting for IP address...")
	ip, err := m.networkService.GetIP(ctx, true)
	if err != nil {
		return fmt.Errorf("when getting IP address: %w", err)
	}
	logger.Infof("Creating new cluster: got IP address %s", ip.String())

	logger.Info("Creating new cluster: initializing storage...")
	cuk, err := m.storageRoot.Data.MountNew(&m.storageRoot.ESP.LocalUnlock)
	if err != nil {
		return fmt.Errorf("when making new data partition: %w", err)
	}
	logger.Info("Creating new cluster: storage initialized")

	// Create certificate for node.
	cert, err := m.storageRoot.Data.Node.EnsureSelfSigned(localstorage.CertificateForNode)
	if err != nil {
		return fmt.Errorf("failed to create new node certificate: %w", err)
	}

	node := NewNode(cuk, *ip, *cert.Leaf)

	m.consensus = consensus.New(consensus.Config{
		Data:           &m.storageRoot.Data.Etcd,
		Ephemeral:      &m.storageRoot.Ephemeral.Consensus,
		NewCluster:     true,
		Name:           node.ID(),
		InitialCluster: ip.String(),
		ExternalHost:   ip.String(),
		ListenHost:     ip.String(),
	})
	if err := supervisor.Run(ctx, "consensus", m.consensus.Run); err != nil {
		return fmt.Errorf("when starting consensus: %w", err)
	}

	// TODO(q3k): make timeout configurable?
	ctxT, ctxC := context.WithTimeout(ctx, 5*time.Second)
	defer ctxC()

	supervisor.Logger(ctx).Info("Creating new cluster: waiting for consensus...")
	if err := m.consensus.WaitReady(ctxT); err != nil {
		return fmt.Errorf("consensus service failed to become ready: %w", err)
	}

	// Configure node to be a consensus member and kubernetes worker. In the future, different nodes will have
	// different roles, but for now they're all symmetrical.
	_, consensusName, err := m.consensus.MemberInfo(ctx)
	if err != nil {
		return fmt.Errorf("could not get consensus MemberInfo: %w", err)
	}
	if err := node.MakeConsensusMember(consensusName); err != nil {
		return fmt.Errorf("could not make new node into consensus member: %w", err)
	}
	if err := node.MakeKubernetesWorker(node.ID()); err != nil {
		return fmt.Errorf("could not make new node into kubernetes worker: %w", err)
	}

	// Save node into etcd.
	supervisor.Logger(ctx).Info("Creating new cluster: storing first node...")
	if err := node.Store(ctx, m.consensus.KV("cluster", "enrolment")); err != nil {
		return fmt.Errorf("could not save new node: %w", err)
	}

	m.stateLock.Lock()
	m.stateRunningNode = node
	m.stateLock.Unlock()

	m.next(ctx, StateRunning)
	return nil
}

// stateCharlie is used to join an existing cluster via the GoldenTicket mechanism. This mechanism is temporarily
// implemented in Smalltown to allow testing multi-node clusters before a TPM attestation flow is implemented.
// The Golden Ticket contains a pregenerated node certificate, etcd certificate, and other data that any node can
// use to join the cluster.
// Since this flow is temporary, it has a slight impedance mismatch with methods exposed by localstorage, node, etc.,
// and the resulting sequencing is a bit odd:
// - the {node,etcd} certificates/keys are loaded (this already dictates the new node name, as the node name is based
//   off of the node public key)
// - local storage is initialized, and a local/cluster unlock keypair is generated
// - etcd keys are manually saved to localstorage (vs. being generated locally by a CA)
// - an etcd/consensus member is started, knowing that the remote member was already created when the golden ticket
//   was generated (vs. being created now by an RPC call, via a promote-node-to-etcd-member flow)
// - the node is then promoted to a consensus member and kubernetes worker, its cluster unlock key is set, and then it
//   is saved to etcd.
// As such, in this flow, we first create an etcd member (on golden ticket generation), and only later create a new
// Smalltown node (when the golden ticket is used).
func (m *Manager) stateCharlie(ctx context.Context) error {
	t := m.goldenTicket
	nodeCert, err := x509.ParseCertificate(t.NodeCert)
	if err != nil {
		return fmt.Errorf("parsing node certificate from ticket: %w", err)
	}

	supervisor.Logger(ctx).Info("Joining cluster: waiting for IP address...")
	ip, err := m.networkService.GetIP(ctx, true)
	if err != nil {
		return fmt.Errorf("when getting IP address: %w", err)
	}
	supervisor.Logger(ctx).Infof("Joining cluster: got IP address %s", ip.String())

	supervisor.Logger(ctx).Info("Joining cluster: initializing storage...")
	cuk, err := m.storageRoot.Data.MountNew(&m.storageRoot.ESP.LocalUnlock)
	if err != nil {
		return fmt.Errorf("when making new data partition: %w", err)
	}
	supervisor.Logger(ctx).Info("Joining cluster: storage initialized")
	node := NewNode(cuk, *ip, *nodeCert)

	// Save etcd PKI to disk.
	for _, f := range []struct {
		target    declarative.FilePlacement
		data      []byte
		blockType string
	}{
		{m.storageRoot.Data.Etcd.PeerPKI.Key, t.EtcdClientKey, "PRIVATE KEY"},
		{m.storageRoot.Data.Etcd.PeerPKI.Certificate, t.EtcdClientCert, "CERTIFICATE"},
		{m.storageRoot.Data.Etcd.PeerPKI.CACertificate, t.EtcdCaCert, "CERTIFICATE"},
	} {
		if err := f.target.Write(pem.EncodeToMemory(&pem.Block{Type: f.blockType, Bytes: f.data}), 0600); err != nil {
			return fmt.Errorf("when writing etcd PKI data: %w", err)
		}
	}
	if err := m.storageRoot.Data.Etcd.PeerCRL.Write(t.EtcdCrl, 0600); err != nil {
		return fmt.Errorf("when writing etcd CRL: %w", err)
	}

	https := func(p *apb.GoldenTicket_EtcdPeer) string {
		return fmt.Sprintf("%s=https://%s:%d", p.Name, p.Address, common.ConsensusPort)
	}
	var initialCluster []string
	for _, p := range t.Peers {
		initialCluster = append(initialCluster, https(p))
	}
	initialCluster = append(initialCluster, https(t.This))
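	// The assembled initialCluster is joined below into an etcd initial_cluster string of name=peerURL pairs, one
	// per existing peer plus this node, e.g. (values purely illustrative):
	// "node-a=https://10.0.2.10:<ConsensusPort>,node-b=https://10.0.2.11:<ConsensusPort>".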

	supervisor.Logger(ctx).Infof("Joining cluster: starting etcd join, name: %s, initial_cluster: %s", node.ID(), strings.Join(initialCluster, ","))
	m.consensus = consensus.New(consensus.Config{
		Data:           &m.storageRoot.Data.Etcd,
		Ephemeral:      &m.storageRoot.Ephemeral.Consensus,
		Name:           node.ID(),
		InitialCluster: strings.Join(initialCluster, ","),
		ExternalHost:   ip.String(),
		ListenHost:     ip.String(),
	})

	if err := supervisor.Run(ctx, "consensus", m.consensus.Run); err != nil {
		return fmt.Errorf("when starting consensus: %w", err)
	}

	// TODO(q3k): make timeout configurable?
	ctxT, ctxC := context.WithTimeout(ctx, 5*time.Second)
	defer ctxC()

	supervisor.Logger(ctx).Info("Joining cluster: waiting for consensus...")
	if err := m.consensus.WaitReady(ctxT); err != nil {
		return fmt.Errorf("consensus service failed to become ready: %w", err)
	}

	// Configure node to be a consensus member and kubernetes worker. In the future, different nodes will have
	// different roles, but for now they're all symmetrical.
	_, consensusName, err := m.consensus.MemberInfo(ctx)
	if err != nil {
		return fmt.Errorf("could not get consensus MemberInfo: %w", err)
	}
	if err := node.MakeConsensusMember(consensusName); err != nil {
		return fmt.Errorf("could not make new node into consensus member: %w", err)
	}
	if err := node.MakeKubernetesWorker(node.ID()); err != nil {
		return fmt.Errorf("could not make new node into kubernetes worker: %w", err)
	}

	// Save node into etcd.
	supervisor.Logger(ctx).Info("Joining cluster: storing node...")
	if err := node.Store(ctx, m.consensus.KV("cluster", "enrolment")); err != nil {
		return fmt.Errorf("could not save new node: %w", err)
	}

	m.stateLock.Lock()
	m.stateRunningNode = node
	m.stateLock.Unlock()

	m.next(ctx, StateRunning)
	return nil
}

// Node returns the Node that the Manager brought into a cluster, or nil if the Manager is not Running.
// This is safe to call from any goroutine.
func (m *Manager) Node() *Node {
	m.stateLock.Lock()
	defer m.stateLock.Unlock()
	if m.state != StateRunning {
		return nil
	}
	return m.stateRunningNode
}

// ConsensusKV returns a namespaced etcd KV client, or nil if the Manager is not Running.
// This is safe to call from any goroutine.
func (m *Manager) ConsensusKV(module, space string) clientv3.KV {
	m.stateLock.Lock()
	defer m.stateLock.Unlock()
	if m.state != StateRunning {
		return nil
	}
	if m.stateRunningNode.ConsensusMember() == nil {
		// TODO(q3k): in this case, we should return a client to etcd even though this
		// node is not a member of consensus. For now, all nodes are consensus members.
		return nil
	}
	return m.consensus.KV(module, space)
}
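
// For example (illustrative only; the module/space names and key below are made up), a component could use the
// namespaced client once the Manager is Running:
//
//	if kv := m.ConsensusKV("kubernetes", "pki"); kv != nil {
//		if _, err := kv.Put(ctx, "some-key", "some-value"); err != nil {
//			// handle error
//		}
//	}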

// ConsensusKVRoot returns a non-namespaced etcd KV client, or nil if the Manager is not Running.
// This is safe to call from any goroutine.
func (m *Manager) ConsensusKVRoot() clientv3.KV {
	m.stateLock.Lock()
	defer m.stateLock.Unlock()
	if m.state != StateRunning {
		return nil
	}
	if m.stateRunningNode.ConsensusMember() == nil {
		// TODO(q3k): in this case, we should return a client to etcd even though this
		// node is not a member of consensus. For now, all nodes are consensus members.
		return nil
	}
	return m.consensus.KVRoot()
}

// ConsensusCluster returns an etcd Cluster client, or nil if the Manager is not Running.
// This is safe to call from any goroutine.
func (m *Manager) ConsensusCluster() clientv3.Cluster {
	m.stateLock.Lock()
	defer m.stateLock.Unlock()
	if m.state != StateRunning {
		return nil
	}
	if m.stateRunningNode.ConsensusMember() == nil {
		// TODO(q3k): in this case, we should return a client to etcd even though this
		// node is not a member of consensus. For now, all nodes are consensus members.
		return nil
	}
	return m.consensus.Cluster()
}