blob: f253d0e8dcf204f673c93cf22226df9558678624 [file] [log] [blame]
// Copyright 2020 The Monogon Project Authors.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
16
17package main
18
19import (
Lorenz Brun09c275b2021-03-30 12:47:09 +020020 "bufio"
Serge Bazanski1ebd1e12020-07-13 19:17:16 +020021 "context"
Lorenz Brun09c275b2021-03-30 12:47:09 +020022 "fmt"
Lorenz Brun09c275b2021-03-30 12:47:09 +020023 "os"
24 "regexp"
25 "strings"
Serge Bazanski1ebd1e12020-07-13 19:17:16 +020026
Lorenz Brun9d6c4c72021-07-20 21:16:27 +020027 ctr "github.com/containerd/containerd"
28 "github.com/containerd/containerd/namespaces"
Serge Bazanski1ebd1e12020-07-13 19:17:16 +020029 "google.golang.org/grpc/codes"
30 "google.golang.org/grpc/status"
Serge Bazanskib0272182020-11-02 18:39:44 +010031
Lorenz Brun9d6c4c72021-07-20 21:16:27 +020032 "source.monogon.dev/metropolis/node/core/localstorage"
Serge Bazanski158e9a42021-08-17 17:04:54 +020033 "source.monogon.dev/metropolis/node/core/roleserve"
Serge Bazanski31370b02021-01-07 16:31:14 +010034 "source.monogon.dev/metropolis/pkg/logtree"
Serge Bazanskif9edf522021-06-17 15:57:13 +020035 apb "source.monogon.dev/metropolis/proto/api"
Serge Bazanskib0272182020-11-02 18:39:44 +010036)
37
// logFilterMax is the largest number of filters a single GetLogs request may
// carry. Requests with more filters are rejected as InvalidArgument.
const logFilterMax = 1000
41
// debugService implements the Metropolis node debug API.
type debugService struct {
	// roleserve is the node's role server, consulted by GetDebugKubeconfig.
	// It may be nil, in which case GetDebugKubeconfig reports Unavailable.
	roleserve *roleserve.Service
	// logtree is the log tree that GetLogs reads entries from.
	logtree *logtree.LogTree
	// ephemeralVolume provides the path of the containerd client socket that
	// LoadImage connects to.
	ephemeralVolume *localstorage.EphemeralContainerdDirectory

	// traceLock provides exclusive access to the Linux tracing infrastructure
	// (ftrace).
	// This is a channel because Go's mutexes can't be cancelled or be acquired
	// in a non-blocking way.
	// NOTE(review): Trace takes the lock with a non-blocking send, so this
	// presumably has to be created with capacity 1 - confirm at the
	// construction site.
	traceLock chan struct{}
}
54
Serge Bazanski1ebd1e12020-07-13 19:17:16 +020055func (s *debugService) GetDebugKubeconfig(ctx context.Context, req *apb.GetDebugKubeconfigRequest) (*apb.GetDebugKubeconfigResponse, error) {
Serge Bazanskie78a0892021-10-07 17:03:49 +020056 if s.roleserve == nil {
57 return nil, status.Errorf(codes.Unavailable, "node does not run roleserver/kubernetes")
58 }
Serge Bazanskif9edf522021-06-17 15:57:13 +020059 w := s.roleserve.Watch()
60 defer w.Close()
61 for {
62 v, err := w.Get(ctx)
63 if err != nil {
64 return nil, status.Errorf(codes.Unavailable, "could not get roleserve status: %v", err)
65 }
66 if v.Kubernetes == nil {
67 continue
68 }
69 return v.Kubernetes.GetDebugKubeconfig(ctx, req)
70 }
Serge Bazanski1ebd1e12020-07-13 19:17:16 +020071}
72
Serge Bazanskib0272182020-11-02 18:39:44 +010073func (s *debugService) GetLogs(req *apb.GetLogsRequest, srv apb.NodeDebugService_GetLogsServer) error {
74 if len(req.Filters) > logFilterMax {
75 return status.Errorf(codes.InvalidArgument, "requested %d filters, maximum permitted is %d", len(req.Filters), logFilterMax)
76 }
77 dn := logtree.DN(req.Dn)
78 _, err := dn.Path()
79 switch err {
80 case nil:
81 case logtree.ErrInvalidDN:
82 return status.Errorf(codes.InvalidArgument, "invalid DN")
83 default:
84 return status.Errorf(codes.Unavailable, "could not parse DN: %v", err)
85 }
86
87 var options []logtree.LogReadOption
88
89 // Turn backlog mode into logtree option(s).
90 switch req.BacklogMode {
91 case apb.GetLogsRequest_BACKLOG_DISABLE:
92 case apb.GetLogsRequest_BACKLOG_ALL:
93 options = append(options, logtree.WithBacklog(logtree.BacklogAllAvailable))
94 case apb.GetLogsRequest_BACKLOG_COUNT:
95 count := int(req.BacklogCount)
96 if count <= 0 {
97 return status.Errorf(codes.InvalidArgument, "backlog_count must be > 0 if backlog_mode is BACKLOG_COUNT")
98 }
99 options = append(options, logtree.WithBacklog(count))
100 default:
101 return status.Errorf(codes.InvalidArgument, "unknown backlog_mode %d", req.BacklogMode)
102 }
103
104 // Turn stream mode into logtree option(s).
105 streamEnable := false
106 switch req.StreamMode {
107 case apb.GetLogsRequest_STREAM_DISABLE:
108 case apb.GetLogsRequest_STREAM_UNBUFFERED:
109 streamEnable = true
110 options = append(options, logtree.WithStream())
111 }
112
113 // Parse proto filters into logtree options.
114 for i, filter := range req.Filters {
115 switch inner := filter.Filter.(type) {
116 case *apb.LogFilter_WithChildren_:
117 options = append(options, logtree.WithChildren())
118 case *apb.LogFilter_OnlyRaw_:
119 options = append(options, logtree.OnlyRaw())
120 case *apb.LogFilter_OnlyLeveled_:
121 options = append(options, logtree.OnlyLeveled())
122 case *apb.LogFilter_LeveledWithMinimumSeverity_:
123 severity, err := logtree.SeverityFromProto(inner.LeveledWithMinimumSeverity.Minimum)
124 if err != nil {
125 return status.Errorf(codes.InvalidArgument, "filter %d has invalid severity: %v", i, err)
126 }
127 options = append(options, logtree.LeveledWithMinimumSeverity(severity))
128 }
129 }
130
131 reader, err := s.logtree.Read(logtree.DN(req.Dn), options...)
132 switch err {
133 case nil:
134 case logtree.ErrRawAndLeveled:
135 return status.Errorf(codes.InvalidArgument, "requested only raw and only leveled logs simultaneously")
136 default:
137 return status.Errorf(codes.Unavailable, "could not retrieve logs: %v", err)
138 }
139 defer reader.Close()
140
141 // Default protobuf message size limit is 64MB. We want to limit ourselves
142 // to 10MB.
143 // Currently each raw log line can be at most 1024 unicode codepoints (or
144 // 4096 bytes). To cover extra metadata and proto overhead, let's round
145 // this up to 4500 bytes. This in turn means we can store a maximum of
146 // (10e6/4500) == 2222 entries.
147 // Currently each leveled log line can also be at most 1024 unicode
148 // codepoints (or 4096 bytes). To cover extra metadata and proto overhead
149 // let's round this up to 2000 bytes. This in turn means we can store a
150 // maximum of (10e6/5000) == 2000 entries.
151 // The lowever of these numbers, ie the worst case scenario, is 2000
152 // maximum entries.
153 maxChunkSize := 2000
154
155 // Serve all backlog entries in chunks.
156 chunk := make([]*apb.LogEntry, 0, maxChunkSize)
157 for _, entry := range reader.Backlog {
158 p := entry.Proto()
159 if p == nil {
160 // TODO(q3k): log this once we have logtree/gRPC compatibility.
161 continue
162 }
163 chunk = append(chunk, p)
164
165 if len(chunk) >= maxChunkSize {
166 err := srv.Send(&apb.GetLogsResponse{
167 BacklogEntries: chunk,
168 })
169 if err != nil {
170 return err
171 }
172 chunk = make([]*apb.LogEntry, 0, maxChunkSize)
173 }
174 }
175
176 // Send last chunk of backlog, if present..
177 if len(chunk) > 0 {
178 err := srv.Send(&apb.GetLogsResponse{
179 BacklogEntries: chunk,
180 })
181 if err != nil {
182 return err
183 }
184 chunk = make([]*apb.LogEntry, 0, maxChunkSize)
185 }
186
187 // Start serving streaming data, if streaming has been requested.
188 if !streamEnable {
189 return nil
190 }
191
192 for {
193 entry, ok := <-reader.Stream
194 if !ok {
195 // Streaming has been ended by logtree - tell the client and return.
196 return status.Error(codes.Unavailable, "log streaming aborted by system")
197 }
198 p := entry.Proto()
199 if p == nil {
200 // TODO(q3k): log this once we have logtree/gRPC compatibility.
201 continue
202 }
203 err := srv.Send(&apb.GetLogsResponse{
204 StreamEntries: []*apb.LogEntry{p},
205 })
206 if err != nil {
207 return err
208 }
209 }
Serge Bazanski1ebd1e12020-07-13 19:17:16 +0200210}
Lorenz Brun09c275b2021-03-30 12:47:09 +0200211
// safeTracingPropertyNamesRe matches the only tracing property names we are
// willing to write to: non-empty runs of lowercase letters, digits and
// underscores. Names end up in a filesystem path, so anything that could lead
// to path traversal must be rejected.
var safeTracingPropertyNamesRe = regexp.MustCompile(`^[a-z0-9_]+$`)

// writeTracingProperty writes value, followed by a newline, to the file called
// name under /sys/kernel/tracing/.
//
// It returns an error without touching the filesystem if name does not match
// safeTracingPropertyNamesRe; otherwise it returns whatever os.WriteFile
// returns.
func writeTracingProperty(name string, value string) error {
	if safeTracingPropertyNamesRe.MatchString(name) {
		target := "/sys/kernel/tracing/" + name
		payload := []byte(value + "\n")
		return os.WriteFile(target, payload, 0)
	}
	return fmt.Errorf("disallowed tracing property name received: \"%v\"", name)
}
222
223func (s *debugService) Trace(req *apb.TraceRequest, srv apb.NodeDebugService_TraceServer) error {
224 // Don't allow more than one trace as the kernel doesn't support this.
225 select {
226 case s.traceLock <- struct{}{}:
227 defer func() {
228 <-s.traceLock
229 }()
230 default:
231 return status.Error(codes.FailedPrecondition, "a trace is already in progress")
232 }
233
234 if len(req.FunctionFilter) == 0 {
235 req.FunctionFilter = []string{"*"} // For reset purposes
236 }
237 if len(req.GraphFunctionFilter) == 0 {
238 req.GraphFunctionFilter = []string{"*"} // For reset purposes
239 }
240
241 defer writeTracingProperty("current_tracer", "nop")
242 if err := writeTracingProperty("current_tracer", req.Tracer); err != nil {
243 return status.Errorf(codes.InvalidArgument, "requested tracer not available: %v", err)
244 }
245
246 if err := writeTracingProperty("set_ftrace_filter", strings.Join(req.FunctionFilter, " ")); err != nil {
247 return status.Errorf(codes.InvalidArgument, "setting ftrace filter failed: %v", err)
248 }
249 if err := writeTracingProperty("set_graph_function", strings.Join(req.GraphFunctionFilter, " ")); err != nil {
250 return status.Errorf(codes.InvalidArgument, "setting graph filter failed: %v", err)
251 }
252 tracePipe, err := os.Open("/sys/kernel/tracing/trace_pipe")
253 if err != nil {
254 return status.Errorf(codes.Unavailable, "cannot open trace output pipe: %v", err)
255 }
256 defer tracePipe.Close()
257
258 defer writeTracingProperty("tracing_on", "0")
259 if err := writeTracingProperty("tracing_on", "1"); err != nil {
260 return status.Errorf(codes.InvalidArgument, "requested tracer not available: %v", err)
261 }
262
263 go func() {
264 <-srv.Context().Done()
265 tracePipe.Close()
266 }()
267
268 eventScanner := bufio.NewScanner(tracePipe)
269 for eventScanner.Scan() {
270 if err := eventScanner.Err(); err != nil {
271 return status.Errorf(codes.Unavailable, "event pipe read error: %v", err)
272 }
273 err := srv.Send(&apb.TraceEvent{
274 RawLine: eventScanner.Text(),
275 })
276 if err != nil {
277 return err
278 }
279 }
280 return nil
281}
Lorenz Brun9d6c4c72021-07-20 21:16:27 +0200282
// imageReader is an adapter converting a gRPC stream into an io.Reader
type imageReader struct {
	// srv is the LoadImage stream from which data parts are received.
	srv apb.NodeDebugService_LoadImageServer
	// restOfPart holds the bytes of the most recently received part that did
	// not fit into the caller's buffer. Read hands these out before receiving
	// another part.
	restOfPart []byte
}
288
289func (i *imageReader) Read(p []byte) (n int, err error) {
290 n1 := copy(p, i.restOfPart)
291 if len(p) > len(i.restOfPart) {
292 part, err := i.srv.Recv()
293 if err != nil {
294 return n1, err
295 }
296 n2 := copy(p[n1:], part.DataPart)
297 i.restOfPart = part.DataPart[n2:]
298 return n1 + n2, nil
299 } else {
300 i.restOfPart = i.restOfPart[n1:]
301 return n1, nil
302 }
303}
304
305// LoadImage loads an OCI image into the image cache of this node
306func (s *debugService) LoadImage(srv apb.NodeDebugService_LoadImageServer) error {
307 client, err := ctr.New(s.ephemeralVolume.ClientSocket.FullPath())
308 if err != nil {
309 return status.Errorf(codes.Unavailable, "failed to connect to containerd: %v", err)
310 }
311 ctxWithNS := namespaces.WithNamespace(srv.Context(), "k8s.io")
312 reader := &imageReader{srv: srv}
313 _, err = client.Import(ctxWithNS, reader)
314 if err != nil {
315 return status.Errorf(codes.Unknown, "failed to import image: %v", err)
316 }
317 return srv.SendAndClose(&apb.LoadImageResponse{})
318}