blob: 77b3bf2192672181e5f1d3d374cf1aff8f88ef04 [file] [log] [blame]
Serge Bazanski6dff6d62022-01-28 18:15:14 +01001package roleserve
2
3import (
4 "context"
5 "fmt"
6
7 ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
8 "source.monogon.dev/metropolis/pkg/supervisor"
9)
10
// workerRoleFetch is the Role Fetcher, an internal bookkeeping service
// responsible for populating localRoles based on a clusterMembership whenever
// the node is HOME and cluster credentials / curator access is available.
//
// The work is done by run, which watches the curator for data about the
// local node and stores that node's roles in localRoles on every update.
type workerRoleFetch struct {
	// clusterMembership is read by run: it is watched until a HOME membership
	// is returned, and that membership then supplies the local node ID and
	// the connection to the curator.
	clusterMembership *ClusterMembershipValue

	// localRoles will be written. run sets it to the roles of the local node
	// each time the curator watch delivers an entry for that node.
	localRoles *localRolesValue
}
20
21func (s *workerRoleFetch) run(ctx context.Context) error {
22 w := s.clusterMembership.Watch()
23 defer w.Close()
24 supervisor.Logger(ctx).Infof("Waiting for cluster membership...")
25 cm, err := w.GetHome(ctx)
26 if err != nil {
27 return err
28 }
29 supervisor.Logger(ctx).Infof("Got cluster membership, starting...")
30
31 nodeID := cm.NodeID()
32 conn, err := cm.DialCurator()
33 if err != nil {
34 return err
35 }
36 defer conn.Close()
37 cur := ipb.NewCuratorClient(conn)
38
39 // Start watch for current node, update localRoles whenever we get something
40 // new.
41 srv, err := cur.Watch(ctx, &ipb.WatchRequest{Kind: &ipb.WatchRequest_NodeInCluster_{
42 NodeInCluster: &ipb.WatchRequest_NodeInCluster{
43 NodeId: nodeID,
44 },
45 }})
46 if err != nil {
47 return fmt.Errorf("watch failed: %w", err)
48 }
49 defer srv.CloseSend()
50
51 supervisor.Signal(ctx, supervisor.SignalHealthy)
52 for {
53 ev, err := srv.Recv()
54 if err != nil {
55 return fmt.Errorf("watch event receive failed: %w", err)
56 }
57 for _, n := range ev.Nodes {
58 n := n
59 // Skip spuriously sent other nodes.
60 if n.Id != nodeID {
61 continue
62 }
63 supervisor.Logger(ctx).Infof("Got new node data. Roles:")
64 if n.Roles.ConsensusMember != nil {
65 supervisor.Logger(ctx).Infof(" - control plane member, existing peers: %+v", n.Roles.ConsensusMember.Peers)
66 }
67 if n.Roles.KubernetesWorker != nil {
68 supervisor.Logger(ctx).Infof(" - kubernetes worker")
69 }
70 s.localRoles.set(n.Roles)
71 break
72 }
73 }
74}