metropolis: move curator client watches to curator/watcher
This replaces all the ad-hoc code to watch Curator node(s) with calls
through the new curator/watcher library.
Change-Id: Ie2a82b330e4108b9b725515cb10595916c38b323
Reviewed-on: https://review.monogon.dev/c/monogon/+/2263
Tested-by: Jenkins CI
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
diff --git a/metropolis/node/core/clusternet/BUILD.bazel b/metropolis/node/core/clusternet/BUILD.bazel
index a5fc41a..1ccce66 100644
--- a/metropolis/node/core/clusternet/BUILD.bazel
+++ b/metropolis/node/core/clusternet/BUILD.bazel
@@ -13,6 +13,7 @@
deps = [
"//metropolis/node",
"//metropolis/node/core/curator/proto/api",
+ "//metropolis/node/core/curator/watcher",
"//metropolis/node/core/localstorage",
"//metropolis/node/core/network",
"//metropolis/pkg/event",
@@ -27,10 +28,7 @@
go_test(
name = "clusternet_test",
- srcs = [
- "clusternet_test.go",
- "types_test.go",
- ],
+ srcs = ["clusternet_test.go"],
embed = [":clusternet"],
deps = [
"//metropolis/node",
diff --git a/metropolis/node/core/clusternet/clusternet.go b/metropolis/node/core/clusternet/clusternet.go
index 2e01c77..85ac63f 100644
--- a/metropolis/node/core/clusternet/clusternet.go
+++ b/metropolis/node/core/clusternet/clusternet.go
@@ -26,9 +26,11 @@
"fmt"
"net"
"net/netip"
+ "slices"
"github.com/cenkalti/backoff/v4"
+ "source.monogon.dev/metropolis/node/core/curator/watcher"
"source.monogon.dev/metropolis/node/core/localstorage"
"source.monogon.dev/metropolis/node/core/network"
"source.monogon.dev/metropolis/pkg/event"
@@ -176,58 +178,45 @@
func (s *Service) pull(ctx context.Context) error {
supervisor.Signal(ctx, supervisor.SignalHealthy)
- srv, err := s.Curator.Watch(ctx, &apb.WatchRequest{
- Kind: &apb.WatchRequest_NodesInCluster_{
- NodesInCluster: &apb.WatchRequest_NodesInCluster{},
+ var batch []*apb.Node
+ return watcher.WatchNodes(ctx, s.Curator, watcher.SimpleFollower{
+		FilterFn: func(a *apb.Node) bool {
+			if a.Status == nil || a.Status.ExternalAddress == "" {
+				return false
+			}
+			if a.Clusternet == nil || a.Clusternet.WireguardPubkey == "" {
+				return false
+			}
+			return true
+		},
+		EqualsFn: func(a *apb.Node, b *apb.Node) bool {
+			if a.Status.ExternalAddress != b.Status.ExternalAddress {
+				return false
+			}
+			if a.Clusternet.WireguardPubkey != b.Clusternet.WireguardPubkey {
+				return false
+			}
+			// Compare prefixes by CIDR value, order-insensitively:
+			// slices.Equal on the proto pointer slices would compare pointer
+			// identity and report a spurious change on every received update.
+			var ap, bp []string
+			for _, p := range a.Clusternet.Prefixes {
+				ap = append(ap, p.GetCidr())
+			}
+			for _, p := range b.Clusternet.Prefixes {
+				bp = append(bp, p.GetCidr())
+			}
+			slices.Sort(ap)
+			slices.Sort(bp)
+			return slices.Equal(ap, bp)
+		},
+ OnNewUpdated: func(new *apb.Node) error {
+ batch = append(batch, new)
+ return nil
+ },
+ OnBatchDone: func() error {
+ if err := s.wg.configurePeers(batch); err != nil {
+ supervisor.Logger(ctx).Errorf("nodes couldn't be configured: %v", err)
+ }
+ batch = nil
+ return nil
+ },
+ OnDeleted: func(prev *apb.Node) error {
+ if err := s.wg.unconfigurePeer(prev); err != nil {
+ supervisor.Logger(ctx).Errorf("Node %s couldn't be unconfigured: %v", prev.Id, err)
+ }
+ return nil
},
})
- if err != nil {
- return fmt.Errorf("curator watch failed: %w", err)
- }
- defer srv.CloseSend()
-
- nodes := newNodemap()
- for {
- ev, err := srv.Recv()
- if err != nil {
- return fmt.Errorf("curator watch recv failed: %w", err)
- }
-
- updated, removed := nodes.update(ctx, ev)
-
- for _, n := range removed {
- supervisor.Logger(ctx).Infof("Node %s removed, unconfiguring", n.id)
- if err := s.wg.unconfigurePeer(n.copy()); err != nil {
- // Do nothing and hope whatever caused this will go away at some point.
- supervisor.Logger(ctx).Errorf("Node %s couldn't be unconfigured: %v", n.id, err)
- }
- }
- var newNodes []*node
- for _, n := range updated {
- newNodes = append(newNodes, n.copy())
- supervisor.Logger(ctx).Infof("Node %s updated: pk %s, address %s, prefixes %v", n.id, n.pubkey, n.address, n.prefixes)
- }
- succeeded := 0
- if err := s.wg.configurePeers(newNodes); err != nil {
- // If configuring all nodes at once failed, go node-by-node to make sure we've
- // done as much as possible.
- supervisor.Logger(ctx).Warningf("Bulk node update call failed, trying node-by-node..: %v", err)
- for _, n := range newNodes {
- if err := s.wg.configurePeers([]*node{n}); err != nil {
- supervisor.Logger(ctx).Errorf("Node %s failed: %v", n.id, err)
- } else {
- succeeded += 1
- }
- }
- } else {
- succeeded = len(newNodes)
- }
-
- if len(newNodes) != 0 {
- supervisor.Logger(ctx).Infof("Successfully updated %d out of %d nodes", succeeded, len(newNodes))
-
- numNodes, numPrefixes := nodes.stats()
- supervisor.Logger(ctx).Infof("Total: %d nodes, %d prefixes.", numNodes, numPrefixes)
- }
- }
}
diff --git a/metropolis/node/core/clusternet/clusternet_test.go b/metropolis/node/core/clusternet/clusternet_test.go
index bda1459..b92cfb2 100644
--- a/metropolis/node/core/clusternet/clusternet_test.go
+++ b/metropolis/node/core/clusternet/clusternet_test.go
@@ -4,7 +4,8 @@
"fmt"
"net"
"os"
- "strings"
+ "slices"
+ "sort"
"sync"
"testing"
"time"
@@ -21,6 +22,7 @@
"source.monogon.dev/metropolis/test/util"
apb "source.monogon.dev/metropolis/node/core/curator/proto/api"
+ cpb "source.monogon.dev/metropolis/proto/common"
)
// fakeWireguard implements wireguard while keeping peer information internally.
@@ -28,7 +30,7 @@
k wgtypes.Key
muNodes sync.Mutex
- nodes map[string]*node
+ nodes map[string]*apb.Node
failNextUpdate bool
}
@@ -40,27 +42,29 @@
func (f *fakeWireguard) setup(clusterNet *net.IPNet) error {
f.muNodes.Lock()
defer f.muNodes.Unlock()
- f.nodes = make(map[string]*node)
+ f.nodes = make(map[string]*apb.Node)
return nil
}
-func (f *fakeWireguard) configurePeers(nodes []*node) error {
+func (f *fakeWireguard) configurePeers(nodes []*apb.Node) error {
f.muNodes.Lock()
defer f.muNodes.Unlock()
+
if f.failNextUpdate {
f.failNextUpdate = false
return fmt.Errorf("synthetic test failure")
}
+
for _, n := range nodes {
- f.nodes[n.id] = n
+ f.nodes[n.Id] = n
}
return nil
}
-func (f *fakeWireguard) unconfigurePeer(n *node) error {
+func (f *fakeWireguard) unconfigurePeer(node *apb.Node) error {
f.muNodes.Lock()
defer f.muNodes.Unlock()
- delete(f.nodes, n.id)
+ delete(f.nodes, node.Id)
return nil
}
@@ -105,7 +109,7 @@
}
supervisor.TestHarness(t, svc.Run)
- checkState := func(nodes map[string]*node) error {
+ checkState := func(nodes map[string]*apb.Node) error {
t.Helper()
wg.muNodes.Lock()
defer wg.muNodes.Unlock()
@@ -114,16 +118,23 @@
if !ok {
return fmt.Errorf("node %q missing in programmed peers", nid)
}
- if n2.pubkey != n.pubkey {
- return fmt.Errorf("node %q pubkey mismatch: %q in programmed peers, %q wanted", nid, n2.pubkey, n.pubkey)
+ if got, want := n2.Clusternet.WireguardPubkey, n.Clusternet.WireguardPubkey; got != want {
+ return fmt.Errorf("node %q pubkey mismatch: %q in programmed peers, %q wanted", nid, got, want)
}
- if n2.address != n.address {
- return fmt.Errorf("node %q address mismatch: %q in programmed peers, %q wanted", nid, n2.address, n.address)
+ if got, want := n2.Status.ExternalAddress, n.Status.ExternalAddress; got != want {
+ return fmt.Errorf("node %q address mismatch: %q in programmed peers, %q wanted", nid, got, want)
}
- p := strings.Join(n.prefixes, ",")
- p2 := strings.Join(n2.prefixes, ",")
- if p != p2 {
- return fmt.Errorf("node %q prefixes mismatch: %v in programmed peers, %v wanted", nid, n2.prefixes, n.prefixes)
+ var p, p2 []string
+ for _, prefix := range n.Clusternet.Prefixes {
+ p = append(p, prefix.Cidr)
+ }
+ for _, prefix := range n2.Clusternet.Prefixes {
+ p2 = append(p2, prefix.Cidr)
+ }
+ sort.Strings(p)
+ sort.Strings(p2)
+ if !slices.Equal(p, p2) {
+ return fmt.Errorf("node %q prefixes mismatch: %v in programmed peers, %v wanted", nid, p2, p)
}
}
for nid, _ := range wg.nodes {
@@ -134,7 +145,7 @@
return nil
}
- assertStateEventual := func(nodes map[string]*node) {
+ assertStateEventual := func(nodes map[string]*apb.Node) {
t.Helper()
deadline := time.Now().Add(5 * time.Second)
for {
@@ -152,34 +163,46 @@
// Start with a single node.
cur.NodeWithPrefixes(key1, "metropolis-fake-1", "1.2.3.4")
- assertStateEventual(map[string]*node{
+ assertStateEventual(map[string]*apb.Node{
"metropolis-fake-1": {
- pubkey: key1.PublicKey().String(),
- address: "1.2.3.4",
- prefixes: nil,
+ Status: &cpb.NodeStatus{
+ ExternalAddress: "1.2.3.4",
+ },
+ Clusternet: &cpb.NodeClusterNetworking{
+ WireguardPubkey: key1.PublicKey().String(),
+ },
},
})
// Change the node's peer address.
cur.NodeWithPrefixes(key1, "metropolis-fake-1", "1.2.3.5")
- assertStateEventual(map[string]*node{
+ assertStateEventual(map[string]*apb.Node{
"metropolis-fake-1": {
- pubkey: key1.PublicKey().String(),
- address: "1.2.3.5",
- prefixes: nil,
+ Status: &cpb.NodeStatus{
+ ExternalAddress: "1.2.3.5",
+ },
+ Clusternet: &cpb.NodeClusterNetworking{
+ WireguardPubkey: key1.PublicKey().String(),
+ },
},
})
// Add another node.
cur.NodeWithPrefixes(key2, "metropolis-fake-2", "1.2.3.6")
- assertStateEventual(map[string]*node{
+ assertStateEventual(map[string]*apb.Node{
"metropolis-fake-1": {
- pubkey: key1.PublicKey().String(),
- address: "1.2.3.5",
- prefixes: nil,
+ Status: &cpb.NodeStatus{
+ ExternalAddress: "1.2.3.5",
+ },
+ Clusternet: &cpb.NodeClusterNetworking{
+ WireguardPubkey: key1.PublicKey().String(),
+ },
},
"metropolis-fake-2": {
- pubkey: key2.PublicKey().String(),
- address: "1.2.3.6",
- prefixes: nil,
+ Status: &cpb.NodeStatus{
+ ExternalAddress: "1.2.3.6",
+ },
+ Clusternet: &cpb.NodeClusterNetworking{
+ WireguardPubkey: key2.PublicKey().String(),
+ },
},
})
// Add some prefixes to both nodes, but fail the next configurePeers call.
@@ -188,30 +211,42 @@
wg.muNodes.Unlock()
cur.NodeWithPrefixes(key1, "metropolis-fake-1", "1.2.3.5", "10.100.10.0/24", "10.100.20.0/24")
cur.NodeWithPrefixes(key2, "metropolis-fake-2", "1.2.3.6", "10.100.30.0/24", "10.100.40.0/24")
- assertStateEventual(map[string]*node{
+ assertStateEventual(map[string]*apb.Node{
"metropolis-fake-1": {
- pubkey: key1.PublicKey().String(),
- address: "1.2.3.5",
- prefixes: []string{
- "10.100.10.0/24", "10.100.20.0/24",
+ Status: &cpb.NodeStatus{
+ ExternalAddress: "1.2.3.5",
+ },
+ Clusternet: &cpb.NodeClusterNetworking{
+ WireguardPubkey: key1.PublicKey().String(),
+ // No prefixes as the call failed.
},
},
"metropolis-fake-2": {
- pubkey: key2.PublicKey().String(),
- address: "1.2.3.6",
- prefixes: []string{
- "10.100.30.0/24", "10.100.40.0/24",
+ Status: &cpb.NodeStatus{
+ ExternalAddress: "1.2.3.6",
+ },
+ Clusternet: &cpb.NodeClusterNetworking{
+ WireguardPubkey: key2.PublicKey().String(),
+ Prefixes: []*cpb.NodeClusterNetworking_Prefix{
+ {Cidr: "10.100.30.0/24"},
+ {Cidr: "10.100.40.0/24"},
+ },
},
},
})
// Delete one of the nodes.
cur.DeleteNode("metropolis-fake-1")
- assertStateEventual(map[string]*node{
+ assertStateEventual(map[string]*apb.Node{
"metropolis-fake-2": {
- pubkey: key2.PublicKey().String(),
- address: "1.2.3.6",
- prefixes: []string{
- "10.100.30.0/24", "10.100.40.0/24",
+ Status: &cpb.NodeStatus{
+ ExternalAddress: "1.2.3.6",
+ },
+ Clusternet: &cpb.NodeClusterNetworking{
+ WireguardPubkey: key2.PublicKey().String(),
+ Prefixes: []*cpb.NodeClusterNetworking_Prefix{
+ {Cidr: "10.100.30.0/24"},
+ {Cidr: "10.100.40.0/24"},
+ },
},
},
})
@@ -289,21 +324,31 @@
if err != nil {
t.Fatalf("Failed to generate private key: %v", err)
}
- err = wg.configurePeers([]*node{
+ err = wg.configurePeers([]*apb.Node{
{
- pubkey: pkeys[0].PublicKey().String(),
- address: "10.100.0.1",
- prefixes: []string{
- "10.0.0.0/24",
- "10.0.1.0/24",
+ Id: "test-0",
+ Status: &cpb.NodeStatus{
+ ExternalAddress: "10.100.0.1",
+ },
+ Clusternet: &cpb.NodeClusterNetworking{
+ WireguardPubkey: pkeys[0].PublicKey().String(),
+ Prefixes: []*cpb.NodeClusterNetworking_Prefix{
+ {Cidr: "10.0.0.0/24"},
+ {Cidr: "10.0.1.0/24"},
+ },
},
},
{
- pubkey: pkeys[1].PublicKey().String(),
- address: "10.100.1.1",
- prefixes: []string{
- "10.1.0.0/24",
- "10.1.1.0/24",
+ Id: "test-1",
+ Status: &cpb.NodeStatus{
+ ExternalAddress: "10.100.1.1",
+ },
+ Clusternet: &cpb.NodeClusterNetworking{
+ WireguardPubkey: pkeys[1].PublicKey().String(),
+ Prefixes: []*cpb.NodeClusterNetworking_Prefix{
+ {Cidr: "10.1.0.0/24"},
+ {Cidr: "10.1.1.0/24"},
+ },
},
},
})
@@ -338,17 +383,22 @@
}
// Update one of the peers and check that things got applied.
- err = wg.configurePeers([]*node{
+ err = wg.configurePeers([]*apb.Node{
{
- pubkey: pkeys[0].PublicKey().String(),
- address: "10.100.0.3",
- prefixes: []string{
- "10.0.0.0/24",
+ Id: "test-0",
+ Status: &cpb.NodeStatus{
+ ExternalAddress: "10.100.0.3",
+ },
+ Clusternet: &cpb.NodeClusterNetworking{
+ WireguardPubkey: pkeys[0].PublicKey().String(),
+ Prefixes: []*cpb.NodeClusterNetworking_Prefix{
+ {Cidr: "10.0.0.0/24"},
+ },
},
},
})
if err != nil {
- t.Fatalf("Failed to connect to netlink's WireGuard config endpoint: %v", err)
+ t.Fatalf("Failed to update peer: %v", err)
}
wgDev, err = wgClient.Device(clusterNetDeviceName)
if err != nil {
@@ -373,14 +423,18 @@
}
// Remove one of the peers and make sure it's gone.
- err = wg.unconfigurePeer(&node{
- pubkey: pkeys[0].PublicKey().String(),
+ err = wg.unconfigurePeer(&apb.Node{
+ Clusternet: &cpb.NodeClusterNetworking{
+ WireguardPubkey: pkeys[0].PublicKey().String(),
+ },
})
if err != nil {
t.Fatalf("Failed to unconfigure peer: %v", err)
}
- err = wg.unconfigurePeer(&node{
- pubkey: pkeys[0].PublicKey().String(),
+ err = wg.unconfigurePeer(&apb.Node{
+ Clusternet: &cpb.NodeClusterNetworking{
+ WireguardPubkey: pkeys[0].PublicKey().String(),
+ },
})
if err != nil {
t.Fatalf("Failed to unconfigure peer a second time: %v", err)
diff --git a/metropolis/node/core/clusternet/types.go b/metropolis/node/core/clusternet/types.go
index 088cf8e..67e401a 100644
--- a/metropolis/node/core/clusternet/types.go
+++ b/metropolis/node/core/clusternet/types.go
@@ -1,13 +1,10 @@
package clusternet
import (
- "context"
"net/netip"
"sort"
"strings"
- apb "source.monogon.dev/metropolis/node/core/curator/proto/api"
- "source.monogon.dev/metropolis/pkg/supervisor"
cpb "source.monogon.dev/metropolis/proto/common"
)
@@ -59,131 +56,3 @@
func (p *Prefixes) Equal(o *Prefixes) bool {
return p.String() == o.String()
}
-
-// node is used for internal statekeeping in the cluster networking service.
-type node struct {
- id string
- pubkey string
- address string
- prefixes []string
-}
-
-func (n *node) copy() *node {
- n2 := *n
- return &n2
-}
-
-// update mutates this node to whatever data is held in the given proto Node, and
-// returns true if any data changed.
-func (n *node) update(p *apb.Node) (changed bool) {
- if n.address != p.Status.ExternalAddress {
- n.address = p.Status.ExternalAddress
- changed = true
- }
- if n.pubkey != p.Clusternet.WireguardPubkey {
- n.pubkey = p.Clusternet.WireguardPubkey
- changed = true
- }
-
- var newPrefixes []string
- for _, prefix := range p.Clusternet.Prefixes {
- if prefix.Cidr == "" {
- continue
- }
- newPrefixes = append(newPrefixes, prefix.Cidr)
- }
- oldPrefixes := make([]string, len(n.prefixes))
- copy(oldPrefixes[:], n.prefixes)
-
- sort.Strings(newPrefixes)
- sort.Strings(oldPrefixes)
- if want, got := strings.Join(newPrefixes, ","), strings.Join(oldPrefixes, ","); want != got {
- n.prefixes = newPrefixes
- changed = true
- }
-
- return
-}
-
-// nodeMap is the main internal statekeeping structure of the pull sub-runnable.
-type nodeMap struct {
- nodes map[string]*node
-}
-
-func newNodemap() *nodeMap {
- return &nodeMap{
- nodes: make(map[string]*node),
- }
-}
-
-func (n *nodeMap) stats() (nodes int, prefixes int) {
- nodes = len(n.nodes)
-
- for _, node := range n.nodes {
- prefixes += len(node.prefixes)
- }
-
- return
-}
-
-// update updates the nodeMap from the given Curator WatchEvent, interpreting
-// both node changes and deletions. Two nodeMaps are returned: the first one
-// contains only nodes that have been added/changed by the given event, the other
-// contains only nodes that have been deleted by the given event.
-func (m *nodeMap) update(ctx context.Context, ev *apb.WatchEvent) (changed, removed map[string]*node) {
- changed = make(map[string]*node)
- removed = make(map[string]*node)
-
- // Make sure we're not getting multiple nodes with the same public key. This is
- // not expected to happen in practice as the Curator should prevent this from
- // happening, but we at least want to make sure we're not blowing up routing if
- // other defenses fail.
-
- // pkeys maps from public key to node ID.
- pkeys := make(map[string]string)
- for _, n := range m.nodes {
- // We don't have to check whether we have any collisions already in m.nodes, as
- // the check below prevents them from happening in the first place.
- pkeys[n.pubkey] = n.id
- }
-
- for _, n := range ev.Nodes {
- // Only care about nodes that have all required configuration set.
- if n.Status == nil || n.Status.ExternalAddress == "" || n.Clusternet == nil || n.Clusternet.WireguardPubkey == "" {
- // We could attempt to delete any matching node currently in this nodemap at this
- // point, but this is likely transient and we don't want to just kill routing for
- // no reason.
- continue
- }
-
- key := n.Clusternet.WireguardPubkey
- if id, ok := pkeys[key]; ok && id != n.Id {
- supervisor.Logger(ctx).Warningf("Nodes %q and %q share wireguard key %q. That should not have happened.", n.Id, id, key)
- continue
- }
-
- if _, ok := m.nodes[n.Id]; !ok {
- m.nodes[n.Id] = &node{
- id: n.Id,
- }
- }
- diff := m.nodes[n.Id].update(n)
- if diff {
- changed[n.Id] = m.nodes[n.Id]
- }
- }
-
- for _, t := range ev.NodeTombstones {
- n, ok := m.nodes[t.NodeId]
- if !ok {
- // This is an indication of us losing data somehow. If this happens, it likely
- // means a Curator bug.
- supervisor.Logger(ctx).Warningf("Node %s: tombstone for unknown node", t.NodeId)
- continue
- }
- removed[n.id] = n
- delete(m.nodes, n.id)
- }
-
- return
-}
diff --git a/metropolis/node/core/clusternet/types_test.go b/metropolis/node/core/clusternet/types_test.go
deleted file mode 100644
index 8f58b4e..0000000
--- a/metropolis/node/core/clusternet/types_test.go
+++ /dev/null
@@ -1,104 +0,0 @@
-package clusternet
-
-import (
- "testing"
-
- apb "source.monogon.dev/metropolis/node/core/curator/proto/api"
- cpb "source.monogon.dev/metropolis/proto/common"
-)
-
-func TestNodeUpdate(t *testing.T) {
- var n node
-
- for i, te := range []struct {
- p *apb.Node
- want bool
- }{
- // Case 0: empty incoming node, no change.
- {
- p: &apb.Node{Status: &cpb.NodeStatus{}, Clusternet: &cpb.NodeClusterNetworking{}},
- want: false,
- },
- // Case 1: wireguard key set.
- {
- p: &apb.Node{Status: &cpb.NodeStatus{}, Clusternet: &cpb.NodeClusterNetworking{WireguardPubkey: "fake"}},
- want: true,
- },
- // Case 2: wireguard key updated.
- {
- p: &apb.Node{Status: &cpb.NodeStatus{}, Clusternet: &cpb.NodeClusterNetworking{WireguardPubkey: "fake2"}},
- want: true,
- },
- // Case 3: external address added.
- {
- p: &apb.Node{Status: &cpb.NodeStatus{ExternalAddress: "1.2.3.4"}, Clusternet: &cpb.NodeClusterNetworking{WireguardPubkey: "fake2"}},
- want: true,
- },
- // Case 4: external address changed.
- {
- p: &apb.Node{Status: &cpb.NodeStatus{ExternalAddress: "1.2.3.5"}, Clusternet: &cpb.NodeClusterNetworking{WireguardPubkey: "fake2"}},
- want: true,
- },
- // Case 5: prefixes added
- {
- p: &apb.Node{
- Status: &cpb.NodeStatus{ExternalAddress: "1.2.3.5"},
- Clusternet: &cpb.NodeClusterNetworking{
- WireguardPubkey: "fake2",
- Prefixes: []*cpb.NodeClusterNetworking_Prefix{
- {Cidr: "10.0.2.0/24"},
- {Cidr: "10.0.1.0/24"},
- },
- },
- },
- want: true,
- },
- // Case 6: prefixes changed
- {
- p: &apb.Node{
- Status: &cpb.NodeStatus{ExternalAddress: "1.2.3.5"},
- Clusternet: &cpb.NodeClusterNetworking{
- WireguardPubkey: "fake2",
- Prefixes: []*cpb.NodeClusterNetworking_Prefix{
- {Cidr: "10.0.3.0/24"},
- {Cidr: "10.0.1.0/24"},
- },
- },
- },
- want: true,
- },
- // Case 6: prefixes reordered (no change expected)
- {
- p: &apb.Node{
- Status: &cpb.NodeStatus{ExternalAddress: "1.2.3.5"},
- Clusternet: &cpb.NodeClusterNetworking{
- WireguardPubkey: "fake2",
- Prefixes: []*cpb.NodeClusterNetworking_Prefix{
- {Cidr: "10.0.3.0/24"},
- {Cidr: "10.0.1.0/24"},
- },
- },
- },
- want: false,
- },
- // Case 6: prefixes removed
- {
- p: &apb.Node{
- Status: &cpb.NodeStatus{ExternalAddress: "1.2.3.5"},
- Clusternet: &cpb.NodeClusterNetworking{
- WireguardPubkey: "fake2",
- Prefixes: []*cpb.NodeClusterNetworking_Prefix{},
- },
- },
- want: true,
- },
- } {
- got := n.update(te.p)
- if te.want && !got {
- t.Fatalf("Case %d: expected change, got no change", i)
- }
- if !te.want && got {
- t.Fatalf("Case %d: expected no change, got change", i)
- }
- }
-}
diff --git a/metropolis/node/core/clusternet/wireguard.go b/metropolis/node/core/clusternet/wireguard.go
index 9ce6d49..4c38c79 100644
--- a/metropolis/node/core/clusternet/wireguard.go
+++ b/metropolis/node/core/clusternet/wireguard.go
@@ -10,6 +10,7 @@
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
common "source.monogon.dev/metropolis/node"
+ ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
"source.monogon.dev/metropolis/node/core/localstorage"
)
@@ -28,8 +29,8 @@
type wireguard interface {
ensureOnDiskKey(dir *localstorage.DataKubernetesClusterNetworkingDirectory) error
setup(clusterNet *net.IPNet) error
- configurePeers(n []*node) error
- unconfigurePeer(n *node) error
+ configurePeers(nodes []*ipb.Node) error
+ unconfigurePeer(n *ipb.Node) error
key() wgtypes.Key
close()
}
@@ -114,31 +115,26 @@
return nil
}
-// configurePeers creates or updates a peers on the local wireguard interface
+// configurePeers creates or updates peers on the local wireguard interface
// based on the given nodes.
-//
-// If any node is somehow invalid and causes a parse/reconfiguration error, the
-// function will return an error. The caller should retry with a different set of
-// nodes, performing search/bisection on its own.
-func (s *localWireguard) configurePeers(nodes []*node) error {
+func (s *localWireguard) configurePeers(nodes []*ipb.Node) error {
var configs []wgtypes.PeerConfig
-
for i, n := range nodes {
- if s.privKey.PublicKey().String() == n.pubkey {
+ if s.privKey.PublicKey().String() == n.Clusternet.WireguardPubkey {
// Node doesn't need to connect to itself
continue
}
- pubkeyParsed, err := wgtypes.ParseKey(n.pubkey)
+ pubkeyParsed, err := wgtypes.ParseKey(n.Clusternet.WireguardPubkey)
if err != nil {
- return fmt.Errorf("node %d: failed to parse public-key %q: %w", i, n.pubkey, err)
+ return fmt.Errorf("node %d: failed to parse public-key %q: %w", i, n.Clusternet.WireguardPubkey, err)
}
- addressParsed := net.ParseIP(n.address)
+ addressParsed := net.ParseIP(n.Status.ExternalAddress)
if addressParsed == nil {
- return fmt.Errorf("node %d: failed to parse address %q: %w", i, n.address, err)
+		return fmt.Errorf("node %d: failed to parse address %q", i, n.Status.ExternalAddress)
}
var allowedIPs []net.IPNet
- for _, prefix := range n.prefixes {
- _, podNet, err := net.ParseCIDR(prefix)
+ for _, prefix := range n.Clusternet.Prefixes {
+ _, podNet, err := net.ParseCIDR(prefix.Cidr)
if err != nil {
// Just eat the parse error. Not much we can do here. We have enough validation
// in the rest of the system that we shouldn't ever reach this.
@@ -167,10 +163,10 @@
// unconfigurePeer removes the peer from the local WireGuard interface based on
// the given node. If no peer existed matching the given node, this operation is
// a no-op.
-func (s *localWireguard) unconfigurePeer(n *node) error {
- pubkeyParsed, err := wgtypes.ParseKey(n.pubkey)
+func (s *localWireguard) unconfigurePeer(n *ipb.Node) error {
+ pubkeyParsed, err := wgtypes.ParseKey(n.Clusternet.WireguardPubkey)
if err != nil {
- return fmt.Errorf("failed to parse public-key %q: %w", n.pubkey, err)
+ return fmt.Errorf("failed to parse public-key %q: %w", n.Clusternet.WireguardPubkey, err)
}
err = s.wgClient.ConfigureDevice(clusterNetDeviceName, wgtypes.Config{