blob: 57e6a7a97491159f7cc34cf5be9716155ec8b3ab [file] [log] [blame]
// Package roleserve implements the roleserver/“Role Server”.
//
// The Role Server is responsible for watching the cluster/node state and
// starting/stopping any 'application' code (also called workload code) based on
// the Node's roles.
//
// Each workload code (which would usually be a supervisor runnable) is started
// by a dedicated 'launcher'. These launchers wait for node roles to be
// available from the curator, and either start the related workload sub-runners
// or do nothing; then they declare themselves as healthy to the supervisor. If
// at any point the role of the node changes (meaning that the node should now
// start or stop the workloads) the launcher just exits and the supervisor
// will restart it.
//
// Currently, this is used to start up the Kubernetes worker code.
16package roleserve
17
18import (
19 "context"
20 "fmt"
21
22 "google.golang.org/grpc"
Serge Bazanski96043bc2021-10-05 12:10:13 +020023
Serge Bazanski0d937772021-06-17 15:54:40 +020024 cpb "source.monogon.dev/metropolis/node/core/curator/proto/api"
Lorenz Brun1de8b182021-12-21 17:15:18 +010025 "source.monogon.dev/metropolis/node/core/identity"
Serge Bazanski0d937772021-06-17 15:54:40 +020026 "source.monogon.dev/metropolis/node/core/localstorage"
27 "source.monogon.dev/metropolis/node/core/network"
28 "source.monogon.dev/metropolis/node/kubernetes"
29 "source.monogon.dev/metropolis/node/kubernetes/pki"
30 "source.monogon.dev/metropolis/pkg/event"
31 "source.monogon.dev/metropolis/pkg/event/memory"
32 "source.monogon.dev/metropolis/pkg/supervisor"
33)
34
// Config is the configuration of the role server.
//
// NOTE(review): all fields appear to be required — Run and the workload
// launchers dereference them unconditionally — confirm before constructing a
// partial Config.
type Config struct {
	// CuratorDial is a function that the roleserver will use to dial the curator.
	// As both the curator listener and roleserver might restart, this dial function
	// is needed to possibly re-establish connectivity after a full restart of
	// either.
	CuratorDial func(ctx context.Context) (*grpc.ClientConn, error)

	// StorageRoot is a handle to access all of the Node's storage. This is needed
	// as the roleserver spawns complex workloads like Kubernetes which need access
	// to a broad range of storage.
	StorageRoot *localstorage.Root

	// Network is a handle to the network service, used by workloads.
	Network *network.Service

	// KPKI is a handle to initialized Kubernetes PKI stored on etcd. In the future
	// this will probably be provisioned by the Kubernetes workload itself.
	KPKI *pki.PKI

	// Node is the node identity on which the roleserver is running. Its ID is
	// used by the updater to filter curator watch events down to this node.
	Node *identity.Node
}
58
// Service is the roleserver/“Role Server” service. See the package-level
// documentation for more details.
type Service struct {
	// Config is embedded, giving the Service direct access to its
	// dependencies (curator dialer, storage, network, PKI, node identity).
	Config

	// value is the Event Value carrying the current Status. It is set by Run
	// every time a workload launcher reports a start/stop, and is read by
	// clients through Watch/Get.
	value memory.Value

	// kwC is a channel populated with updates to the local Node object from the
	// curator, passed over to the Kubernetes Worker launcher.
	kwC chan *cpb.Node
	// kwSvcC is a channel populated by the Kubernetes Worker launcher when the
	// service is started (which then contains the value of spawned Kubernetes
	// workload service) or stopped (which is then nil).
	kwSvcC chan *kubernetes.Service

	// curator is the gRPC client to the curator, backed by a connection
	// acquired via Config.CuratorDial, active for the lifecycle of the
	// Service runnable. It's used by the updater sub-runnable.
	curator cpb.CuratorClient
}
79
// Status is updated by the role service any time one of the subordinate
// workload services is started or stopped. Clients retrieve it via
// Service.Watch followed by Watcher.Get.
type Status struct {
	// Kubernetes is set to the Kubernetes workload Service if started/restarted or
	// nil if stopped.
	Kubernetes *kubernetes.Service
}
87
88// New creates a Role Server services from a Config.
89func New(c Config) *Service {
90 return &Service{
91 Config: c,
92 kwC: make(chan *cpb.Node),
93 kwSvcC: make(chan *kubernetes.Service),
94 }
95}
96
// Watcher allows a client of the role server to receive the current and
// subsequent Status values, as published whenever a subordinate workload
// service starts or stops. Obtain one via Service.Watch.
type Watcher struct {
	event.Watcher
}
100
101func (s *Service) Watch() Watcher {
102 return Watcher{
103 Watcher: s.value.Watch(),
104 }
105}
106
107func (w *Watcher) Get(ctx context.Context) (*Status, error) {
108 v, err := w.Watcher.Get(ctx)
109 if err != nil {
110 return nil, err
111 }
112 st := v.(Status)
113 return &st, nil
114}
115
116// Run the Role Server service, which uses intermediary workload launchers to
117// start/stop subordinate services as the Node's roles change.
118func (s *Service) Run(ctx context.Context) error {
119 supervisor.Logger(ctx).Info("Dialing curator...")
120 conn, err := s.CuratorDial(ctx)
121 if err != nil {
122 return fmt.Errorf("could not dial cluster curator: %w", err)
123 }
124 defer conn.Close()
125 s.curator = cpb.NewCuratorClient(conn)
126
127 if err := supervisor.Run(ctx, "updater", s.runUpdater); err != nil {
128 return fmt.Errorf("failed to launch updater: %w", err)
129 }
130
Serge Bazanskibf68fa92021-10-05 17:53:58 +0200131 if err := supervisor.Run(ctx, "cluster-agent", s.runClusterAgent); err != nil {
132 return fmt.Errorf("failed to launch cluster agent: %w", err)
133 }
134
Serge Bazanski0d937772021-06-17 15:54:40 +0200135 if err := supervisor.Run(ctx, "kubernetes-worker", s.runKubernetesWorkerLauncher); err != nil {
136 return fmt.Errorf("failed to start kubernetes-worker launcher: %w", err)
137 }
138
139 supervisor.Signal(ctx, supervisor.SignalHealthy)
140
141 status := Status{}
142 for {
143 select {
144 case <-ctx.Done():
145 return ctx.Err()
146 case svc := <-s.kwSvcC:
147 status.Kubernetes = svc
148 s.value.Set(status)
149 }
150 }
151}
152
153// runUpdater runs the updater, a runnable which watchers the cluster via
154// curator for any pertinent node changes and distributes them to respective
155// workload launchers.
156//
157// TODO(q3k): this should probably be implemented somewhere as a curator client
158// library, maybe one that implements the Event Value interface.
159func (s *Service) runUpdater(ctx context.Context) error {
160 srv, err := s.curator.Watch(ctx, &cpb.WatchRequest{Kind: &cpb.WatchRequest_NodeInCluster_{
161 NodeInCluster: &cpb.WatchRequest_NodeInCluster{
Lorenz Brun1de8b182021-12-21 17:15:18 +0100162 NodeId: s.Node.ID(),
Serge Bazanski0d937772021-06-17 15:54:40 +0200163 },
164 }})
165 if err != nil {
166 return fmt.Errorf("watch failed: %w", err)
167 }
168 defer srv.CloseSend()
169
170 supervisor.Signal(ctx, supervisor.SignalHealthy)
171 for {
172 ev, err := srv.Recv()
173 if err != nil {
174 return fmt.Errorf("watch event receive failed: %w", err)
175 }
176 supervisor.Logger(ctx).Infof("Received node event: %+v", ev)
177 for _, node := range ev.Nodes {
Lorenz Brun1de8b182021-12-21 17:15:18 +0100178 if node.Id != s.Node.ID() {
Serge Bazanski0d937772021-06-17 15:54:40 +0200179 continue
180 }
181 s.kwC <- node
182 }
183 }
184
185}