blob: db068453323fc7bf550b09b9c8820c38c497679a [file] [log] [blame]
Mateusz Zalega32b19292022-05-17 13:26:55 +02001package roleserve
2
3import (
4 "context"
5 "fmt"
6 "io"
7 "time"
8
9 "source.monogon.dev/metropolis/node/core/curator"
10 ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
11 "source.monogon.dev/metropolis/node/core/network"
12 "source.monogon.dev/metropolis/pkg/supervisor"
13)
14
// workerHeartbeat is a service that periodically updates the node's heartbeat
// timestamps within the cluster. It does so by sending heartbeat updates to
// the curator over a streaming RPC (see run).
type workerHeartbeat struct {
	// network is the node's network service. run opens a watcher on it for
	// the duration of the worker.
	network *network.Service

	// clusterMembership will be read. run watches it to obtain the
	// membership from which the curator connection is dialed.
	clusterMembership *ClusterMembershipValue
}
23
24func (s *workerHeartbeat) run(ctx context.Context) error {
25 nw := s.network.Watch()
26 defer nw.Close()
27
28 w := s.clusterMembership.Watch()
29 defer w.Close()
30 supervisor.Logger(ctx).Infof("Waiting for cluster membership...")
31 cm, err := w.GetHome(ctx)
32 if err != nil {
33 return err
34 }
35 supervisor.Logger(ctx).Infof("Got cluster membership, starting...")
36
37 conn, err := cm.DialCurator()
38 if err != nil {
39 return err
40 }
41 defer conn.Close()
42 cur := ipb.NewCuratorClient(conn)
43
44 stream, err := cur.Heartbeat(ctx)
45 if err != nil {
46 return err
47 }
48
49 for {
50 if err := stream.Send(&ipb.HeartbeatUpdateRequest{}); err != nil {
51 return fmt.Errorf("while sending a heartbeat: %v", err)
52 }
53 next := time.Now().Add(curator.HeartbeatTimeout)
54
55 _, err := stream.Recv()
56 if err == io.EOF {
57 return fmt.Errorf("stream closed by the server. Restarting worker...")
58 }
59 if err != nil {
60 return fmt.Errorf("while receiving a heartbeat reply: %v", err)
61 }
62
63 time.Sleep(time.Until(next))
64 }
65}