blob: fb1fb335cb27c5a40b20b6dcddc565e67e0adb90 [file] [log] [blame]
Serge Bazanski6dff6d62022-01-28 18:15:14 +01001package roleserve
2
3import (
4 "context"
5 "fmt"
6
7 ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
Serge Bazanski37110c32023-03-01 13:57:27 +00008 "source.monogon.dev/metropolis/pkg/event/memory"
Serge Bazanski6dff6d62022-01-28 18:15:14 +01009 "source.monogon.dev/metropolis/pkg/supervisor"
Serge Bazanski37110c32023-03-01 13:57:27 +000010 cpb "source.monogon.dev/metropolis/proto/common"
Serge Bazanski6dff6d62022-01-28 18:15:14 +010011)
12
13// workerRoleFetch is the Role Fetcher, an internal bookkeeping service
Serge Bazanskife3d8fd2023-05-30 20:50:09 +020014// responsible for populating localRoles based on a curatorConnection whenever
Serge Bazanski6dff6d62022-01-28 18:15:14 +010015// the node is HOME and cluster credentials / curator access is available.
16type workerRoleFetch struct {
Serge Bazanskife3d8fd2023-05-30 20:50:09 +020017 curatorConnection *memory.Value[*curatorConnection]
Serge Bazanski6dff6d62022-01-28 18:15:14 +010018
19 // localRoles will be written.
Serge Bazanski37110c32023-03-01 13:57:27 +000020 localRoles *memory.Value[*cpb.NodeRoles]
Serge Bazanski6dff6d62022-01-28 18:15:14 +010021}
22
23func (s *workerRoleFetch) run(ctx context.Context) error {
Serge Bazanskife3d8fd2023-05-30 20:50:09 +020024 w := s.curatorConnection.Watch()
Serge Bazanski6dff6d62022-01-28 18:15:14 +010025 defer w.Close()
Serge Bazanskife3d8fd2023-05-30 20:50:09 +020026 supervisor.Logger(ctx).Infof("Waiting for curator connection...")
27 cc, err := w.Get(ctx)
Serge Bazanski6dff6d62022-01-28 18:15:14 +010028 if err != nil {
29 return err
30 }
Serge Bazanskife3d8fd2023-05-30 20:50:09 +020031 supervisor.Logger(ctx).Infof("Got curator connection, starting...")
Serge Bazanski6dff6d62022-01-28 18:15:14 +010032
Serge Bazanskife3d8fd2023-05-30 20:50:09 +020033 nodeID := cc.nodeID()
34 cur := ipb.NewCuratorClient(cc.conn)
Serge Bazanski6dff6d62022-01-28 18:15:14 +010035
36 // Start watch for current node, update localRoles whenever we get something
37 // new.
38 srv, err := cur.Watch(ctx, &ipb.WatchRequest{Kind: &ipb.WatchRequest_NodeInCluster_{
39 NodeInCluster: &ipb.WatchRequest_NodeInCluster{
40 NodeId: nodeID,
41 },
42 }})
43 if err != nil {
44 return fmt.Errorf("watch failed: %w", err)
45 }
46 defer srv.CloseSend()
47
48 supervisor.Signal(ctx, supervisor.SignalHealthy)
49 for {
50 ev, err := srv.Recv()
51 if err != nil {
52 return fmt.Errorf("watch event receive failed: %w", err)
53 }
54 for _, n := range ev.Nodes {
55 n := n
56 // Skip spuriously sent other nodes.
57 if n.Id != nodeID {
58 continue
59 }
60 supervisor.Logger(ctx).Infof("Got new node data. Roles:")
61 if n.Roles.ConsensusMember != nil {
62 supervisor.Logger(ctx).Infof(" - control plane member, existing peers: %+v", n.Roles.ConsensusMember.Peers)
63 }
Serge Bazanski15f7f632023-03-14 17:17:20 +010064 if n.Roles.KubernetesController != nil {
65 supervisor.Logger(ctx).Infof(" - kubernetes controller")
66 }
Serge Bazanski6dff6d62022-01-28 18:15:14 +010067 if n.Roles.KubernetesWorker != nil {
68 supervisor.Logger(ctx).Infof(" - kubernetes worker")
69 }
Serge Bazanski37110c32023-03-01 13:57:27 +000070 s.localRoles.Set(n.Roles)
Serge Bazanski6dff6d62022-01-28 18:15:14 +010071 break
72 }
73 }
74}