blob: bed0a3cb5db37521054f8ea98e460865234e53bc [file] [log] [blame]
// Package roleserve implements the roleserver/“Role Server”.
//
// The Role Server is responsible for watching the cluster/node state and
// starting/stopping any 'application' code (also called workload code) based on
// the Node's roles.
//
// Each piece of workload code (which would usually be a supervisor runnable) is
// started by a dedicated 'launcher'. These launchers wait for node roles to be
// available from the curator, and either start the related workload sub-runners
// or do nothing; then they declare themselves as healthy to the supervisor. If
// at any point the role of the node changes (meaning that the node should now
// start or stop the workloads) the launcher just exits and the supervisor
// will restart it.
//
// Currently, this is used to start up the Kubernetes worker code.
16package roleserve
17
18import (
19 "context"
20 "fmt"
21
22 "google.golang.org/grpc"
23 cpb "source.monogon.dev/metropolis/node/core/curator/proto/api"
24 "source.monogon.dev/metropolis/node/core/localstorage"
25 "source.monogon.dev/metropolis/node/core/network"
26 "source.monogon.dev/metropolis/node/kubernetes"
27 "source.monogon.dev/metropolis/node/kubernetes/pki"
28 "source.monogon.dev/metropolis/pkg/event"
29 "source.monogon.dev/metropolis/pkg/event/memory"
30 "source.monogon.dev/metropolis/pkg/supervisor"
31)
32
33// Config is the configuration of the role server.
34type Config struct {
35 // CuratorDial is a function that the roleserver will use to dial the curator.
36 // As both the curator listener and roleserver might restart, this dial function
37 // is needed to possibly re-establish connectivity after a full restart of
38 // either.
39 CuratorDial func(ctx context.Context) (*grpc.ClientConn, error)
40
41 // StorageRoot is a handle to access all of the Node's storage. This is needed
42 // as the roleserver spawns complex workloads like Kubernetes which need access
43 // to a broad range of storage.
44 StorageRoot *localstorage.Root
45
46 // Network is a handle to the network service, used by workloads.
47 Network *network.Service
48
49 // KPKI is a handle to initialized Kubernetes PKI stored on etcd. In the future
50 // this will probably be provisioned by the Kubernetes workload itself.
51 KPKI *pki.PKI
52
53 // NodeID is the node ID on which the roleserver is running.
54 NodeID string
55}
56
57// Service is the roleserver/“Role Server” service. See the package-level
58// documentation for more details.
59type Service struct {
60 Config
61
62 value memory.Value
63
64 // kwC is a channel populated with updates to the local Node object from the
65 // curator, passed over to the Kubernetes Worker launcher.
66 kwC chan *cpb.Node
67 // kwSvcC is a channel populated by the Kubernetes Worker launcher when the
68 // service is started (which then contains the value of spawned Kubernetes
69 // workload service) or stopped (which is then nil).
70 kwSvcC chan *kubernetes.Service
71
72 // gRPC channel to curator, acquired via Config.CuratorDial, active for the
73 // lifecycle of the Service runnable. It's used by the updater
74 // sub-runnable.
75 curator cpb.CuratorClient
76}
77
78// Status is updated by the role service any time one of the subordinate
79// workload services is started or stopped.
80type Status struct {
81 // Kubernetes is set to the Kubernetes workload Service if started/restarted or
82 // nil if stopped.
83 Kubernetes *kubernetes.Service
84}
85
86// New creates a Role Server services from a Config.
87func New(c Config) *Service {
88 return &Service{
89 Config: c,
90 kwC: make(chan *cpb.Node),
91 kwSvcC: make(chan *kubernetes.Service),
92 }
93}
94
95type Watcher struct {
96 event.Watcher
97}
98
99func (s *Service) Watch() Watcher {
100 return Watcher{
101 Watcher: s.value.Watch(),
102 }
103}
104
105func (w *Watcher) Get(ctx context.Context) (*Status, error) {
106 v, err := w.Watcher.Get(ctx)
107 if err != nil {
108 return nil, err
109 }
110 st := v.(Status)
111 return &st, nil
112}
113
114// Run the Role Server service, which uses intermediary workload launchers to
115// start/stop subordinate services as the Node's roles change.
116func (s *Service) Run(ctx context.Context) error {
117 supervisor.Logger(ctx).Info("Dialing curator...")
118 conn, err := s.CuratorDial(ctx)
119 if err != nil {
120 return fmt.Errorf("could not dial cluster curator: %w", err)
121 }
122 defer conn.Close()
123 s.curator = cpb.NewCuratorClient(conn)
124
125 if err := supervisor.Run(ctx, "updater", s.runUpdater); err != nil {
126 return fmt.Errorf("failed to launch updater: %w", err)
127 }
128
129 if err := supervisor.Run(ctx, "kubernetes-worker", s.runKubernetesWorkerLauncher); err != nil {
130 return fmt.Errorf("failed to start kubernetes-worker launcher: %w", err)
131 }
132
133 supervisor.Signal(ctx, supervisor.SignalHealthy)
134
135 status := Status{}
136 for {
137 select {
138 case <-ctx.Done():
139 return ctx.Err()
140 case svc := <-s.kwSvcC:
141 status.Kubernetes = svc
142 s.value.Set(status)
143 }
144 }
145}
146
147// runUpdater runs the updater, a runnable which watchers the cluster via
148// curator for any pertinent node changes and distributes them to respective
149// workload launchers.
150//
151// TODO(q3k): this should probably be implemented somewhere as a curator client
152// library, maybe one that implements the Event Value interface.
153func (s *Service) runUpdater(ctx context.Context) error {
154 srv, err := s.curator.Watch(ctx, &cpb.WatchRequest{Kind: &cpb.WatchRequest_NodeInCluster_{
155 NodeInCluster: &cpb.WatchRequest_NodeInCluster{
156 NodeId: s.NodeID,
157 },
158 }})
159 if err != nil {
160 return fmt.Errorf("watch failed: %w", err)
161 }
162 defer srv.CloseSend()
163
164 supervisor.Signal(ctx, supervisor.SignalHealthy)
165 for {
166 ev, err := srv.Recv()
167 if err != nil {
168 return fmt.Errorf("watch event receive failed: %w", err)
169 }
170 supervisor.Logger(ctx).Infof("Received node event: %+v", ev)
171 for _, node := range ev.Nodes {
172 if node.Id != s.NodeID {
173 continue
174 }
175 s.kwC <- node
176 }
177 }
178
179}