core: replace logbuffer with logtree
Test Plan: None; component logs appear to be currently untested.
X-Origin-Diff: phab/D643
GitOrigin-RevId: 44ace0a1937aee9ba6a49db6e862907ec24d6ea3
diff --git a/core/cmd/init/debug_service.go b/core/cmd/init/debug_service.go
index ef940d9..faa0135 100644
--- a/core/cmd/init/debug_service.go
+++ b/core/cmd/init/debug_service.go
@@ -18,7 +18,6 @@
import (
"context"
- "math"
"git.monogon.dev/source/nexantic.git/core/internal/cluster"
"git.monogon.dev/source/nexantic.git/core/internal/containerd"
@@ -45,32 +44,5 @@
// GetComponentLogs gets various logbuffers from binaries we call. This function just deals with the first path component,
// delegating the rest to the service-specific handlers.
func (s *debugService) GetComponentLogs(ctx context.Context, req *apb.GetComponentLogsRequest) (*apb.GetComponentLogsResponse, error) {
- if len(req.ComponentPath) < 1 {
- return nil, status.Error(codes.InvalidArgument, "component_path needs to contain at least one part")
- }
- linesToRead := int(req.TailLines)
- if linesToRead == 0 {
- linesToRead = math.MaxInt32
- }
- var lines []string
- var err error
- switch req.ComponentPath[0] {
- case "containerd":
- if len(req.ComponentPath) < 2 {
- lines = s.containerd.Log.ReadLinesTruncated(linesToRead, "...")
- } else if req.ComponentPath[1] == "runsc" {
- lines = s.containerd.RunscLog.ReadLinesTruncated(linesToRead, "...")
- }
- case "kube":
- if len(req.ComponentPath) < 2 {
- return nil, status.Error(codes.NotFound, "Component not found")
- }
- lines, err = s.kubernetes.GetComponentLogs(req.ComponentPath[1], linesToRead)
- if err != nil {
- return nil, status.Error(codes.NotFound, "Component not found")
- }
- default:
- return nil, status.Error(codes.NotFound, "component not found")
- }
- return &apb.GetComponentLogsResponse{Line: lines}, nil
+ return nil, status.Error(codes.Unimplemented, "unimplemented")
}
diff --git a/core/internal/common/supervisor/supervisor_support.go b/core/internal/common/supervisor/supervisor_support.go
index f11afcc..d54b35c 100644
--- a/core/internal/common/supervisor/supervisor_support.go
+++ b/core/internal/common/supervisor/supervisor_support.go
@@ -51,12 +51,12 @@
}
}
-// Command will create a Runnable that starts a long-running command, whose exit is determined to be a failure.
-func Command(name string, arg ...string) Runnable {
- return func(ctx context.Context) error {
- Signal(ctx, SignalHealthy)
-
- cmd := exec.CommandContext(ctx, name, arg...)
- return cmd.Run()
- }
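+// RunCommand signals the calling runnable as healthy, then runs the given long-running command with its stdout and stderr piped into the runnable's raw logger. Any exit of the command is treated as a failure; the result of running it is logged and returned.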
+func RunCommand(ctx context.Context, cmd *exec.Cmd) error {
+ Signal(ctx, SignalHealthy)
+ cmd.Stdout = RawLogger(ctx)
+ cmd.Stderr = RawLogger(ctx)
+ err := cmd.Run()
+ Logger(ctx).Infof("Command returned: %v", err)
+ return err
}
diff --git a/core/internal/containerd/BUILD.bazel b/core/internal/containerd/BUILD.bazel
index 5c74a63..0e2de21 100644
--- a/core/internal/containerd/BUILD.bazel
+++ b/core/internal/containerd/BUILD.bazel
@@ -8,7 +8,6 @@
deps = [
"//core/internal/common/supervisor:go_default_library",
"//core/internal/localstorage:go_default_library",
- "//core/pkg/logbuffer:go_default_library",
"@com_github_containerd_containerd//:go_default_library",
"@com_github_containerd_containerd//namespaces:go_default_library",
],
diff --git a/core/internal/containerd/main.go b/core/internal/containerd/main.go
index 192039e..78fee97 100644
--- a/core/internal/containerd/main.go
+++ b/core/internal/containerd/main.go
@@ -32,7 +32,6 @@
"git.monogon.dev/source/nexantic.git/core/internal/common/supervisor"
"git.monogon.dev/source/nexantic.git/core/internal/localstorage"
- "git.monogon.dev/source/nexantic.git/core/pkg/logbuffer"
)
const (
@@ -41,32 +40,41 @@
type Service struct {
EphemeralVolume *localstorage.EphemeralContainerdDirectory
- Log *logbuffer.LogBuffer
- RunscLog *logbuffer.LogBuffer
}
func (s *Service) Run(ctx context.Context) error {
- if s.Log == nil {
- s.Log = logbuffer.New(5000, 16384)
- }
- if s.RunscLog == nil {
- s.RunscLog = logbuffer.New(5000, 16384)
- }
-
- logger := supervisor.Logger(ctx)
-
cmd := exec.CommandContext(ctx, "/containerd/bin/containerd", "--config", "/containerd/conf/config.toml")
- cmd.Stdout = s.Log
- cmd.Stderr = s.Log
cmd.Env = []string{"PATH=/containerd/bin", "TMPDIR=" + s.EphemeralVolume.Tmp.FullPath()}
runscFifo, err := os.OpenFile(s.EphemeralVolume.RunSCLogsFIFO.FullPath(), os.O_CREATE|os.O_RDONLY, os.ModeNamedPipe|0777)
if err != nil {
return err
}
- go func() {
+
+ if err := supervisor.Run(ctx, "runsc", s.logPump(runscFifo)); err != nil {
+ return fmt.Errorf("failed to start runsc log pump: %w", err)
+ }
+
+ if err := supervisor.Run(ctx, "preseed", s.runPreseed); err != nil {
+ return fmt.Errorf("failed to start preseed runnable: %w", err)
+ }
+ return supervisor.RunCommand(ctx, cmd)
+}
+
+// logPump returns a runnable that pipes data from a file/FIFO into its raw logger.
+// TODO(q3k): refactor this out to a generic function in supervisor or logtree.
+func (s *Service) logPump(fifo *os.File) supervisor.Runnable {
+ return func(ctx context.Context) error {
+ supervisor.Signal(ctx, supervisor.SignalHealthy)
for {
- n, err := io.Copy(s.RunscLog, runscFifo)
+ // Quit if requested.
+ select {
+ case <-ctx.Done():
+ return ctx.Err()
+ default:
+ }
+
+ n, err := io.Copy(supervisor.RawLogger(ctx), fifo)
if n == 0 && err == nil {
// Hack because pipes/FIFOs can return zero reads when nobody is writing. To avoid busy-looping,
// sleep a bit before retrying. This does not loose data since the FIFO internal buffer will
@@ -74,20 +82,10 @@
// debug logs) is not an issue for us.
time.Sleep(10 * time.Millisecond)
} else if err != nil {
- logger.Errorf("gVisor log pump failed, stopping it: %v", err)
- return // It's likely that this will busy-loop printing errors if it encounters one, so bail
+ return fmt.Errorf("log pump failed: %v", err)
}
}
- }()
-
- if err := supervisor.Run(ctx, "preseed", s.runPreseed); err != nil {
- return fmt.Errorf("failed to start preseed runnable: %w", err)
}
- supervisor.Signal(ctx, supervisor.SignalHealthy)
-
- err = cmd.Run()
- fmt.Fprintf(s.Log, "containerd stopped: %v\n", err)
- return err
}
// runPreseed loads OCI bundles in tar form from preseedNamespacesDir into containerd at startup.
diff --git a/core/internal/kubernetes/BUILD.bazel b/core/internal/kubernetes/BUILD.bazel
index bef0eb7..d68d1e8 100644
--- a/core/internal/kubernetes/BUILD.bazel
+++ b/core/internal/kubernetes/BUILD.bazel
@@ -25,7 +25,6 @@
"//core/internal/network/dns:go_default_library",
"//core/pkg/fileargs:go_default_library",
"//core/pkg/fsquota:go_default_library",
- "//core/pkg/logbuffer:go_default_library",
"//core/pkg/logtree:go_default_library",
"//core/proto/api:go_default_library",
"@com_github_container_storage_interface_spec//lib/go/csi:go_default_library",
diff --git a/core/internal/kubernetes/controller-manager.go b/core/internal/kubernetes/controller-manager.go
index 126076e..690a553 100644
--- a/core/internal/kubernetes/controller-manager.go
+++ b/core/internal/kubernetes/controller-manager.go
@@ -20,7 +20,6 @@
"context"
"encoding/pem"
"fmt"
- "io"
"net"
"os/exec"
@@ -61,7 +60,7 @@
return &config, nil
}
-func runControllerManager(config controllerManagerConfig, output io.Writer) supervisor.Runnable {
+func runControllerManager(config controllerManagerConfig) supervisor.Runnable {
return func(ctx context.Context) error {
args, err := fileargs.New()
if err != nil {
@@ -89,11 +88,6 @@
if args.Error() != nil {
return fmt.Errorf("failed to use fileargs: %w", err)
}
- cmd.Stdout = output
- cmd.Stderr = output
- supervisor.Signal(ctx, supervisor.SignalHealthy)
- err = cmd.Run()
- fmt.Fprintf(output, "controller-manager stopped: %v\n", err)
- return err
+ return supervisor.RunCommand(ctx, cmd)
}
}
diff --git a/core/internal/kubernetes/kubelet.go b/core/internal/kubernetes/kubelet.go
index d45a238..7a80f60 100644
--- a/core/internal/kubernetes/kubelet.go
+++ b/core/internal/kubernetes/kubelet.go
@@ -141,11 +141,5 @@
fmt.Sprintf("--root-dir=%s", s.KubeletDirectory.FullPath()),
)
cmd.Env = []string{"PATH=/kubernetes/bin"}
- cmd.Stdout = s.Output
- cmd.Stderr = s.Output
-
- supervisor.Signal(ctx, supervisor.SignalHealthy)
- err = cmd.Run()
- fmt.Fprintf(s.Output, "kubelet stopped: %v\n", err)
- return err
+ return supervisor.RunCommand(ctx, cmd)
}
diff --git a/core/internal/kubernetes/scheduler.go b/core/internal/kubernetes/scheduler.go
index 5a91134..26f8bb1 100644
--- a/core/internal/kubernetes/scheduler.go
+++ b/core/internal/kubernetes/scheduler.go
@@ -20,7 +20,6 @@
"context"
"encoding/pem"
"fmt"
- "io"
"os/exec"
"git.monogon.dev/source/nexantic.git/core/internal/common/supervisor"
@@ -48,7 +47,7 @@
return &config, nil
}
-func runScheduler(config schedulerConfig, output io.Writer) supervisor.Runnable {
+func runScheduler(config schedulerConfig) supervisor.Runnable {
return func(ctx context.Context) error {
args, err := fileargs.New()
if err != nil {
@@ -66,11 +65,6 @@
if args.Error() != nil {
return fmt.Errorf("failed to use fileargs: %w", err)
}
- cmd.Stdout = output
- cmd.Stderr = output
- supervisor.Signal(ctx, supervisor.SignalHealthy)
- err = cmd.Run()
- fmt.Fprintf(output, "scheduler stopped: %v\n", err)
- return err
+ return supervisor.RunCommand(ctx, cmd)
}
}
diff --git a/core/internal/kubernetes/service.go b/core/internal/kubernetes/service.go
index e4391f5..0075470 100644
--- a/core/internal/kubernetes/service.go
+++ b/core/internal/kubernetes/service.go
@@ -18,11 +18,9 @@
import (
"context"
- "errors"
"fmt"
"net"
"os"
- "sync"
"time"
"google.golang.org/grpc/codes"
@@ -38,7 +36,6 @@
"git.monogon.dev/source/nexantic.git/core/internal/kubernetes/reconciler"
"git.monogon.dev/source/nexantic.git/core/internal/localstorage"
"git.monogon.dev/source/nexantic.git/core/internal/network/dns"
- "git.monogon.dev/source/nexantic.git/core/pkg/logbuffer"
apb "git.monogon.dev/source/nexantic.git/core/proto/api"
)
@@ -52,17 +49,8 @@
CorednsRegistrationChan chan *dns.ExtraDirective
}
-type state struct {
- apiserverLogs *logbuffer.LogBuffer
- controllerManagerLogs *logbuffer.LogBuffer
- schedulerLogs *logbuffer.LogBuffer
- kubeletLogs *logbuffer.LogBuffer
-}
-
type Service struct {
- c Config
- stateMu sync.Mutex
- state *state
+ c Config
}
func New(c Config) *Service {
@@ -72,23 +60,7 @@
return s
}
-func (s *Service) getState() *state {
- s.stateMu.Lock()
- defer s.stateMu.Unlock()
- return s.state
-}
-
func (s *Service) Run(ctx context.Context) error {
- st := &state{
- apiserverLogs: logbuffer.New(5000, 16384),
- controllerManagerLogs: logbuffer.New(5000, 16384),
- schedulerLogs: logbuffer.New(5000, 16384),
- kubeletLogs: logbuffer.New(5000, 16384),
- }
- s.stateMu.Lock()
- s.state = st
- s.stateMu.Unlock()
-
controllerManagerConfig, err := getPKIControllerManagerConfig(ctx, s.c.KPKI)
if err != nil {
return fmt.Errorf("could not generate controller manager pki config: %w", err)
@@ -128,7 +100,6 @@
KPKI: s.c.KPKI,
AdvertiseAddress: s.c.AdvertiseAddress,
ServiceIPRange: s.c.ServiceIPRange,
- Output: st.apiserverLogs,
EphemeralConsensusDirectory: &s.c.Root.Ephemeral.Consensus,
}
@@ -137,7 +108,6 @@
ClusterDNS: []net.IP{dnsHostIP},
KubeletDirectory: &s.c.Root.Data.Kubernetes.Kubelet,
EphemeralDirectory: &s.c.Root.Ephemeral,
- Output: st.kubeletLogs,
KPKI: s.c.KPKI,
}
@@ -171,8 +141,8 @@
runnable supervisor.Runnable
}{
{"apiserver", apiserver.Run},
- {"controller-manager", runControllerManager(*controllerManagerConfig, st.controllerManagerLogs)},
- {"scheduler", runScheduler(*schedulerConfig, st.schedulerLogs)},
+ {"controller-manager", runControllerManager(*controllerManagerConfig)},
+ {"scheduler", runScheduler(*schedulerConfig)},
{"kubelet", kubelet.Run},
{"reconciler", reconciler.Run(clientSet)},
{"csi-plugin", csiPlugin.Run},
@@ -196,28 +166,6 @@
return nil
}
-// GetComponentLogs grabs logs from various Kubernetes binaries
-func (s *Service) GetComponentLogs(component string, n int) ([]string, error) {
- s.stateMu.Lock()
- defer s.stateMu.Unlock()
- if s.state == nil {
- return nil, errors.New("kubernetes not started yet")
- }
-
- switch component {
- case "apiserver":
- return s.state.apiserverLogs.ReadLinesTruncated(n, "..."), nil
- case "controller-manager":
- return s.state.controllerManagerLogs.ReadLinesTruncated(n, "..."), nil
- case "scheduler":
- return s.state.schedulerLogs.ReadLinesTruncated(n, "..."), nil
- case "kubelet":
- return s.state.kubeletLogs.ReadLinesTruncated(n, "..."), nil
- default:
- return nil, errors.New("component not available")
- }
-}
-
// GetDebugKubeconfig issues a kubeconfig for an arbitrary given identity. Useful for debugging and testing.
func (s *Service) GetDebugKubeconfig(ctx context.Context, request *apb.GetDebugKubeconfigRequest) (*apb.GetDebugKubeconfigResponse, error) {
ca := s.c.KPKI.Certificates[pki.IdCA]
diff --git a/core/internal/network/dns/BUILD.bazel b/core/internal/network/dns/BUILD.bazel
index efc2727..469fbab 100644
--- a/core/internal/network/dns/BUILD.bazel
+++ b/core/internal/network/dns/BUILD.bazel
@@ -11,6 +11,5 @@
deps = [
"//core/internal/common/supervisor:go_default_library",
"//core/pkg/fileargs:go_default_library",
- "//core/pkg/logbuffer:go_default_library",
],
)
diff --git a/core/internal/network/dns/coredns.go b/core/internal/network/dns/coredns.go
index 037c7c1..a9fdc0f 100644
--- a/core/internal/network/dns/coredns.go
+++ b/core/internal/network/dns/coredns.go
@@ -28,7 +28,6 @@
"git.monogon.dev/source/nexantic.git/core/internal/common/supervisor"
"git.monogon.dev/source/nexantic.git/core/pkg/fileargs"
- "git.monogon.dev/source/nexantic.git/core/pkg/logbuffer"
)
const corefileBase = `
@@ -43,7 +42,6 @@
`
type Service struct {
- Logs *logbuffer.LogBuffer
directiveRegistration chan *ExtraDirective
directives map[string]ExtraDirective
cmd *exec.Cmd
@@ -58,7 +56,6 @@
// to create a removal message.
func New(directiveRegistration chan *ExtraDirective) *Service {
return &Service{
- Logs: logbuffer.New(5000, 16384),
directives: map[string]ExtraDirective{},
directiveRegistration: directiveRegistration,
}
@@ -106,7 +103,7 @@
defer args.Close()
s.args = args
- cmd := exec.CommandContext(ctx, "/kubernetes/bin/coredns",
+ s.cmd = exec.CommandContext(ctx, "/kubernetes/bin/coredns",
args.FileOpt("-conf", "Corefile", s.makeCorefile(args)),
)
@@ -115,26 +112,8 @@
return fmt.Errorf("failed to use fileargs: %w", err)
}
- cmd.Stdout = s.Logs
- cmd.Stderr = s.Logs
-
- s.cmd = cmd
-
- err = cmd.Start()
-
- // Release stateMu after the process has attempted to start and is either dead or running
s.stateMu.Unlock()
-
- if err != nil {
- return fmt.Errorf("failed to start CoreDNS: %w", err)
- }
-
- supervisor.Signal(ctx, supervisor.SignalHealthy)
-
- err = cmd.Wait()
-
- fmt.Fprintf(s.Logs, "coredns stopped: %v\n", err)
- return err
+ return supervisor.RunCommand(ctx, s.cmd)
}
// runRegistration runs the background registration runnable which has a different lifecycle from the CoreDNS