metropolis: move curator client watches to curator/watcher
This replaces all the ad-hoc code that watched nodes via the Curator
with calls through the new curator/watcher library.
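
For context, the sketch below illustrates the SimpleFollower callback
contract this change relies on: FilterFn drops nodes the consumer does
not care about, EqualsFn suppresses no-op updates, OnNewUpdated fires
once per changed node, OnBatchDone once per batch, and OnDeleted on
removal. This is a hypothetical, self-contained approximation for
illustration only; the Node type and processBatch driver are invented
stand-ins, and the real types live in
metropolis/node/core/curator/watcher.

    package main

    import "fmt"

    // Node is a hypothetical stand-in for *apb.Node.
    type Node struct {
        Id      string
        Address string
    }

    // SimpleFollower mirrors the callback set used in the diff below.
    type SimpleFollower struct {
        FilterFn     func(a *Node) bool
        EqualsFn     func(a, b *Node) bool
        OnNewUpdated func(n *Node) error
        OnBatchDone  func() error
        OnDeleted    func(prev *Node) error
    }

    // processBatch is a hypothetical driver showing the callback order:
    // filtered-out or unchanged nodes are skipped, changed nodes hit
    // OnNewUpdated, and OnBatchDone runs once per batch.
    func processBatch(f SimpleFollower, prev map[string]*Node, batch []*Node) error {
        for _, n := range batch {
            if !f.FilterFn(n) {
                continue
            }
            if old, ok := prev[n.Id]; ok && f.EqualsFn(old, n) {
                continue // no relevant change, suppress the update
            }
            if err := f.OnNewUpdated(n); err != nil {
                return err
            }
            prev[n.Id] = n
        }
        return f.OnBatchDone()
    }

    func main() {
        prev := map[string]*Node{}
        f := SimpleFollower{
            FilterFn:     func(a *Node) bool { return a.Address != "" },
            EqualsFn:     func(a, b *Node) bool { return a.Address == b.Address },
            OnNewUpdated: func(n *Node) error { fmt.Println("updated:", n.Id); return nil },
            OnBatchDone:  func() error { fmt.Println("batch done"); return nil },
        }
        batch := []*Node{{Id: "a", Address: "10.0.0.1"}, {Id: "b"}}
        if err := processBatch(f, prev, batch); err != nil {
            fmt.Println("error:", err)
        }
    }

Accumulating updates in OnNewUpdated and flushing them in OnBatchDone
lets the consumer apply one bulk configurePeers call per batch instead
of one per node.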
Change-Id: Ie2a82b330e4108b9b725515cb10595916c38b323
Reviewed-on: https://review.monogon.dev/c/monogon/+/2263
Tested-by: Jenkins CI
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
diff --git a/metropolis/node/core/clusternet/clusternet.go b/metropolis/node/core/clusternet/clusternet.go
index 2e01c77..85ac63f 100644
--- a/metropolis/node/core/clusternet/clusternet.go
+++ b/metropolis/node/core/clusternet/clusternet.go
@@ -26,9 +26,11 @@
 	"fmt"
 	"net"
 	"net/netip"
+	"slices"
 
 	"github.com/cenkalti/backoff/v4"
 
+	"source.monogon.dev/metropolis/node/core/curator/watcher"
 	"source.monogon.dev/metropolis/node/core/localstorage"
 	"source.monogon.dev/metropolis/node/core/network"
 	"source.monogon.dev/metropolis/pkg/event"
@@ -176,58 +178,45 @@
 func (s *Service) pull(ctx context.Context) error {
 	supervisor.Signal(ctx, supervisor.SignalHealthy)
 
-	srv, err := s.Curator.Watch(ctx, &apb.WatchRequest{
-		Kind: &apb.WatchRequest_NodesInCluster_{
-			NodesInCluster: &apb.WatchRequest_NodesInCluster{},
+	var batch []*apb.Node
+	return watcher.WatchNodes(ctx, s.Curator, watcher.SimpleFollower{
+		FilterFn: func(a *apb.Node) bool {
+			if a.Clusternet == nil {
+				return false
+			}
+			if a.Clusternet.WireguardPubkey == "" {
+				return false
+			}
+			return true
+		},
+		EqualsFn: func(a *apb.Node, b *apb.Node) bool {
+			if a.Status.ExternalAddress != b.Status.ExternalAddress {
+				return false
+			}
+			if a.Clusternet.WireguardPubkey != b.Clusternet.WireguardPubkey {
+				return false
+			}
+			if !slices.Equal(a.Clusternet.Prefixes, b.Clusternet.Prefixes) {
+				return false
+			}
+			return true
+		},
+		OnNewUpdated: func(new *apb.Node) error {
+			batch = append(batch, new)
+			return nil
+		},
+		OnBatchDone: func() error {
+			if err := s.wg.configurePeers(batch); err != nil {
+				supervisor.Logger(ctx).Errorf("Nodes couldn't be configured: %v", err)
+			}
+			batch = nil
+			return nil
+		},
+		OnDeleted: func(prev *apb.Node) error {
+			if err := s.wg.unconfigurePeer(prev); err != nil {
+				supervisor.Logger(ctx).Errorf("Node %s couldn't be unconfigured: %v", prev.Id, err)
+			}
+			return nil
 		},
 	})
-	if err != nil {
-		return fmt.Errorf("curator watch failed: %w", err)
-	}
-	defer srv.CloseSend()
-
-	nodes := newNodemap()
-	for {
-		ev, err := srv.Recv()
-		if err != nil {
-			return fmt.Errorf("curator watch recv failed: %w", err)
-		}
-
-		updated, removed := nodes.update(ctx, ev)
-
-		for _, n := range removed {
-			supervisor.Logger(ctx).Infof("Node %s removed, unconfiguring", n.id)
-			if err := s.wg.unconfigurePeer(n.copy()); err != nil {
-				// Do nothing and hope whatever caused this will go away at some point.
-				supervisor.Logger(ctx).Errorf("Node %s couldn't be unconfigured: %v", n.id, err)
-			}
-		}
-		var newNodes []*node
-		for _, n := range updated {
-			newNodes = append(newNodes, n.copy())
-			supervisor.Logger(ctx).Infof("Node %s updated: pk %s, address %s, prefixes %v", n.id, n.pubkey, n.address, n.prefixes)
-		}
-		succeeded := 0
-		if err := s.wg.configurePeers(newNodes); err != nil {
-			// If configuring all nodes at once failed, go node-by-node to make sure we've
-			// done as much as possible.
-			supervisor.Logger(ctx).Warningf("Bulk node update call failed, trying node-by-node..: %v", err)
-			for _, n := range newNodes {
-				if err := s.wg.configurePeers([]*node{n}); err != nil {
-					supervisor.Logger(ctx).Errorf("Node %s failed: %v", n.id, err)
-				} else {
-					succeeded += 1
-				}
-			}
-		} else {
-			succeeded = len(newNodes)
-		}
-
-		if len(newNodes) != 0 {
-			supervisor.Logger(ctx).Infof("Successfully updated %d out of %d nodes", succeeded, len(newNodes))
-
-			numNodes, numPrefixes := nodes.stats()
-			supervisor.Logger(ctx).Infof("Total: %d nodes, %d prefixes.", numNodes, numPrefixes)
-		}
-	}
 }