m/n/core/consensus: refactor for reliability and multinode support
This implements a big refactor of our consensus service/runnable.
First, we move away from the old bespoke API for retrieving the
consensus status (and consensus clients) into using Event Values, as the
rest of the codebase does.
Second, we move away from the bespoke PKI library used to generate
certificates in-memory and then commit them to etcd into using the
standard metropolis pki library. We then change the bootstrap process to
start a PKI-less etcd instance first, generate the PKI data directly on
the running instance, and then restart into a fully PKI-supporting etcd
instance.
We also move away from using etcd-specific private keys into reusing the
node's private key. This makes management slightly easier, but reviewers
should consider the security implications of this change.
Finally, we implement and test multi-member cluster support, which is
done by exposing an AddNode method to the newly exposed status, and a
JoinCluster option in the node configuration.
Change-Id: Iea2bf6114cb699d3792efd45d06de2fa5a48feb1
Reviewed-on: https://review.monogon.dev/c/monogon/+/466
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
diff --git a/metropolis/node/core/consensus/status.go b/metropolis/node/core/consensus/status.go
new file mode 100644
index 0000000..04b13fa
--- /dev/null
+++ b/metropolis/node/core/consensus/status.go
@@ -0,0 +1,170 @@
+package consensus
+
+import (
+ "context"
+ "crypto/ed25519"
+ "crypto/x509"
+ "fmt"
+ "net"
+ "strconv"
+
+ "go.etcd.io/etcd/clientv3"
+
+ "source.monogon.dev/metropolis/node"
+ "source.monogon.dev/metropolis/node/core/consensus/client"
+ "source.monogon.dev/metropolis/node/core/identity"
+ "source.monogon.dev/metropolis/pkg/event"
+ "source.monogon.dev/metropolis/pkg/pki"
+)
+
+// Watch returns a Event Value compatible Watcher for accessing the State of the
+// consensus Service in a safe manner.
+func (s *Service) Watch() Watcher {
+ return Watcher{s.value.Watch()}
+}
+
+type Watcher struct {
+ event.Watcher
+}
+
+func (w *Watcher) Get(ctx context.Context, opts ...event.GetOption) (*Status, error) {
+ v, err := w.Watcher.Get(ctx, opts...)
+ if err != nil {
+ return nil, err
+ }
+ return v.(*Status), nil
+}
+
+// Status of the consensus service. It represents either a running consensus
+// service to which a client can connect and on which management can be
+// performed, or a stopped service.
+type Status struct {
+ localPeerURL string
+ localMemberID uint64
+ cl *clientv3.Client
+ ca *pki.Certificate
+ stopped bool
+}
+
+// Running returns true if this status represents a running consensus service
+// which can be connected to or managed. These calls are not guaranteed to
+// succeed (as the server might have stopped in the meantime), but the caller
+// can use this value as a hint to whether attempts to access the consensus
+// service should be done.
+func (s *Status) Running() bool {
+ return !s.stopped
+}
+
+func (s *Status) pkiClient() (client.Namespaced, error) {
+ return clientFor(s.cl, "namespaced", "etcd-pki")
+}
+
+// MetropolisClient returns a namespaced etcd client for use by the rest of the
+// metropolis code (thtough the cluster bootstrap code). This method is
+// deprecated, and will be replaced with more granular clients as the cluster
+// bootstrap code gets refactored.
+func (s *Status) MetropolisClient() (client.Namespaced, error) {
+ return clientFor(s.cl, "namespaced", "metropolis")
+}
+
+// AddNode creates a new consensus member corresponding to a given Ed25519 node
+// public key if one does not yet exist. The member will at first be marked as a
+// Learner, ensuring it does not take part in quorum until it has finished
+// catching up to the state of the etcd store. As it does, the autopromoter will
+// turn it into a 'full' node and it will start taking part in the quorum and be
+// able to perform all etcd operations.
+func (s *Status) AddNode(ctx context.Context, pk ed25519.PublicKey, opts ...*AddNodeOption) (*JoinCluster, error) {
+ clPKI, err := s.pkiClient()
+ if err != nil {
+ return nil, err
+ }
+
+ nodeID := identity.NodeID(pk)
+ var extraNames []string
+ name := nodeID
+ port := int(node.ConsensusPort)
+ for _, opt := range opts {
+ if opt.externalAddress != "" {
+ name = opt.externalAddress
+ extraNames = append(extraNames, name)
+ }
+ if opt.externalPort != 0 {
+ port = opt.externalPort
+ }
+ }
+
+ member := pki.Certificate{
+ Name: nodeID,
+ Namespace: &pkiNamespace,
+ Issuer: s.ca,
+ Template: pkiPeerCertificate(pk, extraNames),
+ Mode: pki.CertificateExternal,
+ PublicKey: pk,
+ }
+ caBytes, err := s.ca.Ensure(ctx, clPKI)
+ if err != nil {
+ return nil, fmt.Errorf("could not ensure CA certificate: %w", err)
+ }
+ memberBytes, err := member.Ensure(ctx, clPKI)
+ if err != nil {
+ return nil, fmt.Errorf("could not ensure member certificate: %w", err)
+ }
+ caCert, err := x509.ParseCertificate(caBytes)
+ if err != nil {
+ return nil, fmt.Errorf("could not parse CA certificate: %w", err)
+ }
+ memberCert, err := x509.ParseCertificate(memberBytes)
+ if err != nil {
+ return nil, fmt.Errorf("could not parse newly issued member certificate: %w", err)
+ }
+
+ members, err := s.cl.MemberList(ctx)
+ if err != nil {
+ return nil, fmt.Errorf("could not retrieve existing members: %w", err)
+ }
+
+ var existingNodes []ExistingNode
+ var newExists bool
+ for _, m := range members.Members {
+ if m.Name == nodeID {
+ newExists = true
+ }
+ if m.IsLearner {
+ continue
+ }
+ if len(m.PeerURLs) < 1 {
+ continue
+ }
+ existingNodes = append(existingNodes, ExistingNode{
+ Name: m.Name,
+ URL: m.PeerURLs[0],
+ })
+ }
+
+ crlW := s.ca.WatchCRL(clPKI)
+ crl, err := crlW.Get(ctx)
+ if err != nil {
+ return nil, fmt.Errorf("could not retrieve initial CRL: %w", err)
+ }
+
+ if !newExists {
+ addr := fmt.Sprintf("https://%s", net.JoinHostPort(name, strconv.Itoa(port)))
+ if _, err := s.cl.MemberAddAsLearner(ctx, []string{addr}); err != nil {
+ return nil, fmt.Errorf("could not add new member as learner: %w", err)
+ }
+ }
+
+ return &JoinCluster{
+ CACertificate: caCert,
+ NodeCertificate: memberCert,
+ ExistingNodes: existingNodes,
+ InitialCRL: crl,
+ }, nil
+}
+
+// AddNodeOptions can be passed to AddNode to influence the behaviour of the
+// function. Currently this is only used internally by tests.
+type AddNodeOption struct {
+ externalAddress string
+ externalPort int
+}