metropolis/node: use Event Value for network status
This moves the GetIP API over to our new event/value library.
The consumers of this data are currently the cluster manager and the
Kubernetes root service. Both are migrated over.
Test Plan: Refactor, covered by E2E tests.
X-Origin-Diff: phab/D711
GitOrigin-RevId: 8a1e0dd35236d55492722f4439323cb2ee9574fc
diff --git a/metropolis/node/kubernetes/service.go b/metropolis/node/kubernetes/service.go
index bd0d211..5c8b037 100644
--- a/metropolis/node/kubernetes/service.go
+++ b/metropolis/node/kubernetes/service.go
@@ -30,6 +30,7 @@
"k8s.io/client-go/tools/clientcmd"
"source.monogon.dev/metropolis/node/core/localstorage"
+ "source.monogon.dev/metropolis/node/core/network"
"source.monogon.dev/metropolis/node/core/network/dns"
"source.monogon.dev/metropolis/node/kubernetes/clusternet"
"source.monogon.dev/metropolis/node/kubernetes/nfproxy"
@@ -41,13 +42,12 @@
)
type Config struct {
- AdvertiseAddress net.IP
- ServiceIPRange net.IPNet
- ClusterNet net.IPNet
+ ServiceIPRange net.IPNet
+ ClusterNet net.IPNet
- KPKI *pki.PKI
- Root *localstorage.Root
- CorednsRegistrationChan chan *dns.ExtraDirective
+ KPKI *pki.PKI
+ Root *localstorage.Root
+ Network *network.Service
}
type Service struct {
@@ -95,22 +95,58 @@
return fmt.Errorf("failed to get hostname: %w", err)
}
- dnsHostIP := s.c.AdvertiseAddress // TODO: Which IP to use
+ // Sub-runnable which starts all parts of Kubernetes that depend on the
+ // machine's external IP address. If it changes, the runnable will exit.
+ // TODO(q3k): test this
+ supervisor.Run(ctx, "networked", func(ctx context.Context) error {
+ networkWatch := s.c.Network.Watch()
+ defer networkWatch.Close()
- apiserver := &apiserverService{
- KPKI: s.c.KPKI,
- AdvertiseAddress: s.c.AdvertiseAddress,
- ServiceIPRange: s.c.ServiceIPRange,
- EphemeralConsensusDirectory: &s.c.Root.Ephemeral.Consensus,
- }
+ var status *network.Status
- kubelet := kubeletService{
- NodeName: hostname,
- ClusterDNS: []net.IP{dnsHostIP},
- KubeletDirectory: &s.c.Root.Data.Kubernetes.Kubelet,
- EphemeralDirectory: &s.c.Root.Ephemeral,
- KPKI: s.c.KPKI,
- }
+ supervisor.Logger(ctx).Info("Waiting for node networking...")
+ for status == nil || status.ExternalAddress == nil {
+ status, err = networkWatch.Get(ctx)
+ if err != nil {
+ return fmt.Errorf("failed to get network status: %w", err)
+ }
+ }
+ address := status.ExternalAddress
+ supervisor.Logger(ctx).Info("Node has active networking, starting apiserver/kubelet")
+
+ apiserver := &apiserverService{
+ KPKI: s.c.KPKI,
+ AdvertiseAddress: address,
+ ServiceIPRange: s.c.ServiceIPRange,
+ EphemeralConsensusDirectory: &s.c.Root.Ephemeral.Consensus,
+ }
+
+ kubelet := kubeletService{
+ NodeName: hostname,
+ ClusterDNS: []net.IP{address},
+ KubeletDirectory: &s.c.Root.Data.Kubernetes.Kubelet,
+ EphemeralDirectory: &s.c.Root.Ephemeral,
+ KPKI: s.c.KPKI,
+ }
+
+ err := supervisor.RunGroup(ctx, map[string]supervisor.Runnable{
+ "apiserver": apiserver.Run,
+ "kubelet": kubelet.Run,
+ })
+ if err != nil {
+ return fmt.Errorf("when starting apiserver/kubelet: %w", err)
+ }
+
+ supervisor.Signal(ctx, supervisor.SignalHealthy)
+
+ for status.ExternalAddress.Equal(address) {
+ status, err = networkWatch.Get(ctx)
+ if err != nil {
+ return fmt.Errorf("when watching for network changes: %w", err)
+ }
+ }
+ return fmt.Errorf("network configuration changed (%s -> %s)", address.String(), status.ExternalAddress.String())
+ })
csiPlugin := csiPluginServer{
KubeletDirectory: &s.c.Root.Data.Kubernetes.Kubelet,
@@ -145,10 +181,8 @@
name string
runnable supervisor.Runnable
}{
- {"apiserver", apiserver.Run},
{"controller-manager", runControllerManager(*controllerManagerConfig)},
{"scheduler", runScheduler(*schedulerConfig)},
- {"kubelet", kubelet.Run},
{"reconciler", reconciler.Run(clientSet)},
{"csi-plugin", csiPlugin.Run},
{"csi-provisioner", csiProvisioner.Run},
@@ -164,11 +198,11 @@
supervisor.Logger(ctx).Info("Registering K8s CoreDNS")
clusterDNSDirective := dns.NewKubernetesDirective("cluster.local", masterKubeconfig)
- s.c.CorednsRegistrationChan <- clusterDNSDirective
+ s.c.Network.ConfigureDNS(clusterDNSDirective)
supervisor.Signal(ctx, supervisor.SignalHealthy)
<-ctx.Done()
- s.c.CorednsRegistrationChan <- dns.CancelDirective(clusterDNSDirective)
+ s.c.Network.ConfigureDNS(dns.CancelDirective(clusterDNSDirective))
return nil
}