blob: fdaa9be81ec069671c84fabd26a6d2c1c3e0f8c0 [file] [log] [blame]
package roleserve
import (
"context"
"fmt"
"io"
"time"
"source.monogon.dev/metropolis/node/core/curator"
ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
"source.monogon.dev/metropolis/node/core/network"
"source.monogon.dev/metropolis/pkg/event/memory"
"source.monogon.dev/metropolis/pkg/supervisor"
)
// workerHeartbeat is a service that periodically updates node's heartbeat
// timestamps within the cluster. It does so by keeping a Heartbeat stream open
// to the curator and repeatedly sending heartbeat updates over it.
type workerHeartbeat struct {
	// network is the node's network service.
	// NOTE(review): run only opens and immediately defers closing a watch on
	// it, without ever reading a value — confirm whether it is still needed.
	network *network.Service
	// clusterMembership will be read. run waits on it until a membership
	// matching FilterHome() is available, then uses that membership to dial
	// the curator.
	clusterMembership *memory.Value[*ClusterMembership]
}
// run is the main body of the heartbeat service.
//
// It waits until cluster membership matching FilterHome() is available and
// dials the curator. It then opens a single Heartbeat stream. Over that
// stream it repeatedly sends a heartbeat update and waits for the curator's
// reply. Consecutive updates are spaced curator.HeartbeatTimeout apart,
// measured from the moment each update was sent.
//
// run never returns nil. It returns an error when any of the following
// happens:
//   - cluster membership cannot be obtained;
//   - the curator cannot be dialed;
//   - the stream cannot be opened;
//   - a send or receive fails, including the server closing the stream;
//   - ctx is cancelled.
//
// Presumably the supervisor restarts it after such an error.
func (s *workerHeartbeat) run(ctx context.Context) error {
	// NOTE(review): this network watcher is never read from, only closed.
	// Confirm whether it is still needed.
	nw := s.network.Watch()
	defer nw.Close()

	w := s.clusterMembership.Watch()
	defer w.Close()

	supervisor.Logger(ctx).Infof("Waiting for cluster membership...")
	cm, err := w.Get(ctx, FilterHome())
	if err != nil {
		return err
	}
	supervisor.Logger(ctx).Infof("Got cluster membership, starting...")

	conn, err := cm.DialCurator()
	if err != nil {
		return err
	}
	defer conn.Close()

	cur := ipb.NewCuratorClient(conn)
	stream, err := cur.Heartbeat(ctx)
	if err != nil {
		return err
	}

	for {
		if err := stream.Send(&ipb.HeartbeatUpdateRequest{}); err != nil {
			return fmt.Errorf("while sending a heartbeat: %w", err)
		}
		// The next heartbeat is scheduled relative to when this one was sent.
		// Time spent waiting for the reply therefore does not stretch the
		// interval.
		next := time.Now().Add(curator.HeartbeatTimeout)

		_, err := stream.Recv()
		if err == io.EOF {
			return fmt.Errorf("stream closed by the server, restarting worker")
		}
		if err != nil {
			return fmt.Errorf("while receiving a heartbeat reply: %w", err)
		}

		// Wait until the next heartbeat is due. Unlike time.Sleep, this wait
		// can be interrupted by context cancellation. A non-positive duration
		// fires immediately.
		t := time.NewTimer(time.Until(next))
		select {
		case <-ctx.Done():
			t.Stop()
			return ctx.Err()
		case <-t.C:
		}
	}
}