m/n/core/curator: implement leader and Node/PKI state
This implements actual Curator logic for nodes and PKI. These will
replace the cluster manager's equivalent logic.
There are two entry points to this logic:
- the gRPC service's Watch method for accessing node status
- bootstrap logic to create a node when the cluster manager bootstraps
the cluster.
Test plan: a followup CR will introduce tests for the Curator - more
granular than the full E2E suite. DO NOT MERGE UNTIL THEN, as this is
critical code.
Change-Id: I8c40a821b846012b90cf9a5df27901d1b49f388c
Reviewed-on: https://review.monogon.dev/c/monogon/+/186
Reviewed-by: Lorenz Brun <lorenz@nexantic.com>
diff --git a/metropolis/node/core/curator/BUILD.bazel b/metropolis/node/core/curator/BUILD.bazel
index f52233d..aa9eb95 100644
--- a/metropolis/node/core/curator/BUILD.bazel
+++ b/metropolis/node/core/curator/BUILD.bazel
@@ -3,10 +3,13 @@
go_library(
name = "go_default_library",
srcs = [
+ "bootstrap.go",
"curator.go",
"impl_follower.go",
"impl_leader.go",
"listener.go",
+ "state_node.go",
+ "state_pki.go",
],
importpath = "source.monogon.dev/metropolis/node/core/curator",
visibility = ["//visibility:public"],
@@ -17,13 +20,18 @@
"//metropolis/node/core/localstorage:go_default_library",
"//metropolis/pkg/combinectx:go_default_library",
"//metropolis/pkg/event:go_default_library",
+ "//metropolis/pkg/event/etcd:go_default_library",
"//metropolis/pkg/event/memory:go_default_library",
+ "//metropolis/pkg/pki:go_default_library",
"//metropolis/pkg/supervisor:go_default_library",
+ "//metropolis/proto/common:go_default_library",
+ "@io_etcd_go_etcd//clientv3:go_default_library",
"@io_etcd_go_etcd//clientv3/concurrency:go_default_library",
"@org_golang_google_grpc//:go_default_library",
"@org_golang_google_grpc//codes:go_default_library",
"@org_golang_google_grpc//status:go_default_library",
"@org_golang_google_protobuf//proto:go_default_library",
+ "@org_golang_x_sys//unix:go_default_library",
],
)
diff --git a/metropolis/node/core/curator/bootstrap.go b/metropolis/node/core/curator/bootstrap.go
new file mode 100644
index 0000000..253ba72
--- /dev/null
+++ b/metropolis/node/core/curator/bootstrap.go
@@ -0,0 +1,81 @@
+package curator
+
+import (
+ "context"
+ "fmt"
+
+ "go.etcd.io/etcd/clientv3"
+ "google.golang.org/protobuf/proto"
+
+ "source.monogon.dev/metropolis/node/core/consensus/client"
+ "source.monogon.dev/metropolis/pkg/pki"
+)
+
+// bootstrap.go contains functions specific for integration between the curator
+// and cluster bootstrap code (//metropolis/node/core/cluster).
+//
+// These functions must only be called by the bootstrap code, and are
+// effectively well-controlled abstraction leaks. An alternative would be to
+// rework the curator API to explicitly support a well-contained and
+// well-defined bootstrap procedure, formalized within bootstrap-specific types.
+// However, that seems to not be worth the effort for a tightly coupled single
+// consumer like the bootstrap code.
+
+// BootstrapNodeCredentials creates node credentials for the first node in a
+// cluster. It can only be called by cluster bootstrap code.
+//
+// TODO(q3k): don't require privkey, but that needs some //metropolis/pkg/pki changes first.
+func BootstrapNodeCredentials(ctx context.Context, etcd client.Namespaced, priv, pub []byte) (*NodeCredentials, error) {
+ id := NodeID(pub)
+
+ caCertBytes, _, err := pkiCA.Ensure(ctx, etcd)
+ if err != nil {
+ return nil, fmt.Errorf("when ensuring CA: %w", err)
+ }
+ nodeCert := pkiNamespace.New(pkiCA, "", pki.Server([]string{id}, nil))
+ nodeCert.UseExistingKey(priv)
+ nodeCertBytes, _, err := nodeCert.Ensure(ctx, etcd)
+ if err != nil {
+ return nil, fmt.Errorf("when ensuring node cert: %w", err)
+ }
+
+ return &NodeCredentials{
+ NodeCertificate: NodeCertificate{
+ PublicKey: pub,
+ Certificate: nodeCertBytes,
+ CACertificate: caCertBytes,
+ },
+ PrivateKey: priv,
+ }, nil
+}
+
+// BootstrapStore saves the Node into etcd, without regard for any other cluster
+// state and directly using a given etcd client.
+//
+// This can only be used by the cluster bootstrap logic.
+func (n *Node) BootstrapStore(ctx context.Context, etcd client.Namespaced) error {
+ // Currently the only flow to store a node to etcd is a write-once flow:
+ // once a node is created, it cannot be deleted or updated. In the future,
+ // flows to change cluster node roles might be introduced (ie. to promote
+ // nodes to consensus members, etc).
+ key := n.etcdPath()
+ msg := n.proto()
+ nodeRaw, err := proto.Marshal(msg)
+ if err != nil {
+ return fmt.Errorf("failed to marshal node: %w", err)
+ }
+
+ res, err := etcd.Txn(ctx).If(
+ clientv3.Compare(clientv3.CreateRevision(key), "=", 0),
+ ).Then(
+ clientv3.OpPut(key, string(nodeRaw)),
+ ).Commit()
+ if err != nil {
+ return fmt.Errorf("failed to store node: %w", err)
+ }
+
+ if !res.Succeeded {
+ return fmt.Errorf("attempted to re-register node (unsupported flow)")
+ }
+ return nil
+}
diff --git a/metropolis/node/core/curator/impl_leader.go b/metropolis/node/core/curator/impl_leader.go
index 2f19fd5..885dee0 100644
--- a/metropolis/node/core/curator/impl_leader.go
+++ b/metropolis/node/core/curator/impl_leader.go
@@ -1,19 +1,109 @@
package curator
import (
+ "context"
+ "errors"
+ "fmt"
+
+ "go.etcd.io/etcd/clientv3"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"source.monogon.dev/metropolis/node/core/consensus/client"
apb "source.monogon.dev/metropolis/node/core/curator/proto/api"
+ "source.monogon.dev/metropolis/pkg/event/etcd"
)
+// curatorLeader implements the curator acting as the elected leader of a
+// cluster. It performs direct reads/writes from/to etcd as long as it remains
+// leader.
+//
+// It effectively implements all the core management logic of a Metropolis
+// cluster.
type curatorLeader struct {
+ // lockKey is the etcd key which backs this leader-elected instance.
lockKey string
+ // lockRev is the revision at which lockKey was created. The leader will use it
+ // in combination with lockKey to ensure all mutations/reads performed to etcd
+ // succeed only if this leader election is still current.
lockRev int64
- etcd client.Namespaced
+ // etcd is the etcd client in which curator data and leader election state is
+ // stored.
+ etcd client.Namespaced
+}
+
+var (
+ // lostLeadership is returned by txnAsLeader if the transaction got canceled
+ // because leadership was lost.
+ lostLeadership = errors.New("lost leadership")
+)
+
+// txnAsLeader performs an etcd transaction guarded by continued leadership.
+// lostLeadership will be returned as an error in case the leadership is lost.
+func (c *curatorLeader) txnAsLeader(ctx context.Context, ops ...clientv3.Op) (*clientv3.TxnResponse, error) {
+ resp, err := c.etcd.Txn(ctx).If(
+ clientv3.Compare(clientv3.CreateRevision(c.lockKey), "=", c.lockRev),
+ ).Then(ops...).Commit()
+ if err != nil {
+ return nil, fmt.Errorf("when running leader transaction: %w", err)
+ }
+ if !resp.Succeeded {
+ return nil, lostLeadership
+ }
+ return resp, nil
+}
+
+// rpcError attempts to convert a given error to a high-level error that can be
+// directly exposed to RPC clients. If false is returned, the error was not
+// converted and is returned verbatim.
+func rpcError(err error) (error, bool) {
+ if errors.Is(err, lostLeadership) {
+ return status.Error(codes.Unavailable, "lost leadership"), true
+ }
+ if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
+ return status.Errorf(codes.Unavailable, "%v", err), true
+ }
+ return err, false
}
func (l *curatorLeader) Watch(req *apb.WatchRequest, srv apb.Curator_WatchServer) error {
- return status.Error(codes.Unimplemented, "curator leader not implemented")
+ nic, ok := req.Kind.(*apb.WatchRequest_NodeInCluster_)
+ if !ok {
+ return status.Error(codes.Unimplemented, "unsupported watch kind")
+ }
+ nodeID := nic.NodeInCluster.NodeId
+ // Constructing arbitrary etcd path: this is okay, as we only have node objects
+ // underneath the NodeEtcdPrefix. Worst case an attacker can do is request a node
+ // that doesn't exist, and that will just hang . All access is privileged, so
+ // there's also no need to filter anything.
+ // TODO(q3k): formalize and strongly type etcd paths for cluster state?
+ // Probably worth waiting for type parameters before attempting to do that.
+ nodePath := nodeEtcdPath(nodeID)
+
+ value := etcd.NewValue(l.etcd, nodePath, func(data []byte) (interface{}, error) {
+ return nodeUnmarshal(data)
+ })
+ w := value.Watch()
+ defer w.Close()
+
+ for {
+ v, err := w.Get(srv.Context())
+ if err != nil {
+ if rpcErr, ok := rpcError(err); ok {
+ return rpcErr
+ }
+ }
+ node := v.(*Node)
+ ev := &apb.WatchEvent{
+ Nodes: []*apb.Node{
+ {
+ Id: node.ID(),
+ Roles: node.proto().Roles,
+ },
+ },
+ }
+ if err := srv.Send(ev); err != nil {
+ return err
+ }
+ }
}
diff --git a/metropolis/node/core/curator/listener.go b/metropolis/node/core/curator/listener.go
index 29b1167..ae158b9 100644
--- a/metropolis/node/core/curator/listener.go
+++ b/metropolis/node/core/curator/listener.go
@@ -9,6 +9,7 @@
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
+
"source.monogon.dev/metropolis/node/core/consensus/client"
apb "source.monogon.dev/metropolis/node/core/curator/proto/api"
"source.monogon.dev/metropolis/node/core/localstorage"
diff --git a/metropolis/node/core/curator/proto/private/BUILD.bazel b/metropolis/node/core/curator/proto/private/BUILD.bazel
index d926d56..e3c032d 100644
--- a/metropolis/node/core/curator/proto/private/BUILD.bazel
+++ b/metropolis/node/core/curator/proto/private/BUILD.bazel
@@ -4,8 +4,12 @@
proto_library(
name = "private_proto",
- srcs = ["lock.proto"],
+ srcs = [
+ "lock.proto",
+ "storage.proto",
+ ],
visibility = ["//visibility:public"],
+ deps = ["//metropolis/proto/common:common_proto"],
)
go_proto_library(
@@ -13,6 +17,7 @@
importpath = "source.monogon.dev/metropolis/node/core/curator/proto/private",
proto = ":private_proto",
visibility = ["//visibility:public"],
+ deps = ["//metropolis/proto/common:go_default_library"],
)
go_library(
diff --git a/metropolis/node/core/curator/proto/private/storage.proto b/metropolis/node/core/curator/proto/private/storage.proto
new file mode 100644
index 0000000..5ad01c4
--- /dev/null
+++ b/metropolis/node/core/curator/proto/private/storage.proto
@@ -0,0 +1,24 @@
+syntax = "proto3";
+option go_package = "source.monogon.dev/metropolis/node/core/curator/proto/private";
+package metropolis.node.core.curator.proto.private;
+
+import "metropolis/proto/common/common.proto";
+
+// Node describes a single node's state in etcd. This is only ever visible to
+// the curator, and fully managed by the curator.
+message Node {
+ // The node's public key.
+ bytes public_key = 1;
+ // Node's individual cluster part of the data partition encryption key. It
+ // is combined with the Node Unlock Key (NUK) kept within
+ // SealedConfiguration.
+ bytes cluster_unlock_key = 2;
+
+ // The node's state, as seen by the cluster. This state is persisted and
+ // represents the progress the node is making through registering into the
+ // cluster or joining the cluster.
+ metropolis.proto.common.NodeState fsm_state = 3;
+
+ // The node's intended roles when running.
+ metropolis.proto.common.NodeRoles roles = 4;
+}
diff --git a/metropolis/node/core/curator/state_node.go b/metropolis/node/core/curator/state_node.go
new file mode 100644
index 0000000..6668b19
--- /dev/null
+++ b/metropolis/node/core/curator/state_node.go
@@ -0,0 +1,201 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package curator
+
+import (
+ "context"
+ "encoding/hex"
+ "fmt"
+ "net"
+ "strings"
+
+ "golang.org/x/sys/unix"
+ "google.golang.org/protobuf/proto"
+
+ ppb "source.monogon.dev/metropolis/node/core/curator/proto/private"
+ "source.monogon.dev/metropolis/node/core/localstorage"
+ "source.monogon.dev/metropolis/pkg/supervisor"
+ cpb "source.monogon.dev/metropolis/proto/common"
+)
+
+// Node is a Metropolis cluster member. A node is a virtual or physical machine
+// running Metropolis. This object represents a node only as part of a cluster.
+// A machine running Metropolis that is not yet (attempting to be) part of a
+// cluster is not considered a Node.
+//
+// This object is used internally within the curator code. Curator clients do
+// not have access to this object and instead rely on protobuf representations
+// of objects from the Curator gRPC API. An exception is the cluster bootstrap
+// code which needs to bring up a new curator from scratch alongside the rest of
+// the cluster.
+type Node struct {
+ // clusterUnlockKey is half of the unlock key required to mount the node's
+ // data partition. It's stored in etcd, and will only be provided to the
+ // Node if it can prove its identity via an integrity mechanism (ie. via
+ // TPM), or when the Node was just created (as the key is generated locally
+ // by localstorage on first format/mount).
+ //
+ // The other part of the unlock key is the LocalUnlockKey that's present on the
+ // node's ESP partition.
+ clusterUnlockKey []byte
+
+ // pubkey is the ED25519 public key corresponding to the node's private key
+ // which it stores on its local data partition. The private part of the key
+ // never leaves the node.
+ //
+ // The public key is used to generate the Node's canonical ID.
+ pubkey []byte
+
+ // state is the state of this node as seen from the point of view of the
+ // cluster. See //metropolis/proto:common.proto for more information.
+ state cpb.NodeState
+
+ // A Node can have multiple Roles. Each Role is represented by the presence
+ // of NodeRole* structures in this structure, with a nil pointer
+ // representing the lack of a role.
+
+ // kubernetesWorker is set if this node is a Kubernetes worker, ie. running the
+ // Kubernetes control plan and workload elements.
+ // In the future, this will be split into a separate worker and control plane
+ // role.
+ kubernetesWorker *NodeRoleKubernetesWorker
+}
+
+// NewNodeForBootstrap creates a brand new node without regard for any other
+// cluster state.
+//
+// This can only be used by the cluster bootstrap logic.
+func NewNodeForBootstrap(cuk, pubkey []byte) Node {
+ return Node{
+ clusterUnlockKey: cuk,
+ pubkey: pubkey,
+ state: cpb.NodeState_NODE_STATE_UP,
+ // TODO(q3k): make this configurable.
+ kubernetesWorker: &NodeRoleKubernetesWorker{},
+ }
+}
+
+// NodeRoleKubernetesWorker defines that the Node should be running the
+// Kubernetes control and data plane.
+type NodeRoleKubernetesWorker struct {
+}
+
+// nodeIDBare returns the `{pubkeyHash}` part of the node ID.
+func nodeIDBare(pub []byte) string {
+ return hex.EncodeToString(pub[:16])
+}
+
+// NodeID returns the name of this node, which is `metropolis-{pubkeyHash}`.
+// This name should be the primary way to refer to Metropoils nodes within a
+// cluster, and is guaranteed to be unique by relying on cryptographic
+// randomness.
+func NodeID(pub []byte) string {
+ return fmt.Sprintf("metropolis-%s", nodeIDBare(pub))
+}
+
+// ID returns the name of this node. See NodeID for more information.
+func (n *Node) ID() string {
+ return NodeID(n.pubkey)
+}
+
+func (n *Node) String() string {
+ return n.ID()
+}
+
+// KubernetesWorker returns a copy of the NodeRoleKubernetesWorker struct if
+// the Node is a kubernetes worker, otherwise nil.
+func (n *Node) KubernetesWorker() *NodeRoleKubernetesWorker {
+ if n.kubernetesWorker == nil {
+ return nil
+ }
+ kw := *n.kubernetesWorker
+ return &kw
+}
+
+const (
+ nodeEtcdPrefix = "/nodes/"
+)
+
+func nodeEtcdPath(id string) string {
+ return fmt.Sprintf("%s%s", nodeEtcdPrefix, id)
+}
+
+// etcdPath builds the etcd path in which this node's protobuf-serialized state
+// is stored in etcd.
+func (n *Node) etcdPath() string {
+ return nodeEtcdPath(n.ID())
+}
+
+// proto serializes the Node object into protobuf, to be used for saving to
+// etcd.
+func (n *Node) proto() *ppb.Node {
+ msg := &ppb.Node{
+ ClusterUnlockKey: n.clusterUnlockKey,
+ PublicKey: n.pubkey,
+ FsmState: n.state,
+ Roles: &cpb.NodeRoles{},
+ }
+ if n.kubernetesWorker != nil {
+ msg.Roles.KubernetesWorker = &cpb.NodeRoles_KubernetesWorker{}
+ }
+ return msg
+}
+
+func nodeUnmarshal(data []byte) (*Node, error) {
+ msg := ppb.Node{}
+ if err := proto.Unmarshal(data, &msg); err != nil {
+ return nil, fmt.Errorf("could not unmarshal proto: %w", err)
+ }
+ n := &Node{
+ clusterUnlockKey: msg.ClusterUnlockKey,
+ pubkey: msg.PublicKey,
+ state: msg.FsmState,
+ }
+ if msg.Roles.KubernetesWorker != nil {
+ n.kubernetesWorker = &NodeRoleKubernetesWorker{}
+ }
+ return n, nil
+}
+
+// ConfigureLocalHostname uses the node's ID as a hostname, and sets the
+// current hostname, and local files like hosts and machine-id accordingly.
+//
+// TODO(q3k): move this to roleserver?
+func (n *Node) ConfigureLocalHostname(ctx context.Context, ephemeral *localstorage.EphemeralDirectory, address net.IP) error {
+ if err := unix.Sethostname([]byte(n.ID())); err != nil {
+ return fmt.Errorf("failed to set runtime hostname: %w", err)
+ }
+ hosts := []string{
+ "127.0.0.1 localhost",
+ "::1 localhost",
+ fmt.Sprintf("%s %s", address.String(), n.ID()),
+ }
+ if err := ephemeral.Hosts.Write([]byte(strings.Join(hosts, "\n")), 0644); err != nil {
+ return fmt.Errorf("failed to write /ephemeral/hosts: %w", err)
+ }
+ if err := ephemeral.MachineID.Write([]byte(nodeIDBare(n.pubkey)), 0644); err != nil {
+ return fmt.Errorf("failed to write /ephemeral/machine-id: %w", err)
+ }
+
+ // Check that we are self-resolvable.
+ ip, err := net.ResolveIPAddr("ip", n.ID())
+ if err != nil {
+ return fmt.Errorf("failed to self-resolve: %w", err)
+ }
+ supervisor.Logger(ctx).Infof("This is node %s at %v", n.ID(), ip)
+ return nil
+}
diff --git a/metropolis/node/core/curator/state_pki.go b/metropolis/node/core/curator/state_pki.go
new file mode 100644
index 0000000..ce28aac
--- /dev/null
+++ b/metropolis/node/core/curator/state_pki.go
@@ -0,0 +1,63 @@
+package curator
+
+import (
+ "fmt"
+
+ "source.monogon.dev/metropolis/node/core/localstorage"
+ "source.monogon.dev/metropolis/pkg/pki"
+)
+
+var (
+ // pkiNamespace is the etcd/pki namespace in which the Metropolis cluster CA
+ // data will live.
+ pkiNamespace = pki.Namespaced("/cluster-pki/")
+ // pkiCA is the main cluster CA, stored in etcd. It is used to emit cluster,
+ // node and user certificates.
+ pkiCA = pkiNamespace.New(pki.SelfSigned, "cluster-ca", pki.CA("Metropolis Cluster CA"))
+)
+
+// NodeCredentials are the public and private part of the credentials of a node.
+//
+// It represents all the data necessary for a node to authenticate over mTLS to
+// other nodes and the rest of the cluster.
+//
+// It must never be made available to any node other than the node it has been
+// emitted for.
+type NodeCredentials struct {
+ NodeCertificate
+ // PrivateKey is the ED25519 private key of the node, corresponding to
+ // NodeCertificate.PublicKey.
+ PrivateKey []byte
+}
+
+// NodeCertificate is the public part of the credential of a node.
+type NodeCertificate struct {
+ // PublicKey is the ED25519 public key of the node.
+ PublicKey []byte
+ // Certificate is the DER-encoded TLS certificate emitted for the node (ie.
+ // PublicKey) by the cluster CA.
+ Certificate []byte
+ // CACertificate is the DER-encoded TLS certificate of the cluster CA at time of
+ // emitting the Certificate.
+ CACertificate []byte
+}
+
+// ID returns the Node ID of the node for which this NodeCertificate was
+// emitted.
+func (c *NodeCertificate) ID() string {
+ return NodeID(c.PublicKey)
+}
+
+// Save stores the given node credentials in local storage.
+func (c *NodeCredentials) Save(d *localstorage.PKIDirectory) error {
+ if err := d.CACertificate.Write(c.CACertificate, 0400); err != nil {
+ return fmt.Errorf("when writing CA certificate: %w", err)
+ }
+ if err := d.Certificate.Write(c.Certificate, 0400); err != nil {
+ return fmt.Errorf("when writing node certificate: %w", err)
+ }
+ if err := d.Key.Write(c.PrivateKey, 0400); err != nil {
+ return fmt.Errorf("when writing node private key: %w", err)
+ }
+ return nil
+}