m/n/c/cluster: implement register flow
Change-Id: I197cbfa96d34c9912c7fc19710db25276e7440fc
Reviewed-on: https://review.monogon.dev/c/monogon/+/454
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
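
In brief: only the bootstrapping node currently runs etcd, so the hunk
below gates curator, Kubernetes PKI and roleserver startup on
status.HasLocalConsensus. A minimal, self-contained sketch of that gating
shape follows; ClusterStatus, run and startConsensusDependents are
illustrative stand-ins, not the real metropolis types:

    package main

    import "log"

    // ClusterStatus is a hypothetical stand-in for the status value
    // main.go receives after joining a cluster; only the field name
    // HasLocalConsensus mirrors the actual change.
    type ClusterStatus struct {
            HasLocalConsensus bool
    }

    // startConsensusDependents stands in for starting the curator, the
    // Kubernetes PKI bootstrap and the roleserver.
    func startConsensusDependents() error {
            log.Println("starting curator, Kubernetes PKI, roleserver")
            return nil
    }

    func run(status ClusterStatus) error {
            if status.HasLocalConsensus {
                    return startConsensusDependents()
            }
            // Dummy node: no local etcd, so none of the above run yet.
            log.Println("no local consensus: skipping curator/PKI/roleserver")
            return nil
    }

    func main() {
            for _, s := range []ClusterStatus{{HasLocalConsensus: true}, {}} {
                    if err := run(s); err != nil {
                            log.Fatal(err)
                    }
            }
    }
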
diff --git a/metropolis/node/core/main.go b/metropolis/node/core/main.go
index 5d495fa..0a63d48 100644
--- a/metropolis/node/core/main.go
+++ b/metropolis/node/core/main.go
@@ -144,71 +144,73 @@
return fmt.Errorf("new couldn't find home in new cluster, aborting: %w", err)
}
- // Here starts some hairy stopgap code. In the future, not all nodes will have
- // direct access to etcd (ie. the ability to retrieve an etcd client via
- // status.ConsensusClient).
- // However, we are not ready to implement this yet, as that would require
- // moving more logic into the curator (eg. some of the Kubernetes PKI logic).
+ // Currently, only the first node of the cluster runs etcd. That means only
+ // that node can run the curator, Kubernetes, and the roleserver.
//
- // For now, we keep Kubernetes PKI initialization logic here, and just assume
- // that every node will have direct access to etcd.
+ // This is a temporary stopgap until we land a roleserver rewrite that fixes
+ // this.
- // Retrieve namespaced etcd KV clients for the two main direct etcd users:
- // - Curator
- // - Kubernetes PKI
- ckv, err := status.ConsensusClient(cluster.ConsensusUserCurator)
- if err != nil {
- close(trapdoor)
- return fmt.Errorf("failed to retrieve consensus curator client: %w", err)
- }
- kkv, err := status.ConsensusClient(cluster.ConsensusUserKubernetesPKI)
- if err != nil {
- close(trapdoor)
- return fmt.Errorf("failed to retrieve consensus kubernetes PKI client: %w", err)
- }
+ var rs *roleserve.Service
+ if status.HasLocalConsensus {
+ // Retrieve namespaced etcd KV clients for the two main direct etcd users:
+ // - Curator
+ // - Kubernetes PKI
+ ckv, err := status.ConsensusClient(cluster.ConsensusUserCurator)
+ if err != nil {
+ close(trapdoor)
+ return fmt.Errorf("failed to retrieve consensus curator client: %w", err)
+ }
+ kkv, err := status.ConsensusClient(cluster.ConsensusUserKubernetesPKI)
+ if err != nil {
+ close(trapdoor)
+ return fmt.Errorf("failed to retrieve consensus kubernetes PKI client: %w", err)
+ }
- // TODO(q3k): restart curator on credentials change?
+ // TODO(q3k): restart curator on credentials change?
- // Start cluster curator. The cluster curator is responsible for lifecycle
- // management of the cluster.
- // In the future, this will only be started on nodes that run etcd.
- c := curator.New(curator.Config{
- Etcd: ckv,
- NodeCredentials: status.Credentials,
- // TODO(q3k): make this configurable?
- LeaderTTL: time.Second * 5,
- Directory: &root.Ephemeral.Curator,
- })
- if err := supervisor.Run(ctx, "curator", c.Run); err != nil {
- close(trapdoor)
- return fmt.Errorf("when starting curator: %w", err)
- }
+ // Start cluster curator. The cluster curator is responsible for lifecycle
+ // management of the cluster.
+ // As of this change, it is only started on nodes that run etcd, per the
+ // HasLocalConsensus check above.
+ c := curator.New(curator.Config{
+ Etcd: ckv,
+ NodeCredentials: status.Credentials,
+ // TODO(q3k): make this configurable?
+ LeaderTTL: time.Second * 5,
+ Directory: &root.Ephemeral.Curator,
+ })
+ if err := supervisor.Run(ctx, "curator", c.Run); err != nil {
+ close(trapdoor)
+ return fmt.Errorf("when starting curator: %w", err)
+ }
- // We are now in a cluster. We can thus access our 'node' object and
- // start all services that we should be running.
- logger.Info("Enrolment success, continuing startup.")
+ // We are now in a cluster. We can thus access our 'node' object and
+ // start all services that we should be running.
+ logger.Info("Enrolment success, continuing startup.")
- // Ensure Kubernetes PKI objects exist in etcd. In the future, this logic will
- // be implemented in the curator.
- kpki := pki.New(lt.MustLeveledFor("pki.kubernetes"), kkv)
- if err := kpki.EnsureAll(ctx); err != nil {
- close(trapdoor)
- return fmt.Errorf("failed to ensure kubernetes PKI present: %w", err)
- }
+ // Ensure Kubernetes PKI objects exist in etcd. In the future, this logic will
+ // be implemented in the curator.
+ kpki := pki.New(lt.MustLeveledFor("pki.kubernetes"), kkv)
+ if err := kpki.EnsureAll(ctx); err != nil {
+ close(trapdoor)
+ return fmt.Errorf("failed to ensure kubernetes PKI present: %w", err)
+ }
- // Start the role service. The role service connects to the curator and runs
- // all node-specific role code (eg. Kubernetes services).
- // supervisor.Logger(ctx).Infof("Starting role service...")
- rs := roleserve.New(roleserve.Config{
- CuratorDial: c.DialCluster,
- StorageRoot: root,
- Network: networkSvc,
- KPKI: kpki,
- NodeID: status.Credentials.ID(),
- })
- if err := supervisor.Run(ctx, "role", rs.Run); err != nil {
- close(trapdoor)
- return fmt.Errorf("failed to start role service: %w", err)
+ // Start the role service. The role service connects to the curator and runs
+ // all node-specific role code (e.g. Kubernetes services).
+ supervisor.Logger(ctx).Infof("Starting role service...")
+ rs = roleserve.New(roleserve.Config{
+ CuratorDial: c.DialCluster,
+ StorageRoot: root,
+ Network: networkSvc,
+ KPKI: kpki,
+ NodeID: status.Credentials.ID(),
+ })
+ if err := supervisor.Run(ctx, "role", rs.Run); err != nil {
+ close(trapdoor)
+ return fmt.Errorf("failed to start role service: %w", err)
+ }
+ } else {
+ logger.Warningf("TODO(q3k): Dummy node - will not run Kubernetes/Curator/Roleserver...")
}
// Start the node debug service.
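
Reviewer note: rs is now declared as var rs *roleserve.Service and only
assigned inside the HasLocalConsensus branch, so it stays nil on nodes
without local consensus; anything downstream that takes the roleserver
has to tolerate a nil pointer. A self-contained sketch of that guard,
with Service as a hypothetical stand-in for *roleserve.Service:

    package main

    import "fmt"

    // Service is a hypothetical stand-in for *roleserve.Service.
    type Service struct{}

    func (s *Service) Describe() string { return "roleserver running" }

    // describeRoleserver shows the nil guard consumers need once the
    // roleserver becomes optional, as in the hunk above.
    func describeRoleserver(rs *Service) string {
            if rs == nil {
                    return "roleserver not started (no local consensus)"
            }
            return rs.Describe()
    }

    func main() {
            var rs *Service // stays nil on a node without local consensus
            fmt.Println(describeRoleserver(rs))
            rs = &Service{}
            fmt.Println(describeRoleserver(rs))
    }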