blob: 4cea7c9876edf462b045675711ac4ab3334cb5ed [file] [log] [blame]
package roleserve
import (
"context"
"fmt"
"google.golang.org/grpc"
"google.golang.org/protobuf/encoding/prototext"
common "source.monogon.dev/metropolis/node"
"source.monogon.dev/metropolis/node/core/network"
"source.monogon.dev/metropolis/pkg/event"
"source.monogon.dev/metropolis/pkg/event/memory"
"source.monogon.dev/metropolis/pkg/supervisor"
ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
cpb "source.monogon.dev/metropolis/proto/common"
)
// workerStatusPush is the Status Pusher, a service responsible for sending
// UpdateNodeStatus RPCs to a cluster whenever a Curator is available.
type workerStatusPush struct {
network *network.Service
// clusterMembership will be read.
clusterMembership *memory.Value[*ClusterMembership]
// clusterDirectorySaved will be read.
clusterDirectorySaved *memory.Value[bool]
}
// workerStatusPushChannels contain all the channels between the status pusher's
// 'map' runnables (waiting on Event Values) and the main loop.
type workerStatusPushChannels struct {
// address of the node, or empty if none. Retrieved from network service.
address chan string
// nodeID of this node. Populated whenever available from ClusterMembership.
nodeID chan string
// curatorClient connecting to the cluster, populated whenever available from
// ClusterMembership. Used to actually submit the update.
curatorClient chan ipb.CuratorClient
// localCurator describing whether this node has a locally running curator on
// the default port. Retrieved from ClusterMembership.
localCurator chan bool
}
// workerStatusPushLoop runs the main loop acting on data received from
// workerStatusPushChannels.
func workerStatusPushLoop(ctx context.Context, chans *workerStatusPushChannels) error {
status := cpb.NodeStatus{}
var cur ipb.CuratorClient
var nodeID string
for {
changed := false
select {
case <-ctx.Done():
return fmt.Errorf("while waiting for map updates: %w", ctx.Err())
case address := <-chans.address:
if address != status.ExternalAddress {
supervisor.Logger(ctx).Infof("Got external address: %s", address)
status.ExternalAddress = address
changed = true
}
case newNodeID := <-chans.nodeID:
if nodeID != newNodeID {
supervisor.Logger(ctx).Infof("Got new NodeID: %s", newNodeID)
nodeID = newNodeID
changed = true
}
case cur = <-chans.curatorClient:
supervisor.Logger(ctx).Infof("Got curator connection.")
case localCurator := <-chans.localCurator:
if status.RunningCurator == nil && localCurator {
supervisor.Logger(ctx).Infof("Got new local curator state: running")
status.RunningCurator = &cpb.NodeStatus_RunningCurator{
Port: int32(common.CuratorServicePort),
}
changed = true
}
if status.RunningCurator != nil && !localCurator {
supervisor.Logger(ctx).Infof("Got new local curator state: not running")
status.RunningCurator = nil
changed = true
}
}
if cur != nil && nodeID != "" && changed && status.ExternalAddress != "" {
txt, _ := prototext.Marshal(&status)
supervisor.Logger(ctx).Infof("Submitting status: %q", txt)
_, err := cur.UpdateNodeStatus(ctx, &ipb.UpdateNodeStatusRequest{
NodeId: nodeID,
Status: &status,
})
if err != nil {
return fmt.Errorf("UpdateNodeStatus failed: %w", err)
}
}
}
}
func (s *workerStatusPush) run(ctx context.Context) error {
chans := workerStatusPushChannels{
address: make(chan string),
nodeID: make(chan string),
curatorClient: make(chan ipb.CuratorClient),
localCurator: make(chan bool),
}
// All the channel sends in the map runnables are preemptible by a context
// cancelation. This is because workerStatusPushLoop can crash while processing
// the events, requiring a restart of this runnable. Without the preemption this
// runnable could get stuck.
supervisor.Run(ctx, "map-network", func(ctx context.Context) error {
nw := s.network.Watch()
defer nw.Close()
supervisor.Signal(ctx, supervisor.SignalHealthy)
for {
st, err := nw.Get(ctx)
if err != nil {
return fmt.Errorf("getting network status failed: %w", err)
}
select {
case chans.address <- st.ExternalAddress.String():
case <-ctx.Done():
return ctx.Err()
}
}
})
supervisor.Run(ctx, "map-cluster-membership", func(ctx context.Context) error {
supervisor.Signal(ctx, supervisor.SignalHealthy)
// Do not submit heartbeat until we've got the cluster directory saved.
//
// TODO(q3k): add a node status field for this instead.
supervisor.Logger(ctx).Infof("Waiting for cluster directory to be saved...")
cdw := s.clusterDirectorySaved.Watch()
_, err := cdw.Get(ctx, event.Filter(func(t bool) bool { return t }))
if err != nil {
return fmt.Errorf("getting cluster directory state failed: %w", err)
}
var conn *grpc.ClientConn
defer func() {
if conn != nil {
conn.Close()
}
}()
w := s.clusterMembership.Watch()
defer w.Close()
supervisor.Logger(ctx).Infof("Cluster directory saved, waiting for cluster membership...")
for {
cm, err := w.Get(ctx, FilterHome())
if err != nil {
return fmt.Errorf("getting cluster membership status failed: %w", err)
}
if conn == nil {
conn, err = cm.DialCurator()
if err != nil {
return fmt.Errorf("when attempting to connect to curator: %w", err)
}
select {
case chans.curatorClient <- ipb.NewCuratorClient(conn):
case <-ctx.Done():
return ctx.Err()
}
}
select {
case chans.localCurator <- cm.localCurator != nil:
case <-ctx.Done():
return ctx.Err()
}
select {
case chans.nodeID <- cm.NodeID():
case <-ctx.Done():
return ctx.Err()
}
}
})
supervisor.Signal(ctx, supervisor.SignalHealthy)
return workerStatusPushLoop(ctx, &chans)
}