core -> metropolis

Smalltown is now called Metropolis!

This is the first commit in a series of cleanup commits that prepare us
for an open source release. This one just moves some Bazel packages
around to follow a stricter directory layout.

All of Metropolis now lives in `//metropolis`.

All of Metropolis Node code now lives in `//metropolis/node`.

All of the main /init now lives in `//metropolis/node/core`.

All of the Kubernetes functionality/glue now lives in
`//metropolis/node/kubernetes`.

Next steps:
     - hunt down all references to Smalltown and replace them appropriately
     - narrow down visibility rules
     - document new code organization
     - move `//build/toolchain` to `//monogon/build/toolchain`
     - do another cleanup pass between `//golibs` and
       `//metropolis/node/{core,common}`.
     - remove `//delta` and `//anubis`

Fixes T799.

Test Plan: Just a very large refactor. CI should help us out here.

Bug: T799

X-Origin-Diff: phab/D667
GitOrigin-RevId: 6029b8d4edc42325d50042596b639e8b122d0ded
diff --git a/metropolis/node/core/debug_service.go b/metropolis/node/core/debug_service.go
new file mode 100644
index 0000000..0155cc6
--- /dev/null
+++ b/metropolis/node/core/debug_service.go
@@ -0,0 +1,243 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+	"context"
+	"crypto/x509"
+	"fmt"
+	"net"
+
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+
+	common "git.monogon.dev/source/nexantic.git/metropolis/node"
+	"git.monogon.dev/source/nexantic.git/metropolis/node/core/cluster"
+	"git.monogon.dev/source/nexantic.git/metropolis/node/core/consensus/ca"
+	"git.monogon.dev/source/nexantic.git/metropolis/node/core/logtree"
+	"git.monogon.dev/source/nexantic.git/metropolis/node/kubernetes"
+	apb "git.monogon.dev/source/nexantic.git/metropolis/proto/api"
+)
+
+const (
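+	// logFilterMax bounds how many filters a single GetLogs request may
+	// carry before it is rejected as invalid.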
+	logFilterMax = 1000
+)
+
+// debugService implements the Metropolis node debug API.
+type debugService struct {
+	cluster    *cluster.Manager
+	kubernetes *kubernetes.Service
+	logtree    *logtree.LogTree
+}
+
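+// GetGoldenTicket issues bootstrap credentials for a new node joining the
+// cluster: a node certificate, an etcd peer certificate and CRL from the
+// cluster CA, and registration of the node as an etcd learner member.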
+func (s *debugService) GetGoldenTicket(ctx context.Context, req *apb.GetGoldenTicketRequest) (*apb.GetGoldenTicketResponse, error) {
+	ip := net.ParseIP(req.ExternalIp)
+	if ip == nil {
+		return nil, status.Errorf(codes.InvalidArgument, "could not parse IP %q", req.ExternalIp)
+	}
+	this := s.cluster.Node()
+
+	certRaw, key, err := s.nodeCertificate()
+	if err != nil {
+		return nil, status.Errorf(codes.Unavailable, "failed to generate node certificate: %v", err)
+	}
+	cert, err := x509.ParseCertificate(certRaw)
+	if err != nil {
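+		// certRaw was just generated by nodeCertificate, so failing to
+		// parse it is an internal invariant violation, not a user error.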
+		panic(err)
+	}
+	kv := s.cluster.ConsensusKVRoot()
+	etcdCA, err := ca.Load(ctx, kv)
+	if err != nil {
+		return nil, status.Errorf(codes.Unavailable, "could not load CA: %v", err)
+	}
+	etcdCert, etcdKey, err := etcdCA.Issue(ctx, kv, cert.Subject.CommonName, ip)
+	if err != nil {
+		return nil, status.Errorf(codes.Unavailable, "could not generate etcd peer certificate: %v", err)
+	}
+	etcdCRL, err := etcdCA.GetCurrentCRL(ctx, kv)
+	if err != nil {
+		return nil, status.Errorf(codes.Unavailable, "could not get etcd CRL: %v", err)
+	}
+
+	// Add new etcd member to etcd cluster.
+	etcd := s.cluster.ConsensusCluster()
+	etcdAddr := fmt.Sprintf("https://%s:%d", ip.String(), common.ConsensusPort)
+	_, err = etcd.MemberAddAsLearner(ctx, []string{etcdAddr})
+	if err != nil {
+		return nil, status.Errorf(codes.Unavailable, "could not add as new etcd consensus member: %v", err)
+	}
+
+	return &apb.GetGoldenTicketResponse{
+		Ticket: &apb.GoldenTicket{
+			EtcdCaCert:     etcdCA.CACertRaw,
+			EtcdClientCert: etcdCert,
+			EtcdClientKey:  etcdKey,
+			EtcdCrl:        etcdCRL,
+			Peers: []*apb.GoldenTicket_EtcdPeer{
+				{Name: this.ID(), Address: this.Address().String()},
+			},
+			This: &apb.GoldenTicket_EtcdPeer{Name: cert.Subject.CommonName, Address: ip.String()},
+
+			NodeId:   cert.Subject.CommonName,
+			NodeCert: certRaw,
+			NodeKey:  key,
+		},
+	}, nil
+}
+
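+// GetDebugKubeconfig forwards the request to the Kubernetes service, which
+// issues a kubeconfig for debugging purposes.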
+func (s *debugService) GetDebugKubeconfig(ctx context.Context, req *apb.GetDebugKubeconfigRequest) (*apb.GetDebugKubeconfigResponse, error) {
+	return s.kubernetes.GetDebugKubeconfig(ctx, req)
+}
+
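+// GetLogs streams log entries from the node's logtree to the client: first
+// any requested backlog in chunks, then, if streaming was requested, live
+// entries as they arrive.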
+func (s *debugService) GetLogs(req *apb.GetLogsRequest, srv apb.NodeDebugService_GetLogsServer) error {
+	if len(req.Filters) > logFilterMax {
+		return status.Errorf(codes.InvalidArgument, "requested %d filters, maximum permitted is %d", len(req.Filters), logFilterMax)
+	}
+	dn := logtree.DN(req.Dn)
+	_, err := dn.Path()
+	switch err {
+	case nil:
+	case logtree.ErrInvalidDN:
+		return status.Errorf(codes.InvalidArgument, "invalid DN")
+	default:
+		return status.Errorf(codes.Unavailable, "could not parse DN: %v", err)
+	}
+
+	var options []logtree.LogReadOption
+
+	// Turn backlog mode into logtree option(s).
+	switch req.BacklogMode {
+	case apb.GetLogsRequest_BACKLOG_DISABLE:
+	case apb.GetLogsRequest_BACKLOG_ALL:
+		options = append(options, logtree.WithBacklog(logtree.BacklogAllAvailable))
+	case apb.GetLogsRequest_BACKLOG_COUNT:
+		count := int(req.BacklogCount)
+		if count <= 0 {
+			return status.Errorf(codes.InvalidArgument, "backlog_count must be > 0 if backlog_mode is BACKLOG_COUNT")
+		}
+		options = append(options, logtree.WithBacklog(count))
+	default:
+		return status.Errorf(codes.InvalidArgument, "unknown backlog_mode %d", req.BacklogMode)
+	}
+
+	// Turn stream mode into logtree option(s).
+	streamEnable := false
+	switch req.StreamMode {
+	case apb.GetLogsRequest_STREAM_DISABLE:
+	case apb.GetLogsRequest_STREAM_UNBUFFERED:
+		streamEnable = true
+		options = append(options, logtree.WithStream())
+	default:
+		return status.Errorf(codes.InvalidArgument, "unknown stream_mode %d", req.StreamMode)
+	}
+
+	// Parse proto filters into logtree options.
+	for i, filter := range req.Filters {
+		switch inner := filter.Filter.(type) {
+		case *apb.LogFilter_WithChildren_:
+			options = append(options, logtree.WithChildren())
+		case *apb.LogFilter_OnlyRaw_:
+			options = append(options, logtree.OnlyRaw())
+		case *apb.LogFilter_OnlyLeveled_:
+			options = append(options, logtree.OnlyLeveled())
+		case *apb.LogFilter_LeveledWithMinimumSeverity_:
+			severity, err := logtree.SeverityFromProto(inner.LeveledWithMinimumSeverity.Minimum)
+			if err != nil {
+				return status.Errorf(codes.InvalidArgument, "filter %d has invalid severity: %v", i, err)
+			}
+			options = append(options, logtree.LeveledWithMinimumSeverity(severity))
+		default:
+			return status.Errorf(codes.InvalidArgument, "filter %d has unknown type", i)
+		}
+	}
+
+	reader, err := s.logtree.Read(dn, options...)
+	switch err {
+	case nil:
+	case logtree.ErrRawAndLeveled:
+		return status.Errorf(codes.InvalidArgument, "requested only raw and only leveled logs simultaneously")
+	default:
+		return status.Errorf(codes.Unavailable, "could not retrieve logs: %v", err)
+	}
+	defer reader.Close()
+
+	// The default protobuf message size limit is 64MB. We want to limit
+	// ourselves to 10MB.
+	// Currently each raw log line can be at most 1024 unicode codepoints (or
+	// 4096 bytes). To cover extra metadata and proto overhead, let's round
+	// this up to 4500 bytes. This in turn means we can store a maximum of
+	// (10e6/4500) == 2222 entries.
+	// Currently each leveled log line can also be at most 1024 unicode
+	// codepoints (or 4096 bytes). To cover extra metadata and proto overhead,
+	// let's round this up to 5000 bytes. This in turn means we can store a
+	// maximum of (10e6/5000) == 2000 entries.
+	// The lower of these numbers, i.e. the worst case scenario, is 2000
+	// maximum entries.
+	maxChunkSize := 2000
+
+	// Serve all backlog entries in chunks.
+	chunk := make([]*apb.LogEntry, 0, maxChunkSize)
+	for _, entry := range reader.Backlog {
+		p := entry.Proto()
+		if p == nil {
+			// TODO(q3k): log this once we have logtree/gRPC compatibility.
+			continue
+		}
+		chunk = append(chunk, p)
+
+		if len(chunk) >= maxChunkSize {
+			err := srv.Send(&apb.GetLogsResponse{
+				BacklogEntries: chunk,
+			})
+			if err != nil {
+				return err
+			}
+			chunk = make([]*apb.LogEntry, 0, maxChunkSize)
+		}
+	}
+
+	// Send the last chunk of backlog, if present.
+	if len(chunk) > 0 {
+		err := srv.Send(&apb.GetLogsResponse{
+			BacklogEntries: chunk,
+		})
+		if err != nil {
+			return err
+		}
+	}
+
+	// Start serving streaming data, if streaming has been requested.
+	if !streamEnable {
+		return nil
+	}
+
+	for {
+		entry, ok := <-reader.Stream
+		if !ok {
+			// Streaming has been ended by logtree - tell the client and return.
+			return status.Error(codes.Unavailable, "log streaming aborted by system")
+		}
+		p := entry.Proto()
+		if p == nil {
+			// TODO(q3k): log this once we have logtree/gRPC compatibility.
+			continue
+		}
+		err := srv.Send(&apb.GetLogsResponse{
+			StreamEntries: []*apb.LogEntry{p},
+		})
+		if err != nil {
+			return err
+		}
+	}
+}