core -> metropolis
Smalltown is now called Metropolis!
This is the first commit in a series of cleanup commits that prepare us
for an open source release. This one just moves some Bazel packages
around to follow a stricter directory layout.
All of Metropolis now lives in `//metropolis`.
All of Metropolis Node code now lives in `//metropolis/node`.
All of the main /init now lives in `//metropolis/node/core`.
All of the Kubernetes functionality/glue now lives in `//metropolis/node/kubernetes`.
Next steps:
- hunt down all references to Smalltown and replace them appropriately
- narrow down visibility rules
- document new code organization
- move `//build/toolchain` to `//monogon/build/toolchain`
- do another cleanup pass between `//golibs` and
`//monogon/node/{core,common}`.
- remove `//delta` and `//anubis`
Fixes T799.
Test Plan: Just a very large refactor. CI should help us out here.
Bug: T799
X-Origin-Diff: phab/D667
GitOrigin-RevId: 6029b8d4edc42325d50042596b639e8b122d0ded
diff --git a/metropolis/node/core/debug_service.go b/metropolis/node/core/debug_service.go
new file mode 100644
index 0000000..0155cc6
--- /dev/null
+++ b/metropolis/node/core/debug_service.go
@@ -0,0 +1,258 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package main
+
+import (
+ "context"
+ "crypto/x509"
+ "fmt"
+ "net"
+
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+
+ common "git.monogon.dev/source/nexantic.git/metropolis/node"
+ "git.monogon.dev/source/nexantic.git/metropolis/node/core/cluster"
+ "git.monogon.dev/source/nexantic.git/metropolis/node/core/consensus/ca"
+ "git.monogon.dev/source/nexantic.git/metropolis/node/core/logtree"
+ "git.monogon.dev/source/nexantic.git/metropolis/node/kubernetes"
+ apb "git.monogon.dev/source/nexantic.git/metropolis/proto/api"
+)
+
+const (
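+ // logFilterMax caps the number of filters accepted in a single GetLogs request.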
+ logFilterMax = 1000
+)
+
+// debugService implements the Metropolis node debug API.
+type debugService struct {
+ cluster *cluster.Manager
+ kubernetes *kubernetes.Service
+ logtree *logtree.LogTree
+}
+
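+// GetGoldenTicket issues a 'golden ticket': the credentials (node
+// certificate/key, etcd CA certificate, peer certificate/key and CRL, and
+// peer list) that a new node at the given external IP needs to join the
+// cluster. The new node is also registered with etcd as a learner member.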
+func (s *debugService) GetGoldenTicket(ctx context.Context, req *apb.GetGoldenTicketRequest) (*apb.GetGoldenTicketResponse, error) {
+ ip := net.ParseIP(req.ExternalIp)
+ if ip == nil {
+ return nil, status.Errorf(codes.InvalidArgument, "could not parse IP %q", req.ExternalIp)
+ }
+ this := s.cluster.Node()
+
+ certRaw, key, err := s.nodeCertificate()
+ if err != nil {
+ return nil, status.Errorf(codes.Unavailable, "failed to generate node certificate: %v", err)
+ }
+ cert, err := x509.ParseCertificate(certRaw)
+ if err != nil {
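+ // The certificate was just serialized by nodeCertificate above, so
+ // failing to re-parse it indicates a bug.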
+ panic(err)
+ }
+ kv := s.cluster.ConsensusKVRoot()
+ ca, err := ca.Load(ctx, kv)
+ if err != nil {
+ return nil, status.Errorf(codes.Unavailable, "could not load CA: %v", err)
+ }
+ etcdCert, etcdKey, err := ca.Issue(ctx, kv, cert.Subject.CommonName, ip)
+ if err != nil {
+ return nil, status.Errorf(codes.Unavailable, "could not generate etcd peer certificate: %v", err)
+ }
+ etcdCRL, err := ca.GetCurrentCRL(ctx, kv)
+ if err != nil {
+ return nil, status.Errorf(codes.Unavailable, "could not get etcd CRL: %v", err)
+ }
+
+ // Add new etcd member to etcd cluster.
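+ // It is added as a learner, presumably to be promoted to a full voting
+ // member once it has caught up with the cluster.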
+ etcd := s.cluster.ConsensusCluster()
+ etcdAddr := fmt.Sprintf("https://%s:%d", ip.String(), common.ConsensusPort)
+ _, err = etcd.MemberAddAsLearner(ctx, []string{etcdAddr})
+ if err != nil {
+ return nil, status.Errorf(codes.Unavailable, "could not add as new etcd consensus member: %v", err)
+ }
+
+ return &apb.GetGoldenTicketResponse{
+ Ticket: &apb.GoldenTicket{
+ EtcdCaCert: ca.CACertRaw,
+ EtcdClientCert: etcdCert,
+ EtcdClientKey: etcdKey,
+ EtcdCrl: etcdCRL,
+ Peers: []*apb.GoldenTicket_EtcdPeer{
+ {Name: this.ID(), Address: this.Address().String()},
+ },
+ This: &apb.GoldenTicket_EtcdPeer{Name: cert.Subject.CommonName, Address: ip.String()},
+
+ NodeId: cert.Subject.CommonName,
+ NodeCert: certRaw,
+ NodeKey: key,
+ },
+ }, nil
+}
+
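+// GetDebugKubeconfig requests a debug kubeconfig from the Kubernetes
+// service and returns it to the client.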
+func (s *debugService) GetDebugKubeconfig(ctx context.Context, req *apb.GetDebugKubeconfigRequest) (*apb.GetDebugKubeconfigResponse, error) {
+ return s.kubernetes.GetDebugKubeconfig(ctx, req)
+}
+
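+// GetLogs sends historical (backlog) and/or streamed log entries for the
+// requested DN to the client, subject to the requested filters.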
+func (s *debugService) GetLogs(req *apb.GetLogsRequest, srv apb.NodeDebugService_GetLogsServer) error {
+ if len(req.Filters) > logFilterMax {
+ return status.Errorf(codes.InvalidArgument, "requested %d filters, maximum permitted is %d", len(req.Filters), logFilterMax)
+ }
+ dn := logtree.DN(req.Dn)
+ _, err := dn.Path()
+ switch err {
+ case nil:
+ case logtree.ErrInvalidDN:
+ return status.Errorf(codes.InvalidArgument, "invalid DN")
+ default:
+ return status.Errorf(codes.Unavailable, "could not parse DN: %v", err)
+ }
+
+ var options []logtree.LogReadOption
+
+ // Turn backlog mode into logtree option(s).
+ switch req.BacklogMode {
+ case apb.GetLogsRequest_BACKLOG_DISABLE:
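+ // Backlog explicitly disabled, no option to add.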
+ case apb.GetLogsRequest_BACKLOG_ALL:
+ options = append(options, logtree.WithBacklog(logtree.BacklogAllAvailable))
+ case apb.GetLogsRequest_BACKLOG_COUNT:
+ count := int(req.BacklogCount)
+ if count <= 0 {
+ return status.Errorf(codes.InvalidArgument, "backlog_count must be > 0 if backlog_mode is BACKLOG_COUNT")
+ }
+ options = append(options, logtree.WithBacklog(count))
+ default:
+ return status.Errorf(codes.InvalidArgument, "unknown backlog_mode %d", req.BacklogMode)
+ }
+
+ // Turn stream mode into logtree option(s).
+ streamEnable := false
+ switch req.StreamMode {
+ case apb.GetLogsRequest_STREAM_DISABLE:
+ case apb.GetLogsRequest_STREAM_UNBUFFERED:
+ streamEnable = true
+ options = append(options, logtree.WithStream())
+ default:
+ return status.Errorf(codes.InvalidArgument, "unknown stream_mode %d", req.StreamMode)
+ }
+
+ // Parse proto filters into logtree options.
+ for i, filter := range req.Filters {
+ switch inner := filter.Filter.(type) {
+ case *apb.LogFilter_WithChildren_:
+ options = append(options, logtree.WithChildren())
+ case *apb.LogFilter_OnlyRaw_:
+ options = append(options, logtree.OnlyRaw())
+ case *apb.LogFilter_OnlyLeveled_:
+ options = append(options, logtree.OnlyLeveled())
+ case *apb.LogFilter_LeveledWithMinimumSeverity_:
+ severity, err := logtree.SeverityFromProto(inner.LeveledWithMinimumSeverity.Minimum)
+ if err != nil {
+ return status.Errorf(codes.InvalidArgument, "filter %d has invalid severity: %v", i, err)
+ }
+ options = append(options, logtree.LeveledWithMinimumSeverity(severity))
+ }
+ }
+
+ reader, err := s.logtree.Read(logtree.DN(req.Dn), options...)
+ switch err {
+ case nil:
+ case logtree.ErrRawAndLeveled:
+ return status.Errorf(codes.InvalidArgument, "requested only raw and only leveled logs simultaneously")
+ default:
+ return status.Errorf(codes.Unavailable, "could not retrieve logs: %v", err)
+ }
+ defer reader.Close()
+
+ // Default protobuf message size limit is 64MB. We want to limit ourselves
+ // to 10MB.
+ // Currently each raw log line can be at most 1024 unicode codepoints (or
+ // 4096 bytes). To cover extra metadata and proto overhead, let's round
+ // this up to 4500 bytes. This in turn means we can store a maximum of
+ // (10e6/4500) == 2222 entries.
+ // Currently each leveled log line can also be at most 1024 unicode
+ // codepoints (or 4096 bytes). To cover extra metadata and proto overhead
+ // let's round this up to 5000 bytes. This in turn means we can store a
+ // maximum of (10e6/5000) == 2000 entries.
+ // The lower of these numbers, i.e. the worst case scenario, is 2000
+ // maximum entries.
+ maxChunkSize := 2000
+
+ // Serve all backlog entries in chunks.
+ chunk := make([]*apb.LogEntry, 0, maxChunkSize)
+ for _, entry := range reader.Backlog {
+ p := entry.Proto()
+ if p == nil {
+ // TODO(q3k): log this once we have logtree/gRPC compatibility.
+ continue
+ }
+ chunk = append(chunk, p)
+
+ if len(chunk) >= maxChunkSize {
+ err := srv.Send(&apb.GetLogsResponse{
+ BacklogEntries: chunk,
+ })
+ if err != nil {
+ return err
+ }
+ chunk = make([]*apb.LogEntry, 0, maxChunkSize)
+ }
+ }
+
+ // Send last chunk of backlog, if present.
+ if len(chunk) > 0 {
+ err := srv.Send(&apb.GetLogsResponse{
+ BacklogEntries: chunk,
+ })
+ if err != nil {
+ return err
+ }
+ }
+
+ // Start serving streaming data, if streaming has been requested.
+ if !streamEnable {
+ return nil
+ }
+
+ for {
+ entry, ok := <-reader.Stream
+ if !ok {
+ // Streaming has been ended by logtree - tell the client and return.
+ return status.Error(codes.Unavailable, "log streaming aborted by system")
+ }
+ p := entry.Proto()
+ if p == nil {
+ // TODO(q3k): log this once we have logtree/gRPC compatibility.
+ continue
+ }
+ err := srv.Send(&apb.GetLogsResponse{
+ StreamEntries: []*apb.LogEntry{p},
+ })
+ if err != nil {
+ return err
+ }
+ }
+}