m/n/core/consensus: refactor for reliability and multinode support

This implements a big refactor of our consensus service/runnable.

First, we move away from the old bespoke API for retrieving the
consensus status (and consensus clients) and use Event Values instead,
as the rest of the codebase does.

Second, we move away from the bespoke PKI library, which generated
certificates in memory and then committed them to etcd, and use the
standard metropolis pki library instead. We then change the bootstrap
process to start a PKI-less etcd instance first, generate the PKI data
directly on the running instance, and then restart into a fully
PKI-supporting etcd instance.

We also move away from using etcd-specific private keys and reuse the
node's private key instead. This makes management slightly easier, but
reviewers should consider the security implications of this change.

Finally, we implement and test multi-member cluster support, which is
done by exposing an AddNode method on the newly exposed status, and a
JoinCluster option in the node configuration.

Change-Id: Iea2bf6114cb699d3792efd45d06de2fa5a48feb1
Reviewed-on: https://review.monogon.dev/c/monogon/+/466
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
diff --git a/metropolis/node/core/consensus/configuration.go b/metropolis/node/core/consensus/configuration.go
new file mode 100644
index 0000000..1e7cff6
--- /dev/null
+++ b/metropolis/node/core/consensus/configuration.go
@@ -0,0 +1,160 @@
+package consensus
+
+import (
+	"crypto/ed25519"
+	"crypto/x509"
+	"fmt"
+	"net"
+	"net/url"
+	"strconv"
+	"time"
+
+	"go.etcd.io/etcd/clientv3"
+	"go.etcd.io/etcd/embed"
+
+	"source.monogon.dev/metropolis/node"
+	"source.monogon.dev/metropolis/node/core/identity"
+	"source.monogon.dev/metropolis/node/core/localstorage"
+	"source.monogon.dev/metropolis/pkg/pki"
+)
+
+// Config describes the startup configuration of a consensus instance.
+type Config struct {
+	// Data is the etcd data directory (persistent, encrypted storage).
+	Data *localstorage.DataEtcdDirectory
+	// Ephemeral is the directory holding etcd's client socket and log FIFO.
+	Ephemeral *localstorage.EphemeralConsensusDirectory
+
+	// JoinCluster is set if this instance is to join an existing cluster for the
+	// first time. If not set, it's assumed this instance has run before and has all
+	// the state on disk required to become part of whatever cluster it was before.
+	// If that data is not present, a new cluster will be bootstrapped.
+	JoinCluster *JoinCluster
+
+	// NodePrivateKey is the node's main private key which is also used for
+	// Metropolis PKI. The same key will be used to identify consensus nodes, but
+	// different certificates will be used.
+	NodePrivateKey ed25519.PrivateKey
+
+	testOverrides testOverrides // test-only port/address overrides consumed by build
+}
+
+// JoinCluster is all the data required for a node to join (for the first time)
+// an already running cluster. This data is available from an already running
+// consensus member by performing AddNode, which is called by the Curator when
+// new etcd nodes are added to the cluster.
+type JoinCluster struct {
+	CACertificate   *x509.Certificate // presumably the cluster's etcd peer CA; confirm against AddNode
+	NodeCertificate *x509.Certificate // presumably this node's peer certificate; confirm against AddNode
+	// ExistingNodes is an arbitrarily ordered list of other consensus members that
+	// the node should attempt to contact.
+	ExistingNodes []ExistingNode
+	// InitialCRL is a certificate revocation list for this cluster. After the node
+	// starts, a CRL on disk will be maintained reflecting the PKI state within etcd.
+	InitialCRL *pki.CRL
+}
+
+// ExistingNode is the peer URL and name of an already running consensus instance.
+type ExistingNode struct {
+	Name string // member name; left-hand side of the "name=url" pair
+	URL  string // peer URL; right-hand side of the "name=url" pair
+}
+
+func (e *ExistingNode) connectionString() string {
+	return fmt.Sprintf("%s=%s", e.Name, e.URL) // "<name>=<url>", one entry of build's InitialCluster list
+}
+
+func (c *Config) nodePublicKey() ed25519.PublicKey {
+	return c.NodePrivateKey.Public().(ed25519.PublicKey) // Public returns crypto.PublicKey; narrow it back to the ed25519 type
+}
+
+// testOverrides are available to test code to make some things easier in a test
+// environment.
+type testOverrides struct {
+	// externalPort, if non-zero, replaces node.ConsensusPort as the peer port.
+	externalPort int
+	// externalAddress, if non-empty, replaces the node ID as the advertised host.
+	externalAddress string
+}
+
+// build takes a Config and returns an etcd embed.Config named after the node ID.
+//
+// enablePeers selects whether the etcd instance will listen for peer traffic
+// over TLS on all addresses. This requires TLS credentials to be present on
+// disk. When false, peer URLs are plain HTTP on loopback (used for bootstrap).
+func (c *Config) build(enablePeers bool) *embed.Config {
+	nodeID := identity.NodeID(c.nodePublicKey())
+	port := int(node.ConsensusPort)
+	if p := c.testOverrides.externalPort; p != 0 {
+		port = p
+	}
+	host := nodeID
+	var extraNames []string
+	if c.testOverrides.externalAddress != "" {
+		host = c.testOverrides.externalAddress
+		extraNames = append(extraNames, host) // NOTE(review): extraNames is never read in build; leftover?
+	}
+
+	cfg := embed.NewConfig()
+
+	cfg.Name = nodeID
+	cfg.ClusterState = "existing" // NOTE(review): set unconditionally here; presumably bootstrap callers override it, confirm
+	cfg.InitialClusterToken = "METROPOLIS"
+	cfg.Logger = "zap"
+	cfg.LogOutputs = []string{c.Ephemeral.ServerLogsFIFO.FullPath()}
+
+	cfg.Dir = c.Data.Data.FullPath()
+
+	// Client URL, i.e. local UNIX socket to listen on for trusted, unauthenticated
+	// traffic.
+	cfg.LCUrls = []url.URL{{
+		Scheme: "unix",
+		Path:   c.Ephemeral.ClientSocket.FullPath() + ":0", // ":0" suffix mirrors the endpoint dialed by localClient
+	}}
+
+	if enablePeers {
+		cfg.PeerTLSInfo.CertFile = c.Data.PeerPKI.Certificate.FullPath()
+		cfg.PeerTLSInfo.KeyFile = c.Data.PeerPKI.Key.FullPath()
+		cfg.PeerTLSInfo.TrustedCAFile = c.Data.PeerPKI.CACertificate.FullPath()
+		cfg.PeerTLSInfo.ClientCertAuth = true
+		cfg.PeerTLSInfo.CRLFile = c.Data.PeerCRL.FullPath()
+
+		cfg.LPUrls = []url.URL{{
+			Scheme: "https",
+			Host:   fmt.Sprintf("[::]:%d", port),
+		}}
+		cfg.APUrls = []url.URL{{
+			Scheme: "https",
+			Host:   net.JoinHostPort(host, strconv.Itoa(port)),
+		}}
+	} else {
+		// When not enabling peer traffic, listen on loopback. We would not listen at
+		// all, but etcd seems to prevent us from doing that.
+		cfg.LPUrls = []url.URL{{
+			Scheme: "http",
+			Host:   fmt.Sprintf("127.0.0.1:%d", port),
+		}}
+		cfg.APUrls = []url.URL{{
+			Scheme: "http",
+			Host:   fmt.Sprintf("127.0.0.1:%d", port),
+		}}
+	}
+
+	cfg.InitialCluster = cfg.InitialClusterFromName(nodeID) // start from this node's name, then append known members
+	if c.JoinCluster != nil {
+		for _, n := range c.JoinCluster.ExistingNodes {
+			cfg.InitialCluster += "," + n.connectionString()
+		}
+	}
+	return cfg
+}
+
+// localClient returns an etcd client pointed at the local UNIX client socket
+// from Config.Ephemeral, with a one-second DialTimeout.
+func (c *Config) localClient() (*clientv3.Client, error) {
+	socket := c.Ephemeral.ClientSocket.FullPath()
+	return clientv3.New(clientv3.Config{
+		Endpoints:   []string{fmt.Sprintf("unix://%s:0", socket)}, // same "<socket>:0" form as the listener URL in build
+		DialTimeout: time.Second,
+	})
+}