metropolis/node/kubernetes: move worker services to KubernetesWorker nodes
This finalizes the Big Split. After this change, nodes will only run a
kubelet (and related worker services) if they have the KubernetesWorker
role attached.
The first node in a new cluster now starts out with the KubernetesController
and ConsensusMember roles. All subsequently joined nodes start with no roles
attached.
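For illustration only: a minimal sketch, assuming a hypothetical NodeRoles
type and maybeStartWorker helper (neither is part of this change), of how a
node-side launcher can gate the kubelet and related worker services on the
KubernetesWorker role:

    package main

    import (
        "context"
        "fmt"
    )

    // NodeRoles is an assumed stand-in for the role set the curator
    // reports for a node; the real representation lives elsewhere.
    type NodeRoles struct {
        ConsensusMember      bool
        KubernetesController bool
        KubernetesWorker     bool
    }

    // maybeStartWorker runs the worker launcher only if the
    // KubernetesWorker role is attached; nodes without the role never
    // start a kubelet or related services.
    func maybeStartWorker(ctx context.Context, roles NodeRoles, start func(context.Context) error) error {
        if !roles.KubernetesWorker {
            return nil
        }
        return start(ctx)
    }

    func main() {
        roles := NodeRoles{KubernetesWorker: true}
        if err := maybeStartWorker(context.Background(), roles, func(context.Context) error {
            fmt.Println("starting kubelet and worker services")
            return nil
        }); err != nil {
            fmt.Println("failed to start worker services:", err)
        }
    }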
Change-Id: I25a059318450b7d2dd3c19f3653fc15367867693
Reviewed-on: https://review.monogon.dev/c/monogon/+/1380
Tested-by: Jenkins CI
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
diff --git a/metropolis/node/kubernetes/BUILD.bazel b/metropolis/node/kubernetes/BUILD.bazel
index 1279cff..cbad367 100644
--- a/metropolis/node/kubernetes/BUILD.bazel
+++ b/metropolis/node/kubernetes/BUILD.bazel
@@ -36,7 +36,6 @@
"//metropolis/pkg/fsquota",
"//metropolis/pkg/logtree",
"//metropolis/pkg/loop",
- "//metropolis/pkg/pki",
"//metropolis/pkg/supervisor",
"//metropolis/proto/api",
"@com_github_container_storage_interface_spec//lib/go/csi",
diff --git a/metropolis/node/kubernetes/kubelet.go b/metropolis/node/kubernetes/kubelet.go
index 7a0d362..e262534 100644
--- a/metropolis/node/kubernetes/kubelet.go
+++ b/metropolis/node/kubernetes/kubelet.go
@@ -18,61 +18,64 @@
import (
"context"
+ "crypto/ed25519"
"encoding/json"
+ "encoding/pem"
"fmt"
- "io"
"net"
"os/exec"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
kubeletconfig "k8s.io/kubelet/config/v1beta1"
+ ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
"source.monogon.dev/metropolis/node/core/localstorage"
"source.monogon.dev/metropolis/node/kubernetes/pki"
"source.monogon.dev/metropolis/node/kubernetes/reconciler"
"source.monogon.dev/metropolis/pkg/fileargs"
- opki "source.monogon.dev/metropolis/pkg/pki"
"source.monogon.dev/metropolis/pkg/supervisor"
)
type kubeletService struct {
- NodeName string
ClusterDNS []net.IP
ClusterDomain string
KubeletDirectory *localstorage.DataKubernetesKubeletDirectory
EphemeralDirectory *localstorage.EphemeralDirectory
- Output io.Writer
- KPKI *pki.PKI
- mount *opki.FilesystemCertificate
- mountKubeconfigPath string
+ kubeconfig []byte
+ serverCACert []byte
+ serverCert []byte
}
-func (s *kubeletService) createCertificates(ctx context.Context) error {
- server, client, err := s.KPKI.VolatileKubelet(ctx, s.NodeName)
+func (s *kubeletService) getPubkey(ctx context.Context) (ed25519.PublicKey, error) {
+ // First make sure we have a local Ed25519 private key, and generate one if not.
+ if err := s.KubeletDirectory.PKI.GeneratePrivateKey(); err != nil {
+ return nil, fmt.Errorf("failed to generate private key: %w", err)
+ }
+ priv, err := s.KubeletDirectory.PKI.ReadPrivateKey()
if err != nil {
- return fmt.Errorf("when generating local kubelet credentials: %w", err)
+ return nil, fmt.Errorf("could not read keypair: %w", err)
+ }
+ pubkey := priv.Public().(ed25519.PublicKey)
+ return pubkey, nil
+}
+
+func (s *kubeletService) setCertificates(kw *ipb.IssueCertificateResponse_KubernetesWorker) error {
+ key, err := s.KubeletDirectory.PKI.ReadPrivateKey()
+ if err != nil {
+ return fmt.Errorf("could not read private key from disk: %w", err)
}
- clientKubeconfig, err := pki.Kubeconfig(ctx, s.KPKI.KV, client, pki.KubernetesAPIEndpointForController)
+ s.kubeconfig, err = pki.KubeconfigRaw(kw.IdentityCaCertificate, kw.KubeletClientCertificate, key, pki.KubernetesAPIEndpointForWorker)
if err != nil {
return fmt.Errorf("when generating kubeconfig: %w", err)
}
-
- // Use a single fileargs mount for server certificate and client kubeconfig.
- mounted, err := server.Mount(ctx, s.KPKI.KV)
- if err != nil {
- return fmt.Errorf("could not mount kubelet cert dir: %w", err)
- }
- // mounted is closed by Run() on process exit.
-
- s.mount = mounted
- s.mountKubeconfigPath = mounted.ArgPath("kubeconfig", clientKubeconfig)
-
+ s.serverCACert = pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: kw.IdentityCaCertificate})
+ s.serverCert = pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: kw.KubeletServerCertificate})
return nil
}
-func (s *kubeletService) configure() *kubeletconfig.KubeletConfiguration {
+func (s *kubeletService) configure(fargs *fileargs.FileArgs) *kubeletconfig.KubeletConfiguration {
var clusterDNS []string
for _, dnsIP := range s.ClusterDNS {
clusterDNS = append(clusterDNS, dnsIP.String())
@@ -83,13 +86,13 @@
Kind: "KubeletConfiguration",
APIVersion: kubeletconfig.GroupName + "/v1beta1",
},
- TLSCertFile: s.mount.CertPath,
- TLSPrivateKeyFile: s.mount.KeyPath,
+ TLSCertFile: fargs.ArgPath("server.crt", s.serverCert),
+ TLSPrivateKeyFile: s.KubeletDirectory.PKI.Key.FullPath(),
TLSMinVersion: "VersionTLS13",
ClusterDNS: clusterDNS,
Authentication: kubeletconfig.KubeletAuthentication{
X509: kubeletconfig.KubeletX509Authentication{
- ClientCAFile: s.mount.CACertPath,
+ ClientCAFile: fargs.ArgPath("ca.crt", s.serverCACert),
},
},
// TODO(q3k): move reconciler.False to a generic package, fix the following references.
@@ -111,24 +114,25 @@
}
func (s *kubeletService) Run(ctx context.Context) error {
- if err := s.createCertificates(ctx); err != nil {
- return fmt.Errorf("when creating certificates: %w", err)
- }
- defer s.mount.Close()
-
- configRaw, err := json.Marshal(s.configure())
- if err != nil {
- return fmt.Errorf("when marshaling kubelet configuration: %w", err)
+ if len(s.serverCert) == 0 || len(s.serverCACert) == 0 || len(s.kubeconfig) == 0 {
+ return fmt.Errorf("setCertificates was not called")
}
fargs, err := fileargs.New()
if err != nil {
return err
}
+ defer fargs.Close()
+
+ configRaw, err := json.Marshal(s.configure(fargs))
+ if err != nil {
+ return fmt.Errorf("when marshaling kubelet configuration: %w", err)
+ }
+
cmd := exec.CommandContext(ctx, "/kubernetes/bin/kube", "kubelet",
fargs.FileOpt("--config", "config.json", configRaw),
fmt.Sprintf("--container-runtime-endpoint=unix://%s", s.EphemeralDirectory.Containerd.ClientSocket.FullPath()),
- fmt.Sprintf("--kubeconfig=%s", s.mountKubeconfigPath),
+ fargs.FileOpt("--kubeconfig", "kubeconfig", s.kubeconfig),
fmt.Sprintf("--root-dir=%s", s.KubeletDirectory.FullPath()),
)
cmd.Env = []string{"PATH=/kubernetes/bin"}
diff --git a/metropolis/node/kubernetes/pki/kubernetes.go b/metropolis/node/kubernetes/pki/kubernetes.go
index dbebf73..ead8897 100644
--- a/metropolis/node/kubernetes/pki/kubernetes.go
+++ b/metropolis/node/kubernetes/pki/kubernetes.go
@@ -392,29 +392,25 @@
return client, nil
}
-// VolatileKubelet returns a pair of server/client ceritficates for the Kubelet
-// to use. The certificates are ephemeral, meaning they are not stored in etcd,
-// and instead are regenerated any time this function is called.
-func (k *PKI) VolatileKubelet(ctx context.Context, name string) (server *opki.Certificate, client *opki.Certificate, err error) {
- name = fmt.Sprintf("system:node:%s", name)
+// NetServices returns a certificate to be used by nfproxy and clusternet running
+// on a worker node.
+func (k *PKI) NetServices(ctx context.Context, name string, pubkey ed25519.PublicKey) (client *opki.Certificate, err error) {
+ name = fmt.Sprintf("metropolis:netservices:%s", name)
err = k.EnsureAll(ctx)
if err != nil {
- return nil, nil, fmt.Errorf("could not ensure certificates exist: %w", err)
+ return nil, fmt.Errorf("could not ensure certificates exist: %w", err)
}
kubeCA := k.Certificates[IdCA]
- server = &opki.Certificate{
- Namespace: &k.namespace,
- Issuer: kubeCA,
- Template: opki.Server([]string{name}, nil),
- Mode: opki.CertificateEphemeral,
- }
+ clientName := fmt.Sprintf("netservices-%s", name)
client = &opki.Certificate{
+ Name: clientName,
Namespace: &k.namespace,
Issuer: kubeCA,
- Template: opki.Client(name, []string{"system:nodes"}),
- Mode: opki.CertificateEphemeral,
+ Template: opki.Client(name, []string{"metropolis:netservices"}),
+ Mode: opki.CertificateExternal,
+ PublicKey: pubkey,
}
- return server, client, nil
+ return client, nil
}
// VolatileClient returns a client certificate for Kubernetes clients to use.
diff --git a/metropolis/node/kubernetes/reconciler/resources_rbac.go b/metropolis/node/kubernetes/reconciler/resources_rbac.go
index 0976ba5..4eab82e 100644
--- a/metropolis/node/kubernetes/reconciler/resources_rbac.go
+++ b/metropolis/node/kubernetes/reconciler/resources_rbac.go
@@ -29,6 +29,10 @@
clusterRoleBindingDefaultPSP = builtinRBACName("default-psp-for-sa")
clusterRoleBindingAPIServerKubeletClient = builtinRBACName("apiserver-kubelet-client")
clusterRoleBindingOwnerAdmin = builtinRBACName("owner-admin")
+ clusterRoleCSIProvisioner = builtinRBACName("csi-provisioner")
+ clusterRoleBindingCSIProvisioners = builtinRBACName("csi-provisioner")
+ clusterRoleNetServices = builtinRBACName("netservices")
+ clusterRoleBindingNetServices = builtinRBACName("netservices")
)
type resourceClusterRoles struct {
@@ -75,6 +79,53 @@
},
},
},
+ clusterRoleCSIProvisioner: &rbac.ClusterRole{
+ ObjectMeta: meta.ObjectMeta{
+ Name: clusterRoleCSIProvisioner,
+ Labels: builtinLabels(nil),
+ Annotations: map[string]string{
+ "kubernetes.io/description": "This role grants access to PersistentVolumes, PersistentVolumeClaims and StorageClassses, as used the the CSI provisioner running on nodes.",
+ },
+ },
+ Rules: []rbac.PolicyRule{
+ {
+ APIGroups: []string{""},
+ Resources: []string{"events"},
+ Verbs: []string{"get", "list", "watch", "create", "update", "patch"},
+ },
+ {
+ APIGroups: []string{"storage.k8s.io"},
+ Resources: []string{"storageclasses"},
+ Verbs: []string{"get", "list", "watch"},
+ },
+ {
+ APIGroups: []string{""},
+ Resources: []string{"persistentvolumes", "persistentvolumeclaims"},
+ Verbs: []string{"*"},
+ },
+ },
+ },
+ clusterRoleNetServices: &rbac.ClusterRole{
+ ObjectMeta: meta.ObjectMeta{
+ Name: clusterRoleNetServices,
+ Labels: builtinLabels(nil),
+ Annotations: map[string]string{
+ "kubernetes.io/description": "This role grants access to the minimum set of resources that are needed to run networking services for a node.",
+ },
+ },
+ Rules: []rbac.PolicyRule{
+ {
+ APIGroups: []string{"discovery.k8s.io"},
+ Resources: []string{"endpointslices"},
+ Verbs: []string{"get", "list", "watch"},
+ },
+ {
+ APIGroups: []string{""},
+ Resources: []string{"services", "nodes", "namespaces"},
+ Verbs: []string{"get", "list", "watch"},
+ },
+ },
+ },
}
}
@@ -173,5 +224,47 @@
},
},
},
+ clusterRoleBindingCSIProvisioners: &rbac.ClusterRoleBinding{
+ ObjectMeta: meta.ObjectMeta{
+ Name: clusterRoleBindingCSIProvisioners,
+ Labels: builtinLabels(nil),
+ Annotations: map[string]string{
+ "kubernetes.io/description": "This role binding grants CSI provisioners running on nodes access to the necessary resources.",
+ },
+ },
+ RoleRef: rbac.RoleRef{
+ APIGroup: rbac.GroupName,
+ Kind: "ClusterRole",
+ Name: clusterRoleCSIProvisioner,
+ },
+ Subjects: []rbac.Subject{
+ {
+ APIGroup: rbac.GroupName,
+ Kind: "Group",
+ Name: "metropolis:csi-provisioner",
+ },
+ },
+ },
+ clusterRoleBindingNetServices: &rbac.ClusterRoleBinding{
+ ObjectMeta: meta.ObjectMeta{
+ Name: clusterRoleBindingNetServices,
+ Labels: builtinLabels(nil),
+ Annotations: map[string]string{
+ "kubernetes.io/description": "This role binding grants node network services access to necessary resources.",
+ },
+ },
+ RoleRef: rbac.RoleRef{
+ APIGroup: rbac.GroupName,
+ Kind: "ClusterRole",
+ Name: clusterRoleNetServices,
+ },
+ Subjects: []rbac.Subject{
+ {
+ APIGroup: rbac.GroupName,
+ Kind: "Group",
+ Name: "metropolis:netservices",
+ },
+ },
+ },
}
}
diff --git a/metropolis/node/kubernetes/service_controller.go b/metropolis/node/kubernetes/service_controller.go
index d1de0b2..a662666 100644
--- a/metropolis/node/kubernetes/service_controller.go
+++ b/metropolis/node/kubernetes/service_controller.go
@@ -24,22 +24,16 @@
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
- "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
- oclusternet "source.monogon.dev/metropolis/node/core/clusternet"
"source.monogon.dev/metropolis/node/core/identity"
"source.monogon.dev/metropolis/node/core/localstorage"
"source.monogon.dev/metropolis/node/core/network"
"source.monogon.dev/metropolis/node/core/network/dns"
"source.monogon.dev/metropolis/node/kubernetes/authproxy"
- "source.monogon.dev/metropolis/node/kubernetes/clusternet"
- "source.monogon.dev/metropolis/node/kubernetes/nfproxy"
"source.monogon.dev/metropolis/node/kubernetes/pki"
- "source.monogon.dev/metropolis/node/kubernetes/plugins/kvmdevice"
"source.monogon.dev/metropolis/node/kubernetes/reconciler"
- "source.monogon.dev/metropolis/pkg/event"
"source.monogon.dev/metropolis/pkg/supervisor"
apb "source.monogon.dev/metropolis/proto/api"
@@ -50,11 +44,10 @@
ClusterNet net.IPNet
ClusterDomain string
- KPKI *pki.PKI
- Root *localstorage.Root
- Network *network.Service
- Node *identity.Node
- PodNetwork event.Value[*oclusternet.Prefixes]
+ KPKI *pki.PKI
+ Root *localstorage.Root
+ Network *network.Service
+ Node *identity.Node
}
type Controller struct {
@@ -95,12 +88,9 @@
return fmt.Errorf("could not generate kubernetes client: %w", err)
}
- informerFactory := informers.NewSharedInformerFactory(clientSet, 5*time.Minute)
-
// Sub-runnable which starts all parts of Kubernetes that depend on the
// machine's external IP address. If it changes, the runnable will exit.
// TODO(q3k): test this
- startKubelet := make(chan struct{})
supervisor.Run(ctx, "networked", func(ctx context.Context) error {
networkWatch := s.c.Network.Watch()
defer networkWatch.Close()
@@ -124,21 +114,8 @@
EphemeralConsensusDirectory: &s.c.Root.Ephemeral.Consensus,
}
- kubelet := kubeletService{
- NodeName: s.c.Node.ID(),
- ClusterDNS: []net.IP{address},
- ClusterDomain: s.c.ClusterDomain,
- KubeletDirectory: &s.c.Root.Data.Kubernetes.Kubelet,
- EphemeralDirectory: &s.c.Root.Ephemeral,
- KPKI: s.c.KPKI,
- }
-
err := supervisor.RunGroup(ctx, map[string]supervisor.Runnable{
"apiserver": apiserver.Run,
- "kubelet": func(ctx context.Context) error {
- <-startKubelet
- return kubelet.Run(ctx)
- },
})
if err != nil {
return fmt.Errorf("when starting apiserver/kubelet: %w", err)
@@ -165,7 +142,6 @@
err := reconciler.ReconcileAll(ctx, clientSet)
if err == nil {
supervisor.Logger(ctx).Infof("Initial resource reconciliation succeeded.")
- close(startKubelet)
break
}
if time.Now().After(startLogging) {
@@ -175,33 +151,6 @@
time.Sleep(100 * time.Millisecond)
}
- csiPlugin := csiPluginServer{
- KubeletDirectory: &s.c.Root.Data.Kubernetes.Kubelet,
- VolumesDirectory: &s.c.Root.Data.Volumes,
- }
-
- csiProvisioner := csiProvisionerServer{
- NodeName: s.c.Node.ID(),
- Kubernetes: clientSet,
- InformerFactory: informerFactory,
- VolumesDirectory: &s.c.Root.Data.Volumes,
- }
-
- clusternet := clusternet.Service{
- NodeName: s.c.Node.ID(),
- Kubernetes: clientSet,
- Prefixes: s.c.PodNetwork,
- }
-
- nfproxy := nfproxy.Service{
- ClusterCIDR: s.c.ClusterNet,
- ClientSet: clientSet,
- }
-
- kvmDevicePlugin := kvmdevice.Plugin{
- KubeletDirectory: &s.c.Root.Data.Kubernetes.Kubelet,
- }
-
authProxy := authproxy.Service{
KPKI: s.c.KPKI,
Node: s.c.Node,
@@ -214,11 +163,6 @@
{"controller-manager", runControllerManager(*controllerManagerConfig)},
{"scheduler", runScheduler(*schedulerConfig)},
{"reconciler", reconciler.Maintain(clientSet)},
- {"csi-plugin", csiPlugin.Run},
- {"csi-provisioner", csiProvisioner.Run},
- {"clusternet", clusternet.Run},
- {"nfproxy", nfproxy.Run},
- {"kvmdeviceplugin", kvmDevicePlugin.Run},
{"authproxy", authProxy.Run},
} {
err := supervisor.Run(ctx, sub.name, sub.runnable)
diff --git a/metropolis/node/kubernetes/service_worker.go b/metropolis/node/kubernetes/service_worker.go
index 2e6e190..d9f333e 100644
--- a/metropolis/node/kubernetes/service_worker.go
+++ b/metropolis/node/kubernetes/service_worker.go
@@ -2,14 +2,25 @@
import (
"context"
+ "crypto/ed25519"
"fmt"
"net"
+ "time"
+
+ "k8s.io/client-go/informers"
+ "k8s.io/client-go/kubernetes"
+ "k8s.io/client-go/tools/clientcmd"
"source.monogon.dev/go/net/tinylb"
"source.monogon.dev/metropolis/node"
oclusternet "source.monogon.dev/metropolis/node/core/clusternet"
"source.monogon.dev/metropolis/node/core/localstorage"
"source.monogon.dev/metropolis/node/core/network"
+ "source.monogon.dev/metropolis/node/core/network/dns"
+ "source.monogon.dev/metropolis/node/kubernetes/clusternet"
+ "source.monogon.dev/metropolis/node/kubernetes/nfproxy"
+ kpki "source.monogon.dev/metropolis/node/kubernetes/pki"
+ "source.monogon.dev/metropolis/node/kubernetes/plugins/kvmdevice"
"source.monogon.dev/metropolis/pkg/event"
"source.monogon.dev/metropolis/pkg/event/memory"
"source.monogon.dev/metropolis/pkg/supervisor"
@@ -71,7 +82,201 @@
return err
}
+ kubelet := kubeletService{
+ ClusterDomain: s.c.ClusterDomain,
+ KubeletDirectory: &s.c.Root.Data.Kubernetes.Kubelet,
+ EphemeralDirectory: &s.c.Root.Ephemeral,
+ }
+
+ // Gather all the material required to request certificate issuance from the
+ // curator...
+ kwr := &ipb.IssueCertificateRequest_KubernetesWorker{}
+
+ kubeletPK, err := kubelet.getPubkey(ctx)
+ if err != nil {
+ return fmt.Errorf("when getting kubelet pubkey: %w", err)
+ }
+ kwr.KubeletPubkey = kubeletPK
+
+ clients := map[string]*struct {
+ dir *localstorage.PKIDirectory
+
+ sk ed25519.PrivateKey
+ pk ed25519.PublicKey
+
+ client *kubernetes.Clientset
+ informers informers.SharedInformerFactory
+ kubeconfig []byte
+
+ certFrom func(kw *ipb.IssueCertificateResponse_KubernetesWorker) []byte
+ }{
+ "csi": {
+ dir: &s.c.Root.Data.Kubernetes.CSIProvisioner.PKI,
+ certFrom: func(kw *ipb.IssueCertificateResponse_KubernetesWorker) []byte {
+ return kw.CsiProvisionerCertificate
+ },
+ },
+ "netserv": {
+ dir: &s.c.Root.Data.Kubernetes.Netservices.PKI,
+ certFrom: func(kw *ipb.IssueCertificateResponse_KubernetesWorker) []byte {
+ return kw.NetservicesCertificate
+ },
+ },
+ }
+
+ for name, c := range clients {
+ if err := c.dir.GeneratePrivateKey(); err != nil {
+ return fmt.Errorf("generating %s key: %w", name, err)
+ }
+ k, err := c.dir.ReadPrivateKey()
+ if err != nil {
+ return fmt.Errorf("reading %s key: %w", name, err)
+ }
+ c.sk = k
+ c.pk = c.sk.Public().(ed25519.PublicKey)
+ }
+ kwr.CsiProvisionerPubkey = clients["csi"].pk
+ kwr.NetservicesPubkey = clients["netserv"].pk
+
+ // ...issue certificates...
+ res, err := s.c.CuratorClient.IssueCertificate(ctx, &ipb.IssueCertificateRequest{
+ Kind: &ipb.IssueCertificateRequest_KubernetesWorker_{
+ KubernetesWorker: kwr,
+ },
+ })
+ if err != nil {
+ return fmt.Errorf("failed to get certificates from curator: %w", err)
+ }
+ kw := res.Kind.(*ipb.IssueCertificateResponse_KubernetesWorker_).KubernetesWorker
+
+ // ...write them...
+ if err := kubelet.setCertificates(kw); err != nil {
+ return fmt.Errorf("failed to write kubelet certs: %w", err)
+ }
+ for name, c := range clients {
+ if c.dir == nil {
+ continue
+ }
+ if err := c.dir.WriteCertificates(kw.IdentityCaCertificate, c.certFrom(kw)); err != nil {
+ return fmt.Errorf("failed to write %s certs: %w", name, err)
+ }
+ }
+
+ // ... and set up connections.
+ for name, c := range clients {
+ kubeconf, err := kpki.KubeconfigRaw(kw.IdentityCaCertificate, c.certFrom(kw), c.sk, kpki.KubernetesAPIEndpointForWorker)
+ if err != nil {
+ return fmt.Errorf("failed to make %s kubeconfig: %w", name, err)
+ }
+ c.kubeconfig = kubeconf
+ cs, informers, err := connectByKubeconfig(kubeconf)
+ if err != nil {
+ return fmt.Errorf("failed to connect with %s: %w", name, err)
+ }
+ c.client = cs
+ c.informers = informers
+ }
+
+ // Sub-runnable which starts all parts of Kubernetes that depend on the
+ // machine's external IP address. If it changes, the runnable will exit.
+ // TODO(q3k): test this
+ supervisor.Run(ctx, "networked", func(ctx context.Context) error {
+ networkWatch := s.c.Network.Watch()
+ defer networkWatch.Close()
+
+ var status *network.Status
+
+ supervisor.Logger(ctx).Info("Waiting for node networking...")
+ for status == nil || status.ExternalAddress == nil {
+ status, err = networkWatch.Get(ctx)
+ if err != nil {
+ return fmt.Errorf("failed to get network status: %w", err)
+ }
+ }
+ address := status.ExternalAddress
+ supervisor.Logger(ctx).Info("Node has active networking, starting apiserver/kubelet")
+ kubelet.ClusterDNS = []net.IP{address}
+ err := supervisor.RunGroup(ctx, map[string]supervisor.Runnable{
+ "kubelet": kubelet.Run,
+ })
+ if err != nil {
+ return fmt.Errorf("when starting kubelet: %w", err)
+ }
+
+ supervisor.Signal(ctx, supervisor.SignalHealthy)
+
+ for status.ExternalAddress.Equal(address) {
+ status, err = networkWatch.Get(ctx)
+ if err != nil {
+ return fmt.Errorf("when watching for network changes: %w", err)
+ }
+ }
+ return fmt.Errorf("network configuration changed (%s -> %s)", address.String(), status.ExternalAddress.String())
+ })
+
+ csiPlugin := csiPluginServer{
+ KubeletDirectory: &s.c.Root.Data.Kubernetes.Kubelet,
+ VolumesDirectory: &s.c.Root.Data.Volumes,
+ }
+
+ csiProvisioner := csiProvisionerServer{
+ NodeName: s.c.NodeID,
+ Kubernetes: clients["csi"].client,
+ InformerFactory: clients["csi"].informers,
+ VolumesDirectory: &s.c.Root.Data.Volumes,
+ }
+
+ clusternet := clusternet.Service{
+ NodeName: s.c.NodeID,
+ Kubernetes: clients["netserv"].client,
+ Prefixes: s.c.PodNetwork,
+ }
+
+ nfproxy := nfproxy.Service{
+ ClusterCIDR: s.c.ClusterNet,
+ ClientSet: clients["netserv"].client,
+ }
+
+ kvmDevicePlugin := kvmdevice.Plugin{
+ KubeletDirectory: &s.c.Root.Data.Kubernetes.Kubelet,
+ }
+
+ for _, sub := range []struct {
+ name string
+ runnable supervisor.Runnable
+ }{
+ {"csi-plugin", csiPlugin.Run},
+ {"csi-provisioner", csiProvisioner.Run},
+ {"clusternet", clusternet.Run},
+ {"nfproxy", nfproxy.Run},
+ {"kvmdeviceplugin", kvmDevicePlugin.Run},
+ } {
+ err := supervisor.Run(ctx, sub.name, sub.runnable)
+ if err != nil {
+ return fmt.Errorf("could not run sub-service %q: %w", sub.name, err)
+ }
+ }
+
+ supervisor.Logger(ctx).Info("Registering K8s CoreDNS")
+ clusterDNSDirective := dns.NewKubernetesDirective(s.c.ClusterDomain, clients["netserv"].kubeconfig)
+ s.c.Network.ConfigureDNS(clusterDNSDirective)
+
supervisor.Signal(ctx, supervisor.SignalHealthy)
<-ctx.Done()
+ s.c.Network.ConfigureDNS(dns.CancelDirective(clusterDNSDirective))
return nil
}
+
+func connectByKubeconfig(kubeconfig []byte) (*kubernetes.Clientset, informers.SharedInformerFactory, error) {
+ rawClientConfig, err := clientcmd.NewClientConfigFromBytes(kubeconfig)
+ if err != nil {
+ return nil, nil, fmt.Errorf("could not generate kubernetes client config: %w", err)
+ }
+ clientConfig, err := rawClientConfig.ClientConfig()
+ if err != nil {
+ return nil, nil, fmt.Errorf("could not build kubernetes client config: %w", err)
+ }
+ clientSet, err := kubernetes.NewForConfig(clientConfig)
+ if err != nil {
+ return nil, nil, fmt.Errorf("could not generate kubernetes client: %w", err)
+ }
+ informerFactory := informers.NewSharedInformerFactory(clientSet, 5*time.Minute)
+ return clientSet, informerFactory, nil
+}