blob: 6dee3d991835ae467338cfa9da4ce4148ca4ed4f [file] [log] [blame]
Serge Bazanski1ebd1e12020-07-13 19:17:16 +02001// Copyright 2020 The Monogon Project Authors.
2//
3// SPDX-License-Identifier: Apache-2.0
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17package main
18
19import (
20 "context"
Serge Bazanski1ebd1e12020-07-13 19:17:16 +020021
22 "google.golang.org/grpc/codes"
23 "google.golang.org/grpc/status"
Serge Bazanskib0272182020-11-02 18:39:44 +010024
Serge Bazanski31370b02021-01-07 16:31:14 +010025 "source.monogon.dev/metropolis/node/core/cluster"
Serge Bazanski31370b02021-01-07 16:31:14 +010026 "source.monogon.dev/metropolis/node/kubernetes"
27 "source.monogon.dev/metropolis/pkg/logtree"
28 apb "source.monogon.dev/metropolis/proto/api"
Serge Bazanskib0272182020-11-02 18:39:44 +010029)
30
const (
	// logFilterMax is the maximum number of filters accepted in a single
	// GetLogs request. Requests exceeding this are rejected with
	// codes.InvalidArgument.
	logFilterMax = 1000
)
34
// debugService implements the Metropolis node debug API.
type debugService struct {
	// cluster is the node's cluster manager. NOTE(review): not referenced by
	// the methods visible in this file — presumably used by other handlers;
	// confirm before removing.
	cluster *cluster.Manager
	// kubernetes backs GetDebugKubeconfig.
	kubernetes *kubernetes.Service
	// logtree is the log store that GetLogs reads backlog and stream
	// entries from.
	logtree *logtree.LogTree
}
41
// GetDebugKubeconfig returns a kubeconfig for debugging purposes by
// delegating directly to the node's Kubernetes service.
func (s *debugService) GetDebugKubeconfig(ctx context.Context, req *apb.GetDebugKubeconfigRequest) (*apb.GetDebugKubeconfigResponse, error) {
	return s.kubernetes.GetDebugKubeconfig(ctx, req)
}
45
Serge Bazanskib0272182020-11-02 18:39:44 +010046func (s *debugService) GetLogs(req *apb.GetLogsRequest, srv apb.NodeDebugService_GetLogsServer) error {
47 if len(req.Filters) > logFilterMax {
48 return status.Errorf(codes.InvalidArgument, "requested %d filters, maximum permitted is %d", len(req.Filters), logFilterMax)
49 }
50 dn := logtree.DN(req.Dn)
51 _, err := dn.Path()
52 switch err {
53 case nil:
54 case logtree.ErrInvalidDN:
55 return status.Errorf(codes.InvalidArgument, "invalid DN")
56 default:
57 return status.Errorf(codes.Unavailable, "could not parse DN: %v", err)
58 }
59
60 var options []logtree.LogReadOption
61
62 // Turn backlog mode into logtree option(s).
63 switch req.BacklogMode {
64 case apb.GetLogsRequest_BACKLOG_DISABLE:
65 case apb.GetLogsRequest_BACKLOG_ALL:
66 options = append(options, logtree.WithBacklog(logtree.BacklogAllAvailable))
67 case apb.GetLogsRequest_BACKLOG_COUNT:
68 count := int(req.BacklogCount)
69 if count <= 0 {
70 return status.Errorf(codes.InvalidArgument, "backlog_count must be > 0 if backlog_mode is BACKLOG_COUNT")
71 }
72 options = append(options, logtree.WithBacklog(count))
73 default:
74 return status.Errorf(codes.InvalidArgument, "unknown backlog_mode %d", req.BacklogMode)
75 }
76
77 // Turn stream mode into logtree option(s).
78 streamEnable := false
79 switch req.StreamMode {
80 case apb.GetLogsRequest_STREAM_DISABLE:
81 case apb.GetLogsRequest_STREAM_UNBUFFERED:
82 streamEnable = true
83 options = append(options, logtree.WithStream())
84 }
85
86 // Parse proto filters into logtree options.
87 for i, filter := range req.Filters {
88 switch inner := filter.Filter.(type) {
89 case *apb.LogFilter_WithChildren_:
90 options = append(options, logtree.WithChildren())
91 case *apb.LogFilter_OnlyRaw_:
92 options = append(options, logtree.OnlyRaw())
93 case *apb.LogFilter_OnlyLeveled_:
94 options = append(options, logtree.OnlyLeveled())
95 case *apb.LogFilter_LeveledWithMinimumSeverity_:
96 severity, err := logtree.SeverityFromProto(inner.LeveledWithMinimumSeverity.Minimum)
97 if err != nil {
98 return status.Errorf(codes.InvalidArgument, "filter %d has invalid severity: %v", i, err)
99 }
100 options = append(options, logtree.LeveledWithMinimumSeverity(severity))
101 }
102 }
103
104 reader, err := s.logtree.Read(logtree.DN(req.Dn), options...)
105 switch err {
106 case nil:
107 case logtree.ErrRawAndLeveled:
108 return status.Errorf(codes.InvalidArgument, "requested only raw and only leveled logs simultaneously")
109 default:
110 return status.Errorf(codes.Unavailable, "could not retrieve logs: %v", err)
111 }
112 defer reader.Close()
113
114 // Default protobuf message size limit is 64MB. We want to limit ourselves
115 // to 10MB.
116 // Currently each raw log line can be at most 1024 unicode codepoints (or
117 // 4096 bytes). To cover extra metadata and proto overhead, let's round
118 // this up to 4500 bytes. This in turn means we can store a maximum of
119 // (10e6/4500) == 2222 entries.
120 // Currently each leveled log line can also be at most 1024 unicode
121 // codepoints (or 4096 bytes). To cover extra metadata and proto overhead
122 // let's round this up to 2000 bytes. This in turn means we can store a
123 // maximum of (10e6/5000) == 2000 entries.
124 // The lowever of these numbers, ie the worst case scenario, is 2000
125 // maximum entries.
126 maxChunkSize := 2000
127
128 // Serve all backlog entries in chunks.
129 chunk := make([]*apb.LogEntry, 0, maxChunkSize)
130 for _, entry := range reader.Backlog {
131 p := entry.Proto()
132 if p == nil {
133 // TODO(q3k): log this once we have logtree/gRPC compatibility.
134 continue
135 }
136 chunk = append(chunk, p)
137
138 if len(chunk) >= maxChunkSize {
139 err := srv.Send(&apb.GetLogsResponse{
140 BacklogEntries: chunk,
141 })
142 if err != nil {
143 return err
144 }
145 chunk = make([]*apb.LogEntry, 0, maxChunkSize)
146 }
147 }
148
149 // Send last chunk of backlog, if present..
150 if len(chunk) > 0 {
151 err := srv.Send(&apb.GetLogsResponse{
152 BacklogEntries: chunk,
153 })
154 if err != nil {
155 return err
156 }
157 chunk = make([]*apb.LogEntry, 0, maxChunkSize)
158 }
159
160 // Start serving streaming data, if streaming has been requested.
161 if !streamEnable {
162 return nil
163 }
164
165 for {
166 entry, ok := <-reader.Stream
167 if !ok {
168 // Streaming has been ended by logtree - tell the client and return.
169 return status.Error(codes.Unavailable, "log streaming aborted by system")
170 }
171 p := entry.Proto()
172 if p == nil {
173 // TODO(q3k): log this once we have logtree/gRPC compatibility.
174 continue
175 }
176 err := srv.Send(&apb.GetLogsResponse{
177 StreamEntries: []*apb.LogEntry{p},
178 })
179 if err != nil {
180 return err
181 }
182 }
Serge Bazanski1ebd1e12020-07-13 19:17:16 +0200183}