m/n/core/clusternet: grab external IP address prefix from network service
This moves the logic for merging the node IP and node prefixes from the
submitter of the prefixes into the clusternet logic itself.
This means clusternet now has two independent sources of prefix data:
the network service's external IP address, and the kubelet's node
prefixes.
This simplifies use in a worker/controller split, where a controller
node normally doesn't submit any prefixes, as it runs no kubelet and
therefore has no kubelet-adjacent prefixes - but we still want it to
submit its external IP address.
Change-Id: I46c9430228ce966426d3a8d33a765ecfdfca0d29
Reviewed-on: https://review.monogon.dev/c/monogon/+/1479
Tested-by: Jenkins CI
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
diff --git a/metropolis/node/core/clusternet/BUILD.bazel b/metropolis/node/core/clusternet/BUILD.bazel
index c577d96..35e6903 100644
--- a/metropolis/node/core/clusternet/BUILD.bazel
+++ b/metropolis/node/core/clusternet/BUILD.bazel
@@ -14,6 +14,7 @@
"//metropolis/node",
"//metropolis/node/core/curator/proto/api",
"//metropolis/node/core/localstorage",
+ "//metropolis/node/core/network",
"//metropolis/pkg/event",
"//metropolis/pkg/supervisor",
"//metropolis/proto/common",
@@ -36,6 +37,7 @@
"//metropolis/node/core/curator/proto/api",
"//metropolis/node/core/localstorage",
"//metropolis/node/core/localstorage/declarative",
+ "//metropolis/node/core/network",
"//metropolis/pkg/event/memory",
"//metropolis/pkg/supervisor",
"//metropolis/proto/common",
diff --git a/metropolis/node/core/clusternet/clusternet.go b/metropolis/node/core/clusternet/clusternet.go
index c4b4a8e..dc2de00 100644
--- a/metropolis/node/core/clusternet/clusternet.go
+++ b/metropolis/node/core/clusternet/clusternet.go
@@ -16,22 +16,25 @@
// interface. This is used in practice to allow other host nodes (whose external
// addresses are outside the cluster network) to access the cluster network.
//
-// Second, we only have a single source/owner of prefixes per node: the
-// Kubernetes service. This is reflected as the LocalKubernetesPodNetwork event
-// Value in Service.
+// Second, we have two hardcoded/purpose-specific sources of prefixes:
+// 1. Pod networking node prefixes from the kubelet
+// 2. The host's external IP address (as a /32) from the network service.
package clusternet
import (
"context"
"fmt"
"net"
+ "net/netip"
"github.com/cenkalti/backoff/v4"
- apb "source.monogon.dev/metropolis/node/core/curator/proto/api"
"source.monogon.dev/metropolis/node/core/localstorage"
+ "source.monogon.dev/metropolis/node/core/network"
"source.monogon.dev/metropolis/pkg/event"
"source.monogon.dev/metropolis/pkg/supervisor"
+
+ apb "source.monogon.dev/metropolis/node/core/curator/proto/api"
cpb "source.monogon.dev/metropolis/proto/common"
)
@@ -51,6 +54,8 @@
// be announced into the mesh. This is to be Set by the Kubernetes service once
// it knows about the local node's IPAM address assignment.
LocalKubernetesPodNetwork event.Value[*Prefixes]
+ // Network service used to get the local node's IP address to submit it as a /32.
+ Network event.Value[*network.Status]
// wg is the interface to all the low-level interactions with WireGuard (and
// kernel routing). If not set, this defaults to a production implementation.
@@ -72,10 +77,19 @@
supervisor.Logger(ctx).Infof("Wireguard setup complete, starting updaters...")
- if err := supervisor.Run(ctx, "pusher", s.push); err != nil {
+ kubeC := make(chan *Prefixes)
+ netC := make(chan *network.Status)
+ if err := supervisor.RunGroup(ctx, map[string]supervisor.Runnable{
+ "source-kubernetes": event.Pipe(s.LocalKubernetesPodNetwork, kubeC),
+ "source-network": event.Pipe(s.Network, netC),
+ "push": func(ctx context.Context) error {
+ return s.push(ctx, kubeC, netC)
+ },
+ }); err != nil {
return err
}
- if err := supervisor.Run(ctx, "puller", s.pull); err != nil {
+
+ if err := supervisor.Run(ctx, "pull", s.pull); err != nil {
return err
}
supervisor.Signal(ctx, supervisor.SignalHealthy)
@@ -85,25 +99,42 @@
// push is the sub-runnable responsible for letting the Curator know about what
// prefixes that are originated by this node.
-func (s *Service) push(ctx context.Context) error {
+func (s *Service) push(ctx context.Context, kubeC chan *Prefixes, netC chan *network.Status) error {
supervisor.Signal(ctx, supervisor.SignalHealthy)
- w := s.LocalKubernetesPodNetwork.Watch()
- defer w.Close()
-
+ var kubePrefixes *Prefixes
+ var localAddr net.IP
for {
- // We only submit our wireguard key and prefixes when we're actually ready to
- // announce something.
- k8sPrefixes, err := w.Get(ctx)
- if err != nil {
- return fmt.Errorf("couldn't get k8s prefixes: %w", err)
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ case kubePrefixes = <-kubeC:
+ case n := <-netC:
+ localAddr = n.ExternalAddress
}
- err = backoff.Retry(func() error {
+ // Prepare prefixes to submit to cluster.
+ var prefixes Prefixes
+
+ // Do we have a local node address? Add it to the prefixes.
+ if len(localAddr) > 0 {
+ addr, ok := netip.AddrFromSlice(localAddr)
+ if ok {
+ prefixes = append(prefixes, netip.PrefixFrom(addr, 32))
+ }
+ }
+ // Do we have any kubelet prefixes? Add them, too.
+ if kubePrefixes != nil {
+ prefixes.Update(kubePrefixes)
+ }
+
+ supervisor.Logger(ctx).Infof("Submitting prefixes: %s", prefixes)
+
+ err := backoff.Retry(func() error {
_, err := s.Curator.UpdateNodeClusterNetworking(ctx, &apb.UpdateNodeClusterNetworkingRequest{
Clusternet: &cpb.NodeClusterNetworking{
WireguardPubkey: s.wg.key().PublicKey().String(),
- Prefixes: k8sPrefixes.proto(),
+ Prefixes: prefixes.proto(),
},
})
if err != nil {
diff --git a/metropolis/node/core/clusternet/clusternet_test.go b/metropolis/node/core/clusternet/clusternet_test.go
index 0c70507..509678b 100644
--- a/metropolis/node/core/clusternet/clusternet_test.go
+++ b/metropolis/node/core/clusternet/clusternet_test.go
@@ -19,6 +19,7 @@
common "source.monogon.dev/metropolis/node"
"source.monogon.dev/metropolis/node/core/localstorage"
"source.monogon.dev/metropolis/node/core/localstorage/declarative"
+ "source.monogon.dev/metropolis/node/core/network"
"source.monogon.dev/metropolis/pkg/event/memory"
"source.monogon.dev/metropolis/pkg/supervisor"
@@ -180,6 +181,8 @@
defer cl.Close()
curator := apb.NewCuratorClient(cl)
+ var nval memory.Value[*network.Status]
+
var podNetwork memory.Value[*Prefixes]
wg := &fakeWireguard{}
svc := Service{
@@ -190,6 +193,7 @@
},
DataDirectory: nil,
LocalKubernetesPodNetwork: &podNetwork,
+ Network: &nval,
wg: wg,
}
diff --git a/metropolis/node/core/clusternet/types.go b/metropolis/node/core/clusternet/types.go
index 9cd1052..80adec9 100644
--- a/metropolis/node/core/clusternet/types.go
+++ b/metropolis/node/core/clusternet/types.go
@@ -15,8 +15,8 @@
// Cluster Networking mesh.
type Prefixes []netip.Prefix
-func (p Prefixes) proto() (res []*cpb.NodeClusterNetworking_Prefix) {
- for _, prefix := range p {
+func (p *Prefixes) proto() (res []*cpb.NodeClusterNetworking_Prefix) {
+ for _, prefix := range *p {
res = append(res, &cpb.NodeClusterNetworking_Prefix{
Cidr: prefix.String(),
})
@@ -24,6 +24,33 @@
return
}
+// Update copies all prefixes from o into p, skipping any that p already contains.
+func (p *Prefixes) Update(o *Prefixes) {
+ // Gather prefixes we already have.
+ cur := make(map[netip.Prefix]bool)
+ for _, pp := range *p {
+ cur[pp] = true
+ }
+
+ // Copy over any prefix that we don't yet have.
+ for _, pp := range *o {
+ if cur[pp] {
+ continue
+ }
+ cur[pp] = true
+ *p = append(*p, pp)
+ }
+}
+
+// String returns a stringified, comma-delimited representation of the prefixes.
+func (p *Prefixes) String() string {
+ var strs []string
+ for _, pp := range *p {
+ strs = append(strs, pp.String())
+ }
+ return strings.Join(strs, ", ")
+}
+
// node is used for internal statekeeping in the cluster networking service.
type node struct {
id string
diff --git a/metropolis/node/core/roleserve/roleserve.go b/metropolis/node/core/roleserve/roleserve.go
index a665044..d2e5bf0 100644
--- a/metropolis/node/core/roleserve/roleserve.go
+++ b/metropolis/node/core/roleserve/roleserve.go
@@ -151,6 +151,7 @@
clusterMembership: &s.ClusterMembership,
podNetwork: &s.podNetwork,
+ network: s.Network,
}
s.hostsfile = &workerHostsfile{
diff --git a/metropolis/node/core/roleserve/worker_clusternet.go b/metropolis/node/core/roleserve/worker_clusternet.go
index d80dea1..7dfeba6 100644
--- a/metropolis/node/core/roleserve/worker_clusternet.go
+++ b/metropolis/node/core/roleserve/worker_clusternet.go
@@ -6,6 +6,7 @@
"source.monogon.dev/metropolis/node/core/clusternet"
"source.monogon.dev/metropolis/node/core/localstorage"
+ "source.monogon.dev/metropolis/node/core/network"
"source.monogon.dev/metropolis/pkg/event/memory"
"source.monogon.dev/metropolis/pkg/supervisor"
@@ -19,6 +20,7 @@
clusterMembership *memory.Value[*ClusterMembership]
// podNetwork will be read.
podNetwork *memory.Value[*clusternet.Prefixes]
+ network *network.Service
}
func (s *workerClusternet) run(ctx context.Context) error {
@@ -46,6 +48,7 @@
},
DataDirectory: &s.storageRoot.Data.Kubernetes.ClusterNetworking,
LocalKubernetesPodNetwork: s.podNetwork,
+ Network: s.network.Value(),
}
return svc.Run(ctx)
}