Serge Bazanski | 6dff6d6 | 2022-01-28 18:15:14 +0100 | [diff] [blame] | 1 | package roleserve |
| 2 | |
| 3 | import ( |
| 4 | "context" |
| 5 | "fmt" |
| 6 | |
| 7 | ipb "source.monogon.dev/metropolis/node/core/curator/proto/api" |
| 8 | "source.monogon.dev/metropolis/pkg/supervisor" |
| 9 | ) |
| 10 | |
| 11 | // workerRoleFetch is the Role Fetcher, an internal bookkeeping service |
| 12 | // responsible for populating localRoles based on a clusterMembership whenever |
| 13 | // the node is HOME and cluster credentials / curator access is available. |
| 14 | type workerRoleFetch struct { |
| 15 | clusterMembership *ClusterMembershipValue |
| 16 | |
| 17 | // localRoles will be written. |
| 18 | localRoles *localRolesValue |
| 19 | } |
| 20 | |
| 21 | func (s *workerRoleFetch) run(ctx context.Context) error { |
| 22 | w := s.clusterMembership.Watch() |
| 23 | defer w.Close() |
| 24 | supervisor.Logger(ctx).Infof("Waiting for cluster membership...") |
| 25 | cm, err := w.GetHome(ctx) |
| 26 | if err != nil { |
| 27 | return err |
| 28 | } |
| 29 | supervisor.Logger(ctx).Infof("Got cluster membership, starting...") |
| 30 | |
| 31 | nodeID := cm.NodeID() |
| 32 | conn, err := cm.DialCurator() |
| 33 | if err != nil { |
| 34 | return err |
| 35 | } |
| 36 | defer conn.Close() |
| 37 | cur := ipb.NewCuratorClient(conn) |
| 38 | |
| 39 | // Start watch for current node, update localRoles whenever we get something |
| 40 | // new. |
| 41 | srv, err := cur.Watch(ctx, &ipb.WatchRequest{Kind: &ipb.WatchRequest_NodeInCluster_{ |
| 42 | NodeInCluster: &ipb.WatchRequest_NodeInCluster{ |
| 43 | NodeId: nodeID, |
| 44 | }, |
| 45 | }}) |
| 46 | if err != nil { |
| 47 | return fmt.Errorf("watch failed: %w", err) |
| 48 | } |
| 49 | defer srv.CloseSend() |
| 50 | |
| 51 | supervisor.Signal(ctx, supervisor.SignalHealthy) |
| 52 | for { |
| 53 | ev, err := srv.Recv() |
| 54 | if err != nil { |
| 55 | return fmt.Errorf("watch event receive failed: %w", err) |
| 56 | } |
| 57 | for _, n := range ev.Nodes { |
| 58 | n := n |
| 59 | // Skip spuriously sent other nodes. |
| 60 | if n.Id != nodeID { |
| 61 | continue |
| 62 | } |
| 63 | supervisor.Logger(ctx).Infof("Got new node data. Roles:") |
| 64 | if n.Roles.ConsensusMember != nil { |
| 65 | supervisor.Logger(ctx).Infof(" - control plane member, existing peers: %+v", n.Roles.ConsensusMember.Peers) |
| 66 | } |
| 67 | if n.Roles.KubernetesWorker != nil { |
| 68 | supervisor.Logger(ctx).Infof(" - kubernetes worker") |
| 69 | } |
| 70 | s.localRoles.set(n.Roles) |
| 71 | break |
| 72 | } |
| 73 | } |
| 74 | } |