m/n/roleserve: reactive service management
Bottom line up first: this starts etcd, the curator and Kubernetes on
nodes that register into the cluster. Effectively, this is multi-node
support.
This significantly refactors the node roleserver to start both the
control plane and Kubernetes on demand, based on roles assigned by the
cluster (or due to bootstrapping a new cluster). Most importantly, we
pretty much remove all cluster-bootstrapping code from the node startup
process, so that the first node and any subsequent nodes go through the
same codepaths.
In addition, access to the cluster Curators is now also mediated via
the roleserver, which is the component aware of whether the node code
should connect to the local curator (if the control plane is running) or
to remote curators (if the control plane is not [yet] running).
This implementation is a bit verbose as we make heavy use of untyped
Event Values, and we add quite a few repeated lines of code to combine
data from different values into something that a goroutine can wait on.
Once Go 1.18 lands we should be able to make this code much nicer.
There are still a few things that need to be implemented for all flows to
be working fully (notably, we can end up with stale curator clients,
curator clients are not load balanced across multiple curators, and
cluster directories for connecting to the curator do not get updated
after startup). However, these are all features that we should be able
to easily implement once this lands.
Currently this is only covered by the e2e test. The individual workers
within roleserver should be able to be independently tested, and this is
something I plan on doing very soon as another change on top, while this
one is being reviewed.
With time, the two large startup components (the cluster "enrolment"
manager and the roleserver) have slightly lost their original purpose
and their names aren't exactly fitting anymore. I might rename them in
an upcoming change; if anyone has any good naming ideas I'm all ears :).
Change-Id: Iaf0fc9f6fdd2122e6aae19607be1648382063e66
Reviewed-on: https://review.monogon.dev/c/monogon/+/532
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
diff --git a/metropolis/node/core/roleserve/BUILD.bazel b/metropolis/node/core/roleserve/BUILD.bazel
index 06e6e10..ec1d443 100644
--- a/metropolis/node/core/roleserve/BUILD.bazel
+++ b/metropolis/node/core/roleserve/BUILD.bazel
@@ -3,22 +3,33 @@
go_library(
name = "go_default_library",
srcs = [
- "cluster_agent.go",
- "kubernetes_worker.go",
"roleserve.go",
+ "value_bootstrapdata.go",
+ "value_clustermembership.go",
+ "value_kubernetes.go",
+ "value_node.go",
+ "worker_controlplane.go",
+ "worker_kubernetes.go",
+ "worker_rolefetch.go",
+ "worker_statuspush.go",
],
importpath = "source.monogon.dev/metropolis/node/core/roleserve",
visibility = ["//visibility:public"],
deps = [
+ "//metropolis/node:go_default_library",
+ "//metropolis/node/core/consensus:go_default_library",
+ "//metropolis/node/core/curator:go_default_library",
"//metropolis/node/core/curator/proto/api:go_default_library",
"//metropolis/node/core/identity:go_default_library",
"//metropolis/node/core/localstorage:go_default_library",
"//metropolis/node/core/network:go_default_library",
+ "//metropolis/node/core/rpc:go_default_library",
"//metropolis/node/kubernetes:go_default_library",
"//metropolis/node/kubernetes/containerd:go_default_library",
"//metropolis/node/kubernetes/pki:go_default_library",
"//metropolis/pkg/event:go_default_library",
"//metropolis/pkg/event/memory:go_default_library",
+ "//metropolis/pkg/pki:go_default_library",
"//metropolis/pkg/supervisor:go_default_library",
"//metropolis/proto/common:go_default_library",
"@org_golang_google_grpc//:go_default_library",
diff --git a/metropolis/node/core/roleserve/cluster_agent.go b/metropolis/node/core/roleserve/cluster_agent.go
deleted file mode 100644
index 842b841..0000000
--- a/metropolis/node/core/roleserve/cluster_agent.go
+++ /dev/null
@@ -1,48 +0,0 @@
-package roleserve
-
-import (
- "context"
- "fmt"
- "net"
-
- ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
- "source.monogon.dev/metropolis/pkg/supervisor"
- cpb "source.monogon.dev/metropolis/proto/common"
-)
-
-// runClusterAgent runs the ClusterAgent, a runnable responsible for reporting
-// the status of the local node to the cluster.
-//
-// This currently only reports the node's external address to the cluster
-// whenever it changes.
-func (s *Service) runClusterAgent(ctx context.Context) error {
- w := s.Network.Watch()
- defer w.Close()
-
- var external net.IP
-
- for {
- st, err := w.Get(ctx)
- if err != nil {
- return fmt.Errorf("getting network status failed: %w", err)
- }
-
- if external.Equal(st.ExternalAddress) {
- continue
- }
-
- external = st.ExternalAddress
- supervisor.Logger(ctx).Infof("New external address (%s), submitting update to cluster...", external.String())
-
- _, err = s.curator.UpdateNodeStatus(ctx, &ipb.UpdateNodeStatusRequest{
- NodeId: s.Node.ID(),
- Status: &cpb.NodeStatus{
- ExternalAddress: external.String(),
- },
- })
- if err != nil {
- return fmt.Errorf("UpdateNodeStatus failed: %w", err)
- }
- supervisor.Logger(ctx).Infof("Updated.")
- }
-}
diff --git a/metropolis/node/core/roleserve/kubernetes_worker.go b/metropolis/node/core/roleserve/kubernetes_worker.go
deleted file mode 100644
index 027e0ec..0000000
--- a/metropolis/node/core/roleserve/kubernetes_worker.go
+++ /dev/null
@@ -1,74 +0,0 @@
-package roleserve
-
-import (
- "context"
- "fmt"
- "net"
-
- cpb "source.monogon.dev/metropolis/node/core/curator/proto/api"
- "source.monogon.dev/metropolis/node/kubernetes"
- "source.monogon.dev/metropolis/node/kubernetes/containerd"
- "source.monogon.dev/metropolis/pkg/supervisor"
-)
-
-// runKubernetesWorkerLauncher runs a launcher responsible for maintaining the
-// main Kubernetes Worker service from //metropolis/node/kubernetes.
-//
-// TODO(q3k): make this generic as we have more workloads/launchers (and maybe
-// Go type parameters).
-func (s *Service) runKubernetesWorkerLauncher(ctx context.Context) error {
- var n *cpb.Node
- select {
- case <-ctx.Done():
- return ctx.Err()
- case n = <-s.kwC:
- }
-
- kw := n.Roles.KubernetesWorker
- if kw != nil {
- supervisor.Logger(ctx).Info("Node is a Kubernetes Worker, starting...")
- containerdSvc := &containerd.Service{
- EphemeralVolume: &s.StorageRoot.Ephemeral.Containerd,
- }
- if err := supervisor.Run(ctx, "containerd", containerdSvc.Run); err != nil {
- return fmt.Errorf("failed to start containerd service: %w", err)
- }
-
- kubeSvc := kubernetes.New(kubernetes.Config{
- // TODO(q3k): make this configurable.
- ServiceIPRange: net.IPNet{
- IP: net.IP{10, 0, 255, 1},
- // That's a /24.
- Mask: net.IPMask{0xff, 0xff, 0xff, 0x00},
- },
- ClusterNet: net.IPNet{
- IP: net.IP{10, 0, 0, 0},
- // That's a /16.
- Mask: net.IPMask{0xff, 0xff, 0x00, 0x00},
- },
- KPKI: s.KPKI,
- Root: s.StorageRoot,
- Network: s.Network,
- Node: s.Node,
- })
- if err := supervisor.Run(ctx, "run", kubeSvc.Run); err != nil {
- return fmt.Errorf("failed to start kubernetes service: %w", err)
- }
- s.kwSvcC <- kubeSvc
- }
- supervisor.Signal(ctx, supervisor.SignalHealthy)
-
- for {
- select {
- case <-ctx.Done():
- return ctx.Err()
- case n := <-s.kwC:
- if kw == nil && n.Roles.KubernetesWorker != nil {
- return fmt.Errorf("node is now a kubernetes worker, restarting...")
- }
- if kw != nil && n.Roles.KubernetesWorker == nil {
- return fmt.Errorf("node is not a kubernetes worker anymore, restarting...")
- }
- }
- }
-}
diff --git a/metropolis/node/core/roleserve/roleserve.go b/metropolis/node/core/roleserve/roleserve.go
index 557ed68..2c2e885 100644
--- a/metropolis/node/core/roleserve/roleserve.go
+++ b/metropolis/node/core/roleserve/roleserve.go
@@ -1,45 +1,59 @@
// package roleserve implements the roleserver/“Role Server”.
//
-// The Role Server is responsible for watching the cluster/node state and
-// starting/stopping any 'application' code (also called workload code) based on
-// the Node's roles.
+// The Role Server runs on every node and is responsible for running all of the
+// node's role-dependent services, like the control plane (Consensus/etcd and
+// Curator) and Kubernetes. It watches the node roles as assigned by the
+// cluster's curator, updates the status of the node within the curator, and
+// spawns on-demand services.
//
-// Each workload code (which would usually be a supervisor runnable) is started
-// by a dedicated 'launcher'. These launchers wait for node roles to be
-// available from the curator, and either start the related workload sub-runners
-// or do nothing ; then they declare themselves as healthy to the supervisor. If
-// at any point the role of the node changes (meaning that the node should now
-// start or stop the workloads) the launcher just exits and the supervisor
-// will restart it.
//
-// Currently, this is used to start up the Kubernetes worker code.
+// .-----------. .--------. Watches .------------.
+// | Cluster |--------->| Role |<----------| Node Roles |
+// | Enrolment | Provides | Server | Updates '------------'
+// '-----------' Data | |----. .-------------.
+// '--------' '----->| Node Status |
+// Spawns | | Spawns '-------------'
+// .-----' '-----.
+// V V
+// .-----------. .------------.
+// | Consensus | | Kubernetes |
+// | & Curator | | |
+// '-----------' '------------'
+//
+// The internal state of the Role Server (eg. status of services, input from
+// Cluster Enrolment, current node roles as retrieved from the cluster) is
+// stored as in-memory Event Value variables, with some of them being exposed
+// externally for other services to consume (ie. ones that wish to depend on
+// some information managed by the Role Server but which do not need to be
+// spawned on demand by the Role Server). These Event Values and code which acts
+// upon them form a reactive/dataflow-driven model which drives the Role Server
+// logic forward.
+//
+// The Role Server also has to handle the complex bootstrap problem involved in
+// simultaneously accessing the control plane (for node roles and other cluster
+// data) while maintaining (possibly the only one in the cluster) control plane
+// instance. The state of resolution of this bootstrap problem is maintained
+// within ClusterMembership, which contains critical information about the
+// control plane, like the information required to connect to a Curator (local
+// or remote). It is updated both by external processes (ie. data from the
+// Cluster Enrolment) as well as logic responsible for spawning the control
+// plane.
+//
package roleserve
import (
"context"
- "fmt"
+ "crypto/ed25519"
- "google.golang.org/grpc"
-
- cpb "source.monogon.dev/metropolis/node/core/curator/proto/api"
"source.monogon.dev/metropolis/node/core/identity"
"source.monogon.dev/metropolis/node/core/localstorage"
"source.monogon.dev/metropolis/node/core/network"
- "source.monogon.dev/metropolis/node/kubernetes"
- "source.monogon.dev/metropolis/node/kubernetes/pki"
- "source.monogon.dev/metropolis/pkg/event"
- "source.monogon.dev/metropolis/pkg/event/memory"
"source.monogon.dev/metropolis/pkg/supervisor"
+ cpb "source.monogon.dev/metropolis/proto/common"
)
// Config is the configuration of the role server.
type Config struct {
- // CuratorDial is a function that the roleserver will use to dial the curator.
- // As both the curator listener and roleserver might restart, this dial function
- // is needed to possibly re-establish connectivity after a full restart of
- // either.
- CuratorDial func() (*grpc.ClientConn, error)
-
// StorageRoot is a handle to access all of the Node's storage. This is needed
// as the roleserver spawns complex workloads like Kubernetes which need access
// to a broad range of storage.
@@ -47,13 +61,6 @@
// Network is a handle to the network service, used by workloads.
Network *network.Service
-
- // KPKI is a handle to initialized Kubernetes PKI stored on etcd. In the future
- // this will probably be provisioned by the Kubernetes workload itself.
- KPKI *pki.PKI
-
- // Node is the node identity on which the roleserver is running.
- Node *identity.Node
}
// Service is the roleserver/“Role Server” service. See the package-level
@@ -61,125 +68,84 @@
type Service struct {
Config
- value memory.Value
+ ClusterMembership ClusterMembershipValue
+ KubernetesStatus KubernetesStatusValue
+ bootstrapData bootstrapDataValue
+ localRoles localRolesValue
- // kwC is a channel populated with updates to the local Node object from the
- // curator, passed over to the Kubernetes Worker launcher.
- kwC chan *cpb.Node
- // kwSvcC is a channel populated by the Kubernetes Worker launcher when the
- // service is started (which then contains the value of spawned Kubernetes
- // workload service) or stopped (which is then nil).
- kwSvcC chan *kubernetes.Service
-
- // gRPC channel to curator, acquired via Config.CuratorDial, active for the
- // lifecycle of the Service runnable. It's used by the updater
- // sub-runnable.
- curator cpb.CuratorClient
-}
-
-// Status is updated by the role service any time one of the subordinate
-// workload services is started or stopped.
-type Status struct {
- // Kubernetes is set to the Kubernetes workload Service if started/restarted or
- // nil if stopped.
- Kubernetes *kubernetes.Service
+ controlPlane *workerControlPlane
+ statusPush *workerStatusPush
+ kubernetes *workerKubernetes
+ rolefetch *workerRoleFetch
}
// New creates a Role Server services from a Config.
func New(c Config) *Service {
- return &Service{
+ s := &Service{
Config: c,
- kwC: make(chan *cpb.Node),
- kwSvcC: make(chan *kubernetes.Service),
}
+
+ s.controlPlane = &workerControlPlane{
+ storageRoot: s.StorageRoot,
+
+ bootstrapData: &s.bootstrapData,
+ clusterMembership: &s.ClusterMembership,
+ localRoles: &s.localRoles,
+ }
+
+ s.statusPush = &workerStatusPush{
+ network: s.Network,
+
+ clusterMembership: &s.ClusterMembership,
+ }
+
+ s.kubernetes = &workerKubernetes{
+ network: s.Network,
+ storageRoot: s.StorageRoot,
+
+ localRoles: &s.localRoles,
+ clusterMembership: &s.ClusterMembership,
+
+ kubernetesStatus: &s.KubernetesStatus,
+ }
+
+ s.rolefetch = &workerRoleFetch{
+ clusterMembership: &s.ClusterMembership,
+
+ localRoles: &s.localRoles,
+ }
+
+ return s
}
-type Watcher struct {
- event.Watcher
+func (s *Service) ProvideBootstrapData(privkey ed25519.PrivateKey, iok, cuk []byte) {
+ s.ClusterMembership.set(&ClusterMembership{
+ pubkey: privkey.Public().(ed25519.PublicKey),
+ })
+ s.bootstrapData.set(&bootstrapData{
+ nodePrivateKey: privkey,
+ initialOwnerKey: iok,
+ clusterUnlockKey: cuk,
+ })
}
-func (s *Service) Watch() Watcher {
- return Watcher{
- Watcher: s.value.Watch(),
- }
-}
-
-func (w *Watcher) Get(ctx context.Context) (*Status, error) {
- v, err := w.Watcher.Get(ctx)
- if err != nil {
- return nil, err
- }
- st := v.(Status)
- return &st, nil
+func (s *Service) ProvideRegisterData(credentials identity.NodeCredentials, directory *cpb.ClusterDirectory) {
+ s.ClusterMembership.set(&ClusterMembership{
+ remoteCurators: directory,
+ credentials: &credentials,
+ pubkey: credentials.PublicKey(),
+ })
}
// Run the Role Server service, which uses intermediary workload launchers to
// start/stop subordinate services as the Node's roles change.
func (s *Service) Run(ctx context.Context) error {
- supervisor.Logger(ctx).Info("Dialing curator...")
- conn, err := s.CuratorDial()
- if err != nil {
- return fmt.Errorf("could not dial cluster curator: %w", err)
- }
- defer conn.Close()
- s.curator = cpb.NewCuratorClient(conn)
-
- if err := supervisor.Run(ctx, "updater", s.runUpdater); err != nil {
- return fmt.Errorf("failed to launch updater: %w", err)
- }
-
- if err := supervisor.Run(ctx, "cluster-agent", s.runClusterAgent); err != nil {
- return fmt.Errorf("failed to launch cluster agent: %w", err)
- }
-
- if err := supervisor.Run(ctx, "kubernetes-worker", s.runKubernetesWorkerLauncher); err != nil {
- return fmt.Errorf("failed to start kubernetes-worker launcher: %w", err)
- }
-
+ supervisor.Run(ctx, "controlplane", s.controlPlane.run)
+ supervisor.Run(ctx, "kubernetes", s.kubernetes.run)
+ supervisor.Run(ctx, "statuspush", s.statusPush.run)
+ supervisor.Run(ctx, "rolefetch", s.rolefetch.run)
supervisor.Signal(ctx, supervisor.SignalHealthy)
- status := Status{}
- for {
- select {
- case <-ctx.Done():
- return ctx.Err()
- case svc := <-s.kwSvcC:
- status.Kubernetes = svc
- s.value.Set(status)
- }
- }
-}
-
-// runUpdater runs the updater, a runnable which watchers the cluster via
-// curator for any pertinent node changes and distributes them to respective
-// workload launchers.
-//
-// TODO(q3k): this should probably be implemented somewhere as a curator client
-// library, maybe one that implements the Event Value interface.
-func (s *Service) runUpdater(ctx context.Context) error {
- srv, err := s.curator.Watch(ctx, &cpb.WatchRequest{Kind: &cpb.WatchRequest_NodeInCluster_{
- NodeInCluster: &cpb.WatchRequest_NodeInCluster{
- NodeId: s.Node.ID(),
- },
- }})
- if err != nil {
- return fmt.Errorf("watch failed: %w", err)
- }
- defer srv.CloseSend()
-
- supervisor.Signal(ctx, supervisor.SignalHealthy)
- for {
- ev, err := srv.Recv()
- if err != nil {
- return fmt.Errorf("watch event receive failed: %w", err)
- }
- supervisor.Logger(ctx).Infof("Received node event: %+v", ev)
- for _, node := range ev.Nodes {
- if node.Id != s.Node.ID() {
- continue
- }
- s.kwC <- node
- }
- }
-
+ <-ctx.Done()
+ return ctx.Err()
}
diff --git a/metropolis/node/core/roleserve/value_bootstrapdata.go b/metropolis/node/core/roleserve/value_bootstrapdata.go
new file mode 100644
index 0000000..4ab1250
--- /dev/null
+++ b/metropolis/node/core/roleserve/value_bootstrapdata.go
@@ -0,0 +1,45 @@
+package roleserve
+
+import (
+ "context"
+ "crypto/ed25519"
+
+ "source.monogon.dev/metropolis/pkg/event"
+ "source.monogon.dev/metropolis/pkg/event/memory"
+)
+
+// bootstrapData is an internal EventValue structure which is populated by the
+// Cluster Enrolment logic via ProvideBootstrapData. It contains data needed by
+// the control plane logic to go into bootstrap mode and bring up a control
+// plane from scratch.
+type bootstrapData struct {
+ nodePrivateKey ed25519.PrivateKey
+ clusterUnlockKey []byte
+ initialOwnerKey []byte
+}
+
+type bootstrapDataValue struct {
+ value memory.Value
+}
+
+func (c *bootstrapDataValue) Watch() *bootstrapDataWatcher {
+ return &bootstrapDataWatcher{
+ Watcher: c.value.Watch(),
+ }
+}
+
+func (c *bootstrapDataValue) set(v *bootstrapData) {
+ c.value.Set(v)
+}
+
+type bootstrapDataWatcher struct {
+ event.Watcher
+}
+
+func (c *bootstrapDataWatcher) Get(ctx context.Context) (*bootstrapData, error) {
+ v, err := c.Watcher.Get(ctx)
+ if err != nil {
+ return nil, err
+ }
+ return v.(*bootstrapData), nil
+}
diff --git a/metropolis/node/core/roleserve/value_clustermembership.go b/metropolis/node/core/roleserve/value_clustermembership.go
new file mode 100644
index 0000000..4f9d196
--- /dev/null
+++ b/metropolis/node/core/roleserve/value_clustermembership.go
@@ -0,0 +1,161 @@
+package roleserve
+
+import (
+ "context"
+ "crypto/ed25519"
+ "fmt"
+ "net"
+
+ "google.golang.org/grpc"
+
+ common "source.monogon.dev/metropolis/node"
+ "source.monogon.dev/metropolis/node/core/consensus"
+ "source.monogon.dev/metropolis/node/core/curator"
+ "source.monogon.dev/metropolis/node/core/identity"
+ "source.monogon.dev/metropolis/node/core/rpc"
+ "source.monogon.dev/metropolis/pkg/event"
+ "source.monogon.dev/metropolis/pkg/event/memory"
+ cpb "source.monogon.dev/metropolis/proto/common"
+)
+
+// ClusterMembership is an Event Value structure used to keep state of the
+// membership of this node in a cluster, the location of a working Curator API
+// (local or remote) and the state of a locally running control plane.
+//
+// This amalgam of seemingly unrelated data is all required to have a single
+// structure that can answer a seemingly trivial question: “Who am I and how do
+// I contact a Curator?”.
+//
+// This structure is available to roleserver-internal workers (eg. the Kubernetes
+// Worker Launcher and Updater) and to external code (eg. the Hostsfile
+// service). It is also deeply intertwined with the Control Plane Worker which
+// not only populates it when a Control Plane (and thus Curator) gets started,
+// but also accesses it to pass over information about already known remote
+// curators and to get the local node's identity.
+type ClusterMembership struct {
+ // localCurator is set by the Control Plane Worker when this node runs control
+ // plane services.
+ localCurator *curator.Service
+ // localConsensus is set by the Control Plane Worker when this node runs control
+ // plane services.
+ localConsensus *consensus.Service
+ // remoteCurators gets set by Cluster Enrolment code when Registering into a
+ // cluster and gets propagated by the Control Plane Worker to maintain
+ // connectivity to external Curators regardless of local curator health.
+ //
+ // TODO(q3k): also update this based on a live Cluster Directory from the
+ // cluster.
+ remoteCurators *cpb.ClusterDirectory
+ // credentials is set whenever this node has full access to the Cluster and is
+ // the set of credentials which can be used to perform authenticated (as the node)
+ // access to the Curator.
+ credentials *identity.NodeCredentials
+ // pubkey is the public key of the local node, and is always set.
+ pubkey ed25519.PublicKey
+}
+
+type ClusterMembershipValue struct {
+ value memory.Value
+}
+
+func (c *ClusterMembershipValue) Watch() *ClusterMembershipWatcher {
+ return &ClusterMembershipWatcher{
+ w: c.value.Watch(),
+ }
+}
+
+func (c *ClusterMembershipValue) set(v *ClusterMembership) {
+ c.value.Set(v)
+}
+
+type ClusterMembershipWatcher struct {
+ w event.Watcher
+}
+
+func (c *ClusterMembershipWatcher) Close() error {
+ return c.w.Close()
+}
+
+func (c *ClusterMembershipWatcher) getAny(ctx context.Context) (*ClusterMembership, error) {
+ v, err := c.w.Get(ctx)
+ if err != nil {
+ return nil, err
+ }
+ return v.(*ClusterMembership), nil
+}
+
+// GetNodeID returns the Node ID of the locally running node whenever available.
+// NodeIDs are available early on in the node startup process and are guaranteed
+// to never change at runtime. The watcher will then block all further Get calls
+// until new information is available. This method should only be used if
+// GetNodeID is the only method run on the watcher.
+func (c *ClusterMembershipWatcher) GetNodeID(ctx context.Context) (string, error) {
+ for {
+ cm, err := c.getAny(ctx)
+ if err != nil {
+ return "", err
+ }
+ if cm.pubkey != nil {
+ return identity.NodeID(cm.pubkey), nil
+ }
+ }
+}
+
+// GetHome returns a ClusterMembership whenever the local node is HOME to a
+// cluster (ie. whenever the node is fully a member of a cluster and can dial
+// the cluster's Curator). See proto.common.ClusterState for more information
+// about cluster states. The watcher will then block all further Get calls until
+// new information is available.
+func (c *ClusterMembershipWatcher) GetHome(ctx context.Context) (*ClusterMembership, error) {
+ for {
+ cm, err := c.getAny(ctx)
+ if err != nil {
+ return nil, err
+ }
+ if cm.credentials == nil {
+ continue
+ }
+ if cm.localCurator == nil && cm.remoteCurators == nil {
+ continue
+ }
+ return cm, nil
+ }
+}
+
+// DialCurator returns an authenticated gRPC client connection to the Curator,
+// either local or remote. No load balancing will be performed across local and
+// remote curators, so if the local node starts running a local curator but old
+// connections are still used, they will continue to target only remote
+// curators. Same goes for local consensus being turned down - however, in this
+// case, calls will error out and the client can be redialed on errors.
+//
+// It is thus recommended to only use DialCurator in short-lived contexts, and
+// perform a GetHome/DialCurator process on any gRPC error. A smarter
+// load-balancing/re-dialing client will be implemented in the future.
+func (m *ClusterMembership) DialCurator() (*grpc.ClientConn, error) {
+ if m.localCurator != nil {
+ return m.localCurator.DialCluster()
+ }
+
+ // Dial first curator.
+ // TODO(q3k): load balance
+ if m.remoteCurators == nil || len(m.remoteCurators.Nodes) < 1 {
+ return nil, fmt.Errorf("no local or remote curators available")
+ }
+ host := m.remoteCurators.Nodes[0].Addresses[0].Host
+ addr := net.JoinHostPort(host, common.CuratorServicePort.PortString())
+ return rpc.NewAuthenticatedClient(addr, m.credentials.TLSCredentials(), m.credentials.ClusterCA())
+}
+
+func (m *ClusterMembership) NodePubkey() ed25519.PublicKey {
+ if m.pubkey == nil {
+ // This shouldn't happen - it means a user got this structure too early or
+ // constructed it from scratch.
+ panic("node pubkey not available")
+ }
+ return m.pubkey
+}
+
+func (m *ClusterMembership) NodeID() string {
+ return identity.NodeID(m.NodePubkey())
+}
diff --git a/metropolis/node/core/roleserve/value_kubernetes.go b/metropolis/node/core/roleserve/value_kubernetes.go
new file mode 100644
index 0000000..4334dc3
--- /dev/null
+++ b/metropolis/node/core/roleserve/value_kubernetes.go
@@ -0,0 +1,46 @@
+package roleserve
+
+import (
+ "context"
+
+ "source.monogon.dev/metropolis/node/kubernetes"
+ "source.monogon.dev/metropolis/pkg/event"
+ "source.monogon.dev/metropolis/pkg/event/memory"
+)
+
+// KubernetesStatus is an Event Value structure populated by a running
+// Kubernetes instance. It allows external services to access the Kubernetes
+// Service whenever available (ie. enabled and started by the Role Server).
+type KubernetesStatus struct {
+ Svc *kubernetes.Service
+}
+
+type KubernetesStatusValue struct {
+ value memory.Value
+}
+
+func (k *KubernetesStatusValue) Watch() *KubernetesStatusWatcher {
+ return &KubernetesStatusWatcher{
+ Watcher: k.value.Watch(),
+ }
+}
+
+func (k *KubernetesStatusValue) set(v *KubernetesStatus) {
+ k.value.Set(v)
+}
+
+type KubernetesStatusWatcher struct {
+ event.Watcher
+}
+
+// Get waits until the Kubernetes service is available. The returned
+// KubernetesStatus is guaranteed to contain a kubernetes.Service that was
+// running at the time of this call returning (but which might have since been
+// stopped).
+func (k *KubernetesStatusWatcher) Get(ctx context.Context) (*KubernetesStatus, error) {
+ v, err := k.Watcher.Get(ctx)
+ if err != nil {
+ return nil, err
+ }
+ return v.(*KubernetesStatus), nil
+}
diff --git a/metropolis/node/core/roleserve/value_node.go b/metropolis/node/core/roleserve/value_node.go
new file mode 100644
index 0000000..f095f2f
--- /dev/null
+++ b/metropolis/node/core/roleserve/value_node.go
@@ -0,0 +1,36 @@
+package roleserve
+
+import (
+ "context"
+
+ "source.monogon.dev/metropolis/pkg/event"
+ "source.monogon.dev/metropolis/pkg/event/memory"
+ cpb "source.monogon.dev/metropolis/proto/common"
+)
+
+type localRolesValue struct {
+ value memory.Value
+}
+
+func (c *localRolesValue) Watch() *localRolesWatcher {
+ return &localRolesWatcher{
+ Watcher: c.value.Watch(),
+ }
+}
+
+func (c *localRolesValue) set(v *cpb.NodeRoles) {
+ c.value.Set(v)
+}
+
+type localRolesWatcher struct {
+ event.Watcher
+}
+
+// Get retrieves the roles assigned to the local node by the cluster.
+func (c *localRolesWatcher) Get(ctx context.Context) (*cpb.NodeRoles, error) {
+ v, err := c.Watcher.Get(ctx)
+ if err != nil {
+ return nil, err
+ }
+ return v.(*cpb.NodeRoles), nil
+}
diff --git a/metropolis/node/core/roleserve/worker_controlplane.go b/metropolis/node/core/roleserve/worker_controlplane.go
new file mode 100644
index 0000000..6193c5a
--- /dev/null
+++ b/metropolis/node/core/roleserve/worker_controlplane.go
@@ -0,0 +1,386 @@
+package roleserve
+
+import (
+ "context"
+ "crypto/ed25519"
+ "crypto/x509"
+ "fmt"
+ "time"
+
+ "source.monogon.dev/metropolis/node/core/consensus"
+ "source.monogon.dev/metropolis/node/core/curator"
+ "source.monogon.dev/metropolis/node/core/identity"
+ "source.monogon.dev/metropolis/node/core/localstorage"
+ "source.monogon.dev/metropolis/pkg/event/memory"
+ "source.monogon.dev/metropolis/pkg/pki"
+ "source.monogon.dev/metropolis/pkg/supervisor"
+ cpb "source.monogon.dev/metropolis/proto/common"
+)
+
+// workerControlPlane is the Control Plane Worker, responsible for maintaining a
+// locally running Control Plane (Consensus and Curator service pair) if needed.
+//
+// The Control Plane will run under the following conditions:
+// - This node has been started in BOOTSTRAP mode and bootstrapData was provided
+// by the cluster enrolment logic. In this case, the Control Plane Worker will
+// perform the required bootstrap steps, creating a local node with appropriate
+// roles, and will start Consensus and the Curator.
+// - This node has the ConsensusMember Node Role. This will be true for nodes
+// which are REGISTERing into the cluster, as well as already running nodes that
+// have been assigned the role.
+//
+// In either case, ClusterMembership will be updated to allow connecting to the
+// newly locally running control plane. For nodes that are bootstrapping the
+// cluster, this will be the first time the rest of the node can reach the
+// Curator. For other cases, this will be the new, preferred way to reach
+// consensus, without having to rely on external Control Plane nodes.
+type workerControlPlane struct {
+ // storageRoot provides the etcd data directory and the ephemeral directories
+ // handed to the Consensus and Curator services.
+ storageRoot *localstorage.Root
+
+ // bootstrapData will be read.
+ bootstrapData *bootstrapDataValue
+ // clusterMembership will be read and written.
+ clusterMembership *ClusterMembershipValue
+ // localRoles will be read.
+ localRoles *localRolesValue
+}
+
+// controlPlaneStartup is used internally to provide a reduced (as in MapReduce)
+// datum for the main Control Plane launcher responsible for launching the
+// Control Plane Services, if at all.
+type controlPlaneStartup struct {
+ // consensusConfig is set if the node should run the control plane, and will
+ // contain the configuration of the Consensus service. It is nil if the node
+ // should not run the control plane.
+ consensusConfig *consensus.Config
+ // bootstrap is set if this node should bootstrap consensus. It contains all
+ // data required to perform this bootstrap step.
+ bootstrap *bootstrapData
+
+ // existingMembership is the ClusterMembership that the node already had
+ // available before deciding to run the Control Plane. This will be used to
+ // carry over existing data from the membership into the new membership as
+ // affected by starting the control plane. It is not set when bootstrapping.
+ existingMembership *ClusterMembership
+}
+
+// changed reports whether c and o differ to the point where the Control Plane
+// launcher should restart the control plane.
+//
+// Currently the only difference considered is whether a consensus config is
+// present, ie. whether the node switched to/from running the Control Plane.
+func (c *controlPlaneStartup) changed(o *controlPlaneStartup) bool {
+	runsControlPlane := c.consensusConfig != nil
+	otherRunsControlPlane := o.consensusConfig != nil
+	return runsControlPlane != otherRunsControlPlane
+}
+
+// run is the main runnable of the Control Plane Worker. It starts a group of
+// map/reduce runnables which combine bootstrapData, ClusterMembership and
+// NodeRoles into a single *controlPlaneStartup, and a launcher runnable which
+// starts Consensus and the Curator based on that datum and then publishes a
+// new ClusterMembership. Afterwards it blocks until ctx is canceled and
+// returns ctx.Err(). An error is returned early if any of the child runnables
+// cannot be started.
+func (s *workerControlPlane) run(ctx context.Context) error {
+	// Map/Reduce a *controlPlaneStartup from different data sources. This will then
+	// populate an Event Value that the actual launcher will use to start the
+	// Control Plane.
+	//
+	//   bootstrapData     -M-> bootstrapDataC ------.
+	//                                               |
+	//   ClusterMembership -M-> clusterMembershipC --R---> startupV
+	//                                               |
+	//   NodeRoles         -M-> rolesC --------------'
+	//
+	var startupV memory.Value
+
+	// Channels are used as intermediaries between map stages and the final reduce,
+	// which is okay as long as the entire tree restarts simultaneously (which we
+	// ensure via RunGroup).
+	bootstrapDataC := make(chan *bootstrapData)
+	clusterMembershipC := make(chan *ClusterMembership)
+	rolesC := make(chan *cpb.NodeRoles)
+
+	stages := map[string]supervisor.Runnable{
+		// Plain conversion from Event Value to channel.
+		"map-bootstrap-data": func(ctx context.Context) error {
+			supervisor.Signal(ctx, supervisor.SignalHealthy)
+			w := s.bootstrapData.Watch()
+			defer w.Close()
+			for {
+				v, err := w.Get(ctx)
+				if err != nil {
+					return err
+				}
+				// The channel is unbuffered: also watch ctx so that this stage
+				// cannot block forever once the reducer has exited.
+				select {
+				case bootstrapDataC <- v:
+				case <-ctx.Done():
+					return ctx.Err()
+				}
+			}
+		},
+		// Plain conversion from Event Value to channel.
+		"map-cluster-membership": func(ctx context.Context) error {
+			supervisor.Signal(ctx, supervisor.SignalHealthy)
+			w := s.clusterMembership.Watch()
+			defer w.Close()
+			for {
+				v, err := w.GetHome(ctx)
+				if err != nil {
+					return err
+				}
+				// See map-bootstrap-data: never block past cancellation.
+				select {
+				case clusterMembershipC <- v:
+				case <-ctx.Done():
+					return ctx.Err()
+				}
+			}
+		},
+		// Plain conversion from Event Value to channel.
+		"map-roles": func(ctx context.Context) error {
+			supervisor.Signal(ctx, supervisor.SignalHealthy)
+			w := s.localRoles.Watch()
+			defer w.Close()
+			for {
+				v, err := w.Get(ctx)
+				if err != nil {
+					return err
+				}
+				// See map-bootstrap-data: never block past cancellation.
+				select {
+				case rolesC <- v:
+				case <-ctx.Done():
+					return ctx.Err()
+				}
+			}
+		},
+		// Provide config from clusterMembership and roles.
+		"reduce-config": func(ctx context.Context) error {
+			supervisor.Signal(ctx, supervisor.SignalHealthy)
+			var lr *cpb.NodeRoles
+			var cm *ClusterMembership
+			var bd *bootstrapData
+			for {
+				select {
+				case <-ctx.Done():
+					return ctx.Err()
+				case lr = <-rolesC:
+				case cm = <-clusterMembershipC:
+				case bd = <-bootstrapDataC:
+				}
+
+				// If we have any bootstrap config ever, always use that.
+				//
+				// If there is a conflict between two available configuration methods (bootstrap
+				// and non-bootstrap) there effectively shouldn't be any difference between the
+				// two and it shouldn't matter which one we pick. That is because the bootstrap
+				// data is only effectively used to populate the JoinCluster parameter of etcd,
+				// which in turns is only used when a node is starting without any data present.
+				// And since we managed to get our own node roles and that won the race against
+				// bootstrap data, it means the bootstrap was successful and we can now start
+				// without the bootstrap data.
+				//
+				// The only problem is when we remove a ConsensusMember from a node which still
+				// has BootstrapData lingering from first bootup. However, we currently do not
+				// support removing consensus roles (or any roles for that matter).
+				//
+				// TODO(q3k): support the above edge case. This can be done, for example, by
+				// rewriting the reduction to wait for all data to be available and by
+				// pre-populating all values to be nil at startup, thereby allowing for priority
+				// encoding and removing the above race condition.
+				if bd != nil {
+					supervisor.Logger(ctx).Infof("Using bootstrap data...")
+					startupV.Set(&controlPlaneStartup{
+						consensusConfig: &consensus.Config{
+							Data:           &s.storageRoot.Data.Etcd,
+							Ephemeral:      &s.storageRoot.Ephemeral.Consensus,
+							NodePrivateKey: bd.nodePrivateKey,
+						},
+						bootstrap: bd,
+					})
+					continue
+				}
+
+				// Otherwise, try to interpret node roles if available.
+				if lr != nil && cm != nil {
+					supervisor.Logger(ctx).Infof("Using role assigned by cluter...")
+					role := lr.ConsensusMember
+					if role == nil {
+						supervisor.Logger(ctx).Infof("Not a control plane node.")
+						startupV.Set(&controlPlaneStartup{})
+						continue
+					}
+					supervisor.Logger(ctx).Infof("Control plane node, building config...")
+
+					// Parse X509 data from NodeRoles. On failure, log and keep waiting for
+					// newer data instead of submitting a broken config.
+					caCert, err := x509.ParseCertificate(role.CaCertificate)
+					if err != nil {
+						supervisor.Logger(ctx).Errorf("Could not parse CA certificate: %v", err)
+						continue
+					}
+					peerCert, err := x509.ParseCertificate(role.PeerCertificate)
+					if err != nil {
+						supervisor.Logger(ctx).Errorf("Could not parse peer certificate: %v", err)
+						continue
+					}
+					crl, err := x509.ParseCRL(role.InitialCrl)
+					if err != nil {
+						supervisor.Logger(ctx).Errorf("Could not parse CRL: %v", err)
+						continue
+					}
+
+					// Convert NodeRoles peers into consensus peers. Let the user know what peers
+					// we're starting with.
+					supervisor.Logger(ctx).Infof("Node role mandates cluster membership with initial peers:")
+					for _, p := range role.Peers {
+						supervisor.Logger(ctx).Infof("  - %s (%s)", p.Name, p.URL)
+					}
+					nodes := make([]consensus.ExistingNode, len(role.Peers))
+					for i, p := range role.Peers {
+						nodes[i].Name = p.Name
+						nodes[i].URL = p.URL
+					}
+
+					// Build and submit config to startup V.
+					startupV.Set(&controlPlaneStartup{
+						consensusConfig: &consensus.Config{
+							Data:           &s.storageRoot.Data.Etcd,
+							Ephemeral:      &s.storageRoot.Ephemeral.Consensus,
+							NodePrivateKey: cm.credentials.TLSCredentials().PrivateKey.(ed25519.PrivateKey),
+							JoinCluster: &consensus.JoinCluster{
+								CACertificate:   caCert,
+								NodeCertificate: peerCert,
+								InitialCRL: &pki.CRL{
+									Raw:  role.InitialCrl,
+									List: crl,
+								},
+								ExistingNodes: nodes,
+							},
+						},
+						existingMembership: cm,
+					})
+				}
+			}
+		},
+	}
+	if err := supervisor.RunGroup(ctx, stages); err != nil {
+		return fmt.Errorf("failed to start map/reduce stages: %w", err)
+	}
+
+	// Main Control Plane launcher. This depends on a config being put to
+	// startupV.
+	launcher := func(ctx context.Context) error {
+		supervisor.Logger(ctx).Infof("Waiting for start data...")
+
+		// Read config from startupV.
+		w := startupV.Watch()
+		defer w.Close()
+		startupI, err := w.Get(ctx)
+		if err != nil {
+			return err
+		}
+		startup := startupI.(*controlPlaneStartup)
+
+		// Start Control Plane if we have a config.
+		if startup.consensusConfig == nil {
+			supervisor.Logger(ctx).Infof("No consensus config, not starting up control plane.")
+		} else {
+			supervisor.Logger(ctx).Infof("Got config, starting consensus and curator...")
+
+			// Start consensus with config from startupV. This bootstraps the consensus
+			// service if needed.
+			con := consensus.New(*startup.consensusConfig)
+			if err := supervisor.Run(ctx, "consensus", con.Run); err != nil {
+				return fmt.Errorf("failed to start consensus service: %w", err)
+			}
+
+			// Prepare curator config, notably performing a bootstrap step if necessary. The
+			// preparation will result in a set of node credentials to run the curator with
+			// and a previously used cluster directory to be passed over to the new
+			// ClusterMembership, if any.
+			var creds *identity.NodeCredentials
+			var directory *cpb.ClusterDirectory
+			if b := startup.bootstrap; b != nil {
+				supervisor.Logger(ctx).Infof("Bootstrapping control plane. Waiting for consensus...")
+
+				// Connect to etcd as curator to perform the bootstrap step. The consensus
+				// watcher is released when the launcher exits.
+				cw := con.Watch()
+				defer cw.Close()
+				st, err := cw.Get(ctx)
+				if err != nil {
+					return fmt.Errorf("while waiting for consensus for bootstrap: %w", err)
+				}
+				ckv, err := st.CuratorClient()
+				if err != nil {
+					return fmt.Errorf("when retrieving curator client for bootstarp: %w", err)
+				}
+
+				supervisor.Logger(ctx).Infof("Bootstrapping control plane. Performing bootstrap...")
+
+				// Perform curator bootstrap step in etcd.
+				//
+				// This is all idempotent, so there's no harm in re-running this on every
+				// curator startup.
+				//
+				// TODO(q3k): collapse the curator bootstrap shenanigans into a single function.
+				n := curator.NewNodeForBootstrap(b.clusterUnlockKey, b.nodePrivateKey.Public().(ed25519.PublicKey))
+				n.EnableKubernetesWorker()
+				caCert, nodeCert, err := curator.BootstrapNodeFinish(ctx, ckv, &n, b.initialOwnerKey)
+				if err != nil {
+					return fmt.Errorf("while bootstrapping node: %w", err)
+				}
+				// ... and build new credentials from bootstrap step.
+				creds, err = identity.NewNodeCredentials(b.nodePrivateKey, nodeCert, caCert)
+				if err != nil {
+					return fmt.Errorf("when creating bootstrap node credentials: %w", err)
+				}
+				supervisor.Logger(ctx).Infof("Control plane bootstrap complete, starting curator...")
+			} else {
+				// Not bootstrapping, just starting consensus with credentials we already have.
+
+				// First, run a few assertions. This should never happen with the Map/Reduce
+				// logic above, ideally we would encode this in the type system.
+				if startup.existingMembership == nil {
+					panic("no existingMembership but not bootstrapping either")
+				}
+				if startup.existingMembership.credentials == nil {
+					panic("no existingMembership.credentials but not bootstrapping either")
+				}
+				if startup.existingMembership.remoteCurators == nil {
+					panic("no existingMembership.remoteCurators but not bootstrapping either")
+				}
+
+				// Use already existing credentials, and pass over already known curators (as
+				// we're not the only node, and we'd like downstream consumers to be able to
+				// keep connecting to existing curators in case the local one fails).
+				creds = startup.existingMembership.credentials
+				directory = startup.existingMembership.remoteCurators
+			}
+
+			// Start curator.
+			cur := curator.New(curator.Config{
+				NodeCredentials: creds,
+				Consensus:       con,
+				LeaderTTL:       10 * time.Second,
+				Directory:       &s.storageRoot.Ephemeral.Curator,
+			})
+			if err := supervisor.Run(ctx, "curator", cur.Run); err != nil {
+				return fmt.Errorf("failed to start curator: %w", err)
+			}
+
+			supervisor.Signal(ctx, supervisor.SignalHealthy)
+			supervisor.Logger(ctx).Infof("Control plane running, submitting clusterMembership.")
+
+			// We now have a locally running ControlPlane. Reflect that in a new
+			// ClusterMembership.
+			s.clusterMembership.set(&ClusterMembership{
+				localCurator:   cur,
+				localConsensus: con,
+				credentials:    creds,
+				remoteCurators: directory,
+				pubkey:         creds.PublicKey(),
+			})
+		}
+
+		// Restart everything if we get a significantly different config (ie., a config
+		// whose change would/should either turn up or tear down the Control Plane).
+		//
+		// Not restarting on every single change prevents us from going in a
+		// ClusterMembership -> ClusterDirectory -> ClusterMembership thrashing loop.
+		for {
+			ncI, err := w.Get(ctx)
+			if err != nil {
+				return err
+			}
+			nc := ncI.(*controlPlaneStartup)
+			if nc.changed(startup) {
+				supervisor.Logger(ctx).Infof("Configuration changed, restarting...")
+				return fmt.Errorf("config changed, restarting")
+			}
+		}
+	}
+	if err := supervisor.Run(ctx, "launcher", launcher); err != nil {
+		return fmt.Errorf("failed to start launcher: %w", err)
+	}
+
+	supervisor.Signal(ctx, supervisor.SignalHealthy)
+	<-ctx.Done()
+	return ctx.Err()
+}
diff --git a/metropolis/node/core/roleserve/worker_kubernetes.go b/metropolis/node/core/roleserve/worker_kubernetes.go
new file mode 100644
index 0000000..06e6735
--- /dev/null
+++ b/metropolis/node/core/roleserve/worker_kubernetes.go
@@ -0,0 +1,219 @@
+package roleserve
+
+import (
+ "context"
+ "fmt"
+ "net"
+
+ "source.monogon.dev/metropolis/node/core/localstorage"
+ "source.monogon.dev/metropolis/node/core/network"
+ "source.monogon.dev/metropolis/node/kubernetes"
+ "source.monogon.dev/metropolis/node/kubernetes/containerd"
+ kpki "source.monogon.dev/metropolis/node/kubernetes/pki"
+ "source.monogon.dev/metropolis/pkg/event/memory"
+ "source.monogon.dev/metropolis/pkg/supervisor"
+ cpb "source.monogon.dev/metropolis/proto/common"
+)
+
+// workerKubernetes is the Kubernetes Worker, responsible for launching
+// (currently a control plane / data plane converged) Kubernetes payload on
+// Metropolis.
+//
+// This currently requires locally available Consensus, until it is split up
+// into Control/Data plane parts (where then the API server must still be
+// colocated with Consensus, but all the other services don't have to).
+type workerKubernetes struct {
+ // network is passed to the Kubernetes service.
+ network *network.Service
+ // storageRoot is passed to the Kubernetes service and provides the ephemeral
+ // volume used by containerd.
+ storageRoot *localstorage.Root
+
+ // localRoles will be read.
+ localRoles *localRolesValue
+ // clusterMembership will be read.
+ clusterMembership *ClusterMembershipValue
+ // kubernetesStatus will be written.
+ kubernetesStatus *KubernetesStatusValue
+}
+
+// kubernetesStartup is used internally to provide a reduced (as in MapReduce)
+// datum for the main Kubernetes launcher responsible for starting the
+// service, if at all.
+type kubernetesStartup struct {
+ // roles are the latest node roles received from localRoles.
+ roles *cpb.NodeRoles
+ // membership is the latest ClusterMembership received from clusterMembership.
+ membership *ClusterMembership
+}
+
+// changed reports whether k and o differ to the point where the Kubernetes
+// launcher should restart Kubernetes.
+//
+// The only difference considered is whether the KubernetesWorker role is
+// present.
+func (k *kubernetesStartup) changed(o *kubernetesStartup) bool {
+	wantsKubernetes := k.roles.KubernetesWorker != nil
+	otherWantsKubernetes := o.roles.KubernetesWorker != nil
+	return wantsKubernetes != otherWantsKubernetes
+}
+
+// run is the main runnable of the Kubernetes Worker. It starts a group of
+// map/reduce runnables which combine ClusterMembership and NodeRoles into a
+// single *kubernetesStartup, and a launcher runnable which waits for a
+// KubernetesWorker role and local consensus, starts containerd and Kubernetes,
+// and then publishes a KubernetesStatus. Afterwards it blocks until ctx is
+// canceled and returns ctx.Err(). An error is returned early if any of the
+// child runnables cannot be started.
+func (s *workerKubernetes) run(ctx context.Context) error {
+	// TODO(q3k): stop depending on local consensus, split up k8s into control plane
+	// and workers.
+
+	// Map/Reduce a *kubernetesStartup from different data sources. This will then
+	// populate an Event Value that the actual launcher will use to start
+	// Kubernetes.
+	//
+	//   ClusterMembership -M-> clusterMembershipC --R---> startupV
+	//                                               |
+	//   NodeRoles         -M-> rolesC --------------'
+	//
+	var startupV memory.Value
+
+	clusterMembershipC := make(chan *ClusterMembership)
+	rolesC := make(chan *cpb.NodeRoles)
+
+	stages := map[string]supervisor.Runnable{
+		// Plain conversion from Event Value to channel.
+		"map-cluster-membership": func(ctx context.Context) error {
+			supervisor.Signal(ctx, supervisor.SignalHealthy)
+			w := s.clusterMembership.Watch()
+			defer w.Close()
+			for {
+				v, err := w.GetHome(ctx)
+				if err != nil {
+					return err
+				}
+				// The channel is unbuffered: also watch ctx so that this stage
+				// cannot block forever once the reducer has exited.
+				select {
+				case clusterMembershipC <- v:
+				case <-ctx.Done():
+					return ctx.Err()
+				}
+			}
+		},
+		// Plain conversion from Event Value to channel.
+		"map-roles": func(ctx context.Context) error {
+			supervisor.Signal(ctx, supervisor.SignalHealthy)
+			w := s.localRoles.Watch()
+			defer w.Close()
+			for {
+				v, err := w.Get(ctx)
+				if err != nil {
+					return err
+				}
+				// See map-cluster-membership: never block past cancellation.
+				select {
+				case rolesC <- v:
+				case <-ctx.Done():
+					return ctx.Err()
+				}
+			}
+		},
+		// Provide config from clusterMembership and roles.
+		"reduce-config": func(ctx context.Context) error {
+			supervisor.Signal(ctx, supervisor.SignalHealthy)
+			var lr *cpb.NodeRoles
+			var cm *ClusterMembership
+			for {
+				select {
+				case <-ctx.Done():
+					return ctx.Err()
+				case lr = <-rolesC:
+				case cm = <-clusterMembershipC:
+				}
+				// Only submit startup data once both inputs have been seen.
+				if lr != nil && cm != nil {
+					startupV.Set(&kubernetesStartup{
+						roles:      lr,
+						membership: cm,
+					})
+				}
+			}
+		},
+	}
+	if err := supervisor.RunGroup(ctx, stages); err != nil {
+		return fmt.Errorf("failed to start map/reduce stages: %w", err)
+	}
+
+	// Main Kubernetes launcher. This depends on startup data being put to
+	// startupV.
+	launcher := func(ctx context.Context) error {
+		w := startupV.Watch()
+		defer w.Close()
+		supervisor.Logger(ctx).Infof("Waiting for startup data...")
+
+		// Acquire kubernetesStartup, waiting for it to contain local consensus and a
+		// KubernetesWorker local role.
+		var d *kubernetesStartup
+		for {
+			dV, err := w.Get(ctx)
+			if err != nil {
+				return err
+			}
+			d = dV.(*kubernetesStartup)
+			supervisor.Logger(ctx).Infof("Got new startup data.")
+			if d.roles.KubernetesWorker == nil {
+				supervisor.Logger(ctx).Infof("No Kubernetes role, not starting.")
+				continue
+			}
+			if d.membership.localConsensus == nil {
+				supervisor.Logger(ctx).Warningf("No local consensus, cannot start.")
+				continue
+			}
+
+			break
+		}
+		supervisor.Logger(ctx).Infof("Waiting for local consensus...")
+		cstW := d.membership.localConsensus.Watch()
+		defer cstW.Close()
+		cst, err := cstW.GetRunning(ctx)
+		if err != nil {
+			return fmt.Errorf("waiting for local consensus: %w", err)
+		}
+
+		supervisor.Logger(ctx).Infof("Got data, starting Kubernetes...")
+		kkv, err := cst.KubernetesClient()
+		if err != nil {
+			return fmt.Errorf("retrieving kubernetes client: %w", err)
+		}
+
+		// Start containerd.
+		containerdSvc := &containerd.Service{
+			EphemeralVolume: &s.storageRoot.Ephemeral.Containerd,
+		}
+		if err := supervisor.Run(ctx, "containerd", containerdSvc.Run); err != nil {
+			return fmt.Errorf("failed to start containerd service: %w", err)
+		}
+
+		// Start building Kubernetes service...
+		pki := kpki.New(supervisor.Logger(ctx), kkv)
+
+		kubeSvc := kubernetes.New(kubernetes.Config{
+			Node: &d.membership.credentials.Node,
+			// TODO(q3k): make this configurable.
+			ServiceIPRange: net.IPNet{
+				IP: net.IP{10, 0, 255, 1},
+				// That's a /24.
+				Mask: net.IPMask{0xff, 0xff, 0xff, 0x00},
+			},
+			ClusterNet: net.IPNet{
+				IP: net.IP{10, 0, 0, 0},
+				// That's a /16.
+				Mask: net.IPMask{0xff, 0xff, 0x00, 0x00},
+			},
+			KPKI:    pki,
+			Root:    s.storageRoot,
+			Network: s.network,
+		})
+		// Start Kubernetes.
+		if err := supervisor.Run(ctx, "kubernetes", kubeSvc.Run); err != nil {
+			return fmt.Errorf("failed to start kubernetes service: %w", err)
+		}
+
+		// Let downstream know that Kubernetes is running.
+		s.kubernetesStatus.set(&KubernetesStatus{
+			Svc: kubeSvc,
+		})
+
+		supervisor.Signal(ctx, supervisor.SignalHealthy)
+
+		// Restart everything if we get a significantly different config (ie., a config
+		// whose change would/should either turn up or tear down Kubernetes).
+		for {
+			ncI, err := w.Get(ctx)
+			if err != nil {
+				return err
+			}
+			nc := ncI.(*kubernetesStartup)
+			if nc.changed(d) {
+				supervisor.Logger(ctx).Errorf("watcher got new config, restarting")
+				return fmt.Errorf("restarting")
+			}
+		}
+	}
+	if err := supervisor.Run(ctx, "run", launcher); err != nil {
+		return fmt.Errorf("failed to start launcher: %w", err)
+	}
+
+	supervisor.Signal(ctx, supervisor.SignalHealthy)
+	<-ctx.Done()
+	return ctx.Err()
+}
diff --git a/metropolis/node/core/roleserve/worker_rolefetch.go b/metropolis/node/core/roleserve/worker_rolefetch.go
new file mode 100644
index 0000000..77b3bf2
--- /dev/null
+++ b/metropolis/node/core/roleserve/worker_rolefetch.go
@@ -0,0 +1,74 @@
+package roleserve
+
+import (
+ "context"
+ "fmt"
+
+ ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
+ "source.monogon.dev/metropolis/pkg/supervisor"
+)
+
+// workerRoleFetch is the Role Fetcher, an internal bookkeeping service
+// responsible for populating localRoles based on a clusterMembership whenever
+// the node is HOME and cluster credentials / curator access is available.
+type workerRoleFetch struct {
+ // clusterMembership will be read.
+ clusterMembership *ClusterMembershipValue
+
+ // localRoles will be written.
+ localRoles *localRolesValue
+}
+
+// run waits for a HOME cluster membership, dials the curator through it and
+// opens a Watch on the local node. Every time the curator sends data for the
+// local node, its roles are logged and stored into localRoles. Any failure
+// (membership retrieval, dialing, starting the watch or receiving an event) is
+// returned as an error.
+func (s *workerRoleFetch) run(ctx context.Context) error {
+	logger := supervisor.Logger(ctx)
+
+	membershipW := s.clusterMembership.Watch()
+	defer membershipW.Close()
+	logger.Infof("Waiting for cluster membership...")
+	membership, err := membershipW.GetHome(ctx)
+	if err != nil {
+		return err
+	}
+	logger.Infof("Got cluster membership, starting...")
+
+	nodeID := membership.NodeID()
+	conn, err := membership.DialCurator()
+	if err != nil {
+		return err
+	}
+	defer conn.Close()
+	client := ipb.NewCuratorClient(conn)
+
+	// Start watch for current node, update localRoles whenever we get something
+	// new.
+	req := &ipb.WatchRequest{
+		Kind: &ipb.WatchRequest_NodeInCluster_{
+			NodeInCluster: &ipb.WatchRequest_NodeInCluster{NodeId: nodeID},
+		},
+	}
+	stream, err := client.Watch(ctx, req)
+	if err != nil {
+		return fmt.Errorf("watch failed: %w", err)
+	}
+	defer stream.CloseSend()
+
+	supervisor.Signal(ctx, supervisor.SignalHealthy)
+	for {
+		event, err := stream.Recv()
+		if err != nil {
+			return fmt.Errorf("watch event receive failed: %w", err)
+		}
+		for _, node := range event.Nodes {
+			// Skip spuriously sent other nodes.
+			if node.Id != nodeID {
+				continue
+			}
+			roles := node.Roles
+			logger.Infof("Got new node data. Roles:")
+			if roles.ConsensusMember != nil {
+				logger.Infof(" - control plane member, existing peers: %+v", roles.ConsensusMember.Peers)
+			}
+			if roles.KubernetesWorker != nil {
+				logger.Infof(" - kubernetes worker")
+			}
+			s.localRoles.set(roles)
+			// Only the first entry for the local node in an event is used.
+			break
+		}
+	}
+}
diff --git a/metropolis/node/core/roleserve/worker_statuspush.go b/metropolis/node/core/roleserve/worker_statuspush.go
new file mode 100644
index 0000000..cdeeb26
--- /dev/null
+++ b/metropolis/node/core/roleserve/worker_statuspush.go
@@ -0,0 +1,73 @@
+package roleserve
+
+import (
+ "context"
+ "fmt"
+ "net"
+
+ ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
+ "source.monogon.dev/metropolis/node/core/network"
+ "source.monogon.dev/metropolis/pkg/supervisor"
+ cpb "source.monogon.dev/metropolis/proto/common"
+)
+
+// workerStatusPush is the Status Pusher, a service responsible for sending
+// UpdateNodeStatus RPCs to a cluster whenever a Curator is available.
+//
+// TODO(q3k): factor this out of the roleserver, there's no need for this to be
+// internal, as it only depends on ClusterMembership. This could maybe even live
+// in the Network service?
+type workerStatusPush struct {
+ // network will be watched for the node's external address.
+ network *network.Service
+
+ // clusterMembership will be read.
+ clusterMembership *ClusterMembershipValue
+}
+
+// run waits for a HOME cluster membership, dials the curator through it and
+// then follows the Network service status. Whenever the external address
+// differs from the one last submitted, it is sent to the curator with an
+// UpdateNodeStatus RPC. Any failure (membership retrieval, dialing, reading
+// the network status or the RPC itself) is returned as an error.
+func (s *workerStatusPush) run(ctx context.Context) error {
+	logger := supervisor.Logger(ctx)
+
+	netW := s.network.Watch()
+	defer netW.Close()
+
+	membershipW := s.clusterMembership.Watch()
+	defer membershipW.Close()
+	logger.Infof("Waiting for cluster membership...")
+	membership, err := membershipW.GetHome(ctx)
+	if err != nil {
+		return err
+	}
+	logger.Infof("Got cluster membership, starting...")
+
+	nodeID := membership.NodeID()
+	conn, err := membership.DialCurator()
+	if err != nil {
+		return err
+	}
+	defer conn.Close()
+	client := ipb.NewCuratorClient(conn)
+
+	// Start watch on Network service, update IP address whenever new one is set.
+	supervisor.Signal(ctx, supervisor.SignalHealthy)
+	// submitted is the address most recently accepted by the curator.
+	var submitted net.IP
+	for {
+		status, err := netW.Get(ctx)
+		if err != nil {
+			return fmt.Errorf("getting network status failed: %w", err)
+		}
+
+		current := status.ExternalAddress
+		if submitted.Equal(current) {
+			// Nothing changed since the last successful update.
+			continue
+		}
+		addr := current.String()
+		logger.Infof("New external address (%s), submitting update to cluster...", addr)
+		req := &ipb.UpdateNodeStatusRequest{
+			NodeId: nodeID,
+			Status: &cpb.NodeStatus{
+				ExternalAddress: addr,
+			},
+		}
+		if _, err = client.UpdateNodeStatus(ctx, req); err != nil {
+			return fmt.Errorf("UpdateNodeStatus failed: %w", err)
+		}
+		submitted = current
+		logger.Infof("Updated.")
+	}
+}