blob: 40d2ffdd6ed1bc42f63c3584c7d95302fd85bbdc [file] [log] [blame]
Serge Bazanski6dff6d62022-01-28 18:15:14 +01001package roleserve
2
3import (
4 "context"
5 "fmt"
6
Serge Bazanski186109c2023-06-21 16:57:36 +02007 "google.golang.org/protobuf/proto"
8
9 "source.monogon.dev/metropolis/node/core/localstorage"
Serge Bazanski37110c32023-03-01 13:57:27 +000010 "source.monogon.dev/metropolis/pkg/event/memory"
Serge Bazanski6dff6d62022-01-28 18:15:14 +010011 "source.monogon.dev/metropolis/pkg/supervisor"
Serge Bazanski186109c2023-06-21 16:57:36 +020012
13 ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
Serge Bazanski37110c32023-03-01 13:57:27 +000014 cpb "source.monogon.dev/metropolis/proto/common"
Serge Bazanski6dff6d62022-01-28 18:15:14 +010015)
16
17// workerRoleFetch is the Role Fetcher, an internal bookkeeping service
Serge Bazanskife3d8fd2023-05-30 20:50:09 +020018// responsible for populating localRoles based on a curatorConnection whenever
Serge Bazanski6dff6d62022-01-28 18:15:14 +010019// the node is HOME and cluster credentials / curator access is available.
20type workerRoleFetch struct {
Serge Bazanski186109c2023-06-21 16:57:36 +020021 storageRoot *localstorage.Root
Serge Bazanskife3d8fd2023-05-30 20:50:09 +020022 curatorConnection *memory.Value[*curatorConnection]
Serge Bazanski6dff6d62022-01-28 18:15:14 +010023
24 // localRoles will be written.
Serge Bazanski37110c32023-03-01 13:57:27 +000025 localRoles *memory.Value[*cpb.NodeRoles]
Serge Bazanski6dff6d62022-01-28 18:15:14 +010026}
27
28func (s *workerRoleFetch) run(ctx context.Context) error {
Serge Bazanski186109c2023-06-21 16:57:36 +020029 // Wait for a 'curator connection' first. This isn't a real connection but just a
30 // set of credentials and a potentially working resolver. Here, we just use its
31 // presence as a marking that the local disk has been mounted and thus we can
32 // attempt to read a persisted cluster directory.
Serge Bazanskife3d8fd2023-05-30 20:50:09 +020033 w := s.curatorConnection.Watch()
Serge Bazanski186109c2023-06-21 16:57:36 +020034 _, err := w.Get(ctx)
Serge Bazanski6dff6d62022-01-28 18:15:14 +010035 if err != nil {
Serge Bazanski186109c2023-06-21 16:57:36 +020036 w.Close()
Serge Bazanski6dff6d62022-01-28 18:15:14 +010037 return err
38 }
Serge Bazanski186109c2023-06-21 16:57:36 +020039 w.Close()
Serge Bazanski6dff6d62022-01-28 18:15:14 +010040
Serge Bazanski186109c2023-06-21 16:57:36 +020041 // Read persisted roles if available.
42 exists, _ := s.storageRoot.Data.Node.PersistedRoles.Exists()
43 if exists {
44 supervisor.Logger(ctx).Infof("Attempting to read persisted node roles...")
45 data, err := s.storageRoot.Data.Node.PersistedRoles.Read()
Serge Bazanski6dff6d62022-01-28 18:15:14 +010046 if err != nil {
Serge Bazanski186109c2023-06-21 16:57:36 +020047 supervisor.Logger(ctx).Errorf("Failed to read persisted roles: %w", err)
48 } else {
49 var nr cpb.NodeRoles
50 if err := proto.Unmarshal(data, &nr); err != nil {
51 supervisor.Logger(ctx).Errorf("Failed to unmarshal persisted roles: %w", err)
52 } else {
53 supervisor.Logger(ctx).Infof("Got persisted role data from disk:")
Serge Bazanski6dff6d62022-01-28 18:15:14 +010054 }
Serge Bazanski186109c2023-06-21 16:57:36 +020055 if nr.ConsensusMember != nil {
56 supervisor.Logger(ctx).Infof(" - control plane member, existing peers: %+v", nr.ConsensusMember.Peers)
Serge Bazanski6dff6d62022-01-28 18:15:14 +010057 }
Serge Bazanski186109c2023-06-21 16:57:36 +020058 if nr.KubernetesController != nil {
Serge Bazanski15f7f632023-03-14 17:17:20 +010059 supervisor.Logger(ctx).Infof(" - kubernetes controller")
60 }
Serge Bazanski186109c2023-06-21 16:57:36 +020061 if nr.KubernetesWorker != nil {
Serge Bazanski6dff6d62022-01-28 18:15:14 +010062 supervisor.Logger(ctx).Infof(" - kubernetes worker")
63 }
Serge Bazanski186109c2023-06-21 16:57:36 +020064 s.localRoles.Set(&nr)
Serge Bazanski6dff6d62022-01-28 18:15:14 +010065 }
Serge Bazanski186109c2023-06-21 16:57:36 +020066 } else {
67 supervisor.Logger(ctx).Infof("No persisted node roles.")
Serge Bazanski6dff6d62022-01-28 18:15:14 +010068 }
Serge Bazanski186109c2023-06-21 16:57:36 +020069
70 // Run networked part in a sub-runnable so that network errors don't cause us to
71 // retry the above and make us possibly trigger spurious restarts.
72 supervisor.Run(ctx, "watcher", func(ctx context.Context) error {
73 w := s.curatorConnection.Watch()
74 defer w.Close()
75 cc, err := w.Get(ctx)
76 if err != nil {
77 return err
78 }
79
80 nodeID := cc.nodeID()
81 cur := ipb.NewCuratorClient(cc.conn)
82
83 // Start watch for current node, update localRoles whenever we get something
84 // new.
85 srv, err := cur.Watch(ctx, &ipb.WatchRequest{Kind: &ipb.WatchRequest_NodeInCluster_{
86 NodeInCluster: &ipb.WatchRequest_NodeInCluster{
87 NodeId: nodeID,
88 },
89 }})
90 if err != nil {
91 return fmt.Errorf("watch failed: %w", err)
92 }
93 defer srv.CloseSend()
94
95 supervisor.Signal(ctx, supervisor.SignalHealthy)
96 for {
97 ev, err := srv.Recv()
98 if err != nil {
99 return fmt.Errorf("watch event receive failed: %w", err)
100 }
101 for _, n := range ev.Nodes {
102 n := n
103 // Skip spuriously sent other nodes.
104 if n.Id != nodeID {
105 continue
106 }
107 supervisor.Logger(ctx).Infof("Got new node data. Roles:")
108 if n.Roles.ConsensusMember != nil {
109 supervisor.Logger(ctx).Infof(" - control plane member, existing peers: %+v", n.Roles.ConsensusMember.Peers)
110 }
111 if n.Roles.KubernetesController != nil {
112 supervisor.Logger(ctx).Infof(" - kubernetes controller")
113 }
114 if n.Roles.KubernetesWorker != nil {
115 supervisor.Logger(ctx).Infof(" - kubernetes worker")
116 }
117 s.localRoles.Set(n.Roles)
118
119 // Persist role data to disk.
120 bytes, err := proto.Marshal(n.Roles)
121 if err != nil {
122 supervisor.Logger(ctx).Errorf("Failed to marshal node roles: %w", err)
123 } else {
124 err = s.storageRoot.Data.Node.PersistedRoles.Write(bytes, 0400)
125 if err != nil {
126 supervisor.Logger(ctx).Errorf("Failed to write node roles: %w", err)
127 }
128 }
129 break
130 }
131 }
132 })
133
134 supervisor.Signal(ctx, supervisor.SignalHealthy)
135 <-ctx.Done()
136 return ctx.Err()
137
Serge Bazanski6dff6d62022-01-28 18:15:14 +0100138}