metropolis: implement NodeManagement.Logs
This takes the implementation from the debug service, dusts it off a
bit, adds tests and moves everything to the new node mgmt service.
Change-Id: Id3b70126a2551775d8328c0c4e424ec0e675f40f
Reviewed-on: https://review.monogon.dev/c/monogon/+/1439
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
Tested-by: Jenkins CI
diff --git a/metropolis/cli/metroctl/cmd_node.go b/metropolis/cli/metroctl/cmd_node.go
index 0f841ef..97198d8 100644
--- a/metropolis/cli/metroctl/cmd_node.go
+++ b/metropolis/cli/metroctl/cmd_node.go
@@ -60,7 +60,6 @@
func init() {
nodeCmd.AddCommand(nodeDescribeCmd)
nodeCmd.AddCommand(nodeListCmd)
- nodeCmd.AddCommand(nodeLogsCmd)
rootCmd.AddCommand(nodeCmd)
}
diff --git a/metropolis/cli/metroctl/cmd_node_logs.go b/metropolis/cli/metroctl/cmd_node_logs.go
index beae6ab..a04558c 100644
--- a/metropolis/cli/metroctl/cmd_node_logs.go
+++ b/metropolis/cli/metroctl/cmd_node_logs.go
@@ -11,13 +11,51 @@
"source.monogon.dev/metropolis/cli/metroctl/core"
"source.monogon.dev/metropolis/pkg/logtree"
"source.monogon.dev/metropolis/proto/api"
+
cpb "source.monogon.dev/metropolis/proto/common"
)
+type metroctlLogFlags struct {
+	// follow (i.e. stream) logs live.
+ follow bool
+ // dn to query.
+ dn string
+ // exact dn query, i.e. without children/recursion.
+ exact bool
+ // concise logging output format.
+ concise bool
+ // backlog: >0 for a concrete limit, -1 for all, 0 for none
+ backlog int
+}
+
+var logFlags metroctlLogFlags
+
var nodeLogsCmd = &cobra.Command{
Short: "Get/stream logs from node",
- Use: "logs [node-id]",
- Args: cobra.MinimumNArgs(1),
+ Long: `Get or stream logs from node.
+
+Node logs are structured in a 'log tree' structure, in which different subsystems
+log to DNs (distinguished names). For example, service 'foo' might log to
+root.role.foo, while service 'bar' might log to root.role.bar.
+
+To set the DN you want to request logs from, use --dn. The default is to return
+all logs. The default output is also a good starting point to figure out
+what DNs are active in the system.
+
+When requesting logs for a DN by default all sub-DNs will also be returned (ie.
+with the above example, when requesting DN 'root.role' logs at root.role.foo and
+root.role.bar would also be returned). This behaviour can be disabled by setting
+--exact.
+
+To stream logs, use --follow.
+
+By default, all available logs are returned. To limit the number of historical
+log lines (a.k.a. 'backlog') to return, set --backlog. This is similar to requesting
+all lines and then piping the result through 'tail' - but more efficient, as no
+unnecessary lines are fetched.
+`,
+ Use: "logs [node-id]",
+ Args: cobra.MinimumNArgs(1),
RunE: func(cmd *cobra.Command, args []string) error {
ctx := cmd.Context()
@@ -51,22 +89,39 @@
return fmt.Errorf("remote CA certificate invalid: %w", err)
}
- fmt.Printf("Getting logs from %s (%s)...\n", n.Id, n.Status.ExternalAddress)
+ fmt.Printf("=== Logs from %s (%s):\n", n.Id, n.Status.ExternalAddress)
// Dial the actual node at its management port.
cl := dialAuthenticatedNode(ctx, n.Id, n.Status.ExternalAddress, cacert)
nmgmt := api.NewNodeManagementClient(cl)
- srv, err := nmgmt.Logs(ctx, &api.GetLogsRequest{
- Dn: "",
- BacklogMode: api.GetLogsRequest_BACKLOG_ALL,
- StreamMode: api.GetLogsRequest_STREAM_DISABLE,
- Filters: []*cpb.LogFilter{
- {
- Filter: &cpb.LogFilter_WithChildren_{
- WithChildren: &cpb.LogFilter_WithChildren{},
- },
+ streamMode := api.GetLogsRequest_STREAM_DISABLE
+ if logFlags.follow {
+ streamMode = api.GetLogsRequest_STREAM_UNBUFFERED
+ }
+ var filters []*cpb.LogFilter
+ if !logFlags.exact {
+ filters = append(filters, &cpb.LogFilter{
+ Filter: &cpb.LogFilter_WithChildren_{
+ WithChildren: &cpb.LogFilter_WithChildren{},
},
- },
+ })
+ }
+ backlogMode := api.GetLogsRequest_BACKLOG_ALL
+ var backlogCount int64
+ switch {
+ case logFlags.backlog > 0:
+ backlogMode = api.GetLogsRequest_BACKLOG_COUNT
+ backlogCount = int64(logFlags.backlog)
+ case logFlags.backlog == 0:
+ backlogMode = api.GetLogsRequest_BACKLOG_DISABLE
+ }
+
+ srv, err := nmgmt.Logs(ctx, &api.GetLogsRequest{
+ Dn: logFlags.dn,
+ BacklogMode: backlogMode,
+ BacklogCount: backlogCount,
+ StreamMode: streamMode,
+ Filters: filters,
})
if err != nil {
return fmt.Errorf("failed to get logs: %w", err)
@@ -74,22 +129,42 @@
for {
res, err := srv.Recv()
if errors.Is(err, io.EOF) {
- fmt.Println("Done.")
+ fmt.Println("=== Done.")
break
}
if err != nil {
return fmt.Errorf("log stream failed: %w", err)
}
for _, entry := range res.BacklogEntries {
- entry, err := logtree.LogEntryFromProto(entry)
- if err != nil {
- fmt.Printf("invalid entry: %v\n", err)
- continue
- }
- fmt.Println(entry.String())
+ printEntry(entry)
+ }
+ for _, entry := range res.StreamEntries {
+ printEntry(entry)
}
}
return nil
},
}
+
+func printEntry(e *cpb.LogEntry) {
+ entry, err := logtree.LogEntryFromProto(e)
+ if err != nil {
+ fmt.Printf("invalid stream entry: %v\n", err)
+ return
+ }
+ if logFlags.concise {
+ fmt.Println(entry.ConciseString(logtree.MetropolisShortenDict, 0))
+ } else {
+ fmt.Println(entry.String())
+ }
+}
+
+func init() {
+ nodeLogsCmd.Flags().BoolVarP(&logFlags.follow, "follow", "f", false, "Continue streaming logs after fetching backlog.")
+ nodeLogsCmd.Flags().StringVar(&logFlags.dn, "dn", "", "Distinguished Name to get logs from (and children, if --exact is not set). If not set, defaults to '', which is the top-level DN.")
+ nodeLogsCmd.Flags().BoolVarP(&logFlags.exact, "exact", "e", false, "Only show logs for exactly the DN, do not recurse down the tree.")
+ nodeLogsCmd.Flags().BoolVarP(&logFlags.concise, "concise", "c", false, "Output concise logs.")
+ nodeLogsCmd.Flags().IntVar(&logFlags.backlog, "backlog", -1, "How many lines of historical log data to return. The default (-1) returns all available lines. Zero value means no backlog is returned (useful when using --follow).")
+ nodeCmd.AddCommand(nodeLogsCmd)
+}
diff --git a/metropolis/cli/metroctl/main.go b/metropolis/cli/metroctl/main.go
index 19b378d..a4ea438 100644
--- a/metropolis/cli/metroctl/main.go
+++ b/metropolis/cli/metroctl/main.go
@@ -44,7 +44,7 @@
rootCmd.PersistentFlags().StringVar(&flags.proxyAddr, "proxy", "", "SOCKS5 proxy address")
rootCmd.PersistentFlags().StringVar(&flags.configPath, "config", filepath.Join(xdg.ConfigHome, "metroctl"), "An alternative cluster config path")
rootCmd.PersistentFlags().BoolVar(&flags.verbose, "verbose", false, "Log additional runtime information")
- rootCmd.PersistentFlags().StringVarP(&flags.format, "format", "f", "plaintext", "Data output format")
+ rootCmd.PersistentFlags().StringVar(&flags.format, "format", "plaintext", "Data output format")
rootCmd.PersistentFlags().StringVar(&flags.filter, "filter", "", "The object filter applied to the output data")
rootCmd.PersistentFlags().StringVarP(&flags.output, "output", "o", "", "Redirects output to the specified file")
}
diff --git a/metropolis/cli/metroctl/test/test.go b/metropolis/cli/metroctl/test/test.go
index 5468834..872f7b3 100644
--- a/metropolis/cli/metroctl/test/test.go
+++ b/metropolis/cli/metroctl/test/test.go
@@ -265,6 +265,20 @@
return nil
})
})
+ t.Run("logs [nodeID]", func(t *testing.T) {
+ util.TestEventual(t, "metroctl logs [nodeID]", ctx, 10*time.Second, func(ctx context.Context) error {
+ var args []string
+ args = append(args, commonOpts...)
+ args = append(args, endpointOpts...)
+ args = append(args, "node", "logs", cl.NodeIDs[1])
+
+ if err := mctlFailIfMissing(t, ctx, args, "Cluster enrolment done."); err != nil {
+ return err
+ }
+
+ return nil
+ })
+ })
t.Run("set/unset role", func(t *testing.T) {
util.TestEventual(t, "metroctl set/unset role KubernetesController", ctx, 10*time.Second, func(ctx context.Context) error {
nid := cl.NodeIDs[1]
diff --git a/metropolis/node/core/BUILD.bazel b/metropolis/node/core/BUILD.bazel
index 281266b..07f4f6e 100644
--- a/metropolis/node/core/BUILD.bazel
+++ b/metropolis/node/core/BUILD.bazel
@@ -25,6 +25,7 @@
"//metropolis/node/core/cluster",
"//metropolis/node/core/localstorage",
"//metropolis/node/core/localstorage/declarative",
+ "//metropolis/node/core/mgmt",
"//metropolis/node/core/network",
"//metropolis/node/core/network/hostsfile",
"//metropolis/node/core/roleserve",
@@ -35,7 +36,6 @@
"//metropolis/pkg/supervisor",
"//metropolis/pkg/tpm",
"//metropolis/proto/api",
- "//metropolis/proto/common",
"@com_github_containerd_containerd//:containerd",
"@com_github_containerd_containerd//namespaces",
"@org_golang_google_grpc//:go_default_library",
diff --git a/metropolis/node/core/debug_service_enabled.go b/metropolis/node/core/debug_service_enabled.go
index 3494270..75e92bc 100644
--- a/metropolis/node/core/debug_service_enabled.go
+++ b/metropolis/node/core/debug_service_enabled.go
@@ -31,13 +31,14 @@
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
- common "source.monogon.dev/metropolis/node"
"source.monogon.dev/metropolis/node/core/localstorage"
+ "source.monogon.dev/metropolis/node/core/mgmt"
"source.monogon.dev/metropolis/node/core/roleserve"
"source.monogon.dev/metropolis/pkg/logtree"
"source.monogon.dev/metropolis/pkg/supervisor"
+
+ common "source.monogon.dev/metropolis/node"
apb "source.monogon.dev/metropolis/proto/api"
- cpb "source.monogon.dev/metropolis/proto/common"
)
const (
@@ -98,142 +99,10 @@
}
func (s *debugService) GetLogs(req *apb.GetLogsRequest, srv apb.NodeDebugService_GetLogsServer) error {
- if len(req.Filters) > logFilterMax {
- return status.Errorf(codes.InvalidArgument, "requested %d filters, maximum permitted is %d", len(req.Filters), logFilterMax)
+ svc := mgmt.LogService{
+ LogTree: s.logtree,
}
- dn := logtree.DN(req.Dn)
- _, err := dn.Path()
- switch err {
- case nil:
- case logtree.ErrInvalidDN:
- return status.Errorf(codes.InvalidArgument, "invalid DN")
- default:
- return status.Errorf(codes.Unavailable, "could not parse DN: %v", err)
- }
-
- var options []logtree.LogReadOption
-
- // Turn backlog mode into logtree option(s).
- switch req.BacklogMode {
- case apb.GetLogsRequest_BACKLOG_DISABLE:
- case apb.GetLogsRequest_BACKLOG_ALL:
- options = append(options, logtree.WithBacklog(logtree.BacklogAllAvailable))
- case apb.GetLogsRequest_BACKLOG_COUNT:
- count := int(req.BacklogCount)
- if count <= 0 {
- return status.Errorf(codes.InvalidArgument, "backlog_count must be > 0 if backlog_mode is BACKLOG_COUNT")
- }
- options = append(options, logtree.WithBacklog(count))
- default:
- return status.Errorf(codes.InvalidArgument, "unknown backlog_mode %d", req.BacklogMode)
- }
-
- // Turn stream mode into logtree option(s).
- streamEnable := false
- switch req.StreamMode {
- case apb.GetLogsRequest_STREAM_DISABLE:
- case apb.GetLogsRequest_STREAM_UNBUFFERED:
- streamEnable = true
- options = append(options, logtree.WithStream())
- }
-
- // Parse proto filters into logtree options.
- for i, filter := range req.Filters {
- switch inner := filter.Filter.(type) {
- case *cpb.LogFilter_WithChildren_:
- options = append(options, logtree.WithChildren())
- case *cpb.LogFilter_OnlyRaw_:
- options = append(options, logtree.OnlyRaw())
- case *cpb.LogFilter_OnlyLeveled_:
- options = append(options, logtree.OnlyLeveled())
- case *cpb.LogFilter_LeveledWithMinimumSeverity_:
- severity, err := logtree.SeverityFromProto(inner.LeveledWithMinimumSeverity.Minimum)
- if err != nil {
- return status.Errorf(codes.InvalidArgument, "filter %d has invalid severity: %v", i, err)
- }
- options = append(options, logtree.LeveledWithMinimumSeverity(severity))
- }
- }
-
- reader, err := s.logtree.Read(logtree.DN(req.Dn), options...)
- switch err {
- case nil:
- case logtree.ErrRawAndLeveled:
- return status.Errorf(codes.InvalidArgument, "requested only raw and only leveled logs simultaneously")
- default:
- return status.Errorf(codes.Unavailable, "could not retrieve logs: %v", err)
- }
- defer reader.Close()
-
- // Default protobuf message size limit is 64MB. We want to limit ourselves
- // to 10MB.
- // Currently each raw log line can be at most 1024 unicode codepoints (or
- // 4096 bytes). To cover extra metadata and proto overhead, let's round
- // this up to 4500 bytes. This in turn means we can store a maximum of
- // (10e6/4500) == 2222 entries.
- // Currently each leveled log line can also be at most 1024 unicode
- // codepoints (or 4096 bytes). To cover extra metadata and proto overhead
- // let's round this up to 2000 bytes. This in turn means we can store a
- // maximum of (10e6/5000) == 2000 entries.
- // The lowever of these numbers, ie the worst case scenario, is 2000
- // maximum entries.
- maxChunkSize := 2000
-
- // Serve all backlog entries in chunks.
- chunk := make([]*cpb.LogEntry, 0, maxChunkSize)
- for _, entry := range reader.Backlog {
- p := entry.Proto()
- if p == nil {
- // TODO(q3k): log this once we have logtree/gRPC compatibility.
- continue
- }
- chunk = append(chunk, p)
-
- if len(chunk) >= maxChunkSize {
- err := srv.Send(&apb.GetLogsResponse{
- BacklogEntries: chunk,
- })
- if err != nil {
- return err
- }
- chunk = make([]*cpb.LogEntry, 0, maxChunkSize)
- }
- }
-
- // Send last chunk of backlog, if present..
- if len(chunk) > 0 {
- err := srv.Send(&apb.GetLogsResponse{
- BacklogEntries: chunk,
- })
- if err != nil {
- return err
- }
- chunk = make([]*cpb.LogEntry, 0, maxChunkSize)
- }
-
- // Start serving streaming data, if streaming has been requested.
- if !streamEnable {
- return nil
- }
-
- for {
- entry, ok := <-reader.Stream
- if !ok {
- // Streaming has been ended by logtree - tell the client and return.
- return status.Error(codes.Unavailable, "log streaming aborted by system")
- }
- p := entry.Proto()
- if p == nil {
- // TODO(q3k): log this once we have logtree/gRPC compatibility.
- continue
- }
- err := srv.Send(&apb.GetLogsResponse{
- StreamEntries: []*cpb.LogEntry{p},
- })
- if err != nil {
- return err
- }
- }
+ return svc.Logs(req, srv)
}
// Validate property names as they are used in path construction and we really
diff --git a/metropolis/node/core/main.go b/metropolis/node/core/main.go
index 8496e47..0538478 100644
--- a/metropolis/node/core/main.go
+++ b/metropolis/node/core/main.go
@@ -154,6 +154,7 @@
StorageRoot: root,
Network: networkSvc,
Resolver: res,
+ LogTree: lt,
})
if err := supervisor.Run(ctx, "role", rs.Run); err != nil {
return fmt.Errorf("failed to start role service: %w", err)
diff --git a/metropolis/node/core/mgmt/BUILD.bazel b/metropolis/node/core/mgmt/BUILD.bazel
index 349f13e..dff5bac 100644
--- a/metropolis/node/core/mgmt/BUILD.bazel
+++ b/metropolis/node/core/mgmt/BUILD.bazel
@@ -1,4 +1,4 @@
-load("@io_bazel_rules_go//go:def.bzl", "go_library")
+load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
go_library(
name = "mgmt",
@@ -12,10 +12,28 @@
"//metropolis/node",
"//metropolis/node/core/identity",
"//metropolis/node/core/rpc",
+ "//metropolis/pkg/logtree",
"//metropolis/pkg/supervisor",
"//metropolis/proto/api",
+ "//metropolis/proto/common",
"@org_golang_google_grpc//:go_default_library",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//status",
],
)
+
+go_test(
+ name = "mgmt_test",
+ srcs = ["svc_logs_test.go"],
+ embed = [":mgmt"],
+ deps = [
+ "//metropolis/pkg/logtree",
+ "//metropolis/proto/api",
+ "//metropolis/proto/common",
+ "@com_github_google_go_cmp//cmp",
+ "@org_golang_google_grpc//:go_default_library",
+ "@org_golang_google_grpc//credentials/insecure",
+ "@org_golang_google_grpc//test/bufconn",
+ "@org_golang_google_protobuf//testing/protocmp",
+ ],
+)
diff --git a/metropolis/node/core/mgmt/mgmt.go b/metropolis/node/core/mgmt/mgmt.go
index 5fa12a0..2572764 100644
--- a/metropolis/node/core/mgmt/mgmt.go
+++ b/metropolis/node/core/mgmt/mgmt.go
@@ -12,16 +12,34 @@
"source.monogon.dev/metropolis/node"
"source.monogon.dev/metropolis/node/core/identity"
"source.monogon.dev/metropolis/node/core/rpc"
+ "source.monogon.dev/metropolis/pkg/logtree"
"source.monogon.dev/metropolis/pkg/supervisor"
apb "source.monogon.dev/metropolis/proto/api"
)
+// Service implements metropolis.proto.api.NodeManagement.
type Service struct {
+ // NodeCredentials used to set up gRPC server.
NodeCredentials *identity.NodeCredentials
+ // LogTree from which NodeManagement.Logs will be served.
+ LogTree *logtree.LogTree
+
+ // Automatically populated on Run.
+ LogService
}
+// Run the Service as a supervisor runnable.
func (s *Service) Run(ctx context.Context) error {
+ if s.NodeCredentials == nil {
+ return fmt.Errorf("NodeCredentials missing")
+ }
+ if s.LogTree == nil {
+ return fmt.Errorf("LogTree missing")
+ }
+
+ s.LogService.LogTree = s.LogTree
+
sec := rpc.ServerSecurity{
NodeCredentials: s.NodeCredentials,
}
diff --git a/metropolis/node/core/mgmt/svc_logs.go b/metropolis/node/core/mgmt/svc_logs.go
index 6e05ce9..baf9d9b 100644
--- a/metropolis/node/core/mgmt/svc_logs.go
+++ b/metropolis/node/core/mgmt/svc_logs.go
@@ -4,9 +4,156 @@
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
+ "source.monogon.dev/metropolis/pkg/logtree"
"source.monogon.dev/metropolis/proto/api"
+ cpb "source.monogon.dev/metropolis/proto/common"
)
-func (s *Service) Logs(_ *api.GetLogsRequest, _ api.NodeManagement_LogsServer) error {
- return status.Error(codes.Unimplemented, "unimplemented")
+const (
+ logFilterMax = 10
+)
+
+// LogService implements NodeManagement.Logs. This is split away from the rest of
+// the Service to allow the debug service to reuse this implementation.
+type LogService struct {
+ LogTree *logtree.LogTree
+}
+
+func (s *LogService) Logs(req *api.GetLogsRequest, srv api.NodeManagement_LogsServer) error {
+ if len(req.Filters) > logFilterMax {
+ return status.Errorf(codes.InvalidArgument, "requested %d filters, maximum permitted is %d", len(req.Filters), logFilterMax)
+ }
+ dn := logtree.DN(req.Dn)
+ _, err := dn.Path()
+ switch err {
+ case nil:
+ case logtree.ErrInvalidDN:
+ return status.Errorf(codes.InvalidArgument, "invalid DN")
+ default:
+ return status.Errorf(codes.Unavailable, "could not parse DN: %v", err)
+ }
+
+ var options []logtree.LogReadOption
+
+ // Turn backlog mode into logtree option(s).
+ switch req.BacklogMode {
+ case api.GetLogsRequest_BACKLOG_DISABLE:
+ case api.GetLogsRequest_BACKLOG_ALL:
+ options = append(options, logtree.WithBacklog(logtree.BacklogAllAvailable))
+ case api.GetLogsRequest_BACKLOG_COUNT:
+ count := int(req.BacklogCount)
+ if count <= 0 {
+ return status.Errorf(codes.InvalidArgument, "backlog_count must be > 0 if backlog_mode is BACKLOG_COUNT")
+ }
+ options = append(options, logtree.WithBacklog(count))
+ default:
+ return status.Errorf(codes.InvalidArgument, "unknown backlog_mode %d", req.BacklogMode)
+ }
+
+ // Turn stream mode into logtree option(s).
+ streamEnable := false
+ switch req.StreamMode {
+ case api.GetLogsRequest_STREAM_DISABLE:
+ case api.GetLogsRequest_STREAM_UNBUFFERED:
+ streamEnable = true
+ options = append(options, logtree.WithStream())
+ }
+
+ // Parse proto filters into logtree options.
+ for i, filter := range req.Filters {
+ switch inner := filter.Filter.(type) {
+ case *cpb.LogFilter_WithChildren_:
+ options = append(options, logtree.WithChildren())
+ case *cpb.LogFilter_OnlyRaw_:
+ options = append(options, logtree.OnlyRaw())
+ case *cpb.LogFilter_OnlyLeveled_:
+ options = append(options, logtree.OnlyLeveled())
+ case *cpb.LogFilter_LeveledWithMinimumSeverity_:
+ severity, err := logtree.SeverityFromProto(inner.LeveledWithMinimumSeverity.Minimum)
+ if err != nil {
+ return status.Errorf(codes.InvalidArgument, "filter %d has invalid severity: %v", i, err)
+ }
+ options = append(options, logtree.LeveledWithMinimumSeverity(severity))
+ }
+ }
+
+ reader, err := s.LogTree.Read(logtree.DN(req.Dn), options...)
+ switch err {
+ case nil:
+ case logtree.ErrRawAndLeveled:
+ return status.Errorf(codes.InvalidArgument, "requested only raw and only leveled logs simultaneously")
+ default:
+ return status.Errorf(codes.Unavailable, "could not retrieve logs: %v", err)
+ }
+ defer reader.Close()
+
+ // Default protobuf message size limit is 64MB. We want to limit ourselves
+ // to 10MB.
+ // Currently each raw log line can be at most 1024 unicode codepoints (or
+ // 4096 bytes). To cover extra metadata and proto overhead, let's round
+ // this up to 4500 bytes. This in turn means we can store a maximum of
+ // (10e6/4500) == 2222 entries.
+ // Currently each leveled log line can also be at most 1024 unicode
+ // codepoints (or 4096 bytes). To cover extra metadata and proto overhead
+	// let's round this up to 5000 bytes. This in turn means we can store a
+	// maximum of (10e6/5000) == 2000 entries.
+	// The lower of these numbers, i.e. the worst case scenario, is 2000
+ // maximum entries.
+ maxChunkSize := 2000
+
+ // Serve all backlog entries in chunks.
+ chunk := make([]*cpb.LogEntry, 0, maxChunkSize)
+ for _, entry := range reader.Backlog {
+ p := entry.Proto()
+ if p == nil {
+ // TODO(q3k): log this once we have logtree/gRPC compatibility.
+ continue
+ }
+ chunk = append(chunk, p)
+
+ if len(chunk) >= maxChunkSize {
+ err := srv.Send(&api.GetLogsResponse{
+ BacklogEntries: chunk,
+ })
+ if err != nil {
+ return err
+ }
+ chunk = make([]*cpb.LogEntry, 0, maxChunkSize)
+ }
+ }
+
+	// Send last chunk of backlog, if present.
+ if len(chunk) > 0 {
+ err := srv.Send(&api.GetLogsResponse{
+ BacklogEntries: chunk,
+ })
+ if err != nil {
+ return err
+ }
+ chunk = make([]*cpb.LogEntry, 0, maxChunkSize)
+ }
+
+ // Start serving streaming data, if streaming has been requested.
+ if !streamEnable {
+ return nil
+ }
+
+ for {
+ entry, ok := <-reader.Stream
+ if !ok {
+ // Streaming has been ended by logtree - tell the client and return.
+ return status.Error(codes.Unavailable, "log streaming aborted by system")
+ }
+ p := entry.Proto()
+ if p == nil {
+ // TODO(q3k): log this once we have logtree/gRPC compatibility.
+ continue
+ }
+ err := srv.Send(&api.GetLogsResponse{
+ StreamEntries: []*cpb.LogEntry{p},
+ })
+ if err != nil {
+ return err
+ }
+ }
}
diff --git a/metropolis/node/core/mgmt/svc_logs_test.go b/metropolis/node/core/mgmt/svc_logs_test.go
new file mode 100644
index 0000000..d5825ad
--- /dev/null
+++ b/metropolis/node/core/mgmt/svc_logs_test.go
@@ -0,0 +1,367 @@
+package mgmt
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "io"
+ "net"
+ "strings"
+ "testing"
+ "time"
+
+ "github.com/google/go-cmp/cmp"
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/credentials/insecure"
+ "google.golang.org/grpc/test/bufconn"
+ "google.golang.org/protobuf/testing/protocmp"
+
+ "source.monogon.dev/metropolis/pkg/logtree"
+ "source.monogon.dev/metropolis/proto/api"
+ cpb "source.monogon.dev/metropolis/proto/common"
+)
+
+func dut(t *testing.T) (*Service, *grpc.ClientConn) {
+ lt := logtree.New()
+ s := &Service{
+ LogTree: lt,
+ LogService: LogService{
+ LogTree: lt,
+ },
+ }
+
+ srv := grpc.NewServer()
+ api.RegisterNodeManagementServer(srv, s)
+ externalLis := bufconn.Listen(1024 * 1024)
+ go func() {
+ if err := srv.Serve(externalLis); err != nil {
+ t.Fatalf("GRPC serve failed: %v", err)
+ }
+ }()
+ withLocalDialer := grpc.WithContextDialer(func(_ context.Context, _ string) (net.Conn, error) {
+ return externalLis.Dial()
+ })
+ cl, err := grpc.Dial("local", withLocalDialer, grpc.WithTransportCredentials(insecure.NewCredentials()))
+ if err != nil {
+ t.Fatalf("Dialing GRPC failed: %v", err)
+ }
+
+ return s, cl
+}
+
+func cleanLogEntry(e *cpb.LogEntry) {
+ // Filter out bits that change too much to test them.
+ switch k := e.Kind.(type) {
+ case *cpb.LogEntry_Leveled_:
+ k.Leveled.Location = ""
+ k.Leveled.Timestamp = nil
+ }
+}
+
+func mkRawEntry(dn string, line string) *cpb.LogEntry {
+ return &cpb.LogEntry{
+ Dn: dn, Kind: &cpb.LogEntry_Raw_{
+ Raw: &cpb.LogEntry_Raw{
+ Data: line,
+ OriginalLength: int64(len(line)),
+ },
+ },
+ }
+}
+
+func mkLeveledEntry(dn string, severity string, lines string) *cpb.LogEntry {
+ var sev cpb.LeveledLogSeverity
+ switch severity {
+ case "i":
+ sev = cpb.LeveledLogSeverity_INFO
+ case "w":
+ sev = cpb.LeveledLogSeverity_WARNING
+ case "e":
+ sev = cpb.LeveledLogSeverity_ERROR
+ }
+ return &cpb.LogEntry{
+ Dn: dn, Kind: &cpb.LogEntry_Leveled_{
+ Leveled: &cpb.LogEntry_Leveled{
+ Lines: strings.Split(lines, "\n"),
+ Severity: sev,
+ },
+ },
+ }
+}
+
+func drainLogs(t *testing.T, srv api.NodeManagement_LogsClient) (res []*cpb.LogEntry) {
+ t.Helper()
+ for {
+ ev, err := srv.Recv()
+ if errors.Is(err, io.EOF) {
+ return
+ }
+ if err != nil {
+ t.Errorf("Recv: %v", err)
+ return
+ }
+ for _, e := range ev.BacklogEntries {
+ res = append(res, e)
+ }
+ }
+}
+
+// TestLogService_Logs_Backlog exercises the basic log API by requesting
+// backlogged leveled log entries.
+func TestLogService_Logs_Backlog(t *testing.T) {
+ ctx, ctxC := context.WithCancel(context.Background())
+ defer ctxC()
+
+ s, cl := dut(t)
+
+ mgmt := api.NewNodeManagementClient(cl)
+
+ s.LogTree.MustLeveledFor("init").Infof("Hello")
+ s.LogTree.MustLeveledFor("main").Infof("Starting roleserver...")
+ s.LogTree.MustLeveledFor("main.roleserver").Infof("Waiting for node roles...")
+ s.LogTree.MustLeveledFor("main.roleserver.kubernetes").Infof("Starting kubernetes...")
+ s.LogTree.MustLeveledFor("main.roleserver.controlplane").Infof("Starting control plane...")
+ s.LogTree.MustLeveledFor("main.roleserver.kubernetes").Infof("Kubernetes version: 1.21.37")
+ s.LogTree.MustLeveledFor("main.roleserver.controlplane").Infof("Starting etcd...")
+
+ mkReq := func(dn string, backlog int64) *api.GetLogsRequest {
+ var backlogMode api.GetLogsRequest_BacklogMode
+ var backlogCount int64
+ switch {
+ case backlog < 0:
+ backlogMode = api.GetLogsRequest_BACKLOG_ALL
+ case backlog > 0:
+ backlogMode = api.GetLogsRequest_BACKLOG_COUNT
+ backlogCount = backlog
+ case backlog == 0:
+ backlogMode = api.GetLogsRequest_BACKLOG_DISABLE
+ }
+ return &api.GetLogsRequest{
+ Dn: dn,
+ BacklogMode: backlogMode,
+ BacklogCount: backlogCount,
+ StreamMode: api.GetLogsRequest_STREAM_DISABLE,
+ }
+ }
+ mkRecursive := func(in *api.GetLogsRequest) *api.GetLogsRequest {
+ in.Filters = append(in.Filters, &cpb.LogFilter{
+ Filter: &cpb.LogFilter_WithChildren_{
+ WithChildren: &cpb.LogFilter_WithChildren{},
+ },
+ })
+ return in
+ }
+ for i, te := range []struct {
+ req *api.GetLogsRequest
+ want []*cpb.LogEntry
+ }{
+ {
+ // Test all backlog.
+ req: mkReq("main.roleserver.kubernetes", -1),
+ want: []*cpb.LogEntry{
+ mkLeveledEntry("main.roleserver.kubernetes", "i", "Starting kubernetes..."),
+ mkLeveledEntry("main.roleserver.kubernetes", "i", "Kubernetes version: 1.21.37"),
+ },
+ },
+ {
+ // Test exact backlog.
+ req: mkReq("main.roleserver.kubernetes", 1),
+ want: []*cpb.LogEntry{
+ mkLeveledEntry("main.roleserver.kubernetes", "i", "Kubernetes version: 1.21.37"),
+ },
+ },
+ {
+ // Test no backlog.
+ req: mkReq("main.roleserver.kubernetes", 0),
+ want: nil,
+ },
+
+ {
+ // Test recursion with backlog.
+ req: mkRecursive(mkReq("main.roleserver", 2)),
+ want: []*cpb.LogEntry{
+ mkLeveledEntry("main.roleserver.kubernetes", "i", "Kubernetes version: 1.21.37"),
+ mkLeveledEntry("main.roleserver.controlplane", "i", "Starting etcd..."),
+ },
+ },
+ } {
+ srv, err := mgmt.Logs(ctx, te.req)
+ if err != nil {
+ t.Errorf("Case %d: Logs failed: %v", i, err)
+ continue
+ }
+ logs := drainLogs(t, srv)
+ for _, e := range logs {
+ cleanLogEntry(e)
+ }
+ diff := cmp.Diff(te.want, logs, protocmp.Transform())
+ if diff != "" {
+ t.Errorf("Case %d: diff: \n%s", i, diff)
+ }
+ }
+}
+
+// TestLogService_Logs_Stream exercises the basic log API by requesting
+// streaming leveled log entries.
+func TestLogService_Logs_Stream(t *testing.T) {
+ ctx, ctxC := context.WithCancel(context.Background())
+ defer ctxC()
+
+ s, cl := dut(t)
+
+ // Start streaming all logs.
+ mgmt := api.NewNodeManagementClient(cl)
+ srv, err := mgmt.Logs(ctx, &api.GetLogsRequest{
+ Dn: "",
+ BacklogMode: api.GetLogsRequest_BACKLOG_ALL,
+ StreamMode: api.GetLogsRequest_STREAM_UNBUFFERED,
+ Filters: []*cpb.LogFilter{
+ {
+ Filter: &cpb.LogFilter_WithChildren_{
+ WithChildren: &cpb.LogFilter_WithChildren{},
+ },
+ },
+ },
+ })
+ if err != nil {
+ t.Fatalf("Logs failed: %v", err)
+ }
+
+ // Pipe returned logs into a channel for analysis.
+ logC := make(chan *cpb.LogEntry)
+ go func() {
+ for {
+ ev, err := srv.Recv()
+ if err != nil {
+ return
+ }
+ for _, e := range ev.BacklogEntries {
+ logC <- e
+ }
+ for _, e := range ev.StreamEntries {
+ logC <- e
+ }
+ }
+ }()
+
+ // Submit log entry, expect it on the channel.
+ s.LogTree.MustLeveledFor("test").Infof("Hello, world")
+ select {
+ case e := <-logC:
+ cleanLogEntry(e)
+ if diff := cmp.Diff(mkLeveledEntry("test", "i", "Hello, world"), e, protocmp.Transform()); diff != "" {
+ t.Errorf("Diff:\n%s", diff)
+ }
+ case <-time.After(time.Second * 2):
+ t.Errorf("Timeout")
+ }
+
+ // That could've made it through the backlog. Do it again to make sure it came
+ // through streaming.
+ s.LogTree.MustLeveledFor("test").Infof("Hello again, world")
+ select {
+ case e := <-logC:
+ cleanLogEntry(e)
+ if diff := cmp.Diff(mkLeveledEntry("test", "i", "Hello again, world"), e, protocmp.Transform()); diff != "" {
+ t.Errorf("Diff:\n%s", diff)
+ }
+ case <-time.After(time.Second * 2):
+ t.Errorf("Timeout")
+ }
+}
+
+// TestLogService_Logs_Filters exercises the rest of the filter functionality.
+func TestLogService_Logs_Filters(t *testing.T) {
+ ctx, ctxC := context.WithCancel(context.Background())
+ defer ctxC()
+
+ s, cl := dut(t)
+
+ mgmt := api.NewNodeManagementClient(cl)
+ s.LogTree.MustLeveledFor("main").Infof("Hello")
+ s.LogTree.MustLeveledFor("main").Infof("Starting...")
+ s.LogTree.MustLeveledFor("main").Warningf("Something failed!")
+ fmt.Fprintln(s.LogTree.MustRawFor("main"), "medium rare")
+ s.LogTree.MustLeveledFor("main").Errorf("Something failed very hard!")
+
+ for i, te := range []struct {
+ req *api.GetLogsRequest
+ want []*cpb.LogEntry
+ }{
+ // Case 0: request given level
+ {
+ req: &api.GetLogsRequest{
+ Dn: "main",
+ BacklogMode: api.GetLogsRequest_BACKLOG_ALL,
+ StreamMode: api.GetLogsRequest_STREAM_DISABLE,
+ Filters: []*cpb.LogFilter{
+ {
+ Filter: &cpb.LogFilter_LeveledWithMinimumSeverity_{
+ LeveledWithMinimumSeverity: &cpb.LogFilter_LeveledWithMinimumSeverity{
+ Minimum: cpb.LeveledLogSeverity_WARNING,
+ },
+ },
+ },
+ },
+ },
+ want: []*cpb.LogEntry{
+ mkLeveledEntry("main", "w", "Something failed!"),
+ mkLeveledEntry("main", "e", "Something failed very hard!"),
+ },
+ },
+ // Case 1: request raw only
+ {
+ req: &api.GetLogsRequest{
+ Dn: "main",
+ BacklogMode: api.GetLogsRequest_BACKLOG_ALL,
+ StreamMode: api.GetLogsRequest_STREAM_DISABLE,
+ Filters: []*cpb.LogFilter{
+ {
+ Filter: &cpb.LogFilter_OnlyRaw_{
+ OnlyRaw: &cpb.LogFilter_OnlyRaw{},
+ },
+ },
+ },
+ },
+ want: []*cpb.LogEntry{
+ mkRawEntry("main", "medium rare"),
+ },
+ },
+ // Case 2: request leveled only
+ {
+ req: &api.GetLogsRequest{
+ Dn: "main",
+ BacklogMode: api.GetLogsRequest_BACKLOG_ALL,
+ StreamMode: api.GetLogsRequest_STREAM_DISABLE,
+ Filters: []*cpb.LogFilter{
+ {
+ Filter: &cpb.LogFilter_OnlyLeveled_{
+ OnlyLeveled: &cpb.LogFilter_OnlyLeveled{},
+ },
+ },
+ },
+ },
+ want: []*cpb.LogEntry{
+ mkLeveledEntry("main", "i", "Hello"),
+ mkLeveledEntry("main", "i", "Starting..."),
+ mkLeveledEntry("main", "w", "Something failed!"),
+ mkLeveledEntry("main", "e", "Something failed very hard!"),
+ },
+ },
+ } {
+ srv, err := mgmt.Logs(ctx, te.req)
+ if err != nil {
+ t.Errorf("Case %d: Logs failed: %v", i, err)
+ continue
+ }
+ logs := drainLogs(t, srv)
+ for _, e := range logs {
+ cleanLogEntry(e)
+ }
+ diff := cmp.Diff(te.want, logs, protocmp.Transform())
+ if diff != "" {
+ t.Errorf("Case %d: diff: \n%s", i, diff)
+ }
+ }
+
+}
diff --git a/metropolis/node/core/roleserve/BUILD.bazel b/metropolis/node/core/roleserve/BUILD.bazel
index c2dada4..8aa4fc0 100644
--- a/metropolis/node/core/roleserve/BUILD.bazel
+++ b/metropolis/node/core/roleserve/BUILD.bazel
@@ -33,6 +33,7 @@
"//metropolis/node/kubernetes/pki",
"//metropolis/pkg/event",
"//metropolis/pkg/event/memory",
+ "//metropolis/pkg/logtree",
"//metropolis/pkg/pki",
"//metropolis/pkg/supervisor",
"//metropolis/proto/common",
diff --git a/metropolis/node/core/roleserve/roleserve.go b/metropolis/node/core/roleserve/roleserve.go
index 8f9bb47..a1969bf 100644
--- a/metropolis/node/core/roleserve/roleserve.go
+++ b/metropolis/node/core/roleserve/roleserve.go
@@ -49,7 +49,9 @@
"source.monogon.dev/metropolis/node/core/network"
"source.monogon.dev/metropolis/node/core/rpc/resolver"
"source.monogon.dev/metropolis/pkg/event/memory"
+ "source.monogon.dev/metropolis/pkg/logtree"
"source.monogon.dev/metropolis/pkg/supervisor"
+
cpb "source.monogon.dev/metropolis/proto/common"
)
@@ -68,6 +70,8 @@
// created early in the roleserver lifecycle, and is seeded with node
// information as the first subordinate runs DialCurator().
Resolver *resolver.Resolver
+
+ LogTree *logtree.LogTree
}
// Service is the roleserver/“Role Server” service. See the package-level
@@ -132,6 +136,7 @@
s.nodeMgmt = &workerNodeMgmt{
clusterMembership: &s.ClusterMembership,
+ logTree: s.LogTree,
}
return s
diff --git a/metropolis/node/core/roleserve/worker_nodemgmt.go b/metropolis/node/core/roleserve/worker_nodemgmt.go
index 889f0eb..9ffb112 100644
--- a/metropolis/node/core/roleserve/worker_nodemgmt.go
+++ b/metropolis/node/core/roleserve/worker_nodemgmt.go
@@ -5,11 +5,13 @@
"source.monogon.dev/metropolis/node/core/mgmt"
"source.monogon.dev/metropolis/pkg/event/memory"
+ "source.monogon.dev/metropolis/pkg/logtree"
"source.monogon.dev/metropolis/pkg/supervisor"
)
type workerNodeMgmt struct {
clusterMembership *memory.Value[*ClusterMembership]
+ logTree *logtree.LogTree
}
func (s *workerNodeMgmt) run(ctx context.Context) error {
@@ -24,6 +26,7 @@
supervisor.Logger(ctx).Infof("Got cluster membership, starting...")
srv := mgmt.Service{
NodeCredentials: cm.credentials,
+ LogTree: s.logTree,
}
return srv.Run(ctx)
}
diff --git a/metropolis/pkg/cmd/run.go b/metropolis/pkg/cmd/run.go
index 9f554b5..7834c99 100644
--- a/metropolis/pkg/cmd/run.go
+++ b/metropolis/pkg/cmd/run.go
@@ -33,13 +33,18 @@
// after the reader loop is broken, avoid deadlocks by making lineC a
// buffered channel.
lineC := make(chan string, 2)
- outBuffer := logbuffer.NewLineBuffer(1024, func(l *logbuffer.Line) {
- lineC <- l.Data
- })
+ lineCB := func(l *logbuffer.Line) {
+ // If the context is canceled, no-one is listening on lineC anymore, so we would
+ // block.
+ select {
+ case <-ctx.Done():
+ return
+ case lineC <- l.Data:
+ }
+ }
+ outBuffer := logbuffer.NewLineBuffer(1024, lineCB)
defer outBuffer.Close()
- errBuffer := logbuffer.NewLineBuffer(1024, func(l *logbuffer.Line) {
- lineC <- l.Data
- })
+ errBuffer := logbuffer.NewLineBuffer(1024, lineCB)
defer errBuffer.Close()
// Prepare the command context, and start the process.