metropolis/node/core: use curator
This finally switches over the node startup code to use the full Cluster
Manager / Curator / Role Server chain to bring up the node.
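In rough outline, the new startup chain in main.go now reads as follows. This is only a condensed sketch of the change below: error handling, logging and the surrounding setup (cluster manager bring-up, namespaced etcd clients, storage root) are elided, and all identifiers are taken from the diff itself.

    // The curator is started against its namespaced etcd client first,
    // then the role server is pointed at it and left to run all
    // node-specific role code.
    c := curator.New(curator.Config{
        Etcd:      ckv, // curator's namespaced etcd client
        NodeID:    status.Credentials.ID(),
        LeaderTTL: time.Second * 5,
        Directory: &root.Ephemeral.Curator,
    })
    supervisor.Run(ctx, "curator", c.Run)

    rs := roleserve.New(roleserve.Config{
        CuratorDial: c.DialCluster, // how the role server reaches the curator
        StorageRoot: root,
        Network:     networkSvc,
        KPKI:        kpki,
        NodeID:      status.Credentials.ID(),
    })
    supervisor.Run(ctx, "role", rs.Run)

As a result, main.go no longer decides which per-node services to run: the
containerd and Kubernetes worker startup disappears from this file and is
driven by the role server instead.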
Change-Id: Iaf6173671aed107a67b4201d9d1ad8bb33baa90f
Reviewed-on: https://review.monogon.dev/c/monogon/+/189
Reviewed-by: Lorenz Brun <lorenz@nexantic.com>
diff --git a/metropolis/node/core/main.go b/metropolis/node/core/main.go
index 6f13a3a..566e65d 100644
--- a/metropolis/node/core/main.go
+++ b/metropolis/node/core/main.go
@@ -38,8 +38,7 @@
"source.monogon.dev/metropolis/node/core/localstorage"
"source.monogon.dev/metropolis/node/core/localstorage/declarative"
"source.monogon.dev/metropolis/node/core/network"
- "source.monogon.dev/metropolis/node/kubernetes"
- "source.monogon.dev/metropolis/node/kubernetes/containerd"
+ "source.monogon.dev/metropolis/node/core/roleserve"
"source.monogon.dev/metropolis/node/kubernetes/pki"
"source.monogon.dev/metropolis/pkg/logtree"
"source.monogon.dev/metropolis/pkg/supervisor"
@@ -47,23 +46,6 @@
apb "source.monogon.dev/metropolis/proto/api"
)
-var (
- // kubernetesConfig is the static/global part of the Kubernetes service
- // configuration. In the future, this might be configurable by loading it
- // from the EnrolmentConfig. Fow now, it's static and same across all
- // clusters.
- kubernetesConfig = kubernetes.Config{
- ServiceIPRange: net.IPNet{ // TODO(q3k): Decide if configurable / final value
- IP: net.IP{10, 0, 255, 1},
- Mask: net.IPMask{0xff, 0xff, 0xff, 0x00}, // /24, but Go stores as a literal mask
- },
- ClusterNet: net.IPNet{
- IP: net.IP{10, 0, 0, 0},
- Mask: net.IPMask{0xff, 0xff, 0x00, 0x00}, // /16
- },
- }
-)
-
func main() {
defer func() {
if r := recover(); r != nil {
@@ -141,8 +123,6 @@
// starting all services related to the node's roles.
// TODO(q3k): move this to a separate 'init' service.
supervisor.New(ctxS, func(ctx context.Context) error {
- logger := supervisor.Logger(ctx)
-
// Start storage and network - we need this to get anything else done.
if err := root.Start(ctx); err != nil {
return fmt.Errorf("cannot start root FS: %w", err)
@@ -166,63 +146,77 @@
return fmt.Errorf("new couldn't find home in new cluster, aborting: %w", err)
}
- // Start cluster curator.
- kv, err := status.ConsensusClient(cluster.ConsensusUserCurator)
+ // Here starts some hairy stopgap code. In the future, not all nodes will have
+ // direct access to etcd (ie. the ability to retrieve an etcd client via
+ // status.ConsensusClient).
+ // However, we are not ready to implement this yet, as that would require
+ // moving more logic into the curator (eg. some of the Kubernetes PKI logic).
+ //
+ // For now, we keep Kubernetes PKI initialization logic here, and just assume
+ // that every node will have direct access to etcd.
+
+ // Retrieve namespaced etcd KV clients for the two main direct etcd users:
+ // - Curator
+ // - Kubernetes PKI
+ ckv, err := status.ConsensusClient(cluster.ConsensusUserCurator)
if err != nil {
+ close(trapdoor)
return fmt.Errorf("failed to retrieve consensus curator client: %w", err)
}
+ kkv, err := status.ConsensusClient(cluster.ConsensusUserKubernetesPKI)
+ if err != nil {
+ close(trapdoor)
+ return fmt.Errorf("failed to retrieve consensus kubernetes PKI client: %w", err)
+ }
+
+ // Start cluster curator. The cluster curator is responsible for lifecycle
+ // management of the cluster.
+ // In the future, this will only be started on nodes that run etcd.
c := curator.New(curator.Config{
- Etcd: kv,
+ Etcd: ckv,
NodeID: status.Credentials.ID(),
// TODO(q3k): make this configurable?
LeaderTTL: time.Second * 5,
Directory: &root.Ephemeral.Curator,
})
if err := supervisor.Run(ctx, "curator", c.Run); err != nil {
+ close(trapdoor)
return fmt.Errorf("when starting curator: %w", err)
}
// We are now in a cluster. We can thus access our 'node' object and
// start all services that we should be running.
-
logger.Info("Enrolment success, continuing startup.")
- // HACK: always start k8s worker, this is being currently refactored
- // and will get fixed in review/188.
- logger.Info("Starting Kubernetes worker services...")
-
- kkv, err := status.ConsensusClient(cluster.ConsensusUserKubernetesPKI)
- if err != nil {
- return fmt.Errorf("failed to retrieve consensus kubernetes PKI client: %w", err)
- }
-
- // Ensure Kubernetes PKI objects exist in etcd.
+ // Ensure Kubernetes PKI objects exist in etcd. In the future, this logic will
+ // be implemented in the curator.
kpki := pki.New(lt.MustLeveledFor("pki.kubernetes"), kkv)
if err := kpki.EnsureAll(ctx); err != nil {
+ close(trapdoor)
return fmt.Errorf("failed to ensure kubernetes PKI present: %w", err)
}
- containerdSvc := &containerd.Service{
- EphemeralVolume: &root.Ephemeral.Containerd,
- }
- if err := supervisor.Run(ctx, "containerd", containerdSvc.Run); err != nil {
- return fmt.Errorf("failed to start containerd service: %w", err)
- }
-
- kubernetesConfig.KPKI = kpki
- kubernetesConfig.Root = root
- kubernetesConfig.Network = networkSvc
- kubeSvc := kubernetes.New(kubernetesConfig)
- if err := supervisor.Run(ctx, "kubernetes", kubeSvc.Run); err != nil {
- return fmt.Errorf("failed to start kubernetes service: %w", err)
+ // Start the role service. The role service connects to the curator and runs
+ // all node-specific role code (eg. Kubernetes services).
+ supervisor.Logger(ctx).Infof("Starting role service...")
+ rs := roleserve.New(roleserve.Config{
+ CuratorDial: c.DialCluster,
+ StorageRoot: root,
+ Network: networkSvc,
+ KPKI: kpki,
+ NodeID: status.Credentials.ID(),
+ })
+ if err := supervisor.Run(ctx, "role", rs.Run); err != nil {
+ close(trapdoor)
+ return fmt.Errorf("failed to start role service: %w", err)
}
// Start the node debug service.
+ supervisor.Logger(ctx).Infof("Starting debug service...")
dbg := &debugService{
- cluster: m,
- logtree: lt,
- kubernetes: kubeSvc,
- traceLock: make(chan struct{}, 1),
+ roleserve: rs,
+ logtree: lt,
+ traceLock: make(chan struct{}, 1),
}
dbgSrv := grpc.NewServer()
apb.RegisterNodeDebugServiceServer(dbgSrv, dbg)