// Package roleserve implements the roleserver/“Role Server”.
//
// The Role Server is responsible for watching the cluster/node state and
// starting/stopping any 'application' code (also called workload code) based
// on the Node's roles.
//
// Each workload (usually a supervisor runnable) is started by a dedicated
// 'launcher'. These launchers wait for node roles to become available from the
// curator, and either start the related workload sub-runnables or do nothing;
// they then declare themselves as healthy to the supervisor. If at any point
// the roles of the node change (meaning that the node should now start or stop
// a workload), the launcher simply exits and the supervisor restarts it.
//
// Currently, this is used to start up the Kubernetes worker code.
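//
// Schematically, a launcher has roughly the shape sketched below. This is an
// illustrative sketch only: runFooLauncher, fooC, fooRolePresent and
// runFooWorkload are hypothetical names, not part of this package.
//
//	func (s *Service) runFooLauncher(ctx context.Context) error {
//		// Wait for the updater to deliver the local node.
//		node := <-s.fooC
//		running := fooRolePresent(node) // hypothetical role check
//		if running {
//			// Role present: start the workload as a sub-runnable.
//			if err := supervisor.Run(ctx, "foo", runFooWorkload); err != nil {
//				return err
//			}
//		}
//		supervisor.Signal(ctx, supervisor.SignalHealthy)
//		// Exit as soon as the role flips; the supervisor will then restart
//		// this launcher, which re-evaluates the decision above.
//		for {
//			select {
//			case <-ctx.Done():
//				return ctx.Err()
//			case node := <-s.fooC:
//				if fooRolePresent(node) != running {
//					return fmt.Errorf("role changed, restarting")
//				}
//			}
//		}
//	}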
package roleserve

import (
	"context"
	"fmt"

	"google.golang.org/grpc"

	cpb "source.monogon.dev/metropolis/node/core/curator/proto/api"
	"source.monogon.dev/metropolis/node/core/localstorage"
	"source.monogon.dev/metropolis/node/core/network"
	"source.monogon.dev/metropolis/node/kubernetes"
	"source.monogon.dev/metropolis/node/kubernetes/pki"
	"source.monogon.dev/metropolis/pkg/event"
	"source.monogon.dev/metropolis/pkg/event/memory"
	"source.monogon.dev/metropolis/pkg/supervisor"
)

// Config is the configuration of the role server.
type Config struct {
	// CuratorDial is a function that the roleserver will use to dial the curator.
	// As both the curator listener and the roleserver might restart, this dial
	// function is needed so that connectivity can be re-established after a full
	// restart of either.
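	//
	// The actual dialer is provided by whatever node code wires up the
	// roleserver; as a rough, hypothetical sketch (the socket path and dial
	// options below are made up for illustration):
	//
	//	CuratorDial: func(ctx context.Context) (*grpc.ClientConn, error) {
	//		return grpc.DialContext(ctx, "unix:///tmp/curator.sock", grpc.WithInsecure())
	//	},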
	CuratorDial func(ctx context.Context) (*grpc.ClientConn, error)

	// StorageRoot is a handle used to access all of the Node's storage. This is
	// needed as the roleserver spawns complex workloads like Kubernetes which
	// need access to a broad range of storage.
	StorageRoot *localstorage.Root

	// Network is a handle to the network service, used by workloads.
	Network *network.Service

	// KPKI is a handle to the initialized Kubernetes PKI stored in etcd. In the
	// future this will probably be provisioned by the Kubernetes workload itself.
	KPKI *pki.PKI

	// NodeID is the ID of the node on which the roleserver is running.
	NodeID string
}

// Service is the roleserver/“Role Server” service. See the package-level
// documentation for more details.
type Service struct {
	Config

	// value is the event value carrying the current Status, set by Run and
	// observed by consumers via Watch.
	value memory.Value

	// kwC is a channel populated with updates to the local Node object from the
	// curator, passed over to the Kubernetes Worker launcher.
	kwC chan *cpb.Node
	// kwSvcC is a channel populated by the Kubernetes Worker launcher whenever
	// the workload is started (carrying the spawned Kubernetes service) or
	// stopped (carrying nil).
	kwSvcC chan *kubernetes.Service

	// curator is the gRPC client to the curator, acquired via Config.CuratorDial
	// and active for the lifecycle of the Service runnable. It is used by the
	// updater sub-runnable.
	curator cpb.CuratorClient
}

// Status is updated by the role service any time one of the subordinate
// workload services is started or stopped.
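//
// A consumer can observe these updates through the Watch/Get API, roughly as
// in the following sketch (rs is an assumed *Service obtained from New):
//
//	w := rs.Watch()
//	for {
//		st, err := w.Get(ctx)
//		if err != nil {
//			return err
//		}
//		if st.Kubernetes != nil {
//			// The Kubernetes workload is running (or has been restarted).
//		} else {
//			// The Kubernetes workload is stopped.
//		}
//	}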
type Status struct {
	// Kubernetes is set to the Kubernetes workload Service if started/restarted,
	// or nil if stopped.
	Kubernetes *kubernetes.Service
}

// New creates a Role Server service from a Config.
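//
// A rough usage sketch follows; the surrounding values (dialCurator, root,
// networkSvc, kpki, nodeID and the parent supervisor context) are assumptions
// standing in for whatever the node startup code provides:
//
//	rs := roleserve.New(roleserve.Config{
//		CuratorDial: dialCurator,
//		StorageRoot: root,
//		Network:     networkSvc,
//		KPKI:        kpki,
//		NodeID:      nodeID,
//	})
//	if err := supervisor.Run(ctx, "role", rs.Run); err != nil {
//		return err
//	}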
func New(c Config) *Service {
	return &Service{
		Config: c,
		kwC:    make(chan *cpb.Node),
		kwSvcC: make(chan *kubernetes.Service),
	}
}

// Watcher allows a consumer to retrieve the current and future Status of the
// role server.
type Watcher struct {
	event.Watcher
}

// Watch returns a Watcher for the Status published by the role server.
func (s *Service) Watch() Watcher {
	return Watcher{
		Watcher: s.value.Watch(),
	}
}

// Get retrieves the newest Status from the role server, blocking until one is
// available.
func (w *Watcher) Get(ctx context.Context) (*Status, error) {
	v, err := w.Watcher.Get(ctx)
	if err != nil {
		return nil, err
	}
	st := v.(Status)
	return &st, nil
}

// Run the Role Server service, which uses intermediary workload launchers to
// start/stop subordinate services as the Node's roles change.
func (s *Service) Run(ctx context.Context) error {
	supervisor.Logger(ctx).Info("Dialing curator...")
	conn, err := s.CuratorDial(ctx)
	if err != nil {
		return fmt.Errorf("could not dial cluster curator: %w", err)
	}
	defer conn.Close()
	s.curator = cpb.NewCuratorClient(conn)

	if err := supervisor.Run(ctx, "updater", s.runUpdater); err != nil {
		return fmt.Errorf("failed to launch updater: %w", err)
	}

	if err := supervisor.Run(ctx, "kubernetes-worker", s.runKubernetesWorkerLauncher); err != nil {
		return fmt.Errorf("failed to start kubernetes-worker launcher: %w", err)
	}

	supervisor.Signal(ctx, supervisor.SignalHealthy)

	status := Status{}
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case svc := <-s.kwSvcC:
			// The Kubernetes Worker launcher reported that its workload was
			// started (svc != nil) or stopped (svc == nil); publish the new
			// Status to watchers.
			status.Kubernetes = svc
			s.value.Set(status)
		}
	}
}

// runUpdater runs the updater, a runnable which watches the cluster via the
// curator for any pertinent node changes and distributes them to the
// respective workload launchers.
//
// TODO(q3k): this should probably be implemented somewhere as a curator client
// library, maybe one that implements the Event Value interface.
func (s *Service) runUpdater(ctx context.Context) error {
	srv, err := s.curator.Watch(ctx, &cpb.WatchRequest{Kind: &cpb.WatchRequest_NodeInCluster_{
		NodeInCluster: &cpb.WatchRequest_NodeInCluster{
			NodeId: s.NodeID,
		},
	}})
	if err != nil {
		return fmt.Errorf("watch failed: %w", err)
	}
	defer srv.CloseSend()

	supervisor.Signal(ctx, supervisor.SignalHealthy)
	for {
		ev, err := srv.Recv()
		if err != nil {
			return fmt.Errorf("watch event receive failed: %w", err)
		}
		supervisor.Logger(ctx).Infof("Received node event: %+v", ev)
		for _, node := range ev.Nodes {
			if node.Id != s.NodeID {
				continue
			}
			// Only the local node is of interest; forward it to the Kubernetes
			// Worker launcher.
			s.kwC <- node
		}
	}
}