blob: d7e3d2f3893f9c8769c48a31c5348417dfc3c5f3 [file] [log] [blame]
// Package roleserve implements the roleserver/“Role Server”.
//
// The Role Server is responsible for watching the cluster/node state and
// starting/stopping any 'application' code (also called workload code) based on
// the Node's roles.
//
// Each workload code (which would usually be a supervisor runnable) is started
// by a dedicated 'launcher'. These launchers wait for node roles to be
// available from the curator, and either start the related workload sub-runners
// or do nothing; then they declare themselves as healthy to the supervisor. If
// at any point the role of the node changes (meaning that the node should now
// start or stop the workloads) the launcher just exits and the supervisor
// will restart it.
//
// Currently, this is used to start up the Kubernetes worker code.
16package roleserve
17
18import (
19 "context"
20 "fmt"
21
22 "google.golang.org/grpc"
Serge Bazanski96043bc2021-10-05 12:10:13 +020023
Serge Bazanski0d937772021-06-17 15:54:40 +020024 cpb "source.monogon.dev/metropolis/node/core/curator/proto/api"
25 "source.monogon.dev/metropolis/node/core/localstorage"
26 "source.monogon.dev/metropolis/node/core/network"
27 "source.monogon.dev/metropolis/node/kubernetes"
28 "source.monogon.dev/metropolis/node/kubernetes/pki"
29 "source.monogon.dev/metropolis/pkg/event"
30 "source.monogon.dev/metropolis/pkg/event/memory"
31 "source.monogon.dev/metropolis/pkg/supervisor"
32)
33
// Config is the configuration of the role server. All fields must be set
// before the Service built from it is run.
type Config struct {
	// CuratorDial is a function that the roleserver will use to dial the curator.
	// As both the curator listener and roleserver might restart, this dial function
	// is needed to possibly re-establish connectivity after a full restart of
	// either.
	CuratorDial func(ctx context.Context) (*grpc.ClientConn, error)

	// StorageRoot is a handle to access all of the Node's storage. This is needed
	// as the roleserver spawns complex workloads like Kubernetes which need access
	// to a broad range of storage.
	StorageRoot *localstorage.Root

	// Network is a handle to the network service, used by workloads.
	Network *network.Service

	// KPKI is a handle to initialized Kubernetes PKI stored on etcd. In the future
	// this will probably be provisioned by the Kubernetes workload itself.
	KPKI *pki.PKI

	// NodeID is the node ID on which the roleserver is running. It is used to
	// filter curator watch events down to this node (see runUpdater).
	NodeID string
}
57
// Service is the roleserver/“Role Server” service. See the package-level
// documentation for more details. Construct instances with New; the zero
// value is not usable (the internal channels would be nil).
type Service struct {
	Config

	// value is the event Value through which Status updates are published to
	// Watchers returned by Watch.
	value memory.Value

	// kwC is a channel populated with updates to the local Node object from the
	// curator, passed over to the Kubernetes Worker launcher.
	kwC chan *cpb.Node
	// kwSvcC is a channel populated by the Kubernetes Worker launcher when the
	// service is started (which then contains the value of spawned Kubernetes
	// workload service) or stopped (which is then nil).
	kwSvcC chan *kubernetes.Service

	// gRPC channel to curator, acquired via Config.CuratorDial, active for the
	// lifecycle of the Service runnable. It's used by the updater
	// sub-runnable. Set by Run before any sub-runnable starts.
	curator cpb.CuratorClient
}
78
// Status is updated by the role service any time one of the subordinate
// workload services is started or stopped. It is the value published through
// the Service's event Value and retrieved via Watcher.Get.
type Status struct {
	// Kubernetes is set to the Kubernetes workload Service if started/restarted or
	// nil if stopped.
	Kubernetes *kubernetes.Service
}
86
87// New creates a Role Server services from a Config.
88func New(c Config) *Service {
89 return &Service{
90 Config: c,
91 kwC: make(chan *cpb.Node),
92 kwSvcC: make(chan *kubernetes.Service),
93 }
94}
95
// Watcher allows retrieving Status updates from the role server. It wraps the
// generic event Watcher so that Get returns a typed *Status.
type Watcher struct {
	event.Watcher
}
99
100func (s *Service) Watch() Watcher {
101 return Watcher{
102 Watcher: s.value.Watch(),
103 }
104}
105
106func (w *Watcher) Get(ctx context.Context) (*Status, error) {
107 v, err := w.Watcher.Get(ctx)
108 if err != nil {
109 return nil, err
110 }
111 st := v.(Status)
112 return &st, nil
113}
114
// Run the Role Server service, which uses intermediary workload launchers to
// start/stop subordinate services as the Node's roles change.
//
// It dials the curator, starts the updater and workload launcher
// sub-runnables, and then aggregates workload start/stop notifications into
// Status values published through the Service's event Value.
func (s *Service) Run(ctx context.Context) error {
	supervisor.Logger(ctx).Info("Dialing curator...")
	conn, err := s.CuratorDial(ctx)
	if err != nil {
		return fmt.Errorf("could not dial cluster curator: %w", err)
	}
	// The connection stays open for the entire lifetime of this runnable; the
	// sub-runnables started below share the client built on top of it.
	defer conn.Close()
	s.curator = cpb.NewCuratorClient(conn)

	// s.curator must be set before the updater starts, as the updater uses it.
	if err := supervisor.Run(ctx, "updater", s.runUpdater); err != nil {
		return fmt.Errorf("failed to launch updater: %w", err)
	}

	if err := supervisor.Run(ctx, "cluster-agent", s.runClusterAgent); err != nil {
		return fmt.Errorf("failed to launch cluster agent: %w", err)
	}

	if err := supervisor.Run(ctx, "kubernetes-worker", s.runKubernetesWorkerLauncher); err != nil {
		return fmt.Errorf("failed to start kubernetes-worker launcher: %w", err)
	}

	supervisor.Signal(ctx, supervisor.SignalHealthy)

	// Main aggregation loop: fold launcher notifications into a Status and
	// publish every change so Watchers observe workload starts/stops.
	status := Status{}
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case svc := <-s.kwSvcC:
			status.Kubernetes = svc
			s.value.Set(status)
		}
	}
}
151
152// runUpdater runs the updater, a runnable which watchers the cluster via
153// curator for any pertinent node changes and distributes them to respective
154// workload launchers.
155//
156// TODO(q3k): this should probably be implemented somewhere as a curator client
157// library, maybe one that implements the Event Value interface.
158func (s *Service) runUpdater(ctx context.Context) error {
159 srv, err := s.curator.Watch(ctx, &cpb.WatchRequest{Kind: &cpb.WatchRequest_NodeInCluster_{
160 NodeInCluster: &cpb.WatchRequest_NodeInCluster{
161 NodeId: s.NodeID,
162 },
163 }})
164 if err != nil {
165 return fmt.Errorf("watch failed: %w", err)
166 }
167 defer srv.CloseSend()
168
169 supervisor.Signal(ctx, supervisor.SignalHealthy)
170 for {
171 ev, err := srv.Recv()
172 if err != nil {
173 return fmt.Errorf("watch event receive failed: %w", err)
174 }
175 supervisor.Logger(ctx).Infof("Received node event: %+v", ev)
176 for _, node := range ev.Nodes {
177 if node.Id != s.NodeID {
178 continue
179 }
180 s.kwC <- node
181 }
182 }
183
184}