metropolis: move curator client watches to curator/watcher
This replaces all the ad-hoc code to watch Curator node(s) with calls
through the new curator/watcher library.
Change-Id: Ie2a82b330e4108b9b725515cb10595916c38b323
Reviewed-on: https://review.monogon.dev/c/monogon/+/2263
Tested-by: Jenkins CI
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
diff --git a/metropolis/node/kubernetes/BUILD.bazel b/metropolis/node/kubernetes/BUILD.bazel
index a88b4b3..8e68973 100644
--- a/metropolis/node/kubernetes/BUILD.bazel
+++ b/metropolis/node/kubernetes/BUILD.bazel
@@ -20,6 +20,7 @@
"//metropolis/node",
"//metropolis/node/core/clusternet",
"//metropolis/node/core/curator/proto/api",
+ "//metropolis/node/core/curator/watcher",
"//metropolis/node/core/identity",
"//metropolis/node/core/localstorage",
"//metropolis/node/core/network",
diff --git a/metropolis/node/kubernetes/apiproxy.go b/metropolis/node/kubernetes/apiproxy.go
index 9f9c851..d937824 100644
--- a/metropolis/node/kubernetes/apiproxy.go
+++ b/metropolis/node/kubernetes/apiproxy.go
@@ -2,12 +2,12 @@
import (
"context"
- "fmt"
"net"
"source.monogon.dev/go/net/tinylb"
"source.monogon.dev/metropolis/node"
ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
+ "source.monogon.dev/metropolis/node/core/curator/watcher"
"source.monogon.dev/metropolis/pkg/event/memory"
)
@@ -15,38 +15,36 @@
// the currently known nodes running a Kubernetes apiserver as retrieved from the
// given curator client.
func updateLoadbalancerAPIServers(ctx context.Context, val *memory.Value[tinylb.BackendSet], cur ipb.CuratorClient) error {
- w, err := cur.Watch(ctx, &ipb.WatchRequest{
- Kind: &ipb.WatchRequest_NodesInCluster_{
- NodesInCluster: &ipb.WatchRequest_NodesInCluster{},
- },
- })
- if err != nil {
- return fmt.Errorf("watch failed: %w", err)
- }
- defer w.CloseSend()
-
set := &tinylb.BackendSet{}
val.Set(set.Clone())
- for {
- ev, err := w.Recv()
- if err != nil {
- return fmt.Errorf("receive failed: %w", err)
- }
- for _, n := range ev.Nodes {
- if n.Status == nil || n.Status.ExternalAddress == "" {
- set.Delete(n.Id)
- continue
+ return watcher.WatchNodes(ctx, cur, watcher.SimpleFollower{ // the watch loop is delegated to curator/watcher; its error is returned unchanged
+ FilterFn: func(a *ipb.Node) bool { // reports whether a node qualifies as an apiserver backend
+ if a.Status == nil {
+ return false
}
- if n.Roles.KubernetesController == nil {
- set.Delete(n.Id)
- continue
+ if a.Status.ExternalAddress == "" { // no address to build a backend from
+ return false
}
- set.Insert(n.Id, &tinylb.SimpleTCPBackend{Remote: net.JoinHostPort(n.Status.ExternalAddress, node.KubernetesAPIPort.PortString())})
- }
- for _, t := range ev.NodeTombstones {
- set.Delete(t.NodeId)
- }
- val.Set(set.Clone())
- }
+ if a.Roles.KubernetesController == nil { // node does not carry the KubernetesController role
+ return false
+ }
+ return true
+ },
+ EqualsFn: func(a *ipb.Node, b *ipb.Node) bool { // only the external address is compared
+ return a.Status.ExternalAddress == b.Status.ExternalAddress // NOTE(review): Status is dereferenced unchecked; presumably only called on nodes that passed FilterFn, confirm against the watcher package
+ },
+ OnNewUpdated: func(new *ipb.Node) error { // inserts a backend keyed by node ID, then publishes the set
+ set.Insert(new.Id, &tinylb.SimpleTCPBackend{
+ Remote: net.JoinHostPort(new.Status.ExternalAddress, node.KubernetesAPIPort.PortString()),
+ })
+ val.Set(set.Clone()) // the cloned set is published through val after every single-node change
+ return nil
+ },
+ OnDeleted: func(prev *ipb.Node) error { // removes the node's backend by ID, then publishes the set
+ set.Delete(prev.Id)
+ val.Set(set.Clone())
+ return nil
+ },
+ })
}