metropolis: implement NodeManagement.Logs

This takes the implementation from the debug service, dusts it off a
bit, adds tests and moves everything to the new node mgmt service.

Change-Id: Id3b70126a2551775d8328c0c4e424ec0e675f40f
Reviewed-on: https://review.monogon.dev/c/monogon/+/1439
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
Tested-by: Jenkins CI
diff --git a/metropolis/cli/metroctl/cmd_node.go b/metropolis/cli/metroctl/cmd_node.go
index 0f841ef..97198d8 100644
--- a/metropolis/cli/metroctl/cmd_node.go
+++ b/metropolis/cli/metroctl/cmd_node.go
@@ -60,7 +60,6 @@
 func init() {
 	nodeCmd.AddCommand(nodeDescribeCmd)
 	nodeCmd.AddCommand(nodeListCmd)
-	nodeCmd.AddCommand(nodeLogsCmd)
 	rootCmd.AddCommand(nodeCmd)
 }
 
diff --git a/metropolis/cli/metroctl/cmd_node_logs.go b/metropolis/cli/metroctl/cmd_node_logs.go
index beae6ab..a04558c 100644
--- a/metropolis/cli/metroctl/cmd_node_logs.go
+++ b/metropolis/cli/metroctl/cmd_node_logs.go
@@ -11,13 +11,51 @@
 	"source.monogon.dev/metropolis/cli/metroctl/core"
 	"source.monogon.dev/metropolis/pkg/logtree"
 	"source.monogon.dev/metropolis/proto/api"
+
 	cpb "source.monogon.dev/metropolis/proto/common"
 )
 
+type metroctlLogFlags struct {
+	// follow (ie. stream) logs live.
+	follow bool
+	// dn to query.
+	dn string
+	// exact dn query, i.e. without children/recursion.
+	exact bool
+	// concise logging output format.
+	concise bool
+	// backlog: >0 for a concrete limit, -1 for all, 0 for none
+	backlog int
+}
+
+var logFlags metroctlLogFlags
+
 var nodeLogsCmd = &cobra.Command{
 	Short: "Get/stream logs from node",
-	Use:   "logs [node-id]",
-	Args:  cobra.MinimumNArgs(1),
+	Long: `Get or stream logs from node.
+
+Node logs are structured in a 'log tree' structure, in which different subsystems
+log to DNs (distinguished names). For example, service 'foo' might log to
+root.role.foo, while service 'bar' might log to root.role.bar.
+
+To set the DN you want to request logs from, use --dn. The default is to return
+all logs. The default output is also a good starting point to figure out
+what DNs are active in the system.
+
+When requesting logs for a DN by default all sub-DNs will also be returned (ie.
+with the above example, when requesting DN 'root.role' logs at root.role.foo and
+root.role.bar would also be returned). This behaviour can be disabled by setting
+--exact.
+
+To stream logs, use --follow.
+
+By default, all available logs are returned. To limit the number of historical
+log lines (a.k.a. 'backlog') to return, set --backlog. This is similar to requesting
+all lines and then piping the result through 'tail' - but more efficient, as no
+unnecessary lines are fetched.
+`,
+	Use:  "logs [node-id]",
+	Args: cobra.MinimumNArgs(1),
 	RunE: func(cmd *cobra.Command, args []string) error {
 		ctx := cmd.Context()
 
@@ -51,22 +89,39 @@
 			return fmt.Errorf("remote CA certificate invalid: %w", err)
 		}
 
-		fmt.Printf("Getting logs from %s (%s)...\n", n.Id, n.Status.ExternalAddress)
+		fmt.Printf("=== Logs from %s (%s):\n", n.Id, n.Status.ExternalAddress)
 		// Dial the actual node at its management port.
 		cl := dialAuthenticatedNode(ctx, n.Id, n.Status.ExternalAddress, cacert)
 		nmgmt := api.NewNodeManagementClient(cl)
 
-		srv, err := nmgmt.Logs(ctx, &api.GetLogsRequest{
-			Dn:          "",
-			BacklogMode: api.GetLogsRequest_BACKLOG_ALL,
-			StreamMode:  api.GetLogsRequest_STREAM_DISABLE,
-			Filters: []*cpb.LogFilter{
-				{
-					Filter: &cpb.LogFilter_WithChildren_{
-						WithChildren: &cpb.LogFilter_WithChildren{},
-					},
+		streamMode := api.GetLogsRequest_STREAM_DISABLE
+		if logFlags.follow {
+			streamMode = api.GetLogsRequest_STREAM_UNBUFFERED
+		}
+		var filters []*cpb.LogFilter
+		if !logFlags.exact {
+			filters = append(filters, &cpb.LogFilter{
+				Filter: &cpb.LogFilter_WithChildren_{
+					WithChildren: &cpb.LogFilter_WithChildren{},
 				},
-			},
+			})
+		}
+		backlogMode := api.GetLogsRequest_BACKLOG_ALL
+		var backlogCount int64
+		switch {
+		case logFlags.backlog > 0:
+			backlogMode = api.GetLogsRequest_BACKLOG_COUNT
+			backlogCount = int64(logFlags.backlog)
+		case logFlags.backlog == 0:
+			backlogMode = api.GetLogsRequest_BACKLOG_DISABLE
+		}
+
+		srv, err := nmgmt.Logs(ctx, &api.GetLogsRequest{
+			Dn:           logFlags.dn,
+			BacklogMode:  backlogMode,
+			BacklogCount: backlogCount,
+			StreamMode:   streamMode,
+			Filters:      filters,
 		})
 		if err != nil {
 			return fmt.Errorf("failed to get logs: %w", err)
@@ -74,22 +129,42 @@
 		for {
 			res, err := srv.Recv()
 			if errors.Is(err, io.EOF) {
-				fmt.Println("Done.")
+				fmt.Println("=== Done.")
 				break
 			}
 			if err != nil {
 				return fmt.Errorf("log stream failed: %w", err)
 			}
 			for _, entry := range res.BacklogEntries {
-				entry, err := logtree.LogEntryFromProto(entry)
-				if err != nil {
-					fmt.Printf("invalid entry: %v\n", err)
-					continue
-				}
-				fmt.Println(entry.String())
+				printEntry(entry)
+			}
+			for _, entry := range res.StreamEntries {
+				printEntry(entry)
 			}
 		}
 
 		return nil
 	},
 }
+
+func printEntry(e *cpb.LogEntry) {
+	entry, err := logtree.LogEntryFromProto(e)
+	if err != nil {
+		fmt.Printf("invalid stream entry: %v\n", err)
+		return
+	}
+	if logFlags.concise {
+		fmt.Println(entry.ConciseString(logtree.MetropolisShortenDict, 0))
+	} else {
+		fmt.Println(entry.String())
+	}
+}
+
+func init() {
+	nodeLogsCmd.Flags().BoolVarP(&logFlags.follow, "follow", "f", false, "Continue streaming logs after fetching backlog.")
+	nodeLogsCmd.Flags().StringVar(&logFlags.dn, "dn", "", "Distinguished Name to get logs from (and children, if --exact is not set). If not set, defaults to '', which is the top-level DN.")
+	nodeLogsCmd.Flags().BoolVarP(&logFlags.exact, "exact", "e", false, "Only show logs for exactly the DN, do not recurse down the tree.")
+	nodeLogsCmd.Flags().BoolVarP(&logFlags.concise, "concise", "c", false, "Output concise logs.")
+	nodeLogsCmd.Flags().IntVar(&logFlags.backlog, "backlog", -1, "How many lines of historical log data to return. The default (-1) returns all available lines. Zero value means no backlog is returned (useful when using --follow).")
+	nodeCmd.AddCommand(nodeLogsCmd)
+}
diff --git a/metropolis/cli/metroctl/main.go b/metropolis/cli/metroctl/main.go
index 19b378d..a4ea438 100644
--- a/metropolis/cli/metroctl/main.go
+++ b/metropolis/cli/metroctl/main.go
@@ -44,7 +44,7 @@
 	rootCmd.PersistentFlags().StringVar(&flags.proxyAddr, "proxy", "", "SOCKS5 proxy address")
 	rootCmd.PersistentFlags().StringVar(&flags.configPath, "config", filepath.Join(xdg.ConfigHome, "metroctl"), "An alternative cluster config path")
 	rootCmd.PersistentFlags().BoolVar(&flags.verbose, "verbose", false, "Log additional runtime information")
-	rootCmd.PersistentFlags().StringVarP(&flags.format, "format", "f", "plaintext", "Data output format")
+	rootCmd.PersistentFlags().StringVar(&flags.format, "format", "plaintext", "Data output format")
 	rootCmd.PersistentFlags().StringVar(&flags.filter, "filter", "", "The object filter applied to the output data")
 	rootCmd.PersistentFlags().StringVarP(&flags.output, "output", "o", "", "Redirects output to the specified file")
 }
diff --git a/metropolis/cli/metroctl/test/test.go b/metropolis/cli/metroctl/test/test.go
index 5468834..872f7b3 100644
--- a/metropolis/cli/metroctl/test/test.go
+++ b/metropolis/cli/metroctl/test/test.go
@@ -265,6 +265,20 @@
 			return nil
 		})
 	})
+	t.Run("logs [nodeID]", func(t *testing.T) {
+		util.TestEventual(t, "metroctl logs [nodeID]", ctx, 10*time.Second, func(ctx context.Context) error {
+			var args []string
+			args = append(args, commonOpts...)
+			args = append(args, endpointOpts...)
+			args = append(args, "node", "logs", cl.NodeIDs[1])
+
+			if err := mctlFailIfMissing(t, ctx, args, "Cluster enrolment done."); err != nil {
+				return err
+			}
+
+			return nil
+		})
+	})
 	t.Run("set/unset role", func(t *testing.T) {
 		util.TestEventual(t, "metroctl set/unset role KubernetesController", ctx, 10*time.Second, func(ctx context.Context) error {
 			nid := cl.NodeIDs[1]
diff --git a/metropolis/node/core/BUILD.bazel b/metropolis/node/core/BUILD.bazel
index 281266b..07f4f6e 100644
--- a/metropolis/node/core/BUILD.bazel
+++ b/metropolis/node/core/BUILD.bazel
@@ -25,6 +25,7 @@
         "//metropolis/node/core/cluster",
         "//metropolis/node/core/localstorage",
         "//metropolis/node/core/localstorage/declarative",
+        "//metropolis/node/core/mgmt",
         "//metropolis/node/core/network",
         "//metropolis/node/core/network/hostsfile",
         "//metropolis/node/core/roleserve",
@@ -35,7 +36,6 @@
         "//metropolis/pkg/supervisor",
         "//metropolis/pkg/tpm",
         "//metropolis/proto/api",
-        "//metropolis/proto/common",
         "@com_github_containerd_containerd//:containerd",
         "@com_github_containerd_containerd//namespaces",
         "@org_golang_google_grpc//:go_default_library",
diff --git a/metropolis/node/core/debug_service_enabled.go b/metropolis/node/core/debug_service_enabled.go
index 3494270..75e92bc 100644
--- a/metropolis/node/core/debug_service_enabled.go
+++ b/metropolis/node/core/debug_service_enabled.go
@@ -31,13 +31,14 @@
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
 
-	common "source.monogon.dev/metropolis/node"
 	"source.monogon.dev/metropolis/node/core/localstorage"
+	"source.monogon.dev/metropolis/node/core/mgmt"
 	"source.monogon.dev/metropolis/node/core/roleserve"
 	"source.monogon.dev/metropolis/pkg/logtree"
 	"source.monogon.dev/metropolis/pkg/supervisor"
+
+	common "source.monogon.dev/metropolis/node"
 	apb "source.monogon.dev/metropolis/proto/api"
-	cpb "source.monogon.dev/metropolis/proto/common"
 )
 
 const (
@@ -98,142 +99,10 @@
 }
 
 func (s *debugService) GetLogs(req *apb.GetLogsRequest, srv apb.NodeDebugService_GetLogsServer) error {
-	if len(req.Filters) > logFilterMax {
-		return status.Errorf(codes.InvalidArgument, "requested %d filters, maximum permitted is %d", len(req.Filters), logFilterMax)
+	svc := mgmt.LogService{
+		LogTree: s.logtree,
 	}
-	dn := logtree.DN(req.Dn)
-	_, err := dn.Path()
-	switch err {
-	case nil:
-	case logtree.ErrInvalidDN:
-		return status.Errorf(codes.InvalidArgument, "invalid DN")
-	default:
-		return status.Errorf(codes.Unavailable, "could not parse DN: %v", err)
-	}
-
-	var options []logtree.LogReadOption
-
-	// Turn backlog mode into logtree option(s).
-	switch req.BacklogMode {
-	case apb.GetLogsRequest_BACKLOG_DISABLE:
-	case apb.GetLogsRequest_BACKLOG_ALL:
-		options = append(options, logtree.WithBacklog(logtree.BacklogAllAvailable))
-	case apb.GetLogsRequest_BACKLOG_COUNT:
-		count := int(req.BacklogCount)
-		if count <= 0 {
-			return status.Errorf(codes.InvalidArgument, "backlog_count must be > 0 if backlog_mode is BACKLOG_COUNT")
-		}
-		options = append(options, logtree.WithBacklog(count))
-	default:
-		return status.Errorf(codes.InvalidArgument, "unknown backlog_mode %d", req.BacklogMode)
-	}
-
-	// Turn stream mode into logtree option(s).
-	streamEnable := false
-	switch req.StreamMode {
-	case apb.GetLogsRequest_STREAM_DISABLE:
-	case apb.GetLogsRequest_STREAM_UNBUFFERED:
-		streamEnable = true
-		options = append(options, logtree.WithStream())
-	}
-
-	// Parse proto filters into logtree options.
-	for i, filter := range req.Filters {
-		switch inner := filter.Filter.(type) {
-		case *cpb.LogFilter_WithChildren_:
-			options = append(options, logtree.WithChildren())
-		case *cpb.LogFilter_OnlyRaw_:
-			options = append(options, logtree.OnlyRaw())
-		case *cpb.LogFilter_OnlyLeveled_:
-			options = append(options, logtree.OnlyLeveled())
-		case *cpb.LogFilter_LeveledWithMinimumSeverity_:
-			severity, err := logtree.SeverityFromProto(inner.LeveledWithMinimumSeverity.Minimum)
-			if err != nil {
-				return status.Errorf(codes.InvalidArgument, "filter %d has invalid severity: %v", i, err)
-			}
-			options = append(options, logtree.LeveledWithMinimumSeverity(severity))
-		}
-	}
-
-	reader, err := s.logtree.Read(logtree.DN(req.Dn), options...)
-	switch err {
-	case nil:
-	case logtree.ErrRawAndLeveled:
-		return status.Errorf(codes.InvalidArgument, "requested only raw and only leveled logs simultaneously")
-	default:
-		return status.Errorf(codes.Unavailable, "could not retrieve logs: %v", err)
-	}
-	defer reader.Close()
-
-	// Default protobuf message size limit is 64MB. We want to limit ourselves
-	// to 10MB.
-	// Currently each raw log line can be at most 1024 unicode codepoints (or
-	// 4096 bytes). To cover extra metadata and proto overhead, let's round
-	// this up to 4500 bytes. This in turn means we can store a maximum of
-	// (10e6/4500) == 2222 entries.
-	// Currently each leveled log line can also be at most 1024 unicode
-	// codepoints (or 4096 bytes). To cover extra metadata and proto overhead
-	// let's round this up to 2000 bytes. This in turn means we can store a
-	// maximum of (10e6/5000) == 2000 entries.
-	// The lowever of these numbers, ie the worst case scenario, is 2000
-	// maximum entries.
-	maxChunkSize := 2000
-
-	// Serve all backlog entries in chunks.
-	chunk := make([]*cpb.LogEntry, 0, maxChunkSize)
-	for _, entry := range reader.Backlog {
-		p := entry.Proto()
-		if p == nil {
-			// TODO(q3k): log this once we have logtree/gRPC compatibility.
-			continue
-		}
-		chunk = append(chunk, p)
-
-		if len(chunk) >= maxChunkSize {
-			err := srv.Send(&apb.GetLogsResponse{
-				BacklogEntries: chunk,
-			})
-			if err != nil {
-				return err
-			}
-			chunk = make([]*cpb.LogEntry, 0, maxChunkSize)
-		}
-	}
-
-	// Send last chunk of backlog, if present..
-	if len(chunk) > 0 {
-		err := srv.Send(&apb.GetLogsResponse{
-			BacklogEntries: chunk,
-		})
-		if err != nil {
-			return err
-		}
-		chunk = make([]*cpb.LogEntry, 0, maxChunkSize)
-	}
-
-	// Start serving streaming data, if streaming has been requested.
-	if !streamEnable {
-		return nil
-	}
-
-	for {
-		entry, ok := <-reader.Stream
-		if !ok {
-			// Streaming has been ended by logtree - tell the client and return.
-			return status.Error(codes.Unavailable, "log streaming aborted by system")
-		}
-		p := entry.Proto()
-		if p == nil {
-			// TODO(q3k): log this once we have logtree/gRPC compatibility.
-			continue
-		}
-		err := srv.Send(&apb.GetLogsResponse{
-			StreamEntries: []*cpb.LogEntry{p},
-		})
-		if err != nil {
-			return err
-		}
-	}
+	return svc.Logs(req, srv)
 }
 
 // Validate property names as they are used in path construction and we really
diff --git a/metropolis/node/core/main.go b/metropolis/node/core/main.go
index 8496e47..0538478 100644
--- a/metropolis/node/core/main.go
+++ b/metropolis/node/core/main.go
@@ -154,6 +154,7 @@
 			StorageRoot: root,
 			Network:     networkSvc,
 			Resolver:    res,
+			LogTree:     lt,
 		})
 		if err := supervisor.Run(ctx, "role", rs.Run); err != nil {
 			return fmt.Errorf("failed to start role service: %w", err)
diff --git a/metropolis/node/core/mgmt/BUILD.bazel b/metropolis/node/core/mgmt/BUILD.bazel
index 349f13e..dff5bac 100644
--- a/metropolis/node/core/mgmt/BUILD.bazel
+++ b/metropolis/node/core/mgmt/BUILD.bazel
@@ -1,4 +1,4 @@
-load("@io_bazel_rules_go//go:def.bzl", "go_library")
+load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
 
 go_library(
     name = "mgmt",
@@ -12,10 +12,28 @@
         "//metropolis/node",
         "//metropolis/node/core/identity",
         "//metropolis/node/core/rpc",
+        "//metropolis/pkg/logtree",
         "//metropolis/pkg/supervisor",
         "//metropolis/proto/api",
+        "//metropolis/proto/common",
         "@org_golang_google_grpc//:go_default_library",
         "@org_golang_google_grpc//codes",
         "@org_golang_google_grpc//status",
     ],
 )
+
+go_test(
+    name = "mgmt_test",
+    srcs = ["svc_logs_test.go"],
+    embed = [":mgmt"],
+    deps = [
+        "//metropolis/pkg/logtree",
+        "//metropolis/proto/api",
+        "//metropolis/proto/common",
+        "@com_github_google_go_cmp//cmp",
+        "@org_golang_google_grpc//:go_default_library",
+        "@org_golang_google_grpc//credentials/insecure",
+        "@org_golang_google_grpc//test/bufconn",
+        "@org_golang_google_protobuf//testing/protocmp",
+    ],
+)
diff --git a/metropolis/node/core/mgmt/mgmt.go b/metropolis/node/core/mgmt/mgmt.go
index 5fa12a0..2572764 100644
--- a/metropolis/node/core/mgmt/mgmt.go
+++ b/metropolis/node/core/mgmt/mgmt.go
@@ -12,16 +12,34 @@
 	"source.monogon.dev/metropolis/node"
 	"source.monogon.dev/metropolis/node/core/identity"
 	"source.monogon.dev/metropolis/node/core/rpc"
+	"source.monogon.dev/metropolis/pkg/logtree"
 	"source.monogon.dev/metropolis/pkg/supervisor"
 
 	apb "source.monogon.dev/metropolis/proto/api"
 )
 
+// Service implements metropolis.proto.api.NodeManagement.
 type Service struct {
+	// NodeCredentials used to set up gRPC server.
 	NodeCredentials *identity.NodeCredentials
+	// LogTree from which NodeManagement.Logs will be served.
+	LogTree *logtree.LogTree
+
+	// Automatically populated on Run.
+	LogService
 }
 
+// Run the Service as a supervisor runnable.
 func (s *Service) Run(ctx context.Context) error {
+	if s.NodeCredentials == nil {
+		return fmt.Errorf("NodeCredentials missing")
+	}
+	if s.LogTree == nil {
+		return fmt.Errorf("LogTree missing")
+	}
+
+	s.LogService.LogTree = s.LogTree
+
 	sec := rpc.ServerSecurity{
 		NodeCredentials: s.NodeCredentials,
 	}
diff --git a/metropolis/node/core/mgmt/svc_logs.go b/metropolis/node/core/mgmt/svc_logs.go
index 6e05ce9..baf9d9b 100644
--- a/metropolis/node/core/mgmt/svc_logs.go
+++ b/metropolis/node/core/mgmt/svc_logs.go
@@ -4,9 +4,156 @@
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
 
+	"source.monogon.dev/metropolis/pkg/logtree"
 	"source.monogon.dev/metropolis/proto/api"
+	cpb "source.monogon.dev/metropolis/proto/common"
 )
 
-func (s *Service) Logs(_ *api.GetLogsRequest, _ api.NodeManagement_LogsServer) error {
-	return status.Error(codes.Unimplemented, "unimplemented")
+const (
+	logFilterMax = 10
+)
+
+// LogService implements NodeManagement.Logs. This is split away from the rest of
+// the Service to allow the debug service to reuse this implementation.
+type LogService struct {
+	LogTree *logtree.LogTree
+}
+
+func (s *LogService) Logs(req *api.GetLogsRequest, srv api.NodeManagement_LogsServer) error {
+	if len(req.Filters) > logFilterMax {
+		return status.Errorf(codes.InvalidArgument, "requested %d filters, maximum permitted is %d", len(req.Filters), logFilterMax)
+	}
+	dn := logtree.DN(req.Dn)
+	_, err := dn.Path()
+	switch err {
+	case nil:
+	case logtree.ErrInvalidDN:
+		return status.Errorf(codes.InvalidArgument, "invalid DN")
+	default:
+		return status.Errorf(codes.Unavailable, "could not parse DN: %v", err)
+	}
+
+	var options []logtree.LogReadOption
+
+	// Turn backlog mode into logtree option(s).
+	switch req.BacklogMode {
+	case api.GetLogsRequest_BACKLOG_DISABLE:
+	case api.GetLogsRequest_BACKLOG_ALL:
+		options = append(options, logtree.WithBacklog(logtree.BacklogAllAvailable))
+	case api.GetLogsRequest_BACKLOG_COUNT:
+		count := int(req.BacklogCount)
+		if count <= 0 {
+			return status.Errorf(codes.InvalidArgument, "backlog_count must be > 0 if backlog_mode is BACKLOG_COUNT")
+		}
+		options = append(options, logtree.WithBacklog(count))
+	default:
+		return status.Errorf(codes.InvalidArgument, "unknown backlog_mode %d", req.BacklogMode)
+	}
+
+	// Turn stream mode into logtree option(s).
+	streamEnable := false
+	switch req.StreamMode {
+	case api.GetLogsRequest_STREAM_DISABLE:
+	case api.GetLogsRequest_STREAM_UNBUFFERED:
+		streamEnable = true
+		options = append(options, logtree.WithStream())
+	}
+
+	// Parse proto filters into logtree options.
+	for i, filter := range req.Filters {
+		switch inner := filter.Filter.(type) {
+		case *cpb.LogFilter_WithChildren_:
+			options = append(options, logtree.WithChildren())
+		case *cpb.LogFilter_OnlyRaw_:
+			options = append(options, logtree.OnlyRaw())
+		case *cpb.LogFilter_OnlyLeveled_:
+			options = append(options, logtree.OnlyLeveled())
+		case *cpb.LogFilter_LeveledWithMinimumSeverity_:
+			severity, err := logtree.SeverityFromProto(inner.LeveledWithMinimumSeverity.Minimum)
+			if err != nil {
+				return status.Errorf(codes.InvalidArgument, "filter %d has invalid severity: %v", i, err)
+			}
+			options = append(options, logtree.LeveledWithMinimumSeverity(severity))
+		}
+	}
+
+	reader, err := s.LogTree.Read(logtree.DN(req.Dn), options...)
+	switch err {
+	case nil:
+	case logtree.ErrRawAndLeveled:
+		return status.Errorf(codes.InvalidArgument, "requested only raw and only leveled logs simultaneously")
+	default:
+		return status.Errorf(codes.Unavailable, "could not retrieve logs: %v", err)
+	}
+	defer reader.Close()
+
+	// Default protobuf message size limit is 64MB. We want to limit ourselves
+	// to 10MB.
+	// Currently each raw log line can be at most 1024 unicode codepoints (or
+	// 4096 bytes). To cover extra metadata and proto overhead, let's round
+	// this up to 4500 bytes. This in turn means we can store a maximum of
+	// (10e6/4500) == 2222 entries.
+	// Currently each leveled log line can also be at most 1024 unicode
+	// codepoints (or 4096 bytes). To cover extra metadata and proto overhead
+	// let's round this up to 2000 bytes. This in turn means we can store a
+	// maximum of (10e6/5000) == 2000 entries.
+	// The lower of these numbers, ie the worst case scenario, is 2000
+	// maximum entries.
+	maxChunkSize := 2000
+
+	// Serve all backlog entries in chunks.
+	chunk := make([]*cpb.LogEntry, 0, maxChunkSize)
+	for _, entry := range reader.Backlog {
+		p := entry.Proto()
+		if p == nil {
+			// TODO(q3k): log this once we have logtree/gRPC compatibility.
+			continue
+		}
+		chunk = append(chunk, p)
+
+		if len(chunk) >= maxChunkSize {
+			err := srv.Send(&api.GetLogsResponse{
+				BacklogEntries: chunk,
+			})
+			if err != nil {
+				return err
+			}
+			chunk = make([]*cpb.LogEntry, 0, maxChunkSize)
+		}
+	}
+
+	// Send last chunk of backlog, if present.
+	if len(chunk) > 0 {
+		err := srv.Send(&api.GetLogsResponse{
+			BacklogEntries: chunk,
+		})
+		if err != nil {
+			return err
+		}
+		chunk = make([]*cpb.LogEntry, 0, maxChunkSize)
+	}
+
+	// Start serving streaming data, if streaming has been requested.
+	if !streamEnable {
+		return nil
+	}
+
+	for {
+		entry, ok := <-reader.Stream
+		if !ok {
+			// Streaming has been ended by logtree - tell the client and return.
+			return status.Error(codes.Unavailable, "log streaming aborted by system")
+		}
+		p := entry.Proto()
+		if p == nil {
+			// TODO(q3k): log this once we have logtree/gRPC compatibility.
+			continue
+		}
+		err := srv.Send(&api.GetLogsResponse{
+			StreamEntries: []*cpb.LogEntry{p},
+		})
+		if err != nil {
+			return err
+		}
+	}
 }
diff --git a/metropolis/node/core/mgmt/svc_logs_test.go b/metropolis/node/core/mgmt/svc_logs_test.go
new file mode 100644
index 0000000..d5825ad
--- /dev/null
+++ b/metropolis/node/core/mgmt/svc_logs_test.go
@@ -0,0 +1,367 @@
+package mgmt
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"io"
+	"net"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/google/go-cmp/cmp"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/credentials/insecure"
+	"google.golang.org/grpc/test/bufconn"
+	"google.golang.org/protobuf/testing/protocmp"
+
+	"source.monogon.dev/metropolis/pkg/logtree"
+	"source.monogon.dev/metropolis/proto/api"
+	cpb "source.monogon.dev/metropolis/proto/common"
+)
+
+func dut(t *testing.T) (*Service, *grpc.ClientConn) {
+	lt := logtree.New()
+	s := &Service{
+		LogTree: lt,
+		LogService: LogService{
+			LogTree: lt,
+		},
+	}
+
+	srv := grpc.NewServer()
+	api.RegisterNodeManagementServer(srv, s)
+	externalLis := bufconn.Listen(1024 * 1024)
+	go func() {
+		if err := srv.Serve(externalLis); err != nil {
+			t.Fatalf("GRPC serve failed: %v", err)
+		}
+	}()
+	withLocalDialer := grpc.WithContextDialer(func(_ context.Context, _ string) (net.Conn, error) {
+		return externalLis.Dial()
+	})
+	cl, err := grpc.Dial("local", withLocalDialer, grpc.WithTransportCredentials(insecure.NewCredentials()))
+	if err != nil {
+		t.Fatalf("Dialing GRPC failed: %v", err)
+	}
+
+	return s, cl
+}
+
+func cleanLogEntry(e *cpb.LogEntry) {
+	// Filter out bits that change too much to test them.
+	switch k := e.Kind.(type) {
+	case *cpb.LogEntry_Leveled_:
+		k.Leveled.Location = ""
+		k.Leveled.Timestamp = nil
+	}
+}
+
+func mkRawEntry(dn string, line string) *cpb.LogEntry {
+	return &cpb.LogEntry{
+		Dn: dn, Kind: &cpb.LogEntry_Raw_{
+			Raw: &cpb.LogEntry_Raw{
+				Data:           line,
+				OriginalLength: int64(len(line)),
+			},
+		},
+	}
+}
+
+func mkLeveledEntry(dn string, severity string, lines string) *cpb.LogEntry {
+	var sev cpb.LeveledLogSeverity
+	switch severity {
+	case "i":
+		sev = cpb.LeveledLogSeverity_INFO
+	case "w":
+		sev = cpb.LeveledLogSeverity_WARNING
+	case "e":
+		sev = cpb.LeveledLogSeverity_ERROR
+	}
+	return &cpb.LogEntry{
+		Dn: dn, Kind: &cpb.LogEntry_Leveled_{
+			Leveled: &cpb.LogEntry_Leveled{
+				Lines:    strings.Split(lines, "\n"),
+				Severity: sev,
+			},
+		},
+	}
+}
+
+func drainLogs(t *testing.T, srv api.NodeManagement_LogsClient) (res []*cpb.LogEntry) {
+	t.Helper()
+	for {
+		ev, err := srv.Recv()
+		if errors.Is(err, io.EOF) {
+			return
+		}
+		if err != nil {
+			t.Errorf("Recv: %v", err)
+			return
+		}
+		for _, e := range ev.BacklogEntries {
+			res = append(res, e)
+		}
+	}
+}
+
+// TestLogService_Logs_Backlog exercises the basic log API by requesting
+// backlogged leveled log entries.
+func TestLogService_Logs_Backlog(t *testing.T) {
+	ctx, ctxC := context.WithCancel(context.Background())
+	defer ctxC()
+
+	s, cl := dut(t)
+
+	mgmt := api.NewNodeManagementClient(cl)
+
+	s.LogTree.MustLeveledFor("init").Infof("Hello")
+	s.LogTree.MustLeveledFor("main").Infof("Starting roleserver...")
+	s.LogTree.MustLeveledFor("main.roleserver").Infof("Waiting for node roles...")
+	s.LogTree.MustLeveledFor("main.roleserver.kubernetes").Infof("Starting kubernetes...")
+	s.LogTree.MustLeveledFor("main.roleserver.controlplane").Infof("Starting control plane...")
+	s.LogTree.MustLeveledFor("main.roleserver.kubernetes").Infof("Kubernetes version: 1.21.37")
+	s.LogTree.MustLeveledFor("main.roleserver.controlplane").Infof("Starting etcd...")
+
+	mkReq := func(dn string, backlog int64) *api.GetLogsRequest {
+		var backlogMode api.GetLogsRequest_BacklogMode
+		var backlogCount int64
+		switch {
+		case backlog < 0:
+			backlogMode = api.GetLogsRequest_BACKLOG_ALL
+		case backlog > 0:
+			backlogMode = api.GetLogsRequest_BACKLOG_COUNT
+			backlogCount = backlog
+		case backlog == 0:
+			backlogMode = api.GetLogsRequest_BACKLOG_DISABLE
+		}
+		return &api.GetLogsRequest{
+			Dn:           dn,
+			BacklogMode:  backlogMode,
+			BacklogCount: backlogCount,
+			StreamMode:   api.GetLogsRequest_STREAM_DISABLE,
+		}
+	}
+	mkRecursive := func(in *api.GetLogsRequest) *api.GetLogsRequest {
+		in.Filters = append(in.Filters, &cpb.LogFilter{
+			Filter: &cpb.LogFilter_WithChildren_{
+				WithChildren: &cpb.LogFilter_WithChildren{},
+			},
+		})
+		return in
+	}
+	for i, te := range []struct {
+		req  *api.GetLogsRequest
+		want []*cpb.LogEntry
+	}{
+		{
+			// Test all backlog.
+			req: mkReq("main.roleserver.kubernetes", -1),
+			want: []*cpb.LogEntry{
+				mkLeveledEntry("main.roleserver.kubernetes", "i", "Starting kubernetes..."),
+				mkLeveledEntry("main.roleserver.kubernetes", "i", "Kubernetes version: 1.21.37"),
+			},
+		},
+		{
+			// Test exact backlog.
+			req: mkReq("main.roleserver.kubernetes", 1),
+			want: []*cpb.LogEntry{
+				mkLeveledEntry("main.roleserver.kubernetes", "i", "Kubernetes version: 1.21.37"),
+			},
+		},
+		{
+			// Test no backlog.
+			req:  mkReq("main.roleserver.kubernetes", 0),
+			want: nil,
+		},
+
+		{
+			// Test recursion with backlog.
+			req: mkRecursive(mkReq("main.roleserver", 2)),
+			want: []*cpb.LogEntry{
+				mkLeveledEntry("main.roleserver.kubernetes", "i", "Kubernetes version: 1.21.37"),
+				mkLeveledEntry("main.roleserver.controlplane", "i", "Starting etcd..."),
+			},
+		},
+	} {
+		srv, err := mgmt.Logs(ctx, te.req)
+		if err != nil {
+			t.Errorf("Case %d: Logs failed: %v", i, err)
+			continue
+		}
+		logs := drainLogs(t, srv)
+		for _, e := range logs {
+			cleanLogEntry(e)
+		}
+		diff := cmp.Diff(te.want, logs, protocmp.Transform())
+		if diff != "" {
+			t.Errorf("Case %d: diff: \n%s", i, diff)
+		}
+	}
+}
+
+// TestLogService_Logs_Stream exercises the basic log API by requesting
+// streaming leveled log entries.
+func TestLogService_Logs_Stream(t *testing.T) {
+	ctx, ctxC := context.WithCancel(context.Background())
+	defer ctxC()
+
+	s, cl := dut(t)
+
+	// Start streaming all logs.
+	mgmt := api.NewNodeManagementClient(cl)
+	srv, err := mgmt.Logs(ctx, &api.GetLogsRequest{
+		Dn:          "",
+		BacklogMode: api.GetLogsRequest_BACKLOG_ALL,
+		StreamMode:  api.GetLogsRequest_STREAM_UNBUFFERED,
+		Filters: []*cpb.LogFilter{
+			{
+				Filter: &cpb.LogFilter_WithChildren_{
+					WithChildren: &cpb.LogFilter_WithChildren{},
+				},
+			},
+		},
+	})
+	if err != nil {
+		t.Fatalf("Logs failed: %v", err)
+	}
+
+	// Pipe returned logs into a channel for analysis.
+	logC := make(chan *cpb.LogEntry)
+	go func() {
+		for {
+			ev, err := srv.Recv()
+			if err != nil {
+				return
+			}
+			for _, e := range ev.BacklogEntries {
+				logC <- e
+			}
+			for _, e := range ev.StreamEntries {
+				logC <- e
+			}
+		}
+	}()
+
+	// Submit log entry, expect it on the channel.
+	s.LogTree.MustLeveledFor("test").Infof("Hello, world")
+	select {
+	case e := <-logC:
+		cleanLogEntry(e)
+		if diff := cmp.Diff(mkLeveledEntry("test", "i", "Hello, world"), e, protocmp.Transform()); diff != "" {
+			t.Errorf("Diff:\n%s", diff)
+		}
+	case <-time.After(time.Second * 2):
+		t.Errorf("Timeout")
+	}
+
+	// That could've made it through the backlog. Do it again to make sure it came
+	// through streaming.
+	s.LogTree.MustLeveledFor("test").Infof("Hello again, world")
+	select {
+	case e := <-logC:
+		cleanLogEntry(e)
+		if diff := cmp.Diff(mkLeveledEntry("test", "i", "Hello again, world"), e, protocmp.Transform()); diff != "" {
+			t.Errorf("Diff:\n%s", diff)
+		}
+	case <-time.After(time.Second * 2):
+		t.Errorf("Timeout")
+	}
+}
+
+// TestLogService_Logs_Filters exercises the rest of the filter functionality.
+func TestLogService_Logs_Filters(t *testing.T) {
+	ctx, ctxC := context.WithCancel(context.Background())
+	defer ctxC()
+
+	s, cl := dut(t)
+
+	mgmt := api.NewNodeManagementClient(cl)
+	s.LogTree.MustLeveledFor("main").Infof("Hello")
+	s.LogTree.MustLeveledFor("main").Infof("Starting...")
+	s.LogTree.MustLeveledFor("main").Warningf("Something failed!")
+	fmt.Fprintln(s.LogTree.MustRawFor("main"), "medium rare")
+	s.LogTree.MustLeveledFor("main").Errorf("Something failed very hard!")
+
+	for i, te := range []struct {
+		req  *api.GetLogsRequest
+		want []*cpb.LogEntry
+	}{
+		// Case 0: request given level
+		{
+			req: &api.GetLogsRequest{
+				Dn:          "main",
+				BacklogMode: api.GetLogsRequest_BACKLOG_ALL,
+				StreamMode:  api.GetLogsRequest_STREAM_DISABLE,
+				Filters: []*cpb.LogFilter{
+					{
+						Filter: &cpb.LogFilter_LeveledWithMinimumSeverity_{
+							LeveledWithMinimumSeverity: &cpb.LogFilter_LeveledWithMinimumSeverity{
+								Minimum: cpb.LeveledLogSeverity_WARNING,
+							},
+						},
+					},
+				},
+			},
+			want: []*cpb.LogEntry{
+				mkLeveledEntry("main", "w", "Something failed!"),
+				mkLeveledEntry("main", "e", "Something failed very hard!"),
+			},
+		},
+		// Case 1: request raw only
+		{
+			req: &api.GetLogsRequest{
+				Dn:          "main",
+				BacklogMode: api.GetLogsRequest_BACKLOG_ALL,
+				StreamMode:  api.GetLogsRequest_STREAM_DISABLE,
+				Filters: []*cpb.LogFilter{
+					{
+						Filter: &cpb.LogFilter_OnlyRaw_{
+							OnlyRaw: &cpb.LogFilter_OnlyRaw{},
+						},
+					},
+				},
+			},
+			want: []*cpb.LogEntry{
+				mkRawEntry("main", "medium rare"),
+			},
+		},
+		// Case 2: request leveled only
+		{
+			req: &api.GetLogsRequest{
+				Dn:          "main",
+				BacklogMode: api.GetLogsRequest_BACKLOG_ALL,
+				StreamMode:  api.GetLogsRequest_STREAM_DISABLE,
+				Filters: []*cpb.LogFilter{
+					{
+						Filter: &cpb.LogFilter_OnlyLeveled_{
+							OnlyLeveled: &cpb.LogFilter_OnlyLeveled{},
+						},
+					},
+				},
+			},
+			want: []*cpb.LogEntry{
+				mkLeveledEntry("main", "i", "Hello"),
+				mkLeveledEntry("main", "i", "Starting..."),
+				mkLeveledEntry("main", "w", "Something failed!"),
+				mkLeveledEntry("main", "e", "Something failed very hard!"),
+			},
+		},
+	} {
+		srv, err := mgmt.Logs(ctx, te.req)
+		if err != nil {
+			t.Errorf("Case %d: Logs failed: %v", i, err)
+			continue
+		}
+		logs := drainLogs(t, srv)
+		for _, e := range logs {
+			cleanLogEntry(e)
+		}
+		diff := cmp.Diff(te.want, logs, protocmp.Transform())
+		if diff != "" {
+			t.Errorf("Case %d: diff: \n%s", i, diff)
+		}
+	}
+
+}
diff --git a/metropolis/node/core/roleserve/BUILD.bazel b/metropolis/node/core/roleserve/BUILD.bazel
index c2dada4..8aa4fc0 100644
--- a/metropolis/node/core/roleserve/BUILD.bazel
+++ b/metropolis/node/core/roleserve/BUILD.bazel
@@ -33,6 +33,7 @@
         "//metropolis/node/kubernetes/pki",
         "//metropolis/pkg/event",
         "//metropolis/pkg/event/memory",
+        "//metropolis/pkg/logtree",
         "//metropolis/pkg/pki",
         "//metropolis/pkg/supervisor",
         "//metropolis/proto/common",
diff --git a/metropolis/node/core/roleserve/roleserve.go b/metropolis/node/core/roleserve/roleserve.go
index 8f9bb47..a1969bf 100644
--- a/metropolis/node/core/roleserve/roleserve.go
+++ b/metropolis/node/core/roleserve/roleserve.go
@@ -49,7 +49,9 @@
 	"source.monogon.dev/metropolis/node/core/network"
 	"source.monogon.dev/metropolis/node/core/rpc/resolver"
 	"source.monogon.dev/metropolis/pkg/event/memory"
+	"source.monogon.dev/metropolis/pkg/logtree"
 	"source.monogon.dev/metropolis/pkg/supervisor"
+
 	cpb "source.monogon.dev/metropolis/proto/common"
 )
 
@@ -68,6 +70,8 @@
 	// created early in the roleserver lifecycle, and is seeded with node
 	// information as the first subordinate runs DialCurator().
 	Resolver *resolver.Resolver
+
+	LogTree *logtree.LogTree
 }
 
 // Service is the roleserver/“Role Server” service. See the package-level
@@ -132,6 +136,7 @@
 
 	s.nodeMgmt = &workerNodeMgmt{
 		clusterMembership: &s.ClusterMembership,
+		logTree:           s.LogTree,
 	}
 
 	return s
diff --git a/metropolis/node/core/roleserve/worker_nodemgmt.go b/metropolis/node/core/roleserve/worker_nodemgmt.go
index 889f0eb..9ffb112 100644
--- a/metropolis/node/core/roleserve/worker_nodemgmt.go
+++ b/metropolis/node/core/roleserve/worker_nodemgmt.go
@@ -5,11 +5,13 @@
 
 	"source.monogon.dev/metropolis/node/core/mgmt"
 	"source.monogon.dev/metropolis/pkg/event/memory"
+	"source.monogon.dev/metropolis/pkg/logtree"
 	"source.monogon.dev/metropolis/pkg/supervisor"
 )
 
 type workerNodeMgmt struct {
 	clusterMembership *memory.Value[*ClusterMembership]
+	logTree           *logtree.LogTree
 }
 
 func (s *workerNodeMgmt) run(ctx context.Context) error {
@@ -24,6 +26,7 @@
 	supervisor.Logger(ctx).Infof("Got cluster membership, starting...")
 	srv := mgmt.Service{
 		NodeCredentials: cm.credentials,
+		LogTree:         s.logTree,
 	}
 	return srv.Run(ctx)
 }
diff --git a/metropolis/pkg/cmd/run.go b/metropolis/pkg/cmd/run.go
index 9f554b5..7834c99 100644
--- a/metropolis/pkg/cmd/run.go
+++ b/metropolis/pkg/cmd/run.go
@@ -33,13 +33,18 @@
 	// after the reader loop is broken, avoid deadlocks by making lineC a
 	// buffered channel.
 	lineC := make(chan string, 2)
-	outBuffer := logbuffer.NewLineBuffer(1024, func(l *logbuffer.Line) {
-		lineC <- l.Data
-	})
+	lineCB := func(l *logbuffer.Line) {
+		// If the context is canceled, no one is listening on lineC anymore, so
+		// sending would block forever.
+		select {
+		case <-ctx.Done():
+			return
+		case lineC <- l.Data:
+		}
+	}
+	outBuffer := logbuffer.NewLineBuffer(1024, lineCB)
 	defer outBuffer.Close()
-	errBuffer := logbuffer.NewLineBuffer(1024, func(l *logbuffer.Line) {
-		lineC <- l.Data
-	})
+	errBuffer := logbuffer.NewLineBuffer(1024, lineCB)
 	defer errBuffer.Close()
 
 	// Prepare the command context, and start the process.