m/n/core/curator: implement leader and Node/PKI state

This implements actual Curator logic for nodes and PKI. These will
replace the cluster manager's equivalent logic.

There are two entry points to this logic:

 - the gRPC service's Watch method for accessing node status
 - bootstrap logic to create a node when the cluster manager bootstraps
   the cluster.

Test plan: a followup CR will introduce tests for the Curator - more
granular than the full E2E suite. DO NOT MERGE UNTIL THEN, as this is
critical code.

Change-Id: I8c40a821b846012b90cf9a5df27901d1b49f388c
Reviewed-on: https://review.monogon.dev/c/monogon/+/186
Reviewed-by: Lorenz Brun <lorenz@nexantic.com>
diff --git a/metropolis/node/core/curator/BUILD.bazel b/metropolis/node/core/curator/BUILD.bazel
index f52233d..aa9eb95 100644
--- a/metropolis/node/core/curator/BUILD.bazel
+++ b/metropolis/node/core/curator/BUILD.bazel
@@ -3,10 +3,13 @@
 go_library(
     name = "go_default_library",
     srcs = [
+        "bootstrap.go",
         "curator.go",
         "impl_follower.go",
         "impl_leader.go",
         "listener.go",
+        "state_node.go",
+        "state_pki.go",
     ],
     importpath = "source.monogon.dev/metropolis/node/core/curator",
     visibility = ["//visibility:public"],
@@ -17,13 +20,18 @@
         "//metropolis/node/core/localstorage:go_default_library",
         "//metropolis/pkg/combinectx:go_default_library",
         "//metropolis/pkg/event:go_default_library",
+        "//metropolis/pkg/event/etcd:go_default_library",
         "//metropolis/pkg/event/memory:go_default_library",
+        "//metropolis/pkg/pki:go_default_library",
         "//metropolis/pkg/supervisor:go_default_library",
+        "//metropolis/proto/common:go_default_library",
+        "@io_etcd_go_etcd//clientv3:go_default_library",
         "@io_etcd_go_etcd//clientv3/concurrency:go_default_library",
         "@org_golang_google_grpc//:go_default_library",
         "@org_golang_google_grpc//codes:go_default_library",
         "@org_golang_google_grpc//status:go_default_library",
         "@org_golang_google_protobuf//proto:go_default_library",
+        "@org_golang_x_sys//unix:go_default_library",
     ],
 )
 
diff --git a/metropolis/node/core/curator/bootstrap.go b/metropolis/node/core/curator/bootstrap.go
new file mode 100644
index 0000000..253ba72
--- /dev/null
+++ b/metropolis/node/core/curator/bootstrap.go
@@ -0,0 +1,81 @@
+package curator
+
+import (
+	"context"
+	"fmt"
+
+	"go.etcd.io/etcd/clientv3"
+	"google.golang.org/protobuf/proto"
+
+	"source.monogon.dev/metropolis/node/core/consensus/client"
+	"source.monogon.dev/metropolis/pkg/pki"
+)
+
+// bootstrap.go contains functions specific for integration between the curator
+// and cluster bootstrap code (//metropolis/node/core/cluster).
+//
+// These functions must only be called by the bootstrap code, and are
+// effectively well-controlled abstraction leaks. An alternative would be to
+// rework the curator API to explicitly support a well-contained and
+// well-defined bootstrap procedure, formalized within bootstrap-specific types.
+// However, that seems to not be worth the effort for a tightly coupled single
+// consumer like the bootstrap code.
+
+// BootstrapNodeCredentials creates node credentials for the first node in a
+// cluster. It can only be called by cluster bootstrap code.
+//
+// TODO(q3k): don't require privkey, but that needs some //metropolis/pkg/pki changes first.
+func BootstrapNodeCredentials(ctx context.Context, etcd client.Namespaced, priv, pub []byte) (*NodeCredentials, error) {
+	id := NodeID(pub)
+
+	caCertBytes, _, err := pkiCA.Ensure(ctx, etcd)
+	if err != nil {
+		return nil, fmt.Errorf("when ensuring CA: %w", err)
+	}
+	nodeCert := pkiNamespace.New(pkiCA, "", pki.Server([]string{id}, nil))
+	nodeCert.UseExistingKey(priv)
+	nodeCertBytes, _, err := nodeCert.Ensure(ctx, etcd)
+	if err != nil {
+		return nil, fmt.Errorf("when ensuring node cert: %w", err)
+	}
+
+	return &NodeCredentials{
+		NodeCertificate: NodeCertificate{
+			PublicKey:     pub,
+			Certificate:   nodeCertBytes,
+			CACertificate: caCertBytes,
+		},
+		PrivateKey: priv,
+	}, nil
+}
+
+// BootstrapStore saves the Node into etcd, without regard for any other cluster
+// state and directly using a given etcd client.
+//
+// This can only be used by the cluster bootstrap logic.
+func (n *Node) BootstrapStore(ctx context.Context, etcd client.Namespaced) error {
+	// Currently the only flow to store a node to etcd is a write-once flow:
+	// once a node is created, it cannot be deleted or updated. In the future,
+	// flows to change cluster node roles might be introduced (ie. to promote
+	// nodes to consensus members, etc).
+	key := n.etcdPath()
+	msg := n.proto()
+	nodeRaw, err := proto.Marshal(msg)
+	if err != nil {
+		return fmt.Errorf("failed to marshal node: %w", err)
+	}
+
+	res, err := etcd.Txn(ctx).If(
+		clientv3.Compare(clientv3.CreateRevision(key), "=", 0),
+	).Then(
+		clientv3.OpPut(key, string(nodeRaw)),
+	).Commit()
+	if err != nil {
+		return fmt.Errorf("failed to store node: %w", err)
+	}
+
+	if !res.Succeeded {
+		return fmt.Errorf("attempted to re-register node (unsupported flow)")
+	}
+	return nil
+}
diff --git a/metropolis/node/core/curator/impl_leader.go b/metropolis/node/core/curator/impl_leader.go
index 2f19fd5..885dee0 100644
--- a/metropolis/node/core/curator/impl_leader.go
+++ b/metropolis/node/core/curator/impl_leader.go
@@ -1,19 +1,109 @@
 package curator
 
 import (
+	"context"
+	"errors"
+	"fmt"
+
+	"go.etcd.io/etcd/clientv3"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
 
 	"source.monogon.dev/metropolis/node/core/consensus/client"
 	apb "source.monogon.dev/metropolis/node/core/curator/proto/api"
+	"source.monogon.dev/metropolis/pkg/event/etcd"
 )
 
+// curatorLeader implements the curator acting as the elected leader of a
+// cluster. It performs direct reads/writes from/to etcd as long as it remains
+// leader.
+//
+// It effectively implements all the core management logic of a Metropolis
+// cluster.
 type curatorLeader struct {
+	// lockKey is the etcd key which backs this leader-elected instance.
 	lockKey string
+	// lockRev is the revision at which lockKey was created. The leader will use it
+	// in combination with lockKey to ensure all mutations/reads performed to etcd
+	// succeed only if this leader election is still current.
 	lockRev int64
-	etcd    client.Namespaced
+	// etcd is the etcd client in which curator data and leader election state is
+	// stored.
+	etcd client.Namespaced
+}
+
+var (
+	// lostLeadership is returned by txnAsLeader if the transaction got canceled
+	// because leadership was lost.
+	lostLeadership = errors.New("lost leadership")
+)
+
+// txnAsLeader performs an etcd transaction guarded by continued leadership.
+// lostLeadership will be returned as an error in case the leadership is lost.
+func (c *curatorLeader) txnAsLeader(ctx context.Context, ops ...clientv3.Op) (*clientv3.TxnResponse, error) {
+	resp, err := c.etcd.Txn(ctx).If(
+		clientv3.Compare(clientv3.CreateRevision(c.lockKey), "=", c.lockRev),
+	).Then(ops...).Commit()
+	if err != nil {
+		return nil, fmt.Errorf("when running leader transaction: %w", err)
+	}
+	if !resp.Succeeded {
+		return nil, lostLeadership
+	}
+	return resp, nil
+}
+
+// rpcError attempts to convert a given error to a high-level error that can be
+// directly exposed to RPC clients. If false is returned, the error was not
+// converted and is returned verbatim.
+func rpcError(err error) (error, bool) {
+	if errors.Is(err, lostLeadership) {
+		return status.Error(codes.Unavailable, "lost leadership"), true
+	}
+	if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) {
+		return status.Errorf(codes.Unavailable, "%v", err), true
+	}
+	return err, false
 }
 
 func (l *curatorLeader) Watch(req *apb.WatchRequest, srv apb.Curator_WatchServer) error {
-	return status.Error(codes.Unimplemented, "curator leader not implemented")
+	nic, ok := req.Kind.(*apb.WatchRequest_NodeInCluster_)
+	if !ok {
+		return status.Error(codes.Unimplemented, "unsupported watch kind")
+	}
+	nodeID := nic.NodeInCluster.NodeId
+	// Constructing arbitrary etcd path: this is okay, as we only have node objects
+	// underneath the NodeEtcdPrefix. Worst case an attacker can do is request a node
+	// that doesn't exist, and that will just hang . All access is privileged, so
+	// there's also no need to filter anything.
+	// TODO(q3k): formalize and strongly type etcd paths for cluster state?
+	// Probably worth waiting for type parameters before attempting to do that.
+	nodePath := nodeEtcdPath(nodeID)
+
+	value := etcd.NewValue(l.etcd, nodePath, func(data []byte) (interface{}, error) {
+		return nodeUnmarshal(data)
+	})
+	w := value.Watch()
+	defer w.Close()
+
+	for {
+		v, err := w.Get(srv.Context())
+		if err != nil {
+			if rpcErr, ok := rpcError(err); ok {
+				return rpcErr
+			}
+		}
+		node := v.(*Node)
+		ev := &apb.WatchEvent{
+			Nodes: []*apb.Node{
+				{
+					Id:    node.ID(),
+					Roles: node.proto().Roles,
+				},
+			},
+		}
+		if err := srv.Send(ev); err != nil {
+			return err
+		}
+	}
 }
diff --git a/metropolis/node/core/curator/listener.go b/metropolis/node/core/curator/listener.go
index 29b1167..ae158b9 100644
--- a/metropolis/node/core/curator/listener.go
+++ b/metropolis/node/core/curator/listener.go
@@ -9,6 +9,7 @@
 	"google.golang.org/grpc"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
+
 	"source.monogon.dev/metropolis/node/core/consensus/client"
 	apb "source.monogon.dev/metropolis/node/core/curator/proto/api"
 	"source.monogon.dev/metropolis/node/core/localstorage"
diff --git a/metropolis/node/core/curator/proto/private/BUILD.bazel b/metropolis/node/core/curator/proto/private/BUILD.bazel
index d926d56..e3c032d 100644
--- a/metropolis/node/core/curator/proto/private/BUILD.bazel
+++ b/metropolis/node/core/curator/proto/private/BUILD.bazel
@@ -4,8 +4,12 @@
 
 proto_library(
     name = "private_proto",
-    srcs = ["lock.proto"],
+    srcs = [
+        "lock.proto",
+        "storage.proto",
+    ],
     visibility = ["//visibility:public"],
+    deps = ["//metropolis/proto/common:common_proto"],
 )
 
 go_proto_library(
@@ -13,6 +17,7 @@
     importpath = "source.monogon.dev/metropolis/node/core/curator/proto/private",
     proto = ":private_proto",
     visibility = ["//visibility:public"],
+    deps = ["//metropolis/proto/common:go_default_library"],
 )
 
 go_library(
diff --git a/metropolis/node/core/curator/proto/private/storage.proto b/metropolis/node/core/curator/proto/private/storage.proto
new file mode 100644
index 0000000..5ad01c4
--- /dev/null
+++ b/metropolis/node/core/curator/proto/private/storage.proto
@@ -0,0 +1,24 @@
+syntax = "proto3";
+option go_package = "source.monogon.dev/metropolis/node/core/curator/proto/private";
+package metropolis.node.core.curator.proto.private;
+
+import "metropolis/proto/common/common.proto";
+
+// Node describes a single node's state in etcd. This is only ever visible to
+// the curator, and fully managed by the curator.
+message Node {
+    // The node's public key.
+    bytes public_key = 1;
+    // Node's individual cluster part of the data partition encryption key. It
+    // is combined with the Node Unlock Key (NUK) kept within
+    // SealedConfiguration.
+    bytes cluster_unlock_key = 2;
+
+    // The node's state, as seen by the cluster. This state is persisted and
+    // represents the progress the node is making through registering into the
+    // cluster or joining the cluster.
+    metropolis.proto.common.NodeState fsm_state = 3;
+
+    // The node's intended roles when running.
+    metropolis.proto.common.NodeRoles roles = 4;
+}
diff --git a/metropolis/node/core/curator/state_node.go b/metropolis/node/core/curator/state_node.go
new file mode 100644
index 0000000..6668b19
--- /dev/null
+++ b/metropolis/node/core/curator/state_node.go
@@ -0,0 +1,201 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package curator
+
+import (
+	"context"
+	"encoding/hex"
+	"fmt"
+	"net"
+	"strings"
+
+	"golang.org/x/sys/unix"
+	"google.golang.org/protobuf/proto"
+
+	ppb "source.monogon.dev/metropolis/node/core/curator/proto/private"
+	"source.monogon.dev/metropolis/node/core/localstorage"
+	"source.monogon.dev/metropolis/pkg/supervisor"
+	cpb "source.monogon.dev/metropolis/proto/common"
+)
+
+// Node is a Metropolis cluster member. A node is a virtual or physical machine
+// running Metropolis. This object represents a node only as part of a cluster.
+// A machine running Metropolis that is not yet (attempting to be) part of a
+// cluster is not considered a Node.
+//
+// This object is used internally within the curator code. Curator clients do
+// not have access to this object and instead rely on protobuf representations
+// of objects from the Curator gRPC API. An exception is the cluster bootstrap
+// code which needs to bring up a new curator from scratch alongside the rest of
+// the cluster.
+type Node struct {
+	// clusterUnlockKey is half of the unlock key required to mount the node's
+	// data partition. It's stored in etcd, and will only be provided to the
+	// Node if it can prove its identity via an integrity mechanism (ie. via
+	// TPM), or when the Node was just created (as the key is generated locally
+	// by localstorage on first format/mount).
+	//
+	// The other part of the unlock key is the LocalUnlockKey that's present on the
+	// node's ESP partition.
+	clusterUnlockKey []byte
+
+	// pubkey is the ED25519 public key corresponding to the node's private key
+	// which it stores on its local data partition. The private part of the key
+	// never leaves the node.
+	//
+	// The public key is used to generate the Node's canonical ID.
+	pubkey []byte
+
+	// state is the state of this node as seen from the point of view of the
+	// cluster. See //metropolis/proto:common.proto for more information.
+	state cpb.NodeState
+
+	// A Node can have multiple Roles. Each Role is represented by the presence
+	// of NodeRole* structures in this structure, with a nil pointer
+	// representing the lack of a role.
+
+	// kubernetesWorker is set if this node is a Kubernetes worker, ie. running the
+	// Kubernetes control plan and workload elements.
+	// In the future, this will be split into a separate worker and control plane
+	// role.
+	kubernetesWorker *NodeRoleKubernetesWorker
+}
+
+// NewNodeForBootstrap creates a brand new node without regard for any other
+// cluster state.
+//
+// This can only be used by the cluster bootstrap logic.
+func NewNodeForBootstrap(cuk, pubkey []byte) Node {
+	return Node{
+		clusterUnlockKey: cuk,
+		pubkey:           pubkey,
+		state:            cpb.NodeState_NODE_STATE_UP,
+		// TODO(q3k): make this configurable.
+		kubernetesWorker: &NodeRoleKubernetesWorker{},
+	}
+}
+
+// NodeRoleKubernetesWorker defines that the Node should be running the
+// Kubernetes control and data plane.
+type NodeRoleKubernetesWorker struct {
+}
+
+// nodeIDBare returns the `{pubkeyHash}` part of the node ID.
+func nodeIDBare(pub []byte) string {
+	return hex.EncodeToString(pub[:16])
+}
+
+// NodeID returns the name of this node, which is `metropolis-{pubkeyHash}`.
+// This name should be the primary way to refer to Metropoils nodes within a
+// cluster, and is guaranteed to be unique by relying on cryptographic
+// randomness.
+func NodeID(pub []byte) string {
+	return fmt.Sprintf("metropolis-%s", nodeIDBare(pub))
+}
+
+// ID returns the name of this node. See NodeID for more information.
+func (n *Node) ID() string {
+	return NodeID(n.pubkey)
+}
+
+func (n *Node) String() string {
+	return n.ID()
+}
+
+// KubernetesWorker returns a copy of the NodeRoleKubernetesWorker struct if
+// the Node is a kubernetes worker, otherwise nil.
+func (n *Node) KubernetesWorker() *NodeRoleKubernetesWorker {
+	if n.kubernetesWorker == nil {
+		return nil
+	}
+	kw := *n.kubernetesWorker
+	return &kw
+}
+
+const (
+	nodeEtcdPrefix = "/nodes/"
+)
+
+func nodeEtcdPath(id string) string {
+	return fmt.Sprintf("%s%s", nodeEtcdPrefix, id)
+}
+
+// etcdPath builds the etcd path in which this node's protobuf-serialized state
+// is stored in etcd.
+func (n *Node) etcdPath() string {
+	return nodeEtcdPath(n.ID())
+}
+
+// proto serializes the Node object into protobuf, to be used for saving to
+// etcd.
+func (n *Node) proto() *ppb.Node {
+	msg := &ppb.Node{
+		ClusterUnlockKey: n.clusterUnlockKey,
+		PublicKey:        n.pubkey,
+		FsmState:         n.state,
+		Roles:            &cpb.NodeRoles{},
+	}
+	if n.kubernetesWorker != nil {
+		msg.Roles.KubernetesWorker = &cpb.NodeRoles_KubernetesWorker{}
+	}
+	return msg
+}
+
+func nodeUnmarshal(data []byte) (*Node, error) {
+	msg := ppb.Node{}
+	if err := proto.Unmarshal(data, &msg); err != nil {
+		return nil, fmt.Errorf("could not unmarshal proto: %w", err)
+	}
+	n := &Node{
+		clusterUnlockKey: msg.ClusterUnlockKey,
+		pubkey:           msg.PublicKey,
+		state:            msg.FsmState,
+	}
+	if msg.Roles.KubernetesWorker != nil {
+		n.kubernetesWorker = &NodeRoleKubernetesWorker{}
+	}
+	return n, nil
+}
+
+// ConfigureLocalHostname uses the node's ID as a hostname, and sets the
+// current hostname, and local files like hosts and machine-id accordingly.
+//
+// TODO(q3k): move this to roleserver?
+func (n *Node) ConfigureLocalHostname(ctx context.Context, ephemeral *localstorage.EphemeralDirectory, address net.IP) error {
+	if err := unix.Sethostname([]byte(n.ID())); err != nil {
+		return fmt.Errorf("failed to set runtime hostname: %w", err)
+	}
+	hosts := []string{
+		"127.0.0.1 localhost",
+		"::1 localhost",
+		fmt.Sprintf("%s %s", address.String(), n.ID()),
+	}
+	if err := ephemeral.Hosts.Write([]byte(strings.Join(hosts, "\n")), 0644); err != nil {
+		return fmt.Errorf("failed to write /ephemeral/hosts: %w", err)
+	}
+	if err := ephemeral.MachineID.Write([]byte(nodeIDBare(n.pubkey)), 0644); err != nil {
+		return fmt.Errorf("failed to write /ephemeral/machine-id: %w", err)
+	}
+
+	// Check that we are self-resolvable.
+	ip, err := net.ResolveIPAddr("ip", n.ID())
+	if err != nil {
+		return fmt.Errorf("failed to self-resolve: %w", err)
+	}
+	supervisor.Logger(ctx).Infof("This is node %s at %v", n.ID(), ip)
+	return nil
+}
diff --git a/metropolis/node/core/curator/state_pki.go b/metropolis/node/core/curator/state_pki.go
new file mode 100644
index 0000000..ce28aac
--- /dev/null
+++ b/metropolis/node/core/curator/state_pki.go
@@ -0,0 +1,63 @@
+package curator
+
+import (
+	"fmt"
+
+	"source.monogon.dev/metropolis/node/core/localstorage"
+	"source.monogon.dev/metropolis/pkg/pki"
+)
+
+var (
+	// pkiNamespace is the etcd/pki namespace in which the Metropolis cluster CA
+	// data will live.
+	pkiNamespace = pki.Namespaced("/cluster-pki/")
+	// pkiCA is the main cluster CA, stored in etcd. It is used to emit cluster,
+	// node and user certificates.
+	pkiCA = pkiNamespace.New(pki.SelfSigned, "cluster-ca", pki.CA("Metropolis Cluster CA"))
+)
+
+// NodeCredentials are the public and private part of the credentials of a node.
+//
+// It represents all the data necessary for a node to authenticate over mTLS to
+// other nodes and the rest of the cluster.
+//
+// It must never be made available to any node other than the node it has been
+// emitted for.
+type NodeCredentials struct {
+	NodeCertificate
+	// PrivateKey is the ED25519 private key of the node, corresponding to
+	// NodeCertificate.PublicKey.
+	PrivateKey []byte
+}
+
+// NodeCertificate is the public part of the credential of a node.
+type NodeCertificate struct {
+	// PublicKey is the ED25519 public key of the node.
+	PublicKey []byte
+	// Certificate is the DER-encoded TLS certificate emitted for the node (ie.
+	// PublicKey) by the cluster CA.
+	Certificate []byte
+	// CACertificate is the DER-encoded TLS certificate of the cluster CA at time of
+	// emitting the Certificate.
+	CACertificate []byte
+}
+
+// ID returns the Node ID of the node for which this NodeCertificate was
+// emitted.
+func (c *NodeCertificate) ID() string {
+	return NodeID(c.PublicKey)
+}
+
+// Save stores the given node credentials in local storage.
+func (c *NodeCredentials) Save(d *localstorage.PKIDirectory) error {
+	if err := d.CACertificate.Write(c.CACertificate, 0400); err != nil {
+		return fmt.Errorf("when writing CA certificate: %w", err)
+	}
+	if err := d.Certificate.Write(c.Certificate, 0400); err != nil {
+		return fmt.Errorf("when writing node certificate: %w", err)
+	}
+	if err := d.Key.Write(c.PrivateKey, 0400); err != nil {
+		return fmt.Errorf("when writing node private key: %w", err)
+	}
+	return nil
+}