metropolis: move curator client watches to curator/watcher
This replaces all the ad-hoc code to watch Curator node(s) with calls
through the new curator/watcher library.
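All call sites now share one pattern: a watcher.SimpleFollower describes
which nodes a consumer cares about and how to react to changes, and
watcher.WatchNodes drives it against the Curator. A minimal sketch of that
pattern, inferred from the call sites in this change (the follow function
and the stub callback bodies are illustrative, not part of the library):

    package example

    import (
        "context"

        apb "source.monogon.dev/metropolis/node/core/curator/proto/api"
        "source.monogon.dev/metropolis/node/core/curator/watcher"
    )

    // follow is a hypothetical consumer of the watcher library.
    func follow(ctx context.Context, cur apb.CuratorClient) error {
        return watcher.WatchNodes(ctx, cur, watcher.SimpleFollower{
            // FilterFn: skip nodes that lack the data this consumer needs.
            FilterFn: func(n *apb.Node) bool {
                return n.Status != nil && n.Status.ExternalAddress != ""
            },
            // EqualsFn: suppress updates that change nothing we care about.
            EqualsFn: func(a, b *apb.Node) bool {
                return a.Status.ExternalAddress == b.Status.ExternalAddress
            },
            // Per-node add/update and delete callbacks.
            OnNewUpdated: func(n *apb.Node) error { return nil },
            OnDeleted:    func(n *apb.Node) error { return nil },
            // Runs after each batch of updates, e.g. to commit accumulated state.
            OnBatchDone: func() error { return nil },
        })
    }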
Change-Id: Ie2a82b330e4108b9b725515cb10595916c38b323
Reviewed-on: https://review.monogon.dev/c/monogon/+/2263
Tested-by: Jenkins CI
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
diff --git a/metropolis/node/core/clusternet/BUILD.bazel b/metropolis/node/core/clusternet/BUILD.bazel
index a5fc41a..1ccce66 100644
--- a/metropolis/node/core/clusternet/BUILD.bazel
+++ b/metropolis/node/core/clusternet/BUILD.bazel
@@ -13,6 +13,7 @@
deps = [
"//metropolis/node",
"//metropolis/node/core/curator/proto/api",
+ "//metropolis/node/core/curator/watcher",
"//metropolis/node/core/localstorage",
"//metropolis/node/core/network",
"//metropolis/pkg/event",
@@ -27,10 +28,7 @@
go_test(
name = "clusternet_test",
- srcs = [
- "clusternet_test.go",
- "types_test.go",
- ],
+ srcs = ["clusternet_test.go"],
embed = [":clusternet"],
deps = [
"//metropolis/node",
diff --git a/metropolis/node/core/clusternet/clusternet.go b/metropolis/node/core/clusternet/clusternet.go
index 2e01c77..85ac63f 100644
--- a/metropolis/node/core/clusternet/clusternet.go
+++ b/metropolis/node/core/clusternet/clusternet.go
@@ -26,9 +26,11 @@
"fmt"
"net"
"net/netip"
+ "slices"
"github.com/cenkalti/backoff/v4"
+ "source.monogon.dev/metropolis/node/core/curator/watcher"
"source.monogon.dev/metropolis/node/core/localstorage"
"source.monogon.dev/metropolis/node/core/network"
"source.monogon.dev/metropolis/pkg/event"
@@ -176,58 +178,45 @@
func (s *Service) pull(ctx context.Context) error {
supervisor.Signal(ctx, supervisor.SignalHealthy)
- srv, err := s.Curator.Watch(ctx, &apb.WatchRequest{
- Kind: &apb.WatchRequest_NodesInCluster_{
- NodesInCluster: &apb.WatchRequest_NodesInCluster{},
+ var batch []*apb.Node
+ return watcher.WatchNodes(ctx, s.Curator, watcher.SimpleFollower{
+ FilterFn: func(a *apb.Node) bool {
+ if a.Status == nil || a.Status.ExternalAddress == "" {
+ return false
+ }
+ if a.Clusternet == nil || a.Clusternet.WireguardPubkey == "" {
+ return false
+ }
+ return true
+ },
+ EqualsFn: func(a *apb.Node, b *apb.Node) bool {
+ if a.Status.ExternalAddress != b.Status.ExternalAddress {
+ return false
+ }
+ if a.Clusternet.WireguardPubkey != b.Clusternet.WireguardPubkey {
+ return false
+ }
+ if !slices.Equal(a.Clusternet.Prefixes, b.Clusternet.Prefixes) {
+ return false
+ }
+ return true
+ },
+ OnNewUpdated: func(new *apb.Node) error {
+ batch = append(batch, new)
+ return nil
+ },
+ OnBatchDone: func() error {
+ if err := s.wg.configurePeers(batch); err != nil {
+ supervisor.Logger(ctx).Errorf("nodes couldn't be configured: %v", err)
+ }
+ batch = nil
+ return nil
+ },
+ OnDeleted: func(prev *apb.Node) error {
+ if err := s.wg.unconfigurePeer(prev); err != nil {
+ supervisor.Logger(ctx).Errorf("Node %s couldn't be unconfigured: %v", prev.Id, err)
+ }
+ return nil
},
})
- if err != nil {
- return fmt.Errorf("curator watch failed: %w", err)
- }
- defer srv.CloseSend()
-
- nodes := newNodemap()
- for {
- ev, err := srv.Recv()
- if err != nil {
- return fmt.Errorf("curator watch recv failed: %w", err)
- }
-
- updated, removed := nodes.update(ctx, ev)
-
- for _, n := range removed {
- supervisor.Logger(ctx).Infof("Node %s removed, unconfiguring", n.id)
- if err := s.wg.unconfigurePeer(n.copy()); err != nil {
- // Do nothing and hope whatever caused this will go away at some point.
- supervisor.Logger(ctx).Errorf("Node %s couldn't be unconfigured: %v", n.id, err)
- }
- }
- var newNodes []*node
- for _, n := range updated {
- newNodes = append(newNodes, n.copy())
- supervisor.Logger(ctx).Infof("Node %s updated: pk %s, address %s, prefixes %v", n.id, n.pubkey, n.address, n.prefixes)
- }
- succeeded := 0
- if err := s.wg.configurePeers(newNodes); err != nil {
- // If configuring all nodes at once failed, go node-by-node to make sure we've
- // done as much as possible.
- supervisor.Logger(ctx).Warningf("Bulk node update call failed, trying node-by-node..: %v", err)
- for _, n := range newNodes {
- if err := s.wg.configurePeers([]*node{n}); err != nil {
- supervisor.Logger(ctx).Errorf("Node %s failed: %v", n.id, err)
- } else {
- succeeded += 1
- }
- }
- } else {
- succeeded = len(newNodes)
- }
-
- if len(newNodes) != 0 {
- supervisor.Logger(ctx).Infof("Successfully updated %d out of %d nodes", succeeded, len(newNodes))
-
- numNodes, numPrefixes := nodes.stats()
- supervisor.Logger(ctx).Infof("Total: %d nodes, %d prefixes.", numNodes, numPrefixes)
- }
- }
}
diff --git a/metropolis/node/core/clusternet/clusternet_test.go b/metropolis/node/core/clusternet/clusternet_test.go
index bda1459..b92cfb2 100644
--- a/metropolis/node/core/clusternet/clusternet_test.go
+++ b/metropolis/node/core/clusternet/clusternet_test.go
@@ -4,7 +4,8 @@
"fmt"
"net"
"os"
- "strings"
+ "slices"
+ "sort"
"sync"
"testing"
"time"
@@ -21,6 +22,7 @@
"source.monogon.dev/metropolis/test/util"
apb "source.monogon.dev/metropolis/node/core/curator/proto/api"
+ cpb "source.monogon.dev/metropolis/proto/common"
)
// fakeWireguard implements wireguard while keeping peer information internally.
@@ -28,7 +30,7 @@
k wgtypes.Key
muNodes sync.Mutex
- nodes map[string]*node
+ nodes map[string]*apb.Node
failNextUpdate bool
}
@@ -40,27 +42,29 @@
func (f *fakeWireguard) setup(clusterNet *net.IPNet) error {
f.muNodes.Lock()
defer f.muNodes.Unlock()
- f.nodes = make(map[string]*node)
+ f.nodes = make(map[string]*apb.Node)
return nil
}
-func (f *fakeWireguard) configurePeers(nodes []*node) error {
+func (f *fakeWireguard) configurePeers(nodes []*apb.Node) error {
f.muNodes.Lock()
defer f.muNodes.Unlock()
+
if f.failNextUpdate {
f.failNextUpdate = false
return fmt.Errorf("synthetic test failure")
}
+
for _, n := range nodes {
- f.nodes[n.id] = n
+ f.nodes[n.Id] = n
}
return nil
}
-func (f *fakeWireguard) unconfigurePeer(n *node) error {
+func (f *fakeWireguard) unconfigurePeer(node *apb.Node) error {
f.muNodes.Lock()
defer f.muNodes.Unlock()
- delete(f.nodes, n.id)
+ delete(f.nodes, node.Id)
return nil
}
@@ -105,7 +109,7 @@
}
supervisor.TestHarness(t, svc.Run)
- checkState := func(nodes map[string]*node) error {
+ checkState := func(nodes map[string]*apb.Node) error {
t.Helper()
wg.muNodes.Lock()
defer wg.muNodes.Unlock()
@@ -114,16 +118,23 @@
if !ok {
return fmt.Errorf("node %q missing in programmed peers", nid)
}
- if n2.pubkey != n.pubkey {
- return fmt.Errorf("node %q pubkey mismatch: %q in programmed peers, %q wanted", nid, n2.pubkey, n.pubkey)
+ if got, want := n2.Clusternet.WireguardPubkey, n.Clusternet.WireguardPubkey; got != want {
+ return fmt.Errorf("node %q pubkey mismatch: %q in programmed peers, %q wanted", nid, got, want)
}
- if n2.address != n.address {
- return fmt.Errorf("node %q address mismatch: %q in programmed peers, %q wanted", nid, n2.address, n.address)
+ if got, want := n2.Status.ExternalAddress, n.Status.ExternalAddress; got != want {
+ return fmt.Errorf("node %q address mismatch: %q in programmed peers, %q wanted", nid, got, want)
}
- p := strings.Join(n.prefixes, ",")
- p2 := strings.Join(n2.prefixes, ",")
- if p != p2 {
- return fmt.Errorf("node %q prefixes mismatch: %v in programmed peers, %v wanted", nid, n2.prefixes, n.prefixes)
+ var p, p2 []string
+ for _, prefix := range n.Clusternet.Prefixes {
+ p = append(p, prefix.Cidr)
+ }
+ for _, prefix := range n2.Clusternet.Prefixes {
+ p2 = append(p2, prefix.Cidr)
+ }
+ sort.Strings(p)
+ sort.Strings(p2)
+ if !slices.Equal(p, p2) {
+ return fmt.Errorf("node %q prefixes mismatch: %v in programmed peers, %v wanted", nid, p2, p)
}
}
for nid, _ := range wg.nodes {
@@ -134,7 +145,7 @@
return nil
}
- assertStateEventual := func(nodes map[string]*node) {
+ assertStateEventual := func(nodes map[string]*apb.Node) {
t.Helper()
deadline := time.Now().Add(5 * time.Second)
for {
@@ -152,34 +163,46 @@
// Start with a single node.
cur.NodeWithPrefixes(key1, "metropolis-fake-1", "1.2.3.4")
- assertStateEventual(map[string]*node{
+ assertStateEventual(map[string]*apb.Node{
"metropolis-fake-1": {
- pubkey: key1.PublicKey().String(),
- address: "1.2.3.4",
- prefixes: nil,
+ Status: &cpb.NodeStatus{
+ ExternalAddress: "1.2.3.4",
+ },
+ Clusternet: &cpb.NodeClusterNetworking{
+ WireguardPubkey: key1.PublicKey().String(),
+ },
},
})
// Change the node's peer address.
cur.NodeWithPrefixes(key1, "metropolis-fake-1", "1.2.3.5")
- assertStateEventual(map[string]*node{
+ assertStateEventual(map[string]*apb.Node{
"metropolis-fake-1": {
- pubkey: key1.PublicKey().String(),
- address: "1.2.3.5",
- prefixes: nil,
+ Status: &cpb.NodeStatus{
+ ExternalAddress: "1.2.3.5",
+ },
+ Clusternet: &cpb.NodeClusterNetworking{
+ WireguardPubkey: key1.PublicKey().String(),
+ },
},
})
// Add another node.
cur.NodeWithPrefixes(key2, "metropolis-fake-2", "1.2.3.6")
- assertStateEventual(map[string]*node{
+ assertStateEventual(map[string]*apb.Node{
"metropolis-fake-1": {
- pubkey: key1.PublicKey().String(),
- address: "1.2.3.5",
- prefixes: nil,
+ Status: &cpb.NodeStatus{
+ ExternalAddress: "1.2.3.5",
+ },
+ Clusternet: &cpb.NodeClusterNetworking{
+ WireguardPubkey: key1.PublicKey().String(),
+ },
},
"metropolis-fake-2": {
- pubkey: key2.PublicKey().String(),
- address: "1.2.3.6",
- prefixes: nil,
+ Status: &cpb.NodeStatus{
+ ExternalAddress: "1.2.3.6",
+ },
+ Clusternet: &cpb.NodeClusterNetworking{
+ WireguardPubkey: key2.PublicKey().String(),
+ },
},
})
// Add some prefixes to both nodes, but fail the next configurePeers call.
@@ -188,30 +211,42 @@
wg.muNodes.Unlock()
cur.NodeWithPrefixes(key1, "metropolis-fake-1", "1.2.3.5", "10.100.10.0/24", "10.100.20.0/24")
cur.NodeWithPrefixes(key2, "metropolis-fake-2", "1.2.3.6", "10.100.30.0/24", "10.100.40.0/24")
- assertStateEventual(map[string]*node{
+ assertStateEventual(map[string]*apb.Node{
"metropolis-fake-1": {
- pubkey: key1.PublicKey().String(),
- address: "1.2.3.5",
- prefixes: []string{
- "10.100.10.0/24", "10.100.20.0/24",
+ Status: &cpb.NodeStatus{
+ ExternalAddress: "1.2.3.5",
+ },
+ Clusternet: &cpb.NodeClusterNetworking{
+ WireguardPubkey: key1.PublicKey().String(),
+ // No prefixes as the call failed.
},
},
"metropolis-fake-2": {
- pubkey: key2.PublicKey().String(),
- address: "1.2.3.6",
- prefixes: []string{
- "10.100.30.0/24", "10.100.40.0/24",
+ Status: &cpb.NodeStatus{
+ ExternalAddress: "1.2.3.6",
+ },
+ Clusternet: &cpb.NodeClusterNetworking{
+ WireguardPubkey: key2.PublicKey().String(),
+ Prefixes: []*cpb.NodeClusterNetworking_Prefix{
+ {Cidr: "10.100.30.0/24"},
+ {Cidr: "10.100.40.0/24"},
+ },
},
},
})
// Delete one of the nodes.
cur.DeleteNode("metropolis-fake-1")
- assertStateEventual(map[string]*node{
+ assertStateEventual(map[string]*apb.Node{
"metropolis-fake-2": {
- pubkey: key2.PublicKey().String(),
- address: "1.2.3.6",
- prefixes: []string{
- "10.100.30.0/24", "10.100.40.0/24",
+ Status: &cpb.NodeStatus{
+ ExternalAddress: "1.2.3.6",
+ },
+ Clusternet: &cpb.NodeClusterNetworking{
+ WireguardPubkey: key2.PublicKey().String(),
+ Prefixes: []*cpb.NodeClusterNetworking_Prefix{
+ {Cidr: "10.100.30.0/24"},
+ {Cidr: "10.100.40.0/24"},
+ },
},
},
})
@@ -289,21 +324,31 @@
if err != nil {
t.Fatalf("Failed to generate private key: %v", err)
}
- err = wg.configurePeers([]*node{
+ err = wg.configurePeers([]*apb.Node{
{
- pubkey: pkeys[0].PublicKey().String(),
- address: "10.100.0.1",
- prefixes: []string{
- "10.0.0.0/24",
- "10.0.1.0/24",
+ Id: "test-0",
+ Status: &cpb.NodeStatus{
+ ExternalAddress: "10.100.0.1",
+ },
+ Clusternet: &cpb.NodeClusterNetworking{
+ WireguardPubkey: pkeys[0].PublicKey().String(),
+ Prefixes: []*cpb.NodeClusterNetworking_Prefix{
+ {Cidr: "10.0.0.0/24"},
+ {Cidr: "10.0.1.0/24"},
+ },
},
},
{
- pubkey: pkeys[1].PublicKey().String(),
- address: "10.100.1.1",
- prefixes: []string{
- "10.1.0.0/24",
- "10.1.1.0/24",
+ Id: "test-1",
+ Status: &cpb.NodeStatus{
+ ExternalAddress: "10.100.1.1",
+ },
+ Clusternet: &cpb.NodeClusterNetworking{
+ WireguardPubkey: pkeys[1].PublicKey().String(),
+ Prefixes: []*cpb.NodeClusterNetworking_Prefix{
+ {Cidr: "10.1.0.0/24"},
+ {Cidr: "10.1.1.0/24"},
+ },
},
},
})
@@ -338,17 +383,22 @@
}
// Update one of the peers and check that things got applied.
- err = wg.configurePeers([]*node{
+ err = wg.configurePeers([]*apb.Node{
{
- pubkey: pkeys[0].PublicKey().String(),
- address: "10.100.0.3",
- prefixes: []string{
- "10.0.0.0/24",
+ Id: "test-0",
+ Status: &cpb.NodeStatus{
+ ExternalAddress: "10.100.0.3",
+ },
+ Clusternet: &cpb.NodeClusterNetworking{
+ WireguardPubkey: pkeys[0].PublicKey().String(),
+ Prefixes: []*cpb.NodeClusterNetworking_Prefix{
+ {Cidr: "10.0.0.0/24"},
+ },
},
},
})
if err != nil {
- t.Fatalf("Failed to connect to netlink's WireGuard config endpoint: %v", err)
+ t.Fatalf("Failed to update peer: %v", err)
}
wgDev, err = wgClient.Device(clusterNetDeviceName)
if err != nil {
@@ -373,14 +423,18 @@
}
// Remove one of the peers and make sure it's gone.
- err = wg.unconfigurePeer(&node{
- pubkey: pkeys[0].PublicKey().String(),
+ err = wg.unconfigurePeer(&apb.Node{
+ Clusternet: &cpb.NodeClusterNetworking{
+ WireguardPubkey: pkeys[0].PublicKey().String(),
+ },
})
if err != nil {
t.Fatalf("Failed to unconfigure peer: %v", err)
}
- err = wg.unconfigurePeer(&node{
- pubkey: pkeys[0].PublicKey().String(),
+ err = wg.unconfigurePeer(&apb.Node{
+ Clusternet: &cpb.NodeClusterNetworking{
+ WireguardPubkey: pkeys[0].PublicKey().String(),
+ },
})
if err != nil {
t.Fatalf("Failed to unconfigure peer a second time: %v", err)
diff --git a/metropolis/node/core/clusternet/types.go b/metropolis/node/core/clusternet/types.go
index 088cf8e..67e401a 100644
--- a/metropolis/node/core/clusternet/types.go
+++ b/metropolis/node/core/clusternet/types.go
@@ -1,13 +1,10 @@
package clusternet
import (
- "context"
"net/netip"
"sort"
"strings"
- apb "source.monogon.dev/metropolis/node/core/curator/proto/api"
- "source.monogon.dev/metropolis/pkg/supervisor"
cpb "source.monogon.dev/metropolis/proto/common"
)
@@ -59,131 +56,3 @@
func (p *Prefixes) Equal(o *Prefixes) bool {
return p.String() == o.String()
}
-
-// node is used for internal statekeeping in the cluster networking service.
-type node struct {
- id string
- pubkey string
- address string
- prefixes []string
-}
-
-func (n *node) copy() *node {
- n2 := *n
- return &n2
-}
-
-// update mutates this node to whatever data is held in the given proto Node, and
-// returns true if any data changed.
-func (n *node) update(p *apb.Node) (changed bool) {
- if n.address != p.Status.ExternalAddress {
- n.address = p.Status.ExternalAddress
- changed = true
- }
- if n.pubkey != p.Clusternet.WireguardPubkey {
- n.pubkey = p.Clusternet.WireguardPubkey
- changed = true
- }
-
- var newPrefixes []string
- for _, prefix := range p.Clusternet.Prefixes {
- if prefix.Cidr == "" {
- continue
- }
- newPrefixes = append(newPrefixes, prefix.Cidr)
- }
- oldPrefixes := make([]string, len(n.prefixes))
- copy(oldPrefixes[:], n.prefixes)
-
- sort.Strings(newPrefixes)
- sort.Strings(oldPrefixes)
- if want, got := strings.Join(newPrefixes, ","), strings.Join(oldPrefixes, ","); want != got {
- n.prefixes = newPrefixes
- changed = true
- }
-
- return
-}
-
-// nodeMap is the main internal statekeeping structure of the pull sub-runnable.
-type nodeMap struct {
- nodes map[string]*node
-}
-
-func newNodemap() *nodeMap {
- return &nodeMap{
- nodes: make(map[string]*node),
- }
-}
-
-func (n *nodeMap) stats() (nodes int, prefixes int) {
- nodes = len(n.nodes)
-
- for _, node := range n.nodes {
- prefixes += len(node.prefixes)
- }
-
- return
-}
-
-// update updates the nodeMap from the given Curator WatchEvent, interpreting
-// both node changes and deletions. Two nodeMaps are returned: the first one
-// contains only nodes that have been added/changed by the given event, the other
-// contains only nodes that have been deleted by the given event.
-func (m *nodeMap) update(ctx context.Context, ev *apb.WatchEvent) (changed, removed map[string]*node) {
- changed = make(map[string]*node)
- removed = make(map[string]*node)
-
- // Make sure we're not getting multiple nodes with the same public key. This is
- // not expected to happen in practice as the Curator should prevent this from
- // happening, but we at least want to make sure we're not blowing up routing if
- // other defenses fail.
-
- // pkeys maps from public key to node ID.
- pkeys := make(map[string]string)
- for _, n := range m.nodes {
- // We don't have to check whether we have any collisions already in m.nodes, as
- // the check below prevents them from happening in the first place.
- pkeys[n.pubkey] = n.id
- }
-
- for _, n := range ev.Nodes {
- // Only care about nodes that have all required configuration set.
- if n.Status == nil || n.Status.ExternalAddress == "" || n.Clusternet == nil || n.Clusternet.WireguardPubkey == "" {
- // We could attempt to delete any matching node currently in this nodemap at this
- // point, but this is likely transient and we don't want to just kill routing for
- // no reason.
- continue
- }
-
- key := n.Clusternet.WireguardPubkey
- if id, ok := pkeys[key]; ok && id != n.Id {
- supervisor.Logger(ctx).Warningf("Nodes %q and %q share wireguard key %q. That should not have happened.", n.Id, id, key)
- continue
- }
-
- if _, ok := m.nodes[n.Id]; !ok {
- m.nodes[n.Id] = &node{
- id: n.Id,
- }
- }
- diff := m.nodes[n.Id].update(n)
- if diff {
- changed[n.Id] = m.nodes[n.Id]
- }
- }
-
- for _, t := range ev.NodeTombstones {
- n, ok := m.nodes[t.NodeId]
- if !ok {
- // This is an indication of us losing data somehow. If this happens, it likely
- // means a Curator bug.
- supervisor.Logger(ctx).Warningf("Node %s: tombstone for unknown node", t.NodeId)
- continue
- }
- removed[n.id] = n
- delete(m.nodes, n.id)
- }
-
- return
-}
diff --git a/metropolis/node/core/clusternet/types_test.go b/metropolis/node/core/clusternet/types_test.go
deleted file mode 100644
index 8f58b4e..0000000
--- a/metropolis/node/core/clusternet/types_test.go
+++ /dev/null
@@ -1,104 +0,0 @@
-package clusternet
-
-import (
- "testing"
-
- apb "source.monogon.dev/metropolis/node/core/curator/proto/api"
- cpb "source.monogon.dev/metropolis/proto/common"
-)
-
-func TestNodeUpdate(t *testing.T) {
- var n node
-
- for i, te := range []struct {
- p *apb.Node
- want bool
- }{
- // Case 0: empty incoming node, no change.
- {
- p: &apb.Node{Status: &cpb.NodeStatus{}, Clusternet: &cpb.NodeClusterNetworking{}},
- want: false,
- },
- // Case 1: wireguard key set.
- {
- p: &apb.Node{Status: &cpb.NodeStatus{}, Clusternet: &cpb.NodeClusterNetworking{WireguardPubkey: "fake"}},
- want: true,
- },
- // Case 2: wireguard key updated.
- {
- p: &apb.Node{Status: &cpb.NodeStatus{}, Clusternet: &cpb.NodeClusterNetworking{WireguardPubkey: "fake2"}},
- want: true,
- },
- // Case 3: external address added.
- {
- p: &apb.Node{Status: &cpb.NodeStatus{ExternalAddress: "1.2.3.4"}, Clusternet: &cpb.NodeClusterNetworking{WireguardPubkey: "fake2"}},
- want: true,
- },
- // Case 4: external address changed.
- {
- p: &apb.Node{Status: &cpb.NodeStatus{ExternalAddress: "1.2.3.5"}, Clusternet: &cpb.NodeClusterNetworking{WireguardPubkey: "fake2"}},
- want: true,
- },
- // Case 5: prefixes added
- {
- p: &apb.Node{
- Status: &cpb.NodeStatus{ExternalAddress: "1.2.3.5"},
- Clusternet: &cpb.NodeClusterNetworking{
- WireguardPubkey: "fake2",
- Prefixes: []*cpb.NodeClusterNetworking_Prefix{
- {Cidr: "10.0.2.0/24"},
- {Cidr: "10.0.1.0/24"},
- },
- },
- },
- want: true,
- },
- // Case 6: prefixes changed
- {
- p: &apb.Node{
- Status: &cpb.NodeStatus{ExternalAddress: "1.2.3.5"},
- Clusternet: &cpb.NodeClusterNetworking{
- WireguardPubkey: "fake2",
- Prefixes: []*cpb.NodeClusterNetworking_Prefix{
- {Cidr: "10.0.3.0/24"},
- {Cidr: "10.0.1.0/24"},
- },
- },
- },
- want: true,
- },
- // Case 6: prefixes reordered (no change expected)
- {
- p: &apb.Node{
- Status: &cpb.NodeStatus{ExternalAddress: "1.2.3.5"},
- Clusternet: &cpb.NodeClusterNetworking{
- WireguardPubkey: "fake2",
- Prefixes: []*cpb.NodeClusterNetworking_Prefix{
- {Cidr: "10.0.3.0/24"},
- {Cidr: "10.0.1.0/24"},
- },
- },
- },
- want: false,
- },
- // Case 6: prefixes removed
- {
- p: &apb.Node{
- Status: &cpb.NodeStatus{ExternalAddress: "1.2.3.5"},
- Clusternet: &cpb.NodeClusterNetworking{
- WireguardPubkey: "fake2",
- Prefixes: []*cpb.NodeClusterNetworking_Prefix{},
- },
- },
- want: true,
- },
- } {
- got := n.update(te.p)
- if te.want && !got {
- t.Fatalf("Case %d: expected change, got no change", i)
- }
- if !te.want && got {
- t.Fatalf("Case %d: expected no change, got change", i)
- }
- }
-}
diff --git a/metropolis/node/core/clusternet/wireguard.go b/metropolis/node/core/clusternet/wireguard.go
index 9ce6d49..4c38c79 100644
--- a/metropolis/node/core/clusternet/wireguard.go
+++ b/metropolis/node/core/clusternet/wireguard.go
@@ -10,6 +10,7 @@
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
common "source.monogon.dev/metropolis/node"
+ ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
"source.monogon.dev/metropolis/node/core/localstorage"
)
@@ -28,8 +29,8 @@
type wireguard interface {
ensureOnDiskKey(dir *localstorage.DataKubernetesClusterNetworkingDirectory) error
setup(clusterNet *net.IPNet) error
- configurePeers(n []*node) error
- unconfigurePeer(n *node) error
+ configurePeers(nodes []*ipb.Node) error
+ unconfigurePeer(n *ipb.Node) error
key() wgtypes.Key
close()
}
@@ -114,31 +115,26 @@
return nil
}
-// configurePeers creates or updates a peers on the local wireguard interface
+// configurePeers creates or updates peers on the local wireguard interface
// based on the given nodes.
-//
-// If any node is somehow invalid and causes a parse/reconfiguration error, the
-// function will return an error. The caller should retry with a different set of
-// nodes, performing search/bisection on its own.
-func (s *localWireguard) configurePeers(nodes []*node) error {
+func (s *localWireguard) configurePeers(nodes []*ipb.Node) error {
var configs []wgtypes.PeerConfig
-
for i, n := range nodes {
- if s.privKey.PublicKey().String() == n.pubkey {
+ if s.privKey.PublicKey().String() == n.Clusternet.WireguardPubkey {
// Node doesn't need to connect to itself
continue
}
- pubkeyParsed, err := wgtypes.ParseKey(n.pubkey)
+ pubkeyParsed, err := wgtypes.ParseKey(n.Clusternet.WireguardPubkey)
if err != nil {
- return fmt.Errorf("node %d: failed to parse public-key %q: %w", i, n.pubkey, err)
+ return fmt.Errorf("node %d: failed to parse public-key %q: %w", i, n.Clusternet.WireguardPubkey, err)
}
- addressParsed := net.ParseIP(n.address)
+ addressParsed := net.ParseIP(n.Status.ExternalAddress)
if addressParsed == nil {
- return fmt.Errorf("node %d: failed to parse address %q: %w", i, n.address, err)
+ return fmt.Errorf("node %d: failed to parse address %q: %w", i, n.Status.ExternalAddress, err)
}
var allowedIPs []net.IPNet
- for _, prefix := range n.prefixes {
- _, podNet, err := net.ParseCIDR(prefix)
+ for _, prefix := range n.Clusternet.Prefixes {
+ _, podNet, err := net.ParseCIDR(prefix.Cidr)
if err != nil {
// Just eat the parse error. Not much we can do here. We have enough validation
// in the rest of the system that we shouldn't ever reach this.
@@ -167,10 +163,10 @@
// unconfigurePeer removes the peer from the local WireGuard interface based on
// the given node. If no peer existed matching the given node, this operation is
// a no-op.
-func (s *localWireguard) unconfigurePeer(n *node) error {
- pubkeyParsed, err := wgtypes.ParseKey(n.pubkey)
+func (s *localWireguard) unconfigurePeer(n *ipb.Node) error {
+ pubkeyParsed, err := wgtypes.ParseKey(n.Clusternet.WireguardPubkey)
if err != nil {
- return fmt.Errorf("failed to parse public-key %q: %w", n.pubkey, err)
+ return fmt.Errorf("failed to parse public-key %q: %w", n.Clusternet.WireguardPubkey, err)
}
err = s.wgClient.ConfigureDevice(clusterNetDeviceName, wgtypes.Config{
diff --git a/metropolis/node/core/metrics/BUILD.bazel b/metropolis/node/core/metrics/BUILD.bazel
index 88cdc1b..6483377 100644
--- a/metropolis/node/core/metrics/BUILD.bazel
+++ b/metropolis/node/core/metrics/BUILD.bazel
@@ -10,8 +10,10 @@
importpath = "source.monogon.dev/metropolis/node/core/metrics",
visibility = ["//visibility:public"],
deps = [
+ "//go/types/mapsets",
"//metropolis/node",
"//metropolis/node/core/curator/proto/api",
+ "//metropolis/node/core/curator/watcher",
"//metropolis/node/core/identity",
"//metropolis/pkg/supervisor",
],
diff --git a/metropolis/node/core/metrics/discovery.go b/metropolis/node/core/metrics/discovery.go
index 037d3b0..d5a992b 100644
--- a/metropolis/node/core/metrics/discovery.go
+++ b/metropolis/node/core/metrics/discovery.go
@@ -7,23 +7,22 @@
"net/http"
"sync"
- apb "source.monogon.dev/metropolis/node/core/curator/proto/api"
- ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
-
+ "source.monogon.dev/go/types/mapsets"
+ "source.monogon.dev/metropolis/node/core/curator/watcher"
"source.monogon.dev/metropolis/pkg/supervisor"
+
+ ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
)
type Discovery struct {
Curator ipb.CuratorClient
- // sdResp will contain the cached sdResponse
+ // sdResp contains the cached service discovery targets, keyed by node ID.
- sdResp sdResponse
+ sdResp mapsets.OrderedMap[string, sdTarget]
// sdRespMtx is the mutex for sdResp to allow usage inside the http handler.
sdRespMtx sync.RWMutex
}
-type sdResponse []sdTarget
-
type sdTarget struct {
Targets []string `json:"targets"`
Labels map[string]string `json:"labels"`
@@ -33,73 +32,63 @@
func (s *Discovery) Run(ctx context.Context) error {
supervisor.Signal(ctx, supervisor.SignalHealthy)
- srv, err := s.Curator.Watch(ctx, &apb.WatchRequest{
- Kind: &apb.WatchRequest_NodesInCluster_{
- NodesInCluster: &apb.WatchRequest_NodesInCluster{},
- },
- })
- if err != nil {
- return fmt.Errorf("curator watch failed: %w", err)
- }
- defer srv.CloseSend()
-
defer func() {
s.sdRespMtx.Lock()
// disable the metrics endpoint until the new routine takes over
- s.sdResp = nil
+ s.sdResp.Clear()
s.sdRespMtx.Unlock()
}()
- nodes := make(map[string]*apb.Node)
- for {
- ev, err := srv.Recv()
- if err != nil {
- // The watcher wont return a properly wrapped error which confuses
- // our testing harness. Lets just return the context error directly
- // if it exists.
- if ctx.Err() != nil {
- return ctx.Err()
+ return watcher.WatchNodes(ctx, s.Curator, watcher.SimpleFollower{
+ FilterFn: func(a *ipb.Node) bool {
+ if a.Status == nil {
+ return false
}
-
- return fmt.Errorf("curator watch recv failed: %w", err)
- }
-
- for _, n := range ev.Nodes {
- nodes[n.Id] = n
- }
-
- for _, t := range ev.NodeTombstones {
- n, ok := nodes[t.NodeId]
- if !ok {
- // This is an indication of us losing data somehow. If this happens, it likely
- // means a Curator bug.
- supervisor.Logger(ctx).Warningf("Node %s: tombstone for unknown node", t.NodeId)
- continue
+ if a.Status.ExternalAddress == "" {
+ return false
}
- delete(nodes, n.Id)
- }
-
- s.sdRespMtx.Lock()
-
- s.sdResp = nil
- for _, n := range nodes {
- // Only care about nodes that have all required configuration set.
- if n.Status == nil || n.Status.ExternalAddress == "" || n.Roles == nil {
- continue
+ if a.Roles == nil {
+ return false
}
+ return true
+ },
+ EqualsFn: func(a *ipb.Node, b *ipb.Node) bool {
+ if (a.Roles.ConsensusMember == nil) != (b.Roles.ConsensusMember == nil) {
+ return false
+ }
+ if (a.Roles.KubernetesController == nil) != (b.Roles.KubernetesController == nil) {
+ return false
+ }
+ if (a.Roles.KubernetesWorker == nil) != (b.Roles.KubernetesWorker == nil) {
+ return false
+ }
+ if a.Status.ExternalAddress != b.Status.ExternalAddress {
+ return false
+ }
+ return true
+ },
+ OnNewUpdated: func(new *ipb.Node) error {
+ s.sdRespMtx.Lock()
+ defer s.sdRespMtx.Unlock()
- s.sdResp = append(s.sdResp, sdTarget{
- Targets: []string{n.Status.ExternalAddress},
+ s.sdResp.Insert(new.Id, sdTarget{
+ Targets: []string{new.Status.ExternalAddress},
Labels: map[string]string{
- "__meta_metropolis_role_kubernetes_worker": fmt.Sprintf("%t", n.Roles.KubernetesWorker != nil),
- "__meta_metropolis_role_kubernetes_controller": fmt.Sprintf("%t", n.Roles.KubernetesController != nil),
- "__meta_metropolis_role_consensus_member": fmt.Sprintf("%t", n.Roles.ConsensusMember != nil),
+ "__meta_metropolis_role_kubernetes_worker": fmt.Sprintf("%t", new.Roles.KubernetesWorker != nil),
+ "__meta_metropolis_role_kubernetes_controller": fmt.Sprintf("%t", new.Roles.KubernetesController != nil),
+ "__meta_metropolis_role_consensus_member": fmt.Sprintf("%t", new.Roles.ConsensusMember != nil),
},
})
- }
+ return nil
+ },
+ OnDeleted: func(prev *ipb.Node) error {
+ s.sdRespMtx.Lock()
+ defer s.sdRespMtx.Unlock()
- s.sdRespMtx.Unlock()
- }
+ s.sdResp.Delete(prev.Id)
+ return nil
+ },
+ })
}
func (s *Discovery) ServeHTTP(w http.ResponseWriter, r *http.Request) {
@@ -111,9 +100,10 @@
s.sdRespMtx.RLock()
defer s.sdRespMtx.RUnlock()
- // If sdResp is nil, which only happens if we are not a master node
- // or we are still booting, we respond with NotImplemented.
- if s.sdResp == nil {
+ // If sdResp is empty, respond with Service Unavailable. This only happens
+ // early in the lifecycle of a control plane node, before it knows about
+ // itself, or if this is not a control plane node.
+ if s.sdResp.Count() == 0 {
w.WriteHeader(http.StatusServiceUnavailable)
return
}
@@ -121,7 +111,12 @@
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
- if err := json.NewEncoder(w).Encode(s.sdResp); err != nil {
+ // Turn into a plain array as expected by the service discovery API.
+ var res []sdTarget
+ for _, v := range s.sdResp.Values() {
+ res = append(res, v.Value)
+ }
+ if err := json.NewEncoder(w).Encode(res); err != nil {
// If the encoder fails its mostly because of closed connections
// so lets just ignore these errors.
return
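For context, each element of the array encoded above is one sdTarget, i.e. an
HTTP service discovery entry carrying a single target address plus the
Metropolis role labels. An illustrative value (the address and role flags are
made up for the example):

    // Hypothetical entry, as ServeHTTP would encode it for one node.
    _ = sdTarget{
        Targets: []string{"203.0.113.7"}, // the node's external address
        Labels: map[string]string{
            "__meta_metropolis_role_kubernetes_worker":     "true",
            "__meta_metropolis_role_kubernetes_controller": "false",
            "__meta_metropolis_role_consensus_member":      "false",
        },
    }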
diff --git a/metropolis/node/core/network/hostsfile/BUILD.bazel b/metropolis/node/core/network/hostsfile/BUILD.bazel
index e86850e..51b7f4f 100644
--- a/metropolis/node/core/network/hostsfile/BUILD.bazel
+++ b/metropolis/node/core/network/hostsfile/BUILD.bazel
@@ -7,6 +7,7 @@
visibility = ["//visibility:public"],
deps = [
"//metropolis/node/core/curator/proto/api",
+ "//metropolis/node/core/curator/watcher",
"//metropolis/node/core/localstorage",
"//metropolis/node/core/network",
"//metropolis/pkg/event",
diff --git a/metropolis/node/core/network/hostsfile/hostsfile.go b/metropolis/node/core/network/hostsfile/hostsfile.go
index 4407169..660cf3b 100644
--- a/metropolis/node/core/network/hostsfile/hostsfile.go
+++ b/metropolis/node/core/network/hostsfile/hostsfile.go
@@ -27,6 +27,7 @@
"google.golang.org/grpc"
"google.golang.org/protobuf/proto"
+ "source.monogon.dev/metropolis/node/core/curator/watcher"
"source.monogon.dev/metropolis/node/core/localstorage"
"source.monogon.dev/metropolis/node/core/network"
"source.monogon.dev/metropolis/pkg/event"
@@ -270,41 +271,38 @@
// reflects the up-to-date view of the cluster returned from the Curator Watch
// call, including any node deletions.
func (s *Service) runCluster(ctx context.Context) error {
- w, err := s.Curator.Watch(ctx, &ipb.WatchRequest{
- Kind: &ipb.WatchRequest_NodesInCluster_{
- NodesInCluster: &ipb.WatchRequest_NodesInCluster{},
+ nodes := make(nodeMap)
+ return watcher.WatchNodes(ctx, s.Curator, watcher.SimpleFollower{
+ FilterFn: func(a *ipb.Node) bool {
+ if a.Status == nil || a.Status.ExternalAddress == "" {
+ return false
+ }
+ return true
+ },
+ EqualsFn: func(a *ipb.Node, b *ipb.Node) bool {
+ return a.Status.ExternalAddress == b.Status.ExternalAddress
+ },
+ OnNewUpdated: func(new *ipb.Node) error {
+ nodes[new.Id] = nodeInfo{
+ address: new.Status.ExternalAddress,
+ local: false,
+ controlPlane: new.Roles.ConsensusMember != nil,
+ }
+ return nil
+ },
+ OnDeleted: func(prev *ipb.Node) error {
+ delete(nodes, prev.Id)
+ return nil
+ },
+ OnBatchDone: func() error {
+ // Send copy over channel, as we need to decouple the gRPC receiver from the main
+ // processing loop of the worker.
+ nodesCopy := make(nodeMap)
+ for k, v := range nodes {
+ nodesCopy[k] = v
+ }
+ s.clusterC <- nodesCopy
+ return nil
},
})
- if err != nil {
- return fmt.Errorf("curator watch failed: %w", err)
- }
-
- nodes := make(nodeMap)
- for {
- ev, err := w.Recv()
- if err != nil {
- return fmt.Errorf("receive failed: %w", err)
- }
- for _, n := range ev.Nodes {
- if n.Status == nil || n.Status.ExternalAddress == "" {
- continue
- }
- nodes[n.Id] = nodeInfo{
- address: n.Status.ExternalAddress,
- local: false,
- controlPlane: n.Roles.ConsensusMember != nil,
- }
- }
- for _, t := range ev.NodeTombstones {
- delete(nodes, t.NodeId)
- }
-
- // Copy nodemap before passing it over to the main goroutine. The values don't
- // need to be deep copied as they're not ever changed (only inserted).
- nodesCopy := make(nodeMap)
- for k, v := range nodes {
- nodesCopy[k] = v
- }
- s.clusterC <- nodesCopy
- }
}
diff --git a/metropolis/node/core/roleserve/BUILD.bazel b/metropolis/node/core/roleserve/BUILD.bazel
index ce0b5cc..dd224d5 100644
--- a/metropolis/node/core/roleserve/BUILD.bazel
+++ b/metropolis/node/core/roleserve/BUILD.bazel
@@ -23,6 +23,7 @@
"//metropolis/node/core/consensus",
"//metropolis/node/core/curator",
"//metropolis/node/core/curator/proto/api",
+ "//metropolis/node/core/curator/watcher",
"//metropolis/node/core/identity",
"//metropolis/node/core/localstorage",
"//metropolis/node/core/metrics",
diff --git a/metropolis/node/core/roleserve/roleserve.go b/metropolis/node/core/roleserve/roleserve.go
index 3140ea8..718c394 100644
--- a/metropolis/node/core/roleserve/roleserve.go
+++ b/metropolis/node/core/roleserve/roleserve.go
@@ -50,7 +50,6 @@
"source.monogon.dev/metropolis/pkg/event/memory"
"source.monogon.dev/metropolis/pkg/logtree"
"source.monogon.dev/metropolis/pkg/supervisor"
-
cpb "source.monogon.dev/metropolis/proto/common"
)
diff --git a/metropolis/node/core/roleserve/worker_rolefetch.go b/metropolis/node/core/roleserve/worker_rolefetch.go
index 40d2ffd..aaac076 100644
--- a/metropolis/node/core/roleserve/worker_rolefetch.go
+++ b/metropolis/node/core/roleserve/worker_rolefetch.go
@@ -2,10 +2,10 @@
import (
"context"
- "fmt"
"google.golang.org/protobuf/proto"
+ "source.monogon.dev/metropolis/node/core/curator/watcher"
"source.monogon.dev/metropolis/node/core/localstorage"
"source.monogon.dev/metropolis/pkg/event/memory"
"source.monogon.dev/metropolis/pkg/supervisor"
@@ -70,65 +70,48 @@
// Run networked part in a sub-runnable so that network errors don't cause us to
// retry the above and make us possibly trigger spurious restarts.
supervisor.Run(ctx, "watcher", func(ctx context.Context) error {
- w := s.curatorConnection.Watch()
- defer w.Close()
- cc, err := w.Get(ctx)
+ cw := s.curatorConnection.Watch()
+ defer cw.Close()
+ cc, err := cw.Get(ctx)
if err != nil {
return err
}
nodeID := cc.nodeID()
cur := ipb.NewCuratorClient(cc.conn)
-
- // Start watch for current node, update localRoles whenever we get something
- // new.
- srv, err := cur.Watch(ctx, &ipb.WatchRequest{Kind: &ipb.WatchRequest_NodeInCluster_{
- NodeInCluster: &ipb.WatchRequest_NodeInCluster{
- NodeId: nodeID,
- },
- }})
- if err != nil {
- return fmt.Errorf("watch failed: %w", err)
- }
- defer srv.CloseSend()
+ w := watcher.WatchNode(ctx, cur, nodeID)
+ defer w.Close()
supervisor.Signal(ctx, supervisor.SignalHealthy)
- for {
- ev, err := srv.Recv()
- if err != nil {
- return fmt.Errorf("watch event receive failed: %w", err)
- }
- for _, n := range ev.Nodes {
- n := n
- // Skip spuriously sent other nodes.
- if n.Id != nodeID {
- continue
- }
- supervisor.Logger(ctx).Infof("Got new node data. Roles:")
- if n.Roles.ConsensusMember != nil {
- supervisor.Logger(ctx).Infof(" - control plane member, existing peers: %+v", n.Roles.ConsensusMember.Peers)
- }
- if n.Roles.KubernetesController != nil {
- supervisor.Logger(ctx).Infof(" - kubernetes controller")
- }
- if n.Roles.KubernetesWorker != nil {
- supervisor.Logger(ctx).Infof(" - kubernetes worker")
- }
- s.localRoles.Set(n.Roles)
- // Persist role data to disk.
- bytes, err := proto.Marshal(n.Roles)
+ // Run watch for current node, update localRoles whenever we get something
+ // new.
+ for w.Next() {
+ n := w.Node()
+ supervisor.Logger(ctx).Infof("Got new node data. Roles:")
+ if n.Roles.ConsensusMember != nil {
+ supervisor.Logger(ctx).Infof(" - control plane member, existing peers: %+v", n.Roles.ConsensusMember.Peers)
+ }
+ if n.Roles.KubernetesController != nil {
+ supervisor.Logger(ctx).Infof(" - kubernetes controller")
+ }
+ if n.Roles.KubernetesWorker != nil {
+ supervisor.Logger(ctx).Infof(" - kubernetes worker")
+ }
+ s.localRoles.Set(n.Roles)
+
+ // Persist role data to disk.
+ bytes, err := proto.Marshal(n.Roles)
+ if err != nil {
+ supervisor.Logger(ctx).Errorf("Failed to marshal node roles: %w", err)
+ } else {
+ err = s.storageRoot.Data.Node.PersistedRoles.Write(bytes, 0400)
if err != nil {
- supervisor.Logger(ctx).Errorf("Failed to marshal node roles: %w", err)
- } else {
- err = s.storageRoot.Data.Node.PersistedRoles.Write(bytes, 0400)
- if err != nil {
- supervisor.Logger(ctx).Errorf("Failed to write node roles: %w", err)
- }
+ supervisor.Logger(ctx).Errorf("Failed to write node roles: %w", err)
}
- break
}
}
+ return w.Error()
})
supervisor.Signal(ctx, supervisor.SignalHealthy)
diff --git a/metropolis/node/core/rpc/resolver/BUILD.bazel b/metropolis/node/core/rpc/resolver/BUILD.bazel
index 3a2e6cd..4acf31e 100644
--- a/metropolis/node/core/rpc/resolver/BUILD.bazel
+++ b/metropolis/node/core/rpc/resolver/BUILD.bazel
@@ -12,6 +12,7 @@
deps = [
"//metropolis/node",
"//metropolis/node/core/curator/proto/api",
+ "//metropolis/node/core/curator/watcher",
"//metropolis/proto/common",
"@com_github_cenkalti_backoff_v4//:backoff",
"@org_golang_google_grpc//:go_default_library",
diff --git a/metropolis/node/core/rpc/resolver/processor.go b/metropolis/node/core/rpc/resolver/processor.go
index 975b2df..40174bf 100644
--- a/metropolis/node/core/rpc/resolver/processor.go
+++ b/metropolis/node/core/rpc/resolver/processor.go
@@ -34,47 +34,11 @@
resC chan *curatorMap
}
-type nodeStatusMap map[string]*cpb.NodeStatus
-
-func (n nodeStatusMap) equals(o nodeStatusMap) bool {
- // Check that we have the same keys on both maps.
- for k, _ := range n {
- _, ok := o[k]
- if !ok {
- return false
- }
- }
- for k, _ := range o {
- _, ok := n[k]
- if !ok {
- return false
- }
- }
- // Keys are equal, compare values.
- for k, v1 := range n {
- v2 := o[k]
-
- cur1 := v1.RunningCurator != nil
- cur2 := v2.RunningCurator != nil
- if cur1 != cur2 {
- return false
- }
- if v1.ExternalAddress != v2.ExternalAddress {
- return false
- }
-
- if cur1 && cur2 && v1.RunningCurator.Port != v2.RunningCurator.Port {
- return false
- }
- }
- return true
-}
-
// requestNodesUpdate is received from the curator updater, and carries
// information about the current curators as seen by the cluster control plane.
type requestNodesUpdate struct {
// nodes is a map from node ID to received status
- nodes nodeStatusMap
+ nodes map[string]*cpb.NodeStatus
}
// requestSeedAdd is received from AddEndpoint calls. It updates the processor's
diff --git a/metropolis/node/core/rpc/resolver/resolver.go b/metropolis/node/core/rpc/resolver/resolver.go
index 36ad180..5b89f73 100644
--- a/metropolis/node/core/rpc/resolver/resolver.go
+++ b/metropolis/node/core/rpc/resolver/resolver.go
@@ -13,6 +13,8 @@
"google.golang.org/grpc"
"google.golang.org/grpc/keepalive"
+ "source.monogon.dev/metropolis/node/core/curator/watcher"
+
common "source.monogon.dev/metropolis/node"
apb "source.monogon.dev/metropolis/node/core/curator/proto/api"
cpb "source.monogon.dev/metropolis/proto/common"
@@ -223,66 +225,54 @@
defer cl.Close()
cur := apb.NewCuratorClient(cl)
- prevCurators := make(nodeStatusMap)
return backoff.RetryNotify(func() error {
- w, err := cur.Watch(ctx, &apb.WatchRequest{
- Kind: &apb.WatchRequest_NodesInCluster_{
- NodesInCluster: &apb.WatchRequest_NodesInCluster{},
- },
- })
- if err != nil {
- return fmt.Errorf("could not watch nodes: %v", err)
- }
-
// Map from node ID to status.
nodes := make(map[string]*cpb.NodeStatus)
- // Keep updating map from watcher.
- for {
- ev, err := w.Recv()
- if err != nil {
- return fmt.Errorf("when receiving node: %w", err)
- }
- bo.Reset()
-
- // Update internal map but only care about curators.
- for _, n := range ev.Nodes {
- if n.Status == nil || n.Status.RunningCurator == nil {
- delete(nodes, n.Id)
- continue
+ return watcher.WatchNodes(ctx, cur, &watcher.SimpleFollower{
+ FilterFn: func(a *apb.Node) bool {
+ if a.Status == nil {
+ return false
}
- nodes[n.Id] = n.Status
- }
- for _, n := range ev.NodeTombstones {
- delete(nodes, n.NodeId)
- }
-
- // Make a copy to submit to client.
- curators := make(nodeStatusMap)
- var curatorNames []string
- for k, v := range nodes {
- if v == nil {
- continue
+ if a.Status.ExternalAddress == "" {
+ return false
}
- curators[k] = v
- curatorNames = append(curatorNames, k)
- }
-
- // Only submit an update (and log) if the effective curator map actually changed.
- if prevCurators.equals(curators) {
- continue
- }
- prevCurators = curators
-
- r.logger("CURUPDATE: got new curators: %s", strings.Join(curatorNames, ", "))
-
- select {
- case r.reqC <- &request{nu: &requestNodesUpdate{nodes: curators}}:
- case <-ctx.Done():
- return ctx.Err()
- }
- }
+ if a.Status.RunningCurator == nil {
+ return false
+ }
+ return true
+ },
+ EqualsFn: func(a *apb.Node, b *apb.Node) bool {
+ if a.Status.ExternalAddress != b.Status.ExternalAddress {
+ return false
+ }
+ if (a.Status.RunningCurator == nil) != (b.Status.RunningCurator == nil) {
+ return false
+ }
+ return true
+ },
+ OnNewUpdated: func(new *apb.Node) error {
+ nodes[new.Id] = new.Status
+ return nil
+ },
+ OnDeleted: func(prev *apb.Node) error {
+ delete(nodes, prev.Id)
+ return nil
+ },
+ OnBatchDone: func() error {
+ nodesClone := make(map[string]*cpb.NodeStatus)
+ for k, v := range nodes {
+ nodesClone[k] = v
+ }
+ select {
+ case r.reqC <- &request{nu: &requestNodesUpdate{nodes: nodesClone}}:
+ case <-ctx.Done():
+ return ctx.Err()
+ }
+ return nil
+ },
+ })
}, backoff.WithContext(bo, ctx), func(err error, t time.Duration) {
c := make(chan *responseDebug)
r.reqC <- &request{dbg: &requestDebug{resC: c}}
diff --git a/metropolis/node/kubernetes/BUILD.bazel b/metropolis/node/kubernetes/BUILD.bazel
index a88b4b3..8e68973 100644
--- a/metropolis/node/kubernetes/BUILD.bazel
+++ b/metropolis/node/kubernetes/BUILD.bazel
@@ -20,6 +20,7 @@
"//metropolis/node",
"//metropolis/node/core/clusternet",
"//metropolis/node/core/curator/proto/api",
+ "//metropolis/node/core/curator/watcher",
"//metropolis/node/core/identity",
"//metropolis/node/core/localstorage",
"//metropolis/node/core/network",
diff --git a/metropolis/node/kubernetes/apiproxy.go b/metropolis/node/kubernetes/apiproxy.go
index 9f9c851..d937824 100644
--- a/metropolis/node/kubernetes/apiproxy.go
+++ b/metropolis/node/kubernetes/apiproxy.go
@@ -2,12 +2,12 @@
import (
"context"
- "fmt"
"net"
"source.monogon.dev/go/net/tinylb"
"source.monogon.dev/metropolis/node"
ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
+ "source.monogon.dev/metropolis/node/core/curator/watcher"
"source.monogon.dev/metropolis/pkg/event/memory"
)
@@ -15,38 +15,36 @@
// the currently known nodes running a Kubernetes apiserver as retrieved from the
// given curator client.
func updateLoadbalancerAPIServers(ctx context.Context, val *memory.Value[tinylb.BackendSet], cur ipb.CuratorClient) error {
- w, err := cur.Watch(ctx, &ipb.WatchRequest{
- Kind: &ipb.WatchRequest_NodesInCluster_{
- NodesInCluster: &ipb.WatchRequest_NodesInCluster{},
- },
- })
- if err != nil {
- return fmt.Errorf("watch failed: %w", err)
- }
- defer w.CloseSend()
-
set := &tinylb.BackendSet{}
val.Set(set.Clone())
- for {
- ev, err := w.Recv()
- if err != nil {
- return fmt.Errorf("receive failed: %w", err)
- }
- for _, n := range ev.Nodes {
- if n.Status == nil || n.Status.ExternalAddress == "" {
- set.Delete(n.Id)
- continue
+ return watcher.WatchNodes(ctx, cur, watcher.SimpleFollower{
+ FilterFn: func(a *ipb.Node) bool {
+ if a.Status == nil {
+ return false
}
- if n.Roles.KubernetesController == nil {
- set.Delete(n.Id)
- continue
+ if a.Status.ExternalAddress == "" {
+ return false
}
- set.Insert(n.Id, &tinylb.SimpleTCPBackend{Remote: net.JoinHostPort(n.Status.ExternalAddress, node.KubernetesAPIPort.PortString())})
- }
- for _, t := range ev.NodeTombstones {
- set.Delete(t.NodeId)
- }
- val.Set(set.Clone())
- }
+ if a.Roles.KubernetesController == nil {
+ return false
+ }
+ return true
+ },
+ EqualsFn: func(a *ipb.Node, b *ipb.Node) bool {
+ return a.Status.ExternalAddress == b.Status.ExternalAddress
+ },
+ OnNewUpdated: func(new *ipb.Node) error {
+ set.Insert(new.Id, &tinylb.SimpleTCPBackend{
+ Remote: net.JoinHostPort(new.Status.ExternalAddress, node.KubernetesAPIPort.PortString()),
+ })
+ val.Set(set.Clone())
+ return nil
+ },
+ OnDeleted: func(prev *ipb.Node) error {
+ set.Delete(prev.Id)
+ val.Set(set.Clone())
+ return nil
+ },
+ })
}