metropolis: implement NodeManagement.Logs

This takes the implementation from the debug service, dusts it off a
bit, adds tests and moves eerything to the new node mgmt service.

Change-Id: Id3b70126a2551775d8328c0c4e424ec0e675f40f
Reviewed-on: https://review.monogon.dev/c/monogon/+/1439
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
Tested-by: Jenkins CI
diff --git a/metropolis/node/core/BUILD.bazel b/metropolis/node/core/BUILD.bazel
index 281266b..07f4f6e 100644
--- a/metropolis/node/core/BUILD.bazel
+++ b/metropolis/node/core/BUILD.bazel
@@ -25,6 +25,7 @@
         "//metropolis/node/core/cluster",
         "//metropolis/node/core/localstorage",
         "//metropolis/node/core/localstorage/declarative",
+        "//metropolis/node/core/mgmt",
         "//metropolis/node/core/network",
         "//metropolis/node/core/network/hostsfile",
         "//metropolis/node/core/roleserve",
@@ -35,7 +36,6 @@
         "//metropolis/pkg/supervisor",
         "//metropolis/pkg/tpm",
         "//metropolis/proto/api",
-        "//metropolis/proto/common",
         "@com_github_containerd_containerd//:containerd",
         "@com_github_containerd_containerd//namespaces",
         "@org_golang_google_grpc//:go_default_library",
diff --git a/metropolis/node/core/debug_service_enabled.go b/metropolis/node/core/debug_service_enabled.go
index 3494270..75e92bc 100644
--- a/metropolis/node/core/debug_service_enabled.go
+++ b/metropolis/node/core/debug_service_enabled.go
@@ -31,13 +31,14 @@
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
 
-	common "source.monogon.dev/metropolis/node"
 	"source.monogon.dev/metropolis/node/core/localstorage"
+	"source.monogon.dev/metropolis/node/core/mgmt"
 	"source.monogon.dev/metropolis/node/core/roleserve"
 	"source.monogon.dev/metropolis/pkg/logtree"
 	"source.monogon.dev/metropolis/pkg/supervisor"
+
+	common "source.monogon.dev/metropolis/node"
 	apb "source.monogon.dev/metropolis/proto/api"
-	cpb "source.monogon.dev/metropolis/proto/common"
 )
 
 const (
@@ -98,142 +99,10 @@
 }
 
 func (s *debugService) GetLogs(req *apb.GetLogsRequest, srv apb.NodeDebugService_GetLogsServer) error {
-	if len(req.Filters) > logFilterMax {
-		return status.Errorf(codes.InvalidArgument, "requested %d filters, maximum permitted is %d", len(req.Filters), logFilterMax)
+	svc := mgmt.LogService{
+		LogTree: s.logtree,
 	}
-	dn := logtree.DN(req.Dn)
-	_, err := dn.Path()
-	switch err {
-	case nil:
-	case logtree.ErrInvalidDN:
-		return status.Errorf(codes.InvalidArgument, "invalid DN")
-	default:
-		return status.Errorf(codes.Unavailable, "could not parse DN: %v", err)
-	}
-
-	var options []logtree.LogReadOption
-
-	// Turn backlog mode into logtree option(s).
-	switch req.BacklogMode {
-	case apb.GetLogsRequest_BACKLOG_DISABLE:
-	case apb.GetLogsRequest_BACKLOG_ALL:
-		options = append(options, logtree.WithBacklog(logtree.BacklogAllAvailable))
-	case apb.GetLogsRequest_BACKLOG_COUNT:
-		count := int(req.BacklogCount)
-		if count <= 0 {
-			return status.Errorf(codes.InvalidArgument, "backlog_count must be > 0 if backlog_mode is BACKLOG_COUNT")
-		}
-		options = append(options, logtree.WithBacklog(count))
-	default:
-		return status.Errorf(codes.InvalidArgument, "unknown backlog_mode %d", req.BacklogMode)
-	}
-
-	// Turn stream mode into logtree option(s).
-	streamEnable := false
-	switch req.StreamMode {
-	case apb.GetLogsRequest_STREAM_DISABLE:
-	case apb.GetLogsRequest_STREAM_UNBUFFERED:
-		streamEnable = true
-		options = append(options, logtree.WithStream())
-	}
-
-	// Parse proto filters into logtree options.
-	for i, filter := range req.Filters {
-		switch inner := filter.Filter.(type) {
-		case *cpb.LogFilter_WithChildren_:
-			options = append(options, logtree.WithChildren())
-		case *cpb.LogFilter_OnlyRaw_:
-			options = append(options, logtree.OnlyRaw())
-		case *cpb.LogFilter_OnlyLeveled_:
-			options = append(options, logtree.OnlyLeveled())
-		case *cpb.LogFilter_LeveledWithMinimumSeverity_:
-			severity, err := logtree.SeverityFromProto(inner.LeveledWithMinimumSeverity.Minimum)
-			if err != nil {
-				return status.Errorf(codes.InvalidArgument, "filter %d has invalid severity: %v", i, err)
-			}
-			options = append(options, logtree.LeveledWithMinimumSeverity(severity))
-		}
-	}
-
-	reader, err := s.logtree.Read(logtree.DN(req.Dn), options...)
-	switch err {
-	case nil:
-	case logtree.ErrRawAndLeveled:
-		return status.Errorf(codes.InvalidArgument, "requested only raw and only leveled logs simultaneously")
-	default:
-		return status.Errorf(codes.Unavailable, "could not retrieve logs: %v", err)
-	}
-	defer reader.Close()
-
-	// Default protobuf message size limit is 64MB. We want to limit ourselves
-	// to 10MB.
-	// Currently each raw log line can be at most 1024 unicode codepoints (or
-	// 4096 bytes). To cover extra metadata and proto overhead, let's round
-	// this up to 4500 bytes. This in turn means we can store a maximum of
-	// (10e6/4500) == 2222 entries.
-	// Currently each leveled log line can also be at most 1024 unicode
-	// codepoints (or 4096 bytes). To cover extra metadata and proto overhead
-	// let's round this up to 2000 bytes. This in turn means we can store a
-	// maximum of (10e6/5000) == 2000 entries.
-	// The lowever of these numbers, ie the worst case scenario, is 2000
-	// maximum entries.
-	maxChunkSize := 2000
-
-	// Serve all backlog entries in chunks.
-	chunk := make([]*cpb.LogEntry, 0, maxChunkSize)
-	for _, entry := range reader.Backlog {
-		p := entry.Proto()
-		if p == nil {
-			// TODO(q3k): log this once we have logtree/gRPC compatibility.
-			continue
-		}
-		chunk = append(chunk, p)
-
-		if len(chunk) >= maxChunkSize {
-			err := srv.Send(&apb.GetLogsResponse{
-				BacklogEntries: chunk,
-			})
-			if err != nil {
-				return err
-			}
-			chunk = make([]*cpb.LogEntry, 0, maxChunkSize)
-		}
-	}
-
-	// Send last chunk of backlog, if present..
-	if len(chunk) > 0 {
-		err := srv.Send(&apb.GetLogsResponse{
-			BacklogEntries: chunk,
-		})
-		if err != nil {
-			return err
-		}
-		chunk = make([]*cpb.LogEntry, 0, maxChunkSize)
-	}
-
-	// Start serving streaming data, if streaming has been requested.
-	if !streamEnable {
-		return nil
-	}
-
-	for {
-		entry, ok := <-reader.Stream
-		if !ok {
-			// Streaming has been ended by logtree - tell the client and return.
-			return status.Error(codes.Unavailable, "log streaming aborted by system")
-		}
-		p := entry.Proto()
-		if p == nil {
-			// TODO(q3k): log this once we have logtree/gRPC compatibility.
-			continue
-		}
-		err := srv.Send(&apb.GetLogsResponse{
-			StreamEntries: []*cpb.LogEntry{p},
-		})
-		if err != nil {
-			return err
-		}
-	}
+	return svc.Logs(req, srv)
 }
 
 // Validate property names as they are used in path construction and we really
diff --git a/metropolis/node/core/main.go b/metropolis/node/core/main.go
index 8496e47..0538478 100644
--- a/metropolis/node/core/main.go
+++ b/metropolis/node/core/main.go
@@ -154,6 +154,7 @@
 			StorageRoot: root,
 			Network:     networkSvc,
 			Resolver:    res,
+			LogTree:     lt,
 		})
 		if err := supervisor.Run(ctx, "role", rs.Run); err != nil {
 			return fmt.Errorf("failed to start role service: %w", err)
diff --git a/metropolis/node/core/mgmt/BUILD.bazel b/metropolis/node/core/mgmt/BUILD.bazel
index 349f13e..dff5bac 100644
--- a/metropolis/node/core/mgmt/BUILD.bazel
+++ b/metropolis/node/core/mgmt/BUILD.bazel
@@ -1,4 +1,4 @@
-load("@io_bazel_rules_go//go:def.bzl", "go_library")
+load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
 
 go_library(
     name = "mgmt",
@@ -12,10 +12,28 @@
         "//metropolis/node",
         "//metropolis/node/core/identity",
         "//metropolis/node/core/rpc",
+        "//metropolis/pkg/logtree",
         "//metropolis/pkg/supervisor",
         "//metropolis/proto/api",
+        "//metropolis/proto/common",
         "@org_golang_google_grpc//:go_default_library",
         "@org_golang_google_grpc//codes",
         "@org_golang_google_grpc//status",
     ],
 )
+
+go_test(
+    name = "mgmt_test",
+    srcs = ["svc_logs_test.go"],
+    embed = [":mgmt"],
+    deps = [
+        "//metropolis/pkg/logtree",
+        "//metropolis/proto/api",
+        "//metropolis/proto/common",
+        "@com_github_google_go_cmp//cmp",
+        "@org_golang_google_grpc//:go_default_library",
+        "@org_golang_google_grpc//credentials/insecure",
+        "@org_golang_google_grpc//test/bufconn",
+        "@org_golang_google_protobuf//testing/protocmp",
+    ],
+)
diff --git a/metropolis/node/core/mgmt/mgmt.go b/metropolis/node/core/mgmt/mgmt.go
index 5fa12a0..2572764 100644
--- a/metropolis/node/core/mgmt/mgmt.go
+++ b/metropolis/node/core/mgmt/mgmt.go
@@ -12,16 +12,34 @@
 	"source.monogon.dev/metropolis/node"
 	"source.monogon.dev/metropolis/node/core/identity"
 	"source.monogon.dev/metropolis/node/core/rpc"
+	"source.monogon.dev/metropolis/pkg/logtree"
 	"source.monogon.dev/metropolis/pkg/supervisor"
 
 	apb "source.monogon.dev/metropolis/proto/api"
 )
 
+// Service implements metropolis.proto.api.NodeManagement.
 type Service struct {
+	// NodeCredentials used to set up gRPC server.
 	NodeCredentials *identity.NodeCredentials
+	// LogTree from which NodeManagement.Logs will be served.
+	LogTree *logtree.LogTree
+
+	// Automatically populated on Run.
+	LogService
 }
 
+// Run the Servie as a supervisor runnable.
 func (s *Service) Run(ctx context.Context) error {
+	if s.NodeCredentials == nil {
+		return fmt.Errorf("NodeCredentials missing")
+	}
+	if s.LogTree == nil {
+		return fmt.Errorf("LogTree missing")
+	}
+
+	s.LogService.LogTree = s.LogTree
+
 	sec := rpc.ServerSecurity{
 		NodeCredentials: s.NodeCredentials,
 	}
diff --git a/metropolis/node/core/mgmt/svc_logs.go b/metropolis/node/core/mgmt/svc_logs.go
index 6e05ce9..baf9d9b 100644
--- a/metropolis/node/core/mgmt/svc_logs.go
+++ b/metropolis/node/core/mgmt/svc_logs.go
@@ -4,9 +4,156 @@
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
 
+	"source.monogon.dev/metropolis/pkg/logtree"
 	"source.monogon.dev/metropolis/proto/api"
+	cpb "source.monogon.dev/metropolis/proto/common"
 )
 
-func (s *Service) Logs(_ *api.GetLogsRequest, _ api.NodeManagement_LogsServer) error {
-	return status.Error(codes.Unimplemented, "unimplemented")
+const (
+	logFilterMax = 10
+)
+
+// LogService implements NodeManagement.Logs. This is split away from the rest of
+// the Service to allow the debug service to reuse this implementation.
+type LogService struct {
+	LogTree *logtree.LogTree
+}
+
+func (s *LogService) Logs(req *api.GetLogsRequest, srv api.NodeManagement_LogsServer) error {
+	if len(req.Filters) > logFilterMax {
+		return status.Errorf(codes.InvalidArgument, "requested %d filters, maximum permitted is %d", len(req.Filters), logFilterMax)
+	}
+	dn := logtree.DN(req.Dn)
+	_, err := dn.Path()
+	switch err {
+	case nil:
+	case logtree.ErrInvalidDN:
+		return status.Errorf(codes.InvalidArgument, "invalid DN")
+	default:
+		return status.Errorf(codes.Unavailable, "could not parse DN: %v", err)
+	}
+
+	var options []logtree.LogReadOption
+
+	// Turn backlog mode into logtree option(s).
+	switch req.BacklogMode {
+	case api.GetLogsRequest_BACKLOG_DISABLE:
+	case api.GetLogsRequest_BACKLOG_ALL:
+		options = append(options, logtree.WithBacklog(logtree.BacklogAllAvailable))
+	case api.GetLogsRequest_BACKLOG_COUNT:
+		count := int(req.BacklogCount)
+		if count <= 0 {
+			return status.Errorf(codes.InvalidArgument, "backlog_count must be > 0 if backlog_mode is BACKLOG_COUNT")
+		}
+		options = append(options, logtree.WithBacklog(count))
+	default:
+		return status.Errorf(codes.InvalidArgument, "unknown backlog_mode %d", req.BacklogMode)
+	}
+
+	// Turn stream mode into logtree option(s).
+	streamEnable := false
+	switch req.StreamMode {
+	case api.GetLogsRequest_STREAM_DISABLE:
+	case api.GetLogsRequest_STREAM_UNBUFFERED:
+		streamEnable = true
+		options = append(options, logtree.WithStream())
+	}
+
+	// Parse proto filters into logtree options.
+	for i, filter := range req.Filters {
+		switch inner := filter.Filter.(type) {
+		case *cpb.LogFilter_WithChildren_:
+			options = append(options, logtree.WithChildren())
+		case *cpb.LogFilter_OnlyRaw_:
+			options = append(options, logtree.OnlyRaw())
+		case *cpb.LogFilter_OnlyLeveled_:
+			options = append(options, logtree.OnlyLeveled())
+		case *cpb.LogFilter_LeveledWithMinimumSeverity_:
+			severity, err := logtree.SeverityFromProto(inner.LeveledWithMinimumSeverity.Minimum)
+			if err != nil {
+				return status.Errorf(codes.InvalidArgument, "filter %d has invalid severity: %v", i, err)
+			}
+			options = append(options, logtree.LeveledWithMinimumSeverity(severity))
+		}
+	}
+
+	reader, err := s.LogTree.Read(logtree.DN(req.Dn), options...)
+	switch err {
+	case nil:
+	case logtree.ErrRawAndLeveled:
+		return status.Errorf(codes.InvalidArgument, "requested only raw and only leveled logs simultaneously")
+	default:
+		return status.Errorf(codes.Unavailable, "could not retrieve logs: %v", err)
+	}
+	defer reader.Close()
+
+	// Default protobuf message size limit is 64MB. We want to limit ourselves
+	// to 10MB.
+	// Currently each raw log line can be at most 1024 unicode codepoints (or
+	// 4096 bytes). To cover extra metadata and proto overhead, let's round
+	// this up to 4500 bytes. This in turn means we can store a maximum of
+	// (10e6/4500) == 2222 entries.
+	// Currently each leveled log line can also be at most 1024 unicode
+	// codepoints (or 4096 bytes). To cover extra metadata and proto overhead
+	// let's round this up to 2000 bytes. This in turn means we can store a
+	// maximum of (10e6/5000) == 2000 entries.
+	// The lowever of these numbers, ie the worst case scenario, is 2000
+	// maximum entries.
+	maxChunkSize := 2000
+
+	// Serve all backlog entries in chunks.
+	chunk := make([]*cpb.LogEntry, 0, maxChunkSize)
+	for _, entry := range reader.Backlog {
+		p := entry.Proto()
+		if p == nil {
+			// TODO(q3k): log this once we have logtree/gRPC compatibility.
+			continue
+		}
+		chunk = append(chunk, p)
+
+		if len(chunk) >= maxChunkSize {
+			err := srv.Send(&api.GetLogsResponse{
+				BacklogEntries: chunk,
+			})
+			if err != nil {
+				return err
+			}
+			chunk = make([]*cpb.LogEntry, 0, maxChunkSize)
+		}
+	}
+
+	// Send last chunk of backlog, if present..
+	if len(chunk) > 0 {
+		err := srv.Send(&api.GetLogsResponse{
+			BacklogEntries: chunk,
+		})
+		if err != nil {
+			return err
+		}
+		chunk = make([]*cpb.LogEntry, 0, maxChunkSize)
+	}
+
+	// Start serving streaming data, if streaming has been requested.
+	if !streamEnable {
+		return nil
+	}
+
+	for {
+		entry, ok := <-reader.Stream
+		if !ok {
+			// Streaming has been ended by logtree - tell the client and return.
+			return status.Error(codes.Unavailable, "log streaming aborted by system")
+		}
+		p := entry.Proto()
+		if p == nil {
+			// TODO(q3k): log this once we have logtree/gRPC compatibility.
+			continue
+		}
+		err := srv.Send(&api.GetLogsResponse{
+			StreamEntries: []*cpb.LogEntry{p},
+		})
+		if err != nil {
+			return err
+		}
+	}
 }
diff --git a/metropolis/node/core/mgmt/svc_logs_test.go b/metropolis/node/core/mgmt/svc_logs_test.go
new file mode 100644
index 0000000..d5825ad
--- /dev/null
+++ b/metropolis/node/core/mgmt/svc_logs_test.go
@@ -0,0 +1,367 @@
+package mgmt
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"io"
+	"net"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/google/go-cmp/cmp"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/credentials/insecure"
+	"google.golang.org/grpc/test/bufconn"
+	"google.golang.org/protobuf/testing/protocmp"
+
+	"source.monogon.dev/metropolis/pkg/logtree"
+	"source.monogon.dev/metropolis/proto/api"
+	cpb "source.monogon.dev/metropolis/proto/common"
+)
+
+func dut(t *testing.T) (*Service, *grpc.ClientConn) {
+	lt := logtree.New()
+	s := &Service{
+		LogTree: lt,
+		LogService: LogService{
+			LogTree: lt,
+		},
+	}
+
+	srv := grpc.NewServer()
+	api.RegisterNodeManagementServer(srv, s)
+	externalLis := bufconn.Listen(1024 * 1024)
+	go func() {
+		if err := srv.Serve(externalLis); err != nil {
+			t.Fatalf("GRPC serve failed: %v", err)
+		}
+	}()
+	withLocalDialer := grpc.WithContextDialer(func(_ context.Context, _ string) (net.Conn, error) {
+		return externalLis.Dial()
+	})
+	cl, err := grpc.Dial("local", withLocalDialer, grpc.WithTransportCredentials(insecure.NewCredentials()))
+	if err != nil {
+		t.Fatalf("Dialing GRPC failed: %v", err)
+	}
+
+	return s, cl
+}
+
+func cleanLogEntry(e *cpb.LogEntry) {
+	// Filter out bits that change too much to test them.
+	switch k := e.Kind.(type) {
+	case *cpb.LogEntry_Leveled_:
+		k.Leveled.Location = ""
+		k.Leveled.Timestamp = nil
+	}
+}
+
+func mkRawEntry(dn string, line string) *cpb.LogEntry {
+	return &cpb.LogEntry{
+		Dn: dn, Kind: &cpb.LogEntry_Raw_{
+			Raw: &cpb.LogEntry_Raw{
+				Data:           line,
+				OriginalLength: int64(len(line)),
+			},
+		},
+	}
+}
+
+func mkLeveledEntry(dn string, severity string, lines string) *cpb.LogEntry {
+	var sev cpb.LeveledLogSeverity
+	switch severity {
+	case "i":
+		sev = cpb.LeveledLogSeverity_INFO
+	case "w":
+		sev = cpb.LeveledLogSeverity_WARNING
+	case "e":
+		sev = cpb.LeveledLogSeverity_ERROR
+	}
+	return &cpb.LogEntry{
+		Dn: dn, Kind: &cpb.LogEntry_Leveled_{
+			Leveled: &cpb.LogEntry_Leveled{
+				Lines:    strings.Split(lines, "\n"),
+				Severity: sev,
+			},
+		},
+	}
+}
+
+func drainLogs(t *testing.T, srv api.NodeManagement_LogsClient) (res []*cpb.LogEntry) {
+	t.Helper()
+	for {
+		ev, err := srv.Recv()
+		if errors.Is(err, io.EOF) {
+			return
+		}
+		if err != nil {
+			t.Errorf("Recv: %v", err)
+			return
+		}
+		for _, e := range ev.BacklogEntries {
+			res = append(res, e)
+		}
+	}
+}
+
+// TestLogService_Logs_Backlog exercises the basic log API by requesting
+// backlogged leveled log entries.
+func TestLogService_Logs_Backlog(t *testing.T) {
+	ctx, ctxC := context.WithCancel(context.Background())
+	defer ctxC()
+
+	s, cl := dut(t)
+
+	mgmt := api.NewNodeManagementClient(cl)
+
+	s.LogTree.MustLeveledFor("init").Infof("Hello")
+	s.LogTree.MustLeveledFor("main").Infof("Starting roleserver...")
+	s.LogTree.MustLeveledFor("main.roleserver").Infof("Waiting for node roles...")
+	s.LogTree.MustLeveledFor("main.roleserver.kubernetes").Infof("Starting kubernetes...")
+	s.LogTree.MustLeveledFor("main.roleserver.controlplane").Infof("Starting control plane...")
+	s.LogTree.MustLeveledFor("main.roleserver.kubernetes").Infof("Kubernetes version: 1.21.37")
+	s.LogTree.MustLeveledFor("main.roleserver.controlplane").Infof("Starting etcd...")
+
+	mkReq := func(dn string, backlog int64) *api.GetLogsRequest {
+		var backlogMode api.GetLogsRequest_BacklogMode
+		var backlogCount int64
+		switch {
+		case backlog < 0:
+			backlogMode = api.GetLogsRequest_BACKLOG_ALL
+		case backlog > 0:
+			backlogMode = api.GetLogsRequest_BACKLOG_COUNT
+			backlogCount = backlog
+		case backlog == 0:
+			backlogMode = api.GetLogsRequest_BACKLOG_DISABLE
+		}
+		return &api.GetLogsRequest{
+			Dn:           dn,
+			BacklogMode:  backlogMode,
+			BacklogCount: backlogCount,
+			StreamMode:   api.GetLogsRequest_STREAM_DISABLE,
+		}
+	}
+	mkRecursive := func(in *api.GetLogsRequest) *api.GetLogsRequest {
+		in.Filters = append(in.Filters, &cpb.LogFilter{
+			Filter: &cpb.LogFilter_WithChildren_{
+				WithChildren: &cpb.LogFilter_WithChildren{},
+			},
+		})
+		return in
+	}
+	for i, te := range []struct {
+		req  *api.GetLogsRequest
+		want []*cpb.LogEntry
+	}{
+		{
+			// Test all backlog.
+			req: mkReq("main.roleserver.kubernetes", -1),
+			want: []*cpb.LogEntry{
+				mkLeveledEntry("main.roleserver.kubernetes", "i", "Starting kubernetes..."),
+				mkLeveledEntry("main.roleserver.kubernetes", "i", "Kubernetes version: 1.21.37"),
+			},
+		},
+		{
+			// Test exact backlog.
+			req: mkReq("main.roleserver.kubernetes", 1),
+			want: []*cpb.LogEntry{
+				mkLeveledEntry("main.roleserver.kubernetes", "i", "Kubernetes version: 1.21.37"),
+			},
+		},
+		{
+			// Test no backlog.
+			req:  mkReq("main.roleserver.kubernetes", 0),
+			want: nil,
+		},
+
+		{
+			// Test recursion with backlog.
+			req: mkRecursive(mkReq("main.roleserver", 2)),
+			want: []*cpb.LogEntry{
+				mkLeveledEntry("main.roleserver.kubernetes", "i", "Kubernetes version: 1.21.37"),
+				mkLeveledEntry("main.roleserver.controlplane", "i", "Starting etcd..."),
+			},
+		},
+	} {
+		srv, err := mgmt.Logs(ctx, te.req)
+		if err != nil {
+			t.Errorf("Case %d: Logs failed: %v", i, err)
+			continue
+		}
+		logs := drainLogs(t, srv)
+		for _, e := range logs {
+			cleanLogEntry(e)
+		}
+		diff := cmp.Diff(te.want, logs, protocmp.Transform())
+		if diff != "" {
+			t.Errorf("Case %d: diff: \n%s", i, diff)
+		}
+	}
+}
+
+// TestLogService_Logs_Strea, exercises the basic log API by requesting
+// streaming leveled log entries.
+func TestLogService_Logs_Stream(t *testing.T) {
+	ctx, ctxC := context.WithCancel(context.Background())
+	defer ctxC()
+
+	s, cl := dut(t)
+
+	// Start streaming all logs.
+	mgmt := api.NewNodeManagementClient(cl)
+	srv, err := mgmt.Logs(ctx, &api.GetLogsRequest{
+		Dn:          "",
+		BacklogMode: api.GetLogsRequest_BACKLOG_ALL,
+		StreamMode:  api.GetLogsRequest_STREAM_UNBUFFERED,
+		Filters: []*cpb.LogFilter{
+			{
+				Filter: &cpb.LogFilter_WithChildren_{
+					WithChildren: &cpb.LogFilter_WithChildren{},
+				},
+			},
+		},
+	})
+	if err != nil {
+		t.Fatalf("Logs failed: %v", err)
+	}
+
+	// Pipe returned logs into a channel for analysis.
+	logC := make(chan *cpb.LogEntry)
+	go func() {
+		for {
+			ev, err := srv.Recv()
+			if err != nil {
+				return
+			}
+			for _, e := range ev.BacklogEntries {
+				logC <- e
+			}
+			for _, e := range ev.StreamEntries {
+				logC <- e
+			}
+		}
+	}()
+
+	// Submit log entry, expect it on the channel.
+	s.LogTree.MustLeveledFor("test").Infof("Hello, world")
+	select {
+	case e := <-logC:
+		cleanLogEntry(e)
+		if diff := cmp.Diff(mkLeveledEntry("test", "i", "Hello, world"), e, protocmp.Transform()); diff != "" {
+			t.Errorf("Diff:\n%s", diff)
+		}
+	case <-time.After(time.Second * 2):
+		t.Errorf("Timeout")
+	}
+
+	// That could've made it through the backlog. Do it again to make sure it came
+	// through streaming.
+	s.LogTree.MustLeveledFor("test").Infof("Hello again, world")
+	select {
+	case e := <-logC:
+		cleanLogEntry(e)
+		if diff := cmp.Diff(mkLeveledEntry("test", "i", "Hello again, world"), e, protocmp.Transform()); diff != "" {
+			t.Errorf("Diff:\n%s", diff)
+		}
+	case <-time.After(time.Second * 2):
+		t.Errorf("Timeout")
+	}
+}
+
+// TestLogService_Logs_Filters exercises the rest of the filter functionality.
+func TestLogService_Logs_Filters(t *testing.T) {
+	ctx, ctxC := context.WithCancel(context.Background())
+	defer ctxC()
+
+	s, cl := dut(t)
+
+	mgmt := api.NewNodeManagementClient(cl)
+	s.LogTree.MustLeveledFor("main").Infof("Hello")
+	s.LogTree.MustLeveledFor("main").Infof("Starting...")
+	s.LogTree.MustLeveledFor("main").Warningf("Something failed!")
+	fmt.Fprintln(s.LogTree.MustRawFor("main"), "medium rare")
+	s.LogTree.MustLeveledFor("main").Errorf("Something failed very hard!")
+
+	for i, te := range []struct {
+		req  *api.GetLogsRequest
+		want []*cpb.LogEntry
+	}{
+		// Case 0: request given level
+		{
+			req: &api.GetLogsRequest{
+				Dn:          "main",
+				BacklogMode: api.GetLogsRequest_BACKLOG_ALL,
+				StreamMode:  api.GetLogsRequest_STREAM_DISABLE,
+				Filters: []*cpb.LogFilter{
+					{
+						Filter: &cpb.LogFilter_LeveledWithMinimumSeverity_{
+							LeveledWithMinimumSeverity: &cpb.LogFilter_LeveledWithMinimumSeverity{
+								Minimum: cpb.LeveledLogSeverity_WARNING,
+							},
+						},
+					},
+				},
+			},
+			want: []*cpb.LogEntry{
+				mkLeveledEntry("main", "w", "Something failed!"),
+				mkLeveledEntry("main", "e", "Something failed very hard!"),
+			},
+		},
+		// Case 1: request raw only
+		{
+			req: &api.GetLogsRequest{
+				Dn:          "main",
+				BacklogMode: api.GetLogsRequest_BACKLOG_ALL,
+				StreamMode:  api.GetLogsRequest_STREAM_DISABLE,
+				Filters: []*cpb.LogFilter{
+					{
+						Filter: &cpb.LogFilter_OnlyRaw_{
+							OnlyRaw: &cpb.LogFilter_OnlyRaw{},
+						},
+					},
+				},
+			},
+			want: []*cpb.LogEntry{
+				mkRawEntry("main", "medium rare"),
+			},
+		},
+		// Case 2: request leveled only
+		{
+			req: &api.GetLogsRequest{
+				Dn:          "main",
+				BacklogMode: api.GetLogsRequest_BACKLOG_ALL,
+				StreamMode:  api.GetLogsRequest_STREAM_DISABLE,
+				Filters: []*cpb.LogFilter{
+					{
+						Filter: &cpb.LogFilter_OnlyLeveled_{
+							OnlyLeveled: &cpb.LogFilter_OnlyLeveled{},
+						},
+					},
+				},
+			},
+			want: []*cpb.LogEntry{
+				mkLeveledEntry("main", "i", "Hello"),
+				mkLeveledEntry("main", "i", "Starting..."),
+				mkLeveledEntry("main", "w", "Something failed!"),
+				mkLeveledEntry("main", "e", "Something failed very hard!"),
+			},
+		},
+	} {
+		srv, err := mgmt.Logs(ctx, te.req)
+		if err != nil {
+			t.Errorf("Case %d: Logs failed: %v", i, err)
+			continue
+		}
+		logs := drainLogs(t, srv)
+		for _, e := range logs {
+			cleanLogEntry(e)
+		}
+		diff := cmp.Diff(te.want, logs, protocmp.Transform())
+		if diff != "" {
+			t.Errorf("Case %d: diff: \n%s", i, diff)
+		}
+	}
+
+}
diff --git a/metropolis/node/core/roleserve/BUILD.bazel b/metropolis/node/core/roleserve/BUILD.bazel
index c2dada4..8aa4fc0 100644
--- a/metropolis/node/core/roleserve/BUILD.bazel
+++ b/metropolis/node/core/roleserve/BUILD.bazel
@@ -33,6 +33,7 @@
         "//metropolis/node/kubernetes/pki",
         "//metropolis/pkg/event",
         "//metropolis/pkg/event/memory",
+        "//metropolis/pkg/logtree",
         "//metropolis/pkg/pki",
         "//metropolis/pkg/supervisor",
         "//metropolis/proto/common",
diff --git a/metropolis/node/core/roleserve/roleserve.go b/metropolis/node/core/roleserve/roleserve.go
index 8f9bb47..a1969bf 100644
--- a/metropolis/node/core/roleserve/roleserve.go
+++ b/metropolis/node/core/roleserve/roleserve.go
@@ -49,7 +49,9 @@
 	"source.monogon.dev/metropolis/node/core/network"
 	"source.monogon.dev/metropolis/node/core/rpc/resolver"
 	"source.monogon.dev/metropolis/pkg/event/memory"
+	"source.monogon.dev/metropolis/pkg/logtree"
 	"source.monogon.dev/metropolis/pkg/supervisor"
+
 	cpb "source.monogon.dev/metropolis/proto/common"
 )
 
@@ -68,6 +70,8 @@
 	// created early in the roleserver lifecycle, and is seeded with node
 	// information as the first subordinate runs DialCurator().
 	Resolver *resolver.Resolver
+
+	LogTree *logtree.LogTree
 }
 
 // Service is the roleserver/“Role Server” service. See the package-level
@@ -132,6 +136,7 @@
 
 	s.nodeMgmt = &workerNodeMgmt{
 		clusterMembership: &s.ClusterMembership,
+		logTree:           s.LogTree,
 	}
 
 	return s
diff --git a/metropolis/node/core/roleserve/worker_nodemgmt.go b/metropolis/node/core/roleserve/worker_nodemgmt.go
index 889f0eb..9ffb112 100644
--- a/metropolis/node/core/roleserve/worker_nodemgmt.go
+++ b/metropolis/node/core/roleserve/worker_nodemgmt.go
@@ -5,11 +5,13 @@
 
 	"source.monogon.dev/metropolis/node/core/mgmt"
 	"source.monogon.dev/metropolis/pkg/event/memory"
+	"source.monogon.dev/metropolis/pkg/logtree"
 	"source.monogon.dev/metropolis/pkg/supervisor"
 )
 
 type workerNodeMgmt struct {
 	clusterMembership *memory.Value[*ClusterMembership]
+	logTree           *logtree.LogTree
 }
 
 func (s *workerNodeMgmt) run(ctx context.Context) error {
@@ -24,6 +26,7 @@
 	supervisor.Logger(ctx).Infof("Got cluster membership, starting...")
 	srv := mgmt.Service{
 		NodeCredentials: cm.credentials,
+		LogTree:         s.logTree,
 	}
 	return srv.Run(ctx)
 }