Serge Bazanski | 6dff6d6 | 2022-01-28 18:15:14 +0100 | [diff] [blame] | 1 | package roleserve |
| 2 | |
| 3 | import ( |
| 4 | "context" |
| 5 | "fmt" |
| 6 | |
| 7 | ipb "source.monogon.dev/metropolis/node/core/curator/proto/api" |
Serge Bazanski | 37110c3 | 2023-03-01 13:57:27 +0000 | [diff] [blame] | 8 | "source.monogon.dev/metropolis/pkg/event/memory" |
Serge Bazanski | 6dff6d6 | 2022-01-28 18:15:14 +0100 | [diff] [blame] | 9 | "source.monogon.dev/metropolis/pkg/supervisor" |
Serge Bazanski | 37110c3 | 2023-03-01 13:57:27 +0000 | [diff] [blame] | 10 | cpb "source.monogon.dev/metropolis/proto/common" |
Serge Bazanski | 6dff6d6 | 2022-01-28 18:15:14 +0100 | [diff] [blame] | 11 | ) |
| 12 | |
| 13 | // workerRoleFetch is the Role Fetcher, an internal bookkeeping service |
Serge Bazanski | fe3d8fd | 2023-05-30 20:50:09 +0200 | [diff] [blame^] | 14 | // responsible for populating localRoles based on a curatorConnection whenever |
Serge Bazanski | 6dff6d6 | 2022-01-28 18:15:14 +0100 | [diff] [blame] | 15 | // the node is HOME and cluster credentials / curator access is available. |
| 16 | type workerRoleFetch struct { |
Serge Bazanski | fe3d8fd | 2023-05-30 20:50:09 +0200 | [diff] [blame^] | 17 | curatorConnection *memory.Value[*curatorConnection] |
Serge Bazanski | 6dff6d6 | 2022-01-28 18:15:14 +0100 | [diff] [blame] | 18 | |
| 19 | // localRoles will be written. |
Serge Bazanski | 37110c3 | 2023-03-01 13:57:27 +0000 | [diff] [blame] | 20 | localRoles *memory.Value[*cpb.NodeRoles] |
Serge Bazanski | 6dff6d6 | 2022-01-28 18:15:14 +0100 | [diff] [blame] | 21 | } |
| 22 | |
| 23 | func (s *workerRoleFetch) run(ctx context.Context) error { |
Serge Bazanski | fe3d8fd | 2023-05-30 20:50:09 +0200 | [diff] [blame^] | 24 | w := s.curatorConnection.Watch() |
Serge Bazanski | 6dff6d6 | 2022-01-28 18:15:14 +0100 | [diff] [blame] | 25 | defer w.Close() |
Serge Bazanski | fe3d8fd | 2023-05-30 20:50:09 +0200 | [diff] [blame^] | 26 | supervisor.Logger(ctx).Infof("Waiting for curator connection...") |
| 27 | cc, err := w.Get(ctx) |
Serge Bazanski | 6dff6d6 | 2022-01-28 18:15:14 +0100 | [diff] [blame] | 28 | if err != nil { |
| 29 | return err |
| 30 | } |
Serge Bazanski | fe3d8fd | 2023-05-30 20:50:09 +0200 | [diff] [blame^] | 31 | supervisor.Logger(ctx).Infof("Got curator connection, starting...") |
Serge Bazanski | 6dff6d6 | 2022-01-28 18:15:14 +0100 | [diff] [blame] | 32 | |
Serge Bazanski | fe3d8fd | 2023-05-30 20:50:09 +0200 | [diff] [blame^] | 33 | nodeID := cc.nodeID() |
| 34 | cur := ipb.NewCuratorClient(cc.conn) |
Serge Bazanski | 6dff6d6 | 2022-01-28 18:15:14 +0100 | [diff] [blame] | 35 | |
| 36 | // Start watch for current node, update localRoles whenever we get something |
| 37 | // new. |
| 38 | srv, err := cur.Watch(ctx, &ipb.WatchRequest{Kind: &ipb.WatchRequest_NodeInCluster_{ |
| 39 | NodeInCluster: &ipb.WatchRequest_NodeInCluster{ |
| 40 | NodeId: nodeID, |
| 41 | }, |
| 42 | }}) |
| 43 | if err != nil { |
| 44 | return fmt.Errorf("watch failed: %w", err) |
| 45 | } |
| 46 | defer srv.CloseSend() |
| 47 | |
| 48 | supervisor.Signal(ctx, supervisor.SignalHealthy) |
| 49 | for { |
| 50 | ev, err := srv.Recv() |
| 51 | if err != nil { |
| 52 | return fmt.Errorf("watch event receive failed: %w", err) |
| 53 | } |
| 54 | for _, n := range ev.Nodes { |
| 55 | n := n |
| 56 | // Skip spuriously sent other nodes. |
| 57 | if n.Id != nodeID { |
| 58 | continue |
| 59 | } |
| 60 | supervisor.Logger(ctx).Infof("Got new node data. Roles:") |
| 61 | if n.Roles.ConsensusMember != nil { |
| 62 | supervisor.Logger(ctx).Infof(" - control plane member, existing peers: %+v", n.Roles.ConsensusMember.Peers) |
| 63 | } |
Serge Bazanski | 15f7f63 | 2023-03-14 17:17:20 +0100 | [diff] [blame] | 64 | if n.Roles.KubernetesController != nil { |
| 65 | supervisor.Logger(ctx).Infof(" - kubernetes controller") |
| 66 | } |
Serge Bazanski | 6dff6d6 | 2022-01-28 18:15:14 +0100 | [diff] [blame] | 67 | if n.Roles.KubernetesWorker != nil { |
| 68 | supervisor.Logger(ctx).Infof(" - kubernetes worker") |
| 69 | } |
Serge Bazanski | 37110c3 | 2023-03-01 13:57:27 +0000 | [diff] [blame] | 70 | s.localRoles.Set(n.Roles) |
Serge Bazanski | 6dff6d6 | 2022-01-28 18:15:14 +0100 | [diff] [blame] | 71 | break |
| 72 | } |
| 73 | } |
| 74 | } |