package roleserve

import (
	"context"
	"fmt"

	"google.golang.org/grpc"
	"google.golang.org/protobuf/encoding/prototext"

	common "source.monogon.dev/metropolis/node"
	"source.monogon.dev/metropolis/node/core/network"
	"source.monogon.dev/metropolis/pkg/event"
	"source.monogon.dev/metropolis/pkg/event/memory"
	"source.monogon.dev/metropolis/pkg/supervisor"

	ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
	cpb "source.monogon.dev/metropolis/proto/common"
)

// workerStatusPush is the Status Pusher, a service responsible for sending
// UpdateNodeStatus RPCs to a cluster whenever a Curator is available.
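//
// It runs under the supervisor (see run below) and spawns 'map' sub-runnables
// which translate Event Value updates into channel sends consumed by the main
// loop.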
type workerStatusPush struct {
	network *network.Service

	// clusterMembership will be read for the node's ID, its curator connection
	// and its local curator state.
	clusterMembership *memory.Value[*ClusterMembership]
	// clusterDirectorySaved will be read to hold off status submission until
	// the cluster directory has been saved locally.
	clusterDirectorySaved *memory.Value[bool]
}

// workerStatusPushChannels contains all the channels between the status
// pusher's 'map' runnables (each waiting on an Event Value) and the main loop.
type workerStatusPushChannels struct {
	// address is the node's external address, or empty if not yet known.
	// Retrieved from the network service.
	address chan string
	// nodeID is this node's ID, populated whenever available from
	// ClusterMembership.
	nodeID chan string
	// curatorClient carries a client connected to the cluster, populated
	// whenever available from ClusterMembership. Used to actually submit the
	// update.
	curatorClient chan ipb.CuratorClient
	// localCurator describes whether this node has a locally running curator
	// on the default port. Retrieved from ClusterMembership.
	localCurator chan bool
}

// workerStatusPushLoop runs the main loop acting on data received from
// workerStatusPushChannels.
func workerStatusPushLoop(ctx context.Context, chans *workerStatusPushChannels) error {
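	// The loop maintains the last known node status alongside the
	// prerequisites (curator client, node ID) needed to submit it.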
	status := cpb.NodeStatus{}
	var cur ipb.CuratorClient
	var nodeID string

	for {
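		// changed is set whenever an update actually modifies the status or
		// node ID, and gates the submission at the bottom of the loop.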
		changed := false

		select {
		case <-ctx.Done():
			return fmt.Errorf("while waiting for map updates: %w", ctx.Err())

		case address := <-chans.address:
			if address != status.ExternalAddress {
				supervisor.Logger(ctx).Infof("Got external address: %s", address)
				status.ExternalAddress = address
				changed = true
			}

		case newNodeID := <-chans.nodeID:
			if nodeID != newNodeID {
				supervisor.Logger(ctx).Infof("Got new NodeID: %s", newNodeID)
				nodeID = newNodeID
				changed = true
			}

		case cur = <-chans.curatorClient:
			supervisor.Logger(ctx).Infof("Got curator connection.")
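			// Note: a new connection alone does not mark the status as
			// changed; any status assembled so far will only be submitted
			// once the next change arrives.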

		case localCurator := <-chans.localCurator:
			if status.RunningCurator == nil && localCurator {
				supervisor.Logger(ctx).Infof("Got new local curator state: running")
				status.RunningCurator = &cpb.NodeStatus_RunningCurator{
					Port: int32(common.CuratorServicePort),
				}
				changed = true
			}
			if status.RunningCurator != nil && !localCurator {
				supervisor.Logger(ctx).Infof("Got new local curator state: not running")
				status.RunningCurator = nil
				changed = true
			}
		}

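		// Submit the status only if something changed and all prerequisites
		// (curator connection, node ID, external address) are available.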
		if cur != nil && nodeID != "" && changed && status.ExternalAddress != "" {
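			// The marshaled form is used only for logging, so the error can
			// be ignored.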
			txt, _ := prototext.Marshal(&status)
			supervisor.Logger(ctx).Infof("Submitting status: %q", txt)
			_, err := cur.UpdateNodeStatus(ctx, &ipb.UpdateNodeStatusRequest{
				NodeId: nodeID,
				Status: &status,
			})
			if err != nil {
				return fmt.Errorf("UpdateNodeStatus failed: %w", err)
			}
		}
	}
}

func (s *workerStatusPush) run(ctx context.Context) error {
	chans := workerStatusPushChannels{
		address:       make(chan string),
		nodeID:        make(chan string),
		curatorClient: make(chan ipb.CuratorClient),
		localCurator:  make(chan bool),
	}

	// All the channel sends in the map runnables are preemptible by context
	// cancellation. This is because workerStatusPushLoop can crash while
	// processing the events, requiring a restart of this runnable. Without the
	// preemption, the map runnables could get stuck sending on a channel that
	// is no longer being read.

	if err := supervisor.Run(ctx, "map-network", func(ctx context.Context) error {
		nw := s.network.Watch()
		defer nw.Close()

		supervisor.Signal(ctx, supervisor.SignalHealthy)
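		// Forward every new external address to the main loop.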
		for {
			st, err := nw.Get(ctx)
			if err != nil {
				return fmt.Errorf("getting network status failed: %w", err)
			}
			select {
			case chans.address <- st.ExternalAddress.String():
			case <-ctx.Done():
				return ctx.Err()
			}
		}
	}); err != nil {
		return fmt.Errorf("could not run map-network: %w", err)
	}
	if err := supervisor.Run(ctx, "map-cluster-membership", func(ctx context.Context) error {
		supervisor.Signal(ctx, supervisor.SignalHealthy)

		// Do not submit heartbeat until we've got the cluster directory saved.
		//
		// TODO(q3k): add a node status field for this instead.
		supervisor.Logger(ctx).Infof("Waiting for cluster directory to be saved...")
		cdw := s.clusterDirectorySaved.Watch()
		defer cdw.Close()
		_, err := cdw.Get(ctx, event.Filter(func(t bool) bool { return t }))
		if err != nil {
			return fmt.Errorf("getting cluster directory state failed: %w", err)
		}

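		// conn is dialed on the first membership update and reused afterwards;
		// it is closed when this runnable exits.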
		var conn *grpc.ClientConn
		defer func() {
			if conn != nil {
				conn.Close()
			}
		}()

		w := s.clusterMembership.Watch()
		defer w.Close()
		supervisor.Logger(ctx).Infof("Cluster directory saved, waiting for cluster membership...")
		for {
			cm, err := w.Get(ctx, FilterHome())
			if err != nil {
				return fmt.Errorf("getting cluster membership status failed: %w", err)
			}

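			// Dial the curator only once; subsequent membership updates reuse
			// the existing connection.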
			if conn == nil {
				conn, err = cm.DialCurator()
				if err != nil {
					return fmt.Errorf("when attempting to connect to curator: %w", err)
				}
				select {
				case chans.curatorClient <- ipb.NewCuratorClient(conn):
				case <-ctx.Done():
					return ctx.Err()
				}
			}

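			// Forward the local curator state and node ID on every membership
			// update.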
			select {
			case chans.localCurator <- cm.localCurator != nil:
			case <-ctx.Done():
				return ctx.Err()
			}
			select {
			case chans.nodeID <- cm.NodeID():
			case <-ctx.Done():
				return ctx.Err()
			}
		}
	}); err != nil {
		return fmt.Errorf("could not run map-cluster-membership: %w", err)
	}

	supervisor.Signal(ctx, supervisor.SignalHealthy)
	return workerStatusPushLoop(ctx, &chans)
}