Port kubernetes package to supervisor
This replaces the ad-hoc goroutine and process management
previously in the kubernetes package with a nice supervisor-based
implementation which should make it easier to understand and more
reliable. It also prevents creation of more ad-hoc launching code
for future features (like CSI & Provisioning).
Since porting SmalltownNode is rather involved I just instantiated a
new supervision tree in the Kubernetes main service and wired it
up to the old interface. Once we port SmalltownNode we can just
remove the legacy Start() method and directly call Run().
Test Plan:
Passes Bazel tests, Kubernetes functionality was manually
tested by running `bazel run //core/cmd/dbg -- kubectl run -i --image alpine:edge sh`
to verify that Kubernetes still works properly. Automated tests for this
are being worked on.
X-Origin-Diff: phab/D534
GitOrigin-RevId: 001de38eaa5c7ee661bf5db9a7c3d0125c1b6af2
diff --git a/core/internal/kubernetes/apiserver.go b/core/internal/kubernetes/apiserver.go
index ac43035..dc48b96 100644
--- a/core/internal/kubernetes/apiserver.go
+++ b/core/internal/kubernetes/apiserver.go
@@ -21,13 +21,14 @@
"encoding/pem"
"errors"
"fmt"
- "go.uber.org/zap"
+ "io"
"net"
"os/exec"
"path"
"go.etcd.io/etcd/clientv3"
+ "git.monogon.dev/source/nexantic.git/core/internal/common/supervisor"
"git.monogon.dev/source/nexantic.git/core/pkg/fileargs"
)
@@ -65,56 +66,54 @@
return &config, nil
}
-func (s *Service) runAPIServer(ctx context.Context, config apiserverConfig) error {
- args, err := fileargs.New()
- if err != nil {
- panic(err) // If this fails, something is very wrong. Just crash.
- }
- defer args.Close()
- cmd := exec.CommandContext(ctx, "/kubernetes/bin/kube", "kube-apiserver",
- fmt.Sprintf("--advertise-address=%v", config.advertiseAddress.String()),
- "--authorization-mode=Node,RBAC",
- args.FileOpt("--client-ca-file", "client-ca.pem",
- pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: config.idCA})),
- "--enable-admission-plugins=NodeRestriction,PodSecurityPolicy",
- "--enable-aggregator-routing=true",
- "--insecure-port=0",
- // Due to the magic of GRPC this really needs four slashes and a :0
- fmt.Sprintf("--etcd-servers=%v", "unix:////consensus/listener.sock:0"),
- args.FileOpt("--kubelet-client-certificate", "kubelet-client-cert.pem",
- pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: config.kubeletClientCert})),
- args.FileOpt("--kubelet-client-key", "kubelet-client-key.pem",
- pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: config.kubeletClientKey})),
- "--kubelet-preferred-address-types=Hostname",
- args.FileOpt("--proxy-client-cert-file", "aggregation-client-cert.pem",
- pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: config.aggregationClientCert})),
- args.FileOpt("--proxy-client-key-file", "aggregation-client-key.pem",
- pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: config.aggregationClientKey})),
- "--requestheader-allowed-names=front-proxy-client",
- args.FileOpt("--requestheader-client-ca-file", "aggregation-ca.pem",
- pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: config.aggregationCA})),
- "--requestheader-extra-headers-prefix=X-Remote-Extra-",
- "--requestheader-group-headers=X-Remote-Group",
- "--requestheader-username-headers=X-Remote-User",
- args.FileOpt("--service-account-key-file", "service-account-pubkey.pem",
- pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: config.serviceAccountPrivKey})),
- fmt.Sprintf("--service-cluster-ip-range=%v", config.serviceIPRange.String()),
- args.FileOpt("--tls-cert-file", "server-cert.pem",
- pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: config.serverCert})),
- args.FileOpt("--tls-private-key-file", "server-key.pem",
- pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: config.serverKey})),
- )
- if args.Error() != nil {
+func runAPIServer(config apiserverConfig, output io.Writer) supervisor.Runnable {
+ return func(ctx context.Context) error {
+ args, err := fileargs.New()
+ if err != nil {
+ panic(err) // If this fails, something is very wrong. Just crash.
+ }
+ defer args.Close()
+ cmd := exec.CommandContext(ctx, "/kubernetes/bin/kube", "kube-apiserver",
+ fmt.Sprintf("--advertise-address=%v", config.advertiseAddress.String()),
+ "--authorization-mode=Node,RBAC",
+ args.FileOpt("--client-ca-file", "client-ca.pem",
+ pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: config.idCA})),
+ "--enable-admission-plugins=NodeRestriction,PodSecurityPolicy",
+ "--enable-aggregator-routing=true",
+ "--insecure-port=0",
+ // Due to the magic of GRPC this really needs four slashes and a :0
+ fmt.Sprintf("--etcd-servers=%v", "unix:////consensus/listener.sock:0"),
+ args.FileOpt("--kubelet-client-certificate", "kubelet-client-cert.pem",
+ pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: config.kubeletClientCert})),
+ args.FileOpt("--kubelet-client-key", "kubelet-client-key.pem",
+ pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: config.kubeletClientKey})),
+ "--kubelet-preferred-address-types=Hostname",
+ args.FileOpt("--proxy-client-cert-file", "aggregation-client-cert.pem",
+ pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: config.aggregationClientCert})),
+ args.FileOpt("--proxy-client-key-file", "aggregation-client-key.pem",
+ pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: config.aggregationClientKey})),
+ "--requestheader-allowed-names=front-proxy-client",
+ args.FileOpt("--requestheader-client-ca-file", "aggregation-ca.pem",
+ pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: config.aggregationCA})),
+ "--requestheader-extra-headers-prefix=X-Remote-Extra-",
+ "--requestheader-group-headers=X-Remote-Group",
+ "--requestheader-username-headers=X-Remote-User",
+ args.FileOpt("--service-account-key-file", "service-account-pubkey.pem",
+ pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: config.serviceAccountPrivKey})),
+ fmt.Sprintf("--service-cluster-ip-range=%v", config.serviceIPRange.String()),
+ args.FileOpt("--tls-cert-file", "server-cert.pem",
+ pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: config.serverCert})),
+ args.FileOpt("--tls-private-key-file", "server-key.pem",
+ pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: config.serverKey})),
+ )
+ if args.Error() != nil {
+ return err
+ }
+ cmd.Stdout = output
+ cmd.Stderr = output
+ supervisor.Signal(ctx, supervisor.SignalHealthy)
+ err = cmd.Run()
+ fmt.Fprintf(output, "apiserver stopped: %v\n", err)
return err
}
- cmd.Stdout = s.apiserverLogs
- cmd.Stderr = s.apiserverLogs
- err = cmd.Run()
- fmt.Fprintf(s.apiserverLogs, "apiserver stopped: %v\n", err)
- if ctx.Err() == context.Canceled {
- s.logger.Info("apiserver stopped", zap.Error(err))
- } else {
- s.logger.Warn("apiserver stopped unexpectedly", zap.Error(err))
- }
- return err
}