m/n/core/clusternet: grab external IP address prefix from network service

This moves the logic for merging the node IP and node prefixes from the
submitter of the prefixes into the clusternet logic itself.

This means clusternet now has two independent sources of prefix data:
the network service's external IP address, and the kubelet's node
prefixes.

This simplifies use in a worker/controller split, where a controller
node normally doesn't submit any prefixes, as it's not running a kubelet
or anything kubelet-adjacent - but we still want it to submit its
external IP address.

Change-Id: I46c9430228ce966426d3a8d33a765ecfdfca0d29
Reviewed-on: https://review.monogon.dev/c/monogon/+/1479
Tested-by: Jenkins CI
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
diff --git a/metropolis/node/core/clusternet/BUILD.bazel b/metropolis/node/core/clusternet/BUILD.bazel
index c577d96..35e6903 100644
--- a/metropolis/node/core/clusternet/BUILD.bazel
+++ b/metropolis/node/core/clusternet/BUILD.bazel
@@ -14,6 +14,7 @@
         "//metropolis/node",
         "//metropolis/node/core/curator/proto/api",
         "//metropolis/node/core/localstorage",
+        "//metropolis/node/core/network",
         "//metropolis/pkg/event",
         "//metropolis/pkg/supervisor",
         "//metropolis/proto/common",
@@ -36,6 +37,7 @@
         "//metropolis/node/core/curator/proto/api",
         "//metropolis/node/core/localstorage",
         "//metropolis/node/core/localstorage/declarative",
+        "//metropolis/node/core/network",
         "//metropolis/pkg/event/memory",
         "//metropolis/pkg/supervisor",
         "//metropolis/proto/common",
diff --git a/metropolis/node/core/clusternet/clusternet.go b/metropolis/node/core/clusternet/clusternet.go
index c4b4a8e..dc2de00 100644
--- a/metropolis/node/core/clusternet/clusternet.go
+++ b/metropolis/node/core/clusternet/clusternet.go
@@ -16,22 +16,25 @@
 // interface. This is used in practice to allow other host nodes (whose external
 // addresses are outside the cluster network) to access the cluster network.
 //
-// Second, we only have a single source/owner of prefixes per node: the
-// Kubernetes service. This is reflected as the LocalKubernetesPodNetwork event
-// Value in Service.
+// Second, we have two hardcoded/purpose-specific sources of prefixes:
+//  1. Pod networking node prefixes from the kubelet
+//  2. The host's external IP address (as a /32) from the network service.
 package clusternet
 
 import (
 	"context"
 	"fmt"
 	"net"
+	"net/netip"
 
 	"github.com/cenkalti/backoff/v4"
 
-	apb "source.monogon.dev/metropolis/node/core/curator/proto/api"
 	"source.monogon.dev/metropolis/node/core/localstorage"
+	"source.monogon.dev/metropolis/node/core/network"
 	"source.monogon.dev/metropolis/pkg/event"
 	"source.monogon.dev/metropolis/pkg/supervisor"
+
+	apb "source.monogon.dev/metropolis/node/core/curator/proto/api"
 	cpb "source.monogon.dev/metropolis/proto/common"
 )
 
@@ -51,6 +54,8 @@
 	// be announced into the mesh. This is to be Set by the Kubernetes service once
 	// it knows about the local node's IPAM address assignment.
 	LocalKubernetesPodNetwork event.Value[*Prefixes]
+	// Network service used to get the local node's IP address to submit it as a /32.
+	Network event.Value[*network.Status]
 
 	// wg is the interface to all the low-level interactions with WireGuard (and
 	// kernel routing). If not set, this defaults to a production implementation.
@@ -72,10 +77,19 @@
 
 	supervisor.Logger(ctx).Infof("Wireguard setup complete, starting updaters...")
 
-	if err := supervisor.Run(ctx, "pusher", s.push); err != nil {
+	kubeC := make(chan *Prefixes)
+	netC := make(chan *network.Status)
+	if err := supervisor.RunGroup(ctx, map[string]supervisor.Runnable{
+		"source-kubernetes": event.Pipe(s.LocalKubernetesPodNetwork, kubeC),
+		"source-network":    event.Pipe(s.Network, netC),
+		"push": func(ctx context.Context) error {
+			return s.push(ctx, kubeC, netC)
+		},
+	}); err != nil {
 		return err
 	}
-	if err := supervisor.Run(ctx, "puller", s.pull); err != nil {
+
+	if err := supervisor.Run(ctx, "pull", s.pull); err != nil {
 		return err
 	}
 	supervisor.Signal(ctx, supervisor.SignalHealthy)
@@ -85,25 +99,42 @@
 
 // push is the sub-runnable responsible for letting the Curator know about what
 // prefixes that are originated by this node.
-func (s *Service) push(ctx context.Context) error {
+func (s *Service) push(ctx context.Context, kubeC chan *Prefixes, netC chan *network.Status) error {
 	supervisor.Signal(ctx, supervisor.SignalHealthy)
 
-	w := s.LocalKubernetesPodNetwork.Watch()
-	defer w.Close()
-
+	var kubePrefixes *Prefixes
+	var localAddr net.IP
 	for {
-		// We only submit our wireguard key and prefixes when we're actually ready to
-		// announce something.
-		k8sPrefixes, err := w.Get(ctx)
-		if err != nil {
-			return fmt.Errorf("couldn't get k8s prefixes: %w", err)
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case kubePrefixes = <-kubeC:
+		case n := <-netC:
+			localAddr = n.ExternalAddress
 		}
 
-		err = backoff.Retry(func() error {
+		// Prepare prefixes to submit to cluster.
+		var prefixes Prefixes
+
+		// Do we have a local node address? Add it to the prefixes.
+		if len(localAddr) > 0 {
+			addr, ok := netip.AddrFromSlice(localAddr)
+			if ok {
+				prefixes = append(prefixes, netip.PrefixFrom(addr, 32))
+			}
+		}
+		// Do we have any kubelet prefixes? Add them, too.
+		if kubePrefixes != nil {
+			prefixes.Update(kubePrefixes)
+		}
+
+		supervisor.Logger(ctx).Infof("Submitting prefixes: %s", prefixes)
+
+		err := backoff.Retry(func() error {
 			_, err := s.Curator.UpdateNodeClusterNetworking(ctx, &apb.UpdateNodeClusterNetworkingRequest{
 				Clusternet: &cpb.NodeClusterNetworking{
 					WireguardPubkey: s.wg.key().PublicKey().String(),
-					Prefixes:        k8sPrefixes.proto(),
+					Prefixes:        prefixes.proto(),
 				},
 			})
 			if err != nil {
diff --git a/metropolis/node/core/clusternet/clusternet_test.go b/metropolis/node/core/clusternet/clusternet_test.go
index 0c70507..509678b 100644
--- a/metropolis/node/core/clusternet/clusternet_test.go
+++ b/metropolis/node/core/clusternet/clusternet_test.go
@@ -19,6 +19,7 @@
 	common "source.monogon.dev/metropolis/node"
 	"source.monogon.dev/metropolis/node/core/localstorage"
 	"source.monogon.dev/metropolis/node/core/localstorage/declarative"
+	"source.monogon.dev/metropolis/node/core/network"
 	"source.monogon.dev/metropolis/pkg/event/memory"
 	"source.monogon.dev/metropolis/pkg/supervisor"
 
@@ -180,6 +181,8 @@
 	defer cl.Close()
 	curator := apb.NewCuratorClient(cl)
 
+	var nval memory.Value[*network.Status]
+
 	var podNetwork memory.Value[*Prefixes]
 	wg := &fakeWireguard{}
 	svc := Service{
@@ -190,6 +193,7 @@
 		},
 		DataDirectory:             nil,
 		LocalKubernetesPodNetwork: &podNetwork,
+		Network:                   &nval,
 
 		wg: wg,
 	}
diff --git a/metropolis/node/core/clusternet/types.go b/metropolis/node/core/clusternet/types.go
index 9cd1052..80adec9 100644
--- a/metropolis/node/core/clusternet/types.go
+++ b/metropolis/node/core/clusternet/types.go
@@ -15,8 +15,8 @@
 // Cluster Networking mesh.
 type Prefixes []netip.Prefix
 
-func (p Prefixes) proto() (res []*cpb.NodeClusterNetworking_Prefix) {
-	for _, prefix := range p {
+func (p *Prefixes) proto() (res []*cpb.NodeClusterNetworking_Prefix) {
+	for _, prefix := range *p {
 		res = append(res, &cpb.NodeClusterNetworking_Prefix{
 			Cidr: prefix.String(),
 		})
@@ -24,6 +24,33 @@
 	return
 }
 
+// Update copies all prefixes from o into p, skipping any that p already contains.
+func (p *Prefixes) Update(o *Prefixes) {
+	// Gather prefixes we already have.
+	cur := make(map[netip.Prefix]bool)
+	for _, pp := range *p {
+		cur[pp] = true
+	}
+
+	// Copy over any prefix that we don't yet have.
+	for _, pp := range *o {
+		if cur[pp] {
+			continue
+		}
+		cur[pp] = true
+		*p = append(*p, pp)
+	}
+}
+
+// String returns a stringified, comma-delimited representation of the prefixes.
+func (p *Prefixes) String() string {
+	var strs []string
+	for _, pp := range *p {
+		strs = append(strs, pp.String())
+	}
+	return strings.Join(strs, ", ")
+}
+
 // node is used for internal statekeeping in the cluster networking service.
 type node struct {
 	id       string
diff --git a/metropolis/node/core/roleserve/roleserve.go b/metropolis/node/core/roleserve/roleserve.go
index a665044..d2e5bf0 100644
--- a/metropolis/node/core/roleserve/roleserve.go
+++ b/metropolis/node/core/roleserve/roleserve.go
@@ -151,6 +151,7 @@
 
 		clusterMembership: &s.ClusterMembership,
 		podNetwork:        &s.podNetwork,
+		network:           s.Network,
 	}
 
 	s.hostsfile = &workerHostsfile{
diff --git a/metropolis/node/core/roleserve/worker_clusternet.go b/metropolis/node/core/roleserve/worker_clusternet.go
index d80dea1..7dfeba6 100644
--- a/metropolis/node/core/roleserve/worker_clusternet.go
+++ b/metropolis/node/core/roleserve/worker_clusternet.go
@@ -6,6 +6,7 @@
 
 	"source.monogon.dev/metropolis/node/core/clusternet"
 	"source.monogon.dev/metropolis/node/core/localstorage"
+	"source.monogon.dev/metropolis/node/core/network"
 	"source.monogon.dev/metropolis/pkg/event/memory"
 	"source.monogon.dev/metropolis/pkg/supervisor"
 
@@ -19,6 +20,7 @@
 	clusterMembership *memory.Value[*ClusterMembership]
 	// podNetwork will be read.
 	podNetwork *memory.Value[*clusternet.Prefixes]
+	network    *network.Service
 }
 
 func (s *workerClusternet) run(ctx context.Context) error {
@@ -46,6 +48,7 @@
 		},
 		DataDirectory:             &s.storageRoot.Data.Kubernetes.ClusterNetworking,
 		LocalKubernetesPodNetwork: s.podNetwork,
+		Network:                   s.network.Value(),
 	}
 	return svc.Run(ctx)
 }
diff --git a/metropolis/node/kubernetes/clusternet/clusternet.go b/metropolis/node/kubernetes/clusternet/clusternet.go
index f4974eb..7b51c30 100644
--- a/metropolis/node/kubernetes/clusternet/clusternet.go
+++ b/metropolis/node/kubernetes/clusternet/clusternet.go
@@ -33,7 +33,6 @@
 
 import (
 	"context"
-	"errors"
 	"net/netip"
 	"time"
 
@@ -64,25 +63,6 @@
 		return nil
 	}
 
-	var internalIP netip.Addr
-	for _, addr := range newNode.Status.Addresses {
-		if addr.Type == corev1.NodeInternalIP {
-			if internalIP.IsUnspecified() {
-				s.logger.Warningf("More than one NodeInternalIP specified, using the first one")
-				break
-			}
-			ip, err := netip.ParseAddr(addr.Address)
-			if err != nil {
-				s.logger.Warningf("Failed to parse Internal IP %s", addr.Address)
-				continue
-			}
-			internalIP = ip
-		}
-	}
-	if internalIP.IsUnspecified() {
-		return errors.New("node has no Internal IP")
-	}
-
 	var prefixes oclusternet.Prefixes
 	for _, podNetStr := range newNode.Spec.PodCIDRs {
 		prefix, err := netip.ParsePrefix(podNetStr)
@@ -92,7 +72,6 @@
 		}
 		prefixes = append(prefixes, prefix)
 	}
-	prefixes = append(prefixes, netip.PrefixFrom(internalIP, 32))
 
 	s.logger.V(1).Infof("Updating locally originated prefixes: %+v", prefixes)
 	s.Prefixes.Set(&prefixes)
diff --git a/metropolis/node/kubernetes/service_worker.go b/metropolis/node/kubernetes/service_worker.go
index 5ddc32d..2e6e190 100644
--- a/metropolis/node/kubernetes/service_worker.go
+++ b/metropolis/node/kubernetes/service_worker.go
@@ -7,8 +7,10 @@
 
 	"source.monogon.dev/go/net/tinylb"
 	"source.monogon.dev/metropolis/node"
+	oclusternet "source.monogon.dev/metropolis/node/core/clusternet"
 	"source.monogon.dev/metropolis/node/core/localstorage"
 	"source.monogon.dev/metropolis/node/core/network"
+	"source.monogon.dev/metropolis/pkg/event"
 	"source.monogon.dev/metropolis/pkg/event/memory"
 	"source.monogon.dev/metropolis/pkg/supervisor"
 
@@ -24,6 +26,7 @@
 	Network       *network.Service
 	NodeID        string
 	CuratorClient ipb.CuratorClient
+	PodNetwork    event.Value[*oclusternet.Prefixes]
 }
 
 type Worker struct {
@@ -67,6 +70,7 @@
 	if err != nil {
 		return err
 	}
+
 	supervisor.Signal(ctx, supervisor.SignalHealthy)
 	<-ctx.Done()
 	return nil