m/n/core/clusternet: grab external IP address prefix from network service

This moves the logic for merging the node IP and node prefixes from the
submitter of the prefixes into the clusternet logic itself.

This means clusternet now has two independent sources of prefix data:
the network service's external IP address, and the kubelet's node
prefixes.

This simplifies use in a worker/controller split, where a controller
node normally doesn't submit any prefixes as it's not running a kubelet
or kubelet-adjacent prefixes - but we still want it to submit its
external IP address.

Change-Id: I46c9430228ce966426d3a8d33a765ecfdfca0d29
Reviewed-on: https://review.monogon.dev/c/monogon/+/1479
Tested-by: Jenkins CI
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
diff --git a/metropolis/node/core/clusternet/clusternet.go b/metropolis/node/core/clusternet/clusternet.go
index c4b4a8e..dc2de00 100644
--- a/metropolis/node/core/clusternet/clusternet.go
+++ b/metropolis/node/core/clusternet/clusternet.go
@@ -16,22 +16,25 @@
 // interface. This is used in practice to allow other host nodes (whose external
 // addresses are outside the cluster network) to access the cluster network.
 //
-// Second, we only have a single source/owner of prefixes per node: the
-// Kubernetes service. This is reflected as the LocalKubernetesPodNetwork event
-// Value in Service.
+// Second, we have two hardcoded/purpose-specific sources of prefixes:
+//  1. Pod networking node prefixes from the kubelet
+//  2. The host's external IP address (as a /32) from the network service.
 package clusternet
 
 import (
 	"context"
 	"fmt"
 	"net"
+	"net/netip"
 
 	"github.com/cenkalti/backoff/v4"
 
-	apb "source.monogon.dev/metropolis/node/core/curator/proto/api"
 	"source.monogon.dev/metropolis/node/core/localstorage"
+	"source.monogon.dev/metropolis/node/core/network"
 	"source.monogon.dev/metropolis/pkg/event"
 	"source.monogon.dev/metropolis/pkg/supervisor"
+
+	apb "source.monogon.dev/metropolis/node/core/curator/proto/api"
 	cpb "source.monogon.dev/metropolis/proto/common"
 )
 
@@ -51,6 +54,8 @@
 	// be announced into the mesh. This is to be Set by the Kubernetes service once
 	// it knows about the local node's IPAM address assignment.
 	LocalKubernetesPodNetwork event.Value[*Prefixes]
+	// Network service used to get the local node's IP address to submit it as a /32.
+	Network event.Value[*network.Status]
 
 	// wg is the interface to all the low-level interactions with WireGuard (and
 	// kernel routing). If not set, this defaults to a production implementation.
@@ -72,10 +77,19 @@
 
 	supervisor.Logger(ctx).Infof("Wireguard setup complete, starting updaters...")
 
-	if err := supervisor.Run(ctx, "pusher", s.push); err != nil {
+	kubeC := make(chan *Prefixes)
+	netC := make(chan *network.Status)
+	if err := supervisor.RunGroup(ctx, map[string]supervisor.Runnable{
+		"source-kubernetes": event.Pipe(s.LocalKubernetesPodNetwork, kubeC),
+		"source-network":    event.Pipe(s.Network, netC),
+		"push": func(ctx context.Context) error {
+			return s.push(ctx, kubeC, netC)
+		},
+	}); err != nil {
 		return err
 	}
-	if err := supervisor.Run(ctx, "puller", s.pull); err != nil {
+
+	if err := supervisor.Run(ctx, "pull", s.pull); err != nil {
 		return err
 	}
 	supervisor.Signal(ctx, supervisor.SignalHealthy)
@@ -85,25 +99,42 @@
 
 // push is the sub-runnable responsible for letting the Curator know about what
 // prefixes that are originated by this node.
-func (s *Service) push(ctx context.Context) error {
+func (s *Service) push(ctx context.Context, kubeC chan *Prefixes, netC chan *network.Status) error {
 	supervisor.Signal(ctx, supervisor.SignalHealthy)
 
-	w := s.LocalKubernetesPodNetwork.Watch()
-	defer w.Close()
-
+	var kubePrefixes *Prefixes
+	var localAddr net.IP
 	for {
-		// We only submit our wireguard key and prefixes when we're actually ready to
-		// announce something.
-		k8sPrefixes, err := w.Get(ctx)
-		if err != nil {
-			return fmt.Errorf("couldn't get k8s prefixes: %w", err)
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case kubePrefixes = <-kubeC:
+		case n := <-netC:
+			localAddr = n.ExternalAddress
 		}
 
-		err = backoff.Retry(func() error {
+		// Prepare prefixes to submit to cluster.
+		var prefixes Prefixes
+
+		// Do we have a local node address? Add it to the prefixes.
+		if len(localAddr) > 0 {
+			addr, ok := netip.AddrFromSlice(localAddr)
+			if ok {
+				prefixes = append(prefixes, netip.PrefixFrom(addr, 32))
+			}
+		}
+		// Do we have any kubelet prefixes? Add them, too.
+		if kubePrefixes != nil {
+			prefixes.Update(kubePrefixes)
+		}
+
+		supervisor.Logger(ctx).Infof("Submitting prefixes: %s", prefixes)
+
+		err := backoff.Retry(func() error {
 			_, err := s.Curator.UpdateNodeClusterNetworking(ctx, &apb.UpdateNodeClusterNetworkingRequest{
 				Clusternet: &cpb.NodeClusterNetworking{
 					WireguardPubkey: s.wg.key().PublicKey().String(),
-					Prefixes:        k8sPrefixes.proto(),
+					Prefixes:        prefixes.proto(),
 				},
 			})
 			if err != nil {