m/n/core/consensus: parse etcd server logs

This finally gives us easy to read etcd logs instead of raw JSON dumps
into stdout. Instead of simply parsing them as raw logs, we convert them
into leveled logs.

Change-Id: I7cfe18b9c4e24d7742a01a77f5d9c6ddee647493
Reviewed-on: https://review.monogon.dev/c/monogon/+/209
Reviewed-by: Leopold Schabel <leo@nexantic.com>
diff --git a/metropolis/node/core/consensus/BUILD.bazel b/metropolis/node/core/consensus/BUILD.bazel
index dd3de3c..06ccb12 100644
--- a/metropolis/node/core/consensus/BUILD.bazel
+++ b/metropolis/node/core/consensus/BUILD.bazel
@@ -2,7 +2,10 @@
 
 go_library(
     name = "go_default_library",
-    srcs = ["consensus.go"],
+    srcs = [
+        "consensus.go",
+        "logparser.go",
+    ],
     importpath = "source.monogon.dev/metropolis/node/core/consensus",
     visibility = ["//:__subpackages__"],
     deps = [
@@ -10,6 +13,9 @@
         "//metropolis/node/core/consensus/ca:go_default_library",
         "//metropolis/node/core/consensus/client:go_default_library",
         "//metropolis/node/core/localstorage:go_default_library",
+        "//metropolis/pkg/logbuffer:go_default_library",
+        "//metropolis/pkg/logtree:go_default_library",
+        "//metropolis/pkg/logtree/unraw:go_default_library",
         "//metropolis/pkg/supervisor:go_default_library",
         "@io_etcd_go_etcd//clientv3:go_default_library",
         "@io_etcd_go_etcd//embed:go_default_library",
@@ -19,12 +25,18 @@
 
 go_test(
     name = "go_default_test",
-    srcs = ["consensus_test.go"],
+    srcs = [
+        "consensus_test.go",
+        "logparser_test.go",
+    ],
     embed = [":go_default_library"],
     deps = [
         "//metropolis/node/core/localstorage:go_default_library",
         "//metropolis/node/core/localstorage/declarative:go_default_library",
         "//metropolis/pkg/freeport:go_default_library",
+        "//metropolis/pkg/logbuffer:go_default_library",
+        "//metropolis/pkg/logtree:go_default_library",
         "//metropolis/pkg/supervisor:go_default_library",
+        "@com_github_google_go_cmp//cmp:go_default_library",
     ],
 )
diff --git a/metropolis/node/core/consensus/consensus.go b/metropolis/node/core/consensus/consensus.go
index ed44140..8e74000 100644
--- a/metropolis/node/core/consensus/consensus.go
+++ b/metropolis/node/core/consensus/consensus.go
@@ -49,6 +49,7 @@
 	"source.monogon.dev/metropolis/node/core/consensus/ca"
 	"source.monogon.dev/metropolis/node/core/consensus/client"
 	"source.monogon.dev/metropolis/node/core/localstorage"
+	"source.monogon.dev/metropolis/pkg/logtree/unraw"
 	"source.monogon.dev/metropolis/pkg/supervisor"
 )
 
@@ -166,9 +167,24 @@
 		cfg.ClusterState = "existing"
 	}
 
+	converter := unraw.Converter{
+		Parser: parseEtcdLogEntry,
+		// The initial line from a starting etcd instance is fairly long.
+		MaximumLineLength: 8192,
+		LeveledLogger:     supervisor.Logger(ctx),
+	}
+	fifoPath := s.config.Ephemeral.ServerLogsFIFO.FullPath()
+	pipe, err := converter.NamedPipeReader(fifoPath)
+	if err != nil {
+		return nil, fmt.Errorf("could not get named pipe reader: %w", err)
+	}
+	if err := supervisor.Run(ctx, "pipe", pipe); err != nil {
+		return nil, fmt.Errorf("could not start server log reader: %w", err)
+	}
+
 	// TODO(q3k): pipe logs from etcd to supervisor.RawLogger via a file.
 	cfg.Logger = DefaultLogger
-	cfg.LogOutputs = []string{"stderr"}
+	cfg.LogOutputs = []string{fifoPath}
 
 	return cfg, nil
 }
diff --git a/metropolis/node/core/consensus/logparser.go b/metropolis/node/core/consensus/logparser.go
new file mode 100644
index 0000000..f4f5a4f
--- /dev/null
+++ b/metropolis/node/core/consensus/logparser.go
@@ -0,0 +1,143 @@
+package consensus
+
+import (
+	"encoding/json"
+	"fmt"
+	"sort"
+	"strconv"
+	"strings"
+	"time"
+
+	"source.monogon.dev/metropolis/pkg/logbuffer"
+	"source.monogon.dev/metropolis/pkg/logtree"
+	"source.monogon.dev/metropolis/pkg/logtree/unraw"
+)
+
+// etcdLogEntry is a JSON-encoded, structured log entry received from a running
+// etcd server. The format comes from the logging library used there,
+// github.com/uber-go/zap.
+type etcdLogEntry struct {
+	Level   string                 `json:"level"`
+	TS      time.Time              `json:"ts"`
+	Caller  string                 `json:"caller"`
+	Message string                 `json:"msg"`
+	Extras  map[string]interface{} `json:"-"`
+}
+
+// parseEtcdLogEntry is a logtree/unraw compatible parser for etcd log lines.
+// It is fairly liberal in what it will accept, falling back to writing a
+// message that outlines the given log entry could not have been parsed. This
+// ensures that no lines are lost, even if malformed.
+func parseEtcdLogEntry(l *logbuffer.Line, write unraw.LeveledWriter) {
+	if l.Truncated() {
+		write(&logtree.ExternalLeveledPayload{
+			Message: "Log line truncated: " + l.Data,
+		})
+		return
+	}
+
+	e := etcdLogEntry{}
+	// Parse constant fields
+	if err := json.Unmarshal([]byte(l.Data), &e); err != nil {
+		write(&logtree.ExternalLeveledPayload{
+			Message: "Log line unparseable: " + l.Data,
+		})
+		return
+	}
+	// Parse extra fields.
+	if err := json.Unmarshal([]byte(l.Data), &e.Extras); err != nil {
+		// Not exactly sure how this could ever happen - the previous parse
+		// went fine, so why wouldn't this one? But to be on the safe side,
+		// just don't attempt to parse this line any further.
+		write(&logtree.ExternalLeveledPayload{
+			Message: "Log line unparseable: " + l.Data,
+		})
+		return
+	}
+	delete(e.Extras, "level")
+	delete(e.Extras, "ts")
+	delete(e.Extras, "caller")
+	delete(e.Extras, "msg")
+
+	out := logtree.ExternalLeveledPayload{
+		Timestamp: e.TS,
+	}
+
+	// Attempt to parse caller (eg. raft/raft.go:765) into file/line (eg.
+	// raft.go 765).
+	if len(e.Caller) > 0 {
+		parts := strings.Split(e.Caller, "/")
+		fileLine := parts[len(parts)-1]
+		parts = strings.Split(fileLine, ":")
+		if len(parts) == 2 {
+			out.File = parts[0]
+			if line, err := strconv.ParseInt(parts[1], 10, 32); err == nil {
+				out.Line = int(line)
+			}
+		}
+	}
+
+	// Convert zap level into logtree severity.
+	switch e.Level {
+	case "info":
+		out.Severity = logtree.INFO
+	case "warn":
+		out.Severity = logtree.WARNING
+	case "error":
+		out.Severity = logtree.ERROR
+	case "fatal", "panic", "dpanic":
+		out.Severity = logtree.FATAL
+	}
+
+	// Sort extra keys alphabetically.
+	extraKeys := make([]string, 0, len(e.Extras))
+	for k, _ := range e.Extras {
+		extraKeys = append(extraKeys, k)
+	}
+	sort.Strings(extraKeys)
+
+	// Convert structured extras into a human-friendly logline. We will
+	// comma-join the received message and any structured logging data after
+	// it.
+	parts := make([]string, 0, len(e.Extras)+1)
+	parts = append(parts, e.Message)
+	for _, k := range extraKeys {
+
+		// Format the value for logs. We elect to use JSON for representing
+		// each element, as:
+		// - this quotes strings
+		// - all the data we retrieved must already be representable in JSON,
+		//   as we just decoded it from an existing blob.
+		// - the extra data might be arbitrarily nested (eg. an array or
+		//   object) and we don't want to be in the business of coming up with
+		//   our own serialization format in case of such data.
+		var v string
+		vbytes, err := json.Marshal(e.Extras[k])
+		if err != nil {
+			// Fall back to +%v just in case. We don't make any API promises
+			// that the log line will be machine parseable or in any stable
+			// format.
+			v = fmt.Sprintf("%+v", v)
+		} else {
+			v = string(vbytes)
+		}
+		extra := fmt.Sprintf("%s: %s", k, v)
+
+		parts = append(parts, extra)
+	}
+
+	// If the given message was empty and there are some extra data attached,
+	// explicitly state that the message was empty (to avoid a mysterious
+	// leading comma).
+	// Otherwise, if the message was empty and there was no extra structured
+	// data, assume that the sender intended to have it represented as an empty
+	// line.
+	if len(parts) > 1 && parts[0] == "" {
+		parts[0] = "<empty>"
+	}
+
+	// Finally build the message line to emit in leveled logging and emit it.
+	out.Message = strings.Join(parts, ", ")
+
+	write(&out)
+}
diff --git a/metropolis/node/core/consensus/logparser_test.go b/metropolis/node/core/consensus/logparser_test.go
new file mode 100644
index 0000000..c53c13d
--- /dev/null
+++ b/metropolis/node/core/consensus/logparser_test.go
@@ -0,0 +1,85 @@
+package consensus
+
+import (
+	"testing"
+	"time"
+
+	"github.com/google/go-cmp/cmp"
+
+	"source.monogon.dev/metropolis/pkg/logbuffer"
+	"source.monogon.dev/metropolis/pkg/logtree"
+)
+
+// TestParsing exercises the parseEtcdLogEntry function.
+func TestParsing(t *testing.T) {
+	timeParse := func(s string) time.Time {
+		res, err := time.Parse(time.RFC3339, s)
+		if err != nil {
+			t.Fatalf("could not parse time: %v", err)
+		}
+		return res
+	}
+	for _, te := range []struct {
+		// Name of subtest.
+		name string
+		// Data to be parsed.
+		raw string
+		// The expected parsed data. The parser does not attempt to set any
+		// 'default' values in case any are missing, instead the logtree's
+		// external leveled payload functionality does that.
+		want *logtree.ExternalLeveledPayload
+	}{
+		{
+			"Parse configuring peer listeners message",
+			`{"level":"info","ts":"2021-07-06T17:18:24.368Z","caller":"embed/etcd.go:117","msg":"configuring peer listeners","listen-peer-urls":["https://[::]:7834"]}`,
+			&logtree.ExternalLeveledPayload{
+				Message:   `configuring peer listeners, listen-peer-urls: ["https://[::]:7834"]`,
+				Timestamp: timeParse("2021-07-06T17:18:24.368Z"),
+				Severity:  logtree.INFO,
+				File:      "etcd.go",
+				Line:      117,
+			},
+		},
+		{
+			"Parse added member message",
+			`{"level":"info","ts":"2021-07-06T17:21:49.462Z","caller":"membership/cluster.go:392","msg":"added member","cluster-id":"137c8e19524788c1","local-member-id":"9642132f5d0d99e2","added-peer-id":"9642132f5d0d99e2","added-peer-peer-urls":["https://metropolis-eb8d68cfb52711ad04c339abdeea74ed:7834"]}`,
+			&logtree.ExternalLeveledPayload{
+				Message:   `added member, added-peer-id: "9642132f5d0d99e2", added-peer-peer-urls: ["https://metropolis-eb8d68cfb52711ad04c339abdeea74ed:7834"], cluster-id: "137c8e19524788c1", local-member-id: "9642132f5d0d99e2"`,
+				Timestamp: timeParse("2021-07-06T17:21:49.462Z"),
+				Severity:  logtree.INFO,
+				File:      "cluster.go",
+				Line:      392,
+			},
+		},
+		{
+			"Parse empty message",
+			`{}`,
+			&logtree.ExternalLeveledPayload{},
+		},
+		{
+			"Parse invalid message",
+			`PANIC`,
+			&logtree.ExternalLeveledPayload{
+				Message: "Log line unparseable: PANIC",
+			},
+		},
+	} {
+		te := te
+		t.Run(te.name, func(t *testing.T) {
+			t.Parallel()
+
+			gotC := make(chan *logtree.ExternalLeveledPayload, 1)
+			parseEtcdLogEntry(&logbuffer.Line{
+				Data:           te.raw,
+				OriginalLength: len(te.raw),
+			}, func(d *logtree.ExternalLeveledPayload) {
+				gotC <- d
+			})
+
+			got := <-gotC
+			if diff := cmp.Diff(te.want, got); diff != "" {
+				t.Fatalf("diff: %s", diff)
+			}
+		})
+	}
+}
diff --git a/metropolis/node/core/localstorage/storage.go b/metropolis/node/core/localstorage/storage.go
index e1d1523..1e06967 100644
--- a/metropolis/node/core/localstorage/storage.go
+++ b/metropolis/node/core/localstorage/storage.go
@@ -163,7 +163,8 @@
 
 type EphemeralConsensusDirectory struct {
 	declarative.Directory
-	ClientSocket declarative.File `file:"client.sock"`
+	ClientSocket   declarative.File `file:"client.sock"`
+	ServerLogsFIFO declarative.File `file:"server-logs.fifo"`
 }
 
 type EphemeralCuratorDirectory struct {