metropolis/node: use Event Value for network status

This moves over the GetIP API to use our fancy new event/value library.
The consumers of this data are currently the cluster manager and the
kubernetes root service. Both are migrated over.

Test Plan: Refactor, covered by E2E tests.

X-Origin-Diff: phab/D711
GitOrigin-RevId: 8a1e0dd35236d55492722f4439323cb2ee9574fc
diff --git a/metropolis/node/core/BUILD.bazel b/metropolis/node/core/BUILD.bazel
index e7e6179..24200d8 100644
--- a/metropolis/node/core/BUILD.bazel
+++ b/metropolis/node/core/BUILD.bazel
@@ -19,7 +19,6 @@
         "//metropolis/node/core/localstorage:go_default_library",
         "//metropolis/node/core/localstorage/declarative:go_default_library",
         "//metropolis/node/core/network:go_default_library",
-        "//metropolis/node/core/network/dns:go_default_library",
         "//metropolis/node/kubernetes:go_default_library",
         "//metropolis/node/kubernetes/containerd:go_default_library",
         "//metropolis/node/kubernetes/pki:go_default_library",
diff --git a/metropolis/node/core/cluster/manager.go b/metropolis/node/core/cluster/manager.go
index 6d6d592..6c85fb3 100644
--- a/metropolis/node/core/cluster/manager.go
+++ b/metropolis/node/core/cluster/manager.go
@@ -291,10 +291,15 @@
 func (m *Manager) stateCreatingCluster(ctx context.Context) error {
 	logger := supervisor.Logger(ctx)
 	logger.Info("Creating new cluster: waiting for IP address...")
-	ip, err := m.networkService.GetIP(ctx, true)
+
+	// STOPGAP: bad use of watcher (should be long-term)
+	watcher := m.networkService.Watch()
+	defer watcher.Close()
+	data, err := watcher.Get(ctx)
 	if err != nil {
 		return fmt.Errorf("when getting IP address: %w", err)
 	}
+	ip := data.ExternalAddress
 	logger.Infof("Creating new cluster: got IP address %s", ip.String())
 
 	logger.Info("Creating new cluster: initializing storage...")
@@ -310,7 +315,7 @@
 		return fmt.Errorf("failed to create new node certificate: %w", err)
 	}
 
-	node := NewNode(cuk, *ip, *cert.Leaf)
+	node := NewNode(cuk, ip, *cert.Leaf)
 
 	m.consensus = consensus.New(consensus.Config{
 		Data:           &m.storageRoot.Data.Etcd,
diff --git a/metropolis/node/core/delve_enabled.go b/metropolis/node/core/delve_enabled.go
index dc17f59..24ea470 100644
--- a/metropolis/node/core/delve_enabled.go
+++ b/metropolis/node/core/delve_enabled.go
@@ -32,7 +32,11 @@
 		// This is intentionally delayed until network becomes available since Delve for some reason connects to itself
 		// and in early-boot no network interface is available to do that through. Also external access isn't possible
 		// early on anyways.
-		networkSvc.GetIP(context.Background(), true)
+		watcher := networkSvc.Watch()
+		_, err := watcher.Get(context.Background())
+		if err != nil {
+			panic(err)
+		}
 		dlvCmd := exec.Command("/dlv", "--headless=true", fmt.Sprintf("--listen=:%v", node.DebuggerPort),
 			"--accept-multiclient", "--only-same-user=false", "attach", "--continue", "1", "/init")
 		if err := dlvCmd.Start(); err != nil {
diff --git a/metropolis/node/core/main.go b/metropolis/node/core/main.go
index 0f6ebd1..7c3f10c 100644
--- a/metropolis/node/core/main.go
+++ b/metropolis/node/core/main.go
@@ -37,7 +37,6 @@
 	"source.monogon.dev/metropolis/node/core/localstorage"
 	"source.monogon.dev/metropolis/node/core/localstorage/declarative"
 	"source.monogon.dev/metropolis/node/core/network"
-	"source.monogon.dev/metropolis/node/core/network/dns"
 	"source.monogon.dev/metropolis/node/kubernetes"
 	"source.monogon.dev/metropolis/node/kubernetes/containerd"
 	"source.monogon.dev/metropolis/node/kubernetes/pki"
@@ -112,9 +111,7 @@
 		logger.Fatalf("Failed to initialize TPM 2.0: %v", err)
 	}
 
-	corednsRegistrationChan := make(chan *dns.ExtraDirective)
-
-	networkSvc := network.New(network.Config{CorednsRegistrationChan: corednsRegistrationChan})
+	networkSvc := network.New()
 
 	// This function initializes a headless Delve if this is a debug build or does nothing if it's not
 	initializeDebugger(networkSvc)
@@ -146,12 +143,6 @@
 			return fmt.Errorf("when starting network: %w", err)
 		}
 
-		// Wait for IP address from network.
-		ip, err := networkSvc.GetIP(ctx, true)
-		if err != nil {
-			return fmt.Errorf("when waiting for IP address: %w", err)
-		}
-
 		// Start cluster manager. This kicks off cluster membership machinery, which will either start
 		// a new cluster, enroll into one or join one.
 		m := cluster.NewManager(root, networkSvc)
@@ -211,8 +202,7 @@
 
 			kubernetesConfig.KPKI = kpki
 			kubernetesConfig.Root = root
-			kubernetesConfig.AdvertiseAddress = *ip
-			kubernetesConfig.CorednsRegistrationChan = corednsRegistrationChan
+			kubernetesConfig.Network = networkSvc
 			kubeSvc = kubernetes.New(kubernetesConfig)
 			if err := supervisor.Run(ctx, "kubernetes", kubeSvc.Run); err != nil {
 				return fmt.Errorf("failed to start kubernetes service: %w", err)
diff --git a/metropolis/node/core/network/BUILD.bazel b/metropolis/node/core/network/BUILD.bazel
index d24e07c..10378c1 100644
--- a/metropolis/node/core/network/BUILD.bazel
+++ b/metropolis/node/core/network/BUILD.bazel
@@ -9,7 +9,7 @@
         "//metropolis/node/core/network/dhcp4c:go_default_library",
         "//metropolis/node/core/network/dhcp4c/callback:go_default_library",
         "//metropolis/node/core/network/dns:go_default_library",
-        "//metropolis/pkg/logtree:go_default_library",
+        "//metropolis/pkg/event:go_default_library",
         "//metropolis/pkg/supervisor:go_default_library",
         "@com_github_google_nftables//:go_default_library",
         "@com_github_google_nftables//expr:go_default_library",
diff --git a/metropolis/node/core/network/main.go b/metropolis/node/core/network/main.go
index c23b85c..fe71634 100644
--- a/metropolis/node/core/network/main.go
+++ b/metropolis/node/core/network/main.go
@@ -18,14 +18,11 @@
 
 import (
 	"context"
-	"errors"
 	"fmt"
 	"net"
 	"os"
 	"path"
 	"strings"
-	"sync"
-	"time"
 
 	"github.com/google/nftables"
 	"github.com/google/nftables/expr"
@@ -35,41 +32,85 @@
 	"source.monogon.dev/metropolis/node/core/network/dhcp4c"
 	dhcpcb "source.monogon.dev/metropolis/node/core/network/dhcp4c/callback"
 	"source.monogon.dev/metropolis/node/core/network/dns"
-	"source.monogon.dev/metropolis/pkg/logtree"
+	"source.monogon.dev/metropolis/pkg/event"
 	"source.monogon.dev/metropolis/pkg/supervisor"
 )
 
-const (
-	resolvConfPath     = "/etc/resolv.conf"
-	resolvConfSwapPath = "/etc/resolv.conf.new"
-)
-
+// Service is the network service for this node. It maintains all
+// networking-related functionality, but is generally not aware of the inner
+// workings of Metropolis, instead functioning in a generic manner. Once created
+// via New, it can be started and restarted arbitrarily, but the service object
+// itself must be long-lived.
 type Service struct {
-	config Config
-	dhcp   *dhcp4c.Client
+	dnsReg chan *dns.ExtraDirective
+	dnsSvc *dns.Service
+
+	// dhcp client for the 'main' interface of the node.
+	dhcp *dhcp4c.Client
 
 	// nftConn is a shared file descriptor handle to nftables, automatically initialized on first use.
 	nftConn             nftables.Conn
 	natTable            *nftables.Table
 	natPostroutingChain *nftables.Chain
 
-	// These are a temporary hack pending the removal of the GetIP interface
-	ipLock       sync.Mutex
-	currentIPTmp net.IP
-
-	logger logtree.LeveledLogger
+	status event.MemoryValue
 }
 
-type Config struct {
-	CorednsRegistrationChan chan *dns.ExtraDirective
-}
-
-func New(config Config) *Service {
+func New() *Service {
+	dnsReg := make(chan *dns.ExtraDirective)
+	dnsSvc := dns.New(dnsReg)
 	return &Service{
-		config: config,
+		dnsReg: dnsReg,
+		dnsSvc: dnsSvc,
 	}
 }
 
+// Status is the current network status of the host. It will be updated by the
+// network Service whenever the node's network configuration changes. Spurious
+// changes might occur; consumers should ensure that the change that occurred is
+// meaningful to them.
+type Status struct {
+	ExternalAddress net.IP
+	DNSServers      dhcp4c.DNSServers
+}
+
+// Watcher allows network Service consumers to watch for updates of the current Status.
+type Watcher struct {
+	watcher event.Watcher
+}
+
+// Get returns the newest network Status from a Watcher. It will block until a
+// new Status is available.
+func (w *Watcher) Get(ctx context.Context) (*Status, error) {
+	val, err := w.watcher.Get(ctx)
+	if err != nil {
+		return nil, err
+	}
+	status := val.(Status)
+	return &status, err
+}
+
+func (w *Watcher) Close() error {
+	return w.watcher.Close()
+}
+
+// Watch returns a Watcher, which can be used by consumers of the network
+// Service to retrieve the current network status.
+// Close must be called on the Watcher when it is not used anymore in order to
+// prevent goroutine leaks.
+func (s *Service) Watch() Watcher {
+	return Watcher{
+		watcher: s.status.Watch(),
+	}
+}
+
+// ConfigureDNS sets a DNS ExtraDirective on the built-in DNS server of the
+// network Service. See //metropolis/node/core/network/dns for more
+// information.
+func (s *Service) ConfigureDNS(d *dns.ExtraDirective) {
+	s.dnsReg <- d
+}
+
 // nfifname converts an interface name into 16 bytes padded with zeroes (for nftables)
 func nfifname(n string) []byte {
 	b := make([]byte, 16)
@@ -77,24 +118,22 @@
 	return b
 }
 
-func (s *Service) dhcpDNSCallback(old, new *dhcp4c.Lease) error {
+// statusCallback is the main DHCP client callback connecting updates to the
+// current lease to the rest of Metropolis. It updates the DNS service's
+// configuration to use the received upstream servers, and notifies the rest of
+// Metropolis via an event value that the network configuration has changed.
+func (s *Service) statusCallback(old, new *dhcp4c.Lease) error {
+	// Reconfigure DNS if needed.
 	oldServers := old.DNSServers()
 	newServers := new.DNSServers()
-	if newServers.Equal(oldServers) {
-		return nil // nothing to do
+	if !newServers.Equal(oldServers) {
+		s.ConfigureDNS(dns.NewUpstreamDirective(newServers))
 	}
-	s.logger.Infof("Setting upstream DNS servers to %v", newServers)
-	s.config.CorednsRegistrationChan <- dns.NewUpstreamDirective(newServers)
-	return nil
-}
-
-// TODO(lorenz): Get rid of this once we have robust node resolution
-func (s *Service) getIPCallbackHack(old, new *dhcp4c.Lease) error {
-	if old == nil && new != nil {
-		s.ipLock.Lock()
-		s.currentIPTmp = new.AssignedIP
-		s.ipLock.Unlock()
-	}
+	// Notify status waiters.
+	s.status.Set(Status{
+		ExternalAddress: new.AssignedIP,
+		DNSServers:      new.DNSServers(),
+	})
 	return nil
 }
 
@@ -109,7 +148,7 @@
 	}
 	s.dhcp.VendorClassIdentifier = "dev.monogon.metropolis.node.v1"
 	s.dhcp.RequestedOptions = []dhcpv4.OptionCode{dhcpv4.OptionRouter, dhcpv4.OptionNameServer}
-	s.dhcp.LeaseCallback = dhcpcb.Compose(dhcpcb.ManageIP(iface), dhcpcb.ManageDefaultRoute(iface), s.dhcpDNSCallback, s.getIPCallbackHack)
+	s.dhcp.LeaseCallback = dhcpcb.Compose(dhcpcb.ManageIP(iface), dhcpcb.ManageDefaultRoute(iface), s.statusCallback)
 	err = supervisor.Run(ctx, "dhcp", s.dhcp.Run)
 	if err != nil {
 		return err
@@ -137,28 +176,6 @@
 	return nil
 }
 
-// GetIP returns the current IP (and optionally waits for one to be assigned)
-func (s *Service) GetIP(ctx context.Context, wait bool) (*net.IP, error) {
-	for {
-		var currentIP net.IP
-		s.ipLock.Lock()
-		currentIP = s.currentIPTmp
-		s.ipLock.Unlock()
-		if currentIP == nil {
-			if !wait {
-				return nil, errors.New("no IP available")
-			}
-			select {
-			case <-ctx.Done():
-				return nil, ctx.Err()
-			case <-time.After(1 * time.Second):
-				continue
-			}
-		}
-		return &currentIP, nil
-	}
-}
-
 // sysctlOptions contains sysctl options to apply
 type sysctlOptions map[string]string
 
@@ -181,8 +198,7 @@
 
 func (s *Service) Run(ctx context.Context) error {
 	logger := supervisor.Logger(ctx)
-	dnsSvc := dns.New(s.config.CorednsRegistrationChan)
-	supervisor.Run(ctx, "dns", dnsSvc.Run)
+	supervisor.Run(ctx, "dns", s.dnsSvc.Run)
 	supervisor.Run(ctx, "interfaces", s.runInterfaces)
 
 	s.natTable = s.nftConn.AddTable(&nftables.Table{
@@ -228,12 +244,12 @@
 }
 
 func (s *Service) runInterfaces(ctx context.Context) error {
-	s.logger = supervisor.Logger(ctx)
-	s.logger.Info("Starting network interface management")
+	logger := supervisor.Logger(ctx)
+	logger.Info("Starting network interface management")
 
 	links, err := netlink.LinkList()
 	if err != nil {
-		s.logger.Fatalf("Failed to list network links: %s", err)
+		logger.Fatalf("Failed to list network links: %s", err)
 	}
 
 	var ethernetLinks []netlink.Link
@@ -246,16 +262,16 @@
 				}
 				ethernetLinks = append(ethernetLinks, link)
 			} else {
-				s.logger.Infof("Ignoring non-Ethernet interface %s", attrs.Name)
+				logger.Infof("Ignoring non-Ethernet interface %s", attrs.Name)
 			}
 		} else if link.Attrs().Name == "lo" {
 			if err := netlink.LinkSetUp(link); err != nil {
-				s.logger.Errorf("Failed to bring up loopback interface: %v", err)
+				logger.Errorf("Failed to bring up loopback interface: %v", err)
 			}
 		}
 	}
 	if len(ethernetLinks) != 1 {
-		s.logger.Warningf("Network service needs exactly one link, bailing")
+		logger.Warningf("Network service needs exactly one link, bailing")
 	} else {
 		link := ethernetLinks[0]
 		if err := s.useInterface(ctx, link); err != nil {
diff --git a/metropolis/node/kubernetes/BUILD.bazel b/metropolis/node/kubernetes/BUILD.bazel
index 1ab4c52..3ffe7a9 100644
--- a/metropolis/node/kubernetes/BUILD.bazel
+++ b/metropolis/node/kubernetes/BUILD.bazel
@@ -16,6 +16,7 @@
     deps = [
         "//metropolis/node:go_default_library",
         "//metropolis/node/core/localstorage:go_default_library",
+        "//metropolis/node/core/network:go_default_library",
         "//metropolis/node/core/network/dns:go_default_library",
         "//metropolis/node/kubernetes/clusternet:go_default_library",
         "//metropolis/node/kubernetes/nfproxy:go_default_library",
diff --git a/metropolis/node/kubernetes/service.go b/metropolis/node/kubernetes/service.go
index bd0d211..5c8b037 100644
--- a/metropolis/node/kubernetes/service.go
+++ b/metropolis/node/kubernetes/service.go
@@ -30,6 +30,7 @@
 	"k8s.io/client-go/tools/clientcmd"
 
 	"source.monogon.dev/metropolis/node/core/localstorage"
+	"source.monogon.dev/metropolis/node/core/network"
 	"source.monogon.dev/metropolis/node/core/network/dns"
 	"source.monogon.dev/metropolis/node/kubernetes/clusternet"
 	"source.monogon.dev/metropolis/node/kubernetes/nfproxy"
@@ -41,13 +42,12 @@
 )
 
 type Config struct {
-	AdvertiseAddress net.IP
-	ServiceIPRange   net.IPNet
-	ClusterNet       net.IPNet
+	ServiceIPRange net.IPNet
+	ClusterNet     net.IPNet
 
-	KPKI                    *pki.PKI
-	Root                    *localstorage.Root
-	CorednsRegistrationChan chan *dns.ExtraDirective
+	KPKI    *pki.PKI
+	Root    *localstorage.Root
+	Network *network.Service
 }
 
 type Service struct {
@@ -95,22 +95,58 @@
 		return fmt.Errorf("failed to get hostname: %w", err)
 	}
 
-	dnsHostIP := s.c.AdvertiseAddress // TODO: Which IP to use
+	// Sub-runnable which starts all parts of Kubernetes that depend on the
+	// machine's external IP address. If it changes, the runnable will exit.
+	// TODO(q3k): test this
+	supervisor.Run(ctx, "networked", func(ctx context.Context) error {
+		networkWatch := s.c.Network.Watch()
+		defer networkWatch.Close()
 
-	apiserver := &apiserverService{
-		KPKI:                        s.c.KPKI,
-		AdvertiseAddress:            s.c.AdvertiseAddress,
-		ServiceIPRange:              s.c.ServiceIPRange,
-		EphemeralConsensusDirectory: &s.c.Root.Ephemeral.Consensus,
-	}
+		var status *network.Status
 
-	kubelet := kubeletService{
-		NodeName:           hostname,
-		ClusterDNS:         []net.IP{dnsHostIP},
-		KubeletDirectory:   &s.c.Root.Data.Kubernetes.Kubelet,
-		EphemeralDirectory: &s.c.Root.Ephemeral,
-		KPKI:               s.c.KPKI,
-	}
+		supervisor.Logger(ctx).Info("Waiting for node networking...")
+		for status == nil || status.ExternalAddress == nil {
+			status, err = networkWatch.Get(ctx)
+			if err != nil {
+				return fmt.Errorf("failed to get network status: %w", err)
+			}
+		}
+		address := status.ExternalAddress
+		supervisor.Logger(ctx).Info("Node has active networking, starting apiserver/kubelet")
+
+		apiserver := &apiserverService{
+			KPKI:                        s.c.KPKI,
+			AdvertiseAddress:            address,
+			ServiceIPRange:              s.c.ServiceIPRange,
+			EphemeralConsensusDirectory: &s.c.Root.Ephemeral.Consensus,
+		}
+
+		kubelet := kubeletService{
+			NodeName:           hostname,
+			ClusterDNS:         []net.IP{address},
+			KubeletDirectory:   &s.c.Root.Data.Kubernetes.Kubelet,
+			EphemeralDirectory: &s.c.Root.Ephemeral,
+			KPKI:               s.c.KPKI,
+		}
+
+		err := supervisor.RunGroup(ctx, map[string]supervisor.Runnable{
+			"apiserver": apiserver.Run,
+			"kubelet":   kubelet.Run,
+		})
+		if err != nil {
+			return fmt.Errorf("when starting apiserver/kubelet: %w", err)
+		}
+
+		supervisor.Signal(ctx, supervisor.SignalHealthy)
+
+		for status.ExternalAddress.Equal(address) {
+			status, err = networkWatch.Get(ctx)
+			if err != nil {
+				return fmt.Errorf("when watching for network changes: %w", err)
+			}
+		}
+		return fmt.Errorf("network configuration changed (%s -> %s)", address.String(), status.ExternalAddress.String())
+	})
 
 	csiPlugin := csiPluginServer{
 		KubeletDirectory: &s.c.Root.Data.Kubernetes.Kubelet,
@@ -145,10 +181,8 @@
 		name     string
 		runnable supervisor.Runnable
 	}{
-		{"apiserver", apiserver.Run},
 		{"controller-manager", runControllerManager(*controllerManagerConfig)},
 		{"scheduler", runScheduler(*schedulerConfig)},
-		{"kubelet", kubelet.Run},
 		{"reconciler", reconciler.Run(clientSet)},
 		{"csi-plugin", csiPlugin.Run},
 		{"csi-provisioner", csiProvisioner.Run},
@@ -164,11 +198,11 @@
 
 	supervisor.Logger(ctx).Info("Registering K8s CoreDNS")
 	clusterDNSDirective := dns.NewKubernetesDirective("cluster.local", masterKubeconfig)
-	s.c.CorednsRegistrationChan <- clusterDNSDirective
+	s.c.Network.ConfigureDNS(clusterDNSDirective)
 
 	supervisor.Signal(ctx, supervisor.SignalHealthy)
 	<-ctx.Done()
-	s.c.CorednsRegistrationChan <- dns.CancelDirective(clusterDNSDirective)
+	s.c.Network.ConfigureDNS(dns.CancelDirective(clusterDNSDirective))
 	return nil
 }