metropolis: implement NodeManagement.Logs
This takes the implementation from the debug service, dusts it off a
bit, adds tests and moves everything to the new node mgmt service.
Change-Id: Id3b70126a2551775d8328c0c4e424ec0e675f40f
Reviewed-on: https://review.monogon.dev/c/monogon/+/1439
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
Tested-by: Jenkins CI
diff --git a/metropolis/node/core/mgmt/BUILD.bazel b/metropolis/node/core/mgmt/BUILD.bazel
index 349f13e..dff5bac 100644
--- a/metropolis/node/core/mgmt/BUILD.bazel
+++ b/metropolis/node/core/mgmt/BUILD.bazel
@@ -1,4 +1,4 @@
-load("@io_bazel_rules_go//go:def.bzl", "go_library")
+load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "mgmt",
@@ -12,10 +12,28 @@
"//metropolis/node",
"//metropolis/node/core/identity",
"//metropolis/node/core/rpc",
+ "//metropolis/pkg/logtree",
"//metropolis/pkg/supervisor",
"//metropolis/proto/api",
+ "//metropolis/proto/common",
"@org_golang_google_grpc//:go_default_library",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//status",
],
)
+
+go_test(
+ name = "mgmt_test",
+ srcs = ["svc_logs_test.go"],
+ embed = [":mgmt"],
+ deps = [
+ "//metropolis/pkg/logtree",
+ "//metropolis/proto/api",
+ "//metropolis/proto/common",
+ "@com_github_google_go_cmp//cmp",
+ "@org_golang_google_grpc//:go_default_library",
+ "@org_golang_google_grpc//credentials/insecure",
+ "@org_golang_google_grpc//test/bufconn",
+ "@org_golang_google_protobuf//testing/protocmp",
+ ],
+)
diff --git a/metropolis/node/core/mgmt/mgmt.go b/metropolis/node/core/mgmt/mgmt.go
index 5fa12a0..2572764 100644
--- a/metropolis/node/core/mgmt/mgmt.go
+++ b/metropolis/node/core/mgmt/mgmt.go
@@ -12,16 +12,34 @@
"source.monogon.dev/metropolis/node"
"source.monogon.dev/metropolis/node/core/identity"
"source.monogon.dev/metropolis/node/core/rpc"
+ "source.monogon.dev/metropolis/pkg/logtree"
"source.monogon.dev/metropolis/pkg/supervisor"
apb "source.monogon.dev/metropolis/proto/api"
)
+// Service implements metropolis.proto.api.NodeManagement.
type Service struct {
+ // NodeCredentials used to set up gRPC server.
NodeCredentials *identity.NodeCredentials
+ // LogTree from which NodeManagement.Logs will be served.
+ LogTree *logtree.LogTree
+
+ // Automatically populated on Run.
+ LogService
}
+// Run the Service as a supervisor runnable.
func (s *Service) Run(ctx context.Context) error {
+ if s.NodeCredentials == nil {
+ return fmt.Errorf("NodeCredentials missing")
+ }
+ if s.LogTree == nil {
+ return fmt.Errorf("LogTree missing")
+ }
+
+ s.LogService.LogTree = s.LogTree
+
sec := rpc.ServerSecurity{
NodeCredentials: s.NodeCredentials,
}
diff --git a/metropolis/node/core/mgmt/svc_logs.go b/metropolis/node/core/mgmt/svc_logs.go
index 6e05ce9..baf9d9b 100644
--- a/metropolis/node/core/mgmt/svc_logs.go
+++ b/metropolis/node/core/mgmt/svc_logs.go
@@ -4,9 +4,156 @@
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
+ "source.monogon.dev/metropolis/pkg/logtree"
"source.monogon.dev/metropolis/proto/api"
+ cpb "source.monogon.dev/metropolis/proto/common"
)
-func (s *Service) Logs(_ *api.GetLogsRequest, _ api.NodeManagement_LogsServer) error {
- return status.Error(codes.Unimplemented, "unimplemented")
+const (
+ logFilterMax = 10
+)
+
+// LogService implements NodeManagement.Logs. This is split away from the rest of
+// the Service to allow the debug service to reuse this implementation.
+type LogService struct {
+ LogTree *logtree.LogTree
+}
+
+func (s *LogService) Logs(req *api.GetLogsRequest, srv api.NodeManagement_LogsServer) error {
+ if len(req.Filters) > logFilterMax {
+ return status.Errorf(codes.InvalidArgument, "requested %d filters, maximum permitted is %d", len(req.Filters), logFilterMax)
+ }
+ dn := logtree.DN(req.Dn)
+ _, err := dn.Path()
+ switch err {
+ case nil:
+ case logtree.ErrInvalidDN:
+ return status.Errorf(codes.InvalidArgument, "invalid DN")
+ default:
+ return status.Errorf(codes.Unavailable, "could not parse DN: %v", err)
+ }
+
+ var options []logtree.LogReadOption
+
+ // Turn backlog mode into logtree option(s).
+ switch req.BacklogMode {
+ case api.GetLogsRequest_BACKLOG_DISABLE:
+ case api.GetLogsRequest_BACKLOG_ALL:
+ options = append(options, logtree.WithBacklog(logtree.BacklogAllAvailable))
+ case api.GetLogsRequest_BACKLOG_COUNT:
+ count := int(req.BacklogCount)
+ if count <= 0 {
+ return status.Errorf(codes.InvalidArgument, "backlog_count must be > 0 if backlog_mode is BACKLOG_COUNT")
+ }
+ options = append(options, logtree.WithBacklog(count))
+ default:
+ return status.Errorf(codes.InvalidArgument, "unknown backlog_mode %d", req.BacklogMode)
+ }
+
+ // Turn stream mode into logtree option(s).
+ streamEnable := false
+ switch req.StreamMode {
+ case api.GetLogsRequest_STREAM_DISABLE:
+ case api.GetLogsRequest_STREAM_UNBUFFERED:
+ streamEnable = true
+ options = append(options, logtree.WithStream())
+ }
+
+ // Parse proto filters into logtree options.
+ for i, filter := range req.Filters {
+ switch inner := filter.Filter.(type) {
+ case *cpb.LogFilter_WithChildren_:
+ options = append(options, logtree.WithChildren())
+ case *cpb.LogFilter_OnlyRaw_:
+ options = append(options, logtree.OnlyRaw())
+ case *cpb.LogFilter_OnlyLeveled_:
+ options = append(options, logtree.OnlyLeveled())
+ case *cpb.LogFilter_LeveledWithMinimumSeverity_:
+ severity, err := logtree.SeverityFromProto(inner.LeveledWithMinimumSeverity.Minimum)
+ if err != nil {
+ return status.Errorf(codes.InvalidArgument, "filter %d has invalid severity: %v", i, err)
+ }
+ options = append(options, logtree.LeveledWithMinimumSeverity(severity))
+ }
+ }
+
+ reader, err := s.LogTree.Read(logtree.DN(req.Dn), options...)
+ switch err {
+ case nil:
+ case logtree.ErrRawAndLeveled:
+ return status.Errorf(codes.InvalidArgument, "requested only raw and only leveled logs simultaneously")
+ default:
+ return status.Errorf(codes.Unavailable, "could not retrieve logs: %v", err)
+ }
+ defer reader.Close()
+
+ // Default protobuf message size limit is 64MB. We want to limit ourselves
+ // to 10MB.
+ // Currently each raw log line can be at most 1024 unicode codepoints (or
+ // 4096 bytes). To cover extra metadata and proto overhead, let's round
+ // this up to 4500 bytes. This in turn means we can store a maximum of
+ // (10e6/4500) == 2222 entries.
+ // Currently each leveled log line can also be at most 1024 unicode
+ // codepoints (or 4096 bytes). To cover extra metadata and proto overhead
+ // let's round this up to 5000 bytes. This in turn means we can store a
+ // maximum of (10e6/5000) == 2000 entries.
+ // The lower of these numbers, i.e. the worst case scenario, is 2000
+ // maximum entries.
+ maxChunkSize := 2000
+
+ // Serve all backlog entries in chunks.
+ chunk := make([]*cpb.LogEntry, 0, maxChunkSize)
+ for _, entry := range reader.Backlog {
+ p := entry.Proto()
+ if p == nil {
+ // TODO(q3k): log this once we have logtree/gRPC compatibility.
+ continue
+ }
+ chunk = append(chunk, p)
+
+ if len(chunk) >= maxChunkSize {
+ err := srv.Send(&api.GetLogsResponse{
+ BacklogEntries: chunk,
+ })
+ if err != nil {
+ return err
+ }
+ chunk = make([]*cpb.LogEntry, 0, maxChunkSize)
+ }
+ }
+
+ // Send last chunk of backlog, if present.
+ if len(chunk) > 0 {
+ err := srv.Send(&api.GetLogsResponse{
+ BacklogEntries: chunk,
+ })
+ if err != nil {
+ return err
+ }
+ chunk = make([]*cpb.LogEntry, 0, maxChunkSize)
+ }
+
+ // Start serving streaming data, if streaming has been requested.
+ if !streamEnable {
+ return nil
+ }
+
+ for {
+ entry, ok := <-reader.Stream
+ if !ok {
+ // Streaming has been ended by logtree - tell the client and return.
+ return status.Error(codes.Unavailable, "log streaming aborted by system")
+ }
+ p := entry.Proto()
+ if p == nil {
+ // TODO(q3k): log this once we have logtree/gRPC compatibility.
+ continue
+ }
+ err := srv.Send(&api.GetLogsResponse{
+ StreamEntries: []*cpb.LogEntry{p},
+ })
+ if err != nil {
+ return err
+ }
+ }
}
diff --git a/metropolis/node/core/mgmt/svc_logs_test.go b/metropolis/node/core/mgmt/svc_logs_test.go
new file mode 100644
index 0000000..d5825ad
--- /dev/null
+++ b/metropolis/node/core/mgmt/svc_logs_test.go
@@ -0,0 +1,367 @@
+package mgmt
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "io"
+ "net"
+ "strings"
+ "testing"
+ "time"
+
+ "github.com/google/go-cmp/cmp"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials/insecure"
+ "google.golang.org/grpc/test/bufconn"
+ "google.golang.org/protobuf/testing/protocmp"
+
+ "source.monogon.dev/metropolis/pkg/logtree"
+ "source.monogon.dev/metropolis/proto/api"
+ cpb "source.monogon.dev/metropolis/proto/common"
+)
+
+func dut(t *testing.T) (*Service, *grpc.ClientConn) {
+ lt := logtree.New()
+ s := &Service{
+ LogTree: lt,
+ LogService: LogService{
+ LogTree: lt,
+ },
+ }
+
+ srv := grpc.NewServer()
+ api.RegisterNodeManagementServer(srv, s)
+ externalLis := bufconn.Listen(1024 * 1024)
+ go func() {
+ if err := srv.Serve(externalLis); err != nil {
+ t.Fatalf("GRPC serve failed: %v", err)
+ }
+ }()
+ withLocalDialer := grpc.WithContextDialer(func(_ context.Context, _ string) (net.Conn, error) {
+ return externalLis.Dial()
+ })
+ cl, err := grpc.Dial("local", withLocalDialer, grpc.WithTransportCredentials(insecure.NewCredentials()))
+ if err != nil {
+ t.Fatalf("Dialing GRPC failed: %v", err)
+ }
+
+ return s, cl
+}
+
+func cleanLogEntry(e *cpb.LogEntry) {
+ // Filter out bits that change too much to test them.
+ switch k := e.Kind.(type) {
+ case *cpb.LogEntry_Leveled_:
+ k.Leveled.Location = ""
+ k.Leveled.Timestamp = nil
+ }
+}
+
+func mkRawEntry(dn string, line string) *cpb.LogEntry {
+ return &cpb.LogEntry{
+ Dn: dn, Kind: &cpb.LogEntry_Raw_{
+ Raw: &cpb.LogEntry_Raw{
+ Data: line,
+ OriginalLength: int64(len(line)),
+ },
+ },
+ }
+}
+
+func mkLeveledEntry(dn string, severity string, lines string) *cpb.LogEntry {
+ var sev cpb.LeveledLogSeverity
+ switch severity {
+ case "i":
+ sev = cpb.LeveledLogSeverity_INFO
+ case "w":
+ sev = cpb.LeveledLogSeverity_WARNING
+ case "e":
+ sev = cpb.LeveledLogSeverity_ERROR
+ }
+ return &cpb.LogEntry{
+ Dn: dn, Kind: &cpb.LogEntry_Leveled_{
+ Leveled: &cpb.LogEntry_Leveled{
+ Lines: strings.Split(lines, "\n"),
+ Severity: sev,
+ },
+ },
+ }
+}
+
+func drainLogs(t *testing.T, srv api.NodeManagement_LogsClient) (res []*cpb.LogEntry) {
+ t.Helper()
+ for {
+ ev, err := srv.Recv()
+ if errors.Is(err, io.EOF) {
+ return
+ }
+ if err != nil {
+ t.Errorf("Recv: %v", err)
+ return
+ }
+ for _, e := range ev.BacklogEntries {
+ res = append(res, e)
+ }
+ }
+}
+
+// TestLogService_Logs_Backlog exercises the basic log API by requesting
+// backlogged leveled log entries.
+func TestLogService_Logs_Backlog(t *testing.T) {
+ ctx, ctxC := context.WithCancel(context.Background())
+ defer ctxC()
+
+ s, cl := dut(t)
+
+ mgmt := api.NewNodeManagementClient(cl)
+
+ s.LogTree.MustLeveledFor("init").Infof("Hello")
+ s.LogTree.MustLeveledFor("main").Infof("Starting roleserver...")
+ s.LogTree.MustLeveledFor("main.roleserver").Infof("Waiting for node roles...")
+ s.LogTree.MustLeveledFor("main.roleserver.kubernetes").Infof("Starting kubernetes...")
+ s.LogTree.MustLeveledFor("main.roleserver.controlplane").Infof("Starting control plane...")
+ s.LogTree.MustLeveledFor("main.roleserver.kubernetes").Infof("Kubernetes version: 1.21.37")
+ s.LogTree.MustLeveledFor("main.roleserver.controlplane").Infof("Starting etcd...")
+
+ mkReq := func(dn string, backlog int64) *api.GetLogsRequest {
+ var backlogMode api.GetLogsRequest_BacklogMode
+ var backlogCount int64
+ switch {
+ case backlog < 0:
+ backlogMode = api.GetLogsRequest_BACKLOG_ALL
+ case backlog > 0:
+ backlogMode = api.GetLogsRequest_BACKLOG_COUNT
+ backlogCount = backlog
+ case backlog == 0:
+ backlogMode = api.GetLogsRequest_BACKLOG_DISABLE
+ }
+ return &api.GetLogsRequest{
+ Dn: dn,
+ BacklogMode: backlogMode,
+ BacklogCount: backlogCount,
+ StreamMode: api.GetLogsRequest_STREAM_DISABLE,
+ }
+ }
+ mkRecursive := func(in *api.GetLogsRequest) *api.GetLogsRequest {
+ in.Filters = append(in.Filters, &cpb.LogFilter{
+ Filter: &cpb.LogFilter_WithChildren_{
+ WithChildren: &cpb.LogFilter_WithChildren{},
+ },
+ })
+ return in
+ }
+ for i, te := range []struct {
+ req *api.GetLogsRequest
+ want []*cpb.LogEntry
+ }{
+ {
+ // Test all backlog.
+ req: mkReq("main.roleserver.kubernetes", -1),
+ want: []*cpb.LogEntry{
+ mkLeveledEntry("main.roleserver.kubernetes", "i", "Starting kubernetes..."),
+ mkLeveledEntry("main.roleserver.kubernetes", "i", "Kubernetes version: 1.21.37"),
+ },
+ },
+ {
+ // Test exact backlog.
+ req: mkReq("main.roleserver.kubernetes", 1),
+ want: []*cpb.LogEntry{
+ mkLeveledEntry("main.roleserver.kubernetes", "i", "Kubernetes version: 1.21.37"),
+ },
+ },
+ {
+ // Test no backlog.
+ req: mkReq("main.roleserver.kubernetes", 0),
+ want: nil,
+ },
+
+ {
+ // Test recursion with backlog.
+ req: mkRecursive(mkReq("main.roleserver", 2)),
+ want: []*cpb.LogEntry{
+ mkLeveledEntry("main.roleserver.kubernetes", "i", "Kubernetes version: 1.21.37"),
+ mkLeveledEntry("main.roleserver.controlplane", "i", "Starting etcd..."),
+ },
+ },
+ } {
+ srv, err := mgmt.Logs(ctx, te.req)
+ if err != nil {
+ t.Errorf("Case %d: Logs failed: %v", i, err)
+ continue
+ }
+ logs := drainLogs(t, srv)
+ for _, e := range logs {
+ cleanLogEntry(e)
+ }
+ diff := cmp.Diff(te.want, logs, protocmp.Transform())
+ if diff != "" {
+ t.Errorf("Case %d: diff: \n%s", i, diff)
+ }
+ }
+}
+
+// TestLogService_Logs_Stream exercises the basic log API by requesting
+// streaming leveled log entries.
+func TestLogService_Logs_Stream(t *testing.T) {
+ ctx, ctxC := context.WithCancel(context.Background())
+ defer ctxC()
+
+ s, cl := dut(t)
+
+ // Start streaming all logs.
+ mgmt := api.NewNodeManagementClient(cl)
+ srv, err := mgmt.Logs(ctx, &api.GetLogsRequest{
+ Dn: "",
+ BacklogMode: api.GetLogsRequest_BACKLOG_ALL,
+ StreamMode: api.GetLogsRequest_STREAM_UNBUFFERED,
+ Filters: []*cpb.LogFilter{
+ {
+ Filter: &cpb.LogFilter_WithChildren_{
+ WithChildren: &cpb.LogFilter_WithChildren{},
+ },
+ },
+ },
+ })
+ if err != nil {
+ t.Fatalf("Logs failed: %v", err)
+ }
+
+ // Pipe returned logs into a channel for analysis.
+ logC := make(chan *cpb.LogEntry)
+ go func() {
+ for {
+ ev, err := srv.Recv()
+ if err != nil {
+ return
+ }
+ for _, e := range ev.BacklogEntries {
+ logC <- e
+ }
+ for _, e := range ev.StreamEntries {
+ logC <- e
+ }
+ }
+ }()
+
+ // Submit log entry, expect it on the channel.
+ s.LogTree.MustLeveledFor("test").Infof("Hello, world")
+ select {
+ case e := <-logC:
+ cleanLogEntry(e)
+ if diff := cmp.Diff(mkLeveledEntry("test", "i", "Hello, world"), e, protocmp.Transform()); diff != "" {
+ t.Errorf("Diff:\n%s", diff)
+ }
+ case <-time.After(time.Second * 2):
+ t.Errorf("Timeout")
+ }
+
+ // That could've made it through the backlog. Do it again to make sure it came
+ // through streaming.
+ s.LogTree.MustLeveledFor("test").Infof("Hello again, world")
+ select {
+ case e := <-logC:
+ cleanLogEntry(e)
+ if diff := cmp.Diff(mkLeveledEntry("test", "i", "Hello again, world"), e, protocmp.Transform()); diff != "" {
+ t.Errorf("Diff:\n%s", diff)
+ }
+ case <-time.After(time.Second * 2):
+ t.Errorf("Timeout")
+ }
+}
+
+// TestLogService_Logs_Filters exercises the rest of the filter functionality.
+func TestLogService_Logs_Filters(t *testing.T) {
+ ctx, ctxC := context.WithCancel(context.Background())
+ defer ctxC()
+
+ s, cl := dut(t)
+
+ mgmt := api.NewNodeManagementClient(cl)
+ s.LogTree.MustLeveledFor("main").Infof("Hello")
+ s.LogTree.MustLeveledFor("main").Infof("Starting...")
+ s.LogTree.MustLeveledFor("main").Warningf("Something failed!")
+ fmt.Fprintln(s.LogTree.MustRawFor("main"), "medium rare")
+ s.LogTree.MustLeveledFor("main").Errorf("Something failed very hard!")
+
+ for i, te := range []struct {
+ req *api.GetLogsRequest
+ want []*cpb.LogEntry
+ }{
+ // Case 0: request given level
+ {
+ req: &api.GetLogsRequest{
+ Dn: "main",
+ BacklogMode: api.GetLogsRequest_BACKLOG_ALL,
+ StreamMode: api.GetLogsRequest_STREAM_DISABLE,
+ Filters: []*cpb.LogFilter{
+ {
+ Filter: &cpb.LogFilter_LeveledWithMinimumSeverity_{
+ LeveledWithMinimumSeverity: &cpb.LogFilter_LeveledWithMinimumSeverity{
+ Minimum: cpb.LeveledLogSeverity_WARNING,
+ },
+ },
+ },
+ },
+ },
+ want: []*cpb.LogEntry{
+ mkLeveledEntry("main", "w", "Something failed!"),
+ mkLeveledEntry("main", "e", "Something failed very hard!"),
+ },
+ },
+ // Case 1: request raw only
+ {
+ req: &api.GetLogsRequest{
+ Dn: "main",
+ BacklogMode: api.GetLogsRequest_BACKLOG_ALL,
+ StreamMode: api.GetLogsRequest_STREAM_DISABLE,
+ Filters: []*cpb.LogFilter{
+ {
+ Filter: &cpb.LogFilter_OnlyRaw_{
+ OnlyRaw: &cpb.LogFilter_OnlyRaw{},
+ },
+ },
+ },
+ },
+ want: []*cpb.LogEntry{
+ mkRawEntry("main", "medium rare"),
+ },
+ },
+ // Case 2: request leveled only
+ {
+ req: &api.GetLogsRequest{
+ Dn: "main",
+ BacklogMode: api.GetLogsRequest_BACKLOG_ALL,
+ StreamMode: api.GetLogsRequest_STREAM_DISABLE,
+ Filters: []*cpb.LogFilter{
+ {
+ Filter: &cpb.LogFilter_OnlyLeveled_{
+ OnlyLeveled: &cpb.LogFilter_OnlyLeveled{},
+ },
+ },
+ },
+ },
+ want: []*cpb.LogEntry{
+ mkLeveledEntry("main", "i", "Hello"),
+ mkLeveledEntry("main", "i", "Starting..."),
+ mkLeveledEntry("main", "w", "Something failed!"),
+ mkLeveledEntry("main", "e", "Something failed very hard!"),
+ },
+ },
+ } {
+ srv, err := mgmt.Logs(ctx, te.req)
+ if err != nil {
+ t.Errorf("Case %d: Logs failed: %v", i, err)
+ continue
+ }
+ logs := drainLogs(t, srv)
+ for _, e := range logs {
+ cleanLogEntry(e)
+ }
+ diff := cmp.Diff(te.want, logs, protocmp.Transform())
+ if diff != "" {
+ t.Errorf("Case %d: diff: \n%s", i, diff)
+ }
+ }
+
+}