blob: 4d4c672a055f081746243cc47766c724eb2e2253 [file] [log] [blame]
Serge Bazanski1ebd1e12020-07-13 19:17:16 +02001// Copyright 2020 The Monogon Project Authors.
2//
3// SPDX-License-Identifier: Apache-2.0
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17package main
18
19import (
Lorenz Brun09c275b2021-03-30 12:47:09 +020020 "bufio"
Serge Bazanski1ebd1e12020-07-13 19:17:16 +020021 "context"
Lorenz Brun09c275b2021-03-30 12:47:09 +020022 "fmt"
Lorenz Brun09c275b2021-03-30 12:47:09 +020023 "os"
24 "regexp"
25 "strings"
Serge Bazanski1ebd1e12020-07-13 19:17:16 +020026
Lorenz Brun9d6c4c72021-07-20 21:16:27 +020027 ctr "github.com/containerd/containerd"
28 "github.com/containerd/containerd/namespaces"
Serge Bazanski1ebd1e12020-07-13 19:17:16 +020029 "google.golang.org/grpc/codes"
30 "google.golang.org/grpc/status"
Serge Bazanskib0272182020-11-02 18:39:44 +010031
Lorenz Brun9d6c4c72021-07-20 21:16:27 +020032 "source.monogon.dev/metropolis/node/core/localstorage"
Serge Bazanski158e9a42021-08-17 17:04:54 +020033 "source.monogon.dev/metropolis/node/core/roleserve"
Serge Bazanski31370b02021-01-07 16:31:14 +010034 "source.monogon.dev/metropolis/pkg/logtree"
Serge Bazanskif9edf522021-06-17 15:57:13 +020035 apb "source.monogon.dev/metropolis/proto/api"
Serge Bazanskib0272182020-11-02 18:39:44 +010036)
37
const (
	// logFilterMax is the maximum number of filters accepted in a single
	// GetLogs request; requests with more filters are rejected with
	// InvalidArgument.
	logFilterMax = 1000
)
41
Serge Bazanski662b5b32020-12-21 13:49:00 +010042// debugService implements the Metropolis node debug API.
Serge Bazanski1ebd1e12020-07-13 19:17:16 +020043type debugService struct {
Serge Bazanski158e9a42021-08-17 17:04:54 +020044 roleserve *roleserve.Service
45 logtree *logtree.LogTree
Lorenz Brun9d6c4c72021-07-20 21:16:27 +020046 ephemeralVolume *localstorage.EphemeralContainerdDirectory
47
Serge Bazanski216fe7b2021-05-21 18:36:16 +020048 // traceLock provides exclusive access to the Linux tracing infrastructure
49 // (ftrace)
50 // This is a channel because Go's mutexes can't be cancelled or be acquired
51 // in a non-blocking way.
Lorenz Brun09c275b2021-03-30 12:47:09 +020052 traceLock chan struct{}
Serge Bazanskib0272182020-11-02 18:39:44 +010053}
54
Serge Bazanski1ebd1e12020-07-13 19:17:16 +020055func (s *debugService) GetDebugKubeconfig(ctx context.Context, req *apb.GetDebugKubeconfigRequest) (*apb.GetDebugKubeconfigResponse, error) {
Serge Bazanski6dff6d62022-01-28 18:15:14 +010056 w := s.roleserve.KubernetesStatus.Watch()
Serge Bazanskif9edf522021-06-17 15:57:13 +020057 defer w.Close()
58 for {
59 v, err := w.Get(ctx)
60 if err != nil {
Serge Bazanski6dff6d62022-01-28 18:15:14 +010061 return nil, status.Errorf(codes.Unavailable, "could not get kubernetes status: %v", err)
Serge Bazanskif9edf522021-06-17 15:57:13 +020062 }
Serge Bazanski6dff6d62022-01-28 18:15:14 +010063 if v.Svc == nil {
Serge Bazanskif9edf522021-06-17 15:57:13 +020064 continue
65 }
Serge Bazanski6dff6d62022-01-28 18:15:14 +010066 return v.Svc.GetDebugKubeconfig(ctx, req)
Serge Bazanskif9edf522021-06-17 15:57:13 +020067 }
Serge Bazanski1ebd1e12020-07-13 19:17:16 +020068}
69
Serge Bazanskib0272182020-11-02 18:39:44 +010070func (s *debugService) GetLogs(req *apb.GetLogsRequest, srv apb.NodeDebugService_GetLogsServer) error {
71 if len(req.Filters) > logFilterMax {
72 return status.Errorf(codes.InvalidArgument, "requested %d filters, maximum permitted is %d", len(req.Filters), logFilterMax)
73 }
74 dn := logtree.DN(req.Dn)
75 _, err := dn.Path()
76 switch err {
77 case nil:
78 case logtree.ErrInvalidDN:
79 return status.Errorf(codes.InvalidArgument, "invalid DN")
80 default:
81 return status.Errorf(codes.Unavailable, "could not parse DN: %v", err)
82 }
83
84 var options []logtree.LogReadOption
85
86 // Turn backlog mode into logtree option(s).
87 switch req.BacklogMode {
88 case apb.GetLogsRequest_BACKLOG_DISABLE:
89 case apb.GetLogsRequest_BACKLOG_ALL:
90 options = append(options, logtree.WithBacklog(logtree.BacklogAllAvailable))
91 case apb.GetLogsRequest_BACKLOG_COUNT:
92 count := int(req.BacklogCount)
93 if count <= 0 {
94 return status.Errorf(codes.InvalidArgument, "backlog_count must be > 0 if backlog_mode is BACKLOG_COUNT")
95 }
96 options = append(options, logtree.WithBacklog(count))
97 default:
98 return status.Errorf(codes.InvalidArgument, "unknown backlog_mode %d", req.BacklogMode)
99 }
100
101 // Turn stream mode into logtree option(s).
102 streamEnable := false
103 switch req.StreamMode {
104 case apb.GetLogsRequest_STREAM_DISABLE:
105 case apb.GetLogsRequest_STREAM_UNBUFFERED:
106 streamEnable = true
107 options = append(options, logtree.WithStream())
108 }
109
110 // Parse proto filters into logtree options.
111 for i, filter := range req.Filters {
112 switch inner := filter.Filter.(type) {
113 case *apb.LogFilter_WithChildren_:
114 options = append(options, logtree.WithChildren())
115 case *apb.LogFilter_OnlyRaw_:
116 options = append(options, logtree.OnlyRaw())
117 case *apb.LogFilter_OnlyLeveled_:
118 options = append(options, logtree.OnlyLeveled())
119 case *apb.LogFilter_LeveledWithMinimumSeverity_:
120 severity, err := logtree.SeverityFromProto(inner.LeveledWithMinimumSeverity.Minimum)
121 if err != nil {
122 return status.Errorf(codes.InvalidArgument, "filter %d has invalid severity: %v", i, err)
123 }
124 options = append(options, logtree.LeveledWithMinimumSeverity(severity))
125 }
126 }
127
128 reader, err := s.logtree.Read(logtree.DN(req.Dn), options...)
129 switch err {
130 case nil:
131 case logtree.ErrRawAndLeveled:
132 return status.Errorf(codes.InvalidArgument, "requested only raw and only leveled logs simultaneously")
133 default:
134 return status.Errorf(codes.Unavailable, "could not retrieve logs: %v", err)
135 }
136 defer reader.Close()
137
138 // Default protobuf message size limit is 64MB. We want to limit ourselves
139 // to 10MB.
140 // Currently each raw log line can be at most 1024 unicode codepoints (or
141 // 4096 bytes). To cover extra metadata and proto overhead, let's round
142 // this up to 4500 bytes. This in turn means we can store a maximum of
143 // (10e6/4500) == 2222 entries.
144 // Currently each leveled log line can also be at most 1024 unicode
145 // codepoints (or 4096 bytes). To cover extra metadata and proto overhead
146 // let's round this up to 2000 bytes. This in turn means we can store a
147 // maximum of (10e6/5000) == 2000 entries.
148 // The lowever of these numbers, ie the worst case scenario, is 2000
149 // maximum entries.
150 maxChunkSize := 2000
151
152 // Serve all backlog entries in chunks.
153 chunk := make([]*apb.LogEntry, 0, maxChunkSize)
154 for _, entry := range reader.Backlog {
155 p := entry.Proto()
156 if p == nil {
157 // TODO(q3k): log this once we have logtree/gRPC compatibility.
158 continue
159 }
160 chunk = append(chunk, p)
161
162 if len(chunk) >= maxChunkSize {
163 err := srv.Send(&apb.GetLogsResponse{
164 BacklogEntries: chunk,
165 })
166 if err != nil {
167 return err
168 }
169 chunk = make([]*apb.LogEntry, 0, maxChunkSize)
170 }
171 }
172
173 // Send last chunk of backlog, if present..
174 if len(chunk) > 0 {
175 err := srv.Send(&apb.GetLogsResponse{
176 BacklogEntries: chunk,
177 })
178 if err != nil {
179 return err
180 }
181 chunk = make([]*apb.LogEntry, 0, maxChunkSize)
182 }
183
184 // Start serving streaming data, if streaming has been requested.
185 if !streamEnable {
186 return nil
187 }
188
189 for {
190 entry, ok := <-reader.Stream
191 if !ok {
192 // Streaming has been ended by logtree - tell the client and return.
193 return status.Error(codes.Unavailable, "log streaming aborted by system")
194 }
195 p := entry.Proto()
196 if p == nil {
197 // TODO(q3k): log this once we have logtree/gRPC compatibility.
198 continue
199 }
200 err := srv.Send(&apb.GetLogsResponse{
201 StreamEntries: []*apb.LogEntry{p},
202 })
203 if err != nil {
204 return err
205 }
206 }
Serge Bazanski1ebd1e12020-07-13 19:17:16 +0200207}
Lorenz Brun09c275b2021-03-30 12:47:09 +0200208
Serge Bazanski216fe7b2021-05-21 18:36:16 +0200209// Validate property names as they are used in path construction and we really
210// don't want a path traversal vulnerability
Lorenz Brun09c275b2021-03-30 12:47:09 +0200211var safeTracingPropertyNamesRe = regexp.MustCompile("^[a-z0-9_]+$")
212
213func writeTracingProperty(name string, value string) error {
214 if !safeTracingPropertyNamesRe.MatchString(name) {
215 return fmt.Errorf("disallowed tracing property name received: \"%v\"", name)
216 }
Lorenz Brun764a2de2021-11-22 16:26:36 +0100217 return os.WriteFile("/sys/kernel/tracing/"+name, []byte(value+"\n"), 0)
Lorenz Brun09c275b2021-03-30 12:47:09 +0200218}
219
220func (s *debugService) Trace(req *apb.TraceRequest, srv apb.NodeDebugService_TraceServer) error {
221 // Don't allow more than one trace as the kernel doesn't support this.
222 select {
223 case s.traceLock <- struct{}{}:
224 defer func() {
225 <-s.traceLock
226 }()
227 default:
228 return status.Error(codes.FailedPrecondition, "a trace is already in progress")
229 }
230
231 if len(req.FunctionFilter) == 0 {
232 req.FunctionFilter = []string{"*"} // For reset purposes
233 }
234 if len(req.GraphFunctionFilter) == 0 {
235 req.GraphFunctionFilter = []string{"*"} // For reset purposes
236 }
237
238 defer writeTracingProperty("current_tracer", "nop")
239 if err := writeTracingProperty("current_tracer", req.Tracer); err != nil {
240 return status.Errorf(codes.InvalidArgument, "requested tracer not available: %v", err)
241 }
242
243 if err := writeTracingProperty("set_ftrace_filter", strings.Join(req.FunctionFilter, " ")); err != nil {
244 return status.Errorf(codes.InvalidArgument, "setting ftrace filter failed: %v", err)
245 }
246 if err := writeTracingProperty("set_graph_function", strings.Join(req.GraphFunctionFilter, " ")); err != nil {
247 return status.Errorf(codes.InvalidArgument, "setting graph filter failed: %v", err)
248 }
249 tracePipe, err := os.Open("/sys/kernel/tracing/trace_pipe")
250 if err != nil {
251 return status.Errorf(codes.Unavailable, "cannot open trace output pipe: %v", err)
252 }
253 defer tracePipe.Close()
254
255 defer writeTracingProperty("tracing_on", "0")
256 if err := writeTracingProperty("tracing_on", "1"); err != nil {
257 return status.Errorf(codes.InvalidArgument, "requested tracer not available: %v", err)
258 }
259
260 go func() {
261 <-srv.Context().Done()
262 tracePipe.Close()
263 }()
264
265 eventScanner := bufio.NewScanner(tracePipe)
266 for eventScanner.Scan() {
267 if err := eventScanner.Err(); err != nil {
268 return status.Errorf(codes.Unavailable, "event pipe read error: %v", err)
269 }
270 err := srv.Send(&apb.TraceEvent{
271 RawLine: eventScanner.Text(),
272 })
273 if err != nil {
274 return err
275 }
276 }
277 return nil
278}
Lorenz Brun9d6c4c72021-07-20 21:16:27 +0200279
// imageReader is an adapter converting a gRPC stream into an io.Reader
type imageReader struct {
	// srv is the client stream from which image data parts are received.
	srv apb.NodeDebugService_LoadImageServer
	// restOfPart buffers the remainder of the last received data part that
	// did not fit into the caller's buffer on the previous Read.
	restOfPart []byte
}
285
286func (i *imageReader) Read(p []byte) (n int, err error) {
287 n1 := copy(p, i.restOfPart)
288 if len(p) > len(i.restOfPart) {
289 part, err := i.srv.Recv()
290 if err != nil {
291 return n1, err
292 }
293 n2 := copy(p[n1:], part.DataPart)
294 i.restOfPart = part.DataPart[n2:]
295 return n1 + n2, nil
296 } else {
297 i.restOfPart = i.restOfPart[n1:]
298 return n1, nil
299 }
300}
301
302// LoadImage loads an OCI image into the image cache of this node
303func (s *debugService) LoadImage(srv apb.NodeDebugService_LoadImageServer) error {
304 client, err := ctr.New(s.ephemeralVolume.ClientSocket.FullPath())
305 if err != nil {
306 return status.Errorf(codes.Unavailable, "failed to connect to containerd: %v", err)
307 }
308 ctxWithNS := namespaces.WithNamespace(srv.Context(), "k8s.io")
309 reader := &imageReader{srv: srv}
310 _, err = client.Import(ctxWithNS, reader)
311 if err != nil {
312 return status.Errorf(codes.Unknown, "failed to import image: %v", err)
313 }
314 return srv.SendAndClose(&apb.LoadImageResponse{})
315}