metropolis/node/kubernetes: move worker services to KubernetesWorker nodes

This finalizes the Big Split. After this change, a node only runs a
kubelet (and related services) if it has the KubernetesWorker role
attached.

The first node in a new cluster now starts out with the KubernetesController
and ConsensusMember roles. All subsequently joined nodes start with no roles
attached.
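
Once a node has joined, the role is attached explicitly, either with
"metroctl node add role KubernetesWorker <NodeID>" or through the
management API. The e2e test in this change does the latter; below is a
minimal sketch of that call (mgmt is assumed to be an already-connected
management client, apb.ManagementClient in the generated Go bindings of
source.monogon.dev/metropolis/proto/api; enableKubernetesWorker and
nodeID are hypothetical names, not part of this change):

    // Sketch only: enable the KubernetesWorker role on a node via the
    // management API, mirroring the e2e test in this change.
    // Assumed imports: context, fmt, and
    // apb "source.monogon.dev/metropolis/proto/api".
    func enableKubernetesWorker(ctx context.Context, mgmt apb.ManagementClient, nodeID string) error {
        kw := true
        _, err := mgmt.UpdateNodeRoles(ctx, &apb.UpdateNodeRolesRequest{
            Node:             &apb.UpdateNodeRolesRequest_Id{Id: nodeID},
            KubernetesWorker: &kw,
        })
        if err != nil {
            return fmt.Errorf("could not enable KubernetesWorker on %q: %w", nodeID, err)
        }
        return nil
    }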

Change-Id: I25a059318450b7d2dd3c19f3653fc15367867693
Reviewed-on: https://review.monogon.dev/c/monogon/+/1380
Tested-by: Jenkins CI
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
diff --git a/metropolis/cli/metroctl/test/test.go b/metropolis/cli/metroctl/test/test.go
index 872f7b3..064390b 100644
--- a/metropolis/cli/metroctl/test/test.go
+++ b/metropolis/cli/metroctl/test/test.go
@@ -280,44 +280,49 @@
 		})
 	})
 	t.Run("set/unset role", func(t *testing.T) {
-		util.TestEventual(t, "metroctl set/unset role KubernetesController", ctx, 10*time.Second, func(ctx context.Context) error {
+		util.TestEventual(t, "metroctl set/unset role KubernetesWorker", ctx, 10*time.Second, func(ctx context.Context) error {
 			nid := cl.NodeIDs[1]
 			naddr := cl.Nodes[nid].ManagementAddress
 
 			// In this test we'll unset a node role, make sure that it's been in fact
 			// unset, then set it again, and check again. This exercises commands of
-			// the form "metroctl set/unset role KubernetesController [NodeID, ...]".
+			// the form "metroctl set/unset role KubernetesWorker [NodeID, ...]".
 
-			// Check that KubernetesController role is set initially.
+			// Check that KubernetesWorker role is absent initially.
 			var describeArgs []string
 			describeArgs = append(describeArgs, commonOpts...)
 			describeArgs = append(describeArgs, endpointOpts...)
 			describeArgs = append(describeArgs, "node", "describe", "--filter", fmt.Sprintf("node.status.external_address==\"%s\"", naddr))
-			if err := mctlFailIfMissing(t, ctx, describeArgs, "KubernetesController"); err != nil {
+			if err := mctlFailIfFound(t, ctx, describeArgs, "KubernetesWorker"); err != nil {
 				return err
 			}
-			// Remove the role.
+			// Add the role.
+			var setArgs []string
+			setArgs = append(setArgs, commonOpts...)
+			setArgs = append(setArgs, endpointOpts...)
+			setArgs = append(setArgs, "node", "add", "role", "KubernetesWorker", nid)
+			if err := mctlRun(t, ctx, setArgs); err != nil {
+				return err
+			}
+			// Check that the role is set.
+			if err := mctlFailIfMissing(t, ctx, describeArgs, "KubernetesWorker"); err != nil {
+				return err
+			}
+
+			// Remove the role again, returning the node to its initial state.
 			var unsetArgs []string
 			unsetArgs = append(unsetArgs, commonOpts...)
 			unsetArgs = append(unsetArgs, endpointOpts...)
-			unsetArgs = append(unsetArgs, "node", "remove", "role", "KubernetesController", nid)
+			unsetArgs = append(unsetArgs, "node", "remove", "role", "KubernetesWorker", nid)
 			if err := mctlRun(t, ctx, unsetArgs); err != nil {
 				return err
 			}
 			// Check that the role is unset.
-			if err := mctlFailIfFound(t, ctx, describeArgs, "KubernetesController"); err != nil {
+			if err := mctlFailIfFound(t, ctx, describeArgs, "KubernetesWorker"); err != nil {
 				return err
 			}
-			// Set the role back to the initial value.
-			var setArgs []string
-			setArgs = append(setArgs, commonOpts...)
-			setArgs = append(setArgs, endpointOpts...)
-			setArgs = append(setArgs, "node", "add", "role", "KubernetesController", nid)
-			if err := mctlRun(t, ctx, setArgs); err != nil {
-				return err
-			}
-			// Chack that the role is set.
-			return mctlFailIfMissing(t, ctx, describeArgs, "KubernetesController")
+
+			return nil
 		})
 	})
 }
diff --git a/metropolis/node/core/curator/impl_leader_certificates.go b/metropolis/node/core/curator/impl_leader_certificates.go
index 9a3a427..73e28a9 100644
--- a/metropolis/node/core/curator/impl_leader_certificates.go
+++ b/metropolis/node/core/curator/impl_leader_certificates.go
@@ -23,7 +23,10 @@
 		return nil, status.Error(codes.InvalidArgument, "kubelet pubkey must be set and valid")
 	}
 	if len(req.CsiProvisionerPubkey) != ed25519.PublicKeySize {
-		return nil, status.Error(codes.InvalidArgument, "worker services pubkey must be set and valid")
+		return nil, status.Error(codes.InvalidArgument, "CSI provisioner pubkey must be set and valid")
+	}
+	if len(req.NetservicesPubkey) != ed25519.PublicKeySize {
+		return nil, status.Error(codes.InvalidArgument, "network services pubkey must be set and valid")
 	}
 
 	kubeletServer, kubeletClient, err := kp.Kubelet(ctx, nodeID, req.KubeletPubkey)
@@ -50,6 +53,16 @@
 		return nil, status.Errorf(codes.Unavailable, "could not ensure CSI provisioner client certificate: %v", err)
 	}
 
+	netservClient, err := kp.NetServices(ctx, nodeID, req.NetservicesPubkey)
+	if err != nil {
+		return nil, status.Errorf(codes.Unavailable, "could not generate netservices client certificates: %v", err)
+	}
+
+	netservClientCert, err := netservClient.Ensure(ctx, kp.KV)
+	if err != nil {
+		return nil, status.Errorf(codes.Unavailable, "could not ensure netservices client certificate: %v", err)
+	}
+
 	return &ipb.IssueCertificateResponse{
 		Kind: &ipb.IssueCertificateResponse_KubernetesWorker_{
 			KubernetesWorker: &ipb.IssueCertificateResponse_KubernetesWorker{
@@ -57,6 +70,7 @@
 				KubeletServerCertificate:  kubeletServerCert,
 				KubeletClientCertificate:  kubeletClientCert,
 				CsiProvisionerCertificate: csiClientCert,
+				NetservicesCertificate:    netservClientCert,
 			},
 		},
 	}, nil
diff --git a/metropolis/node/core/curator/impl_leader_curator.go b/metropolis/node/core/curator/impl_leader_curator.go
index f25b95b..fe753f2 100644
--- a/metropolis/node/core/curator/impl_leader_curator.go
+++ b/metropolis/node/core/curator/impl_leader_curator.go
@@ -15,7 +15,6 @@
 	tpb "google.golang.org/protobuf/types/known/timestamppb"
 
 	common "source.monogon.dev/metropolis/node"
-	"source.monogon.dev/metropolis/node/core/consensus"
 	ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
 	"source.monogon.dev/metropolis/node/core/identity"
 	"source.monogon.dev/metropolis/node/core/rpc"
@@ -428,22 +427,8 @@
 		return nil, status.Errorf(codes.Unavailable, "could not emit node credentials: %v", err)
 	}
 
-	w := l.consensus.Watch()
-	defer w.Close()
-	st, err := w.Get(ctx, consensus.FilterRunning)
-	if err != nil {
-		return nil, status.Errorf(codes.Unavailable, "could not get running consensus: %v", err)
-	}
-
-	join, err := st.AddNode(ctx, node.pubkey)
-	if err != nil {
-		return nil, status.Errorf(codes.Unavailable, "could not add node: %v", err)
-	}
-
 	node.state = cpb.NodeState_NODE_STATE_UP
 	node.clusterUnlockKey = req.ClusterUnlockKey
-	node.EnableConsensusMember(join)
-	node.EnableKubernetesController()
 	if err := nodeSave(ctx, l.leadership, node); err != nil {
 		return nil, err
 	}
diff --git a/metropolis/node/core/curator/impl_leader_test.go b/metropolis/node/core/curator/impl_leader_test.go
index 5063fef..d245f6f 100644
--- a/metropolis/node/core/curator/impl_leader_test.go
+++ b/metropolis/node/core/curator/impl_leader_test.go
@@ -1209,6 +1209,7 @@
 	// Issue certificates for some random pubkey.
 	kpub, _, _ := ed25519.GenerateKey(rand.Reader)
 	cpub, _, _ := ed25519.GenerateKey(rand.Reader)
+	npub, _, _ := ed25519.GenerateKey(rand.Reader)
 
 	curator := ipb.NewCuratorClient(cl.localNodeConn)
 	res, err := curator.IssueCertificate(ctx, &ipb.IssueCertificateRequest{
@@ -1216,6 +1217,7 @@
 			KubernetesWorker: &ipb.IssueCertificateRequest_KubernetesWorker{
 				KubeletPubkey:        kpub,
 				CsiProvisionerPubkey: cpub,
+				NetservicesPubkey:    npub,
 			},
 		},
 	})
@@ -1240,6 +1242,10 @@
 	if err != nil {
 		t.Fatalf("Could not parse CSI provisiooner certificate: %v", err)
 	}
+	ncert, err := x509.ParseCertificate(kw.NetservicesCertificate)
+	if err != nil {
+		t.Fatalf("Could not parse network services certificate: %v", err)
+	}
 
 	if err := scert.CheckSignatureFrom(idca); err != nil {
 		t.Errorf("Server certificate not signed by IdCA: %v", err)
@@ -1250,6 +1256,9 @@
 	if err := pcert.CheckSignatureFrom(idca); err != nil {
 		t.Errorf("CSI provisioner certificate not signed by IdCA: %v", err)
 	}
+	if err := ncert.CheckSignatureFrom(idca); err != nil {
+		t.Errorf("Network services certificate not signed by IdCA: %v", err)
+	}
 	scertPubkey := scert.PublicKey.(ed25519.PublicKey)
 	if !bytes.Equal(scertPubkey, kpub) {
 		t.Errorf("Server certificate not emitted for requested key")
@@ -1262,6 +1271,10 @@
 	if !bytes.Equal(pcertPubkey, cpub) {
 		t.Errorf("CSI provisioner certificate not emitted for requested key")
 	}
+	ncertPubkey := ncert.PublicKey.(ed25519.PublicKey)
+	if !bytes.Equal(ncertPubkey, npub) {
+		t.Errorf("Network services certificate not emitted for requested key")
+	}
 
 	// Try issuing again for the same pubkeys. This should work.
 	_, err = curator.IssueCertificate(ctx, &ipb.IssueCertificateRequest{
@@ -1269,6 +1282,7 @@
 			KubernetesWorker: &ipb.IssueCertificateRequest_KubernetesWorker{
 				KubeletPubkey:        kpub,
 				CsiProvisionerPubkey: cpub,
+				NetservicesPubkey:    npub,
 			},
 		},
 	})
@@ -1279,12 +1293,14 @@
 	// Try issuing again for other pubkey. These should be rejected.
 	kpub2, _, _ := ed25519.GenerateKey(rand.Reader)
 	cpub2, _, _ := ed25519.GenerateKey(rand.Reader)
+	npub2, _, _ := ed25519.GenerateKey(rand.Reader)
 
 	_, err = curator.IssueCertificate(ctx, &ipb.IssueCertificateRequest{
 		Kind: &ipb.IssueCertificateRequest_KubernetesWorker_{
 			KubernetesWorker: &ipb.IssueCertificateRequest_KubernetesWorker{
 				KubeletPubkey:        kpub2,
 				CsiProvisionerPubkey: cpub,
+				NetservicesPubkey:    npub,
 			},
 		},
 	})
@@ -1296,6 +1312,19 @@
 			KubernetesWorker: &ipb.IssueCertificateRequest_KubernetesWorker{
 				KubeletPubkey:        kpub,
 				CsiProvisionerPubkey: cpub2,
+				NetservicesPubkey:    npub,
+			},
+		},
+	})
+	if err == nil {
+		t.Errorf("Certificate has been issued again for a different pubkey")
+	}
+	_, err = curator.IssueCertificate(ctx, &ipb.IssueCertificateRequest{
+		Kind: &ipb.IssueCertificateRequest_KubernetesWorker_{
+			KubernetesWorker: &ipb.IssueCertificateRequest_KubernetesWorker{
+				KubeletPubkey:        kpub,
+				CsiProvisionerPubkey: cpub,
+				NetservicesPubkey:    npub2,
 			},
 		},
 	})
diff --git a/metropolis/node/core/curator/proto/api/api.proto b/metropolis/node/core/curator/proto/api/api.proto
index a035b9d..e9ead1d 100644
--- a/metropolis/node/core/curator/proto/api/api.proto
+++ b/metropolis/node/core/curator/proto/api/api.proto
@@ -345,6 +345,8 @@
         bytes kubelet_pubkey = 1;
         // The ED25519 public key of the keypair that that will run the CSI provisioner.
         bytes csi_provisioner_pubkey = 2;
+        // The ED25519 public key of the keypair that will run nfproxy and clusternet.
+        bytes netservices_pubkey = 3;
     }
     oneof kind {
         KubernetesWorker kubernetes_worker = 1;
@@ -366,6 +368,9 @@
         // DER-encoded (but not PEM armored) certificate to be used by the CSI
         // provisioner when connecting to the api server.
         bytes csi_provisioner_certificate = 4;
+        // DER-encoded (but not PEM armored) certificate to be used by worker
+        // services nfproxy and clusternet when connecting to the apiserver.
+        bytes netservices_certificate = 5;
     }
     oneof kind {
         KubernetesWorker kubernetes_worker = 1;
diff --git a/metropolis/node/core/localstorage/directory_pki.go b/metropolis/node/core/localstorage/directory_pki.go
index 37fcdb6..8df1914 100644
--- a/metropolis/node/core/localstorage/directory_pki.go
+++ b/metropolis/node/core/localstorage/directory_pki.go
@@ -18,6 +18,7 @@
 
 import (
 	"crypto/ed25519"
+	"crypto/rand"
 	"crypto/x509"
 	"encoding/pem"
 	"errors"
@@ -64,6 +65,21 @@
 	return true, nil
 }
 
+// GeneratePrivateKey will generate an ED25519 private key for this PKIDirectory
+// if it doesn't yet exist.
+func (p *PKIDirectory) GeneratePrivateKey() error {
+	// Do nothing if key already exists.
+	_, err := p.Key.Read()
+	if err == nil {
+		return nil
+	}
+	_, priv, err := ed25519.GenerateKey(rand.Reader)
+	if err != nil {
+		return err
+	}
+	return p.WritePrivateKey(priv)
+}
+
 // WritePrivateKey serializes the given private key (PKCS8 + PEM) and writes it
 // to the PKIDirectory, overwriting whatever might already be present there.
 func (p *PKIDirectory) WritePrivateKey(key ed25519.PrivateKey) error {
diff --git a/metropolis/node/core/localstorage/storage.go b/metropolis/node/core/localstorage/storage.go
index 27ffd1d..a37ce8d 100644
--- a/metropolis/node/core/localstorage/storage.go
+++ b/metropolis/node/core/localstorage/storage.go
@@ -102,6 +102,8 @@
 type DataKubernetesDirectory struct {
 	declarative.Directory
 	ClusterNetworking DataKubernetesClusterNetworkingDirectory `dir:"clusternet"`
+	CSIProvisioner    DataKubernetesCSIProvisionerDirectory    `dir:"csiprovisioner"`
+	Netservices       DataKubernetesNetservicesDirectory       `dir:"netservices"`
 	Kubelet           DataKubernetesKubeletDirectory           `dir:"kubelet"`
 }
 
@@ -110,10 +112,19 @@
 	Key declarative.File `file:"private.key"`
 }
 
+type DataKubernetesCSIProvisionerDirectory struct {
+	declarative.Directory
+	PKI PKIDirectory `dir:"pki"`
+}
+
+type DataKubernetesNetservicesDirectory struct {
+	declarative.Directory
+	PKI PKIDirectory `dir:"pki"`
+}
+
 type DataKubernetesKubeletDirectory struct {
 	declarative.Directory
-	Kubeconfig declarative.File `file:"kubeconfig"`
-	PKI        PKIDirectory     `dir:"pki"`
+	PKI PKIDirectory `dir:"pki"`
 
 	DevicePlugins struct {
 		declarative.Directory
diff --git a/metropolis/node/core/roleserve/worker_kubernetes.go b/metropolis/node/core/roleserve/worker_kubernetes.go
index da2aa9f..d8c1b1f 100644
--- a/metropolis/node/core/roleserve/worker_kubernetes.go
+++ b/metropolis/node/core/roleserve/worker_kubernetes.go
@@ -161,14 +161,6 @@
 
 		supervisor.Logger(ctx).Infof("Got data, starting Kubernetes...")
 
-		// Start containerd.
-		containerdSvc := &containerd.Service{
-			EphemeralVolume: &s.storageRoot.Ephemeral.Containerd,
-		}
-		if err := supervisor.Run(ctx, "containerd", containerdSvc.Run); err != nil {
-			return fmt.Errorf("failed to start containerd service: %w", err)
-		}
-
 		controller := kubernetes.NewController(kubernetes.ConfigController{
 			Node:           &d.membership.credentials.Node,
 			ServiceIPRange: serviceIPRange,
@@ -177,7 +169,6 @@
 			KPKI:           pki,
 			Root:           s.storageRoot,
 			Network:        s.network,
-			PodNetwork:     s.podNetwork,
 		})
 		// Start Kubernetes.
 		if err := supervisor.Run(ctx, "run", controller.Run); err != nil {
@@ -250,6 +241,7 @@
 			Network:       s.network,
 			NodeID:        d.membership.NodeID(),
 			CuratorClient: ccli,
+			PodNetwork:    s.podNetwork,
 		})
 		// Start Kubernetes.
 		if err := supervisor.Run(ctx, "run", worker.Run); err != nil {
diff --git a/metropolis/node/kubernetes/BUILD.bazel b/metropolis/node/kubernetes/BUILD.bazel
index 1279cff..cbad367 100644
--- a/metropolis/node/kubernetes/BUILD.bazel
+++ b/metropolis/node/kubernetes/BUILD.bazel
@@ -36,7 +36,6 @@
         "//metropolis/pkg/fsquota",
         "//metropolis/pkg/logtree",
         "//metropolis/pkg/loop",
-        "//metropolis/pkg/pki",
         "//metropolis/pkg/supervisor",
         "//metropolis/proto/api",
         "@com_github_container_storage_interface_spec//lib/go/csi",
diff --git a/metropolis/node/kubernetes/kubelet.go b/metropolis/node/kubernetes/kubelet.go
index 7a0d362..e262534 100644
--- a/metropolis/node/kubernetes/kubelet.go
+++ b/metropolis/node/kubernetes/kubelet.go
@@ -18,61 +18,64 @@
 
 import (
 	"context"
+	"crypto/ed25519"
 	"encoding/json"
+	"encoding/pem"
 	"fmt"
-	"io"
 	"net"
 	"os/exec"
 
 	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	kubeletconfig "k8s.io/kubelet/config/v1beta1"
 
+	ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
 	"source.monogon.dev/metropolis/node/core/localstorage"
 	"source.monogon.dev/metropolis/node/kubernetes/pki"
 	"source.monogon.dev/metropolis/node/kubernetes/reconciler"
 	"source.monogon.dev/metropolis/pkg/fileargs"
-	opki "source.monogon.dev/metropolis/pkg/pki"
 	"source.monogon.dev/metropolis/pkg/supervisor"
 )
 
 type kubeletService struct {
-	NodeName           string
 	ClusterDNS         []net.IP
 	ClusterDomain      string
 	KubeletDirectory   *localstorage.DataKubernetesKubeletDirectory
 	EphemeralDirectory *localstorage.EphemeralDirectory
-	Output             io.Writer
-	KPKI               *pki.PKI
 
-	mount               *opki.FilesystemCertificate
-	mountKubeconfigPath string
+	kubeconfig   []byte
+	serverCACert []byte
+	serverCert   []byte
 }
 
-func (s *kubeletService) createCertificates(ctx context.Context) error {
-	server, client, err := s.KPKI.VolatileKubelet(ctx, s.NodeName)
+func (s *kubeletService) getPubkey(ctx context.Context) (ed25519.PublicKey, error) {
+	// First make sure we have a local ED25519 private key, and generate one if not.
+	if err := s.KubeletDirectory.PKI.GeneratePrivateKey(); err != nil {
+		return nil, fmt.Errorf("failed to generate private key: %w", err)
+	}
+	priv, err := s.KubeletDirectory.PKI.ReadPrivateKey()
 	if err != nil {
-		return fmt.Errorf("when generating local kubelet credentials: %w", err)
+		return nil, fmt.Errorf("could not read keypair: %w", err)
+	}
+	pubkey := priv.Public().(ed25519.PublicKey)
+	return pubkey, nil
+}
+
+func (s *kubeletService) setCertificates(kw *ipb.IssueCertificateResponse_KubernetesWorker) error {
+	key, err := s.KubeletDirectory.PKI.ReadPrivateKey()
+	if err != nil {
+		return fmt.Errorf("could not read private key from disk: %w", err)
 	}
 
-	clientKubeconfig, err := pki.Kubeconfig(ctx, s.KPKI.KV, client, pki.KubernetesAPIEndpointForController)
+	s.kubeconfig, err = pki.KubeconfigRaw(kw.IdentityCaCertificate, kw.KubeletClientCertificate, key, pki.KubernetesAPIEndpointForWorker)
 	if err != nil {
 		return fmt.Errorf("when generating kubeconfig: %w", err)
 	}
-
-	// Use a single fileargs mount for server certificate and client kubeconfig.
-	mounted, err := server.Mount(ctx, s.KPKI.KV)
-	if err != nil {
-		return fmt.Errorf("could not mount kubelet cert dir: %w", err)
-	}
-	// mounted is closed by Run() on process exit.
-
-	s.mount = mounted
-	s.mountKubeconfigPath = mounted.ArgPath("kubeconfig", clientKubeconfig)
-
+	s.serverCACert = pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: kw.IdentityCaCertificate})
+	s.serverCert = pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: kw.KubeletServerCertificate})
 	return nil
 }
 
-func (s *kubeletService) configure() *kubeletconfig.KubeletConfiguration {
+func (s *kubeletService) configure(fargs *fileargs.FileArgs) *kubeletconfig.KubeletConfiguration {
 	var clusterDNS []string
 	for _, dnsIP := range s.ClusterDNS {
 		clusterDNS = append(clusterDNS, dnsIP.String())
@@ -83,13 +86,13 @@
 			Kind:       "KubeletConfiguration",
 			APIVersion: kubeletconfig.GroupName + "/v1beta1",
 		},
-		TLSCertFile:       s.mount.CertPath,
-		TLSPrivateKeyFile: s.mount.KeyPath,
+		TLSCertFile:       fargs.ArgPath("server.crt", s.serverCert),
+		TLSPrivateKeyFile: s.KubeletDirectory.PKI.Key.FullPath(),
 		TLSMinVersion:     "VersionTLS13",
 		ClusterDNS:        clusterDNS,
 		Authentication: kubeletconfig.KubeletAuthentication{
 			X509: kubeletconfig.KubeletX509Authentication{
-				ClientCAFile: s.mount.CACertPath,
+				ClientCAFile: fargs.ArgPath("ca.crt", s.serverCACert),
 			},
 		},
 		// TODO(q3k): move reconciler.False to a generic package, fix the following references.
@@ -111,24 +114,25 @@
 }
 
 func (s *kubeletService) Run(ctx context.Context) error {
-	if err := s.createCertificates(ctx); err != nil {
-		return fmt.Errorf("when creating certificates: %w", err)
-	}
-	defer s.mount.Close()
-
-	configRaw, err := json.Marshal(s.configure())
-	if err != nil {
-		return fmt.Errorf("when marshaling kubelet configuration: %w", err)
+	if len(s.serverCert) == 0 || len(s.serverCACert) == 0 || len(s.kubeconfig) == 0 {
+		return fmt.Errorf("setCertificates was not called")
 	}
 
 	fargs, err := fileargs.New()
 	if err != nil {
 		return err
 	}
+	defer fargs.Close()
+
+	configRaw, err := json.Marshal(s.configure(fargs))
+	if err != nil {
+		return fmt.Errorf("when marshaling kubelet configuration: %w", err)
+	}
+
 	cmd := exec.CommandContext(ctx, "/kubernetes/bin/kube", "kubelet",
 		fargs.FileOpt("--config", "config.json", configRaw),
 		fmt.Sprintf("--container-runtime-endpoint=unix://%s", s.EphemeralDirectory.Containerd.ClientSocket.FullPath()),
-		fmt.Sprintf("--kubeconfig=%s", s.mountKubeconfigPath),
+		fargs.FileOpt("--kubeconfig", "kubeconfig", s.kubeconfig),
 		fmt.Sprintf("--root-dir=%s", s.KubeletDirectory.FullPath()),
 	)
 	cmd.Env = []string{"PATH=/kubernetes/bin"}
diff --git a/metropolis/node/kubernetes/pki/kubernetes.go b/metropolis/node/kubernetes/pki/kubernetes.go
index dbebf73..ead8897 100644
--- a/metropolis/node/kubernetes/pki/kubernetes.go
+++ b/metropolis/node/kubernetes/pki/kubernetes.go
@@ -392,29 +392,25 @@
 	return client, nil
 }
 
-// VolatileKubelet returns a pair of server/client ceritficates for the Kubelet
-// to use. The certificates are ephemeral, meaning they are not stored in etcd,
-// and instead are regenerated any time this function is called.
-func (k *PKI) VolatileKubelet(ctx context.Context, name string) (server *opki.Certificate, client *opki.Certificate, err error) {
-	name = fmt.Sprintf("system:node:%s", name)
+// NetServices returns a certificate to be used by nfproxy and clusternet running
+// on a worker node.
+func (k *PKI) NetServices(ctx context.Context, name string, pubkey ed25519.PublicKey) (client *opki.Certificate, err error) {
+	name = fmt.Sprintf("metropolis:netservices:%s", name)
 	err = k.EnsureAll(ctx)
 	if err != nil {
-		return nil, nil, fmt.Errorf("could not ensure certificates exist: %w", err)
+		return nil, fmt.Errorf("could not ensure certificates exist: %w", err)
 	}
 	kubeCA := k.Certificates[IdCA]
-	server = &opki.Certificate{
-		Namespace: &k.namespace,
-		Issuer:    kubeCA,
-		Template:  opki.Server([]string{name}, nil),
-		Mode:      opki.CertificateEphemeral,
-	}
+	clientName := fmt.Sprintf("netservices-%s", name)
 	client = &opki.Certificate{
+		Name:      clientName,
 		Namespace: &k.namespace,
 		Issuer:    kubeCA,
-		Template:  opki.Client(name, []string{"system:nodes"}),
-		Mode:      opki.CertificateEphemeral,
+		Template:  opki.Client(name, []string{"metropolis:netservices"}),
+		Mode:      opki.CertificateExternal,
+		PublicKey: pubkey,
 	}
-	return server, client, nil
+	return client, nil
 }
 
 // VolatileClient returns a client certificate for Kubernetes clients to use.
diff --git a/metropolis/node/kubernetes/reconciler/resources_rbac.go b/metropolis/node/kubernetes/reconciler/resources_rbac.go
index 0976ba5..4eab82e 100644
--- a/metropolis/node/kubernetes/reconciler/resources_rbac.go
+++ b/metropolis/node/kubernetes/reconciler/resources_rbac.go
@@ -29,6 +29,10 @@
 	clusterRoleBindingDefaultPSP             = builtinRBACName("default-psp-for-sa")
 	clusterRoleBindingAPIServerKubeletClient = builtinRBACName("apiserver-kubelet-client")
 	clusterRoleBindingOwnerAdmin             = builtinRBACName("owner-admin")
+	clusterRoleCSIProvisioner                = builtinRBACName("csi-provisioner")
+	clusterRoleBindingCSIProvisioners        = builtinRBACName("csi-provisioner")
+	clusterRoleNetServices                   = builtinRBACName("netservices")
+	clusterRoleBindingNetServices            = builtinRBACName("netservices")
 )
 
 type resourceClusterRoles struct {
@@ -75,6 +79,53 @@
 				},
 			},
 		},
+		clusterRoleCSIProvisioner: &rbac.ClusterRole{
+			ObjectMeta: meta.ObjectMeta{
+				Name:   clusterRoleCSIProvisioner,
+				Labels: builtinLabels(nil),
+				Annotations: map[string]string{
+					"kubernetes.io/description": "This role grants access to PersistentVolumes, PersistentVolumeClaims and StorageClasses, as used by the CSI provisioner running on nodes.",
+				},
+			},
+			Rules: []rbac.PolicyRule{
+				{
+					APIGroups: []string{""},
+					Resources: []string{"events"},
+					Verbs:     []string{"get", "list", "watch", "create", "update", "patch"},
+				},
+				{
+					APIGroups: []string{"storage.k8s.io"},
+					Resources: []string{"storageclasses"},
+					Verbs:     []string{"get", "list", "watch"},
+				},
+				{
+					APIGroups: []string{""},
+					Resources: []string{"persistentvolumes", "persistentvolumeclaims"},
+					Verbs:     []string{"*"},
+				},
+			},
+		},
+		clusterRoleNetServices: &rbac.ClusterRole{
+			ObjectMeta: meta.ObjectMeta{
+				Name:   clusterRoleNetServices,
+				Labels: builtinLabels(nil),
+				Annotations: map[string]string{
+					"kubernetes.io/description": "This role grants access to the minimum set of resources that are needed to run networking services for a node.",
+				},
+			},
+			Rules: []rbac.PolicyRule{
+				{
+					APIGroups: []string{"discovery.k8s.io"},
+					Resources: []string{"endpointslices"},
+					Verbs:     []string{"get", "list", "watch"},
+				},
+				{
+					APIGroups: []string{""},
+					Resources: []string{"services", "nodes", "namespaces"},
+					Verbs:     []string{"get", "list", "watch"},
+				},
+			},
+		},
 	}
 }
 
@@ -173,5 +224,47 @@
 				},
 			},
 		},
+		clusterRoleBindingCSIProvisioners: &rbac.ClusterRoleBinding{
+			ObjectMeta: meta.ObjectMeta{
+				Name:   clusterRoleBindingCSIProvisioners,
+				Labels: builtinLabels(nil),
+				Annotations: map[string]string{
+					"kubernetes.io/description": "This role binding grants CSI provisioners running on nodes access to the necessary resources.",
+				},
+			},
+			RoleRef: rbac.RoleRef{
+				APIGroup: rbac.GroupName,
+				Kind:     "ClusterRole",
+				Name:     clusterRoleCSIProvisioner,
+			},
+			Subjects: []rbac.Subject{
+				{
+					APIGroup: rbac.GroupName,
+					Kind:     "Group",
+					Name:     "metropolis:csi-provisioner",
+				},
+			},
+		},
+		clusterRoleBindingNetServices: &rbac.ClusterRoleBinding{
+			ObjectMeta: meta.ObjectMeta{
+				Name:   clusterRoleBindingNetServices,
+				Labels: builtinLabels(nil),
+				Annotations: map[string]string{
+					"kubernetes.io/description": "This role binding grants node network services access to necessary resources.",
+				},
+			},
+			RoleRef: rbac.RoleRef{
+				APIGroup: rbac.GroupName,
+				Kind:     "ClusterRole",
+				Name:     clusterRoleNetServices,
+			},
+			Subjects: []rbac.Subject{
+				{
+					APIGroup: rbac.GroupName,
+					Kind:     "Group",
+					Name:     "metropolis:netservices",
+				},
+			},
+		},
 	}
 }
diff --git a/metropolis/node/kubernetes/service_controller.go b/metropolis/node/kubernetes/service_controller.go
index d1de0b2..a662666 100644
--- a/metropolis/node/kubernetes/service_controller.go
+++ b/metropolis/node/kubernetes/service_controller.go
@@ -24,22 +24,16 @@
 
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
-	"k8s.io/client-go/informers"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/tools/clientcmd"
 
-	oclusternet "source.monogon.dev/metropolis/node/core/clusternet"
 	"source.monogon.dev/metropolis/node/core/identity"
 	"source.monogon.dev/metropolis/node/core/localstorage"
 	"source.monogon.dev/metropolis/node/core/network"
 	"source.monogon.dev/metropolis/node/core/network/dns"
 	"source.monogon.dev/metropolis/node/kubernetes/authproxy"
-	"source.monogon.dev/metropolis/node/kubernetes/clusternet"
-	"source.monogon.dev/metropolis/node/kubernetes/nfproxy"
 	"source.monogon.dev/metropolis/node/kubernetes/pki"
-	"source.monogon.dev/metropolis/node/kubernetes/plugins/kvmdevice"
 	"source.monogon.dev/metropolis/node/kubernetes/reconciler"
-	"source.monogon.dev/metropolis/pkg/event"
 	"source.monogon.dev/metropolis/pkg/supervisor"
 
 	apb "source.monogon.dev/metropolis/proto/api"
@@ -50,11 +44,10 @@
 	ClusterNet     net.IPNet
 	ClusterDomain  string
 
-	KPKI       *pki.PKI
-	Root       *localstorage.Root
-	Network    *network.Service
-	Node       *identity.Node
-	PodNetwork event.Value[*oclusternet.Prefixes]
+	KPKI    *pki.PKI
+	Root    *localstorage.Root
+	Network *network.Service
+	Node    *identity.Node
 }
 
 type Controller struct {
@@ -95,12 +88,9 @@
 		return fmt.Errorf("could not generate kubernetes client: %w", err)
 	}
 
-	informerFactory := informers.NewSharedInformerFactory(clientSet, 5*time.Minute)
-
 	// Sub-runnable which starts all parts of Kubernetes that depend on the
 	// machine's external IP address. If it changes, the runnable will exit.
 	// TODO(q3k): test this
-	startKubelet := make(chan struct{})
 	supervisor.Run(ctx, "networked", func(ctx context.Context) error {
 		networkWatch := s.c.Network.Watch()
 		defer networkWatch.Close()
@@ -124,21 +114,8 @@
 			EphemeralConsensusDirectory: &s.c.Root.Ephemeral.Consensus,
 		}
 
-		kubelet := kubeletService{
-			NodeName:           s.c.Node.ID(),
-			ClusterDNS:         []net.IP{address},
-			ClusterDomain:      s.c.ClusterDomain,
-			KubeletDirectory:   &s.c.Root.Data.Kubernetes.Kubelet,
-			EphemeralDirectory: &s.c.Root.Ephemeral,
-			KPKI:               s.c.KPKI,
-		}
-
 		err := supervisor.RunGroup(ctx, map[string]supervisor.Runnable{
 			"apiserver": apiserver.Run,
-			"kubelet": func(ctx context.Context) error {
-				<-startKubelet
-				return kubelet.Run(ctx)
-			},
 		})
 		if err != nil {
 			return fmt.Errorf("when starting apiserver/kubelet: %w", err)
@@ -165,7 +142,6 @@
 		err := reconciler.ReconcileAll(ctx, clientSet)
 		if err == nil {
 			supervisor.Logger(ctx).Infof("Initial resource reconciliation succeeded.")
-			close(startKubelet)
 			break
 		}
 		if time.Now().After(startLogging) {
@@ -175,33 +151,6 @@
 		time.Sleep(100 * time.Millisecond)
 	}
 
-	csiPlugin := csiPluginServer{
-		KubeletDirectory: &s.c.Root.Data.Kubernetes.Kubelet,
-		VolumesDirectory: &s.c.Root.Data.Volumes,
-	}
-
-	csiProvisioner := csiProvisionerServer{
-		NodeName:         s.c.Node.ID(),
-		Kubernetes:       clientSet,
-		InformerFactory:  informerFactory,
-		VolumesDirectory: &s.c.Root.Data.Volumes,
-	}
-
-	clusternet := clusternet.Service{
-		NodeName:   s.c.Node.ID(),
-		Kubernetes: clientSet,
-		Prefixes:   s.c.PodNetwork,
-	}
-
-	nfproxy := nfproxy.Service{
-		ClusterCIDR: s.c.ClusterNet,
-		ClientSet:   clientSet,
-	}
-
-	kvmDevicePlugin := kvmdevice.Plugin{
-		KubeletDirectory: &s.c.Root.Data.Kubernetes.Kubelet,
-	}
-
 	authProxy := authproxy.Service{
 		KPKI: s.c.KPKI,
 		Node: s.c.Node,
@@ -214,11 +163,6 @@
 		{"controller-manager", runControllerManager(*controllerManagerConfig)},
 		{"scheduler", runScheduler(*schedulerConfig)},
 		{"reconciler", reconciler.Maintain(clientSet)},
-		{"csi-plugin", csiPlugin.Run},
-		{"csi-provisioner", csiProvisioner.Run},
-		{"clusternet", clusternet.Run},
-		{"nfproxy", nfproxy.Run},
-		{"kvmdeviceplugin", kvmDevicePlugin.Run},
 		{"authproxy", authProxy.Run},
 	} {
 		err := supervisor.Run(ctx, sub.name, sub.runnable)
diff --git a/metropolis/node/kubernetes/service_worker.go b/metropolis/node/kubernetes/service_worker.go
index 2e6e190..d9f333e 100644
--- a/metropolis/node/kubernetes/service_worker.go
+++ b/metropolis/node/kubernetes/service_worker.go
@@ -2,14 +2,25 @@
 
 import (
 	"context"
+	"crypto/ed25519"
 	"fmt"
 	"net"
+	"time"
+
+	"k8s.io/client-go/informers"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/tools/clientcmd"
 
 	"source.monogon.dev/go/net/tinylb"
 	"source.monogon.dev/metropolis/node"
 	oclusternet "source.monogon.dev/metropolis/node/core/clusternet"
 	"source.monogon.dev/metropolis/node/core/localstorage"
 	"source.monogon.dev/metropolis/node/core/network"
+	"source.monogon.dev/metropolis/node/core/network/dns"
+	"source.monogon.dev/metropolis/node/kubernetes/clusternet"
+	"source.monogon.dev/metropolis/node/kubernetes/nfproxy"
+	kpki "source.monogon.dev/metropolis/node/kubernetes/pki"
+	"source.monogon.dev/metropolis/node/kubernetes/plugins/kvmdevice"
 	"source.monogon.dev/metropolis/pkg/event"
 	"source.monogon.dev/metropolis/pkg/event/memory"
 	"source.monogon.dev/metropolis/pkg/supervisor"
@@ -71,7 +82,201 @@
 		return err
 	}
 
+	kubelet := kubeletService{
+		ClusterDomain:      s.c.ClusterDomain,
+		KubeletDirectory:   &s.c.Root.Data.Kubernetes.Kubelet,
+		EphemeralDirectory: &s.c.Root.Ephemeral,
+	}
+
+	// Gather all required material to send to the curator for certificate
+	// issuance...
+	kwr := &ipb.IssueCertificateRequest_KubernetesWorker{}
+
+	kubeletPK, err := kubelet.getPubkey(ctx)
+	if err != nil {
+		return fmt.Errorf("when getting kubelet pubkey: %w", err)
+	}
+	kwr.KubeletPubkey = kubeletPK
+
+	clients := map[string]*struct {
+		dir *localstorage.PKIDirectory
+
+		sk ed25519.PrivateKey
+		pk ed25519.PublicKey
+
+		client     *kubernetes.Clientset
+		informers  informers.SharedInformerFactory
+		kubeconfig []byte
+
+		certFrom func(kw *ipb.IssueCertificateResponse_KubernetesWorker) []byte
+	}{
+		"csi": {
+			dir: &s.c.Root.Data.Kubernetes.CSIProvisioner.PKI,
+			certFrom: func(kw *ipb.IssueCertificateResponse_KubernetesWorker) []byte {
+				return kw.CsiProvisionerCertificate
+			},
+		},
+		"netserv": {
+			dir: &s.c.Root.Data.Kubernetes.Netservices.PKI,
+			certFrom: func(kw *ipb.IssueCertificateResponse_KubernetesWorker) []byte {
+				return kw.NetservicesCertificate
+			},
+		},
+	}
+
+	for name, c := range clients {
+		if err := c.dir.GeneratePrivateKey(); err != nil {
+			return fmt.Errorf("generating %s key: %w", name, err)
+		}
+		k, err := c.dir.ReadPrivateKey()
+		if err != nil {
+			return fmt.Errorf("reading %s key: %w", name, err)
+		}
+		c.sk = k
+		c.pk = c.sk.Public().(ed25519.PublicKey)
+	}
+	kwr.CsiProvisionerPubkey = clients["csi"].pk
+	kwr.NetservicesPubkey = clients["netserv"].pk
+
+	// ...issue certificates...
+	res, err := s.c.CuratorClient.IssueCertificate(ctx, &ipb.IssueCertificateRequest{
+		Kind: &ipb.IssueCertificateRequest_KubernetesWorker_{
+			KubernetesWorker: kwr,
+		},
+	})
+	if err != nil {
+		return fmt.Errorf("failed to get certificates from curator: %w", err)
+	}
+	kw := res.Kind.(*ipb.IssueCertificateResponse_KubernetesWorker_).KubernetesWorker
+
+	// ...write them...
+	if err := kubelet.setCertificates(kw); err != nil {
+		return fmt.Errorf("failed to write kubelet certs: %w", err)
+	}
+	for name, c := range clients {
+		if c.dir == nil {
+			continue
+		}
+		if err := c.dir.WriteCertificates(kw.IdentityCaCertificate, c.certFrom(kw)); err != nil {
+			return fmt.Errorf("failed to write %s certs: %w", name, err)
+		}
+	}
+
+	// ... and set up connections.
+	for name, c := range clients {
+		kubeconf, err := kpki.KubeconfigRaw(kw.IdentityCaCertificate, c.certFrom(kw), c.sk, kpki.KubernetesAPIEndpointForWorker)
+		if err != nil {
+			return fmt.Errorf("failed to make %s kubeconfig: %w", name, err)
+		}
+		c.kubeconfig = kubeconf
+		cs, informers, err := connectByKubeconfig(kubeconf)
+		if err != nil {
+			return fmt.Errorf("failed to connect with %s: %w", name, err)
+		}
+		c.client = cs
+		c.informers = informers
+	}
+
+	// Sub-runnable which starts all parts of Kubernetes that depend on the
+	// machine's external IP address. If it changes, the runnable will exit.
+	// TODO(q3k): test this
+	supervisor.Run(ctx, "networked", func(ctx context.Context) error {
+		networkWatch := s.c.Network.Watch()
+		defer networkWatch.Close()
+
+		var status *network.Status
+
+		supervisor.Logger(ctx).Info("Waiting for node networking...")
+		for status == nil || status.ExternalAddress == nil {
+			status, err = networkWatch.Get(ctx)
+			if err != nil {
+				return fmt.Errorf("failed to get network status: %w", err)
+			}
+		}
+		address := status.ExternalAddress
+		supervisor.Logger(ctx).Info("Node has active networking, starting kubelet")
+		kubelet.ClusterDNS = []net.IP{address}
+		err := supervisor.RunGroup(ctx, map[string]supervisor.Runnable{
+			"kubelet": kubelet.Run,
+		})
+		if err != nil {
+			return fmt.Errorf("when starting kubelet: %w", err)
+		}
+
+		supervisor.Signal(ctx, supervisor.SignalHealthy)
+
+		for status.ExternalAddress.Equal(address) {
+			status, err = networkWatch.Get(ctx)
+			if err != nil {
+				return fmt.Errorf("when watching for network changes: %w", err)
+			}
+		}
+		return fmt.Errorf("network configuration changed (%s -> %s)", address.String(), status.ExternalAddress.String())
+	})
+
+	csiPlugin := csiPluginServer{
+		KubeletDirectory: &s.c.Root.Data.Kubernetes.Kubelet,
+		VolumesDirectory: &s.c.Root.Data.Volumes,
+	}
+
+	csiProvisioner := csiProvisionerServer{
+		NodeName:         s.c.NodeID,
+		Kubernetes:       clients["csi"].client,
+		InformerFactory:  clients["csi"].informers,
+		VolumesDirectory: &s.c.Root.Data.Volumes,
+	}
+
+	clusternet := clusternet.Service{
+		NodeName:   s.c.NodeID,
+		Kubernetes: clients["netserv"].client,
+		Prefixes:   s.c.PodNetwork,
+	}
+
+	nfproxy := nfproxy.Service{
+		ClusterCIDR: s.c.ClusterNet,
+		ClientSet:   clients["netserv"].client,
+	}
+
+	kvmDevicePlugin := kvmdevice.Plugin{
+		KubeletDirectory: &s.c.Root.Data.Kubernetes.Kubelet,
+	}
+
+	for _, sub := range []struct {
+		name     string
+		runnable supervisor.Runnable
+	}{
+		{"csi-plugin", csiPlugin.Run},
+		{"csi-provisioner", csiProvisioner.Run},
+		{"clusternet", clusternet.Run},
+		{"nfproxy", nfproxy.Run},
+		{"kvmdeviceplugin", kvmDevicePlugin.Run},
+	} {
+		err := supervisor.Run(ctx, sub.name, sub.runnable)
+		if err != nil {
+			return fmt.Errorf("could not run sub-service %q: %w", sub.name, err)
+		}
+	}
+
+	supervisor.Logger(ctx).Info("Registering K8s CoreDNS")
+	clusterDNSDirective := dns.NewKubernetesDirective(s.c.ClusterDomain, clients["netserv"].kubeconfig)
+	s.c.Network.ConfigureDNS(clusterDNSDirective)
+
 	supervisor.Signal(ctx, supervisor.SignalHealthy)
 	<-ctx.Done()
+	s.c.Network.ConfigureDNS(dns.CancelDirective(clusterDNSDirective))
 	return nil
 }
+
+func connectByKubeconfig(kubeconfig []byte) (*kubernetes.Clientset, informers.SharedInformerFactory, error) {
+	rawClientConfig, err := clientcmd.NewClientConfigFromBytes(kubeconfig)
+	if err != nil {
+		return nil, nil, fmt.Errorf("could not generate kubernetes client config: %w", err)
+	}
+	clientConfig, err := rawClientConfig.ClientConfig()
+	if err != nil {
+		return nil, nil, fmt.Errorf("could not generate kubernetes client config: %w", err)
+	}
+	clientSet, err := kubernetes.NewForConfig(clientConfig)
+	if err != nil {
+		return nil, nil, fmt.Errorf("could not generate kubernetes client: %w", err)
+	}
+	informerFactory := informers.NewSharedInformerFactory(clientSet, 5*time.Minute)
+	return clientSet, informerFactory, nil
+}
diff --git a/metropolis/test/e2e/main_test.go b/metropolis/test/e2e/main_test.go
index 4e2ceb7..4a54d2f 100644
--- a/metropolis/test/e2e/main_test.go
+++ b/metropolis/test/e2e/main_test.go
@@ -20,6 +20,7 @@
 	"context"
 	"errors"
 	"fmt"
+	"io"
 	"net"
 	"net/http"
 	_ "net/http"
@@ -160,13 +161,57 @@
 			if err != nil {
 				t.Fatal(err)
 			}
-			util.TestEventual(t, "Nodes are registered and ready", ctx, largeTestTimeout, func(ctx context.Context) error {
+			util.TestEventual(t, "Add KubernetesWorker roles", ctx, smallTestTimeout, func(ctx context.Context) error {
+				// Find all nodes that are non-controllers.
+				var ids []string
+				srvN, err := mgmt.GetNodes(ctx, &apb.GetNodesRequest{})
+				if err != nil {
+					return fmt.Errorf("GetNodes: %w", err)
+				}
+				defer srvN.CloseSend()
+				for {
+					node, err := srvN.Recv()
+					if err == io.EOF {
+						break
+					}
+					if err != nil {
+						return fmt.Errorf("GetNodes.Recv: %w", err)
+					}
+					if node.Roles.KubernetesController != nil {
+						continue
+					}
+					if node.Roles.ConsensusMember != nil {
+						continue
+					}
+					ids = append(ids, identity.NodeID(node.Pubkey))
+				}
+
+				if len(ids) < 1 {
+					return fmt.Errorf("no appropriate nodes found")
+				}
+
+				// Mark all these nodes as KubernetesWorker.
+				for _, id := range ids {
+					tr := true
+					_, err := mgmt.UpdateNodeRoles(ctx, &apb.UpdateNodeRolesRequest{
+						Node: &apb.UpdateNodeRolesRequest_Id{
+							Id: id,
+						},
+						KubernetesWorker: &tr,
+					})
+					if err != nil {
+						return fmt.Errorf("could not make node %q into kubernetes worker: %w", id, err)
+					}
+				}
+				return nil
+			})
+			util.TestEventual(t, "Node is registered and ready", ctx, largeTestTimeout, func(ctx context.Context) error {
 				nodes, err := clientSet.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
 				if err != nil {
 					return err
 				}
-				if len(nodes.Items) < 2 {
-					return errors.New("nodes not yet registered")
+				if len(nodes.Items) < 1 {
+					return errors.New("node not yet registered")
 				}
 				node := nodes.Items[0]
 				for _, cond := range node.Status.Conditions {