Port kubernetes package to supervisor
This replaces the ad-hoc goroutine and process management
previously in the kubernetes package with a nice supervisor-based
implementation which should make it easier to understand and more
reliable. It also prevents creation of more ad-hoc launching code
for future features (like CSI & Provisioning).
Since porting SmalltownNode is rather involved I just instantiated a
new supervision tree in the Kubernetes main service and wired it
up to the old interface. Once we port SmalltownNode we can just
remove the legacy Start() method and directly call Run().
Test Plan:
Passes Bazel tests, Kubernetes functionality was manually
tested by running `bazel run //core/cmd/dbg -- kubectl run -i --image alpine:edge sh`
to verify that Kubernetes still works properly. Automated tests for this
are being worked on.
X-Origin-Diff: phab/D534
GitOrigin-RevId: 001de38eaa5c7ee661bf5db9a7c3d0125c1b6af2
diff --git a/core/internal/kubernetes/BUILD.bazel b/core/internal/kubernetes/BUILD.bazel
index 534bf6e..166d13f 100644
--- a/core/internal/kubernetes/BUILD.bazel
+++ b/core/internal/kubernetes/BUILD.bazel
@@ -15,7 +15,7 @@
visibility = ["//core:__subpackages__"],
deps = [
"//core/api/api:go_default_library",
- "//core/internal/common/service:go_default_library",
+ "//core/internal/common/supervisor:go_default_library",
"//core/internal/consensus:go_default_library",
"//core/pkg/fileargs:go_default_library",
"//core/pkg/logbuffer:go_default_library",
diff --git a/core/internal/kubernetes/apiserver.go b/core/internal/kubernetes/apiserver.go
index ac43035..dc48b96 100644
--- a/core/internal/kubernetes/apiserver.go
+++ b/core/internal/kubernetes/apiserver.go
@@ -21,13 +21,14 @@
"encoding/pem"
"errors"
"fmt"
- "go.uber.org/zap"
+ "io"
"net"
"os/exec"
"path"
"go.etcd.io/etcd/clientv3"
+ "git.monogon.dev/source/nexantic.git/core/internal/common/supervisor"
"git.monogon.dev/source/nexantic.git/core/pkg/fileargs"
)
@@ -65,56 +66,54 @@
return &config, nil
}
-func (s *Service) runAPIServer(ctx context.Context, config apiserverConfig) error {
- args, err := fileargs.New()
- if err != nil {
- panic(err) // If this fails, something is very wrong. Just crash.
- }
- defer args.Close()
- cmd := exec.CommandContext(ctx, "/kubernetes/bin/kube", "kube-apiserver",
- fmt.Sprintf("--advertise-address=%v", config.advertiseAddress.String()),
- "--authorization-mode=Node,RBAC",
- args.FileOpt("--client-ca-file", "client-ca.pem",
- pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: config.idCA})),
- "--enable-admission-plugins=NodeRestriction,PodSecurityPolicy",
- "--enable-aggregator-routing=true",
- "--insecure-port=0",
- // Due to the magic of GRPC this really needs four slashes and a :0
- fmt.Sprintf("--etcd-servers=%v", "unix:////consensus/listener.sock:0"),
- args.FileOpt("--kubelet-client-certificate", "kubelet-client-cert.pem",
- pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: config.kubeletClientCert})),
- args.FileOpt("--kubelet-client-key", "kubelet-client-key.pem",
- pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: config.kubeletClientKey})),
- "--kubelet-preferred-address-types=Hostname",
- args.FileOpt("--proxy-client-cert-file", "aggregation-client-cert.pem",
- pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: config.aggregationClientCert})),
- args.FileOpt("--proxy-client-key-file", "aggregation-client-key.pem",
- pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: config.aggregationClientKey})),
- "--requestheader-allowed-names=front-proxy-client",
- args.FileOpt("--requestheader-client-ca-file", "aggregation-ca.pem",
- pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: config.aggregationCA})),
- "--requestheader-extra-headers-prefix=X-Remote-Extra-",
- "--requestheader-group-headers=X-Remote-Group",
- "--requestheader-username-headers=X-Remote-User",
- args.FileOpt("--service-account-key-file", "service-account-pubkey.pem",
- pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: config.serviceAccountPrivKey})),
- fmt.Sprintf("--service-cluster-ip-range=%v", config.serviceIPRange.String()),
- args.FileOpt("--tls-cert-file", "server-cert.pem",
- pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: config.serverCert})),
- args.FileOpt("--tls-private-key-file", "server-key.pem",
- pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: config.serverKey})),
- )
- if args.Error() != nil {
+func runAPIServer(config apiserverConfig, output io.Writer) supervisor.Runnable {
+ return func(ctx context.Context) error {
+ args, err := fileargs.New()
+ if err != nil {
+ panic(err) // If this fails, something is very wrong. Just crash.
+ }
+ defer args.Close()
+ cmd := exec.CommandContext(ctx, "/kubernetes/bin/kube", "kube-apiserver",
+ fmt.Sprintf("--advertise-address=%v", config.advertiseAddress.String()),
+ "--authorization-mode=Node,RBAC",
+ args.FileOpt("--client-ca-file", "client-ca.pem",
+ pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: config.idCA})),
+ "--enable-admission-plugins=NodeRestriction,PodSecurityPolicy",
+ "--enable-aggregator-routing=true",
+ "--insecure-port=0",
+ // Due to the magic of GRPC this really needs four slashes and a :0
+ fmt.Sprintf("--etcd-servers=%v", "unix:////consensus/listener.sock:0"),
+ args.FileOpt("--kubelet-client-certificate", "kubelet-client-cert.pem",
+ pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: config.kubeletClientCert})),
+ args.FileOpt("--kubelet-client-key", "kubelet-client-key.pem",
+ pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: config.kubeletClientKey})),
+ "--kubelet-preferred-address-types=Hostname",
+ args.FileOpt("--proxy-client-cert-file", "aggregation-client-cert.pem",
+ pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: config.aggregationClientCert})),
+ args.FileOpt("--proxy-client-key-file", "aggregation-client-key.pem",
+ pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: config.aggregationClientKey})),
+ "--requestheader-allowed-names=front-proxy-client",
+ args.FileOpt("--requestheader-client-ca-file", "aggregation-ca.pem",
+ pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: config.aggregationCA})),
+ "--requestheader-extra-headers-prefix=X-Remote-Extra-",
+ "--requestheader-group-headers=X-Remote-Group",
+ "--requestheader-username-headers=X-Remote-User",
+ args.FileOpt("--service-account-key-file", "service-account-pubkey.pem",
+ pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: config.serviceAccountPrivKey})),
+ fmt.Sprintf("--service-cluster-ip-range=%v", config.serviceIPRange.String()),
+ args.FileOpt("--tls-cert-file", "server-cert.pem",
+ pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: config.serverCert})),
+ args.FileOpt("--tls-private-key-file", "server-key.pem",
+ pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: config.serverKey})),
+ )
+ if args.Error() != nil {
+ return err
+ }
+ cmd.Stdout = output
+ cmd.Stderr = output
+ supervisor.Signal(ctx, supervisor.SignalHealthy)
+ err = cmd.Run()
+ fmt.Fprintf(output, "apiserver stopped: %v\n", err)
return err
}
- cmd.Stdout = s.apiserverLogs
- cmd.Stderr = s.apiserverLogs
- err = cmd.Run()
- fmt.Fprintf(s.apiserverLogs, "apiserver stopped: %v\n", err)
- if ctx.Err() == context.Canceled {
- s.logger.Info("apiserver stopped", zap.Error(err))
- } else {
- s.logger.Warn("apiserver stopped unexpectedly", zap.Error(err))
- }
- return err
}
diff --git a/core/internal/kubernetes/controller-manager.go b/core/internal/kubernetes/controller-manager.go
index a67f6fd..20d4605 100644
--- a/core/internal/kubernetes/controller-manager.go
+++ b/core/internal/kubernetes/controller-manager.go
@@ -20,12 +20,13 @@
"context"
"encoding/pem"
"fmt"
- "go.uber.org/zap"
+ "io"
"net"
"os/exec"
"go.etcd.io/etcd/clientv3"
+ "git.monogon.dev/source/nexantic.git/core/internal/common/supervisor"
"git.monogon.dev/source/nexantic.git/core/pkg/fileargs"
)
@@ -61,37 +62,36 @@
return &config, nil
}
-func (s *Service) runControllerManager(ctx context.Context, config controllerManagerConfig) error {
- args, err := fileargs.New()
- if err != nil {
- panic(err) // If this fails, something is very wrong. Just crash.
+func runControllerManager(config controllerManagerConfig, output io.Writer) supervisor.Runnable {
+ return func(ctx context.Context) error {
+ args, err := fileargs.New()
+ if err != nil {
+ panic(err) // If this fails, something is very wrong. Just crash.
+ }
+ defer args.Close()
+
+ cmd := exec.CommandContext(ctx, "/kubernetes/bin/kube", "kube-controller-manager",
+ args.FileOpt("--kubeconfig", "kubeconfig", config.kubeConfig),
+ args.FileOpt("--service-account-private-key-file", "service-account-privkey.pem",
+ pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: config.serviceAccountPrivKey})),
+ args.FileOpt("--root-ca-file", "root-ca.pem",
+ pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: config.rootCA})),
+ "--port=0", // Kill insecure serving
+ "--use-service-account-credentials=true", // Enables things like PSP enforcement
+ fmt.Sprintf("--cluster-cidr=%v", config.clusterNet.String()),
+ args.FileOpt("--tls-cert-file", "server-cert.pem",
+ pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: config.serverCert})),
+ args.FileOpt("--tls-private-key-file", "server-key.pem",
+ pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: config.serverKey})),
+ )
+ if args.Error() != nil {
+ return fmt.Errorf("failed to use fileargs: %w", err)
+ }
+ cmd.Stdout = output
+ cmd.Stderr = output
+ supervisor.Signal(ctx, supervisor.SignalHealthy)
+ err = cmd.Run()
+ fmt.Fprintf(output, "controller-manager stopped: %v\n", err)
+ return err
}
- defer args.Close()
- cmd := exec.CommandContext(ctx, "/kubernetes/bin/kube", "kube-controller-manager",
- args.FileOpt("--kubeconfig", "kubeconfig", config.kubeConfig),
- args.FileOpt("--service-account-private-key-file", "service-account-privkey.pem",
- pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: config.serviceAccountPrivKey})),
- args.FileOpt("--root-ca-file", "root-ca.pem",
- pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: config.rootCA})),
- "--port=0", // Kill insecure serving
- "--use-service-account-credentials=true", // Enables things like PSP enforcement
- fmt.Sprintf("--cluster-cidr=%v", config.clusterNet.String()),
- args.FileOpt("--tls-cert-file", "server-cert.pem",
- pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: config.serverCert})),
- args.FileOpt("--tls-private-key-file", "server-key.pem",
- pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: config.serverKey})),
- )
- if args.Error() != nil {
- return fmt.Errorf("failed to use fileargs: %w", err)
- }
- cmd.Stdout = s.controllerManagerLogs
- cmd.Stderr = s.controllerManagerLogs
- err = cmd.Run()
- fmt.Fprintf(s.controllerManagerLogs, "controller-manager stopped: %v\n", err)
- if ctx.Err() == context.Canceled {
- s.logger.Info("controller-manager stopped", zap.Error(err))
- } else {
- s.logger.Warn("controller-manager stopped unexpectedly", zap.Error(err))
- }
- return err
}
diff --git a/core/internal/kubernetes/kubelet.go b/core/internal/kubernetes/kubelet.go
index b7d8157..3b1a123 100644
--- a/core/internal/kubernetes/kubelet.go
+++ b/core/internal/kubernetes/kubelet.go
@@ -22,17 +22,18 @@
"encoding/json"
"encoding/pem"
"fmt"
- "go.uber.org/zap"
+ "io"
"io/ioutil"
+ "net"
"os"
"os/exec"
- "net"
-
- "git.monogon.dev/source/nexantic.git/core/pkg/fileargs"
"go.etcd.io/etcd/clientv3"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
- "k8s.io/kubelet/config/v1beta1"
+ kubeletconfig "k8s.io/kubelet/config/v1beta1"
+
+ "git.monogon.dev/source/nexantic.git/core/internal/common/supervisor"
+ "git.monogon.dev/source/nexantic.git/core/pkg/fileargs"
)
type KubeletSpec struct {
@@ -77,65 +78,63 @@
return nil
}
-func (s *Service) runKubelet(ctx context.Context, spec *KubeletSpec) error {
- fargs, err := fileargs.New()
- if err != nil {
- return err
- }
- var clusterDNS []string
- for _, dnsIP := range spec.clusterDNS {
- clusterDNS = append(clusterDNS, dnsIP.String())
- }
+func runKubelet(spec *KubeletSpec, output io.Writer) supervisor.Runnable {
+ return func(ctx context.Context) error {
+ fargs, err := fileargs.New()
+ if err != nil {
+ return err
+ }
+ var clusterDNS []string
+ for _, dnsIP := range spec.clusterDNS {
+ clusterDNS = append(clusterDNS, dnsIP.String())
+ }
- kubeletConf := &v1beta1.KubeletConfiguration{
- TypeMeta: v1.TypeMeta{
- Kind: "KubeletConfiguration",
- APIVersion: v1beta1.GroupName + "/v1beta1",
- },
- TLSCertFile: "/data/kubernetes/kubelet.crt",
- TLSPrivateKeyFile: "/data/kubernetes/kubelet.key",
- TLSMinVersion: "VersionTLS13",
- ClusterDNS: clusterDNS,
- Authentication: v1beta1.KubeletAuthentication{
- X509: v1beta1.KubeletX509Authentication{
- ClientCAFile: "/data/kubernetes/ca.crt",
+ kubeletConf := &kubeletconfig.KubeletConfiguration{
+ TypeMeta: v1.TypeMeta{
+ Kind: "KubeletConfiguration",
+ APIVersion: kubeletconfig.GroupName + "/v1beta1",
},
- },
- ClusterDomain: "cluster.local",
- EnableControllerAttachDetach: False(),
- HairpinMode: "none",
- MakeIPTablesUtilChains: False(), // We don't have iptables
- FailSwapOn: False(), // Our kernel doesn't have swap enabled which breaks Kubelet's detection
- KubeReserved: map[string]string{
- "cpu": "200m",
- "memory": "300Mi",
- },
- // We're not going to use this, but let's make it point to a known-empty directory in case anybody manages to
- // trigger it.
- VolumePluginDir: "/kubernetes/conf/flexvolume-plugins",
- }
+ TLSCertFile: "/data/kubernetes/kubelet.crt",
+ TLSPrivateKeyFile: "/data/kubernetes/kubelet.key",
+ TLSMinVersion: "VersionTLS13",
+ ClusterDNS: clusterDNS,
+ Authentication: kubeletconfig.KubeletAuthentication{
+ X509: kubeletconfig.KubeletX509Authentication{
+ ClientCAFile: "/data/kubernetes/ca.crt",
+ },
+ },
+ ClusterDomain: "cluster.local", // cluster.local is hardcoded in the certificate too currently
+ EnableControllerAttachDetach: False(),
+ HairpinMode: "none",
+ MakeIPTablesUtilChains: False(), // We don't have iptables
+ FailSwapOn: False(), // Our kernel doesn't have swap enabled which breaks Kubelet's detection
+ KubeReserved: map[string]string{
+ "cpu": "200m",
+ "memory": "300Mi",
+ },
+ // We're not going to use this, but let's make it point to a known-empty directory in case anybody manages to
+ // trigger it.
+ VolumePluginDir: "/kubernetes/conf/flexvolume-plugins",
+ }
- configRaw, err := json.Marshal(kubeletConf)
- if err != nil {
+ configRaw, err := json.Marshal(kubeletConf)
+ if err != nil {
+ return err
+ }
+ cmd := exec.CommandContext(ctx, "/kubernetes/bin/kube", "kubelet",
+ fargs.FileOpt("--config", "config.json", configRaw),
+ "--container-runtime=remote",
+ "--container-runtime-endpoint=unix:///containerd/run/containerd.sock",
+ "--kubeconfig=/data/kubernetes/kubelet.kubeconfig",
+ "--root-dir=/data/kubernetes/kubelet",
+ )
+ cmd.Env = []string{"PATH=/kubernetes/bin"}
+ cmd.Stdout = output
+ cmd.Stderr = output
+
+ supervisor.Signal(ctx, supervisor.SignalHealthy)
+ err = cmd.Run()
+ fmt.Fprintf(output, "kubelet stopped: %v\n", err)
return err
}
- cmd := exec.CommandContext(ctx, "/kubernetes/bin/kube", "kubelet",
- fargs.FileOpt("--config", "config.json", configRaw),
- "--container-runtime=remote",
- "--container-runtime-endpoint=unix:///containerd/run/containerd.sock",
- "--kubeconfig=/data/kubernetes/kubelet.kubeconfig",
- "--root-dir=/data/kubernetes/kubelet",
- )
- cmd.Env = []string{"PATH=/kubernetes/bin"}
- cmd.Stdout = s.kubeletLogs
- cmd.Stderr = s.kubeletLogs
-
- err = cmd.Run()
- fmt.Fprintf(s.kubeletLogs, "kubelet stopped: %v\n", err)
- if ctx.Err() == context.Canceled {
- s.logger.Info("kubelet stopped", zap.Error(err))
- } else {
- s.logger.Warn("kubelet stopped unexpectedly", zap.Error(err))
- }
- return err
}
diff --git a/core/internal/kubernetes/reconcile.go b/core/internal/kubernetes/reconcile.go
index cf991ce..092cd8e 100644
--- a/core/internal/kubernetes/reconcile.go
+++ b/core/internal/kubernetes/reconcile.go
@@ -33,6 +33,8 @@
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
+
+ "git.monogon.dev/source/nexantic.git/core/internal/common/supervisor"
)
const builtinRBACPrefix = "smalltown:"
@@ -174,47 +176,49 @@
},
}
-func runReconciler(ctx context.Context, masterKubeconfig []byte, log *zap.Logger) error {
- rawClientConfig, err := clientcmd.NewClientConfigFromBytes(masterKubeconfig)
- if err != nil {
- return err
- }
+type reconciler func(context.Context, *kubernetes.Clientset) error
- clientConfig, err := rawClientConfig.ClientConfig()
- clientset, err := kubernetes.NewForConfig(clientConfig)
- if err != nil {
- return err
- }
- t := time.NewTicker(10 * time.Second)
- for {
- err = reconcile(ctx, clientset)
- select {
- case <-t.C:
- err = reconcile(ctx, clientset)
- if err != nil {
- log.Warn("Failed to reconcile built-in resources", zap.Error(err))
+func runReconciler(masterKubeconfig []byte) supervisor.Runnable {
+ return func(ctx context.Context) error {
+ log := supervisor.Logger(ctx)
+ rawClientConfig, err := clientcmd.NewClientConfigFromBytes(masterKubeconfig)
+ if err != nil {
+ return err
+ }
+
+ clientConfig, err := rawClientConfig.ClientConfig()
+ clientSet, err := kubernetes.NewForConfig(clientConfig)
+ if err != nil {
+ return err
+ }
+ reconcilers := map[string]reconciler{
+ "psps": reconcilePSPs,
+ "clusterroles": reconcileClusterRoles,
+ "clusterrolebindings": reconcileClusterRoleBindings,
+ }
+ t := time.NewTicker(10 * time.Second)
+ reconcile := func() {
+ for name, reconciler := range reconcilers {
+ if err := reconciler(ctx, clientSet); err != nil {
+ log.Warn("Failed to reconcile built-in resources", zap.String("kind", name), zap.Error(err))
+ }
}
- case <-ctx.Done():
- return nil
+ }
+ supervisor.Signal(ctx, supervisor.SignalHealthy)
+ reconcile()
+ for {
+ select {
+ case <-t.C:
+ reconcile()
+ case <-ctx.Done():
+ return nil
+ }
}
}
}
-func reconcile(ctx context.Context, clientset *kubernetes.Clientset) error {
- if err := reconcilePSPs(ctx, clientset); err != nil {
- return err
- }
- if err := reconcileClusterRoles(ctx, clientset); err != nil {
- return err
- }
- if err := reconcileClusterRoleBindings(ctx, clientset); err != nil {
- return err
- }
- return nil
-}
-
-func reconcilePSPs(ctx context.Context, clientset *kubernetes.Clientset) error {
- pspClient := clientset.PolicyV1beta1().PodSecurityPolicies()
+func reconcilePSPs(ctx context.Context, clientSet *kubernetes.Clientset) error {
+ pspClient := clientSet.PolicyV1beta1().PodSecurityPolicies()
availablePSPs, err := pspClient.List(ctx, metav1.ListOptions{
LabelSelector: "smalltown.com/builtin=true",
})
@@ -246,8 +250,8 @@
return nil
}
-func reconcileClusterRoles(ctx context.Context, clientset *kubernetes.Clientset) error {
- crClient := clientset.RbacV1().ClusterRoles()
+func reconcileClusterRoles(ctx context.Context, clientSet *kubernetes.Clientset) error {
+ crClient := clientSet.RbacV1().ClusterRoles()
availableCRs, err := crClient.List(ctx, metav1.ListOptions{
LabelSelector: "smalltown.com/builtin=true",
})
diff --git a/core/internal/kubernetes/scheduler.go b/core/internal/kubernetes/scheduler.go
index 75dea97..d3ee20b 100644
--- a/core/internal/kubernetes/scheduler.go
+++ b/core/internal/kubernetes/scheduler.go
@@ -20,11 +20,12 @@
"context"
"encoding/pem"
"fmt"
+ "io"
"os/exec"
"go.etcd.io/etcd/clientv3"
- "go.uber.org/zap"
+ "git.monogon.dev/source/nexantic.git/core/internal/common/supervisor"
"git.monogon.dev/source/nexantic.git/core/pkg/fileargs"
)
@@ -48,31 +49,29 @@
return &config, nil
}
-func (s *Service) runScheduler(ctx context.Context, config schedulerConfig) error {
- args, err := fileargs.New()
- if err != nil {
- panic(err) // If this fails, something is very wrong. Just crash.
+func runScheduler(config schedulerConfig, output io.Writer) supervisor.Runnable {
+ return func(ctx context.Context) error {
+ args, err := fileargs.New()
+ if err != nil {
+ panic(err) // If this fails, something is very wrong. Just crash.
+ }
+ defer args.Close()
+ cmd := exec.CommandContext(ctx, "/kubernetes/bin/kube", "kube-scheduler",
+ args.FileOpt("--kubeconfig", "kubeconfig", config.kubeConfig),
+ "--port=0", // Kill insecure serving
+ args.FileOpt("--tls-cert-file", "server-cert.pem",
+ pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: config.serverCert})),
+ args.FileOpt("--tls-private-key-file", "server-key.pem",
+ pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: config.serverKey})),
+ )
+ if args.Error() != nil {
+ return fmt.Errorf("failed to use fileargs: %w", err)
+ }
+ cmd.Stdout = output
+ cmd.Stderr = output
+ supervisor.Signal(ctx, supervisor.SignalHealthy)
+ err = cmd.Run()
+ fmt.Fprintf(output, "scheduler stopped: %v\n", err)
+ return err
}
- defer args.Close()
- cmd := exec.CommandContext(ctx, "/kubernetes/bin/kube", "kube-scheduler",
- args.FileOpt("--kubeconfig", "kubeconfig", config.kubeConfig),
- "--port=0", // Kill insecure serving
- args.FileOpt("--tls-cert-file", "server-cert.pem",
- pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: config.serverCert})),
- args.FileOpt("--tls-private-key-file", "server-key.pem",
- pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: config.serverKey})),
- )
- if args.Error() != nil {
- return fmt.Errorf("failed to use fileargs: %w", err)
- }
- cmd.Stdout = s.schedulerLogs
- cmd.Stderr = s.schedulerLogs
- err = cmd.Run()
- fmt.Fprintf(s.schedulerLogs, "scheduler stopped: %v\n", err)
- if ctx.Err() == context.Canceled {
- s.logger.Info("scheduler stopped", zap.Error(err))
- } else {
- s.logger.Warn("scheduler stopped unexpectedly", zap.Error(err))
- }
- return err
}
diff --git a/core/internal/kubernetes/service.go b/core/internal/kubernetes/service.go
index 5e28292..ae93f4e 100644
--- a/core/internal/kubernetes/service.go
+++ b/core/internal/kubernetes/service.go
@@ -20,20 +20,17 @@
"context"
"crypto/ed25519"
"errors"
- "fmt"
"net"
+ "go.etcd.io/etcd/clientv3"
+ "go.uber.org/zap"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
schema "git.monogon.dev/source/nexantic.git/core/generated/api"
- "git.monogon.dev/source/nexantic.git/core/pkg/logbuffer"
-
- "go.etcd.io/etcd/clientv3"
- "go.uber.org/zap"
-
- "git.monogon.dev/source/nexantic.git/core/internal/common/service"
+ "git.monogon.dev/source/nexantic.git/core/internal/common/supervisor"
"git.monogon.dev/source/nexantic.git/core/internal/consensus"
+ "git.monogon.dev/source/nexantic.git/core/pkg/logbuffer"
)
type Config struct {
@@ -43,7 +40,6 @@
}
type Service struct {
- *service.BaseService
consensusService *consensus.Service
logger *zap.Logger
apiserverLogs *logbuffer.LogBuffer
@@ -61,7 +57,6 @@
schedulerLogs: logbuffer.New(5000, 16384),
kubeletLogs: logbuffer.New(5000, 16384),
}
- s.BaseService = service.NewBaseService("kubernetes", logger, s)
return s
}
@@ -107,65 +102,66 @@
return &schema.GetDebugKubeconfigResponse{DebugKubeconfig: string(debugKubeconfig)}, nil
}
-func (s *Service) OnStart() error {
- config := Config{
- AdvertiseAddress: net.IP{10, 0, 2, 15}, // Depends on networking
- ServiceIPRange: net.IPNet{ // TODO: Decide if configurable / final value
- IP: net.IP{192, 168, 188, 0},
- Mask: net.IPMask{0xff, 0xff, 0xff, 0x00}, // /24, but Go stores as a literal mask
- },
- ClusterNet: net.IPNet{
- IP: net.IP{192, 168, 188, 0},
- Mask: net.IPMask{0xff, 0xff, 0xfd, 0x00}, // /22
- },
- }
- consensusKV := s.getKV()
- apiserverConfig, err := getPKIApiserverConfig(consensusKV)
- if err != nil {
- return err
- }
- apiserverConfig.advertiseAddress = config.AdvertiseAddress
- apiserverConfig.serviceIPRange = config.ServiceIPRange
- controllerManagerConfig, err := getPKIControllerManagerConfig(consensusKV)
- if err != nil {
- return err
- }
- controllerManagerConfig.clusterNet = config.ClusterNet
- schedulerConfig, err := getPKISchedulerConfig(consensusKV)
- if err != nil {
- return err
- }
-
- masterKubeconfig, err := getSingle(consensusKV, "master.kubeconfig")
- if err != nil {
- return err
- }
-
- // TODO(lorenz): Once internal/node is part of the supervisor tree, these should all be supervisor runnables
- go func() {
- s.runAPIServer(context.TODO(), *apiserverConfig)
- }()
- go func() {
- s.runControllerManager(context.TODO(), *controllerManagerConfig)
- }()
- go func() {
- s.runScheduler(context.TODO(), *schedulerConfig)
- }()
-
- go func() {
- if err := s.runKubelet(context.TODO(), &KubeletSpec{}); err != nil {
- fmt.Printf("Failed to launch kubelet: %v\n", err)
- }
- }()
-
- go func() {
- go runReconciler(context.TODO(), masterKubeconfig, s.logger)
- }()
-
+func (s *Service) Start() error {
+ // TODO(lorenz): This creates a new supervision tree, it should instead attach to the root one. But for that SmalltownNode needs
+ // to be ported to supervisor.
+ supervisor.New(context.TODO(), s.logger, s.Run())
return nil
}
-func (s *Service) OnStop() error {
- // Requires advanced process management and not necessary for MVP
- return errors.New("Not implemented")
+func (s *Service) Run() supervisor.Runnable {
+ return func(ctx context.Context) error {
+ config := Config{
+ AdvertiseAddress: net.IP{10, 0, 2, 15}, // Depends on networking
+ ServiceIPRange: net.IPNet{ // TODO: Decide if configurable / final value
+ IP: net.IP{192, 168, 188, 0},
+ Mask: net.IPMask{0xff, 0xff, 0xff, 0x00}, // /24, but Go stores as a literal mask
+ },
+ ClusterNet: net.IPNet{
+ IP: net.IP{192, 168, 188, 0},
+ Mask: net.IPMask{0xff, 0xff, 0xfd, 0x00}, // /22
+ },
+ }
+ consensusKV := s.getKV()
+ apiserverConfig, err := getPKIApiserverConfig(consensusKV)
+ if err != nil {
+ return err
+ }
+ apiserverConfig.advertiseAddress = config.AdvertiseAddress
+ apiserverConfig.serviceIPRange = config.ServiceIPRange
+ controllerManagerConfig, err := getPKIControllerManagerConfig(consensusKV)
+ if err != nil {
+ return err
+ }
+ controllerManagerConfig.clusterNet = config.ClusterNet
+ schedulerConfig, err := getPKISchedulerConfig(consensusKV)
+ if err != nil {
+ return err
+ }
+
+ masterKubeconfig, err := getSingle(consensusKV, "master.kubeconfig")
+ if err != nil {
+ return err
+ }
+
+ if err := supervisor.Run(ctx, "apiserver", runAPIServer(*apiserverConfig, s.apiserverLogs)); err != nil {
+ return err
+ }
+ if err := supervisor.Run(ctx, "controller-manager", runControllerManager(*controllerManagerConfig, s.controllerManagerLogs)); err != nil {
+ return err
+ }
+ if err := supervisor.Run(ctx, "scheduler", runScheduler(*schedulerConfig, s.schedulerLogs)); err != nil {
+ return err
+ }
+
+ if err := supervisor.Run(ctx, "kubelet", runKubelet(&KubeletSpec{}, s.kubeletLogs)); err != nil {
+ return err
+ }
+ if err := supervisor.Run(ctx, "reconciler", runReconciler(masterKubeconfig)); err != nil {
+ return err
+ }
+ supervisor.Signal(ctx, supervisor.SignalHealthy)
+ supervisor.Signal(ctx, supervisor.SignalDone)
+ return nil
+ }
}