package roleserve

import (
	"context"
	"fmt"

	ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
	"source.monogon.dev/metropolis/pkg/supervisor"
)

// workerRoleFetch is the Role Fetcher, an internal bookkeeping service
// responsible for populating localRoles based on clusterMembership whenever
// the node is HOME in a cluster and credentials / curator access are
// available.
type workerRoleFetch struct {
	clusterMembership *ClusterMembershipValue

	// localRoles will be written to with the node's roles as retrieved from
	// the cluster.
	localRoles *localRolesValue
}
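
// run is the main runnable of the Role Fetcher, meant to be run under the
// supervisor: it blocks until the node is HOME in a cluster, then keeps
// localRoles up to date until its context is canceled or the watch fails.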
func (s *workerRoleFetch) run(ctx context.Context) error {
	w := s.clusterMembership.Watch()
	defer w.Close()

	supervisor.Logger(ctx).Infof("Waiting for cluster membership...")
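	// GetHome blocks until the node is HOME in a cluster, i.e. it has cluster
	// credentials and the curator is reachable.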
	cm, err := w.GetHome(ctx)
	if err != nil {
		return err
	}
	supervisor.Logger(ctx).Infof("Got cluster membership, starting...")
	nodeID := cm.NodeID()
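
	// Establish a gRPC connection to the cluster's curator, through which the
	// node's current roles will be retrieved.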
	conn, err := cm.DialCurator()
	if err != nil {
		return err
	}
	defer conn.Close()
	cur := ipb.NewCuratorClient(conn)

	// Start a watch for the current node, updating localRoles whenever new
	// data arrives.
	srv, err := cur.Watch(ctx, &ipb.WatchRequest{Kind: &ipb.WatchRequest_NodeInCluster_{
		NodeInCluster: &ipb.WatchRequest_NodeInCluster{
			NodeId: nodeID,
		},
	}})
	if err != nil {
		return fmt.Errorf("watch failed: %w", err)
	}
	defer srv.CloseSend()
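
	// The watch is established; signal healthy and start consuming node
	// updates from the stream. Any receive error ends the runnable and is
	// surfaced to the supervisor.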
	supervisor.Signal(ctx, supervisor.SignalHealthy)
	for {
		ev, err := srv.Recv()
		if err != nil {
			return fmt.Errorf("watch event receive failed: %w", err)
		}
		for _, n := range ev.Nodes {
			// Skip spuriously sent other nodes.
			if n.Id != nodeID {
				continue
			}
			supervisor.Logger(ctx).Infof("Got new node data. Roles:")
			if n.Roles.ConsensusMember != nil {
				supervisor.Logger(ctx).Infof(" - control plane member, existing peers: %+v", n.Roles.ConsensusMember.Peers)
			}
			if n.Roles.KubernetesController != nil {
				supervisor.Logger(ctx).Infof(" - kubernetes controller")
			}
			if n.Roles.KubernetesWorker != nil {
				supervisor.Logger(ctx).Infof(" - kubernetes worker")
			}
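			// Publish the retrieved roles to localRoles for the rest of the
			// roleserver to consume, and stop scanning this event.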
			s.localRoles.set(n.Roles)
			break
		}
	}
}