m/n/core/consensus: refactor for reliability and multinode support
This implements a big refactor of our consensus service/runnable.
First, we move away from the old bespoke API for retrieving the
consensus status (and consensus clients) to using Event Values, as the
rest of the codebase does.
Second, we move away from the bespoke PKI library used to generate
certificates in memory and then commit them to etcd, in favor of the
standard metropolis pki library. We then change the bootstrap process to
start a PKI-less etcd instance first, generate the PKI data directly on
the running instance, and then restart into a fully PKI-supporting etcd
instance.
We also move away from using etcd-specific private keys to reusing the
node's private key. This makes management slightly easier, but reviewers
should consider the security implications of this change.
Finally, we implement and test multi-member cluster support, which is
done by exposing an AddNode method on the newly exposed status, and a
JoinCluster option in the node configuration.
Change-Id: Iea2bf6114cb699d3792efd45d06de2fa5a48feb1
Reviewed-on: https://review.monogon.dev/c/monogon/+/466
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
diff --git a/metropolis/node/core/consensus/BUILD.bazel b/metropolis/node/core/consensus/BUILD.bazel
index d7b682b..f6e30e7 100644
--- a/metropolis/node/core/consensus/BUILD.bazel
+++ b/metropolis/node/core/consensus/BUILD.bazel
@@ -3,23 +3,27 @@
go_library(
name = "go_default_library",
srcs = [
+ "configuration.go",
"consensus.go",
"logparser.go",
+ "status.go",
],
importpath = "source.monogon.dev/metropolis/node/core/consensus",
visibility = ["//:__subpackages__"],
deps = [
"//metropolis/node:go_default_library",
- "//metropolis/node/core/consensus/ca:go_default_library",
"//metropolis/node/core/consensus/client:go_default_library",
+ "//metropolis/node/core/identity:go_default_library",
"//metropolis/node/core/localstorage:go_default_library",
+ "//metropolis/pkg/event:go_default_library",
+ "//metropolis/pkg/event/memory:go_default_library",
"//metropolis/pkg/logbuffer:go_default_library",
"//metropolis/pkg/logtree:go_default_library",
"//metropolis/pkg/logtree/unraw:go_default_library",
+ "//metropolis/pkg/pki:go_default_library",
"//metropolis/pkg/supervisor:go_default_library",
"@io_etcd_go_etcd//clientv3:go_default_library",
"@io_etcd_go_etcd//embed:go_default_library",
- "@org_uber_go_atomic//:go_default_library",
],
)
@@ -31,10 +35,15 @@
"logparser_test.go",
],
embed = [":go_default_library"],
+ tags = [
+ # Enable network sandboxing by asking the Bazel executor to block any
+ # network access. This is necessary as tests listen on static ports on
+ # loopback.
+ "block-network",
+ ],
deps = [
"//metropolis/node/core/localstorage:go_default_library",
"//metropolis/node/core/localstorage/declarative:go_default_library",
- "//metropolis/pkg/freeport:go_default_library",
"//metropolis/pkg/logbuffer:go_default_library",
"//metropolis/pkg/logtree:go_default_library",
"//metropolis/pkg/supervisor:go_default_library",
diff --git a/metropolis/node/core/consensus/ca/BUILD.bazel b/metropolis/node/core/consensus/ca/BUILD.bazel
deleted file mode 100644
index 4731852..0000000
--- a/metropolis/node/core/consensus/ca/BUILD.bazel
+++ /dev/null
@@ -1,9 +0,0 @@
-load("@io_bazel_rules_go//go:def.bzl", "go_library")
-
-go_library(
- name = "go_default_library",
- srcs = ["ca.go"],
- importpath = "source.monogon.dev/metropolis/node/core/consensus/ca",
- visibility = ["//:__subpackages__"],
- deps = ["@io_etcd_go_etcd//clientv3:go_default_library"],
-)
diff --git a/metropolis/node/core/consensus/ca/ca.go b/metropolis/node/core/consensus/ca/ca.go
deleted file mode 100644
index e0c56b0..0000000
--- a/metropolis/node/core/consensus/ca/ca.go
+++ /dev/null
@@ -1,455 +0,0 @@
-// Copyright 2020 The Monogon Project Authors.
-//
-// SPDX-License-Identifier: Apache-2.0
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-// package ca implements a simple standards-compliant certificate authority.
-// It only supports ed25519 keys, and does not maintain any persistent state.
-//
-// The CA is backed by etcd storage, and can also bootstrap itself without a
-// yet running etcd storage (and commit in-memory secrets to etcd at a later
-// date).
-//
-// This is different from //metropolis/pkg/pki in that it has to solve the
-// certs-for-etcd-on-etcd bootstrap problem. Perhaps it should be rewritten to
-// implement the Issuer/Ceritifcate interface available there.
-//
-// CA and certificates successfully pass https://github.com/zmap/zlint
-// (minus the CA/B rules that a public CA would adhere to, which requires
-// things like OCSP servers, Certificate Policies and ECDSA/RSA-only keys).
-package ca
-
-// TODO(leo): add zlint test
-
-import (
- "context"
- "crypto"
- "crypto/ed25519"
- "crypto/rand"
- "crypto/sha1"
- "crypto/x509"
- "crypto/x509/pkix"
- "encoding/asn1"
- "encoding/hex"
- "errors"
- "fmt"
- "math/big"
- "time"
-
- "go.etcd.io/etcd/clientv3"
-)
-
-const (
- // TODO(q3k): move this to a declarative storage layer
- pathCACertificate = "/etcd-ca/ca.der"
- pathCAKey = "/etcd-ca/ca-key.der"
- pathCACRL = "/etcd-ca/crl.der"
- pathIssuedCertificates = "/etcd-ca/certs/"
-)
-
-func pathIssuedCertificate(serial *big.Int) string {
- return pathIssuedCertificates + hex.EncodeToString(serial.Bytes())
-}
-
-var (
- // From RFC 5280 Section 4.1.2.5
- unknownNotAfter = time.Unix(253402300799, 0)
-)
-
-type CA struct {
- // TODO: Potentially protect the key with memguard
- privateKey *ed25519.PrivateKey
- CACert *x509.Certificate
- CACertRaw []byte
-
- // bootstrapIssued are certificates that have been issued by the CA before
- // it has been successfully Saved to etcd.
- bootstrapIssued [][]byte
- // canBootstrapIssue is set on CAs that have been created by New and not
- // yet stored to etcd. If not set, certificates cannot be issued in-memory.
- canBootstrapIssue bool
-}
-
-// Workaround for https://github.com/golang/go/issues/26676 in Go's
-// crypto/x509. Specifically Go violates Section 4.2.1.2 of RFC 5280 without
-// this.
-// Fixed for 1.15 in https://go-review.googlesource.com/c/go/+/227098/.
-//
-// Taken from https://github.com/FiloSottile/mkcert/blob/master/cert.go#L295
-// written by one of Go's crypto engineers (BSD 3-clause).
-func calculateSKID(pubKey crypto.PublicKey) ([]byte, error) {
- spkiASN1, err := x509.MarshalPKIXPublicKey(pubKey)
- if err != nil {
- return nil, err
- }
-
- var spki struct {
- Algorithm pkix.AlgorithmIdentifier
- SubjectPublicKey asn1.BitString
- }
- _, err = asn1.Unmarshal(spkiASN1, &spki)
- if err != nil {
- return nil, err
- }
- skid := sha1.Sum(spki.SubjectPublicKey.Bytes)
- return skid[:], nil
-}
-
-// New creates a new certificate authority with the given common name. The
-// newly created CA will be stored in memory until committed to etcd by calling
-// .Save.
-func New(name string) (*CA, error) {
- pubKey, privKey, err := ed25519.GenerateKey(rand.Reader)
- if err != nil {
- panic(err)
- }
-
- serialNumberLimit := new(big.Int).Lsh(big.NewInt(1), 127)
- serialNumber, err := rand.Int(rand.Reader, serialNumberLimit)
- if err != nil {
- return nil, fmt.Errorf("failed to generate serial number: %w", err)
- }
-
- skid, err := calculateSKID(pubKey)
- if err != nil {
- return nil, err
- }
-
- caCert := &x509.Certificate{
- SerialNumber: serialNumber,
- Subject: pkix.Name{
- CommonName: name,
- },
- IsCA: true,
- BasicConstraintsValid: true,
- NotBefore: time.Now(),
- NotAfter: unknownNotAfter,
- KeyUsage: x509.KeyUsageCertSign | x509.KeyUsageCRLSign | x509.KeyUsageDigitalSignature,
- ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth, x509.ExtKeyUsageServerAuth, x509.ExtKeyUsageOCSPSigning},
- AuthorityKeyId: skid,
- SubjectKeyId: skid,
- }
-
- caCertRaw, err := x509.CreateCertificate(rand.Reader, caCert, caCert, pubKey, privKey)
- if err != nil {
- return nil, fmt.Errorf("failed to create root certificate: %w", err)
- }
-
- ca := &CA{
- privateKey: &privKey,
- CACertRaw: caCertRaw,
- CACert: caCert,
-
- canBootstrapIssue: true,
- }
-
- return ca, nil
-}
-
-// Load restores CA state from etcd.
-func Load(ctx context.Context, kv clientv3.KV) (*CA, error) {
- resp, err := kv.Txn(ctx).Then(
- clientv3.OpGet(pathCACertificate),
- clientv3.OpGet(pathCAKey),
- // We only read the CRL to ensure it exists on etcd (and early fail on
- // inconsistency)
- clientv3.OpGet(pathCACRL)).Commit()
- if err != nil {
- return nil, fmt.Errorf("failed to retrieve CA from etcd: %w", err)
- }
-
- var caCert, caKey, caCRL []byte
- for _, el := range resp.Responses {
- for _, kv := range el.GetResponseRange().GetKvs() {
- switch string(kv.Key) {
- case pathCACertificate:
- caCert = kv.Value
- case pathCAKey:
- caKey = kv.Value
- case pathCACRL:
- caCRL = kv.Value
- }
- }
- }
- if caCert == nil || caKey == nil || caCRL == nil {
- return nil, fmt.Errorf("failed to retrieve CA from etcd, missing at least one of {ca key, ca crt, ca crl}")
- }
-
- if len(caKey) != ed25519.PrivateKeySize {
- return nil, errors.New("invalid CA private key size")
- }
- privateKey := ed25519.PrivateKey(caKey)
-
- caCertVal, err := x509.ParseCertificate(caCert)
- if err != nil {
- return nil, fmt.Errorf("failed to parse CA certificate: %w", err)
- }
- return &CA{
- privateKey: &privateKey,
- CACertRaw: caCert,
- CACert: caCertVal,
- }, nil
-}
-
-// Save stores a newly created CA into etcd, committing both the CA data and
-// any certificates issued until then.
-func (c *CA) Save(ctx context.Context, kv clientv3.KV) error {
- crl, err := c.makeCRL(nil)
- if err != nil {
- return fmt.Errorf("failed to generate initial CRL: %w", err)
- }
-
- ops := []clientv3.Op{
- clientv3.OpPut(pathCACertificate, string(c.CACertRaw)),
- clientv3.OpPut(pathCAKey, string([]byte(*c.privateKey))),
- clientv3.OpPut(pathCACRL, string(crl)),
- }
- for i, certRaw := range c.bootstrapIssued {
- cert, err := x509.ParseCertificate(certRaw)
- if err != nil {
- return fmt.Errorf("failed to parse in-memory certificate %d", i)
- }
- ops = append(ops, clientv3.OpPut(pathIssuedCertificate(cert.SerialNumber), string(certRaw)))
- }
-
- res, err := kv.Txn(ctx).If(
- clientv3.Compare(clientv3.CreateRevision(pathCAKey), "=", 0),
- ).Then(ops...).Commit()
- if err != nil {
- return fmt.Errorf("failed to store CA to etcd: %w", err)
- }
- if !res.Succeeded {
- // This should pretty much never happen, but we want to catch it just in case.
- return fmt.Errorf("failed to store CA to etcd: CA already present - cluster-level data inconsistency")
- }
- c.bootstrapIssued = nil
- c.canBootstrapIssue = false
- return nil
-}
-
-// Issue issues a certificate. If kv is non-nil, the newly issued certificate
-// will be immediately stored to etcd, otherwise it will be kept in memory
-// (until .Save is called). Certificates can only be issued to memory on
-// newly-created CAs that have not been saved to etcd yet.
-func (c *CA) Issue(ctx context.Context, kv clientv3.KV, commonName string) (cert []byte, privkey []byte, err error) {
- serialNumberLimit := new(big.Int).Lsh(big.NewInt(1), 127)
- serialNumber, err := rand.Int(rand.Reader, serialNumberLimit)
- if err != nil {
- err = fmt.Errorf("failed to generate serial number: %w", err)
- return
- }
-
- pubKey, privKeyRaw, err := ed25519.GenerateKey(rand.Reader)
- if err != nil {
- return
- }
- privkey, err = x509.MarshalPKCS8PrivateKey(privKeyRaw)
- if err != nil {
- return
- }
-
- etcdCert := &x509.Certificate{
- SerialNumber: serialNumber,
- Subject: pkix.Name{
- CommonName: commonName,
- OrganizationalUnit: []string{"etcd"},
- },
- IsCA: false,
- BasicConstraintsValid: true,
- NotBefore: time.Now(),
- NotAfter: unknownNotAfter,
- ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth, x509.ExtKeyUsageServerAuth},
- DNSNames: []string{commonName},
- }
- cert, err = x509.CreateCertificate(rand.Reader, etcdCert, c.CACert, pubKey, c.privateKey)
- if err != nil {
- err = fmt.Errorf("failed to sign new certificate: %w", err)
- return
- }
-
- if kv != nil {
- path := pathIssuedCertificate(serialNumber)
- _, err = kv.Put(ctx, path, string(cert))
- if err != nil {
- err = fmt.Errorf("failed to commit new certificate to etcd: %w", err)
- return
- }
- } else {
- if !c.canBootstrapIssue {
- err = fmt.Errorf("cannot issue new certificate to memory on existing, etcd-backed CA")
- return
- }
- c.bootstrapIssued = append(c.bootstrapIssued, cert)
- }
- return
-}
-
-func (c *CA) makeCRL(revoked []pkix.RevokedCertificate) ([]byte, error) {
- crl, err := c.CACert.CreateCRL(rand.Reader, c.privateKey, revoked, time.Now(), unknownNotAfter)
- if err != nil {
- return nil, fmt.Errorf("failed to generate CRL: %w", err)
- }
- return crl, nil
-}
-
-// Revoke revokes a certificate by hostname. The selected hostname will be
-// added to the CRL stored in etcd. This call might fail (safely) if a
-// simultaneous revoke happened that caused the CRL to be bumped. The call can
-// be then retried safely.
-func (c *CA) Revoke(ctx context.Context, kv clientv3.KV, hostname string) error {
- res, err := kv.Txn(ctx).Then(
- clientv3.OpGet(pathCACRL),
- clientv3.OpGet(pathIssuedCertificates, clientv3.WithPrefix())).Commit()
- if err != nil {
- return fmt.Errorf("failed to retrieve certificates and CRL from etcd: %w", err)
- }
-
- var certs []*x509.Certificate
- var crlRevision int64
- var crl *pkix.CertificateList
- for _, el := range res.Responses {
- for _, kv := range el.GetResponseRange().GetKvs() {
- if string(kv.Key) == pathCACRL {
- crl, err = x509.ParseCRL(kv.Value)
- if err != nil {
- return fmt.Errorf("could not parse CRL from etcd: %w", err)
- }
- crlRevision = kv.CreateRevision
- } else {
- cert, err := x509.ParseCertificate(kv.Value)
- if err != nil {
- return fmt.Errorf("could not parse certificate %q from etcd: %w", string(kv.Key), err)
- }
- certs = append(certs, cert)
- }
- }
- }
-
- if crl == nil {
- return fmt.Errorf("could not find CRL in etcd")
- }
- revoked := crl.TBSCertList.RevokedCertificates
-
- // Find requested hostname in issued certificates.
- var serial *big.Int
- for _, cert := range certs {
- for _, dnsName := range cert.DNSNames {
- if dnsName == hostname {
- serial = cert.SerialNumber
- break
- }
- }
- if serial != nil {
- break
- }
- }
- if serial == nil {
- return fmt.Errorf("could not find requested hostname")
- }
-
- // Check if certificate has already been revoked.
- for _, revokedCert := range revoked {
- if revokedCert.SerialNumber.Cmp(serial) == 0 {
- return nil // Already revoked
- }
- }
-
- revoked = append(revoked, pkix.RevokedCertificate{
- SerialNumber: serial,
- RevocationTime: time.Now(),
- })
-
- crlRaw, err := c.makeCRL(revoked)
- if err != nil {
- return fmt.Errorf("when generating new CRL for revocation: %w", err)
- }
-
- res, err = kv.Txn(ctx).If(
- clientv3.Compare(clientv3.CreateRevision(pathCACRL), "=", crlRevision),
- ).Then(
- clientv3.OpPut(pathCACRL, string(crlRaw)),
- ).Commit()
- if err != nil {
- return fmt.Errorf("when saving new CRL: %w", err)
- }
- if !res.Succeeded {
- return fmt.Errorf("CRL save transaction failed, retry possibly")
- }
-
- return nil
-}
-
-// WaitCRLChange returns a channel that will receive a CRLUpdate any time the
-// remote CRL changed. Immediately after calling this method, the current CRL
-// is retrieved from the cluster and put into the channel.
-func (c *CA) WaitCRLChange(ctx context.Context, kv clientv3.KV, w clientv3.Watcher) <-chan CRLUpdate {
- C := make(chan CRLUpdate)
-
- go func(ctx context.Context) {
- ctxC, cancel := context.WithCancel(ctx)
- defer cancel()
-
- fail := func(f string, args ...interface{}) {
- C <- CRLUpdate{Err: fmt.Errorf(f, args...)}
- close(C)
- }
-
- initial, err := kv.Get(ctx, pathCACRL)
- if err != nil {
- fail("failed to retrieve initial CRL: %w", err)
- return
- }
-
- C <- CRLUpdate{CRL: initial.Kvs[0].Value}
-
- for wr := range w.Watch(ctxC, pathCACRL, clientv3.WithRev(initial.Kvs[0].CreateRevision)) {
- if wr.Err() != nil {
- fail("failed watching CRL: %w", wr.Err())
- return
- }
-
- for _, e := range wr.Events {
- if string(e.Kv.Key) != pathCACRL {
- continue
- }
-
- C <- CRLUpdate{CRL: e.Kv.Value}
- }
- }
- }(ctx)
-
- return C
-}
-
-// CRLUpdate is emitted for every remote CRL change, and spuriously on ever new
-// WaitCRLChange.
-type CRLUpdate struct {
- // The new (or existing, in the case of the first call) CRL. If nil, Err
- // will be set.
- CRL []byte
- // If set, an error occurred and the WaitCRLChange call must be restarted.
- // If set, CRL will be nil.
- Err error
-}
-
-// GetCurrentCRL returns the current CRL for the CA. This should only be used
-// for one-shot operations like bootstrapping a new node that doesn't yet have
-// access to etcd - otherwise, WaitCRLChange shoulde be used.
-func (c *CA) GetCurrentCRL(ctx context.Context, kv clientv3.KV) ([]byte, error) {
- initial, err := kv.Get(ctx, pathCACRL)
- if err != nil {
- return nil, fmt.Errorf("failed to retrieve initial CRL: %w", err)
- }
- return initial.Kvs[0].Value, nil
-}
diff --git a/metropolis/node/core/consensus/configuration.go b/metropolis/node/core/consensus/configuration.go
new file mode 100644
index 0000000..1e7cff6
--- /dev/null
+++ b/metropolis/node/core/consensus/configuration.go
@@ -0,0 +1,160 @@
+package consensus
+
+import (
+ "crypto/ed25519"
+ "crypto/x509"
+ "fmt"
+ "net"
+ "net/url"
+ "strconv"
+ "time"
+
+ "go.etcd.io/etcd/clientv3"
+ "go.etcd.io/etcd/embed"
+
+ "source.monogon.dev/metropolis/node"
+ "source.monogon.dev/metropolis/node/core/identity"
+ "source.monogon.dev/metropolis/node/core/localstorage"
+ "source.monogon.dev/metropolis/pkg/pki"
+)
+
+// Config describes the startup configuration of a consensus instance.
+type Config struct {
+ // Data directory (persistent, encrypted storage) for etcd.
+ Data *localstorage.DataEtcdDirectory
+ // Ephemeral directory for etcd.
+ Ephemeral *localstorage.EphemeralConsensusDirectory
+
+ // JoinCluster is set if this instance is to join an existing cluster for the
+ // first time. If not set, it's assumed this instance has run before and has all
+ // the state on disk required to become part of whatever cluster it was before.
+ // If that data is not present, a new cluster will be bootstrapped.
+ JoinCluster *JoinCluster
+
+ // NodePrivateKey is the node's main private key which is also used for
+ // Metropolis PKI. The same key will be used to identify consensus nodes, but
+ // different certificates will be used.
+ NodePrivateKey ed25519.PrivateKey
+
+ testOverrides testOverrides
+}
+
+// JoinCluster is all the data required for a node to join (for the first time)
+// an already running cluster. This data is available from an already running
+// consensus member by performing AddNode, which is called by the Curator when
+// new etcd nodes are added to the cluster.
+type JoinCluster struct {
+ CACertificate *x509.Certificate
+ NodeCertificate *x509.Certificate
+ // ExistingNodes is an arbitrarily ordered list of other consensus members that
+ // the node should attempt to contact.
+ ExistingNodes []ExistingNode
+ // InitialCRL is a certificate revocation list for this cluster. After the node
+ // starts, a CRL on disk will be maintained reflecting the PKI state within etcd.
+ InitialCRL *pki.CRL
+}
+
+// ExistingNode is the peer URL and name of an already running consensus instance.
+type ExistingNode struct {
+ Name string
+ URL string
+}
+
+func (e *ExistingNode) connectionString() string {
+ return fmt.Sprintf("%s=%s", e.Name, e.URL)
+}
+
+func (c *Config) nodePublicKey() ed25519.PublicKey {
+ return c.NodePrivateKey.Public().(ed25519.PublicKey)
+}
+
+// testOverrides are available to test code to make some things easier in a test
+// environment.
+type testOverrides struct {
+ // externalPort overrides the default port used by the node.
+ externalPort int
+ // externalAddress overrides the address of the node, which is usually its ID.
+ externalAddress string
+}
+
+// build takes a Config and returns an etcd embed.Config.
+//
+// enablePeers selects whether the etcd instance will listen for peer traffic
+// over TLS. This requires TLS credentials to be present on disk, and will be
+// disabled for bootstrapping the instance.
+func (c *Config) build(enablePeers bool) *embed.Config {
+ nodeID := identity.NodeID(c.nodePublicKey())
+ port := int(node.ConsensusPort)
+ if p := c.testOverrides.externalPort; p != 0 {
+ port = p
+ }
+ host := nodeID
+ var extraNames []string
+ if c.testOverrides.externalAddress != "" {
+ host = c.testOverrides.externalAddress
+ extraNames = append(extraNames, host)
+ }
+
+ cfg := embed.NewConfig()
+
+ cfg.Name = nodeID
+ cfg.ClusterState = "existing"
+ cfg.InitialClusterToken = "METROPOLIS"
+ cfg.Logger = "zap"
+ cfg.LogOutputs = []string{c.Ephemeral.ServerLogsFIFO.FullPath()}
+
+ cfg.Dir = c.Data.Data.FullPath()
+
+ // Client URL, i.e. local UNIX socket to listen on for trusted, unauthenticated
+ // traffic.
+ cfg.LCUrls = []url.URL{{
+ Scheme: "unix",
+ Path: c.Ephemeral.ClientSocket.FullPath() + ":0",
+ }}
+
+ if enablePeers {
+ cfg.PeerTLSInfo.CertFile = c.Data.PeerPKI.Certificate.FullPath()
+ cfg.PeerTLSInfo.KeyFile = c.Data.PeerPKI.Key.FullPath()
+ cfg.PeerTLSInfo.TrustedCAFile = c.Data.PeerPKI.CACertificate.FullPath()
+ cfg.PeerTLSInfo.ClientCertAuth = true
+ cfg.PeerTLSInfo.CRLFile = c.Data.PeerCRL.FullPath()
+
+ cfg.LPUrls = []url.URL{{
+ Scheme: "https",
+ Host: fmt.Sprintf("[::]:%d", port),
+ }}
+ cfg.APUrls = []url.URL{{
+ Scheme: "https",
+ Host: net.JoinHostPort(host, strconv.Itoa(port)),
+ }}
+ } else {
+ // When not enabling peer traffic, listen on loopback. We would rather not
+ // listen at all, but etcd seems to prevent us from doing that.
+ cfg.LPUrls = []url.URL{{
+ Scheme: "http",
+ Host: fmt.Sprintf("127.0.0.1:%d", port),
+ }}
+ cfg.APUrls = []url.URL{{
+ Scheme: "http",
+ Host: fmt.Sprintf("127.0.0.1:%d", port),
+ }}
+ }
+
+ cfg.InitialCluster = cfg.InitialClusterFromName(nodeID)
+ if c.JoinCluster != nil {
+ for _, n := range c.JoinCluster.ExistingNodes {
+ cfg.InitialCluster += "," + n.connectionString()
+ }
+ }
+ return cfg
+}
+
+// localClient returns an etcd client connected to the socket as configured in
+// Config.
+func (c *Config) localClient() (*clientv3.Client, error) {
+ socket := c.Ephemeral.ClientSocket.FullPath()
+ return clientv3.New(clientv3.Config{
+ Endpoints: []string{fmt.Sprintf("unix://%s:0", socket)},
+ DialTimeout: time.Second,
+ })
+}
diff --git a/metropolis/node/core/consensus/consensus.go b/metropolis/node/core/consensus/consensus.go
index d0fe83f..f19e923 100644
--- a/metropolis/node/core/consensus/consensus.go
+++ b/metropolis/node/core/consensus/consensus.go
@@ -14,93 +14,138 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-// Package consensus implements a managed etcd cluster member service, with a
-// self-hosted CA system for issuing peer certificates. Currently each
-// Metropolis node runs an etcd member, and connects to the etcd member locally
-// over a domain socket.
+// Package consensus implements a runnable that manages an etcd instance which
+// forms part of a Metropolis etcd cluster. This cluster is a foundational
+// building block of Metropolis and its startup/management sequencing needs to
+// be as robust as possible.
//
-// The service supports two modes of startup:
-// - initializing a new cluster, by bootstrapping the CA in memory, starting a
-// cluster, committing the CA to etcd afterwards, and saving the new node's
-// certificate to local storage
-// - joining an existing cluster, using certificates from local storage and
-// loading the CA from etcd. This flow is also used when the node joins a
-// cluster for the first time (then the certificates required must be
-// provisioned externally before starting the consensus service).
+// Cluster Structure
//
-// Regardless of how the etcd member service was started, the resulting running
-// service is further managed and used in the same way.
+// Each etcd instance listens for two kinds of traffic:
//
+// 1. Peer traffic over TLS on a TCP port of the node's main interface. This is
+// where other etcd instances connect to exchange peer traffic, perform
+// transactions and build quorum. The TLS credentials are stored in a PKI that
+// is managed internally by the consensus runnable, with its state stored in
+// etcd itself.
+//
+// 2. Client traffic over a local domain socket, with access control based on
+// standard Linux user/group permissions. Currently this allows any code running
+// as root on the host namespace full access to the etcd cluster.
+//
+// This means that if code running on a node wishes to perform etcd
+// transactions, it must also run an etcd instance. This colocation of all
+// direct etcd access and the etcd instances themselves effectively delegates all
+// Metropolis control plane functionality to whatever subset of nodes is running
+// consensus and all code that connects to etcd directly (the Curator).
+//
+// For example, if nodes foo and baz are part of the control plane, but node
+// worker is not:
+//
+// .---------------------.
+// | node-foo |
+// |---------------------|
+// | .--------------------.
+// | | etcd |<---etcd/TLS--. (node.ConsensusPort)
+// | '--------------------' |
+// | ^ Domain Socket | |
+// | | etcd/plain | |
+// | .--------------------. |
+// | | curator |<---gRPC/TLS----. (node.CuratorServicePort)
+// | '--------------------' | |
+// | ^ Domain Socket | | |
+// | | gRPC/plain | | |
+// | .-----------------. | | |
+// | | node logic | | | |
+// | '-----------------' | | |
+// '---------------------' | |
+// | |
+// .---------------------. | |
+// | node-baz | | |
+// |---------------------| | |
+// | .--------------------. | |
+// | | etcd |<-------------' |
+// | '--------------------' |
+// | ^ Domain Socket | |
+// | | etcd/plain | |
+// | .--------------------. |
+// | | curator |<---gRPC/TLS----:
+// | '--------------------' |
+// | ... | |
+// '---------------------' |
+// |
+// .---------------------. |
+// | node-worker | |
+// |---------------------| |
+// | .-----------------. | |
+// | | node logic |-------------------'
+// | '-----------------' |
+// '---------------------'
+//
+
package consensus
import (
"context"
- "encoding/pem"
+ "crypto/ed25519"
+ "crypto/x509"
+ "crypto/x509/pkix"
"fmt"
- "net/url"
- "sync"
+ "math/big"
"time"
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/embed"
- "go.uber.org/atomic"
- node "source.monogon.dev/metropolis/node"
- "source.monogon.dev/metropolis/node/core/consensus/ca"
"source.monogon.dev/metropolis/node/core/consensus/client"
- "source.monogon.dev/metropolis/node/core/localstorage"
+ "source.monogon.dev/metropolis/node/core/identity"
+ "source.monogon.dev/metropolis/pkg/event/memory"
"source.monogon.dev/metropolis/pkg/logtree/unraw"
+ "source.monogon.dev/metropolis/pkg/pki"
"source.monogon.dev/metropolis/pkg/supervisor"
)
-const (
- DefaultClusterToken = "METROPOLIS"
- DefaultLogger = "zap"
+var (
+ pkiNamespace = pki.Namespaced("/pki/")
)
-// Service is the etcd cluster member service.
+func pkiCA() *pki.Certificate {
+ return &pki.Certificate{
+ Name: "CA",
+ Namespace: &pkiNamespace,
+ Issuer: pki.SelfSigned,
+ Template: x509.Certificate{
+ SerialNumber: big.NewInt(1),
+ Subject: pkix.Name{
+ CommonName: "Metropolis etcd CA Certificate",
+ },
+ IsCA: true,
+ KeyUsage: x509.KeyUsageCertSign | x509.KeyUsageCRLSign | x509.KeyUsageDigitalSignature,
+ ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth, x509.ExtKeyUsageServerAuth, x509.ExtKeyUsageOCSPSigning},
+ },
+ }
+}
+
+func pkiPeerCertificate(pubkey ed25519.PublicKey, extraNames []string) x509.Certificate {
+ return x509.Certificate{
+ Subject: pkix.Name{
+ CommonName: identity.NodeID(pubkey),
+ },
+ KeyUsage: x509.KeyUsageDigitalSignature | x509.KeyUsageKeyEncipherment,
+ ExtKeyUsage: []x509.ExtKeyUsage{
+ x509.ExtKeyUsageClientAuth, x509.ExtKeyUsageServerAuth,
+ },
+ DNSNames: append(extraNames, identity.NodeID(pubkey)),
+ }
+}
+
+// Service is the etcd cluster member service. See package-level documentation
+// for more information.
type Service struct {
- // The configuration with which the service was started. This is immutable.
config *Config
- // stateMu guards state. This is locked internally on public methods of
- // Service that require access to state. The state might be recreated on
- // service restart.
- stateMu sync.Mutex
- state *state
-}
-
-// state is the runtime state of a running etcd member.
-type state struct {
- etcd *embed.Etcd
- ready atomic.Bool
-
- ca *ca.CA
- // cl is an etcd client that loops back to the localy running etcd server.
- // This runs over the Client unix domain socket that etcd starts.
- cl *clientv3.Client
-}
-
-type Config struct {
- // Data directory (persistent, encrypted storage) for etcd.
- Data *localstorage.DataEtcdDirectory
- // Ephemeral directory for etcd.
- Ephemeral *localstorage.EphemeralConsensusDirectory
-
- // Name is the cluster name. This must be the same amongst all etcd members
- // within one cluster.
- Name string
- // NewCluster selects whether the etcd member will start a new cluster and
- // bootstrap a CA and the first member certificate, or load existing PKI
- // certificates from disk.
- NewCluster bool
- // Port is the port at which this cluster member will listen for other
- // members. If zero, defaults to the global Metropolis setting.
- Port int
-
- // externalHost is used by tests to override the address at which etcd
- // should listen for peer connections.
- externalHost string
+ value memory.Value
+ ca *pki.Certificate
}
func New(config Config) *Service {
@@ -109,264 +154,297 @@
}
}
-// configure transforms the service configuration into an embedded etcd
-// configuration. This is pure and side effect free.
-func (s *Service) configure(ctx context.Context) (*embed.Config, error) {
- if err := s.config.Ephemeral.MkdirAll(0700); err != nil {
- return nil, fmt.Errorf("failed to create ephemeral directory: %w", err)
- }
- if err := s.config.Data.MkdirAll(0700); err != nil {
- return nil, fmt.Errorf("failed to create data directory: %w", err)
- }
+// Run is a Supervisor runnable that starts the etcd member service. It will
+// become healthy once the member joins the cluster successfully.
+//
+// On startup it either writes the credentials supplied through JoinCluster,
+// or, when no PKI files exist on disk yet, bootstraps them. It then starts
+// etcd, waits until the local member is no longer a learner, publishes a
+// Status and blocks until the server fails or ctx is canceled.
+func (s *Service) Run(ctx context.Context) error {
+ // Always re-create CA to make sure we don't have PKI state from previous runs.
+ //
+ // TODO(q3k): make the PKI library immune to this misuse.
+ s.ca = pkiCA()
- if s.config.Name == "" {
- return nil, fmt.Errorf("Name not set")
- }
- port := s.config.Port
- if port == 0 {
- port = int(node.ConsensusPort)
- }
-
- cfg := embed.NewConfig()
-
- cfg.Name = s.config.Name
- cfg.Dir = s.config.Data.Data.FullPath()
- cfg.InitialClusterToken = DefaultClusterToken
-
- cfg.PeerTLSInfo.CertFile = s.config.Data.PeerPKI.Certificate.FullPath()
- cfg.PeerTLSInfo.KeyFile = s.config.Data.PeerPKI.Key.FullPath()
- cfg.PeerTLSInfo.TrustedCAFile = s.config.Data.PeerPKI.CACertificate.FullPath()
- cfg.PeerTLSInfo.ClientCertAuth = true
- cfg.PeerTLSInfo.CRLFile = s.config.Data.PeerCRL.FullPath()
-
- cfg.LCUrls = []url.URL{{
- Scheme: "unix",
- Path: s.config.Ephemeral.ClientSocket.FullPath() + ":0",
- }}
- cfg.ACUrls = []url.URL{}
- cfg.LPUrls = []url.URL{{
- Scheme: "https",
- Host: fmt.Sprintf("[::]:%d", port),
- }}
-
- // Always listen on the address pointed to by our name - unless running in
- // tests, where we can't control our hostname easily.
- externalHost := fmt.Sprintf("%s:%d", s.config.Name, port)
- if s.config.externalHost != "" {
- externalHost = fmt.Sprintf("%s:%d", s.config.externalHost, port)
- }
- cfg.APUrls = []url.URL{{
- Scheme: "https",
- Host: externalHost,
- }}
-
- if s.config.NewCluster {
- cfg.ClusterState = "new"
- cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name)
- } else {
- cfg.ClusterState = "existing"
- }
-
+ // Create log converter. This will ingest etcd logs and pipe them out to this
+ // runnable's leveled logging facilities.
+ //
+ // TODO(q3k): add support for streaming to a sub-logger in the tree to get
+ // cleaner logs.
converter := unraw.Converter{
- Parser: parseEtcdLogEntry,
- // The initial line from a starting etcd instance is fairly long.
+ Parser: parseEtcdLogEntry,
MaximumLineLength: 8192,
LeveledLogger: supervisor.Logger(ctx),
}
fifoPath := s.config.Ephemeral.ServerLogsFIFO.FullPath()
pipe, err := converter.NamedPipeReader(fifoPath)
if err != nil {
- return nil, fmt.Errorf("could not get named pipe reader: %w", err)
+ return fmt.Errorf("when creating pipe reader: %w", err)
}
- if err := supervisor.Run(ctx, "pipe", pipe); err != nil {
- return nil, fmt.Errorf("could not start server log reader: %w", err)
+ if err := supervisor.Run(ctx, "piper", pipe); err != nil {
+ return fmt.Errorf("when starting log piper: %w", err)
}
- // TODO(q3k): pipe logs from etcd to supervisor.RawLogger via a file.
- cfg.Logger = DefaultLogger
- cfg.LogOutputs = []string{fifoPath}
+ // Create autopromoter, which will automatically promote all learners to full
+ // etcd members.
+ if err := supervisor.Run(ctx, "autopromoter", s.autopromoter); err != nil {
+ return fmt.Errorf("when starting autopromoter: %w", err)
+ }
- return cfg, nil
+ // Create selfupdater, which will perform a one-shot update of this member's
+ // peer address in etcd.
+ if err := supervisor.Run(ctx, "selfupadater", s.selfupdater); err != nil {
+ return fmt.Errorf("when starting selfupdater: %w", err)
+ }
+
+ // Prepare cluster PKI credentials.
+ ppki := s.config.Data.PeerPKI
+ jc := s.config.JoinCluster
+ if jc != nil {
+ // For nodes that join an existing cluster, or re-join it, always write whatever
+ // we've been given on startup.
+ if err := ppki.WriteAll(jc.NodeCertificate.Raw, s.config.NodePrivateKey, jc.CACertificate.Raw); err != nil {
+ return fmt.Errorf("when writing credentials for join: %w", err)
+ }
+ if err := s.config.Data.PeerCRL.Write(jc.InitialCRL.Raw, 0400); err != nil {
+ return fmt.Errorf("when writing CRL for join: %w", err)
+ }
+ } else {
+ // For other nodes, we should already have credentials from a previous join, or
+ // a previous bootstrap. If none exist, assume we need to bootstrap these
+ // credentials.
+ //
+ // TODO(q3k): once we have node join (ie. node restart from disk) flow, add a
+ // special configuration marker to prevent spurious bootstraps.
+ absent, err := ppki.AllAbsent()
+ if err != nil {
+ return fmt.Errorf("when checking for PKI file absence: %w", err)
+ }
+ if absent {
+ if err := s.bootstrap(ctx, fifoPath); err != nil {
+ return fmt.Errorf("bootstrap failed: %w", err)
+ }
+ } else {
+ supervisor.Logger(ctx).Info("PKI data present, not bootstrapping.")
+ }
+ }
+
+ // Start etcd ...
+ cfg := s.config.build(true)
+ server, err := embed.StartEtcd(cfg)
+ if err != nil {
+ return fmt.Errorf("when starting etcd: %w", err)
+ }
+
+ // ... wait for server to be ready...
+ select {
+ case <-ctx.Done():
+ // Do not leave the started server running when canceled during startup.
+ server.Close()
+ return ctx.Err()
+ case <-server.Server.ReadyNotify():
+ }
+
+ // ... build a client to its socket...
+ cl, err := s.config.localClient()
+ if err != nil {
+ server.Close()
+ return fmt.Errorf("getting local client failed: %w", err)
+ }
+
+ // ... and wait until we're not a learner anymore.
+ for {
+ members, err := cl.MemberList(ctx)
+ if err != nil {
+ supervisor.Logger(ctx).Warningf("MemberList failed: %v", err)
+ } else {
+ isMember := false
+ for _, member := range members.Members {
+ if member.ID != uint64(server.Server.ID()) {
+ continue
+ }
+ if !member.IsLearner {
+ isMember = true
+ break
+ }
+ }
+ if isMember {
+ break
+ }
+ supervisor.Logger(ctx).Warningf("Still a learner, waiting...")
+ }
+
+ // Back off before polling again. Selecting on ctx makes sure a canceled
+ // context ends the wait instead of looping on failing MemberList calls
+ // forever.
+ select {
+ case <-ctx.Done():
+ server.Close()
+ return ctx.Err()
+ case <-time.After(time.Second):
+ }
+ }
+
+ // All done! Report status.
+ supervisor.Logger(ctx).Infof("etcd server ready")
+
+ st := &Status{
+ localPeerURL: cfg.APUrls[0].String(),
+ localMemberID: uint64(server.Server.ID()),
+ cl: cl,
+ ca: s.ca,
+ }
+ s.value.Set(st)
+
+ // Wait until server dies for whatever reason, update status when that
+ // happens.
+ supervisor.Signal(ctx, supervisor.SignalHealthy)
+ select {
+ case err = <-server.Err():
+ err = fmt.Errorf("server returned error: %w", err)
+ case <-ctx.Done():
+ server.Close()
+ err = ctx.Err()
+ }
+ // NOTE(review): st was already published above, so this flips the flag on
+ // an object other goroutines may be reading without synchronization -
+ // confirm whether a fresh Status should be published instead.
+ st.stopped = true
+ s.value.Set(st)
+ return err
}
-// Run is a Supervisor runnable that starts the etcd member service. It will
-// become healthy once the member joins the cluster successfully.
-func (s *Service) Run(ctx context.Context) error {
- st := &state{
- ready: *atomic.NewBool(false),
- }
- s.stateMu.Lock()
- s.state = st
- s.stateMu.Unlock()
-
- if s.config.NewCluster {
- // Create certificate if absent. It can only be present if we attempt
- // to re-start the service in NewCluster after a failure. This can
- // happen if etcd crashed or failed to start up before (eg. because of
- // networking not having settled yet).
- absent, err := s.config.Data.PeerPKI.AllAbsent()
+// clientFor returns a namespaced etcd client built on top of the given local
+// etcd client. Starting from client.NewLocal(kv), it descends into each element
+// of parts in order by calling Sub, and returns the resulting client. If any
+// Sub call fails, a nil client and the wrapped error are returned.
+func clientFor(kv *clientv3.Client, parts ...string) (client.Namespaced, error) {
+ var err error
+ namespaced := client.NewLocal(kv)
+ for _, el := range parts {
+ namespaced, err = namespaced.Sub(el)
if err != nil {
- return fmt.Errorf("checking certificate existence: %w", err)
+ return nil, fmt.Errorf("when getting sub client: %w", err)
}
- if absent {
- // Generate CA, keep in memory, write it down in etcd later.
- st.ca, err = ca.New("Metropolis etcd peer Root CA")
- if err != nil {
- return fmt.Errorf("when creating new cluster's peer CA: %w", err)
- }
-
- cert, key, err := st.ca.Issue(ctx, nil, s.config.Name)
- if err != nil {
- return fmt.Errorf("when issuing new cluster's first certificate: %w", err)
- }
-
- if err := s.config.Data.PeerPKI.MkdirAll(0700); err != nil {
- return fmt.Errorf("when creating PKI directory: %w", err)
- }
- if err := s.config.Data.PeerPKI.CACertificate.Write(pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: st.ca.CACertRaw}), 0600); err != nil {
- return fmt.Errorf("when writing CA certificate to disk: %w", err)
- }
- if err := s.config.Data.PeerPKI.Certificate.Write(pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: cert}), 0600); err != nil {
- return fmt.Errorf("when writing certificate to disk: %w", err)
- }
- if err := s.config.Data.PeerPKI.Key.Write(pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: key}), 0600); err != nil {
- return fmt.Errorf("when writing certificate to disk: %w", err)
- }
- }
}
+ return namespaced, nil
+}
- // Expect certificate to be present on disk.
- present, err := s.config.Data.PeerPKI.AllExist()
- if err != nil {
- return fmt.Errorf("checking certificate existence: %w", err)
- }
- if !present {
- return fmt.Errorf("etcd starting without fully ready certificates - aborted NewCluster or corrupted local storage?")
- }
+// bootstrap performs a procedure to resolve the following bootstrap problems:
+// in order to start an etcd server for consensus, we need it to serve over TLS.
+// However, these TLS certificates also need to be stored in etcd so that
+// further certificates can be issued for new nodes.
+//
+// This was previously solved by using a special PKI/TLS management system that
+// could first create certificates and keys in memory, then only commit them to
+// etcd. However, this ended up being somewhat brittle in the face of startup
+// sequencing issues, so we're now going with a different approach.
+//
+// This function starts an etcd instance first without any PKI/TLS support,
+// without listening on any external port for peer traffic. Once the instance is
+// running, it uses the standard metropolis pki library to create all required
+// data directly in the running etcd instance. It then writes all required
+// startup data (node private key, member certificate, CA certificate) to disk,
+// so that a 'full' etcd instance can be started.
+//
+// The temporary etcd instance is always stopped before this function returns,
+// regardless of whether bootstrapping succeeded.
+func (s *Service) bootstrap(ctx context.Context, fifoPath string) error {
+ supervisor.Logger(ctx).Infof("Bootstrapping PKI: starting etcd...")
- cfg, err := s.configure(ctx)
- if err != nil {
- return fmt.Errorf("when configuring etcd: %w", err)
- }
+ cfg := s.config.build(false)
+ // This will make etcd create data directories and create a fully new cluster if
+ // needed. If we're restarting due to an error, the old cluster data will still
+ // exist.
+ cfg.ClusterState = "new"
+ // Start the bootstrap etcd instance...
server, err := embed.StartEtcd(cfg)
- keep := false
- defer func() {
- if !keep && server != nil {
- server.Close()
- }
- }()
if err != nil {
return fmt.Errorf("failed to start etcd: %w", err)
}
- st.etcd = server
- okay := true
+ // From here on the bootstrap instance is running. Stop it on every return
+ // path (success and failure alike) so that it never outlives this function
+ // and cannot conflict with the 'full' instance started by the caller.
+ defer server.Close()
+
+ // ... wait for it to run ...
select {
- case <-st.etcd.Server.ReadyNotify():
+ case <-server.Server.ReadyNotify():
case <-ctx.Done():
- okay = false
+ // Wrap the context error: err is nil at this point, as StartEtcd succeeded.
+ return fmt.Errorf("when waiting for bootstrap etcd: %w", ctx.Err())
}
- if !okay {
- supervisor.Logger(ctx).Info("context done, aborting wait")
- return ctx.Err()
- }
-
- socket := s.config.Ephemeral.ClientSocket.FullPath()
- cl, err := clientv3.New(clientv3.Config{
- Endpoints: []string{fmt.Sprintf("unix://%s:0", socket)},
- DialTimeout: time.Second,
- })
+ // ... create a client to it ...
+ cl, err := s.config.localClient()
if err != nil {
- return fmt.Errorf("failed to connect to new etcd instance: %w", err)
- }
- st.cl = cl
-
- if s.config.NewCluster {
- if st.ca == nil {
- panic("peerCA has not been generated")
- }
-
- // Save new CA into etcd.
- err = st.ca.Save(ctx, cl.KV)
- if err != nil {
- return fmt.Errorf("failed to save new CA to etcd: %w", err)
- }
- } else {
- // Load existing CA from etcd.
- st.ca, err = ca.Load(ctx, cl.KV)
- if err != nil {
- return fmt.Errorf("failed to load CA from etcd: %w", err)
- }
+ return fmt.Errorf("when getting bootstrap client: %w", err)
}
- // Start CRL watcher.
- if err := supervisor.Run(ctx, "crl", s.watchCRL); err != nil {
- return fmt.Errorf("failed to start CRL watcher: %w", err)
+ // ... and build PKI there. This is idempotent, so we will never override
+ // anything that's already in the cluster, instead just retrieve it.
+ supervisor.Logger(ctx).Infof("Bootstrapping PKI: etcd running, building PKI...")
+ clPKI, err := clientFor(cl, "namespaced", "etcd-pki")
+ if err != nil {
+ return fmt.Errorf("when getting pki client: %w", err)
}
- // Start autopromoter.
- if err := supervisor.Run(ctx, "autopromoter", s.autopromoter); err != nil {
- return fmt.Errorf("failed to start autopromoter: %w", err)
+ defer clPKI.Close()
+ caCert, err := s.ca.Ensure(ctx, clPKI)
+ if err != nil {
+ return fmt.Errorf("failed to ensure CA certificate: %w", err)
}
- supervisor.Logger(ctx).Info("etcd is now ready")
- keep = true
- st.ready.Store(true)
- supervisor.Signal(ctx, supervisor.SignalHealthy)
+ // If we're running with a test overridden external address (eg. localhost), we
+ // need to also make that part of the member certificate.
+ var extraNames []string
+ if external := s.config.testOverrides.externalAddress; external != "" {
+ extraNames = []string{external}
+ }
+ memberTemplate := pki.Certificate{
+ Name: identity.NodeID(s.config.nodePublicKey()),
+ Namespace: &pkiNamespace,
+ Issuer: s.ca,
+ Template: pkiPeerCertificate(s.config.nodePublicKey(), extraNames),
+ Mode: pki.CertificateExternal,
+ PublicKey: s.config.nodePublicKey(),
+ }
+ memberCert, err := memberTemplate.Ensure(ctx, clPKI)
+ if err != nil {
+ return fmt.Errorf("failed to ensure member certificate: %w", err)
+ }
- <-ctx.Done()
- st.etcd.Close()
+ // Retrieve CRL.
+ crlW := s.ca.WatchCRL(clPKI)
+ crl, err := crlW.Get(ctx)
+ if err != nil {
+ return fmt.Errorf("failed to retrieve initial CRL: %w", err)
+ }
+
+ // We have everything we need. Write things to disk.
+ supervisor.Logger(ctx).Infof("Bootstrapping PKI: certificates issued, writing to disk...")
+
+ if err := s.config.Data.PeerPKI.WriteAll(memberCert, s.config.NodePrivateKey, caCert); err != nil {
+ return fmt.Errorf("failed to write bootstrapped certificates: %w", err)
+ }
+ if err := s.config.Data.PeerCRL.Write(crl.Raw, 0400); err != nil {
+ return fmt.Errorf("failed to write CRL: %w", err)
+ }
+
+ // The deferred server.Close() stops the server synchronously (blocking until
+ // it's fully shutdown) before this function returns. The caller can now run
+ // the 'full' etcd instance with PKI.
+ supervisor.Logger(ctx).Infof("Bootstrapping PKI: done, stopping server...")
return ctx.Err()
}
-// watchCRL is a sub-runnable of the etcd cluster member service that updates
-// the on-local-storage CRL to match the newest available version in etcd.
-func (s *Service) watchCRL(ctx context.Context) error {
- s.stateMu.Lock()
- cl := s.state.cl
- ca := s.state.ca
- s.stateMu.Unlock()
-
- supervisor.Signal(ctx, supervisor.SignalHealthy)
- for e := range ca.WaitCRLChange(ctx, cl.KV, cl.Watcher) {
- if e.Err != nil {
- return fmt.Errorf("watching CRL: %w", e.Err)
- }
-
- if err := s.config.Data.PeerCRL.Write(e.CRL, 0600); err != nil {
- return fmt.Errorf("saving CRL: %w", err)
- }
- }
-
- // unreachable
- return nil
-}
-
+// autopromoter is a runnable which repeatedly attempts to promote etcd learners
+// in the cluster to full followers. This is needed to bring any new cluster
+// members (which are always added as learners) to full membership and make them
+// part of the etcd quorum.
+//
+// It waits for a Status to be published, then runs a promotion attempt
+// immediately and every five seconds afterwards until ctx is canceled.
func (s *Service) autopromoter(ctx context.Context) error {
- t := time.NewTicker(5 * time.Second)
- defer t.Stop()
-
- autopromote := func() {
- s.stateMu.Lock()
- st := s.state
- s.stateMu.Unlock()
-
- if st.etcd.Server.Leader() != st.etcd.Server.ID() {
+ autopromote := func(ctx context.Context, cl *clientv3.Client) {
+ // Only autopromote if our endpoint is a leader. This is a bargain bin version
+ // of leader election: it's simple and cheap, but not very reliable. The most
+ // obvious failure mode is that the instance we contacted isn't a leader by the
+ // time we promote a member, but that's fine - the promotion is idempotent. What
+ // we really use the 'leader election' here for isn't for consistency, but to
+ // prevent the cluster from being hammered by spurious leadership promotion
+ // requests from every etcd member.
+ status, err := cl.Status(ctx, cl.Endpoints()[0])
+ if err != nil {
+ supervisor.Logger(ctx).Warningf("Failed to get endpoint status: %v", err)
+ // status is nil on error; bail out and retry on the next tick instead of
+ // dereferencing it below.
+ return
+ }
+ if status.Leader != status.Header.MemberId {
return
}
- for _, member := range st.etcd.Server.Cluster().Members() {
+ members, err := cl.MemberList(ctx)
+ if err != nil {
+ supervisor.Logger(ctx).Warningf("Failed to list members: %v", err)
+ return
+ }
+ for _, member := range members.Members {
if !member.IsLearner {
continue
}
-
- // We always call PromoteMember since the metadata necessary to
- // decide if we should is private. Luckily etcd already does
- // sanity checks internally and will refuse to promote nodes that
- // aren't connected or are still behind on transactions.
- if _, err := st.etcd.Server.PromoteMember(ctx, uint64(member.ID)); err != nil {
+ // Always call PromoteMember since the metadata necessary to decide if we should
+ // is private. Luckily etcd already does consistency checks internally and will
+ // refuse to promote nodes that aren't connected or are still behind on
+ // transactions.
+ if _, err := cl.MemberPromote(ctx, member.ID); err != nil {
supervisor.Logger(ctx).Infof("Failed to promote consensus node %s: %v", member.Name, err)
} else {
supervisor.Logger(ctx).Infof("Promoted new consensus node %s", member.Name)
@@ -374,80 +452,49 @@
}
}
+ w := s.Watch()
for {
- select {
- case <-ctx.Done():
- return ctx.Err()
- case <-t.C:
- autopromote()
+ st, err := w.Get(ctx)
+ if err != nil {
+ return fmt.Errorf("status get failed: %w", err)
}
- }
-}
-
-// IsReady returns whether etcd is ready and synced
-func (s *Service) IsReady() bool {
- s.stateMu.Lock()
- defer s.stateMu.Unlock()
- if s.state == nil {
- return false
- }
- return s.state.ready.Load()
-}
-
-func (s *Service) WaitReady(ctx context.Context) error {
- // TODO(q3k): reimplement the atomic ready flag as an event synchronization
- // mechanism
- if s.IsReady() {
- return nil
- }
- t := time.NewTicker(100 * time.Millisecond)
- defer t.Stop()
- for {
- select {
- case <-ctx.Done():
- return ctx.Err()
- case <-t.C:
- if s.IsReady() {
- return nil
+ t := time.NewTicker(5 * time.Second)
+ for {
+ autopromote(ctx, st.cl)
+ select {
+ case <-ctx.Done():
+ t.Stop()
+ return ctx.Err()
+ case <-t.C:
}
}
}
}
-func (s *Service) Client() client.Namespaced {
- s.stateMu.Lock()
- defer s.stateMu.Unlock()
- // 'namespaced' is the root of all namespaced clients within the etcd K/V
- // store, with further paths in a colon-separated format, eg.:
- // namespaced:example/
- // namespaced:foo:bar:baz/
- client, err := client.NewLocal(s.state.cl).Sub("namespaced")
- if err != nil {
- // This error can only happen due to a malformed path, which is
- // constant. Thus, this is a programming error and we panic.
- panic(fmt.Errorf("Could not get consensus etcd client: %v", err))
+// selfupdater is a runnable that performs a one-shot (once per Service Run,
+// thus once for each configuration) update of the node's Peer URL in etcd. This
+// is currently only really needed because the first node in the cluster
+// bootstraps itself without any peer URLs at first, and this allows it to then
+// add the peer URLs afterwards. Instead of a runnable, this might as well have
+// been part of the bootstrap logic, but making it a restartable runnable is
+// more robust.
+func (s *Service) selfupdater(ctx context.Context) error {
+ supervisor.Signal(ctx, supervisor.SignalHealthy)
+ w := s.Watch()
+ for {
+ st, err := w.Get(ctx)
+ if err != nil {
+ return fmt.Errorf("failed to get status: %w", err)
+ }
+
+ peerURL := st.localPeerURL
+ if _, err := st.cl.MemberUpdate(ctx, st.localMemberID, []string{peerURL}); err != nil {
+ supervisor.Logger(ctx).Warningf("failed to update member: %v", err)
+ // Back off before trying again, but return promptly if the context is
+ // canceled rather than sleeping through the cancellation.
+ //
+ // NOTE(review): the retry goes through w.Get again, which presumably
+ // waits for a newly published status - confirm this is the intended
+ // retry behavior.
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case <-time.After(1 * time.Second):
+ }
+ continue
+ }
+
+ supervisor.Signal(ctx, supervisor.SignalDone)
+ return nil
}
- return client
-}
-
-func (s *Service) Cluster() clientv3.Cluster {
- s.stateMu.Lock()
- defer s.stateMu.Unlock()
- return s.state.cl.Cluster
-}
-
-// MemberInfo returns information about this etcd cluster member: its ID and
-// name. This will block until this information is available (ie. the cluster
-// status is Ready).
-func (s *Service) MemberInfo(ctx context.Context) (id uint64, name string, err error) {
- if err = s.WaitReady(ctx); err != nil {
- err = fmt.Errorf("when waiting for cluster readiness: %w", err)
- return
- }
-
- s.stateMu.Lock()
- defer s.stateMu.Unlock()
- id = uint64(s.state.etcd.Server.ID())
- name = s.config.Name
- return
}
diff --git a/metropolis/node/core/consensus/consensus_test.go b/metropolis/node/core/consensus/consensus_test.go
index 16d6f76..8028dd9 100644
--- a/metropolis/node/core/consensus/consensus_test.go
+++ b/metropolis/node/core/consensus/consensus_test.go
@@ -19,25 +19,26 @@
import (
"bytes"
"context"
- "crypto/x509"
+ "crypto/ed25519"
+ "crypto/rand"
"os"
"testing"
- "time"
"source.monogon.dev/metropolis/node/core/localstorage"
"source.monogon.dev/metropolis/node/core/localstorage/declarative"
- "source.monogon.dev/metropolis/pkg/freeport"
"source.monogon.dev/metropolis/pkg/supervisor"
)
+// boilerplate bundles the fixtures that prep creates for a single test.
type boilerplate struct {
- ctx context.Context
- ctxC context.CancelFunc
- root *localstorage.Root
- tmpdir string
+ // ctx and ctxC are a cancelable background context and its cancel function.
+ ctx context.Context
+ ctxC context.CancelFunc
+ // root is the localstorage root whose etcd data and ephemeral consensus
+ // directories are handed to the Service under test.
+ root *localstorage.Root
+ // privkey is a freshly generated ed25519 key, used as the NodePrivateKey of
+ // the Service under test.
+ privkey ed25519.PrivateKey
+ // tmpdir is the temporary directory used by the test.
+ tmpdir string
}
func prep(t *testing.T) *boilerplate {
+ t.Helper()
ctx, ctxC := context.WithCancel(context.Background())
root := &localstorage.Root{}
// Force usage of /tmp as temp directory root, otherwsie TMPDIR from Bazel
@@ -54,11 +55,17 @@
os.MkdirAll(root.Data.Etcd.FullPath(), 0700)
os.MkdirAll(root.Ephemeral.Consensus.FullPath(), 0700)
+ _, privkey, err := ed25519.GenerateKey(rand.Reader)
+ if err != nil {
+ t.Fatal(err)
+ }
+
return &boilerplate{
- ctx: ctx,
- ctxC: ctxC,
- root: root,
- tmpdir: tmp,
+ ctx: ctx,
+ ctxC: ctxC,
+ root: root,
+ privkey: privkey,
+ tmpdir: tmp,
}
}
@@ -67,119 +74,127 @@
os.RemoveAll(b.tmpdir)
}
-func waitEtcd(t *testing.T, s *Service) {
- for {
- if s.IsReady() {
- break
- }
- time.Sleep(100 * time.Millisecond)
- }
-}
-
+// TestBootstrap starts a single consensus Service with only a node private
+// key configured, waits for its first Status, and checks that a Put and a Get
+// of a test key through the client returned by MetropolisClient both succeed.
func TestBootstrap(t *testing.T) {
b := prep(t)
defer b.close()
etcd := New(Config{
- Data: &b.root.Data.Etcd,
- Ephemeral: &b.root.Ephemeral.Consensus,
- Name: "test",
- NewCluster: true,
- Port: freeport.MustConsume(freeport.AllocateTCPPort()),
- externalHost: "127.0.0.1",
+ Data: &b.root.Data.Etcd,
+ Ephemeral: &b.root.Ephemeral.Consensus,
+ NodePrivateKey: b.privkey,
+ testOverrides: testOverrides{
+ externalPort: 1234,
+ },
})
- supervisor.TestHarness(t, etcd.Run)
- waitEtcd(t, etcd)
+ ctxC, _ := supervisor.TestHarness(t, etcd.Run)
+ defer ctxC()
- kv := etcd.Client()
- if _, err := kv.Put(b.ctx, "/foo", "bar"); err != nil {
+ // Block until the service publishes its first status.
+ w := etcd.Watch()
+ st, err := w.Get(b.ctx)
+ if err != nil {
+ t.Fatalf("Get: %v", err)
+ }
+ cl, err := st.MetropolisClient()
+ if err != nil {
+ t.Fatalf("MetropolisClient: %v", err)
+ }
+ defer cl.Close()
+
+ if _, err := cl.Put(b.ctx, "/foo", "bar"); err != nil {
t.Fatalf("test key creation failed: %v", err)
}
- if _, err := kv.Get(b.ctx, "/foo"); err != nil {
+ if _, err := cl.Get(b.ctx, "/foo"); err != nil {
t.Fatalf("test key retrieval failed: %v", err)
}
}
-func TestMemberInfo(t *testing.T) {
- b := prep(t)
- defer b.close()
- etcd := New(Config{
- Data: &b.root.Data.Etcd,
- Ephemeral: &b.root.Ephemeral.Consensus,
- Name: "test",
- NewCluster: true,
- Port: freeport.MustConsume(freeport.AllocateTCPPort()),
- externalHost: "127.0.0.1",
- })
- supervisor.TestHarness(t, etcd.Run)
- waitEtcd(t, etcd)
-
- id, name, err := etcd.MemberInfo(b.ctx)
- if err != nil {
- t.Fatalf("MemberInfo: %v", err)
- }
-
- // Compare name with configured name.
- if want, got := "test", name; want != got {
- t.Errorf("MemberInfo returned name %q, wanted %q (per config)", got, want)
- }
-
- // Compare name with cluster information.
- members, err := etcd.Cluster().MemberList(b.ctx)
- if err != nil {
- t.Errorf("MemberList: %v", err)
- }
-
- if want, got := 1, len(members.Members); want != got {
- t.Fatalf("expected one cluster member, got %d", got)
- }
- if want, got := id, members.Members[0].ID; want != got {
- t.Errorf("MemberInfo returned ID %d, Cluster endpoint says %d", want, got)
- }
- if want, got := name, members.Members[0].Name; want != got {
- t.Errorf("MemberInfo returned name %q, Cluster endpoint says %q", want, got)
- }
-}
-
func TestRestartFromDisk(t *testing.T) {
b := prep(t)
defer b.close()
- startEtcd := func(new bool) (*Service, context.CancelFunc) {
- etcd := New(Config{
- Data: &b.root.Data.Etcd,
- Ephemeral: &b.root.Ephemeral.Consensus,
- Name: "test",
- NewCluster: new,
- Port: freeport.MustConsume(freeport.AllocateTCPPort()),
- externalHost: "127.0.0.1",
- })
- ctxC, _ := supervisor.TestHarness(t, etcd.Run)
- waitEtcd(t, etcd)
+ // Start once.
+ etcd := New(Config{
+ Data: &b.root.Data.Etcd,
+ Ephemeral: &b.root.Ephemeral.Consensus,
+ NodePrivateKey: b.privkey,
+ testOverrides: testOverrides{
+ externalPort: 1235,
+ },
+ })
+ ctxC, _ := supervisor.TestHarness(t, etcd.Run)
+ defer ctxC()
- kv := etcd.Client()
- if new {
- if _, err := kv.Put(b.ctx, "/foo", "bar"); err != nil {
- t.Fatalf("test key creation failed: %v", err)
- }
- }
- if _, err := kv.Get(b.ctx, "/foo"); err != nil {
- t.Fatalf("test key retrieval failed: %v", err)
- }
+ w := etcd.Watch()
+ st, err := w.Get(b.ctx)
+ if err != nil {
+ t.Fatalf("status get failed: %v", err)
+ }
+ cl, err := st.MetropolisClient()
+ if err != nil {
+ t.Fatalf("MetropolisClient: %v", err)
+ }
+ defer cl.Close()
- return etcd, ctxC
+ if _, err := cl.Put(b.ctx, "/foo", "bar"); err != nil {
+ t.Fatalf("test key creation failed: %v", err)
+ }
+ firstCA, err := etcd.config.Data.PeerPKI.CACertificate.Read()
+ if err != nil {
+ t.Fatalf("could not read CA file: %v", err)
}
- etcd, ctxC := startEtcd(true)
- etcd.stateMu.Lock()
- firstCA := etcd.state.ca.CACertRaw
- etcd.stateMu.Unlock()
+ // Stop and wait until reported stopped.
ctxC()
+ ctxWait, ctxWaitC := context.WithCancel(context.Background())
+ for {
+ if st.stopped {
+ break
+ }
+ st, err = w.Get(ctxWait)
+ if err != nil {
+ t.Fatalf("status get failed: %v", err)
+ }
+ if st.stopped {
+ break
+ }
+ }
+ ctxWaitC()
- etcd, ctxC = startEtcd(false)
- etcd.stateMu.Lock()
- secondCA := etcd.state.ca.CACertRaw
- etcd.stateMu.Unlock()
+ // Restart.
+ etcd = New(Config{
+ Data: &b.root.Data.Etcd,
+ Ephemeral: &b.root.Ephemeral.Consensus,
+ NodePrivateKey: b.privkey,
+ testOverrides: testOverrides{
+ externalPort: 1235,
+ },
+ })
+ ctxC, _ = supervisor.TestHarness(t, etcd.Run)
+ defer ctxC()
+
+ w = etcd.Watch()
+ st, err = w.Get(b.ctx)
+ if err != nil {
+ t.Fatalf("status get failed: %v", err)
+ }
+ cl, err = st.MetropolisClient()
+ if err != nil {
+ t.Fatalf("MetropolisClient: %v", err)
+ }
+ defer cl.Close()
+
+ res, err := cl.Get(b.ctx, "/foo")
+ if err != nil {
+ t.Fatalf("test key retrieval failed: %v", err)
+ }
+ if len(res.Kvs) != 1 || string(res.Kvs[0].Value) != "bar" {
+ t.Fatalf("test key value missing: %v", res.Kvs)
+ }
+
+ secondCA, err := etcd.config.Data.PeerPKI.CACertificate.Read()
+ if err != nil {
+ t.Fatalf("could not read CA file: %v", err)
+ }
ctxC()
if bytes.Compare(firstCA, secondCA) != 0 {
@@ -187,60 +202,78 @@
}
}
-func TestCRL(t *testing.T) {
+// TestJoin exercises multi-member support: it starts a first Service, writes
+// a test key through it, asks its Status to add a second node via AddNode,
+// starts a second Service with the resulting JoinCluster configuration, and
+// checks that the test key can be read through the second node's client.
+func TestJoin(t *testing.T) {
b := prep(t)
defer b.close()
+
+ // Start first node and perform write.
etcd := New(Config{
- Data: &b.root.Data.Etcd,
- Ephemeral: &b.root.Ephemeral.Consensus,
- Name: "test",
- NewCluster: true,
- Port: freeport.MustConsume(freeport.AllocateTCPPort()),
- externalHost: "127.0.0.1",
+ Data: &b.root.Data.Etcd,
+ Ephemeral: &b.root.Ephemeral.Consensus,
+ NodePrivateKey: b.privkey,
+ testOverrides: testOverrides{
+ externalPort: 3000,
+ externalAddress: "localhost",
+ },
})
- supervisor.TestHarness(t, etcd.Run)
- waitEtcd(t, etcd)
+ ctxC, _ := supervisor.TestHarness(t, etcd.Run)
+ defer ctxC()
- etcd.stateMu.Lock()
- ca := etcd.state.ca
- kv := etcd.state.cl.KV
- etcd.stateMu.Unlock()
-
- certRaw, _, err := ca.Issue(b.ctx, kv, "revoketest")
+ w := etcd.Watch()
+ st, err := w.Get(b.ctx)
if err != nil {
- t.Fatalf("cert issue failed: %v", err)
+ t.Fatalf("could not get status: %v", err)
}
- cert, err := x509.ParseCertificate(certRaw)
+ cl, err := st.MetropolisClient()
if err != nil {
- t.Fatalf("cert parse failed: %v", err)
+ t.Fatalf("MetropolisClient: %v", err)
+ }
+ defer cl.Close()
+ if _, err := cl.Put(b.ctx, "/foo", "bar"); err != nil {
+ t.Fatalf("test key creation failed: %v", err)
}
- if err := ca.Revoke(b.ctx, kv, "revoketest"); err != nil {
- t.Fatalf("cert revoke failed: %v", err)
+ // Start second node and ensure data is present.
+ b2 := prep(t)
+ defer b2.close()
+
+ // The second node uses the same external address and port in AddNode as in
+ // its own testOverrides below.
+ join, err := st.AddNode(b.ctx, b2.privkey.Public().(ed25519.PublicKey), &AddNodeOption{
+ externalAddress: "localhost",
+ externalPort: 3001,
+ })
+ if err != nil {
+ t.Fatalf("could not add node: %v", err)
}
- for {
- time.Sleep(100 * time.Millisecond)
+ etcd2 := New(Config{
+ Data: &b2.root.Data.Etcd,
+ Ephemeral: &b2.root.Ephemeral.Consensus,
+ NodePrivateKey: b2.privkey,
+ JoinCluster: join,
+ testOverrides: testOverrides{
+ externalPort: 3001,
+ externalAddress: "localhost",
+ },
+ })
+ ctxC, _ = supervisor.TestHarness(t, etcd2.Run)
+ defer ctxC()
- crlRaw, err := b.root.Data.Etcd.PeerCRL.Read()
- if err != nil {
- // That's fine. Maybe it hasn't been written yet.
- continue
- }
- crl, err := x509.ParseCRL(crlRaw)
- if err != nil {
- // That's fine. Maybe it hasn't been written yet.
- continue
- }
+ w2 := etcd2.Watch()
+ st2, err := w2.Get(b.ctx)
+ if err != nil {
+ t.Fatalf("could not get status: %v", err)
+ }
+ cl2, err := st2.MetropolisClient()
+ if err != nil {
+ t.Fatalf("MetropolisClient: %v", err)
+ }
+ defer cl2.Close()
- found := false
- for _, revoked := range crl.TBSCertList.RevokedCertificates {
- if revoked.SerialNumber.Cmp(cert.SerialNumber) == 0 {
- found = true
- }
- }
- if found {
- break
- }
+ res, err := cl2.Get(b.ctx, "/foo")
+ if err != nil {
+ t.Fatalf("test key retrieval failed: %v", err)
+ }
+ if len(res.Kvs) != 1 || string(res.Kvs[0].Value) != "bar" {
+ t.Fatalf("test key value missing: %v", res.Kvs)
}
}
diff --git a/metropolis/node/core/consensus/status.go b/metropolis/node/core/consensus/status.go
new file mode 100644
index 0000000..04b13fa
--- /dev/null
+++ b/metropolis/node/core/consensus/status.go
@@ -0,0 +1,170 @@
+package consensus
+
+import (
+ "context"
+ "crypto/ed25519"
+ "crypto/x509"
+ "fmt"
+ "net"
+ "strconv"
+
+ "go.etcd.io/etcd/clientv3"
+
+ "source.monogon.dev/metropolis/node"
+ "source.monogon.dev/metropolis/node/core/consensus/client"
+ "source.monogon.dev/metropolis/node/core/identity"
+ "source.monogon.dev/metropolis/pkg/event"
+ "source.monogon.dev/metropolis/pkg/pki"
+)
+
+// Watch returns a Watcher, compatible with the Event Value interfaces, which
+// wraps a watcher on the value held by this consensus Service. Callers use it
+// to retrieve the Status of the Service.
+func (s *Service) Watch() Watcher {
+	return Watcher{
+		Watcher: s.value.Watch(),
+	}
+}
+
+// Watcher embeds an event.Watcher and overrides Get so that it returns a
+// *Status directly, sparing callers the type assertion. It is returned by
+// Service.Watch.
+type Watcher struct {
+ event.Watcher
+}
+
+// Get calls Get on the embedded event.Watcher with the given context and
+// options and returns the retrieved value as a *Status. An error from the
+// underlying watcher is returned unchanged, with a nil Status.
+//
+// Get panics if the retrieved value is not a *Status.
+func (w *Watcher) Get(ctx context.Context, opts ...event.GetOption) (*Status, error) {
+	raw, err := w.Watcher.Get(ctx, opts...)
+	if err != nil {
+		return nil, err
+	}
+	status := raw.(*Status)
+	return status, nil
+}
+
+// Status of the consensus service. It represents either a running consensus
+// service to which a client can connect and on which management can be
+// performed, or a stopped service.
+type Status struct {
+ // NOTE(review): localPeerURL and localMemberID are not read by any of the
+ // Status methods defined alongside this type; presumably they are the peer
+ // URL and etcd member ID of the local member - confirm against the code
+ // that populates Status.
+ localPeerURL string
+ localMemberID uint64
+ // cl is the etcd client from which the namespaced clients are derived, and
+ // through which AddNode lists members and adds new ones.
+ cl *clientv3.Client
+ // ca is the certificate used by AddNode as the issuer of member
+ // certificates and as the source of the CRL.
+ ca *pki.Certificate
+ // stopped is the inverse of what Running reports.
+ stopped bool
+}
+
+// Running returns true if this status represents a running consensus service
+// which can be connected to or managed. Calls made on such a status are not
+// guaranteed to succeed (as the server might have stopped in the meantime), but
+// the caller can use this value as a hint for whether attempting to access the
+// consensus service is worthwhile.
+func (s *Status) Running() bool {
+ return !s.stopped
+}
+
+// pkiClient returns the client.Namespaced built by clientFor from the
+// underlying etcd client for the "namespaced"/"etcd-pki" path. AddNode uses it
+// to ensure the CA and member certificates and to watch the CRL.
+// NOTE(review): presumably this scopes all keys under an etcd-pki
+// sub-namespace - confirm against clientFor.
+func (s *Status) pkiClient() (client.Namespaced, error) {
+ return clientFor(s.cl, "namespaced", "etcd-pki")
+}
+
+// MetropolisClient returns a namespaced etcd client for use by the rest of the
+// metropolis code (through the cluster bootstrap code). This method is
+// deprecated, and will be replaced with more granular clients as the cluster
+// bootstrap code gets refactored.
+func (s *Status) MetropolisClient() (client.Namespaced, error) {
+ return clientFor(s.cl, "namespaced", "metropolis")
+}
+
+// AddNode creates a new consensus member corresponding to a given Ed25519 node
+// public key if one does not yet exist. The member will at first be marked as a
+// Learner, ensuring it does not take part in quorum until it has finished
+// catching up to the state of the etcd store. As it does, the autopromoter will
+// turn it into a 'full' node and it will start taking part in the quorum and be
+// able to perform all etcd operations.
+//
+// The returned JoinCluster contains the CA certificate, the certificate issued
+// for the new member, the name and first peer URL of every existing non-learner
+// member, and the CRL as retrieved at call time.
+//
+// An existing member is recognized either by its name being the node ID, or by
+// it already being registered under the peer URL the new member would get. The
+// latter matters for members that were added but have not started yet: etcd
+// reports those with an empty name, so a name-only check would attempt to add
+// them a second time when AddNode is called again for the same key.
+//
+// nil entries in opts are ignored.
+func (s *Status) AddNode(ctx context.Context, pk ed25519.PublicKey, opts ...*AddNodeOption) (*JoinCluster, error) {
+	clPKI, err := s.pkiClient()
+	if err != nil {
+		return nil, err
+	}
+
+	nodeID := identity.NodeID(pk)
+	var extraNames []string
+	name := nodeID
+	port := int(node.ConsensusPort)
+	for _, opt := range opts {
+		if opt == nil {
+			continue
+		}
+		if opt.externalAddress != "" {
+			name = opt.externalAddress
+			extraNames = append(extraNames, name)
+		}
+		if opt.externalPort != 0 {
+			port = opt.externalPort
+		}
+	}
+	// Peer URL under which the new member is (or will be) registered.
+	addr := fmt.Sprintf("https://%s", net.JoinHostPort(name, strconv.Itoa(port)))
+
+	member := pki.Certificate{
+		Name:      nodeID,
+		Namespace: &pkiNamespace,
+		Issuer:    s.ca,
+		Template:  pkiPeerCertificate(pk, extraNames),
+		Mode:      pki.CertificateExternal,
+		PublicKey: pk,
+	}
+	caBytes, err := s.ca.Ensure(ctx, clPKI)
+	if err != nil {
+		return nil, fmt.Errorf("could not ensure CA certificate: %w", err)
+	}
+	memberBytes, err := member.Ensure(ctx, clPKI)
+	if err != nil {
+		return nil, fmt.Errorf("could not ensure member certificate: %w", err)
+	}
+	caCert, err := x509.ParseCertificate(caBytes)
+	if err != nil {
+		return nil, fmt.Errorf("could not parse CA certificate: %w", err)
+	}
+	memberCert, err := x509.ParseCertificate(memberBytes)
+	if err != nil {
+		return nil, fmt.Errorf("could not parse newly issued member certificate: %w", err)
+	}
+
+	members, err := s.cl.MemberList(ctx)
+	if err != nil {
+		return nil, fmt.Errorf("could not retrieve existing members: %w", err)
+	}
+
+	var existingNodes []ExistingNode
+	var newExists bool
+	for _, m := range members.Members {
+		if m.Name == nodeID {
+			newExists = true
+		}
+		// A member that was added but has not started yet has no name, so
+		// also recognize it by the peer URL it was registered with.
+		for _, u := range m.PeerURLs {
+			if u == addr {
+				newExists = true
+			}
+		}
+		// Learners are not handed out as existing nodes.
+		if m.IsLearner {
+			continue
+		}
+		if len(m.PeerURLs) < 1 {
+			continue
+		}
+		existingNodes = append(existingNodes, ExistingNode{
+			Name: m.Name,
+			URL:  m.PeerURLs[0],
+		})
+	}
+
+	crlW := s.ca.WatchCRL(clPKI)
+	crl, err := crlW.Get(ctx)
+	if err != nil {
+		return nil, fmt.Errorf("could not retrieve initial CRL: %w", err)
+	}
+
+	// The member is registered last, after everything needed to build the
+	// JoinCluster has been gathered, so that a failure in any of the steps
+	// above does not leave a member registered in the cluster.
+	if !newExists {
+		if _, err := s.cl.MemberAddAsLearner(ctx, []string{addr}); err != nil {
+			return nil, fmt.Errorf("could not add new member as learner: %w", err)
+		}
+	}
+
+	return &JoinCluster{
+		CACertificate:   caCert,
+		NodeCertificate: memberCert,
+		ExistingNodes:   existingNodes,
+		InitialCRL:      crl,
+	}, nil
+}
+
+// AddNodeOption can be passed to AddNode to influence the behaviour of the
+// function. Currently this is only used internally by tests.
+type AddNodeOption struct {
+ // externalAddress, if not empty, is used by AddNode instead of the node ID
+ // as the host of the new member's peer URL, and is passed as an extra name
+ // to the new member's certificate template.
+ externalAddress string
+ // externalPort, if not zero, is used by AddNode instead of
+ // node.ConsensusPort as the port of the new member's peer URL.
+ externalPort int
+}