m/node/kubernetes: parse klog output from services

This translates Kubernetes' logging ingo logging that we can
query/filter more easily.

Test Plan: We don't test resulting logs from the system, and I'm not sure we should?

X-Origin-Diff: phab/D716
GitOrigin-RevId: ba3f42b9a4e3172bf058bd7dce4283f50dc8e69d
diff --git a/metropolis/node/kubernetes/apiserver.go b/metropolis/node/kubernetes/apiserver.go
index 704b8df..e3f0d98 100644
--- a/metropolis/node/kubernetes/apiserver.go
+++ b/metropolis/node/kubernetes/apiserver.go
@@ -126,5 +126,5 @@
 	if args.Error() != nil {
 		return err
 	}
-	return supervisor.RunCommand(ctx, cmd)
+	return supervisor.RunCommand(ctx, cmd, supervisor.ParseKLog())
 }
diff --git a/metropolis/node/kubernetes/controller-manager.go b/metropolis/node/kubernetes/controller-manager.go
index 546321c..8298de1 100644
--- a/metropolis/node/kubernetes/controller-manager.go
+++ b/metropolis/node/kubernetes/controller-manager.go
@@ -88,6 +88,6 @@
 		if args.Error() != nil {
 			return fmt.Errorf("failed to use fileargs: %w", err)
 		}
-		return supervisor.RunCommand(ctx, cmd)
+		return supervisor.RunCommand(ctx, cmd, supervisor.ParseKLog())
 	}
 }
diff --git a/metropolis/node/kubernetes/kubelet.go b/metropolis/node/kubernetes/kubelet.go
index 0cacaef..953a201 100644
--- a/metropolis/node/kubernetes/kubelet.go
+++ b/metropolis/node/kubernetes/kubelet.go
@@ -131,5 +131,5 @@
 		fmt.Sprintf("--root-dir=%s", s.KubeletDirectory.FullPath()),
 	)
 	cmd.Env = []string{"PATH=/kubernetes/bin"}
-	return supervisor.RunCommand(ctx, cmd)
+	return supervisor.RunCommand(ctx, cmd, supervisor.ParseKLog())
 }
diff --git a/metropolis/node/kubernetes/scheduler.go b/metropolis/node/kubernetes/scheduler.go
index 15410d6..08ba37f 100644
--- a/metropolis/node/kubernetes/scheduler.go
+++ b/metropolis/node/kubernetes/scheduler.go
@@ -65,6 +65,6 @@
 		if args.Error() != nil {
 			return fmt.Errorf("failed to use fileargs: %w", err)
 		}
-		return supervisor.RunCommand(ctx, cmd)
+		return supervisor.RunCommand(ctx, cmd, supervisor.ParseKLog())
 	}
 }
diff --git a/metropolis/pkg/supervisor/supervisor_support.go b/metropolis/pkg/supervisor/supervisor_support.go
index d54b35c..c2b569c 100644
--- a/metropolis/pkg/supervisor/supervisor_support.go
+++ b/metropolis/pkg/supervisor/supervisor_support.go
@@ -22,6 +22,7 @@
 	"context"
 	"net"
 	"os/exec"
+	"source.monogon.dev/metropolis/pkg/logtree"
 
 	"google.golang.org/grpc"
 )
@@ -52,11 +53,44 @@
 }
 
 // RunCommand will create a Runnable that starts a long-running command, whose exit is determined to be a failure.
-func RunCommand(ctx context.Context, cmd *exec.Cmd) error {
+func RunCommand(ctx context.Context, cmd *exec.Cmd, opts ...RunCommandOption) error {
 	Signal(ctx, SignalHealthy)
-	cmd.Stdout = RawLogger(ctx)
-	cmd.Stderr = RawLogger(ctx)
+
+	var parseKLog bool
+	for _, opt := range opts {
+		if opt.parseKlog {
+			parseKLog = true
+		}
+	}
+
+	if parseKLog {
+		// We make two klogs, one for each of stdout/stderr. This is to prevent
+		// accidental interleaving of both.
+		klogStdout := logtree.KLogParser(Logger(ctx))
+		defer klogStdout.Close()
+		klogStderr := logtree.KLogParser(Logger(ctx))
+		defer klogStderr.Close()
+
+		cmd.Stdout = klogStdout
+		cmd.Stderr = klogStderr
+	} else {
+		cmd.Stdout = RawLogger(ctx)
+		cmd.Stderr = RawLogger(ctx)
+	}
 	err := cmd.Run()
 	Logger(ctx).Infof("Command returned: %v", err)
 	return err
 }
+
+type RunCommandOption struct {
+	parseKlog bool
+}
+
+// ParseKLog signals that the command being run will return klog-compatible logs
+// to stdout and/or stderr, and these will be re-interpreted as structured
+// logging and emitted to the supervisor's logger.
+func ParseKLog() RunCommandOption {
+	return RunCommandOption{
+		parseKlog: true,
+	}
+}