m/n/core/consensus: refactor for reliability and multinode support

This implements a big refactor of our consensus service/runnable.

First, we move away from the old bespoke API for retrieving the
consensus status (and consensus clients) to Event Values, as used by
the rest of the codebase.
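
As an illustration, consumers of the new status watch the Event Value
roughly as follows (a sketch against the new Service API; the variable
names and the exact accessors on Status are placeholders, and error
handling is abbreviated):

    // Watch the consensus Event Value and block until a status is
    // available (ie. the local etcd member is up and not a learner).
    w := consensusSvc.Watch()
    st, err := w.Get(ctx)
    if err != nil {
        return fmt.Errorf("waiting for consensus status: %w", err)
    }
    // The status carries the local member ID and peer URL, a client
    // connected over the local domain socket, and the cluster CA.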

Second, we move away from the bespoke PKI library, which generated
certificates in memory and then committed them to etcd, to the
standard metropolis pki library. The bootstrap process now starts a
PKI-less etcd instance first, generates the PKI data directly on the
running instance, and then restarts into a fully PKI-enabled etcd
instance.
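
Heavily simplified, the new bootstrap path looks roughly like this (a
sketch of what Service.bootstrap and Run now do; clPKI and
memberTemplate are built as in bootstrap, and error handling is
omitted):

    // Phase 1: start a PKI-less etcd (no peer TLS listener), used only
    // to create and persist the PKI data.
    server, _ := embed.StartEtcd(s.config.build(false))
    caCert, _ := s.ca.Ensure(ctx, clPKI)               // CA stored in etcd
    memberCert, _ := memberTemplate.Ensure(ctx, clPKI) // this node's peer cert
    _ = s.config.Data.PeerPKI.WriteAll(memberCert, s.config.NodePrivateKey, caCert)
    server.Close()

    // Phase 2: restart etcd with the full TLS peer configuration.
    server, _ = embed.StartEtcd(s.config.build(true))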

We also move away from etcd-specific private keys and instead reuse
the node's private key. This makes key management slightly easier, but
reviewers should consider the security implications of this change.

Finally, we implement and test multi-member cluster support. This is
done by exposing an AddNode method on the newly exposed Status, and a
JoinCluster option in the node configuration.
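
A node joining an existing cluster is then started roughly like this (a
sketch; the Config and JoinCluster fields follow how Run consumes them,
while the AddNode signature and the surrounding variables are
paraphrased placeholders):

    // On an existing member, mint peer credentials for the new node
    // via the published status (paraphrased).
    join, err := st.AddNode(ctx, newNodePublicKey)

    // On the new node, run consensus with those credentials; Run
    // writes them to disk before starting etcd.
    svc := consensus.New(consensus.Config{
        Data:           dataEtcdDir,
        Ephemeral:      ephemeralConsensusDir,
        NodePrivateKey: nodePrivateKey,
        JoinCluster:    join, // CACertificate, NodeCertificate, InitialCRL, ...
    })
    err = supervisor.Run(ctx, "consensus", svc.Run)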

Change-Id: Iea2bf6114cb699d3792efd45d06de2fa5a48feb1
Reviewed-on: https://review.monogon.dev/c/monogon/+/466
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
diff --git a/metropolis/node/core/consensus/consensus.go b/metropolis/node/core/consensus/consensus.go
index d0fe83f..f19e923 100644
--- a/metropolis/node/core/consensus/consensus.go
+++ b/metropolis/node/core/consensus/consensus.go
@@ -14,93 +14,138 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-// Package consensus implements a managed etcd cluster member service, with a
-// self-hosted CA system for issuing peer certificates. Currently each
-// Metropolis node runs an etcd member, and connects to the etcd member locally
-// over a domain socket.
+// Package consensus implements a runnable that manages an etcd instance which
+// forms part of a Metropolis etcd cluster. This cluster is a foundational
+// building block of Metropolis and its startup/management sequencing needs to
+// be as robust as possible.
 //
-// The service supports two modes of startup:
-//  - initializing a new cluster, by bootstrapping the CA in memory, starting a
-//    cluster, committing the CA to etcd afterwards, and saving the new node's
-//    certificate to local storage
-//  - joining an existing cluster, using certificates from local storage and
-//    loading the CA from etcd. This flow is also used when the node joins a
-//    cluster for the first time (then the certificates required must be
-//    provisioned externally before starting the consensus service).
+// Cluster Structure
 //
-// Regardless of how the etcd member service was started, the resulting running
-// service is further managed and used in the same way.
+// Each etcd instance listens for two kinds of traffic:
 //
+// 1. Peer traffic over TLS on a TCP port of the node's main interface. This is
+// where other etcd instances connect to exchange peer traffic, perform
+// transactions and build quorum. The TLS credentials are stored in a PKI that
+// is managed internally by the consensus runnable, with its state stored in
+// etcd itself.
+//
+// 2. Client traffic over a local domain socket, with access control based on
+// standard Linux user/group permissions. Currently this allows any code running
+// as root in the host namespace full access to the etcd cluster.
+//
+// This means that if code running on a node wishes to perform etcd
+// transactions, it must also run an etcd instance. This colocation of all
+// direct etcd access and the etcd instances themselves effectively delegates all
+// Metropolis control plane functionality to whatever subset of nodes is running
+// consensus and all code that connects to etcd directly (the Curator).
+//
+// For example, if nodes foo and baz are part of the control plane, but node
+// worker is not:
+//
+//   .---------------------.
+//   | node-foo            |
+//   |---------------------|
+//   | .--------------------.
+//   | | etcd               |<---etcd/TLS--.   (node.ConsensusPort)
+//   | '--------------------'              |
+//   |     ^ Domain Socket |               |
+//   |     | etcd/plain    |               |
+//   | .--------------------.              |
+//   | | curator            |<---gRPC/TLS----. (node.CuratorServicePort)
+//   | '--------------------'              | |
+//   |     ^ Domain Socket |               | |
+//   |     | gRPC/plain    |               | |
+//   | .-----------------. |               | |
+//   | | node logic      | |               | |
+//   | '-----------------' |               | |
+//   '---------------------'               | |
+//                                         | |
+//   .---------------------.               | |
+//   | node-baz            |               | |
+//   |---------------------|               | |
+//   | .--------------------.              | |
+//   | | etcd               |<-------------' |
+//   | '--------------------'                |
+//   |     ^ Domain Socket |                 |
+//   |     | etcd/plain    |                 |
+//   | .--------------------.                |
+//   | | curator            |<---gRPC/TLS----:
+//   | '--------------------'                |
+//   |    ...              |                 |
+//   '---------------------'                 |
+//                                           |
+//   .---------------------.                 |
+//   | node-worker         |                 |
+//   |---------------------|                 |
+//   | .-----------------. |                 |
+//   | | node logic      |-------------------'
+//   | '-----------------' |
+//   '---------------------'
+//
+
 package consensus
 
 import (
 	"context"
-	"encoding/pem"
+	"crypto/ed25519"
+	"crypto/x509"
+	"crypto/x509/pkix"
 	"fmt"
-	"net/url"
-	"sync"
+	"math/big"
 	"time"
 
 	"go.etcd.io/etcd/clientv3"
 	"go.etcd.io/etcd/embed"
-	"go.uber.org/atomic"
 
-	node "source.monogon.dev/metropolis/node"
-	"source.monogon.dev/metropolis/node/core/consensus/ca"
 	"source.monogon.dev/metropolis/node/core/consensus/client"
-	"source.monogon.dev/metropolis/node/core/localstorage"
+	"source.monogon.dev/metropolis/node/core/identity"
+	"source.monogon.dev/metropolis/pkg/event/memory"
 	"source.monogon.dev/metropolis/pkg/logtree/unraw"
+	"source.monogon.dev/metropolis/pkg/pki"
 	"source.monogon.dev/metropolis/pkg/supervisor"
 )
 
-const (
-	DefaultClusterToken = "METROPOLIS"
-	DefaultLogger       = "zap"
+var (
+	pkiNamespace = pki.Namespaced("/pki/")
 )
 
-// Service is the etcd cluster member service.
+func pkiCA() *pki.Certificate {
+	return &pki.Certificate{
+		Name:      "CA",
+		Namespace: &pkiNamespace,
+		Issuer:    pki.SelfSigned,
+		Template: x509.Certificate{
+			SerialNumber: big.NewInt(1),
+			Subject: pkix.Name{
+				CommonName: "Metropolis etcd CA Certificate",
+			},
+			IsCA:        true,
+			KeyUsage:    x509.KeyUsageCertSign | x509.KeyUsageCRLSign | x509.KeyUsageDigitalSignature,
+			ExtKeyUsage: []x509.ExtKeyUsage{x509.ExtKeyUsageClientAuth, x509.ExtKeyUsageServerAuth, x509.ExtKeyUsageOCSPSigning},
+		},
+	}
+}
+
+func pkiPeerCertificate(pubkey ed25519.PublicKey, extraNames []string) x509.Certificate {
+	return x509.Certificate{
+		Subject: pkix.Name{
+			CommonName: identity.NodeID(pubkey),
+		},
+		KeyUsage: x509.KeyUsageDigitalSignature | x509.KeyUsageKeyEncipherment,
+		ExtKeyUsage: []x509.ExtKeyUsage{
+			x509.ExtKeyUsageClientAuth, x509.ExtKeyUsageServerAuth,
+		},
+		DNSNames: append(extraNames, identity.NodeID(pubkey)),
+	}
+}
+
+// Service is the etcd cluster member service. See package-level documentation
+// for more information.
 type Service struct {
-	// The configuration with which the service was started. This is immutable.
 	config *Config
 
-	// stateMu guards state. This is locked internally on public methods of
-	// Service that require access to state. The state might be recreated on
-	// service restart.
-	stateMu sync.Mutex
-	state   *state
-}
-
-// state is the runtime state of a running etcd member.
-type state struct {
-	etcd  *embed.Etcd
-	ready atomic.Bool
-
-	ca *ca.CA
-	// cl is an etcd client that loops back to the localy running etcd server.
-	// This runs over the Client unix domain socket that etcd starts.
-	cl *clientv3.Client
-}
-
-type Config struct {
-	// Data directory (persistent, encrypted storage) for etcd.
-	Data *localstorage.DataEtcdDirectory
-	// Ephemeral directory for etcd.
-	Ephemeral *localstorage.EphemeralConsensusDirectory
-
-	// Name is the cluster name. This must be the same amongst all etcd members
-	// within one cluster.
-	Name string
-	// NewCluster selects whether the etcd member will start a new cluster and
-	// bootstrap a CA and the first member certificate, or load existing PKI
-	// certificates from disk.
-	NewCluster bool
-	// Port is the port at which this cluster member will listen for other
-	// members. If zero, defaults to the global Metropolis setting.
-	Port int
-
-	// externalHost is used by tests to override the address at which etcd
-	// should listen for peer connections.
-	externalHost string
+	value memory.Value
+	ca    *pki.Certificate
 }
 
 func New(config Config) *Service {
@@ -109,264 +154,297 @@
 	}
 }
 
-// configure transforms the service configuration into an embedded etcd
-// configuration. This is pure and side effect free.
-func (s *Service) configure(ctx context.Context) (*embed.Config, error) {
-	if err := s.config.Ephemeral.MkdirAll(0700); err != nil {
-		return nil, fmt.Errorf("failed to create ephemeral directory: %w", err)
-	}
-	if err := s.config.Data.MkdirAll(0700); err != nil {
-		return nil, fmt.Errorf("failed to create data directory: %w", err)
-	}
+// Run is a Supervisor runnable that starts the etcd member service. It will
+// become healthy once the member joins the cluster successfully.
+func (s *Service) Run(ctx context.Context) error {
+	// Always re-create CA to make sure we don't have PKI state from previous runs.
+	//
+	// TODO(q3k): make the PKI library immune to this misuse.
+	s.ca = pkiCA()
 
-	if s.config.Name == "" {
-		return nil, fmt.Errorf("Name not set")
-	}
-	port := s.config.Port
-	if port == 0 {
-		port = int(node.ConsensusPort)
-	}
-
-	cfg := embed.NewConfig()
-
-	cfg.Name = s.config.Name
-	cfg.Dir = s.config.Data.Data.FullPath()
-	cfg.InitialClusterToken = DefaultClusterToken
-
-	cfg.PeerTLSInfo.CertFile = s.config.Data.PeerPKI.Certificate.FullPath()
-	cfg.PeerTLSInfo.KeyFile = s.config.Data.PeerPKI.Key.FullPath()
-	cfg.PeerTLSInfo.TrustedCAFile = s.config.Data.PeerPKI.CACertificate.FullPath()
-	cfg.PeerTLSInfo.ClientCertAuth = true
-	cfg.PeerTLSInfo.CRLFile = s.config.Data.PeerCRL.FullPath()
-
-	cfg.LCUrls = []url.URL{{
-		Scheme: "unix",
-		Path:   s.config.Ephemeral.ClientSocket.FullPath() + ":0",
-	}}
-	cfg.ACUrls = []url.URL{}
-	cfg.LPUrls = []url.URL{{
-		Scheme: "https",
-		Host:   fmt.Sprintf("[::]:%d", port),
-	}}
-
-	// Always listen on the address pointed to by our name - unless running in
-	// tests, where we can't control our hostname easily.
-	externalHost := fmt.Sprintf("%s:%d", s.config.Name, port)
-	if s.config.externalHost != "" {
-		externalHost = fmt.Sprintf("%s:%d", s.config.externalHost, port)
-	}
-	cfg.APUrls = []url.URL{{
-		Scheme: "https",
-		Host:   externalHost,
-	}}
-
-	if s.config.NewCluster {
-		cfg.ClusterState = "new"
-		cfg.InitialCluster = cfg.InitialClusterFromName(cfg.Name)
-	} else {
-		cfg.ClusterState = "existing"
-	}
-
+	// Create log converter. This will ingest etcd logs and pipe them out to this
+	// runnable's leveled logging facilities.
+	//
+	// TODO(q3k): add support for streaming to a sub-logger in the tree to get
+	// cleaner logs.
 	converter := unraw.Converter{
-		Parser: parseEtcdLogEntry,
-		// The initial line from a starting etcd instance is fairly long.
+		Parser:            parseEtcdLogEntry,
 		MaximumLineLength: 8192,
 		LeveledLogger:     supervisor.Logger(ctx),
 	}
 	fifoPath := s.config.Ephemeral.ServerLogsFIFO.FullPath()
 	pipe, err := converter.NamedPipeReader(fifoPath)
 	if err != nil {
-		return nil, fmt.Errorf("could not get named pipe reader: %w", err)
+		return fmt.Errorf("when creating pipe reader: %w", err)
 	}
-	if err := supervisor.Run(ctx, "pipe", pipe); err != nil {
-		return nil, fmt.Errorf("could not start server log reader: %w", err)
+	if err := supervisor.Run(ctx, "piper", pipe); err != nil {
+		return fmt.Errorf("when starting log piper: %w", err)
 	}
 
-	// TODO(q3k): pipe logs from etcd to supervisor.RawLogger via a file.
-	cfg.Logger = DefaultLogger
-	cfg.LogOutputs = []string{fifoPath}
+	// Create autopromoter, which will automatically promote all learners to full
+	// etcd members.
+	if err := supervisor.Run(ctx, "autopromoter", s.autopromoter); err != nil {
+		return fmt.Errorf("when starting autopromtoer: %w", err)
+	}
 
-	return cfg, nil
+	// Create selfupdater, which will perform a one-shot update of this member's
+	// peer address in etcd.
+	if err := supervisor.Run(ctx, "selfupadater", s.selfupdater); err != nil {
+		return fmt.Errorf("when starting selfupdater: %w", err)
+	}
+
+	// Prepare cluster PKI credentials.
+	ppki := s.config.Data.PeerPKI
+	jc := s.config.JoinCluster
+	if jc != nil {
+		// For nodes that join an existing cluster, or re-join it, always write whatever
+		// we've been given on startup.
+		if err := ppki.WriteAll(jc.NodeCertificate.Raw, s.config.NodePrivateKey, jc.CACertificate.Raw); err != nil {
+			return fmt.Errorf("when writing credentials for join: %w", err)
+		}
+		if err := s.config.Data.PeerCRL.Write(jc.InitialCRL.Raw, 0400); err != nil {
+			return fmt.Errorf("when writing CRL for join: %w", err)
+		}
+	} else {
+		// For other nodes, we should already have credentials from a previous join, or
+		// a previous bootstrap. If none exist, assume we need to bootstrap these
+		// credentials.
+		//
+		// TODO(q3k): once we have node join (ie. node restart from disk) flow, add a
+		// special configuration marker to prevent spurious bootstraps.
+		absent, err := ppki.AllAbsent()
+		if err != nil {
+			return fmt.Errorf("when checking for PKI file absence: %w", err)
+		}
+		if absent {
+			if err := s.bootstrap(ctx, fifoPath); err != nil {
+				return fmt.Errorf("bootstrap failed: %w", err)
+			}
+		} else {
+			supervisor.Logger(ctx).Info("PKI data present, not bootstrapping.")
+		}
+	}
+
+	// Start etcd ...
+	cfg := s.config.build(true)
+	server, err := embed.StartEtcd(cfg)
+	if err != nil {
+		return fmt.Errorf("when starting etcd: %w", err)
+	}
+
+	// ... wait for server to be ready...
+	select {
+	case <-ctx.Done():
+		return ctx.Err()
+	case <-server.Server.ReadyNotify():
+	}
+
+	// ... build a client to its socket...
+	cl, err := s.config.localClient()
+	if err != nil {
+		return fmt.Errorf("getting local client failed: %w", err)
+	}
+
+	// ... and wait until we're not a learner anymore.
+	for {
+		members, err := cl.MemberList(ctx)
+		if err != nil {
+			supervisor.Logger(ctx).Warningf("MemberList failed: %v", err)
+			time.Sleep(time.Second)
+			continue
+		}
+
+		isMember := false
+		for _, member := range members.Members {
+			if member.ID != uint64(server.Server.ID()) {
+				continue
+			}
+			if !member.IsLearner {
+				isMember = true
+				break
+			}
+		}
+		if isMember {
+			break
+		}
+		supervisor.Logger(ctx).Warningf("Still a learner, waiting...")
+		time.Sleep(time.Second)
+	}
+
+	// All done! Report status.
+	supervisor.Logger(ctx).Infof("etcd server ready")
+
+	st := &Status{
+		localPeerURL:  cfg.APUrls[0].String(),
+		localMemberID: uint64(server.Server.ID()),
+		cl:            cl,
+		ca:            s.ca,
+	}
+	s.value.Set(st)
+
+	// Wait until server dies for whatever reason, update status when that
+	// happens.
+	supervisor.Signal(ctx, supervisor.SignalHealthy)
+	select {
+	case err = <-server.Err():
+		err = fmt.Errorf("server returned error: %w", err)
+	case <-ctx.Done():
+		server.Close()
+		err = ctx.Err()
+	}
+	st.stopped = true
+	s.value.Set(st)
+	return err
 }
 
-// Run is a Supervisor runnable that starts the etcd member service. It will
-// become healthy once the member joins the cluster successfully.
-func (s *Service) Run(ctx context.Context) error {
-	st := &state{
-		ready: *atomic.NewBool(false),
-	}
-	s.stateMu.Lock()
-	s.state = st
-	s.stateMu.Unlock()
-
-	if s.config.NewCluster {
-		// Create certificate if absent. It can only be present if we attempt
-		// to re-start the service in NewCluster after a failure. This can
-		// happen if etcd crashed or failed to start up before (eg. because of
-		// networking not having settled yet).
-		absent, err := s.config.Data.PeerPKI.AllAbsent()
+func clientFor(kv *clientv3.Client, parts ...string) (client.Namespaced, error) {
+	var err error
+	namespaced := client.NewLocal(kv)
+	for _, el := range parts {
+		namespaced, err = namespaced.Sub(el)
 		if err != nil {
-			return fmt.Errorf("checking certificate existence: %w", err)
+			return nil, fmt.Errorf("when getting sub client: %w", err)
 		}
 
-		if absent {
-			// Generate CA, keep in memory, write it down in etcd later.
-			st.ca, err = ca.New("Metropolis etcd peer Root CA")
-			if err != nil {
-				return fmt.Errorf("when creating new cluster's peer CA: %w", err)
-			}
-
-			cert, key, err := st.ca.Issue(ctx, nil, s.config.Name)
-			if err != nil {
-				return fmt.Errorf("when issuing new cluster's first certificate: %w", err)
-			}
-
-			if err := s.config.Data.PeerPKI.MkdirAll(0700); err != nil {
-				return fmt.Errorf("when creating PKI directory: %w", err)
-			}
-			if err := s.config.Data.PeerPKI.CACertificate.Write(pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: st.ca.CACertRaw}), 0600); err != nil {
-				return fmt.Errorf("when writing CA certificate to disk: %w", err)
-			}
-			if err := s.config.Data.PeerPKI.Certificate.Write(pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: cert}), 0600); err != nil {
-				return fmt.Errorf("when writing certificate to disk: %w", err)
-			}
-			if err := s.config.Data.PeerPKI.Key.Write(pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: key}), 0600); err != nil {
-				return fmt.Errorf("when writing certificate to disk: %w", err)
-			}
-		}
 	}
+	return namespaced, nil
+}
 
-	// Expect certificate to be present on disk.
-	present, err := s.config.Data.PeerPKI.AllExist()
-	if err != nil {
-		return fmt.Errorf("checking certificate existence: %w", err)
-	}
-	if !present {
-		return fmt.Errorf("etcd starting without fully ready certificates - aborted NewCluster or corrupted local storage?")
-	}
+// bootstrap performs a procedure to resolve the following bootstrap problem:
+// in order to start an etcd server for consensus, we need it to serve over TLS.
+// However, these TLS certificates also need to be stored in etcd so that
+// further certificates can be issued for new nodes.
+//
+// This was previously solved by using a special PKI/TLS management system that
+// could first create certificates and keys in memory and only later commit them
+// to etcd. However, this ended up being somewhat brittle in the face of startup
+// sequencing issues, so we're now going with a different approach.
+//
+// This function starts an etcd instance first without any PKI/TLS support,
+// without listening on any external port for peer traffic. Once the instance is
+// running, it uses the standard metropolis pki library to create all required
+// data directly in the running etcd instance. It then writes all required
+// startup data (node private key, member certificate, CA certificate) to disk,
+// so that a 'full' etcd instance can be started.
+func (s *Service) bootstrap(ctx context.Context, fifoPath string) error {
+	supervisor.Logger(ctx).Infof("Bootstrapping PKI: starting etcd...")
 
-	cfg, err := s.configure(ctx)
-	if err != nil {
-		return fmt.Errorf("when configuring etcd: %w", err)
-	}
+	cfg := s.config.build(false)
+	// This will make etcd create its data directories and bootstrap a new cluster if
+	// needed. If we're restarting due to an error, the old cluster data will still
+	// exist.
+	cfg.ClusterState = "new"
 
+	// Start the bootstrap etcd instance...
 	server, err := embed.StartEtcd(cfg)
-	keep := false
-	defer func() {
-		if !keep && server != nil {
-			server.Close()
-		}
-	}()
 	if err != nil {
 		return fmt.Errorf("failed to start etcd: %w", err)
 	}
-	st.etcd = server
 
-	okay := true
+	// ... wait for it to run ...
 	select {
-	case <-st.etcd.Server.ReadyNotify():
+	case <-server.Server.ReadyNotify():
 	case <-ctx.Done():
-		okay = false
+		return fmt.Errorf("when waiting for bootstrap etcd: %w", err)
 	}
 
-	if !okay {
-		supervisor.Logger(ctx).Info("context done, aborting wait")
-		return ctx.Err()
-	}
-
-	socket := s.config.Ephemeral.ClientSocket.FullPath()
-	cl, err := clientv3.New(clientv3.Config{
-		Endpoints:   []string{fmt.Sprintf("unix://%s:0", socket)},
-		DialTimeout: time.Second,
-	})
+	// ... create a client to it ...
+	cl, err := s.config.localClient()
 	if err != nil {
-		return fmt.Errorf("failed to connect to new etcd instance: %w", err)
-	}
-	st.cl = cl
-
-	if s.config.NewCluster {
-		if st.ca == nil {
-			panic("peerCA has not been generated")
-		}
-
-		// Save new CA into etcd.
-		err = st.ca.Save(ctx, cl.KV)
-		if err != nil {
-			return fmt.Errorf("failed to save new CA to etcd: %w", err)
-		}
-	} else {
-		// Load existing CA from etcd.
-		st.ca, err = ca.Load(ctx, cl.KV)
-		if err != nil {
-			return fmt.Errorf("failed to load CA from etcd: %w", err)
-		}
+		return fmt.Errorf("when getting bootstrap client: %w", err)
 	}
 
-	// Start CRL watcher.
-	if err := supervisor.Run(ctx, "crl", s.watchCRL); err != nil {
-		return fmt.Errorf("failed to start CRL watcher: %w", err)
+	// ... and build PKI there. This is idempotent, so we will never overwrite
+	// anything that's already in the cluster, just retrieve it instead.
+	supervisor.Logger(ctx).Infof("Bootstrapping PKI: etcd running, building PKI...")
+	clPKI, err := clientFor(cl, "namespaced", "etcd-pki")
+	if err != nil {
+		return fmt.Errorf("when getting pki client: %w", err)
 	}
-	// Start autopromoter.
-	if err := supervisor.Run(ctx, "autopromoter", s.autopromoter); err != nil {
-		return fmt.Errorf("failed to start autopromoter: %w", err)
+	defer clPKI.Close()
+	caCert, err := s.ca.Ensure(ctx, clPKI)
+	if err != nil {
+		return fmt.Errorf("failed to ensure CA certificate: %w", err)
 	}
 
-	supervisor.Logger(ctx).Info("etcd is now ready")
-	keep = true
-	st.ready.Store(true)
-	supervisor.Signal(ctx, supervisor.SignalHealthy)
+	// If we're running with a test-overridden external address (eg. localhost), we
+	// need to also make that part of the member certificate.
+	var extraNames []string
+	if external := s.config.testOverrides.externalAddress; external != "" {
+		extraNames = []string{external}
+	}
+	memberTemplate := pki.Certificate{
+		Name:      identity.NodeID(s.config.nodePublicKey()),
+		Namespace: &pkiNamespace,
+		Issuer:    s.ca,
+		Template:  pkiPeerCertificate(s.config.nodePublicKey(), extraNames),
+		Mode:      pki.CertificateExternal,
+		PublicKey: s.config.nodePublicKey(),
+	}
+	memberCert, err := memberTemplate.Ensure(ctx, clPKI)
+	if err != nil {
+		return fmt.Errorf("failed to ensure member certificate: %w", err)
+	}
 
-	<-ctx.Done()
-	st.etcd.Close()
+	// Retrieve CRL.
+	crlW := s.ca.WatchCRL(clPKI)
+	crl, err := crlW.Get(ctx)
+	if err != nil {
+		return fmt.Errorf("failed to retrieve initial CRL: %w", err)
+	}
+
+	// We have everything we need. Write things to disk.
+	supervisor.Logger(ctx).Infof("Bootstrapping PKI: certificates issued, writing to disk...")
+
+	if err := s.config.Data.PeerPKI.WriteAll(memberCert, s.config.NodePrivateKey, caCert); err != nil {
+		return fmt.Errorf("failed to write bootstrapped certificates: %w", err)
+	}
+	if err := s.config.Data.PeerCRL.Write(crl.Raw, 0400); err != nil {
+		return fmt.Errorf("failed tow rite CRL: %w", err)
+	}
+
+	// Stop the server synchronously (blocking until it's fully shutdown), and
+	// return. The caller can now run the 'full' etcd instance with PKI.
+	supervisor.Logger(ctx).Infof("Bootstrapping PKI: done, stopping server...")
+	server.Close()
 	return ctx.Err()
 }
 
-// watchCRL is a sub-runnable of the etcd cluster member service that updates
-// the on-local-storage CRL to match the newest available version in etcd.
-func (s *Service) watchCRL(ctx context.Context) error {
-	s.stateMu.Lock()
-	cl := s.state.cl
-	ca := s.state.ca
-	s.stateMu.Unlock()
-
-	supervisor.Signal(ctx, supervisor.SignalHealthy)
-	for e := range ca.WaitCRLChange(ctx, cl.KV, cl.Watcher) {
-		if e.Err != nil {
-			return fmt.Errorf("watching CRL: %w", e.Err)
-		}
-
-		if err := s.config.Data.PeerCRL.Write(e.CRL, 0600); err != nil {
-			return fmt.Errorf("saving CRL: %w", err)
-		}
-	}
-
-	// unreachable
-	return nil
-}
-
+// autopromoter is a runnable which repeatedly attempts to promote etcd learners
+// in the cluster to full followers. This is needed to bring any new cluster
+// members (which are always added as learners) to full membership and make them
+// part of the etcd quorum.
 func (s *Service) autopromoter(ctx context.Context) error {
-	t := time.NewTicker(5 * time.Second)
-	defer t.Stop()
-
-	autopromote := func() {
-		s.stateMu.Lock()
-		st := s.state
-		s.stateMu.Unlock()
-
-		if st.etcd.Server.Leader() != st.etcd.Server.ID() {
+	autopromote := func(ctx context.Context, cl *clientv3.Client) {
+		// Only autopromote if our endpoint is a leader. This is a bargain bin version
+		// of leader election: it's simple and cheap, but not very reliable. The most
+		// obvious failure mode is that the instance we contacted isn't a leader by the
+		// time we promote a member, but that's fine - the promotion is idempotent. What
+		// we really use the 'leader election' here for isn't for consistency, but to
+		// prevent the cluster from being hammered by spurious leadership promotion
+		// requests from every etcd member.
+		status, err := cl.Status(ctx, cl.Endpoints()[0])
+		if err != nil {
+			supervisor.Logger(ctx).Warningf("Failed to get endpoint status: %v", err)
+			return
+		}
+		if status.Leader != status.Header.MemberId {
 			return
 		}
 
-		for _, member := range st.etcd.Server.Cluster().Members() {
+		members, err := cl.MemberList(ctx)
+		if err != nil {
+			supervisor.Logger(ctx).Warningf("Failed to list members: %v", err)
+			return
+		}
+		for _, member := range members.Members {
 			if !member.IsLearner {
 				continue
 			}
-
-			// We always call PromoteMember since the metadata necessary to
-			// decide if we should is private.  Luckily etcd already does
-			// sanity checks internally and will refuse to promote nodes that
-			// aren't connected or are still behind on transactions.
-			if _, err := st.etcd.Server.PromoteMember(ctx, uint64(member.ID)); err != nil {
+			// Always call PromoteMember since the metadata necessary to decide if we should
+			// is private. Luckily etcd already does consistency checks internally and will
+			// refuse to promote nodes that aren't connected or are still behind on
+			// transactions.
+			if _, err := cl.MemberPromote(ctx, member.ID); err != nil {
 				supervisor.Logger(ctx).Infof("Failed to promote consensus node %s: %v", member.Name, err)
 			} else {
 				supervisor.Logger(ctx).Infof("Promoted new consensus node %s", member.Name)
@@ -374,80 +452,49 @@
 		}
 	}
 
+	w := s.Watch()
 	for {
-		select {
-		case <-ctx.Done():
-			return ctx.Err()
-		case <-t.C:
-			autopromote()
+		st, err := w.Get(ctx)
+		if err != nil {
+			return fmt.Errorf("status get failed: %w", err)
 		}
-	}
-}
-
-// IsReady returns whether etcd is ready and synced
-func (s *Service) IsReady() bool {
-	s.stateMu.Lock()
-	defer s.stateMu.Unlock()
-	if s.state == nil {
-		return false
-	}
-	return s.state.ready.Load()
-}
-
-func (s *Service) WaitReady(ctx context.Context) error {
-	// TODO(q3k): reimplement the atomic ready flag as an event synchronization
-	// mechanism
-	if s.IsReady() {
-		return nil
-	}
-	t := time.NewTicker(100 * time.Millisecond)
-	defer t.Stop()
-	for {
-		select {
-		case <-ctx.Done():
-			return ctx.Err()
-		case <-t.C:
-			if s.IsReady() {
-				return nil
+		t := time.NewTicker(5 * time.Second)
+		for {
+			autopromote(ctx, st.cl)
+			select {
+			case <-ctx.Done():
+				t.Stop()
+				return ctx.Err()
+			case <-t.C:
 			}
 		}
 	}
 }
 
-func (s *Service) Client() client.Namespaced {
-	s.stateMu.Lock()
-	defer s.stateMu.Unlock()
-	// 'namespaced' is the root of all namespaced clients within the etcd K/V
-	// store, with further paths in a colon-separated format, eg.:
-	//   namespaced:example/
-	//   namespaced:foo:bar:baz/
-	client, err := client.NewLocal(s.state.cl).Sub("namespaced")
-	if err != nil {
-		// This error can only happen due to a malformed path, which is
-		// constant. Thus, this is a programming error and we panic.
-		panic(fmt.Errorf("Could not get consensus etcd client: %v", err))
+// selfupdater is a runnable that performs a one-shot (once per Service Run,
+// thus once for each configuration) update of the node's Peer URL in etcd. This
+// is currently only really needed because the first node in the cluster
+// bootstraps itself without any peer URLs at first, and this allows it to then
+// add the peer URLs afterwards. Instead of a runnable, this might as well have
+// been part of the bootstrap logic, but making it a restartable runnable is
+// more robust.
+func (s *Service) selfupdater(ctx context.Context) error {
+	supervisor.Signal(ctx, supervisor.SignalHealthy)
+	w := s.Watch()
+	for {
+		st, err := w.Get(ctx)
+		if err != nil {
+			return fmt.Errorf("failed to get status: %w", err)
+		}
+
+		peerURL := st.localPeerURL
+		if _, err := st.cl.MemberUpdate(ctx, st.localMemberID, []string{peerURL}); err != nil {
+			supervisor.Logger(ctx).Warningf("failed to update member: %v", err)
+			time.Sleep(1 * time.Second)
+			continue
+		}
+
+		supervisor.Signal(ctx, supervisor.SignalDone)
+		return nil
 	}
-	return client
-}
-
-func (s *Service) Cluster() clientv3.Cluster {
-	s.stateMu.Lock()
-	defer s.stateMu.Unlock()
-	return s.state.cl.Cluster
-}
-
-// MemberInfo returns information about this etcd cluster member: its ID and
-// name. This will block until this information is available (ie. the cluster
-// status is Ready).
-func (s *Service) MemberInfo(ctx context.Context) (id uint64, name string, err error) {
-	if err = s.WaitReady(ctx); err != nil {
-		err = fmt.Errorf("when waiting for cluster readiness: %w", err)
-		return
-	}
-
-	s.stateMu.Lock()
-	defer s.stateMu.Unlock()
-	id = uint64(s.state.etcd.Server.ID())
-	name = s.config.Name
-	return
 }