metropolis: implement NodeManagement.Logs

This takes the implementation from the debug service, dusts it off a
bit, adds tests and moves everything to the new node mgmt service.

Change-Id: Id3b70126a2551775d8328c0c4e424ec0e675f40f
Reviewed-on: https://review.monogon.dev/c/monogon/+/1439
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
Tested-by: Jenkins CI
diff --git a/metropolis/cli/metroctl/cmd_node.go b/metropolis/cli/metroctl/cmd_node.go
index 0f841ef..97198d8 100644
--- a/metropolis/cli/metroctl/cmd_node.go
+++ b/metropolis/cli/metroctl/cmd_node.go
@@ -60,7 +60,6 @@
 func init() {
 	nodeCmd.AddCommand(nodeDescribeCmd)
 	nodeCmd.AddCommand(nodeListCmd)
-	nodeCmd.AddCommand(nodeLogsCmd)
 	rootCmd.AddCommand(nodeCmd)
 }
 
diff --git a/metropolis/cli/metroctl/cmd_node_logs.go b/metropolis/cli/metroctl/cmd_node_logs.go
index beae6ab..a04558c 100644
--- a/metropolis/cli/metroctl/cmd_node_logs.go
+++ b/metropolis/cli/metroctl/cmd_node_logs.go
@@ -11,13 +11,51 @@
 	"source.monogon.dev/metropolis/cli/metroctl/core"
 	"source.monogon.dev/metropolis/pkg/logtree"
 	"source.monogon.dev/metropolis/proto/api"
+
 	cpb "source.monogon.dev/metropolis/proto/common"
 )
 
+type metroctlLogFlags struct {
+	// follow (ie. stream) logs live.
+	follow bool
+	// dn to query.
+	dn string
+	// exact dn query, i.e. without children/recursion.
+	exact bool
+	// concise logging output format.
+	concise bool
+	// backlog: >0 for a concrete limit, -1 for all, 0 for none
+	backlog int
+}
+
+var logFlags metroctlLogFlags
+
 var nodeLogsCmd = &cobra.Command{
 	Short: "Get/stream logs from node",
-	Use:   "logs [node-id]",
-	Args:  cobra.MinimumNArgs(1),
+	Long: `Get or stream logs from node.
+
+Node logs are structured in a 'log tree' structure, in which different subsystems
+log to DNs (distinguished names). For example, service 'foo' might log to
+root.role.foo, while service 'bar' might log to root.role.bar.
+
+To set the DN you want to request logs from, use --dn. The default is to return
+all logs. The default output is also a good starting point to figure out
+what DNs are active in the system.
+
+When requesting logs for a DN by default all sub-DNs will also be returned (ie.
+with the above example, when requesting DN 'root.role' logs at root.role.foo and
+root.role.bar would also be returned). This behaviour can be disabled by setting
+--exact.
+
+To stream logs, use --follow.
+
+By default, all available logs are returned. To limit the number of historical
+log lines (a.k.a. 'backlog') to return, set --backlog. This is similar to requesting
+all lines and then piping the result through 'tail' - but more efficient, as no
+unnecessary lines are fetched.
+`,
+	Use:  "logs [node-id]",
+	Args: cobra.MinimumNArgs(1),
 	RunE: func(cmd *cobra.Command, args []string) error {
 		ctx := cmd.Context()
 
@@ -51,22 +89,39 @@
 			return fmt.Errorf("remote CA certificate invalid: %w", err)
 		}
 
-		fmt.Printf("Getting logs from %s (%s)...\n", n.Id, n.Status.ExternalAddress)
+		fmt.Printf("=== Logs from %s (%s):\n", n.Id, n.Status.ExternalAddress)
 		// Dial the actual node at its management port.
 		cl := dialAuthenticatedNode(ctx, n.Id, n.Status.ExternalAddress, cacert)
 		nmgmt := api.NewNodeManagementClient(cl)
 
-		srv, err := nmgmt.Logs(ctx, &api.GetLogsRequest{
-			Dn:          "",
-			BacklogMode: api.GetLogsRequest_BACKLOG_ALL,
-			StreamMode:  api.GetLogsRequest_STREAM_DISABLE,
-			Filters: []*cpb.LogFilter{
-				{
-					Filter: &cpb.LogFilter_WithChildren_{
-						WithChildren: &cpb.LogFilter_WithChildren{},
-					},
+		streamMode := api.GetLogsRequest_STREAM_DISABLE
+		if logFlags.follow {
+			streamMode = api.GetLogsRequest_STREAM_UNBUFFERED
+		}
+		var filters []*cpb.LogFilter
+		if !logFlags.exact {
+			filters = append(filters, &cpb.LogFilter{
+				Filter: &cpb.LogFilter_WithChildren_{
+					WithChildren: &cpb.LogFilter_WithChildren{},
 				},
-			},
+			})
+		}
+		backlogMode := api.GetLogsRequest_BACKLOG_ALL
+		var backlogCount int64
+		switch {
+		case logFlags.backlog > 0:
+			backlogMode = api.GetLogsRequest_BACKLOG_COUNT
+			backlogCount = int64(logFlags.backlog)
+		case logFlags.backlog == 0:
+			backlogMode = api.GetLogsRequest_BACKLOG_DISABLE
+		}
+
+		srv, err := nmgmt.Logs(ctx, &api.GetLogsRequest{
+			Dn:           logFlags.dn,
+			BacklogMode:  backlogMode,
+			BacklogCount: backlogCount,
+			StreamMode:   streamMode,
+			Filters:      filters,
 		})
 		if err != nil {
 			return fmt.Errorf("failed to get logs: %w", err)
@@ -74,22 +129,42 @@
 		for {
 			res, err := srv.Recv()
 			if errors.Is(err, io.EOF) {
-				fmt.Println("Done.")
+				fmt.Println("=== Done.")
 				break
 			}
 			if err != nil {
 				return fmt.Errorf("log stream failed: %w", err)
 			}
 			for _, entry := range res.BacklogEntries {
-				entry, err := logtree.LogEntryFromProto(entry)
-				if err != nil {
-					fmt.Printf("invalid entry: %v\n", err)
-					continue
-				}
-				fmt.Println(entry.String())
+				printEntry(entry)
+			}
+			for _, entry := range res.StreamEntries {
+				printEntry(entry)
 			}
 		}
 
 		return nil
 	},
 }
+
+func printEntry(e *cpb.LogEntry) {
+	entry, err := logtree.LogEntryFromProto(e)
+	if err != nil {
+		fmt.Printf("invalid stream entry: %v\n", err)
+		return
+	}
+	if logFlags.concise {
+		fmt.Println(entry.ConciseString(logtree.MetropolisShortenDict, 0))
+	} else {
+		fmt.Println(entry.String())
+	}
+}
+
+func init() {
+	nodeLogsCmd.Flags().BoolVarP(&logFlags.follow, "follow", "f", false, "Continue streaming logs after fetching backlog.")
+	nodeLogsCmd.Flags().StringVar(&logFlags.dn, "dn", "", "Distinguished Name to get logs from (and children, if --exact is not set). If not set, defaults to '', which is the top-level DN.")
+	nodeLogsCmd.Flags().BoolVarP(&logFlags.exact, "exact", "e", false, "Only show logs for exactly the DN, do not recurse down the tree.")
+	nodeLogsCmd.Flags().BoolVarP(&logFlags.concise, "concise", "c", false, "Output concise logs.")
+	nodeLogsCmd.Flags().IntVar(&logFlags.backlog, "backlog", -1, "How many lines of historical log data to return. The default (-1) returns all available lines. Zero value means no backlog is returned (useful when using --follow).")
+	nodeCmd.AddCommand(nodeLogsCmd)
+}
diff --git a/metropolis/cli/metroctl/main.go b/metropolis/cli/metroctl/main.go
index 19b378d..a4ea438 100644
--- a/metropolis/cli/metroctl/main.go
+++ b/metropolis/cli/metroctl/main.go
@@ -44,7 +44,7 @@
 	rootCmd.PersistentFlags().StringVar(&flags.proxyAddr, "proxy", "", "SOCKS5 proxy address")
 	rootCmd.PersistentFlags().StringVar(&flags.configPath, "config", filepath.Join(xdg.ConfigHome, "metroctl"), "An alternative cluster config path")
 	rootCmd.PersistentFlags().BoolVar(&flags.verbose, "verbose", false, "Log additional runtime information")
-	rootCmd.PersistentFlags().StringVarP(&flags.format, "format", "f", "plaintext", "Data output format")
+	rootCmd.PersistentFlags().StringVar(&flags.format, "format", "plaintext", "Data output format")
 	rootCmd.PersistentFlags().StringVar(&flags.filter, "filter", "", "The object filter applied to the output data")
 	rootCmd.PersistentFlags().StringVarP(&flags.output, "output", "o", "", "Redirects output to the specified file")
 }
diff --git a/metropolis/cli/metroctl/test/test.go b/metropolis/cli/metroctl/test/test.go
index 5468834..872f7b3 100644
--- a/metropolis/cli/metroctl/test/test.go
+++ b/metropolis/cli/metroctl/test/test.go
@@ -265,6 +265,20 @@
 			return nil
 		})
 	})
+	t.Run("logs [nodeID]", func(t *testing.T) {
+		util.TestEventual(t, "metroctl logs [nodeID]", ctx, 10*time.Second, func(ctx context.Context) error {
+			var args []string
+			args = append(args, commonOpts...)
+			args = append(args, endpointOpts...)
+			args = append(args, "node", "logs", cl.NodeIDs[1])
+
+			if err := mctlFailIfMissing(t, ctx, args, "Cluster enrolment done."); err != nil {
+				return err
+			}
+
+			return nil
+		})
+	})
 	t.Run("set/unset role", func(t *testing.T) {
 		util.TestEventual(t, "metroctl set/unset role KubernetesController", ctx, 10*time.Second, func(ctx context.Context) error {
 			nid := cl.NodeIDs[1]