core: replace logbuffer with logtree

Test Plan: None; component logs are currently not covered by any tests.

X-Origin-Diff: phab/D643
GitOrigin-RevId: 44ace0a1937aee9ba6a49db6e862907ec24d6ea3
diff --git a/core/internal/common/supervisor/supervisor_support.go b/core/internal/common/supervisor/supervisor_support.go
index f11afcc..d54b35c 100644
--- a/core/internal/common/supervisor/supervisor_support.go
+++ b/core/internal/common/supervisor/supervisor_support.go
@@ -51,12 +51,14 @@
 	}
 }
 
-// Command will create a Runnable that starts a long-running command, whose exit is determined to be a failure.
-func Command(name string, arg ...string) Runnable {
-	return func(ctx context.Context) error {
-		Signal(ctx, SignalHealthy)
-
-		cmd := exec.CommandContext(ctx, name, arg...)
-		return cmd.Run()
-	}
+// RunCommand runs cmd from within a runnable. It signals the runnable as healthy, replaces cmd's Stdout and Stderr
+// with the runnable's raw logger, waits for the command to exit, logs the result and returns it. It is meant for
+// long-running commands, whose exit is considered a failure of the calling runnable.
+func RunCommand(ctx context.Context, cmd *exec.Cmd) error {
+	Signal(ctx, SignalHealthy)
+	cmd.Stdout = RawLogger(ctx)
+	cmd.Stderr = RawLogger(ctx)
+	err := cmd.Run()
+	Logger(ctx).Infof("Command returned: %v", err)
+	return err
 }
diff --git a/core/internal/containerd/BUILD.bazel b/core/internal/containerd/BUILD.bazel
index 5c74a63..0e2de21 100644
--- a/core/internal/containerd/BUILD.bazel
+++ b/core/internal/containerd/BUILD.bazel
@@ -8,7 +8,6 @@
     deps = [
         "//core/internal/common/supervisor:go_default_library",
         "//core/internal/localstorage:go_default_library",
-        "//core/pkg/logbuffer:go_default_library",
         "@com_github_containerd_containerd//:go_default_library",
         "@com_github_containerd_containerd//namespaces:go_default_library",
     ],
diff --git a/core/internal/containerd/main.go b/core/internal/containerd/main.go
index 192039e..78fee97 100644
--- a/core/internal/containerd/main.go
+++ b/core/internal/containerd/main.go
@@ -32,7 +32,6 @@
 
 	"git.monogon.dev/source/nexantic.git/core/internal/common/supervisor"
 	"git.monogon.dev/source/nexantic.git/core/internal/localstorage"
-	"git.monogon.dev/source/nexantic.git/core/pkg/logbuffer"
 )
 
 const (
@@ -41,32 +40,44 @@
 
 type Service struct {
 	EphemeralVolume *localstorage.EphemeralContainerdDirectory
-	Log             *logbuffer.LogBuffer
-	RunscLog        *logbuffer.LogBuffer
 }
 
 func (s *Service) Run(ctx context.Context) error {
-	if s.Log == nil {
-		s.Log = logbuffer.New(5000, 16384)
-	}
-	if s.RunscLog == nil {
-		s.RunscLog = logbuffer.New(5000, 16384)
-	}
-
-	logger := supervisor.Logger(ctx)
-
 	cmd := exec.CommandContext(ctx, "/containerd/bin/containerd", "--config", "/containerd/conf/config.toml")
-	cmd.Stdout = s.Log
-	cmd.Stderr = s.Log
 	cmd.Env = []string{"PATH=/containerd/bin", "TMPDIR=" + s.EphemeralVolume.Tmp.FullPath()}
 
 	runscFifo, err := os.OpenFile(s.EphemeralVolume.RunSCLogsFIFO.FullPath(), os.O_CREATE|os.O_RDONLY, os.ModeNamedPipe|0777)
 	if err != nil {
 		return err
 	}
-	go func() {
+	// Close the FIFO when Run returns so that restarts of this runnable do not leak file descriptors.
+	defer runscFifo.Close()
+
+	if err := supervisor.Run(ctx, "runsc", s.logPump(runscFifo)); err != nil {
+		return fmt.Errorf("failed to start runsc log pump: %w", err)
+	}
+
+	if err := supervisor.Run(ctx, "preseed", s.runPreseed); err != nil {
+		return fmt.Errorf("failed to start preseed runnable: %w", err)
+	}
+	return supervisor.RunCommand(ctx, cmd)
+}
+
+// logPump returns a runnable that pipes data from a file/FIFO into its raw logger.
+// TODO(q3k): refactor this out to a generic function in supervisor or logtree.
+func (s *Service) logPump(fifo *os.File) supervisor.Runnable {
+	return func(ctx context.Context) error {
+		supervisor.Signal(ctx, supervisor.SignalHealthy)
+		logs := supervisor.RawLogger(ctx)
 		for {
-			n, err := io.Copy(s.RunscLog, runscFifo)
+			// Quit if requested.
+			select {
+			case <-ctx.Done():
+				return ctx.Err()
+			default:
+			}
+
+			n, err := io.Copy(logs, fifo)
 			if n == 0 && err == nil {
 				// Hack because pipes/FIFOs can return zero reads when nobody is writing. To avoid busy-looping,
 				// sleep a bit before retrying. This does not loose data since the FIFO internal buffer will
@@ -74,20 +85,14 @@
 				// debug logs) is not an issue for us.
 				time.Sleep(10 * time.Millisecond)
 			} else if err != nil {
-				logger.Errorf("gVisor log pump failed, stopping it: %v", err)
-				return // It's likely that this will busy-loop printing errors if it encounters one, so bail
+				if ctx.Err() != nil {
+					// Read errors during shutdown (e.g. after Run has closed the FIFO) are not pump failures.
+					return ctx.Err()
+				}
+				return fmt.Errorf("log pump failed: %w", err)
 			}
 		}
-	}()
-
-	if err := supervisor.Run(ctx, "preseed", s.runPreseed); err != nil {
-		return fmt.Errorf("failed to start preseed runnable: %w", err)
 	}
-	supervisor.Signal(ctx, supervisor.SignalHealthy)
-
-	err = cmd.Run()
-	fmt.Fprintf(s.Log, "containerd stopped: %v\n", err)
-	return err
 }
 
 // runPreseed loads OCI bundles in tar form from preseedNamespacesDir into containerd at startup.
 // runPreseed loads OCI bundles in tar form from preseedNamespacesDir into containerd at startup.
diff --git a/core/internal/kubernetes/BUILD.bazel b/core/internal/kubernetes/BUILD.bazel
index bef0eb7..d68d1e8 100644
--- a/core/internal/kubernetes/BUILD.bazel
+++ b/core/internal/kubernetes/BUILD.bazel
@@ -25,7 +25,6 @@
         "//core/internal/network/dns:go_default_library",
         "//core/pkg/fileargs:go_default_library",
         "//core/pkg/fsquota:go_default_library",
-        "//core/pkg/logbuffer:go_default_library",
         "//core/pkg/logtree:go_default_library",
         "//core/proto/api:go_default_library",
         "@com_github_container_storage_interface_spec//lib/go/csi:go_default_library",
diff --git a/core/internal/kubernetes/controller-manager.go b/core/internal/kubernetes/controller-manager.go
index 126076e..690a553 100644
--- a/core/internal/kubernetes/controller-manager.go
+++ b/core/internal/kubernetes/controller-manager.go
@@ -20,7 +20,6 @@
 	"context"
 	"encoding/pem"
 	"fmt"
-	"io"
 	"net"
 	"os/exec"
 
@@ -61,7 +60,7 @@
 	return &config, nil
 }
 
-func runControllerManager(config controllerManagerConfig, output io.Writer) supervisor.Runnable {
+func runControllerManager(config controllerManagerConfig) supervisor.Runnable {
 	return func(ctx context.Context) error {
 		args, err := fileargs.New()
 		if err != nil {
@@ -89,11 +88,6 @@
 		if args.Error() != nil {
-			return fmt.Errorf("failed to use fileargs: %w", err)
+			return fmt.Errorf("failed to use fileargs: %w", args.Error())
 		}
-		cmd.Stdout = output
-		cmd.Stderr = output
-		supervisor.Signal(ctx, supervisor.SignalHealthy)
-		err = cmd.Run()
-		fmt.Fprintf(output, "controller-manager stopped: %v\n", err)
-		return err
+		return supervisor.RunCommand(ctx, cmd)
 	}
 }
diff --git a/core/internal/kubernetes/kubelet.go b/core/internal/kubernetes/kubelet.go
index d45a238..7a80f60 100644
--- a/core/internal/kubernetes/kubelet.go
+++ b/core/internal/kubernetes/kubelet.go
@@ -141,11 +141,5 @@
 		fmt.Sprintf("--root-dir=%s", s.KubeletDirectory.FullPath()),
 	)
 	cmd.Env = []string{"PATH=/kubernetes/bin"}
-	cmd.Stdout = s.Output
-	cmd.Stderr = s.Output
-
-	supervisor.Signal(ctx, supervisor.SignalHealthy)
-	err = cmd.Run()
-	fmt.Fprintf(s.Output, "kubelet stopped: %v\n", err)
-	return err
+	return supervisor.RunCommand(ctx, cmd)
 }
diff --git a/core/internal/kubernetes/scheduler.go b/core/internal/kubernetes/scheduler.go
index 5a91134..26f8bb1 100644
--- a/core/internal/kubernetes/scheduler.go
+++ b/core/internal/kubernetes/scheduler.go
@@ -20,7 +20,6 @@
 	"context"
 	"encoding/pem"
 	"fmt"
-	"io"
 	"os/exec"
 
 	"git.monogon.dev/source/nexantic.git/core/internal/common/supervisor"
@@ -48,7 +47,7 @@
 	return &config, nil
 }
 
-func runScheduler(config schedulerConfig, output io.Writer) supervisor.Runnable {
+func runScheduler(config schedulerConfig) supervisor.Runnable {
 	return func(ctx context.Context) error {
 		args, err := fileargs.New()
 		if err != nil {
@@ -66,11 +65,6 @@
 		if args.Error() != nil {
-			return fmt.Errorf("failed to use fileargs: %w", err)
+			return fmt.Errorf("failed to use fileargs: %w", args.Error())
 		}
-		cmd.Stdout = output
-		cmd.Stderr = output
-		supervisor.Signal(ctx, supervisor.SignalHealthy)
-		err = cmd.Run()
-		fmt.Fprintf(output, "scheduler stopped: %v\n", err)
-		return err
+		return supervisor.RunCommand(ctx, cmd)
 	}
 }
diff --git a/core/internal/kubernetes/service.go b/core/internal/kubernetes/service.go
index e4391f5..0075470 100644
--- a/core/internal/kubernetes/service.go
+++ b/core/internal/kubernetes/service.go
@@ -18,11 +18,9 @@
 
 import (
 	"context"
-	"errors"
 	"fmt"
 	"net"
 	"os"
-	"sync"
 	"time"
 
 	"google.golang.org/grpc/codes"
@@ -38,7 +36,6 @@
 	"git.monogon.dev/source/nexantic.git/core/internal/kubernetes/reconciler"
 	"git.monogon.dev/source/nexantic.git/core/internal/localstorage"
 	"git.monogon.dev/source/nexantic.git/core/internal/network/dns"
-	"git.monogon.dev/source/nexantic.git/core/pkg/logbuffer"
 	apb "git.monogon.dev/source/nexantic.git/core/proto/api"
 )
 
@@ -52,17 +49,8 @@
 	CorednsRegistrationChan chan *dns.ExtraDirective
 }
 
-type state struct {
-	apiserverLogs         *logbuffer.LogBuffer
-	controllerManagerLogs *logbuffer.LogBuffer
-	schedulerLogs         *logbuffer.LogBuffer
-	kubeletLogs           *logbuffer.LogBuffer
-}
-
 type Service struct {
-	c       Config
-	stateMu sync.Mutex
-	state   *state
+	c Config
 }
 
 func New(c Config) *Service {
@@ -72,23 +60,7 @@
 	return s
 }
 
-func (s *Service) getState() *state {
-	s.stateMu.Lock()
-	defer s.stateMu.Unlock()
-	return s.state
-}
-
 func (s *Service) Run(ctx context.Context) error {
-	st := &state{
-		apiserverLogs:         logbuffer.New(5000, 16384),
-		controllerManagerLogs: logbuffer.New(5000, 16384),
-		schedulerLogs:         logbuffer.New(5000, 16384),
-		kubeletLogs:           logbuffer.New(5000, 16384),
-	}
-	s.stateMu.Lock()
-	s.state = st
-	s.stateMu.Unlock()
-
 	controllerManagerConfig, err := getPKIControllerManagerConfig(ctx, s.c.KPKI)
 	if err != nil {
 		return fmt.Errorf("could not generate controller manager pki config: %w", err)
@@ -128,7 +100,6 @@
 		KPKI:                        s.c.KPKI,
 		AdvertiseAddress:            s.c.AdvertiseAddress,
 		ServiceIPRange:              s.c.ServiceIPRange,
-		Output:                      st.apiserverLogs,
 		EphemeralConsensusDirectory: &s.c.Root.Ephemeral.Consensus,
 	}
 
@@ -137,7 +108,6 @@
 		ClusterDNS:         []net.IP{dnsHostIP},
 		KubeletDirectory:   &s.c.Root.Data.Kubernetes.Kubelet,
 		EphemeralDirectory: &s.c.Root.Ephemeral,
-		Output:             st.kubeletLogs,
 		KPKI:               s.c.KPKI,
 	}
 
@@ -171,8 +141,8 @@
 		runnable supervisor.Runnable
 	}{
 		{"apiserver", apiserver.Run},
-		{"controller-manager", runControllerManager(*controllerManagerConfig, st.controllerManagerLogs)},
-		{"scheduler", runScheduler(*schedulerConfig, st.schedulerLogs)},
+		{"controller-manager", runControllerManager(*controllerManagerConfig)},
+		{"scheduler", runScheduler(*schedulerConfig)},
 		{"kubelet", kubelet.Run},
 		{"reconciler", reconciler.Run(clientSet)},
 		{"csi-plugin", csiPlugin.Run},
@@ -196,28 +166,6 @@
 	return nil
 }
 
-// GetComponentLogs grabs logs from various Kubernetes binaries
-func (s *Service) GetComponentLogs(component string, n int) ([]string, error) {
-	s.stateMu.Lock()
-	defer s.stateMu.Unlock()
-	if s.state == nil {
-		return nil, errors.New("kubernetes not started yet")
-	}
-
-	switch component {
-	case "apiserver":
-		return s.state.apiserverLogs.ReadLinesTruncated(n, "..."), nil
-	case "controller-manager":
-		return s.state.controllerManagerLogs.ReadLinesTruncated(n, "..."), nil
-	case "scheduler":
-		return s.state.schedulerLogs.ReadLinesTruncated(n, "..."), nil
-	case "kubelet":
-		return s.state.kubeletLogs.ReadLinesTruncated(n, "..."), nil
-	default:
-		return nil, errors.New("component not available")
-	}
-}
-
 // GetDebugKubeconfig issues a kubeconfig for an arbitrary given identity. Useful for debugging and testing.
 func (s *Service) GetDebugKubeconfig(ctx context.Context, request *apb.GetDebugKubeconfigRequest) (*apb.GetDebugKubeconfigResponse, error) {
 	ca := s.c.KPKI.Certificates[pki.IdCA]
diff --git a/core/internal/network/dns/BUILD.bazel b/core/internal/network/dns/BUILD.bazel
index efc2727..469fbab 100644
--- a/core/internal/network/dns/BUILD.bazel
+++ b/core/internal/network/dns/BUILD.bazel
@@ -11,6 +11,5 @@
     deps = [
         "//core/internal/common/supervisor:go_default_library",
         "//core/pkg/fileargs:go_default_library",
-        "//core/pkg/logbuffer:go_default_library",
     ],
 )
diff --git a/core/internal/network/dns/coredns.go b/core/internal/network/dns/coredns.go
index 037c7c1..a9fdc0f 100644
--- a/core/internal/network/dns/coredns.go
+++ b/core/internal/network/dns/coredns.go
@@ -28,7 +28,6 @@
 
 	"git.monogon.dev/source/nexantic.git/core/internal/common/supervisor"
 	"git.monogon.dev/source/nexantic.git/core/pkg/fileargs"
-	"git.monogon.dev/source/nexantic.git/core/pkg/logbuffer"
 )
 
 const corefileBase = `
@@ -43,7 +42,6 @@
 `
 
 type Service struct {
-	Logs                  *logbuffer.LogBuffer
 	directiveRegistration chan *ExtraDirective
 	directives            map[string]ExtraDirective
 	cmd                   *exec.Cmd
@@ -58,7 +56,6 @@
 // to create a removal message.
 func New(directiveRegistration chan *ExtraDirective) *Service {
 	return &Service{
-		Logs:                  logbuffer.New(5000, 16384),
 		directives:            map[string]ExtraDirective{},
 		directiveRegistration: directiveRegistration,
 	}
@@ -106,7 +103,7 @@
 	defer args.Close()
 	s.args = args
 
-	cmd := exec.CommandContext(ctx, "/kubernetes/bin/coredns",
+	s.cmd = exec.CommandContext(ctx, "/kubernetes/bin/coredns",
 		args.FileOpt("-conf", "Corefile", s.makeCorefile(args)),
 	)
 
@@ -115,26 +112,24 @@
 		return fmt.Errorf("failed to use fileargs: %w", err)
 	}
 
-	cmd.Stdout = s.Logs
-	cmd.Stderr = s.Logs
-
-	s.cmd = cmd
-
-	err = cmd.Start()
+	// Start the process before releasing stateMu (which is why supervisor.RunCommand is not used here): code holding
+	// stateMu must never observe s.cmd without a start attempt behind it.
+	s.cmd.Stdout = supervisor.RawLogger(ctx)
+	s.cmd.Stderr = supervisor.RawLogger(ctx)
+	err = s.cmd.Start()
 
 	// Release stateMu after the process has attempted to start and is either dead or running
 	s.stateMu.Unlock()
 
 	if err != nil {
 		return fmt.Errorf("failed to start CoreDNS: %w", err)
 	}
 
 	supervisor.Signal(ctx, supervisor.SignalHealthy)
 
-	err = cmd.Wait()
-
-	fmt.Fprintf(s.Logs, "coredns stopped: %v\n", err)
+	err = s.cmd.Wait()
+	supervisor.Logger(ctx).Infof("Command returned: %v", err)
 	return err
 }
 
 // runRegistration runs the background registration runnable which has a different lifecycle from the CoreDNS