core: plug logtree into NodeDebugService

This introduces a new Proto API for accessing debug logs. Currently it
is only used by the debug service, but these proto definitions will
likely be reused for production cluster APIs.

The implementation mostly consists of adding the proto, implementing
to/from conversion methods, and altering the debug service to use the
new API.

We also move all of the debug service implementation into a separate file,
to slightly clean up main.go. This produces an unfortunately colorful
diff, but it's just moving code around.

Test Plan: Manually tested using the dbg tool. We currently don't test
the debug service properly; we should fix that when these definitions
get reused for the production cluster APIs, and just keep going as-is
for now.
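
For reference, a client-side read over the new API (e.g. by the dbg
tool) looks roughly like this. This is a sketch only: the gRPC
connection setup is elided, and the DN and render helper are
hypothetical:

  dbg := apb.NewNodeDebugServiceClient(conn)
  stream, err := dbg.GetLogs(ctx, &apb.GetLogsRequest{
      Dn:          "root.example", // hypothetical DN, for illustration
      BacklogMode: apb.GetLogsRequest_BACKLOG_ALL,
      StreamMode:  apb.GetLogsRequest_STREAM_UNBUFFERED,
  })
  if err != nil {
      return err
  }
  for {
      res, err := stream.Recv()
      if err != nil {
          return err
      }
      // Backlog entries arrive first (in chunks), then streamed entries.
      for _, e := range res.BacklogEntries {
          render(e) // hypothetical helper
      }
      for _, e := range res.StreamEntries {
          render(e)
      }
  }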

X-Origin-Diff: phab/D649
GitOrigin-RevId: ac454681e4b72b2876e313b3aeababa179eb1fa3
diff --git a/core/cmd/init/debug_service.go b/core/cmd/init/debug_service.go
index faa0135..6cb9620 100644
--- a/core/cmd/init/debug_service.go
+++ b/core/cmd/init/debug_service.go
@@ -18,31 +18,226 @@
 
 import (
 	"context"
-
-	"git.monogon.dev/source/nexantic.git/core/internal/cluster"
-	"git.monogon.dev/source/nexantic.git/core/internal/containerd"
-	"git.monogon.dev/source/nexantic.git/core/internal/kubernetes"
-	apb "git.monogon.dev/source/nexantic.git/core/proto/api"
+	"crypto/x509"
+	"fmt"
+	"net"
 
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
+
+	"git.monogon.dev/source/nexantic.git/core/internal/cluster"
+	"git.monogon.dev/source/nexantic.git/core/internal/common"
+	"git.monogon.dev/source/nexantic.git/core/internal/consensus/ca"
+	"git.monogon.dev/source/nexantic.git/core/internal/kubernetes"
+	"git.monogon.dev/source/nexantic.git/core/pkg/logtree"
+	apb "git.monogon.dev/source/nexantic.git/core/proto/api"
+)
+
+const (
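+	// logFilterMax bounds the number of filters accepted in a single GetLogs request.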
+	logFilterMax = 1000
 )
 
 // debugService implements the Smalltown node debug API.
-// TODO(q3k): this should probably be implemented somewhere else way once we have a better
-// supervision introspection/status API.
 type debugService struct {
 	cluster    *cluster.Manager
 	kubernetes *kubernetes.Service
-	containerd *containerd.Service
+	logtree    *logtree.LogTree
+}
+
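+// GetGoldenTicket issues everything a new node needs to join the cluster: a
+// node certificate, an etcd peer certificate and CRL issued by the consensus
+// CA, and a learner membership in the etcd cluster, all bundled into a
+// GoldenTicket.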
+func (s *debugService) GetGoldenTicket(ctx context.Context, req *apb.GetGoldenTicketRequest) (*apb.GetGoldenTicketResponse, error) {
+	ip := net.ParseIP(req.ExternalIp)
+	if ip == nil {
+		return nil, status.Errorf(codes.InvalidArgument, "could not parse IP %q", req.ExternalIp)
+	}
+	this := s.cluster.Node()
+
+	certRaw, key, err := s.nodeCertificate()
+	if err != nil {
+		return nil, status.Errorf(codes.Unavailable, "failed to generate node certificate: %v", err)
+	}
+	cert, err := x509.ParseCertificate(certRaw)
+	if err != nil {
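+		// This certificate was just generated by us, so a parse failure is a programming error.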
+		panic(err)
+	}
+	kv := s.cluster.ConsensusKVRoot()
+	ca, err := ca.Load(ctx, kv)
+	if err != nil {
+		return nil, status.Errorf(codes.Unavailable, "could not load CA: %v", err)
+	}
+	etcdCert, etcdKey, err := ca.Issue(ctx, kv, cert.Subject.CommonName, ip)
+	if err != nil {
+		return nil, status.Errorf(codes.Unavailable, "could not generate etcd peer certificate: %v", err)
+	}
+	etcdCRL, err := ca.GetCurrentCRL(ctx, kv)
+	if err != nil {
+		return nil, status.Errorf(codes.Unavailable, "could not get etcd CRL: %v", err)
+	}
+
+	// Add new etcd member to etcd cluster.
+	etcd := s.cluster.ConsensusCluster()
+	etcdAddr := fmt.Sprintf("https://%s:%d", ip.String(), common.ConsensusPort)
+	_, err = etcd.MemberAddAsLearner(ctx, []string{etcdAddr})
+	if err != nil {
+		return nil, status.Errorf(codes.Unavailable, "could not add as new etcd consensus member: %v", err)
+	}
+
+	return &apb.GetGoldenTicketResponse{
+		Ticket: &apb.GoldenTicket{
+			EtcdCaCert:     ca.CACertRaw,
+			EtcdClientCert: etcdCert,
+			EtcdClientKey:  etcdKey,
+			EtcdCrl:        etcdCRL,
+			Peers: []*apb.GoldenTicket_EtcdPeer{
+				{Name: this.ID(), Address: this.Address().String()},
+			},
+			This: &apb.GoldenTicket_EtcdPeer{Name: cert.Subject.CommonName, Address: ip.String()},
+
+			NodeId:   cert.Subject.CommonName,
+			NodeCert: certRaw,
+			NodeKey:  key,
+		},
+	}, nil
 }
 
 func (s *debugService) GetDebugKubeconfig(ctx context.Context, req *apb.GetDebugKubeconfigRequest) (*apb.GetDebugKubeconfigResponse, error) {
 	return s.kubernetes.GetDebugKubeconfig(ctx, req)
 }
 
-// GetComponentLogs gets various logbuffers from binaries we call. This function just deals with the first path component,
-// delegating the rest to the service-specific handlers.
-func (s *debugService) GetComponentLogs(ctx context.Context, req *apb.GetComponentLogsRequest) (*apb.GetComponentLogsResponse, error) {
-	return nil, status.Error(codes.Unimplemented, "unimplemented")
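+// GetLogs streams log entries from the node's logtree to the client: first
+// the requested backlog (sent in chunks), then, if streaming was requested,
+// new entries as they arrive.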
+func (s *debugService) GetLogs(req *apb.GetLogsRequest, srv apb.NodeDebugService_GetLogsServer) error {
+	if len(req.Filters) > logFilterMax {
+		return status.Errorf(codes.InvalidArgument, "requested %d filters, maximum permitted is %d", len(req.Filters), logFilterMax)
+	}
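+	// Ensure the requested DN is well-formed before building read options.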
+	dn := logtree.DN(req.Dn)
+	_, err := dn.Path()
+	switch err {
+	case nil:
+	case logtree.ErrInvalidDN:
+		return status.Errorf(codes.InvalidArgument, "invalid DN")
+	default:
+		return status.Errorf(codes.Unavailable, "could not parse DN: %v", err)
+	}
+
+	var options []logtree.LogReadOption
+
+	// Turn backlog mode into logtree option(s).
+	switch req.BacklogMode {
+	case apb.GetLogsRequest_BACKLOG_DISABLE:
+	case apb.GetLogsRequest_BACKLOG_ALL:
+		options = append(options, logtree.WithBacklog(logtree.BacklogAllAvailable))
+	case apb.GetLogsRequest_BACKLOG_COUNT:
+		count := int(req.BacklogCount)
+		if count <= 0 {
+			return status.Errorf(codes.InvalidArgument, "backlog_count must be > 0 if backlog_mode is BACKLOG_COUNT")
+		}
+		options = append(options, logtree.WithBacklog(count))
+	default:
+		return status.Errorf(codes.InvalidArgument, "unknown backlog_mode %d", req.BacklogMode)
+	}
+
+	// Turn stream mode into logtree option(s).
+	streamEnable := false
+	switch req.StreamMode {
+	case apb.GetLogsRequest_STREAM_DISABLE:
+	case apb.GetLogsRequest_STREAM_UNBUFFERED:
+		streamEnable = true
+		options = append(options, logtree.WithStream())
+	default:
+		return status.Errorf(codes.InvalidArgument, "unknown stream_mode %d", req.StreamMode)
+	}
+
+	// Parse proto filters into logtree options.
+	for i, filter := range req.Filters {
+		switch inner := filter.Filter.(type) {
+		case *apb.LogFilter_WithChildren_:
+			options = append(options, logtree.WithChildren())
+		case *apb.LogFilter_OnlyRaw_:
+			options = append(options, logtree.OnlyRaw())
+		case *apb.LogFilter_OnlyLeveled_:
+			options = append(options, logtree.OnlyLeveled())
+		case *apb.LogFilter_LeveledWithMinimumSeverity_:
+			severity, err := logtree.SeverityFromProto(inner.LeveledWithMinimumSeverity.Minimum)
+			if err != nil {
+				return status.Errorf(codes.InvalidArgument, "filter %d has invalid severity: %v", i, err)
+			}
+			options = append(options, logtree.LeveledWithMinimumSeverity(severity))
+		default:
+			return status.Errorf(codes.InvalidArgument, "filter %d has unknown or unset type", i)
+		}
+	}
+
+	reader, err := s.logtree.Read(dn, options...)
+	switch err {
+	case nil:
+	case logtree.ErrRawAndLeveled:
+		return status.Errorf(codes.InvalidArgument, "requested only raw and only leveled logs simultaneously")
+	default:
+		return status.Errorf(codes.Unavailable, "could not retrieve logs: %v", err)
+	}
+	defer reader.Close()
+
+	// Default protobuf message size limit is 64MB. We want to limit ourselves
+	// to 10MB.
+	// Currently each raw log line can be at most 1024 unicode codepoints (or
+	// 4096 bytes). To cover extra metadata and proto overhead, let's round
+	// this up to 4500 bytes. This in turn means we can store a maximum of
+	// (10e6/4500) == 2222 entries.
+	// Currently each leveled log line can also be at most 1024 unicode
+	// codepoints (or 4096 bytes). To cover extra metadata and proto overhead,
+	// let's round this up to 5000 bytes. This in turn means we can store a
+	// maximum of (10e6/5000) == 2000 entries.
+	// The lower of these numbers, i.e. the worst case scenario, is 2000
+	// entries.
+	maxChunkSize := 2000
+
+	// Serve all backlog entries in chunks.
+	chunk := make([]*apb.LogEntry, 0, maxChunkSize)
+	for _, entry := range reader.Backlog {
+		p := entry.Proto()
+		if p == nil {
+			// TODO(q3k): log this once we have logtree/gRPC compatibility.
+			continue
+		}
+		chunk = append(chunk, p)
+
+		if len(chunk) >= maxChunkSize {
+			err := srv.Send(&apb.GetLogsResponse{
+				BacklogEntries: chunk,
+			})
+			if err != nil {
+				return err
+			}
+			chunk = make([]*apb.LogEntry, 0, maxChunkSize)
+		}
+	}
+
+	// Send the last chunk of the backlog, if present.
+	if len(chunk) > 0 {
+		err := srv.Send(&apb.GetLogsResponse{
+			BacklogEntries: chunk,
+		})
+		if err != nil {
+			return err
+		}
+	}
+
+	// Start serving streaming data, if streaming has been requested.
+	if !streamEnable {
+		return nil
+	}
+
+	for {
+		entry, ok := <-reader.Stream
+		if !ok {
+			// Streaming has been ended by logtree - tell the client and return.
+			return status.Error(codes.Unavailable, "log streaming aborted by system")
+		}
+		p := entry.Proto()
+		if p == nil {
+			// TODO(q3k): log this once we have logtree/gRPC compatibility.
+			continue
+		}
+		err := srv.Send(&apb.GetLogsResponse{
+			StreamEntries: []*apb.LogEntry{p},
+		})
+		if err != nil {
+			return err
+		}
+	}
 }