| // Copyright 2020 The Monogon Project Authors. |
| // |
| // SPDX-License-Identifier: Apache-2.0 |
| // |
| // Licensed under the Apache License, Version 2.0 (the "License"); |
| // you may not use this file except in compliance with the License. |
| // You may obtain a copy of the License at |
| // |
| // http://www.apache.org/licenses/LICENSE-2.0 |
| // |
| // Unless required by applicable law or agreed to in writing, software |
| // distributed under the License is distributed on an "AS IS" BASIS, |
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| // See the License for the specific language governing permissions and |
| // limitations under the License. |
| |
| package main |
| |
| import ( |
| "context" |
| |
| "google.golang.org/grpc/codes" |
| "google.golang.org/grpc/status" |
| |
| "source.monogon.dev/metropolis/node/core/cluster" |
| "source.monogon.dev/metropolis/node/kubernetes" |
| "source.monogon.dev/metropolis/pkg/logtree" |
| apb "source.monogon.dev/metropolis/proto/api" |
| ) |
| |
| const ( |
| logFilterMax = 1000 |
| ) |
| |
| // debugService implements the Metropolis node debug API. |
| type debugService struct { |
| cluster *cluster.Manager |
| kubernetes *kubernetes.Service |
| logtree *logtree.LogTree |
| } |
| |
| func (s *debugService) GetDebugKubeconfig(ctx context.Context, req *apb.GetDebugKubeconfigRequest) (*apb.GetDebugKubeconfigResponse, error) { |
| return s.kubernetes.GetDebugKubeconfig(ctx, req) |
| } |
| |
| func (s *debugService) GetLogs(req *apb.GetLogsRequest, srv apb.NodeDebugService_GetLogsServer) error { |
| if len(req.Filters) > logFilterMax { |
| return status.Errorf(codes.InvalidArgument, "requested %d filters, maximum permitted is %d", len(req.Filters), logFilterMax) |
| } |
| dn := logtree.DN(req.Dn) |
| _, err := dn.Path() |
| switch err { |
| case nil: |
| case logtree.ErrInvalidDN: |
| return status.Errorf(codes.InvalidArgument, "invalid DN") |
| default: |
| return status.Errorf(codes.Unavailable, "could not parse DN: %v", err) |
| } |
| |
| var options []logtree.LogReadOption |
| |
| // Turn backlog mode into logtree option(s). |
| switch req.BacklogMode { |
| case apb.GetLogsRequest_BACKLOG_DISABLE: |
| case apb.GetLogsRequest_BACKLOG_ALL: |
| options = append(options, logtree.WithBacklog(logtree.BacklogAllAvailable)) |
| case apb.GetLogsRequest_BACKLOG_COUNT: |
| count := int(req.BacklogCount) |
| if count <= 0 { |
| return status.Errorf(codes.InvalidArgument, "backlog_count must be > 0 if backlog_mode is BACKLOG_COUNT") |
| } |
| options = append(options, logtree.WithBacklog(count)) |
| default: |
| return status.Errorf(codes.InvalidArgument, "unknown backlog_mode %d", req.BacklogMode) |
| } |
| |
| // Turn stream mode into logtree option(s). |
| streamEnable := false |
| switch req.StreamMode { |
| case apb.GetLogsRequest_STREAM_DISABLE: |
| case apb.GetLogsRequest_STREAM_UNBUFFERED: |
| streamEnable = true |
| options = append(options, logtree.WithStream()) |
| } |
| |
| // Parse proto filters into logtree options. |
| for i, filter := range req.Filters { |
| switch inner := filter.Filter.(type) { |
| case *apb.LogFilter_WithChildren_: |
| options = append(options, logtree.WithChildren()) |
| case *apb.LogFilter_OnlyRaw_: |
| options = append(options, logtree.OnlyRaw()) |
| case *apb.LogFilter_OnlyLeveled_: |
| options = append(options, logtree.OnlyLeveled()) |
| case *apb.LogFilter_LeveledWithMinimumSeverity_: |
| severity, err := logtree.SeverityFromProto(inner.LeveledWithMinimumSeverity.Minimum) |
| if err != nil { |
| return status.Errorf(codes.InvalidArgument, "filter %d has invalid severity: %v", i, err) |
| } |
| options = append(options, logtree.LeveledWithMinimumSeverity(severity)) |
| } |
| } |
| |
| reader, err := s.logtree.Read(logtree.DN(req.Dn), options...) |
| switch err { |
| case nil: |
| case logtree.ErrRawAndLeveled: |
| return status.Errorf(codes.InvalidArgument, "requested only raw and only leveled logs simultaneously") |
| default: |
| return status.Errorf(codes.Unavailable, "could not retrieve logs: %v", err) |
| } |
| defer reader.Close() |
| |
| // Default protobuf message size limit is 64MB. We want to limit ourselves |
| // to 10MB. |
| // Currently each raw log line can be at most 1024 unicode codepoints (or |
| // 4096 bytes). To cover extra metadata and proto overhead, let's round |
| // this up to 4500 bytes. This in turn means we can store a maximum of |
| // (10e6/4500) == 2222 entries. |
| // Currently each leveled log line can also be at most 1024 unicode |
| // codepoints (or 4096 bytes). To cover extra metadata and proto overhead |
| // let's round this up to 2000 bytes. This in turn means we can store a |
| // maximum of (10e6/5000) == 2000 entries. |
| // The lowever of these numbers, ie the worst case scenario, is 2000 |
| // maximum entries. |
| maxChunkSize := 2000 |
| |
| // Serve all backlog entries in chunks. |
| chunk := make([]*apb.LogEntry, 0, maxChunkSize) |
| for _, entry := range reader.Backlog { |
| p := entry.Proto() |
| if p == nil { |
| // TODO(q3k): log this once we have logtree/gRPC compatibility. |
| continue |
| } |
| chunk = append(chunk, p) |
| |
| if len(chunk) >= maxChunkSize { |
| err := srv.Send(&apb.GetLogsResponse{ |
| BacklogEntries: chunk, |
| }) |
| if err != nil { |
| return err |
| } |
| chunk = make([]*apb.LogEntry, 0, maxChunkSize) |
| } |
| } |
| |
| // Send last chunk of backlog, if present.. |
| if len(chunk) > 0 { |
| err := srv.Send(&apb.GetLogsResponse{ |
| BacklogEntries: chunk, |
| }) |
| if err != nil { |
| return err |
| } |
| chunk = make([]*apb.LogEntry, 0, maxChunkSize) |
| } |
| |
| // Start serving streaming data, if streaming has been requested. |
| if !streamEnable { |
| return nil |
| } |
| |
| for { |
| entry, ok := <-reader.Stream |
| if !ok { |
| // Streaming has been ended by logtree - tell the client and return. |
| return status.Error(codes.Unavailable, "log streaming aborted by system") |
| } |
| p := entry.Proto() |
| if p == nil { |
| // TODO(q3k): log this once we have logtree/gRPC compatibility. |
| continue |
| } |
| err := srv.Send(&apb.GetLogsResponse{ |
| StreamEntries: []*apb.LogEntry{p}, |
| }) |
| if err != nil { |
| return err |
| } |
| } |
| } |