core: plug logtree into NodeDebugService

This introduces a new proto API for accessing debug logs. Currently it
is only used by the debug service, but these proto definitions will
likely be reused for production cluster APIs.

The implementation mostly consists of adding the proto, implementing
to/from conversion methods, and altering the debug service to use the
new API.
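
As an illustration, a client of the new API looks roughly like this
(sketch only; assumes an already-dialed grpc.ClientConn `conn`, a
context.Context `ctx`, and that NewNodeDebugServiceClient is the usual
generated constructor; error handling elided):

    client := apb.NewNodeDebugServiceClient(conn)
    stream, err := client.GetLogs(ctx, &apb.GetLogsRequest{
        Dn:          "init",
        BacklogMode: apb.GetLogsRequest_BACKLOG_ALL,
        StreamMode:  apb.GetLogsRequest_STREAM_UNBUFFERED,
    })
    if err != nil {
        // handle error
    }
    for {
        res, err := stream.Recv()
        if err != nil {
            break // io.EOF once the server stops sending
        }
        for _, e := range append(res.BacklogEntries, res.StreamEntries...) {
            entry, err := logtree.LogEntryFromProto(e)
            if err != nil {
                continue // skip undecodable entries
            }
            fmt.Println(entry.String())
        }
    }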

We also move all of the debug service implementation into a separate file,
to slightly clean up main.go. This produces an unfortunately colorful
diff, but it's just moving code around.

Test Plan: Manually tested using the dbg tool. We currently don't
properly test the debug service; we should do that once these become
production cluster APIs, but for now we keep going as is.
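
For example, fetching the last five lines of init logs and following
them live:

    dbg logs --tail 5 --follow init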

X-Origin-Diff: phab/D649
GitOrigin-RevId: ac454681e4b72b2876e313b3aeababa179eb1fa3
diff --git a/core/cmd/dbg/BUILD.bazel b/core/cmd/dbg/BUILD.bazel
index 3ca06d0..563be82 100644
--- a/core/cmd/dbg/BUILD.bazel
+++ b/core/cmd/dbg/BUILD.bazel
@@ -6,6 +6,7 @@
     importpath = "git.monogon.dev/source/nexantic.git/core/cmd/dbg",
     visibility = ["//visibility:private"],
     deps = [
+        "//core/pkg/logtree:go_default_library",
         "//core/proto/api:go_default_library",
         "@com_github_spf13_pflag//:go_default_library",
         "@io_k8s_component_base//cli/flag:go_default_library",
diff --git a/core/cmd/dbg/main.go b/core/cmd/dbg/main.go
index f2d8fc0..176973f 100644
--- a/core/cmd/dbg/main.go
+++ b/core/cmd/dbg/main.go
@@ -20,12 +20,14 @@
 	"context"
 	"flag"
 	"fmt"
+	"io"
 	"io/ioutil"
 	"math/rand"
 	"os"
-	"strings"
 	"time"
 
+	"git.monogon.dev/source/nexantic.git/core/pkg/logtree"
+
 	"github.com/spf13/pflag"
 	"google.golang.org/grpc"
 	cliflag "k8s.io/component-base/cli/flag"
@@ -51,12 +53,14 @@
 	}
 
 	logsCmd := flag.NewFlagSet("logs", flag.ExitOnError)
-	logsTailN := logsCmd.Uint("tail", 0, "Get last n lines (0 = whole buffer)")
+	logsTailN := logsCmd.Int("tail", -1, "Get last n lines (-1 = whole buffer, 0 = disable)")
+	logsStream := logsCmd.Bool("follow", false, "Stream log entries live from the system")
+	logsRecursive := logsCmd.Bool("recursive", false, "Get entries from entire DN subtree")
 	logsCmd.Usage = func() {
-		fmt.Fprintf(os.Stderr, "Usage: %s %s [options] component_path\n", os.Args[0], os.Args[1])
+		fmt.Fprintf(os.Stderr, "Usage: %s %s [options] dn\n", os.Args[0], os.Args[1])
 		flag.PrintDefaults()
 
-		fmt.Fprintf(os.Stderr, "Example:\n  %s %s --tail 5 kube.apiserver\n", os.Args[0], os.Args[1])
+		fmt.Fprintf(os.Stderr, "Example:\n  %s %s --tail 5 --follow init\n", os.Args[0], os.Args[1])
 	}
 	goldenticketCmd := flag.NewFlagSet("goldenticket", flag.ExitOnError)
 	conditionCmd := flag.NewFlagSet("condition", flag.ExitOnError)
@@ -66,19 +70,60 @@
 
 		fmt.Fprintf(os.Stderr, "Example:\n  %s %s IPAssigned\n", os.Args[0], os.Args[1])
 	}
+
 	switch os.Args[1] {
 	case "logs":
 		logsCmd.Parse(os.Args[2:])
-		componentPath := strings.Split(logsCmd.Arg(0), ".")
-		res, err := debugClient.GetComponentLogs(ctx, &apb.GetComponentLogsRequest{ComponentPath: componentPath, TailLines: uint32(*logsTailN)})
+		dn := logsCmd.Arg(0)
+		req := &apb.GetLogsRequest{
+			Dn:          dn,
+			BacklogMode: apb.GetLogsRequest_BACKLOG_DISABLE,
+			StreamMode:  apb.GetLogsRequest_STREAM_DISABLE,
+			Filters:     nil,
+		}
+
+		switch *logsTailN {
+		case 0:
+		case -1:
+			req.BacklogMode = apb.GetLogsRequest_BACKLOG_ALL
+		default:
+			req.BacklogMode = apb.GetLogsRequest_BACKLOG_COUNT
+			req.BacklogCount = int64(*logsTailN)
+		}
+
+		if *logsStream {
+			req.StreamMode = apb.GetLogsRequest_STREAM_UNBUFFERED
+		}
+
+		if *logsRecursive {
+			req.Filters = append(req.Filters, &apb.LogFilter{
+				Filter: &apb.LogFilter_WithChildren_{WithChildren: &apb.LogFilter_WithChildren{}},
+			})
+		}
+
+		stream, err := debugClient.GetLogs(ctx, req)
 		if err != nil {
 			fmt.Fprintf(os.Stderr, "Failed to get logs: %v\n", err)
 			os.Exit(1)
 		}
-		for _, line := range res.Line {
-			fmt.Println(line)
+		for {
+			res, err := stream.Recv()
+			if err != nil {
+				if err == io.EOF {
+					os.Exit(0)
+				}
+				fmt.Fprintf(os.Stderr, "Failed to stream logs: %v\n", err)
+				os.Exit(1)
+			}
+			for _, entry := range append(res.BacklogEntries, res.StreamEntries...) {
+				entry, err := logtree.LogEntryFromProto(entry)
+				if err != nil {
+				fmt.Fprintf(os.Stderr, "error decoding entry: %v\n", err)
+					continue
+				}
+				fmt.Println(entry.String())
+			}
 		}
-		return
 	case "goldenticket":
 		goldenticketCmd.Parse(os.Args[2:])
 		ip := goldenticketCmd.Arg(0)
diff --git a/core/cmd/init/debug_service.go b/core/cmd/init/debug_service.go
index faa0135..6cb9620 100644
--- a/core/cmd/init/debug_service.go
+++ b/core/cmd/init/debug_service.go
@@ -18,31 +18,228 @@
 
 import (
 	"context"
-
-	"git.monogon.dev/source/nexantic.git/core/internal/cluster"
-	"git.monogon.dev/source/nexantic.git/core/internal/containerd"
-	"git.monogon.dev/source/nexantic.git/core/internal/kubernetes"
-	apb "git.monogon.dev/source/nexantic.git/core/proto/api"
+	"crypto/x509"
+	"fmt"
+	"net"
 
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
+
+	"git.monogon.dev/source/nexantic.git/core/internal/cluster"
+	"git.monogon.dev/source/nexantic.git/core/internal/common"
+	"git.monogon.dev/source/nexantic.git/core/internal/consensus/ca"
+	"git.monogon.dev/source/nexantic.git/core/internal/kubernetes"
+	"git.monogon.dev/source/nexantic.git/core/pkg/logtree"
+	apb "git.monogon.dev/source/nexantic.git/core/proto/api"
+)
+
+const (
+	logFilterMax = 1000
 )
 
 // debugService implements the Smalltown node debug API.
-// TODO(q3k): this should probably be implemented somewhere else way once we have a better
-// supervision introspection/status API.
 type debugService struct {
 	cluster    *cluster.Manager
 	kubernetes *kubernetes.Service
-	containerd *containerd.Service
+	logtree    *logtree.LogTree
+}
+
+func (s *debugService) GetGoldenTicket(ctx context.Context, req *apb.GetGoldenTicketRequest) (*apb.GetGoldenTicketResponse, error) {
+	ip := net.ParseIP(req.ExternalIp)
+	if ip == nil {
+		return nil, status.Errorf(codes.InvalidArgument, "could not parse IP %q", req.ExternalIp)
+	}
+	this := s.cluster.Node()
+
+	certRaw, key, err := s.nodeCertificate()
+	if err != nil {
+		return nil, status.Errorf(codes.Unavailable, "failed to generate node certificate: %v", err)
+	}
+	cert, err := x509.ParseCertificate(certRaw)
+	if err != nil {
+		panic(err)
+	}
+	kv := s.cluster.ConsensusKVRoot()
+	ca, err := ca.Load(ctx, kv)
+	if err != nil {
+		return nil, status.Errorf(codes.Unavailable, "could not load CA: %v", err)
+	}
+	etcdCert, etcdKey, err := ca.Issue(ctx, kv, cert.Subject.CommonName, ip)
+	if err != nil {
+		return nil, status.Errorf(codes.Unavailable, "could not generate etcd peer certificate: %v", err)
+	}
+	etcdCRL, err := ca.GetCurrentCRL(ctx, kv)
+	if err != nil {
+		return nil, status.Errorf(codes.Unavailable, "could not get etcd CRL: %v", err)
+	}
+
+	// Add new etcd member to etcd cluster.
+	etcd := s.cluster.ConsensusCluster()
+	etcdAddr := fmt.Sprintf("https://%s:%d", ip.String(), common.ConsensusPort)
+	_, err = etcd.MemberAddAsLearner(ctx, []string{etcdAddr})
+	if err != nil {
+		return nil, status.Errorf(codes.Unavailable, "could not add as new etcd consensus member: %v", err)
+	}
+
+	return &apb.GetGoldenTicketResponse{
+		Ticket: &apb.GoldenTicket{
+			EtcdCaCert:     ca.CACertRaw,
+			EtcdClientCert: etcdCert,
+			EtcdClientKey:  etcdKey,
+			EtcdCrl:        etcdCRL,
+			Peers: []*apb.GoldenTicket_EtcdPeer{
+				{Name: this.ID(), Address: this.Address().String()},
+			},
+			This: &apb.GoldenTicket_EtcdPeer{Name: cert.Subject.CommonName, Address: ip.String()},
+
+			NodeId:   cert.Subject.CommonName,
+			NodeCert: certRaw,
+			NodeKey:  key,
+		},
+	}, nil
 }
 
 func (s *debugService) GetDebugKubeconfig(ctx context.Context, req *apb.GetDebugKubeconfigRequest) (*apb.GetDebugKubeconfigResponse, error) {
 	return s.kubernetes.GetDebugKubeconfig(ctx, req)
 }
 
-// GetComponentLogs gets various logbuffers from binaries we call. This function just deals with the first path component,
-// delegating the rest to the service-specific handlers.
-func (s *debugService) GetComponentLogs(ctx context.Context, req *apb.GetComponentLogsRequest) (*apb.GetComponentLogsResponse, error) {
-	return nil, status.Error(codes.Unimplemented, "unimplemented")
+func (s *debugService) GetLogs(req *apb.GetLogsRequest, srv apb.NodeDebugService_GetLogsServer) error {
+	if len(req.Filters) > logFilterMax {
+		return status.Errorf(codes.InvalidArgument, "requested %d filters, maximum permitted is %d", len(req.Filters), logFilterMax)
+	}
+	dn := logtree.DN(req.Dn)
+	_, err := dn.Path()
+	switch err {
+	case nil:
+	case logtree.ErrInvalidDN:
+		return status.Errorf(codes.InvalidArgument, "invalid DN")
+	default:
+		return status.Errorf(codes.Unavailable, "could not parse DN: %v", err)
+	}
+
+	var options []logtree.LogReadOption
+
+	// Turn backlog mode into logtree option(s).
+	switch req.BacklogMode {
+	case apb.GetLogsRequest_BACKLOG_DISABLE:
+	case apb.GetLogsRequest_BACKLOG_ALL:
+		options = append(options, logtree.WithBacklog(logtree.BacklogAllAvailable))
+	case apb.GetLogsRequest_BACKLOG_COUNT:
+		count := int(req.BacklogCount)
+		if count <= 0 {
+			return status.Errorf(codes.InvalidArgument, "backlog_count must be > 0 if backlog_mode is BACKLOG_COUNT")
+		}
+		options = append(options, logtree.WithBacklog(count))
+	default:
+		return status.Errorf(codes.InvalidArgument, "unknown backlog_mode %d", req.BacklogMode)
+	}
+
+	// Turn stream mode into logtree option(s).
+	streamEnable := false
+	switch req.StreamMode {
+	case apb.GetLogsRequest_STREAM_DISABLE:
+	case apb.GetLogsRequest_STREAM_UNBUFFERED:
+		streamEnable = true
+		options = append(options, logtree.WithStream())
+	default:
+		return status.Errorf(codes.InvalidArgument, "unknown stream_mode %d", req.StreamMode)
+	}
+
+	// Parse proto filters into logtree options.
+	for i, filter := range req.Filters {
+		switch inner := filter.Filter.(type) {
+		case *apb.LogFilter_WithChildren_:
+			options = append(options, logtree.WithChildren())
+		case *apb.LogFilter_OnlyRaw_:
+			options = append(options, logtree.OnlyRaw())
+		case *apb.LogFilter_OnlyLeveled_:
+			options = append(options, logtree.OnlyLeveled())
+		case *apb.LogFilter_LeveledWithMinimumSeverity_:
+			severity, err := logtree.SeverityFromProto(inner.LeveledWithMinimumSeverity.Minimum)
+			if err != nil {
+				return status.Errorf(codes.InvalidArgument, "filter %d has invalid severity: %v", i, err)
+			}
+			options = append(options, logtree.LeveledWithMinimumSeverity(severity))
+		}
+	}
+
+	reader, err := s.logtree.Read(dn, options...)
+	switch err {
+	case nil:
+	case logtree.ErrRawAndLeveled:
+		return status.Errorf(codes.InvalidArgument, "requested only raw and only leveled logs simultaneously")
+	default:
+		return status.Errorf(codes.Unavailable, "could not retrieve logs: %v", err)
+	}
+	defer reader.Close()
+
+	// Default protobuf message size limit is 64MB. We want to limit ourselves
+	// to 10MB.
+	// Currently each raw log line can be at most 1024 unicode codepoints (or
+	// 4096 bytes). To cover extra metadata and proto overhead, let's round
+	// this up to 4500 bytes. This in turn means we can store a maximum of
+	// (10e6/4500) == 2222 entries.
+	// Currently each leveled log line can also be at most 1024 unicode
+	// codepoints (or 4096 bytes). To cover extra metadata and proto overhead
+	// let's round this up to 5000 bytes. This in turn means we can store a
+	// maximum of (10e6/5000) == 2000 entries.
+	// The lower of these numbers, ie. the worst case scenario, is 2000
+	// maximum entries.
+	maxChunkSize := 2000
+
+	// Serve all backlog entries in chunks.
+	chunk := make([]*apb.LogEntry, 0, maxChunkSize)
+	for _, entry := range reader.Backlog {
+		p := entry.Proto()
+		if p == nil {
+			// TODO(q3k): log this once we have logtree/gRPC compatibility.
+			continue
+		}
+		chunk = append(chunk, p)
+
+		if len(chunk) >= maxChunkSize {
+			err := srv.Send(&apb.GetLogsResponse{
+				BacklogEntries: chunk,
+			})
+			if err != nil {
+				return err
+			}
+			chunk = make([]*apb.LogEntry, 0, maxChunkSize)
+		}
+	}
+
+	// Send the last chunk of backlog, if present.
+	if len(chunk) > 0 {
+		err := srv.Send(&apb.GetLogsResponse{
+			BacklogEntries: chunk,
+		})
+		if err != nil {
+			return err
+		}
+		chunk = make([]*apb.LogEntry, 0, maxChunkSize)
+	}
+
+	// Start serving streaming data, if streaming has been requested.
+	if !streamEnable {
+		return nil
+	}
+
+	for {
+		entry, ok := <-reader.Stream
+		if !ok {
+			// Streaming has been ended by logtree - tell the client and return.
+			return status.Error(codes.Unavailable, "log streaming aborted by system")
+		}
+		p := entry.Proto()
+		if p == nil {
+			// TODO(q3k): log this once we have logtree/gRPC compatibility.
+			continue
+		}
+		err := srv.Send(&apb.GetLogsResponse{
+			StreamEntries: []*apb.LogEntry{p},
+		})
+		if err != nil {
+			return err
+		}
+	}
 }
diff --git a/core/cmd/init/main.go b/core/cmd/init/main.go
index 989f953..4ba991c 100644
--- a/core/cmd/init/main.go
+++ b/core/cmd/init/main.go
@@ -29,25 +29,20 @@
 	"os/signal"
 	"runtime/debug"
 
-	"git.monogon.dev/source/nexantic.git/core/pkg/logtree"
-
-	"git.monogon.dev/source/nexantic.git/core/internal/network/dns"
-
 	"golang.org/x/sys/unix"
 	"google.golang.org/grpc"
-	"google.golang.org/grpc/codes"
-	"google.golang.org/grpc/status"
 
 	"git.monogon.dev/source/nexantic.git/core/internal/cluster"
 	"git.monogon.dev/source/nexantic.git/core/internal/common"
 	"git.monogon.dev/source/nexantic.git/core/internal/common/supervisor"
-	"git.monogon.dev/source/nexantic.git/core/internal/consensus/ca"
 	"git.monogon.dev/source/nexantic.git/core/internal/containerd"
 	"git.monogon.dev/source/nexantic.git/core/internal/kubernetes"
 	"git.monogon.dev/source/nexantic.git/core/internal/kubernetes/pki"
 	"git.monogon.dev/source/nexantic.git/core/internal/localstorage"
 	"git.monogon.dev/source/nexantic.git/core/internal/localstorage/declarative"
 	"git.monogon.dev/source/nexantic.git/core/internal/network"
+	"git.monogon.dev/source/nexantic.git/core/internal/network/dns"
+	"git.monogon.dev/source/nexantic.git/core/pkg/logtree"
 	"git.monogon.dev/source/nexantic.git/core/pkg/tpm"
 	apb "git.monogon.dev/source/nexantic.git/core/proto/api"
 )
@@ -90,17 +85,7 @@
 	go func() {
 		for {
 			p := <-reader.Stream
-			if p.Leveled != nil {
-				// Use glog-like layout, but with supervisor DN instead of filename.
-				timestamp := p.Leveled.Timestamp()
-				_, month, day := timestamp.Date()
-				hour, minute, second := timestamp.Clock()
-				nsec := timestamp.Nanosecond() / 1000
-				fmt.Fprintf(os.Stderr, "%s%02d%02d %02d:%02d:%02d.%06d %s] %s\n", p.Leveled.Severity(), month, day, hour, minute, second, nsec, p.DN, p.Leveled.Message())
-			}
-			if p.Raw != nil {
-				fmt.Fprintf(os.Stderr, "%-32s R %s\n", p.DN, p.Raw)
-			}
+			fmt.Fprintf(os.Stderr, "%s\n", p.String())
 		}
 	}()
 
@@ -236,11 +221,9 @@
 		}
 
 		// Start the node debug service.
-		// TODO(q3k): this needs to be done in a smarter way once LogTree lands, and then a few things can be
-		// refactored to start this earlier, or this can be split up into a multiple gRPC service on a single listener.
 		dbg := &debugService{
 			cluster:    m,
-			containerd: containerdSvc,
+			logtree:    lt,
 			kubernetes: kubeSvc,
 		}
 		dbgSrv := grpc.NewServer()
@@ -336,58 +319,3 @@
 	}
 	return
 }
-
-func (s *debugService) GetGoldenTicket(ctx context.Context, req *apb.GetGoldenTicketRequest) (*apb.GetGoldenTicketResponse, error) {
-	ip := net.ParseIP(req.ExternalIp)
-	if ip == nil {
-		return nil, status.Errorf(codes.InvalidArgument, "could not parse IP %q", req.ExternalIp)
-	}
-	this := s.cluster.Node()
-
-	certRaw, key, err := s.nodeCertificate()
-	if err != nil {
-		return nil, status.Errorf(codes.Unavailable, "failed to generate node certificate: %v", err)
-	}
-	cert, err := x509.ParseCertificate(certRaw)
-	if err != nil {
-		panic(err)
-	}
-	kv := s.cluster.ConsensusKVRoot()
-	ca, err := ca.Load(ctx, kv)
-	if err != nil {
-		return nil, status.Errorf(codes.Unavailable, "could not load CA: %v", err)
-	}
-	etcdCert, etcdKey, err := ca.Issue(ctx, kv, cert.Subject.CommonName, ip)
-	if err != nil {
-		return nil, status.Errorf(codes.Unavailable, "could not generate etcd peer certificate: %v", err)
-	}
-	etcdCRL, err := ca.GetCurrentCRL(ctx, kv)
-	if err != nil {
-		return nil, status.Errorf(codes.Unavailable, "could not get etcd CRL: %v", err)
-	}
-
-	// Add new etcd member to etcd cluster.
-	etcd := s.cluster.ConsensusCluster()
-	etcdAddr := fmt.Sprintf("https://%s:%d", ip.String(), common.ConsensusPort)
-	_, err = etcd.MemberAddAsLearner(ctx, []string{etcdAddr})
-	if err != nil {
-		return nil, status.Errorf(codes.Unavailable, "could not add as new etcd consensus member: %v", err)
-	}
-
-	return &apb.GetGoldenTicketResponse{
-		Ticket: &apb.GoldenTicket{
-			EtcdCaCert:     ca.CACertRaw,
-			EtcdClientCert: etcdCert,
-			EtcdClientKey:  etcdKey,
-			EtcdCrl:        etcdCRL,
-			Peers: []*apb.GoldenTicket_EtcdPeer{
-				{Name: this.ID(), Address: this.Address().String()},
-			},
-			This: &apb.GoldenTicket_EtcdPeer{Name: cert.Subject.CommonName, Address: ip.String()},
-
-			NodeId:   cert.Subject.CommonName,
-			NodeCert: certRaw,
-			NodeKey:  key,
-		},
-	}, nil
-}
diff --git a/core/pkg/logbuffer/BUILD.bazel b/core/pkg/logbuffer/BUILD.bazel
index fb7512a..958389e 100644
--- a/core/pkg/logbuffer/BUILD.bazel
+++ b/core/pkg/logbuffer/BUILD.bazel
@@ -8,6 +8,7 @@
     ],
     importpath = "git.monogon.dev/source/nexantic.git/core/pkg/logbuffer",
     visibility = ["//visibility:public"],
+    deps = ["//core/proto/api:go_default_library"],
 )
 
 go_test(
diff --git a/core/pkg/logbuffer/linebuffer.go b/core/pkg/logbuffer/linebuffer.go
index 6ee7d6b..fa4dc33 100644
--- a/core/pkg/logbuffer/linebuffer.go
+++ b/core/pkg/logbuffer/linebuffer.go
@@ -21,6 +21,8 @@
 	"fmt"
 	"strings"
 	"sync"
+
+	apb "git.monogon.dev/source/nexantic.git/core/proto/api"
 )
 
 // Line is a line stored in the log buffer - a string, that has been perhaps truncated (due to exceeded limits).
@@ -43,6 +45,29 @@
 	return l.Data
 }
 
+// ProtoLog returns a Logging-specific protobuf structure.
+func (l *Line) ProtoLog() *apb.LogEntry_Raw {
+	return &apb.LogEntry_Raw{
+		Data:           l.Data,
+		OriginalLength: int64(l.OriginalLength),
+	}
+}
+
+// LineFromLogProto converts a Logging-specific protobuf message back into a Line.
+func LineFromLogProto(raw *apb.LogEntry_Raw) (*Line, error) {
+	if raw.OriginalLength < int64(len(raw.Data)) {
+		return nil, fmt.Errorf("original_length smaller than length of data")
+	}
+	originalLength := int(raw.OriginalLength)
+	if int64(originalLength) < raw.OriginalLength {
+		return nil, fmt.Errorf("original_length larger than native int size")
+	}
+	return &Line{
+		Data:           raw.Data,
+		OriginalLength: originalLength,
+	}, nil
+}
+
 // LineBuffer is a io.WriteCloser that will call a given callback every time a line is completed.
 type LineBuffer struct {
 	maxLineLength int
diff --git a/core/pkg/logtree/BUILD.bazel b/core/pkg/logtree/BUILD.bazel
index 68abcfb..7b899a4 100644
--- a/core/pkg/logtree/BUILD.bazel
+++ b/core/pkg/logtree/BUILD.bazel
@@ -15,7 +15,10 @@
     ],
     importpath = "git.monogon.dev/source/nexantic.git/core/pkg/logtree",
     visibility = ["//visibility:public"],
-    deps = ["//core/pkg/logbuffer:go_default_library"],
+    deps = [
+        "//core/pkg/logbuffer:go_default_library",
+        "//core/proto/api:go_default_library",
+    ],
 )
 
 go_test(
diff --git a/core/pkg/logtree/journal.go b/core/pkg/logtree/journal.go
index 893eff0..78c55a1 100644
--- a/core/pkg/logtree/journal.go
+++ b/core/pkg/logtree/journal.go
@@ -17,7 +17,7 @@
 package logtree
 
 import (
-	"fmt"
+	"errors"
 	"strings"
 	"sync"
 )
@@ -27,6 +27,10 @@
 // the root node of the tree.
 type DN string
 
+var (
+	ErrInvalidDN = errors.New("invalid DN")
+)
+
 // Path return the parts of a DN, ie. all the elements of the dot-delimited DN path. For the root node, an empty list
 // will be returned. An error will be returned if the DN is invalid (contains empty parts, eg. `foo..bar`, `.foo` or
 // `foo.`.
@@ -37,7 +41,7 @@
 	parts := strings.Split(string(d), ".")
 	for _, p := range parts {
 		if p == "" {
-			return nil, fmt.Errorf("invalid DN")
+			return nil, ErrInvalidDN
 		}
 	}
 	return parts, nil
diff --git a/core/pkg/logtree/leveled.go b/core/pkg/logtree/leveled.go
index 2c8fcc4..125e1df 100644
--- a/core/pkg/logtree/leveled.go
+++ b/core/pkg/logtree/leveled.go
@@ -16,6 +16,12 @@
 
 package logtree
 
+import (
+	"fmt"
+
+	apb "git.monogon.dev/source/nexantic.git/core/proto/api"
+)
+
 // LeveledLogger is a generic interface for glog-style logging. There are four hardcoded log severities, in increasing
 // order: INFO, WARNING, ERROR, FATAL. Logging at a certain severity level logs not only to consumers expecting data
 // at that severity level, but also all lower severity levels. For example, an ERROR log will also be passed to
@@ -106,3 +112,33 @@
 	}
 	return false
 }
+
+func SeverityFromProto(s apb.LeveledLogSeverity) (Severity, error) {
+	switch s {
+	case apb.LeveledLogSeverity_INFO:
+		return INFO, nil
+	case apb.LeveledLogSeverity_WARNING:
+		return WARNING, nil
+	case apb.LeveledLogSeverity_ERROR:
+		return ERROR, nil
+	case apb.LeveledLogSeverity_FATAL:
+		return FATAL, nil
+	default:
+		return "", fmt.Errorf("unknown severity value %d", s)
+	}
+}
+
+func (s Severity) ToProto() apb.LeveledLogSeverity {
+	switch s {
+	case INFO:
+		return apb.LeveledLogSeverity_INFO
+	case WARNING:
+		return apb.LeveledLogSeverity_WARNING
+	case ERROR:
+		return apb.LeveledLogSeverity_ERROR
+	case FATAL:
+		return apb.LeveledLogSeverity_FATAL
+	default:
+		return apb.LeveledLogSeverity_INVALID
+	}
+}
diff --git a/core/pkg/logtree/logtree.go b/core/pkg/logtree/logtree.go
index 064b6e7..2d405f8 100644
--- a/core/pkg/logtree/logtree.go
+++ b/core/pkg/logtree/logtree.go
@@ -66,6 +66,8 @@
 		tree:     tree,
 		children: make(map[string]*node),
 	}
+	// TODO(q3k): make this limit configurable. If this happens, or the default (1024) gets changed, max chunk size
+	// calculations when serving the logs (eg. in NodeDebugService) must reflect this.
 	n.rawLineBuffer = logbuffer.NewLineBuffer(1024, n.logRaw)
 	return n
 }
diff --git a/core/pkg/logtree/logtree_access.go b/core/pkg/logtree/logtree_access.go
index ee93df2..bb8a524 100644
--- a/core/pkg/logtree/logtree_access.go
+++ b/core/pkg/logtree/logtree_access.go
@@ -17,11 +17,12 @@
 package logtree
 
 import (
+	"errors"
 	"fmt"
-	"log"
 	"sync/atomic"
 
 	"git.monogon.dev/source/nexantic.git/core/pkg/logbuffer"
+	apb "git.monogon.dev/source/nexantic.git/core/proto/api"
 )
 
 // LogReadOption describes options for the LogTree.Read call.
@@ -101,6 +102,79 @@
 	DN DN
 }
 
+// Proto converts this LogEntry to its protobuf representation. The returned value may be nil if the LogEntry is
+// invalid, eg. contains neither a Raw nor a Leveled entry.
+func (l *LogEntry) Proto() *apb.LogEntry {
+	p := &apb.LogEntry{
+		Dn: string(l.DN),
+	}
+	switch {
+	case l.Leveled != nil:
+		leveled := l.Leveled
+		p.Kind = &apb.LogEntry_Leveled_{
+			Leveled: leveled.Proto(),
+		}
+	case l.Raw != nil:
+		raw := l.Raw
+		p.Kind = &apb.LogEntry_Raw_{
+			Raw: raw.ProtoLog(),
+		}
+	default:
+		return nil
+	}
+	return p
+}
+
+// String returns a standardized human-readable representation of either underlying raw or leveled entry. The returned
+// data is pre-formatted to be displayed in a fixed-width font.
+func (l *LogEntry) String() string {
+	if l.Leveled != nil {
+		// Use glog-like layout, but with supervisor DN instead of filename.
+		timestamp := l.Leveled.Timestamp()
+		_, month, day := timestamp.Date()
+		hour, minute, second := timestamp.Clock()
+		nsec := timestamp.Nanosecond() / 1000
+		return fmt.Sprintf("%s%02d%02d %02d:%02d:%02d.%06d %s] %s", l.Leveled.Severity(), month, day, hour, minute, second, nsec, l.DN, l.Leveled.Message())
+	}
+	if l.Raw != nil {
+		return fmt.Sprintf("%-32s R %s", l.DN, l.Raw)
+	}
+	return "INVALID"
+}
+
+// LogEntryFromProto parses a proto LogEntry back into the internal structure. This can be used by log proto API
+// consumers to easily print received log entries.
+func LogEntryFromProto(l *apb.LogEntry) (*LogEntry, error) {
+	dn := DN(l.Dn)
+	if _, err := dn.Path(); err != nil {
+		return nil, fmt.Errorf("could not convert DN: %w", err)
+	}
+	res := &LogEntry{
+		DN: dn,
+	}
+	switch inner := l.Kind.(type) {
+	case *apb.LogEntry_Leveled_:
+		leveled, err := LeveledPayloadFromProto(inner.Leveled)
+		if err != nil {
+			return nil, fmt.Errorf("could not convert leveled entry: %w", err)
+		}
+		res.Leveled = leveled
+	case *apb.LogEntry_Raw_:
+		line, err := logbuffer.LineFromLogProto(inner.Raw)
+		if err != nil {
+			return nil, fmt.Errorf("could not convert raw entry: %w", err)
+		}
+		res.Raw = line
+	default:
+		return nil, fmt.Errorf("proto has neither Leveled nor Raw set")
+	}
+	return res, nil
+}
+
+var (
+	ErrRawAndLeveled = errors.New("cannot return logs that are simultaneously OnlyRaw and OnlyLeveled")
+)
+
 // Read and/or stream entries from a LogTree. The returned LogReader is influenced by the LogReadOptions passed, which
 // influence whether the Read will return existing entries, a stream, or both. In addition the options also dictate
 // whether only entries for that particular DN are returned, or for all sub-DNs as well.
@@ -136,7 +210,7 @@
 	}
 
 	if onlyLeveled && onlyRaw {
-		return nil, fmt.Errorf("cannot return logs that are simultaneously OnlyRaw and OnlyLeveled")
+		return nil, ErrRawAndLeveled
 	}
 
 	var filters []filter
@@ -182,7 +256,6 @@
 	lr := &LogReader{}
 	lr.Backlog = make([]*LogEntry, len(entries))
 	for i, entry := range entries {
-		log.Printf("backlog %d %+v %+v", i, entry.raw, entry.leveled)
 		lr.Backlog[i] = entry.external()
 	}
 	if stream {
diff --git a/core/pkg/logtree/logtree_publisher.go b/core/pkg/logtree/logtree_publisher.go
index 3a81ec6..c898012 100644
--- a/core/pkg/logtree/logtree_publisher.go
+++ b/core/pkg/logtree/logtree_publisher.go
@@ -91,6 +91,7 @@
 			file = file[slash+1:]
 		}
 	}
+
 	p := &LeveledPayload{
 		timestamp: time.Now(),
 		severity:  severity,
diff --git a/core/pkg/logtree/payload.go b/core/pkg/logtree/payload.go
index 2d64a7a..ca7a0a0 100644
--- a/core/pkg/logtree/payload.go
+++ b/core/pkg/logtree/payload.go
@@ -18,7 +18,11 @@
 
 import (
 	"fmt"
+	"strconv"
+	"strings"
 	"time"
+
+	apb "git.monogon.dev/source/nexantic.git/core/proto/api"
 )
 
 // LeveledPayload is a log entry for leveled logs (as per leveled.go). It contains not only the log message itself and
@@ -62,3 +66,37 @@
 
 // Severity returns the Severity with which this entry was logged.
 func (p *LeveledPayload) Severity() Severity { return p.severity }
+
+// Proto converts a LeveledPayload to protobuf format.
+func (p *LeveledPayload) Proto() *apb.LogEntry_Leveled {
+	return &apb.LogEntry_Leveled{
+		Message:   p.Message(),
+		Timestamp: p.Timestamp().UnixNano(),
+		Severity:  p.Severity().ToProto(),
+		Location:  p.Location(),
+	}
+}
+
+// LeveledPayloadFromProto parses a protobuf message into the internal format.
+func LeveledPayloadFromProto(p *apb.LogEntry_Leveled) (*LeveledPayload, error) {
+	severity, err := SeverityFromProto(p.Severity)
+	if err != nil {
+		return nil, fmt.Errorf("could not convert severity: %w", err)
+	}
+	parts := strings.Split(p.Location, ":")
+	if len(parts) != 2 {
+		return nil, fmt.Errorf("invalid location, must be two :-delimited parts, is %d parts", len(parts))
+	}
+	file := parts[0]
+	line, err := strconv.Atoi(parts[1])
+	if err != nil {
+		return nil, fmt.Errorf("invalid location line number: %w", err)
+	}
+	return &LeveledPayload{
+		message:   p.Message,
+		timestamp: time.Unix(0, p.Timestamp),
+		severity:  severity,
+		file:      file,
+		line:      line,
+	}, nil
+}
diff --git a/core/proto/api/debug.proto b/core/proto/api/debug.proto
index 74d314a..b0bbb57 100644
--- a/core/proto/api/debug.proto
+++ b/core/proto/api/debug.proto
@@ -26,8 +26,22 @@
 service NodeDebugService {
     // GetDebugKubeconfig issues kubeconfigs with arbitrary identities and groups for debugging
     rpc GetDebugKubeconfig(GetDebugKubeconfigRequest) returns (GetDebugKubeconfigResponse);
-    // GetComponentLogs dumps various log ringbuffers for binaries that we run.
-    rpc GetComponentLogs(GetComponentLogsRequest) returns (GetComponentLogsResponse);
+
+    // GetLogs returns historical and/or streaming logs for a given DN with given filters from the system global
+    // LogTree.
+    //
+    // For more information about this API, see //core/pkg/logtree. But, in summary:
+    //   - All logging is performed to a DN (distinguished name), which is a dot-delimited string like foo.bar.baz.
+    //   - Log entries can be either raw (coming from unstructured logging from an external service, like a running
+    //     process) or leveled (emitted by Smalltown code with a source line, timestamp, and severity).
+    //   - The DNs form a tree of logging nodes - and when requesting logs, a given subtree of DNs can be requested,
+    //     instead of just a given DN.
+    //   - All supervised processes live at `root.<supervisor DN>`. For more example paths, see the console logs of
+    //     a running Smalltown instance, or request all logs (at DN "").
+    //
+    // TODO(q3k): move method and its related messages to the non-debug node endpoint once we have one.
+    rpc GetLogs(GetLogsRequest) returns (stream GetLogsResponse);
+
     // GetGoldenTicket requests a 'golden ticket' which can be used to enroll any node into the cluster.
     // This bypasses integrity checks.
     rpc GetGoldenTicket(GetGoldenTicketRequest) returns (GetGoldenTicketResponse);
@@ -43,14 +57,102 @@
     string debug_kubeconfig = 1;
 }
 
-message GetComponentLogsRequest {
-    // For supported paths see core/internal/node/debug.go
-    repeated string component_path = 1;
-    uint32 tail_lines = 2; // 0 = whole ring buffer
+// Severity level corresponding to //core/pkg/logtree.Severity.
+enum LeveledLogSeverity {
+    INVALID = 0;
+    INFO = 1;
+    WARNING = 2;
+    ERROR = 3;
+    FATAL = 4;
 }
 
-message GetComponentLogsResponse {
-    repeated string line = 1;
+// Filter set when requesting logs for a given DN. This message is equivalent to the following GADT enum:
+// data LogFilter = WithChildren
+//                | OnlyRaw
+//                | OnlyLeveled
+//                | LeveledWithMinimumSeverity(Severity)
+//
+// Multiple LogFilters can be chained/combined when requesting logs, as long as they do not conflict.
+message LogFilter {
+    // Entries will be returned not only for the given DN, but all child DNs as well. For instance, if the
+    // requested DN is foo, entries logged to foo, foo.bar and foo.bar.baz will all be returned.
+    message WithChildren {
+    }
+    // Only raw logging entries will be returned. Conflicts with OnlyLeveled filters.
+    message OnlyRaw {
+    }
+    // Only leveled logging entries will be returned. Conflicts with OnlyRaw filters.
+    message OnlyLeveled {
+    }
+    // If leveled logs are returned, all entries at severity lower than `minimum` will be discarded.
+    message LeveledWithMinimumSeverity {
+        LeveledLogSeverity minimum = 1;
+    }
+    oneof filter {
+        WithChildren with_children = 1;
+        OnlyRaw only_raw = 3;
+        OnlyLeveled only_leveled = 4;
+        LeveledWithMinimumSeverity leveled_with_minimum_severity = 5;
+    }
+}
+
+message GetLogsRequest {
+    // DN from which to request logs. All supervised runnables live at `root.`; the init code lives at `init.`.
+    string dn = 1;
+    // Filters to apply to returned data.
+    repeated LogFilter filters = 2;
+
+    enum BacklogMode {
+        BACKLOG_INVALID = 0;
+        // No historic data will be returned.
+        BACKLOG_DISABLE = 1;
+        // All available historic data will be returned.
+        BACKLOG_ALL = 2;
+        // At most backlog_count entries will be returned, if available.
+        BACKLOG_COUNT = 3;
+    }
+    BacklogMode backlog_mode = 3;
+    int64 backlog_count = 4;
+
+    enum StreamMode {
+        STREAM_INVALID = 0;
+        // No streaming entries; the gRPC stream will be closed as soon as all backlog data is served.
+        STREAM_DISABLE = 1;
+        // Entries will be streamed as early as available right after all backlog data is served.
+        STREAM_UNBUFFERED = 2;
+    }
+    StreamMode stream_mode = 5;
+}
+
+message GetLogsResponse {
+    // Entries from the requested backlog, as configured via backlog_mode. They will all be served before the first
+    // stream_entries are served (if any).
+    repeated LogEntry backlog_entries = 1;
+    // Entries streamed as they arrive. Currently no server-side buffering is enabled, instead every line is served
+    // as early as it arrives. However, this might change in the future, so this behaviour cannot be depended
+    // upon.
+    repeated LogEntry stream_entries = 2;
+}
+
+message LogEntry {
+    message Leveled {
+        string message = 1;
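+        // Unix timestamp in nanoseconds.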
+        int64 timestamp = 2;
+        LeveledLogSeverity severity = 3;
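+        // Source of the logging call, in "file:line" format.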
+        string location = 4;
+    }
+    message Raw {
+        string data = 1;
+        int64 original_length = 2;
+    }
+
+    string dn = 1;
+    oneof kind {
+        Leveled leveled = 2;
+        Raw raw = 3;
+    }
 }
 
 message GetGoldenTicketRequest {