metropolis/node: use Event Value for network status
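This moves the GetIP API over to our new event/value library
(metropolis/pkg/event): the network service now publishes a Status (external
address and DNS servers) which consumers can Watch. The consumers of this data
are currently the cluster manager and the kubernetes root service; both are
migrated over. The network service now also owns the DNS registration channel
and exposes it via ConfigureDNS, instead of main plumbing it through.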
Test Plan: Refactor, covered by E2E tests.
X-Origin-Diff: phab/D711
GitOrigin-RevId: 8a1e0dd35236d55492722f4439323cb2ee9574fc
diff --git a/metropolis/node/core/BUILD.bazel b/metropolis/node/core/BUILD.bazel
index e7e6179..24200d8 100644
--- a/metropolis/node/core/BUILD.bazel
+++ b/metropolis/node/core/BUILD.bazel
@@ -19,7 +19,6 @@
"//metropolis/node/core/localstorage:go_default_library",
"//metropolis/node/core/localstorage/declarative:go_default_library",
"//metropolis/node/core/network:go_default_library",
- "//metropolis/node/core/network/dns:go_default_library",
"//metropolis/node/kubernetes:go_default_library",
"//metropolis/node/kubernetes/containerd:go_default_library",
"//metropolis/node/kubernetes/pki:go_default_library",
diff --git a/metropolis/node/core/cluster/manager.go b/metropolis/node/core/cluster/manager.go
index 6d6d592..6c85fb3 100644
--- a/metropolis/node/core/cluster/manager.go
+++ b/metropolis/node/core/cluster/manager.go
@@ -291,10 +291,15 @@
func (m *Manager) stateCreatingCluster(ctx context.Context) error {
logger := supervisor.Logger(ctx)
logger.Info("Creating new cluster: waiting for IP address...")
- ip, err := m.networkService.GetIP(ctx, true)
+
+ // STOPGAP: this only waits for the first Status instead of keeping the watcher open long-term to follow address changes.
+ watcher := m.networkService.Watch()
+ defer watcher.Close()
+ data, err := watcher.Get(ctx)
if err != nil {
return fmt.Errorf("when getting IP address: %w", err)
}
+ ip := data.ExternalAddress
logger.Infof("Creating new cluster: got IP address %s", ip.String())
logger.Info("Creating new cluster: initializing storage...")
@@ -310,7 +315,7 @@
return fmt.Errorf("failed to create new node certificate: %w", err)
}
- node := NewNode(cuk, *ip, *cert.Leaf)
+ node := NewNode(cuk, ip, *cert.Leaf)
m.consensus = consensus.New(consensus.Config{
Data: &m.storageRoot.Data.Etcd,
diff --git a/metropolis/node/core/delve_enabled.go b/metropolis/node/core/delve_enabled.go
index dc17f59..24ea470 100644
--- a/metropolis/node/core/delve_enabled.go
+++ b/metropolis/node/core/delve_enabled.go
@@ -32,7 +32,12 @@
// This is intentionally delayed until network becomes available since Delve for some reason connects to itself
// and in early-boot no network interface is available to do that through. Also external access isn't possible
// early on anyways.
- networkSvc.GetIP(context.Background(), true)
+ watcher := networkSvc.Watch()
+ defer watcher.Close()
+ _, err := watcher.Get(context.Background())
+ if err != nil {
+ panic(err)
+ }
dlvCmd := exec.Command("/dlv", "--headless=true", fmt.Sprintf("--listen=:%v", node.DebuggerPort),
"--accept-multiclient", "--only-same-user=false", "attach", "--continue", "1", "/init")
if err := dlvCmd.Start(); err != nil {
diff --git a/metropolis/node/core/main.go b/metropolis/node/core/main.go
index 0f6ebd1..7c3f10c 100644
--- a/metropolis/node/core/main.go
+++ b/metropolis/node/core/main.go
@@ -37,7 +37,6 @@
"source.monogon.dev/metropolis/node/core/localstorage"
"source.monogon.dev/metropolis/node/core/localstorage/declarative"
"source.monogon.dev/metropolis/node/core/network"
- "source.monogon.dev/metropolis/node/core/network/dns"
"source.monogon.dev/metropolis/node/kubernetes"
"source.monogon.dev/metropolis/node/kubernetes/containerd"
"source.monogon.dev/metropolis/node/kubernetes/pki"
@@ -112,9 +111,7 @@
logger.Fatalf("Failed to initialize TPM 2.0: %v", err)
}
- corednsRegistrationChan := make(chan *dns.ExtraDirective)
-
- networkSvc := network.New(network.Config{CorednsRegistrationChan: corednsRegistrationChan})
+ networkSvc := network.New()
// This function initializes a headless Delve if this is a debug build or does nothing if it's not
initializeDebugger(networkSvc)
@@ -146,12 +143,6 @@
return fmt.Errorf("when starting network: %w", err)
}
- // Wait for IP address from network.
- ip, err := networkSvc.GetIP(ctx, true)
- if err != nil {
- return fmt.Errorf("when waiting for IP address: %w", err)
- }
-
// Start cluster manager. This kicks off cluster membership machinery, which will either start
// a new cluster, enroll into one or join one.
m := cluster.NewManager(root, networkSvc)
@@ -211,8 +202,7 @@
kubernetesConfig.KPKI = kpki
kubernetesConfig.Root = root
- kubernetesConfig.AdvertiseAddress = *ip
- kubernetesConfig.CorednsRegistrationChan = corednsRegistrationChan
+ kubernetesConfig.Network = networkSvc
kubeSvc = kubernetes.New(kubernetesConfig)
if err := supervisor.Run(ctx, "kubernetes", kubeSvc.Run); err != nil {
return fmt.Errorf("failed to start kubernetes service: %w", err)
diff --git a/metropolis/node/core/network/BUILD.bazel b/metropolis/node/core/network/BUILD.bazel
index d24e07c..10378c1 100644
--- a/metropolis/node/core/network/BUILD.bazel
+++ b/metropolis/node/core/network/BUILD.bazel
@@ -9,7 +9,7 @@
"//metropolis/node/core/network/dhcp4c:go_default_library",
"//metropolis/node/core/network/dhcp4c/callback:go_default_library",
"//metropolis/node/core/network/dns:go_default_library",
- "//metropolis/pkg/logtree:go_default_library",
+ "//metropolis/pkg/event:go_default_library",
"//metropolis/pkg/supervisor:go_default_library",
"@com_github_google_nftables//:go_default_library",
"@com_github_google_nftables//expr:go_default_library",
diff --git a/metropolis/node/core/network/main.go b/metropolis/node/core/network/main.go
index c23b85c..fe71634 100644
--- a/metropolis/node/core/network/main.go
+++ b/metropolis/node/core/network/main.go
@@ -18,14 +18,11 @@
import (
"context"
- "errors"
"fmt"
"net"
"os"
"path"
"strings"
- "sync"
- "time"
"github.com/google/nftables"
"github.com/google/nftables/expr"
@@ -35,41 +32,85 @@
"source.monogon.dev/metropolis/node/core/network/dhcp4c"
dhcpcb "source.monogon.dev/metropolis/node/core/network/dhcp4c/callback"
"source.monogon.dev/metropolis/node/core/network/dns"
- "source.monogon.dev/metropolis/pkg/logtree"
+ "source.monogon.dev/metropolis/pkg/event"
"source.monogon.dev/metropolis/pkg/supervisor"
)
-const (
- resolvConfPath = "/etc/resolv.conf"
- resolvConfSwapPath = "/etc/resolv.conf.new"
-)
-
+// Service is the network service for this node. It maintains all
+// networking-related functionality, but is generally not aware of the inner
+// workings of Metropolis, instead functioning in a generic manner. Once created
+// via New, it can be started and restarted arbitrarily, but the service object
+// itself must be long-lived.
type Service struct {
- config Config
- dhcp *dhcp4c.Client
+ dnsReg chan *dns.ExtraDirective
+ dnsSvc *dns.Service
+
+ // dhcp client for the 'main' interface of the node.
+ dhcp *dhcp4c.Client
// nftConn is a shared file descriptor handle to nftables, automatically initialized on first use.
nftConn nftables.Conn
natTable *nftables.Table
natPostroutingChain *nftables.Chain
- // These are a temporary hack pending the removal of the GetIP interface
- ipLock sync.Mutex
- currentIPTmp net.IP
-
- logger logtree.LeveledLogger
+ status event.MemoryValue
}
-type Config struct {
- CorednsRegistrationChan chan *dns.ExtraDirective
-}
-
-func New(config Config) *Service {
+func New() *Service {
+ dnsReg := make(chan *dns.ExtraDirective)
+ dnsSvc := dns.New(dnsReg)
return &Service{
- config: config,
+ dnsReg: dnsReg,
+ dnsSvc: dnsSvc,
}
}
+// Status is the current network status of the host. It will be updated by the
+// network Service whenever the node's network configuration changes. Spurious
+// changes might occur; consumers should ensure that the change that occurred is
+// meaningful to them.
+type Status struct {
+ ExternalAddress net.IP
+ DNSServers dhcp4c.DNSServers
+}
+
+// Watcher allows network Service consumers to watch for updates of the current Status.
+type Watcher struct {
+ watcher event.Watcher
+}
+
+// Get returns the newest network Status from a Watcher. It will block until a
+// new Status is available.
+func (w *Watcher) Get(ctx context.Context) (*Status, error) {
+ val, err := w.watcher.Get(ctx)
+ if err != nil {
+ return nil, err
+ }
+ status := val.(Status)
+ return &status, err
+}
+
+func (w *Watcher) Close() error {
+ return w.watcher.Close()
+}
+
+// Watch returns a Watcher, which can be used by consumers of the network
+// Service to retrieve the current network status.
+// Close must be called on the Watcher when it is not used anymore in order to
+// prevent goroutine leaks.
+func (s *Service) Watch() Watcher {
+ return Watcher{
+ watcher: s.status.Watch(),
+ }
+}
+
+// ConfigureDNS sets a DNS ExtraDirective on the built-in DNS server of the
+// network Service. See //metropolis/node/core/network/dns for more
+// information.
+func (s *Service) ConfigureDNS(d *dns.ExtraDirective) {
+ s.dnsReg <- d
+}
+
// nfifname converts an interface name into 16 bytes padded with zeroes (for nftables)
func nfifname(n string) []byte {
b := make([]byte, 16)
@@ -77,24 +118,25 @@
return b
}
-func (s *Service) dhcpDNSCallback(old, new *dhcp4c.Lease) error {
+// statusCallback is the main DHCP client callback connecting updates to the
+// current lease to the rest of Metropolis. It updates the DNS service's
+// configuration to use the received upstream servers, and notifies the rest of
+// Metropolis via an event value that the network configuration has changed.
+// Either lease may be nil (no lease held); when new is nil, a Status without
+// an ExternalAddress is published.
+func (s *Service) statusCallback(old, new *dhcp4c.Lease) error {
+ // Reconfigure DNS if needed.
oldServers := old.DNSServers()
newServers := new.DNSServers()
- if newServers.Equal(oldServers) {
- return nil // nothing to do
+ if !newServers.Equal(oldServers) {
+ s.ConfigureDNS(dns.NewUpstreamDirective(newServers))
}
- s.logger.Infof("Setting upstream DNS servers to %v", newServers)
- s.config.CorednsRegistrationChan <- dns.NewUpstreamDirective(newServers)
- return nil
-}
-
-// TODO(lorenz): Get rid of this once we have robust node resolution
-func (s *Service) getIPCallbackHack(old, new *dhcp4c.Lease) error {
- if old == nil && new != nil {
- s.ipLock.Lock()
- s.currentIPTmp = new.AssignedIP
- s.ipLock.Unlock()
- }
+ // Notify status waiters. new may be nil, so guard the AssignedIP dereference.
+ status := Status{DNSServers: newServers}
+ if new != nil {
+ status.ExternalAddress = new.AssignedIP
+ }
+ s.status.Set(status)
return nil
}
@@ -109,7 +148,7 @@
}
s.dhcp.VendorClassIdentifier = "dev.monogon.metropolis.node.v1"
s.dhcp.RequestedOptions = []dhcpv4.OptionCode{dhcpv4.OptionRouter, dhcpv4.OptionNameServer}
- s.dhcp.LeaseCallback = dhcpcb.Compose(dhcpcb.ManageIP(iface), dhcpcb.ManageDefaultRoute(iface), s.dhcpDNSCallback, s.getIPCallbackHack)
+ s.dhcp.LeaseCallback = dhcpcb.Compose(dhcpcb.ManageIP(iface), dhcpcb.ManageDefaultRoute(iface), s.statusCallback)
err = supervisor.Run(ctx, "dhcp", s.dhcp.Run)
if err != nil {
return err
@@ -137,28 +176,6 @@
return nil
}
-// GetIP returns the current IP (and optionally waits for one to be assigned)
-func (s *Service) GetIP(ctx context.Context, wait bool) (*net.IP, error) {
- for {
- var currentIP net.IP
- s.ipLock.Lock()
- currentIP = s.currentIPTmp
- s.ipLock.Unlock()
- if currentIP == nil {
- if !wait {
- return nil, errors.New("no IP available")
- }
- select {
- case <-ctx.Done():
- return nil, ctx.Err()
- case <-time.After(1 * time.Second):
- continue
- }
- }
- return ¤tIP, nil
- }
-}
-
// sysctlOptions contains sysctl options to apply
type sysctlOptions map[string]string
@@ -181,8 +198,7 @@
func (s *Service) Run(ctx context.Context) error {
logger := supervisor.Logger(ctx)
- dnsSvc := dns.New(s.config.CorednsRegistrationChan)
- supervisor.Run(ctx, "dns", dnsSvc.Run)
+ supervisor.Run(ctx, "dns", s.dnsSvc.Run)
supervisor.Run(ctx, "interfaces", s.runInterfaces)
s.natTable = s.nftConn.AddTable(&nftables.Table{
@@ -228,12 +244,12 @@
}
func (s *Service) runInterfaces(ctx context.Context) error {
- s.logger = supervisor.Logger(ctx)
- s.logger.Info("Starting network interface management")
+ logger := supervisor.Logger(ctx)
+ logger.Info("Starting network interface management")
links, err := netlink.LinkList()
if err != nil {
- s.logger.Fatalf("Failed to list network links: %s", err)
+ logger.Fatalf("Failed to list network links: %s", err)
}
var ethernetLinks []netlink.Link
@@ -246,16 +262,16 @@
}
ethernetLinks = append(ethernetLinks, link)
} else {
- s.logger.Infof("Ignoring non-Ethernet interface %s", attrs.Name)
+ logger.Infof("Ignoring non-Ethernet interface %s", attrs.Name)
}
} else if link.Attrs().Name == "lo" {
if err := netlink.LinkSetUp(link); err != nil {
- s.logger.Errorf("Failed to bring up loopback interface: %v", err)
+ logger.Errorf("Failed to bring up loopback interface: %v", err)
}
}
}
if len(ethernetLinks) != 1 {
- s.logger.Warningf("Network service needs exactly one link, bailing")
+ logger.Warningf("Network service needs exactly one link, bailing")
} else {
link := ethernetLinks[0]
if err := s.useInterface(ctx, link); err != nil {