metropolis: stub out log service

The server-side and client-side implementations are not quite ready yet,
but we're committing this early so that we can start implementing more
node-local management RPCs.

Change-Id: I81b615b0f77dc7750cc738d60ee4923c3182721b
Reviewed-on: https://review.monogon.dev/c/monogon/+/1429
Tested-by: Jenkins CI
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
diff --git a/metropolis/cli/metroctl/BUILD.bazel b/metropolis/cli/metroctl/BUILD.bazel
index b2000fb..73a0a6e 100644
--- a/metropolis/cli/metroctl/BUILD.bazel
+++ b/metropolis/cli/metroctl/BUILD.bazel
@@ -7,6 +7,7 @@
         "cmd_k8scredplugin.go",
         "cmd_node.go",
         "cmd_node_approve.go",
+        "cmd_node_logs.go",
         "cmd_node_set.go",
         "cmd_takeownership.go",
         "main.go",
@@ -29,6 +30,7 @@
         "//metropolis/node",
         "//metropolis/node/core/identity",
         "//metropolis/node/core/rpc",
+        "//metropolis/pkg/logtree",
         "//metropolis/proto/api",
         "@com_github_adrg_xdg//:xdg",
         "@com_github_spf13_cobra//:cobra",
diff --git a/metropolis/cli/metroctl/cmd_node.go b/metropolis/cli/metroctl/cmd_node.go
index 97198d8..0f841ef 100644
--- a/metropolis/cli/metroctl/cmd_node.go
+++ b/metropolis/cli/metroctl/cmd_node.go
@@ -60,6 +60,8 @@
 func init() {
 	nodeCmd.AddCommand(nodeDescribeCmd)
 	nodeCmd.AddCommand(nodeListCmd)
+	// "logs" fetches logs directly from a node's NodeManagement service.
+	nodeCmd.AddCommand(nodeLogsCmd)
 	rootCmd.AddCommand(nodeCmd)
 }
 
diff --git a/metropolis/cli/metroctl/cmd_node_logs.go b/metropolis/cli/metroctl/cmd_node_logs.go
new file mode 100644
index 0000000..21d5637
--- /dev/null
+++ b/metropolis/cli/metroctl/cmd_node_logs.go
@@ -0,0 +1,107 @@
+package main
+
+import (
+	"crypto/x509"
+	"errors"
+	"fmt"
+	"io"
+
+	"github.com/spf13/cobra"
+
+	"source.monogon.dev/metropolis/cli/metroctl/core"
+	"source.monogon.dev/metropolis/pkg/logtree"
+	"source.monogon.dev/metropolis/proto/api"
+)
+
+// nodeLogsCmd fetches all available logs of a single node and prints them to
+// stdout. The node's external address is first looked up via the cluster
+// Management service, then the node's own NodeManagement service is dialed
+// directly. Exactly one node ID must be given.
+var nodeLogsCmd = &cobra.Command{
+	Short: "Get/stream logs from node",
+	Use:   "logs [node-id]",
+	Args:  cobra.ExactArgs(1),
+	RunE: func(cmd *cobra.Command, args []string) error {
+		ctx := cmd.Context()
+		nodeID := args[0]
+
+		// First connect to the main management service and figure out the node's IP
+		// address.
+		cc := dialAuthenticated(ctx)
+		defer cc.Close()
+		mgmt := api.NewManagementClient(cc)
+		nodes, err := core.GetNodes(ctx, mgmt, fmt.Sprintf("node.id == %q", nodeID))
+		if err != nil {
+			return fmt.Errorf("when getting node info: %w", err)
+		}
+
+		if len(nodes) == 0 {
+			return fmt.Errorf("no such node")
+		}
+		if len(nodes) > 1 {
+			return fmt.Errorf("expression matched more than one node")
+		}
+		n := nodes[0]
+		if n.Status == nil || n.Status.ExternalAddress == "" {
+			return fmt.Errorf("node has no external address")
+		}
+
+		// The cluster CA certificate is handed to dialAuthenticatedNode below,
+		// which uses it when authenticating the node.
+		// TODO(q3k): save CA certificate on takeover
+		info, err := mgmt.GetClusterInfo(ctx, &api.GetClusterInfoRequest{})
+		if err != nil {
+			return fmt.Errorf("couldn't get cluster info: %w", err)
+		}
+		cacert, err := x509.ParseCertificate(info.CaCertificate)
+		if err != nil {
+			return fmt.Errorf("remote CA certificate invalid: %w", err)
+		}
+
+		fmt.Printf("Getting logs from %s (%s)...\n", n.Id, n.Status.ExternalAddress)
+		// Dial the actual node at its management port.
+		cl := dialAuthenticatedNode(ctx, n.Id, n.Status.ExternalAddress, cacert)
+		defer cl.Close()
+		nmgmt := api.NewNodeManagementClient(cl)
+
+		// Ask for the entire backlog from the empty DN including all of its
+		// children (presumably the whole log tree), with streaming disabled so
+		// that the server ends the stream once the backlog has been served.
+		srv, err := nmgmt.Logs(ctx, &api.GetLogsRequest{
+			Dn:          "",
+			BacklogMode: api.GetLogsRequest_BACKLOG_ALL,
+			StreamMode:  api.GetLogsRequest_STREAM_DISABLE,
+			Filters: []*api.LogFilter{
+				{
+					Filter: &api.LogFilter_WithChildren_{
+						WithChildren: &api.LogFilter_WithChildren{},
+					},
+				},
+			},
+		})
+		if err != nil {
+			return fmt.Errorf("failed to get logs: %w", err)
+		}
+		for {
+			res, err := srv.Recv()
+			if errors.Is(err, io.EOF) {
+				fmt.Println("Done.")
+				break
+			}
+			if err != nil {
+				return fmt.Errorf("log stream failed: %w", err)
+			}
+			// Only backlog entries are expected, as streaming was disabled above.
+			for _, pe := range res.BacklogEntries {
+				entry, err := logtree.LogEntryFromProto(pe)
+				if err != nil {
+					fmt.Printf("invalid entry: %v\n", err)
+					continue
+				}
+				fmt.Println(entry.String())
+			}
+		}
+
+		return nil
+	},
+}
diff --git a/metropolis/cli/metroctl/core/rpc.go b/metropolis/cli/metroctl/core/rpc.go
index 00f1f01..4aeda75 100644
--- a/metropolis/cli/metroctl/core/rpc.go
+++ b/metropolis/cli/metroctl/core/rpc.go
@@ -81,6 +81,46 @@
 	return c, nil
 }
 
+// DialNode dials the NodeManagement service of node nodeId at nodeAddr,
+// authenticating with the owner certificate ocert and its private key opkey.
+// The transport credentials are built with rpc.WantRemoteCluster(ca) and
+// rpc.WantRemoteNode(nodeId), i.e. the remote end is expected to be that node
+// of the cluster whose CA certificate is ca. If proxyAddr is not empty, the
+// connection is made through the SOCKS5 proxy at that address.
+func DialNode(ctx context.Context, opkey ed25519.PrivateKey, ocert, ca *x509.Certificate, proxyAddr, nodeId, nodeAddr string) (*grpc.ClientConn, error) {
+	if opkey == nil {
+		return nil, fmt.Errorf("an owner's private key must be provided")
+	}
+	// ocert.Raw is read below, so a nil certificate must be rejected up front.
+	if ocert == nil {
+		return nil, fmt.Errorf("an owner's certificate must be provided")
+	}
+	if ca == nil {
+		return nil, fmt.Errorf("a cluster CA certificate must be provided")
+	}
+
+	var dialOpts []grpc.DialOption
+	if proxyAddr != "" {
+		socksDialer, err := proxy.SOCKS5("tcp", proxyAddr, nil, proxy.Direct)
+		if err != nil {
+			return nil, fmt.Errorf("failed to build a SOCKS dialer: %w", err)
+		}
+		grpcd := func(_ context.Context, addr string) (net.Conn, error) {
+			return socksDialer.Dial("tcp", addr)
+		}
+		dialOpts = append(dialOpts, grpc.WithContextDialer(grpcd))
+	}
+	tlsc := tls.Certificate{
+		Certificate: [][]byte{ocert.Raw},
+		PrivateKey:  opkey,
+	}
+	creds := rpc.NewAuthenticatedCredentials(tlsc, rpc.WantRemoteCluster(ca), rpc.WantRemoteNode(nodeId))
+	dialOpts = append(dialOpts, grpc.WithTransportCredentials(creds))
+
+	endpoint := net.JoinHostPort(nodeAddr, node.NodeManagement.PortString())
+	return grpc.DialContext(ctx, endpoint, dialOpts...)
+}
+
 // GetNodes retrieves node records, filtered by the supplied node filter
 // expression fexp.
 func GetNodes(ctx context.Context, mgmt api.ManagementClient, fexp string) ([]*api.Node, error) {
diff --git a/metropolis/cli/metroctl/rpc.go b/metropolis/cli/metroctl/rpc.go
index 8e8ee48..6d7beab 100644
--- a/metropolis/cli/metroctl/rpc.go
+++ b/metropolis/cli/metroctl/rpc.go
@@ -2,6 +2,7 @@
 
 import (
 	"context"
+	"crypto/x509"
 	"log"
 
 	"google.golang.org/grpc"
@@ -25,3 +26,26 @@
 	}
 	return cc
 }
+
+// dialAuthenticatedNode dials the node with the given id at address,
+// authenticating with the owner credentials found at flags.configPath and
+// checking the node against the cluster CA certificate cacert. Any failure
+// is fatal.
+func dialAuthenticatedNode(ctx context.Context, id, address string, cacert *x509.Certificate) *grpc.ClientConn {
+	// Collect credentials, validate command parameters, and try dialing the
+	// node.
+	ocert, opkey, err := core.GetOwnerCredentials(flags.configPath)
+	if err == core.NoCredentialsError {
+		log.Fatalf("You have to take ownership of the cluster first: %v", err)
+	}
+	// Don't pass credentials from a failed lookup on to core.DialNode, where a
+	// nil certificate would otherwise surface as a confusing error or panic.
+	if err != nil {
+		log.Fatalf("Failed to get owner credentials: %v", err)
+	}
+	cc, err := core.DialNode(ctx, opkey, ocert, cacert, flags.proxyAddr, id, address)
+	if err != nil {
+		log.Fatalf("While dialing node: %v", err)
+	}
+	return cc
+}
diff --git a/metropolis/node/core/mgmt/BUILD.bazel b/metropolis/node/core/mgmt/BUILD.bazel
index 41a25d2..349f13e 100644
--- a/metropolis/node/core/mgmt/BUILD.bazel
+++ b/metropolis/node/core/mgmt/BUILD.bazel
@@ -2,7 +2,10 @@
 
 go_library(
     name = "mgmt",
-    srcs = ["mgmt.go"],
+    srcs = [
+        "mgmt.go",
+        "svc_logs.go",
+    ],
     importpath = "source.monogon.dev/metropolis/node/core/mgmt",
     visibility = ["//visibility:public"],
     deps = [
@@ -12,5 +15,7 @@
         "//metropolis/pkg/supervisor",
         "//metropolis/proto/api",
         "@org_golang_google_grpc//:go_default_library",
+        "@org_golang_google_grpc//codes",
+        "@org_golang_google_grpc//status",
     ],
 )
diff --git a/metropolis/node/core/mgmt/svc_logs.go b/metropolis/node/core/mgmt/svc_logs.go
new file mode 100644
index 0000000..6e05ce9
--- /dev/null
+++ b/metropolis/node/core/mgmt/svc_logs.go
@@ -0,0 +1,15 @@
+package mgmt
+
+import (
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+
+	"source.monogon.dev/metropolis/proto/api"
+)
+
+// Logs implements the NodeManagement.Logs RPC. It is only a stub for now and
+// fails every call with codes.Unimplemented.
+func (s *Service) Logs(_ *api.GetLogsRequest, _ api.NodeManagement_LogsServer) error {
+	st := status.New(codes.Unimplemented, "unimplemented")
+	return st.Err()
+}
diff --git a/metropolis/proto/api/debug.proto b/metropolis/proto/api/debug.proto
index c8deb73..156f036 100644
--- a/metropolis/proto/api/debug.proto
+++ b/metropolis/proto/api/debug.proto
@@ -19,6 +19,7 @@
 option go_package = "source.monogon.dev/metropolis/proto/api";
 
 import "google/protobuf/timestamp.proto";
+import "metropolis/proto/api/management.proto";
 
 // NodeDebugService exposes debug and testing endpoints that allow introspection into a running Metropolis node.
 // It is not authenticated and will be disabled in production. It is currently consumed by metropolis/cli/dbg and
@@ -67,102 +68,6 @@
     string debug_kubeconfig = 1;
 }
 
-// Severity level corresponding to //metropolis/pkg/logtree.Severity.
-enum LeveledLogSeverity {
-    INVALID = 0;
-    INFO = 1;
-    WARNING = 2;
-    ERROR = 3;
-    FATAL = 4;
-}
-
-// Filter set when requesting logs for a given DN. This message is equivalent to the following GADT enum:
-// data LogFilter = WithChildren
-//                | OnlyRaw
-//                | OnlyLeveled
-//                | LeveledWithMinimumSeverity(Severity)
-//
-// Multiple LogFilters can be chained/combined when requesting logs, as long as they do not conflict.
-message LogFilter {
-    // Entries will be returned not only for the given DN, but all child DNs as well. For instance, if the
-    // requested DN is foo, entries logged to foo, foo.bar and foo.bar.baz will all be returned.
-    message WithChildren {
-    }
-    // Only raw logging entries will be returned. Conflicts with OnlyLeveled filters.
-    message OnlyRaw {
-    }
-    // Only leveled logging entries will be returned. Conflicts with OnlyRaw filters.
-    message OnlyLeveled {
-    }
-    // If leveled logs are returned, all entries at severity lower than `minimum` will be discarded.
-    message LeveledWithMinimumSeverity {
-        LeveledLogSeverity minimum = 1;
-    }
-    oneof filter {
-        WithChildren with_children = 1;
-        OnlyRaw only_raw = 3;
-        OnlyLeveled only_leveled = 4;
-        LeveledWithMinimumSeverity leveled_with_minimum_severity = 5;
-    }
-}
-
-message GetLogsRequest {
-    // DN from which to request logs. All supervised runnables live at `root.`, the init code lives at `init.`.
-    string dn = 1;
-    // Filters to apply to returned data.
-    repeated LogFilter filters = 2;
-
-    enum BacklogMode {
-        BACKLOG_INVALID = 0;
-        // No historic data will be returned.
-        BACKLOG_DISABLE = 1;
-        // All available historic data will be returned.
-        BACKLOG_ALL = 2;
-        // At most backlog_count entries will be returned, if available.
-        BACKLOG_COUNT = 3;
-    }
-    BacklogMode backlog_mode = 3;
-    int64 backlog_count = 4;
-
-    enum StreamMode {
-        STREAM_INVALID = 0;
-        // No streaming entries, gRPC stream will be closed as soon as all backlog data is served.
-        STREAM_DISABLE = 1;
-        // Entries will be streamed as early as available right after all backlog data is served.
-        STREAM_UNBUFFERED = 2;
-    }
-    StreamMode stream_mode = 5;
-}
-
-message GetLogsResponse {
-    // Entries from the requested historical entries (via WithBackLog). They will all be served before the first
-    // stream_entries are served (if any).
-    repeated LogEntry backlog_entries = 1;
-    // Entries streamed as they arrive. Currently no server-side buffering is enabled, instead every line is served
-    // as early as it arrives. However, this might change in the future, so this behaviour cannot be depended
-    // upon.
-    repeated LogEntry stream_entries = 2;
-}
-
-message LogEntry {
-    message Leveled {
-        repeated string lines = 1;
-        google.protobuf.Timestamp timestamp = 2;
-        LeveledLogSeverity severity = 3;
-        string location = 4;
-    }
-    message Raw {
-        string data = 1;
-        int64 original_length = 2;
-    }
-
-    string dn = 1;
-    oneof kind {
-        Leveled leveled = 2;
-        Raw raw = 3;
-    }
-}
-
 message TraceRequest {
     // Name of the tracer to use. Defined in https://www.kernel.org/doc/html/latest/trace/ftrace.html#the-tracers.
     // Useful ones enabled in Metropolis: function_graph, function.
diff --git a/metropolis/proto/api/management.proto b/metropolis/proto/api/management.proto
index 012bb07..3c869a9 100644
--- a/metropolis/proto/api/management.proto
+++ b/metropolis/proto/api/management.proto
@@ -3,6 +3,7 @@
 option go_package = "source.monogon.dev/metropolis/proto/api";
 
 import "google/protobuf/duration.proto";
+import "google/protobuf/timestamp.proto";
 
 import "metropolis/proto/common/common.proto";
 import "metropolis/proto/ext/authorization.proto";
@@ -184,4 +185,114 @@
-// NodeManagement runs on every node of the cluster and providers management
+// NodeManagement runs on every node of the cluster and provides management
 // and troubleshooting RPCs to operators. All requests must be authenticated.
 service NodeManagement {
-}
\ No newline at end of file
+  rpc Logs(GetLogsRequest) returns (stream GetLogsResponse) {
+    option (metropolis.proto.ext.authorization) = {
+      need: PERMISSION_READ_NODE_LOGS
+    };
+  }
+}
+
+// Severity level corresponding to //metropolis/pkg/logtree.Severity.
+enum LeveledLogSeverity {
+  INVALID = 0;
+  INFO = 1;
+  WARNING = 2;
+  ERROR = 3;
+  FATAL = 4;
+}
+
+// Filter set when requesting logs for a given DN. This message is equivalent to
+// the following GADT enum:
+// data LogFilter = WithChildren
+//                | OnlyRaw
+//                | OnlyLeveled
+//                | LeveledWithMinimumSeverity(Severity)
+//
+// Multiple LogFilters can be chained/combined when requesting logs, as long as
+// they do not conflict.
+message LogFilter {
+  // Entries will be returned not only for the given DN, but all child DNs as
+  // well. For instance, if the requested DN is foo, entries logged to foo,
+  // foo.bar and foo.bar.baz will all be returned.
+  message WithChildren {
+  }
+  // Only raw logging entries will be returned. Conflicts with OnlyLeveled
+  // filters.
+  message OnlyRaw {
+  }
+  // Only leveled logging entries will be returned. Conflicts with OnlyRaw
+  // filters.
+  message OnlyLeveled {
+  }
+  // If leveled logs are returned, all entries at severity lower than `minimum`
+  // will be discarded.
+  message LeveledWithMinimumSeverity {
+    LeveledLogSeverity minimum = 1;
+  }
+  oneof filter {
+    WithChildren with_children = 1;
+    OnlyRaw only_raw = 3;
+    OnlyLeveled only_leveled = 4;
+    LeveledWithMinimumSeverity leveled_with_minimum_severity = 5;
+  }
+}
+
+message GetLogsRequest {
+  // DN from which to request logs. All supervised runnables live at `root.`,
+  // the init code lives at `init.`.
+  string dn = 1;
+  // Filters to apply to returned data.
+  repeated LogFilter filters = 2;
+
+  enum BacklogMode {
+    BACKLOG_INVALID = 0;
+    // No historic data will be returned.
+    BACKLOG_DISABLE = 1;
+    // All available historic data will be returned.
+    BACKLOG_ALL = 2;
+    // At most backlog_count entries will be returned, if available.
+    BACKLOG_COUNT = 3;
+  }
+  BacklogMode backlog_mode = 3;
+  int64 backlog_count = 4;
+
+  enum StreamMode {
+    STREAM_INVALID = 0;
+    // No streaming entries, gRPC stream will be closed as soon as all backlog
+    // data is served.
+    STREAM_DISABLE = 1;
+    // Entries will be streamed as early as available right after all backlog
+    // data is served.
+    STREAM_UNBUFFERED = 2;
+  }
+  StreamMode stream_mode = 5;
+}
+
+message LogEntry {
+  message Leveled {
+    repeated string lines = 1;
+    google.protobuf.Timestamp timestamp = 2;
+    LeveledLogSeverity severity = 3;
+    string location = 4;
+  }
+  message Raw {
+    string data = 1;
+    int64 original_length = 2;
+  }
+
+  string dn = 1;
+  oneof kind {
+    Leveled leveled = 2;
+    Raw raw = 3;
+  }
+}
+
+message GetLogsResponse {
+  // Historical entries requested via backlog_mode. They will all be served
+  // before the first stream_entries are served (if any).
+  repeated LogEntry backlog_entries = 1;
+  // Entries streamed as they arrive. Currently no server-side buffering is
+  // enabled, instead every line is served as early as it arrives. However, this
+  // might change in the future, so this behaviour cannot be depended upon.
+  repeated LogEntry stream_entries = 2;
+}
diff --git a/metropolis/proto/ext/authorization.proto b/metropolis/proto/ext/authorization.proto
index 0275bba..12780f7 100644
--- a/metropolis/proto/ext/authorization.proto
+++ b/metropolis/proto/ext/authorization.proto
@@ -24,6 +24,7 @@
     PERMISSION_UPDATE_NODE_SELF = 3;
     PERMISSION_APPROVE_NODE = 4;
     PERMISSION_UPDATE_NODE_ROLES = 5;
+    PERMISSION_READ_NODE_LOGS = 6;
 }
 
 // Authorization policy for an RPC method. This message/API does not have the