m/n/core/{curator,cluster}: refactor against new Consensus API
This updates the Curator and the Cluster Manager to use the new
Consensus API, notably to use JoinParameters and ServiceHandle.Watch.
Using JoinParameters end-to-end requires piping them through a node's
roles. For this we create a new ConsensusMember role and replicate all
the data from JoinParameters there.
We also move a sizeable chunk of logic out of the Cluster Manager's
Status object. Instead, the Consensus ServiceHandle is now exposed
directly to downstream users, providing the same functionality.
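
For illustration, downstream code now follows roughly this pattern (a
minimal sketch; the package clause and function name below are made up
for this example, everything else is API exercised by this change):

    package example // illustrative only

    import (
        "context"
        "fmt"

        "source.monogon.dev/metropolis/node/core/consensus"
        "source.monogon.dev/metropolis/node/core/consensus/client"
    )

    // waitForCuratorClient blocks until the given consensus instance
    // reports itself as running, then returns its Curator-namespaced
    // etcd client, mirroring what the Curator and main.go now do.
    func waitForCuratorClient(
        ctx context.Context, h consensus.ServiceHandle,
    ) (client.Namespaced, error) {
        w := h.Watch()
        defer w.Close()
        st, err := w.GetRunning(ctx)
        if err != nil {
            return nil, fmt.Errorf("while waiting for consensus: %w", err)
        }
        return st.CuratorClient()
    }
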
Change-Id: I8cfa247011554553836019f60ea172dd6069f49c
Reviewed-on: https://review.monogon.dev/c/monogon/+/522
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
diff --git a/metropolis/node/core/cluster/cluster.go b/metropolis/node/core/cluster/cluster.go
index fe29abd..ea3bfee 100644
--- a/metropolis/node/core/cluster/cluster.go
+++ b/metropolis/node/core/cluster/cluster.go
@@ -120,12 +120,17 @@
switch inner := params.Cluster.(type) {
case *apb.NodeParameters_ClusterBootstrap_:
- return m.bootstrap(ctx, inner.ClusterBootstrap)
+ err = m.bootstrap(ctx, inner.ClusterBootstrap)
case *apb.NodeParameters_ClusterRegister_:
- return m.register(ctx, inner.ClusterRegister)
+ err = m.register(ctx, inner.ClusterRegister)
default:
- return fmt.Errorf("node parameters misconfigured: neither cluster_bootstrap nor cluster_register set")
+ err = fmt.Errorf("node parameters misconfigured: neither cluster_bootstrap nor cluster_register set")
}
+
+ if err == nil {
+ supervisor.Logger(ctx).Info("Cluster enrolment done.")
+ }
+ return err
}
func (m *Manager) nodeParamsFWCFG(ctx context.Context) (*apb.NodeParameters, error) {
diff --git a/metropolis/node/core/cluster/cluster_bootstrap.go b/metropolis/node/core/cluster/cluster_bootstrap.go
index d83913d..c5167a5 100644
--- a/metropolis/node/core/cluster/cluster_bootstrap.go
+++ b/metropolis/node/core/cluster/cluster_bootstrap.go
@@ -86,7 +86,7 @@
return fmt.Errorf("when starting consensus: %w", err)
}
- var metropolisKV client.Namespaced
+ var ckv client.Namespaced
cw := m.consensus.Watch()
for {
st, err := cw.Get(ctx)
@@ -96,42 +96,20 @@
if !st.Running() {
continue
}
- metropolisKV, err = st.MetropolisClient()
+ ckv, err = st.CuratorClient()
if err != nil {
return fmt.Errorf("when retrieving curator client")
}
break
}
- status := Status{
- State: cpb.ClusterState_CLUSTER_STATE_HOME,
- HasLocalConsensus: true,
- consensusClient: metropolisKV,
- // Credentials are set further down once created through a curator
- // short-circuit bootstrap function.
- Credentials: nil,
- }
-
- // Short circuit curator into storing the new node.
- ckv, err := status.ConsensusClient(ConsensusUserCurator)
+ node.EnableKubernetesWorker()
+ caCertBytes, nodeCertBytes, err := curator.BootstrapNodeFinish(ctx, ckv, &node, ownerKey)
if err != nil {
- return fmt.Errorf("when retrieving consensus user for curator: %w", err)
- }
-
- if err := curator.BootstrapFinish(ctx, ckv, &node, ownerKey); err != nil {
return fmt.Errorf("failed to finish bootstrap: %w", err)
}
- // And short-circuit creating the curator CA and node certificate.
- caCert, nodeCert, err := curator.BootstrapNodeCredentials(ctx, ckv, pub)
- if err != nil {
- return fmt.Errorf("failed to bootstrap node credentials: %w", err)
- }
-
- // Using the short-circuited credentials from the curator, build our
- // NodeCredentials. That, and the public part of the credentials
- // (NodeCertificate) are the primary output of the cluster manager.
- creds, err := identity.NewNodeCredentials(priv, nodeCert, caCert)
+ creds, err := identity.NewNodeCredentials(priv, nodeCertBytes, caCertBytes)
if err != nil {
return fmt.Errorf("failed to use newly bootstrapped node credentials: %w", err)
}
@@ -146,8 +124,11 @@
return fmt.Errorf("failed to write node credentials: %w", err)
}
- status.Credentials = creds
- m.status.Set(status)
+ m.status.Set(Status{
+ State: cpb.ClusterState_CLUSTER_STATE_HOME,
+ Consensus: m.consensus,
+ Credentials: creds,
+ })
supervisor.Signal(ctx, supervisor.SignalHealthy)
supervisor.Signal(ctx, supervisor.SignalDone)
diff --git a/metropolis/node/core/cluster/cluster_register.go b/metropolis/node/core/cluster/cluster_register.go
index 81db8c9..1a413c6 100644
--- a/metropolis/node/core/cluster/cluster_register.go
+++ b/metropolis/node/core/cluster/cluster_register.go
@@ -133,9 +133,8 @@
return fmt.Errorf("NewNodeCredentials failed after receiving certificate from cluster: %w", err)
}
status := Status{
- State: cpb.ClusterState_CLUSTER_STATE_HOME,
- HasLocalConsensus: false,
- Credentials: creds,
+ State: cpb.ClusterState_CLUSTER_STATE_HOME,
+ Credentials: creds,
}
m.status.Set(status)
supervisor.Signal(ctx, supervisor.SignalHealthy)
diff --git a/metropolis/node/core/cluster/status.go b/metropolis/node/core/cluster/status.go
index 481c818..da898bf 100644
--- a/metropolis/node/core/cluster/status.go
+++ b/metropolis/node/core/cluster/status.go
@@ -2,9 +2,8 @@
import (
"errors"
- "fmt"
- "source.monogon.dev/metropolis/node/core/consensus/client"
+ "source.monogon.dev/metropolis/node/core/consensus"
"source.monogon.dev/metropolis/node/core/identity"
cpb "source.monogon.dev/metropolis/proto/common"
)
@@ -18,60 +17,11 @@
// State is the current state of the cluster, as seen by the node.
State cpb.ClusterState
- // hasLocalConsensus is true if the local node is running a local consensus
- // (etcd) server.
- HasLocalConsensus bool
- // consensusClient is an etcd client to the local consensus server if the node
- // has such a server and the cluster state is HOME or SPLIT.
- consensusClient client.Namespaced
+ // Consensus is a handle to a running Consensus service, or nil if this node
+ // does not run a Consensus instance.
+ Consensus consensus.ServiceHandle
// Credentials used for the node to authenticate to the Curator and other
// cluster services.
Credentials *identity.NodeCredentials
}
-
-// ConsensusUser is the to-level user of an etcd client in Metropolis node
-// code. These need to be defined ahead of time in an Go 'enum', and different
-// ConsensusUsers should not be shared by different codepaths.
-type ConsensusUser string
-
-const (
- ConsensusUserKubernetesPKI ConsensusUser = "kubernetes-pki"
- ConsensusUserCurator ConsensusUser = "curator"
-)
-
-// ConsensusClient returns an etcd/consensus client for a given ConsensusUser.
-// The node must be running a local consensus/etcd server.
-func (s *Status) ConsensusClient(user ConsensusUser) (client.Namespaced, error) {
- if !s.HasLocalConsensus {
- return nil, ErrNoLocalConsensus
- }
-
- // Ensure that we already are connected to etcd and are in a state in which we
- // should be handing out cluster connectivity.
- if s.consensusClient == nil {
- return nil, fmt.Errorf("not connected")
- }
- switch s.State {
- case cpb.ClusterState_CLUSTER_STATE_HOME:
- case cpb.ClusterState_CLUSTER_STATE_SPLIT:
- // The consensus client is resistant to being split off, and will serve
- // as soon as the split is resolved.
- default:
- return nil, fmt.Errorf("refusing connection with cluster state %v", s.State)
- }
-
- // Ensure only defined 'applications' are used to prevent programmer error and
- // casting to ConsensusUser from an arbitrary string.
- switch user {
- case ConsensusUserKubernetesPKI:
- case ConsensusUserCurator:
- default:
- return nil, fmt.Errorf("unknown ConsensusUser %q", user)
- }
- client, err := s.consensusClient.Sub(string(user))
- if err != nil {
- return nil, fmt.Errorf("retrieving subclient failed: %w", err)
- }
- return client, nil
-}
diff --git a/metropolis/node/core/consensus/BUILD.bazel b/metropolis/node/core/consensus/BUILD.bazel
index f6e30e7..c4f65ec 100644
--- a/metropolis/node/core/consensus/BUILD.bazel
+++ b/metropolis/node/core/consensus/BUILD.bazel
@@ -7,6 +7,7 @@
"consensus.go",
"logparser.go",
"status.go",
+ "testhelpers.go",
],
importpath = "source.monogon.dev/metropolis/node/core/consensus",
visibility = ["//:__subpackages__"],
diff --git a/metropolis/node/core/consensus/consensus.go b/metropolis/node/core/consensus/consensus.go
index f19e923..d4ab964 100644
--- a/metropolis/node/core/consensus/consensus.go
+++ b/metropolis/node/core/consensus/consensus.go
@@ -487,11 +487,16 @@
return fmt.Errorf("failed to get status: %w", err)
}
- peerURL := st.localPeerURL
- if _, err := st.cl.MemberUpdate(ctx, st.localMemberID, []string{peerURL}); err != nil {
- supervisor.Logger(ctx).Warningf("failed to update member: %v", err)
- time.Sleep(1 * time.Second)
- continue
+ if st.localPeerURL != "" {
+ supervisor.Logger(ctx).Infof("Updating local peer URL...")
+ peerURL := st.localPeerURL
+ if _, err := st.cl.MemberUpdate(ctx, st.localMemberID, []string{peerURL}); err != nil {
+ supervisor.Logger(ctx).Warningf("failed to update member: %v", err)
+ time.Sleep(1 * time.Second)
+ continue
+ }
+ } else {
+ supervisor.Logger(ctx).Infof("No local peer URL, not updating.")
}
supervisor.Signal(ctx, supervisor.SignalDone)
diff --git a/metropolis/node/core/consensus/consensus_test.go b/metropolis/node/core/consensus/consensus_test.go
index 8028dd9..b11a053 100644
--- a/metropolis/node/core/consensus/consensus_test.go
+++ b/metropolis/node/core/consensus/consensus_test.go
@@ -94,9 +94,9 @@
if err != nil {
t.Fatalf("Get: %v", err)
}
- cl, err := st.MetropolisClient()
+ cl, err := st.CuratorClient()
if err != nil {
- t.Fatalf("MetropolisClient: %v", err)
+ t.Fatalf("CuratorClient: %v", err)
}
defer cl.Close()
@@ -129,9 +129,9 @@
if err != nil {
t.Fatalf("status get failed: %v", err)
}
- cl, err := st.MetropolisClient()
+ cl, err := st.CuratorClient()
if err != nil {
- t.Fatalf("MetropolisClient: %v", err)
+ t.Fatalf("CuratorClient: %v", err)
}
defer cl.Close()
@@ -177,9 +177,9 @@
if err != nil {
t.Fatalf("status get failed: %v", err)
}
- cl, err = st.MetropolisClient()
+ cl, err = st.CuratorClient()
if err != nil {
- t.Fatalf("MetropolisClient: %v", err)
+ t.Fatalf("CuratorClient: %v", err)
}
defer cl.Close()
@@ -224,9 +224,9 @@
if err != nil {
t.Fatalf("could not get status: %v", err)
}
- cl, err := st.MetropolisClient()
+ cl, err := st.CuratorClient()
if err != nil {
- t.Fatalf("MetropolisClient: %v", err)
+ t.Fatalf("CuratorClient: %v", err)
}
defer cl.Close()
if _, err := cl.Put(b.ctx, "/foo", "bar"); err != nil {
@@ -263,9 +263,9 @@
if err != nil {
t.Fatalf("could not get status: %v", err)
}
- cl2, err := st2.MetropolisClient()
+ cl2, err := st2.CuratorClient()
if err != nil {
- t.Fatalf("MetropolisClient: %v", err)
+ t.Fatalf("CuratorClient: %v", err)
}
defer cl2.Close()
diff --git a/metropolis/node/core/consensus/status.go b/metropolis/node/core/consensus/status.go
index 04b13fa..43a70fd 100644
--- a/metropolis/node/core/consensus/status.go
+++ b/metropolis/node/core/consensus/status.go
@@ -17,6 +17,15 @@
"source.monogon.dev/metropolis/pkg/pki"
)
+// ServiceHandle is implemented by Service and should be the type expected by
+// other code which relies on a Consensus instance. Ie., it's the downstream API
+// for a Consensus Service.
+type ServiceHandle interface {
+ // Watch returns an Event Value compatible Watcher for accessing the State of the
+ // consensus Service in a safe manner.
+ Watch() Watcher
+}
+
// Watch returns a Event Value compatible Watcher for accessing the State of the
// consensus Service in a safe manner.
func (s *Service) Watch() Watcher {
@@ -35,15 +44,38 @@
return v.(*Status), nil
}
+func (w *Watcher) GetRunning(ctx context.Context) (*Status, error) {
+ for {
+ st, err := w.Get(ctx)
+ if err != nil {
+ return nil, err
+ }
+ if st.Running() {
+ return st, nil
+ }
+ }
+}
+
// Status of the consensus service. It represents either a running consensus
// service to which a client can connect and on which management can be
// performed, or a stopped service.
type Status struct {
+ // localPeerURL and localMemberID are the expected public URL and etcd member ID
+ // of the etcd server wrapped by this consensus instance. If set, a sub-runnable
+ // of the consensus will ensure that the given memberID always has localPeerURL
+ // set as its peer URL.
+ //
+ // These will not be set when the Status has been generated by a
+ // testServiceHandle.
localPeerURL string
localMemberID uint64
- cl *clientv3.Client
- ca *pki.Certificate
- stopped bool
+ // cl is the root etcd client to the underlying cluster.
+ cl *clientv3.Client
+ // ca is the PKI CA used to authenticate etcd members.
+ ca *pki.Certificate
+ // stopped is set to true if the underlying service has been stopped or hasn't
+ // yet been started.
+ stopped bool
}
// Running returns true if this status represents a running consensus service
@@ -59,12 +91,21 @@
return clientFor(s.cl, "namespaced", "etcd-pki")
}
-// MetropolisClient returns a namespaced etcd client for use by the rest of the
-// metropolis code (thtough the cluster bootstrap code). This method is
-// deprecated, and will be replaced with more granular clients as the cluster
-// bootstrap code gets refactored.
-func (s *Status) MetropolisClient() (client.Namespaced, error) {
- return clientFor(s.cl, "namespaced", "metropolis")
+// CuratorClient returns a namespaced etcd client for use by the Curator.
+func (s *Status) CuratorClient() (client.Namespaced, error) {
+ return clientFor(s.cl, "namespaced", "curator")
+}
+
+// KubernetesClient returns a namespaced etcd client for use by Kubernetes.
+func (s *Status) KubernetesClient() (client.Namespaced, error) {
+ return clientFor(s.cl, "namespaced", "kubernetes")
+}
+
+// ClusterClient returns an etcd management API client, for use by downstream
+// clients that wish to perform maintenance operations on the etcd cluster (eg.
+// list/modify nodes, promote learners, ...).
+func (s *Status) ClusterClient() clientv3.Cluster {
+ return s.cl
}
// AddNode creates a new consensus member corresponding to a given Ed25519 node
diff --git a/metropolis/node/core/consensus/testhelpers.go b/metropolis/node/core/consensus/testhelpers.go
new file mode 100644
index 0000000..627a278
--- /dev/null
+++ b/metropolis/node/core/consensus/testhelpers.go
@@ -0,0 +1,48 @@
+package consensus
+
+import (
+ "context"
+ "testing"
+
+ "go.etcd.io/etcd/clientv3"
+
+ "source.monogon.dev/metropolis/pkg/event/memory"
+)
+
+type testServiceHandle struct {
+ s memory.Value
+}
+
+// TestServiceHandle builds a somewhat functioning ServiceHandle from a bare
+// etcd connection, effectively creating a fake Consensus service. This must
+// only be used in test code to perform dependency injection of a etcd client
+// into code which expects a Consensus service instance, eg. for testing the
+// Curator.
+//
+// The 'somewhat functioning' description above should serve as a hint to the
+// API stability and backwards/forwards compatibility of this function: there is
+// none.
+func TestServiceHandle(t *testing.T, cl *clientv3.Client) ServiceHandle {
+ ca := pkiCA()
+
+ tsh := testServiceHandle{}
+ st := &Status{
+ cl: cl,
+ ca: ca,
+ }
+ etcdPKI, err := st.pkiClient()
+ if err != nil {
+ t.Fatalf("failed to get PKI etcd client: %v", err)
+ }
+ ctx, ctxC := context.WithCancel(context.Background())
+ defer ctxC()
+ if _, err := ca.Ensure(ctx, etcdPKI); err != nil {
+ t.Fatalf("failed to ensure PKI CA: %v", err)
+ }
+ tsh.s.Set(st)
+ return &tsh
+}
+
+func (h *testServiceHandle) Watch() Watcher {
+ return Watcher{h.s.Watch()}
+}
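
The fake handle slots in wherever a consensus.ServiceHandle is
expected, as the curator_test.go change further below shows. A minimal
sketch, with the etcd test cluster and surrounding test scaffolding
assumed:

    // cl is a *clientv3.Client from an etcd test cluster (eg.
    // go.etcd.io/etcd/integration's cluster.Client(0)); creds and dir
    // are the node credentials and curator directory the test builds.
    svc := curator.New(curator.Config{
        NodeCredentials: creds,
        LeaderTTL:       time.Second,
        Directory:       &dir,
        Consensus:       consensus.TestServiceHandle(t, cl),
    })

svc.Run can then be started under the test supervisor, exactly as that
test does.
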
diff --git a/metropolis/node/core/curator/BUILD.bazel b/metropolis/node/core/curator/BUILD.bazel
index 3f39af3..571a7ca 100644
--- a/metropolis/node/core/curator/BUILD.bazel
+++ b/metropolis/node/core/curator/BUILD.bazel
@@ -20,6 +20,7 @@
visibility = ["//visibility:public"],
deps = [
"//metropolis/node:go_default_library",
+ "//metropolis/node/core/consensus:go_default_library",
"//metropolis/node/core/consensus/client:go_default_library",
"//metropolis/node/core/curator/proto/api:go_default_library",
"//metropolis/node/core/curator/proto/private:go_default_library",
@@ -53,6 +54,7 @@
],
embed = [":go_default_library"],
deps = [
+ "//metropolis/node/core/consensus:go_default_library",
"//metropolis/node/core/consensus/client:go_default_library",
"//metropolis/node/core/curator/proto/api:go_default_library",
"//metropolis/node/core/curator/proto/private:go_default_library",
@@ -61,6 +63,7 @@
"//metropolis/node/core/localstorage/declarative:go_default_library",
"//metropolis/node/core/rpc:go_default_library",
"//metropolis/pkg/event/memory:go_default_library",
+ "//metropolis/pkg/pki:go_default_library",
"//metropolis/pkg/supervisor:go_default_library",
"//metropolis/proto/api:go_default_library",
"//metropolis/proto/common:go_default_library",
diff --git a/metropolis/node/core/curator/bootstrap.go b/metropolis/node/core/curator/bootstrap.go
index e1add0d..d05a764 100644
--- a/metropolis/node/core/curator/bootstrap.go
+++ b/metropolis/node/core/curator/bootstrap.go
@@ -2,12 +2,13 @@
import (
"context"
- "crypto/ed25519"
+ "crypto/x509"
"fmt"
"go.etcd.io/etcd/clientv3"
"google.golang.org/protobuf/proto"
+ "source.monogon.dev/metropolis/node/core/consensus"
"source.monogon.dev/metropolis/node/core/consensus/client"
ppb "source.monogon.dev/metropolis/node/core/curator/proto/private"
"source.monogon.dev/metropolis/node/core/identity"
@@ -24,34 +25,6 @@
// However, that seems to not be worth the effort for a tightly coupled single
// consumer like the bootstrap code.
-// BootstrapNodeCredentials creates node credentials for the first node in a
-// cluster. It can only be called by cluster bootstrap code. It returns the
-// generated x509 CA and node certificates.
-func BootstrapNodeCredentials(ctx context.Context, etcd client.Namespaced, pubkey ed25519.PublicKey) (ca, node []byte, err error) {
- id := identity.NodeID(pubkey)
-
- ca, err = pkiCA.Ensure(ctx, etcd)
- if err != nil {
- err = fmt.Errorf("when ensuring CA: %w", err)
- return
- }
- nodeCert := &pki.Certificate{
- Namespace: &pkiNamespace,
- Issuer: pkiCA,
- Template: identity.NodeCertificate(pubkey),
- Mode: pki.CertificateExternal,
- PublicKey: pubkey,
- Name: fmt.Sprintf("node-%s", id),
- }
- node, err = nodeCert.Ensure(ctx, etcd)
- if err != nil {
- err = fmt.Errorf("when ensuring node cert: %w", err)
- return
- }
-
- return
-}
-
// BootstrapFinish saves the given Node and initial cluster owner pubkey into
// etcd, without regard for any other cluster state and directly using a given
// etcd client.
@@ -59,41 +32,91 @@
// This is ran by the cluster bootstrap workflow to finish bootstrapping a
// cluster - afterwards, this cluster will be ready to serve.
//
-// This can only be used by the cluster bootstrap logic, and may only be called
-// once. It's guaranteed to either succeed fully or fail fully, without leaving
-// the cluster in an inconsistent state.
-func BootstrapFinish(ctx context.Context, etcd client.Namespaced, initialNode *Node, pubkey []byte) error {
- nodeKey, err := initialNode.etcdPath()
+// This must only be used by the cluster bootstrap logic. It is idempotent, thus
+// can be called repeatedly in case of intermittent failures in the bootstrap
+// logic.
+func BootstrapNodeFinish(ctx context.Context, etcd client.Namespaced, node *Node, ownerKey []byte) (caCertBytes, nodeCertBytes []byte, err error) {
+ // Workaround for pkiCA being a global, but BootstrapNodeFinish being called for
+ // multiple, different etcd instances in tests. Doing this ensures that we
+ // always resynchronize with etcd, ie. not keep certificates loaded in memory
+ // even though the underlying certificate in etcd changed.
+ //
+ // TODO(q3k): pass pkiCA explicitly, eg. within a curator object?
+ pkiCA.PrivateKey = nil
+ pkiCA.PublicKey = nil
+
+ // Issue CA and node certificates.
+ caCertBytes, err = pkiCA.Ensure(ctx, etcd)
if err != nil {
- return fmt.Errorf("failed to get node key: %w", err)
+ return nil, nil, fmt.Errorf("when ensuring CA: %w", err)
}
- nodeRaw, err := proto.Marshal(initialNode.proto())
+ nodeCert := &pki.Certificate{
+ Namespace: &pkiNamespace,
+ Issuer: pkiCA,
+ Template: identity.NodeCertificate(node.pubkey),
+ Mode: pki.CertificateExternal,
+ PublicKey: node.pubkey,
+ Name: fmt.Sprintf("node-%s", node.ID()),
+ }
+ nodeCertBytes, err = nodeCert.Ensure(ctx, etcd)
if err != nil {
- return fmt.Errorf("failed to marshal node: %w", err)
+ err = fmt.Errorf("when ensuring node cert: %w", err)
+ return
}
- owner := &ppb.InitialOwner{
- PublicKey: pubkey,
- }
- ownerKey := initialOwnerEtcdPath
- ownerRaw, err := proto.Marshal(owner)
+ nodeCertX509, err := x509.ParseCertificate(nodeCertBytes)
if err != nil {
- return fmt.Errorf("failed to marshal iniail owner: %w", err)
+ err = fmt.Errorf("when parsing node cert: %w", err)
+ return
}
- res, err := etcd.Txn(ctx).If(
- clientv3.Compare(clientv3.CreateRevision(nodeKey), "=", 0),
- clientv3.Compare(clientv3.CreateRevision(ownerKey), "=", 0),
+ caCertX509, err := x509.ParseCertificate(caCertBytes)
+ if err != nil {
+ err = fmt.Errorf("when parsing CA cert: %w", err)
+ return
+ }
+
+ w := pkiCA.WatchCRL(etcd)
+ defer w.Close()
+ crl, err := w.Get(ctx)
+ if err != nil {
+ err = fmt.Errorf("when retrieving CRL: %w", err)
+ return
+ }
+
+ node.EnableConsensusMember(&consensus.JoinCluster{
+ CACertificate: caCertX509,
+ NodeCertificate: nodeCertX509,
+ ExistingNodes: nil,
+ InitialCRL: crl,
+ })
+
+ nodePath, err := node.etcdPath()
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed to get node key: %w", err)
+ }
+ nodeRaw, err := proto.Marshal(node.proto())
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed to marshal node: %w", err)
+ }
+ ownerRaw, err := proto.Marshal(&ppb.InitialOwner{
+ PublicKey: ownerKey,
+ })
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed to marshal initial owner: %w", err)
+ }
+
+ // We don't care about the result's success - this is idempotent.
+ _, err = etcd.Txn(ctx).If(
+ clientv3.Compare(clientv3.CreateRevision(nodePath), "=", 0),
+ clientv3.Compare(clientv3.CreateRevision(initialOwnerEtcdPath), "=", 0),
).Then(
- clientv3.OpPut(nodeKey, string(nodeRaw)),
- clientv3.OpPut(ownerKey, string(ownerRaw)),
+ clientv3.OpPut(nodePath, string(nodeRaw)),
+ clientv3.OpPut(initialOwnerEtcdPath, string(ownerRaw)),
).Commit()
if err != nil {
- return fmt.Errorf("failed to store initial cluster state: %w", err)
+ return nil, nil, fmt.Errorf("failed to store initial cluster state: %w", err)
}
- if !res.Succeeded {
- return fmt.Errorf("cluster already bootstrapped")
- }
- return nil
+ return
}
diff --git a/metropolis/node/core/curator/curator.go b/metropolis/node/core/curator/curator.go
index 6d16d62..e247296 100644
--- a/metropolis/node/core/curator/curator.go
+++ b/metropolis/node/core/curator/curator.go
@@ -21,7 +21,7 @@
"google.golang.org/grpc"
"google.golang.org/protobuf/proto"
- "source.monogon.dev/metropolis/node/core/consensus/client"
+ "source.monogon.dev/metropolis/node/core/consensus"
ppb "source.monogon.dev/metropolis/node/core/curator/proto/private"
"source.monogon.dev/metropolis/node/core/identity"
"source.monogon.dev/metropolis/node/core/localstorage"
@@ -36,9 +36,7 @@
// NodeCredentials are the identity credentials for the node that is running
// this curator.
NodeCredentials *identity.NodeCredentials
- // Etcd is an etcd client in which all curator storage and leader election
- // will be kept.
- Etcd client.Namespaced
+ Consensus consensus.ServiceHandle
// LeaderTTL is the timeout on the lease used to perform leader election.
// Any active leader must continue updating its lease at least this often,
// or the lease (and leadership) will be lost.
@@ -153,8 +151,19 @@
return fmt.Errorf("building lock value failed: %w", err)
}
+ w := s.config.Consensus.Watch()
+ defer w.Close()
+ st, err := w.Get(ctx)
+ if err != nil {
+ return fmt.Errorf("getting consensus status failed: %w", err)
+ }
+ cl, err := st.CuratorClient()
+ if err != nil {
+ return fmt.Errorf("getting consensus client failed: %w", err)
+ }
+
// Establish a lease/session with etcd.
- session, err := concurrency.NewSession(s.config.Etcd.ThinClient(ctx),
+ session, err := concurrency.NewSession(cl.ThinClient(ctx),
concurrency.WithContext(ctx),
concurrency.WithTTL(s.ttl))
if err != nil {
@@ -256,6 +265,19 @@
}
}()
+ supervisor.Logger(ctx).Infof("Waiting for consensus...")
+ w := s.config.Consensus.Watch()
+ defer w.Close()
+ st, err := w.GetRunning(ctx)
+ if err != nil {
+ return fmt.Errorf("while waiting for consensus: %w", err)
+ }
+ supervisor.Logger(ctx).Infof("Got consensus, starting up...")
+ etcd, err := st.CuratorClient()
+ if err != nil {
+ return fmt.Errorf("while retrieving consensus client: %w", err)
+ }
+
// Start listener. This is a gRPC service listening on a local socket,
// providing the Curator API to consumers, dispatching to either a locally
// running leader, or forwarding to a remotely running leader.
@@ -263,8 +285,9 @@
directory: s.config.Directory,
node: s.config.NodeCredentials,
electionWatch: s.electionWatch,
- etcd: s.config.Etcd,
dispatchC: make(chan dispatchRequest),
+ consensus: s.config.Consensus,
+ etcd: etcd,
}
if err := supervisor.Run(ctx, "listener", lis.run); err != nil {
return fmt.Errorf("when starting listener: %w", err)
@@ -292,7 +315,7 @@
}
}
-func (s *Service) DialCluster(ctx context.Context) (*grpc.ClientConn, error) {
+func (s *Service) DialCluster() (*grpc.ClientConn, error) {
remote := fmt.Sprintf("unix://%s", s.config.Directory.ClientSocket.FullPath())
return rpc.NewNodeClient(remote)
}
diff --git a/metropolis/node/core/curator/curator_test.go b/metropolis/node/core/curator/curator_test.go
index 5529e5d..9212a52 100644
--- a/metropolis/node/core/curator/curator_test.go
+++ b/metropolis/node/core/curator/curator_test.go
@@ -10,7 +10,7 @@
"go.etcd.io/etcd/clientv3"
"go.etcd.io/etcd/integration"
- "source.monogon.dev/metropolis/node/core/consensus/client"
+ "source.monogon.dev/metropolis/node/core/consensus"
"source.monogon.dev/metropolis/node/core/identity"
"source.monogon.dev/metropolis/node/core/localstorage"
"source.monogon.dev/metropolis/node/core/localstorage/declarative"
@@ -88,10 +88,10 @@
}
svc := New(Config{
- Etcd: client.NewLocal(cli),
NodeCredentials: n,
LeaderTTL: time.Second,
Directory: &dir,
+ Consensus: consensus.TestServiceHandle(t, cli),
})
if err := supervisor.Run(ctx, n.ID(), svc.Run); err != nil {
t.Fatalf("Run %s: %v", n.ID(), err)
diff --git a/metropolis/node/core/curator/impl_leader.go b/metropolis/node/core/curator/impl_leader.go
index a1b9941..75ede5d 100644
--- a/metropolis/node/core/curator/impl_leader.go
+++ b/metropolis/node/core/curator/impl_leader.go
@@ -11,6 +11,7 @@
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
+ "source.monogon.dev/metropolis/node/core/consensus"
"source.monogon.dev/metropolis/node/core/consensus/client"
"source.monogon.dev/metropolis/node/core/identity"
"source.monogon.dev/metropolis/node/core/rpc"
@@ -38,6 +39,8 @@
// additionally guarded using etcd transactions.
muNodes sync.Mutex
+ consensus consensus.ServiceHandle
+
// muRegisterTicket guards changes to the register ticket. Its usage semantics
// are the same as for muNodes, as described above.
muRegisterTicket sync.Mutex
diff --git a/metropolis/node/core/curator/impl_leader_curator.go b/metropolis/node/core/curator/impl_leader_curator.go
index 348ec2c..26f3004 100644
--- a/metropolis/node/core/curator/impl_leader_curator.go
+++ b/metropolis/node/core/curator/impl_leader_curator.go
@@ -4,6 +4,7 @@
"context"
"crypto/subtle"
"fmt"
+ "net"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@@ -13,6 +14,7 @@
"source.monogon.dev/metropolis/node/core/identity"
"source.monogon.dev/metropolis/node/core/rpc"
"source.monogon.dev/metropolis/pkg/event/etcd"
+ "source.monogon.dev/metropolis/pkg/pki"
cpb "source.monogon.dev/metropolis/proto/common"
)
@@ -225,6 +227,10 @@
return nil, status.Errorf(codes.InvalidArgument, "Status and Status.ExternalAddress must be set")
}
+ if net.ParseIP(req.Status.ExternalAddress) == nil {
+ return nil, status.Errorf(codes.InvalidArgument, "Status.ExternalAddress must be a valid IP address")
+ }
+
// As we're performing a node update with two etcd transactions below (one
// to retrieve, one to save and upate node), take a local lock to ensure
// that we don't have a race between either two UpdateNodeStatus calls or
@@ -363,14 +369,39 @@
// If this fails we are safe to let the client retry, as the PKI code is
// idempotent.
- caCertBytes, nodeCertBytes, err := BootstrapNodeCredentials(ctx, l.etcd, pubkey)
+ caCertBytes, err := pkiCA.Ensure(ctx, l.etcd)
if err != nil {
- return nil, status.Errorf(codes.Unavailable, "could not bootstrap node credentials: %v", err)
+ return nil, status.Errorf(codes.Unavailable, "could not get CA certificate: %v", err)
+ }
+ nodeCert := &pki.Certificate{
+ Namespace: &pkiNamespace,
+ Issuer: pkiCA,
+ Template: identity.NodeCertificate(node.pubkey),
+ Mode: pki.CertificateExternal,
+ PublicKey: node.pubkey,
+ Name: fmt.Sprintf("node-%s", identity.NodeID(pubkey)),
+ }
+ nodeCertBytes, err := nodeCert.Ensure(ctx, l.etcd)
+ if err != nil {
+ return nil, status.Errorf(codes.Unavailable, "could not emit node credentials: %v", err)
+ }
+
+ w := l.consensus.Watch()
+ defer w.Close()
+ st, err := w.GetRunning(ctx)
+ if err != nil {
+ return nil, status.Errorf(codes.Unavailable, "could not get running consensus: %v", err)
+ }
+
+ join, err := st.AddNode(ctx, node.pubkey)
+ if err != nil {
+ return nil, status.Errorf(codes.Unavailable, "could not add node: %v", err)
}
node.state = cpb.NodeState_NODE_STATE_UP
node.clusterUnlockKey = req.ClusterUnlockKey
-
+ node.EnableConsensusMember(join)
+ node.EnableKubernetesWorker()
if err := nodeSave(ctx, l.leadership, node); err != nil {
return nil, err
}
diff --git a/metropolis/node/core/curator/impl_leader_test.go b/metropolis/node/core/curator/impl_leader_test.go
index a893b87..e4829bf 100644
--- a/metropolis/node/core/curator/impl_leader_test.go
+++ b/metropolis/node/core/curator/impl_leader_test.go
@@ -5,6 +5,7 @@
"context"
"crypto/ed25519"
"crypto/rand"
+ "crypto/tls"
"crypto/x509"
"encoding/hex"
"testing"
@@ -14,11 +15,13 @@
"google.golang.org/grpc/test/bufconn"
"google.golang.org/protobuf/proto"
+ "source.monogon.dev/metropolis/node/core/consensus"
"source.monogon.dev/metropolis/node/core/consensus/client"
ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
ppb "source.monogon.dev/metropolis/node/core/curator/proto/private"
"source.monogon.dev/metropolis/node/core/identity"
"source.monogon.dev/metropolis/node/core/rpc"
+ "source.monogon.dev/metropolis/pkg/pki"
apb "source.monogon.dev/metropolis/proto/api"
cpb "source.monogon.dev/metropolis/proto/common"
)
@@ -50,35 +53,62 @@
}()
// Create etcd client to test cluster.
- cl := client.NewLocal(cluster.Client(0))
+ curEtcd, _ := client.NewLocal(cluster.Client(0)).Sub("curator")
// Create a fake lock key/value and retrieve its revision. This replaces the
// leader election functionality in the curator to enable faster and more
// focused tests.
lockKey := "/test-lock"
- res, err := cl.Put(ctx, lockKey, "fake key")
+ res, err := curEtcd.Put(ctx, lockKey, "fake key")
if err != nil {
t.Fatalf("setting fake leader key failed: %v", err)
}
lockRev := res.Header.Revision
- // Build a test cluster PKI and node/manager certificates.
- ephemeral := rpc.NewEphemeralClusterCredentials(t, 2)
- nodeCredentials := ephemeral.Nodes[0]
-
- // Build a curator leader object. This implements methods that will be
- // exercised by tests.
- leader := newCuratorLeader(&leadership{
- lockKey: lockKey,
- lockRev: lockRev,
- etcd: cl,
- }, &nodeCredentials.Node)
-
- cNode := NewNodeForBootstrap(nil, nodeCredentials.PublicKey())
- // Inject new node into leader, using curator bootstrap functionality.
- if err := BootstrapFinish(ctx, cl, &cNode, nodeCredentials.PublicKey()); err != nil {
+ // Build cluster PKI with first node, replicating the cluster bootstrap process.
+ nodePub, nodePriv, err := ed25519.GenerateKey(rand.Reader)
+ if err != nil {
+ t.Fatalf("could not generate node keypair: %v", err)
+ }
+ cNode := NewNodeForBootstrap(nil, nodePub)
+ caCertBytes, nodeCertBytes, err := BootstrapNodeFinish(ctx, curEtcd, &cNode, nil)
+ if err != nil {
t.Fatalf("could not finish node bootstrap: %v", err)
}
+ nodeCredentials, err := identity.NewNodeCredentials(nodePriv, nodeCertBytes, caCertBytes)
+ if err != nil {
+ t.Fatalf("could not build node credentials: %v", err)
+ }
+
+ // Generate credentials for cluster manager. This doesn't go through the owner
+ // credentials escrow, instead manually generating a certificate that would be
+ // generated by the escrow call.
+ ownerPub, ownerPriv, err := ed25519.GenerateKey(rand.Reader)
+ if err != nil {
+ t.Fatalf("could not generate owner keypair: %v", err)
+ }
+ oc := pki.Certificate{
+ Namespace: &pkiNamespace,
+ Issuer: pkiCA,
+ Template: identity.UserCertificate("owner"),
+ Name: "owner",
+ Mode: pki.CertificateExternal,
+ PublicKey: ownerPub,
+ }
+ ownerCertBytes, err := oc.Ensure(ctx, curEtcd)
+ if err != nil {
+ t.Fatalf("could not issue owner certificate: %v", err)
+ }
+ ownerCreds := tls.Certificate{
+ PrivateKey: ownerPriv,
+ Certificate: [][]byte{ownerCertBytes},
+ }
+
+ // Generate keypair for some third-party, unknown node.
+ otherPub, otherPriv, err := ed25519.GenerateKey(rand.Reader)
+ if err != nil {
+ t.Fatalf("could not generate unknown node keypair: %v", err)
+ }
// Create security interceptors for both gRPC listeners.
externalSec := &rpc.ExternalServerSecurity{
@@ -87,6 +117,16 @@
localSec := &rpc.LocalServerSecurity{
Node: &nodeCredentials.Node,
}
+
+ // Build a curator leader object. This implements methods that will be
+ // exercised by tests.
+ leader := newCuratorLeader(&leadership{
+ lockKey: lockKey,
+ lockRev: lockRev,
+ etcd: curEtcd,
+ consensus: consensus.TestServiceHandle(t, cluster.Client(0)),
+ }, &nodeCredentials.Node)
+
// Create a curator gRPC server which performs authentication as per the created
// listenerSecurity and is backed by the created leader.
externalSrv := externalSec.SetupExternalGRPC(nil, leader)
@@ -113,7 +153,7 @@
}()
// Create an authenticated manager gRPC client.
- mcl, err := rpc.NewAuthenticatedClientTest(externalLis, ephemeral.Manager, ephemeral.CA)
+ mcl, err := rpc.NewAuthenticatedClientTest(externalLis, ownerCreds, nodeCredentials.ClusterCA())
if err != nil {
t.Fatalf("Dialing external GRPC failed: %v", err)
}
@@ -125,8 +165,7 @@
}
// Create an ephemeral node gRPC client for the 'other node'.
- otherNode := ephemeral.Nodes[1]
- ocl, err := rpc.NewEphemeralClientTest(externalLis, otherNode.TLSCredentials().PrivateKey.(ed25519.PrivateKey), ephemeral.CA)
+ ocl, err := rpc.NewEphemeralClientTest(externalLis, otherPriv, nodeCredentials.ClusterCA())
if err != nil {
t.Fatalf("Dialing external GRPC failed: %v", err)
}
@@ -143,11 +182,11 @@
localNodeConn: lcl,
localNodeID: nodeCredentials.ID(),
otherNodeConn: ocl,
- otherNodeID: otherNode.ID(),
- otherNodePriv: otherNode.TLSCredentials().PrivateKey.(ed25519.PrivateKey),
- caPubKey: ephemeral.CA.PublicKey.(ed25519.PublicKey),
+ otherNodeID: identity.NodeID(otherPub),
+ otherNodePriv: otherPriv,
+ ca: nodeCredentials.ClusterCA(),
cancel: ctxC,
- etcd: cl,
+ etcd: curEtcd,
}
}
@@ -172,8 +211,7 @@
// otherNodePriv is the private key of the other node present in the curator
// state, ie. the private key for otherNodeID.
otherNodePriv ed25519.PrivateKey
- // caPubKey is the public key of the CA for this cluster.
- caPubKey ed25519.PublicKey
+ ca *x509.Certificate
// cancel shuts down the fake leader and all client connections.
cancel context.CancelFunc
// etcd contains a low-level connection to the curator K/V store, which can be
@@ -612,7 +650,7 @@
}
// TestManagementClusterInfo exercises GetClusterInfo after setting a status.
-func TestMangementClusterInfo(t *testing.T) {
+func TestManagementClusterInfo(t *testing.T) {
cl := fakeLeader(t)
defer cl.cancel()
@@ -658,7 +696,7 @@
if err != nil {
t.Fatalf("CaCertificate could not be parsed: %v", err)
}
- if want, got := cl.caPubKey, ca.PublicKey.(ed25519.PublicKey); !bytes.Equal(want, got) {
+ if want, got := cl.ca.PublicKey.(ed25519.PublicKey), ca.PublicKey.(ed25519.PublicKey); !bytes.Equal(want, got) {
t.Fatalf("CaPublicKey mismatch (wanted %s, got %s)", hex.EncodeToString(want), hex.EncodeToString(got))
}
}
diff --git a/metropolis/node/core/curator/listener.go b/metropolis/node/core/curator/listener.go
index b0b8d5d..d142851 100644
--- a/metropolis/node/core/curator/listener.go
+++ b/metropolis/node/core/curator/listener.go
@@ -11,6 +11,7 @@
"google.golang.org/grpc/status"
"source.monogon.dev/metropolis/node"
+ "source.monogon.dev/metropolis/node/core/consensus"
"source.monogon.dev/metropolis/node/core/consensus/client"
cpb "source.monogon.dev/metropolis/node/core/curator/proto/api"
"source.monogon.dev/metropolis/node/core/identity"
@@ -52,6 +53,8 @@
// directly.
electionWatch func() electionWatcher
+ consensus consensus.ServiceHandle
+
dispatchC chan dispatchRequest
}
@@ -146,9 +149,10 @@
// and a new one with the lock freed) the previous leader will fail on
// txnAsLeader due to the leadership being outdated.
t.impl = newCuratorLeader(&leadership{
- lockKey: leader.lockKey,
- lockRev: leader.lockRev,
- etcd: l.etcd,
+ lockKey: leader.lockKey,
+ lockRev: leader.lockRev,
+ etcd: l.etcd,
+ consensus: l.consensus,
}, &l.node.Node)
} else {
supervisor.Logger(ctx).Info("Dispatcher switching over to follower")
diff --git a/metropolis/node/core/curator/state_node.go b/metropolis/node/core/curator/state_node.go
index e4012cd..f20f981 100644
--- a/metropolis/node/core/curator/state_node.go
+++ b/metropolis/node/core/curator/state_node.go
@@ -18,6 +18,7 @@
import (
"context"
+ "crypto/x509"
"fmt"
"go.etcd.io/etcd/clientv3"
@@ -25,9 +26,11 @@
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
+ "source.monogon.dev/metropolis/node/core/consensus"
ppb "source.monogon.dev/metropolis/node/core/curator/proto/private"
"source.monogon.dev/metropolis/node/core/identity"
"source.monogon.dev/metropolis/node/core/rpc"
+ "source.monogon.dev/metropolis/pkg/pki"
cpb "source.monogon.dev/metropolis/proto/common"
)
@@ -74,6 +77,8 @@
// In the future, this will be split into a separate worker and control plane
// role.
kubernetesWorker *NodeRoleKubernetesWorker
+
+ consensusMember *NodeRoleConsensusMember
}
// NewNodeForBootstrap creates a brand new node without regard for any other
@@ -85,8 +90,6 @@
clusterUnlockKey: cuk,
pubkey: pubkey,
state: cpb.NodeState_NODE_STATE_UP,
- // TODO(q3k): make this configurable.
- kubernetesWorker: &NodeRoleKubernetesWorker{},
}
}
@@ -95,6 +98,37 @@
type NodeRoleKubernetesWorker struct {
}
+// NodeRoleConsensusMember defines that the Node should be running a
+// consensus/etcd instance.
+type NodeRoleConsensusMember struct {
+ // CACertificate, PeerCertificate are the X509 certificates to be used by the
+ // node's etcd member to serve peer traffic.
+ CACertificate, PeerCertificate *x509.Certificate
+ // CRL is an initial certificate revocation list that the etcd member should
+ // start with.
+ //
+ // TODO(q3k): don't store this in etcd like that, instead have the node retrieve
+ // an initial CRL using gRPC/Curator.Watch.
+ CRL *pki.CRL
+
+ // Peers are a list of etcd members that the node's etcd member should attempt
+ // to connect to.
+ //
+ // TODO(q3k): don't store this in etcd like that, instead have this be
+ // dynamically generated at time of retrieval.
+ Peers []NodeRoleConsensusMemberPeer
+}
+
+// NodeRoleConsensusMemberPeer is a name/URL pair pointing to an etcd member's
+// peer listener.
+type NodeRoleConsensusMemberPeer struct {
+ // Name is the name of the etcd member, equal to the Metropolis node's ID that
+ // the etcd member is running on.
+ Name string
+ // URL is a https://host:port string that can be passed to etcd on startup.
+ URL string
+}
+
// ID returns the name of this node. See NodeID for more information.
func (n *Node) ID() string {
return identity.NodeID(n.pubkey)
@@ -114,6 +148,24 @@
return &kw
}
+func (n *Node) EnableKubernetesWorker() {
+ n.kubernetesWorker = &NodeRoleKubernetesWorker{}
+}
+
+func (n *Node) EnableConsensusMember(jc *consensus.JoinCluster) {
+ peers := make([]NodeRoleConsensusMemberPeer, len(jc.ExistingNodes))
+ for i, n := range jc.ExistingNodes {
+ peers[i].Name = n.Name
+ peers[i].URL = n.URL
+ }
+ n.consensusMember = &NodeRoleConsensusMember{
+ CACertificate: jc.CACertificate,
+ PeerCertificate: jc.NodeCertificate,
+ Peers: peers,
+ CRL: jc.InitialCRL,
+ }
+}
+
var (
nodeEtcdPrefix = mustNewEtcdPrefix("/nodes/")
)
@@ -137,6 +189,21 @@
if n.kubernetesWorker != nil {
msg.Roles.KubernetesWorker = &cpb.NodeRoles_KubernetesWorker{}
}
+ if n.consensusMember != nil {
+ peers := make([]*cpb.NodeRoles_ConsensusMember_Peer, len(n.consensusMember.Peers))
+ for i, p := range n.consensusMember.Peers {
+ peers[i] = &cpb.NodeRoles_ConsensusMember_Peer{
+ Name: p.Name,
+ URL: p.URL,
+ }
+ }
+ msg.Roles.ConsensusMember = &cpb.NodeRoles_ConsensusMember{
+ CaCertificate: n.consensusMember.CACertificate.Raw,
+ PeerCertificate: n.consensusMember.PeerCertificate.Raw,
+ InitialCrl: n.consensusMember.CRL.Raw,
+ Peers: peers,
+ }
+ }
return msg
}
@@ -154,6 +221,36 @@
if msg.Roles.KubernetesWorker != nil {
n.kubernetesWorker = &NodeRoleKubernetesWorker{}
}
+ if cm := msg.Roles.ConsensusMember; cm != nil {
+ caCert, err := x509.ParseCertificate(cm.CaCertificate)
+ if err != nil {
+ return nil, fmt.Errorf("could not unmarshal consensus ca certificate: %w", err)
+ }
+ peerCert, err := x509.ParseCertificate(cm.PeerCertificate)
+ if err != nil {
+ return nil, fmt.Errorf("could not unmarshal consensus peer certificate: %w", err)
+ }
+ crl, err := x509.ParseCRL(cm.InitialCrl)
+ if err != nil {
+ return nil, fmt.Errorf("could not unmarshal consensus crl: %w", err)
+ }
+ var peers []NodeRoleConsensusMemberPeer
+ for _, p := range cm.Peers {
+ peers = append(peers, NodeRoleConsensusMemberPeer{
+ Name: p.Name,
+ URL: p.URL,
+ })
+ }
+ n.consensusMember = &NodeRoleConsensusMember{
+ CACertificate: caCert,
+ PeerCertificate: peerCert,
+ CRL: &pki.CRL{
+ Raw: cm.InitialCrl,
+ List: crl,
+ },
+ Peers: peers,
+ }
+ }
return n, nil
}
diff --git a/metropolis/node/core/main.go b/metropolis/node/core/main.go
index f14b589..34fa05e 100644
--- a/metropolis/node/core/main.go
+++ b/metropolis/node/core/main.go
@@ -161,16 +161,15 @@
// this.
var rs *roleserve.Service
- if status.HasLocalConsensus {
- // Retrieve namespaced etcd KV clients for the two main direct etcd users:
- // - Curator
- // - Kubernetes PKI
- ckv, err := status.ConsensusClient(cluster.ConsensusUserCurator)
+ if status.Consensus != nil {
+ w := status.Consensus.Watch()
+ supervisor.Logger(ctx).Infof("Waiting for consensus before continuing control plane startup...")
+ st, err := w.GetRunning(ctx)
if err != nil {
- close(trapdoor)
- return fmt.Errorf("failed to retrieve consensus curator client: %w", err)
+ return fmt.Errorf("while waiting for running consensus: %w", err)
}
- kkv, err := status.ConsensusClient(cluster.ConsensusUserKubernetesPKI)
+ supervisor.Logger(ctx).Infof("Got consensus, continuing control plane startup...")
+ kkv, err := st.KubernetesClient()
if err != nil {
close(trapdoor)
return fmt.Errorf("failed to retrieve consensus kubernetes PKI client: %w", err)
@@ -182,7 +181,7 @@
// management of the cluster.
// In the future, this will only be started on nodes that run etcd.
c := curator.New(curator.Config{
- Etcd: ckv,
+ Consensus: status.Consensus,
NodeCredentials: status.Credentials,
// TODO(q3k): make this configurable?
LeaderTTL: time.Second * 5,
@@ -195,7 +194,7 @@
// We are now in a cluster. We can thus access our 'node' object and
// start all services that we should be running.
- logger.Info("Enrolment success, continuing startup.")
+ logger.Info("Control plane running, starting roleserver...")
// Ensure Kubernetes PKI objects exist in etcd. In the future, this logic will
// be implemented in the curator.
diff --git a/metropolis/node/core/roleserve/roleserve.go b/metropolis/node/core/roleserve/roleserve.go
index 57e6a7a..557ed68 100644
--- a/metropolis/node/core/roleserve/roleserve.go
+++ b/metropolis/node/core/roleserve/roleserve.go
@@ -38,7 +38,7 @@
// As both the curator listener and roleserver might restart, this dial function
// is needed to possibly re-establish connectivity after a full restart of
// either.
- CuratorDial func(ctx context.Context) (*grpc.ClientConn, error)
+ CuratorDial func() (*grpc.ClientConn, error)
// StorageRoot is a handle to access all of the Node's storage. This is needed
// as the roleserver spawns complex workloads like Kubernetes which need access
@@ -117,7 +117,7 @@
// start/stop subordinate services as the Node's roles change.
func (s *Service) Run(ctx context.Context) error {
supervisor.Logger(ctx).Info("Dialing curator...")
- conn, err := s.CuratorDial(ctx)
+ conn, err := s.CuratorDial()
if err != nil {
return fmt.Errorf("could not dial cluster curator: %w", err)
}
diff --git a/metropolis/proto/common/common.proto b/metropolis/proto/common/common.proto
index 63e6cfb..07ff460 100644
--- a/metropolis/proto/common/common.proto
+++ b/metropolis/proto/common/common.proto
@@ -21,10 +21,39 @@
// NodeRoles are the possible roles that a Metropolis Node should run within the
// cluster. These are configured by the cluster and can be retrieved through the
// Curator.
+//
+// Fields contained within each individual role are publicly available, so
+// while they can be used to carry data required to start up services for a
+// given role, they must not contain confidential/private data.
message NodeRoles {
message KubernetesWorker {
}
+ message ConsensusMember {
+ // ca_certificate is a DER-encoded x509 certificate of the etcd
+ // cluster's CA. The member must use this certificate to verify the
+ // identity of the cluster it's connecting to.
+ bytes ca_certificate = 1;
+ // peer_certificate is a DER-encoded x509 certificate of this node's
+ // etcd peer listener. The member must serve member traffic using this
+ // certificate. The private key corresponding to this certificate is
+ // the same as the node's primary private keypair.
+ bytes peer_certificate = 2;
+ // initial_crl is a certificate revocation list that the etcd member
+ // should be started with. After startup, the member will maintain its
+ // own CRL by updating it from its primary storage location, an etcd
+ // value.
+ //
+ // TODO(q3k): don't pass this here, instead pass this over an etcd
+ // watcher and curator.Watch.
+ bytes initial_crl = 3;
+ message Peer {
+ string Name = 1;
+ string URL = 2;
+ }
+ repeated Peer peers = 4;
+ }
KubernetesWorker kubernetes_worker = 1;
+ ConsensusMember consensus_member = 2;
}
// NodeState is the state of a Metropolis node from the point of view of the