blob: 123d1ab4ef68f0f39ab6411b82fa62fc25d32345 [file] [log] [blame]
Serge Bazanski1ebd1e12020-07-13 19:17:16 +02001// Copyright 2020 The Monogon Project Authors.
2//
3// SPDX-License-Identifier: Apache-2.0
4//
5// Licensed under the Apache License, Version 2.0 (the "License");
6// you may not use this file except in compliance with the License.
7// You may obtain a copy of the License at
8//
9// http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing, software
12// distributed under the License is distributed on an "AS IS" BASIS,
13// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14// See the License for the specific language governing permissions and
15// limitations under the License.
16
17package main
18
19import (
Lorenz Brun09c275b2021-03-30 12:47:09 +020020 "bufio"
Serge Bazanski1ebd1e12020-07-13 19:17:16 +020021 "context"
Lorenz Brun09c275b2021-03-30 12:47:09 +020022 "fmt"
23 "io/ioutil"
24 "os"
25 "regexp"
26 "strings"
Serge Bazanski1ebd1e12020-07-13 19:17:16 +020027
Lorenz Brun9d6c4c72021-07-20 21:16:27 +020028 ctr "github.com/containerd/containerd"
29 "github.com/containerd/containerd/namespaces"
Serge Bazanski1ebd1e12020-07-13 19:17:16 +020030 "google.golang.org/grpc/codes"
31 "google.golang.org/grpc/status"
Serge Bazanskib0272182020-11-02 18:39:44 +010032
Serge Bazanskif9edf522021-06-17 15:57:13 +020033 "source.monogon.dev/metropolis/node/core/roleserve"
Lorenz Brun9d6c4c72021-07-20 21:16:27 +020034 "source.monogon.dev/metropolis/node/core/localstorage"
Serge Bazanski31370b02021-01-07 16:31:14 +010035 "source.monogon.dev/metropolis/pkg/logtree"
Serge Bazanskif9edf522021-06-17 15:57:13 +020036 apb "source.monogon.dev/metropolis/proto/api"
Serge Bazanskib0272182020-11-02 18:39:44 +010037)
38
const (
	// logFilterMax is the maximum number of filters permitted in a single
	// GetLogs request; requests carrying more are rejected with
	// InvalidArgument.
	logFilterMax = 1000
)
42
// debugService implements the Metropolis node debug API.
type debugService struct {
	// roleserve is the node's role server; GetDebugKubeconfig watches it
	// until a Kubernetes service is available.
	roleserve *roleserve.Service
	// logtree is the node's log tree, read and streamed by GetLogs.
	logtree *logtree.LogTree
	// ephemeralVolume holds the ephemeral containerd directory; its client
	// socket is used by LoadImage to reach containerd.
	ephemeralVolume *localstorage.EphemeralContainerdDirectory

	// traceLock provides exclusive access to the Linux tracing infrastructure
	// (ftrace).
	// This is a channel because Go's mutexes can't be cancelled or be acquired
	// in a non-blocking way.
	traceLock chan struct{}
}
55
Serge Bazanski1ebd1e12020-07-13 19:17:16 +020056func (s *debugService) GetDebugKubeconfig(ctx context.Context, req *apb.GetDebugKubeconfigRequest) (*apb.GetDebugKubeconfigResponse, error) {
Serge Bazanskif9edf522021-06-17 15:57:13 +020057 w := s.roleserve.Watch()
58 defer w.Close()
59 for {
60 v, err := w.Get(ctx)
61 if err != nil {
62 return nil, status.Errorf(codes.Unavailable, "could not get roleserve status: %v", err)
63 }
64 if v.Kubernetes == nil {
65 continue
66 }
67 return v.Kubernetes.GetDebugKubeconfig(ctx, req)
68 }
Serge Bazanski1ebd1e12020-07-13 19:17:16 +020069}
70
Serge Bazanskib0272182020-11-02 18:39:44 +010071func (s *debugService) GetLogs(req *apb.GetLogsRequest, srv apb.NodeDebugService_GetLogsServer) error {
72 if len(req.Filters) > logFilterMax {
73 return status.Errorf(codes.InvalidArgument, "requested %d filters, maximum permitted is %d", len(req.Filters), logFilterMax)
74 }
75 dn := logtree.DN(req.Dn)
76 _, err := dn.Path()
77 switch err {
78 case nil:
79 case logtree.ErrInvalidDN:
80 return status.Errorf(codes.InvalidArgument, "invalid DN")
81 default:
82 return status.Errorf(codes.Unavailable, "could not parse DN: %v", err)
83 }
84
85 var options []logtree.LogReadOption
86
87 // Turn backlog mode into logtree option(s).
88 switch req.BacklogMode {
89 case apb.GetLogsRequest_BACKLOG_DISABLE:
90 case apb.GetLogsRequest_BACKLOG_ALL:
91 options = append(options, logtree.WithBacklog(logtree.BacklogAllAvailable))
92 case apb.GetLogsRequest_BACKLOG_COUNT:
93 count := int(req.BacklogCount)
94 if count <= 0 {
95 return status.Errorf(codes.InvalidArgument, "backlog_count must be > 0 if backlog_mode is BACKLOG_COUNT")
96 }
97 options = append(options, logtree.WithBacklog(count))
98 default:
99 return status.Errorf(codes.InvalidArgument, "unknown backlog_mode %d", req.BacklogMode)
100 }
101
102 // Turn stream mode into logtree option(s).
103 streamEnable := false
104 switch req.StreamMode {
105 case apb.GetLogsRequest_STREAM_DISABLE:
106 case apb.GetLogsRequest_STREAM_UNBUFFERED:
107 streamEnable = true
108 options = append(options, logtree.WithStream())
109 }
110
111 // Parse proto filters into logtree options.
112 for i, filter := range req.Filters {
113 switch inner := filter.Filter.(type) {
114 case *apb.LogFilter_WithChildren_:
115 options = append(options, logtree.WithChildren())
116 case *apb.LogFilter_OnlyRaw_:
117 options = append(options, logtree.OnlyRaw())
118 case *apb.LogFilter_OnlyLeveled_:
119 options = append(options, logtree.OnlyLeveled())
120 case *apb.LogFilter_LeveledWithMinimumSeverity_:
121 severity, err := logtree.SeverityFromProto(inner.LeveledWithMinimumSeverity.Minimum)
122 if err != nil {
123 return status.Errorf(codes.InvalidArgument, "filter %d has invalid severity: %v", i, err)
124 }
125 options = append(options, logtree.LeveledWithMinimumSeverity(severity))
126 }
127 }
128
129 reader, err := s.logtree.Read(logtree.DN(req.Dn), options...)
130 switch err {
131 case nil:
132 case logtree.ErrRawAndLeveled:
133 return status.Errorf(codes.InvalidArgument, "requested only raw and only leveled logs simultaneously")
134 default:
135 return status.Errorf(codes.Unavailable, "could not retrieve logs: %v", err)
136 }
137 defer reader.Close()
138
139 // Default protobuf message size limit is 64MB. We want to limit ourselves
140 // to 10MB.
141 // Currently each raw log line can be at most 1024 unicode codepoints (or
142 // 4096 bytes). To cover extra metadata and proto overhead, let's round
143 // this up to 4500 bytes. This in turn means we can store a maximum of
144 // (10e6/4500) == 2222 entries.
145 // Currently each leveled log line can also be at most 1024 unicode
146 // codepoints (or 4096 bytes). To cover extra metadata and proto overhead
147 // let's round this up to 2000 bytes. This in turn means we can store a
148 // maximum of (10e6/5000) == 2000 entries.
149 // The lowever of these numbers, ie the worst case scenario, is 2000
150 // maximum entries.
151 maxChunkSize := 2000
152
153 // Serve all backlog entries in chunks.
154 chunk := make([]*apb.LogEntry, 0, maxChunkSize)
155 for _, entry := range reader.Backlog {
156 p := entry.Proto()
157 if p == nil {
158 // TODO(q3k): log this once we have logtree/gRPC compatibility.
159 continue
160 }
161 chunk = append(chunk, p)
162
163 if len(chunk) >= maxChunkSize {
164 err := srv.Send(&apb.GetLogsResponse{
165 BacklogEntries: chunk,
166 })
167 if err != nil {
168 return err
169 }
170 chunk = make([]*apb.LogEntry, 0, maxChunkSize)
171 }
172 }
173
174 // Send last chunk of backlog, if present..
175 if len(chunk) > 0 {
176 err := srv.Send(&apb.GetLogsResponse{
177 BacklogEntries: chunk,
178 })
179 if err != nil {
180 return err
181 }
182 chunk = make([]*apb.LogEntry, 0, maxChunkSize)
183 }
184
185 // Start serving streaming data, if streaming has been requested.
186 if !streamEnable {
187 return nil
188 }
189
190 for {
191 entry, ok := <-reader.Stream
192 if !ok {
193 // Streaming has been ended by logtree - tell the client and return.
194 return status.Error(codes.Unavailable, "log streaming aborted by system")
195 }
196 p := entry.Proto()
197 if p == nil {
198 // TODO(q3k): log this once we have logtree/gRPC compatibility.
199 continue
200 }
201 err := srv.Send(&apb.GetLogsResponse{
202 StreamEntries: []*apb.LogEntry{p},
203 })
204 if err != nil {
205 return err
206 }
207 }
Serge Bazanski1ebd1e12020-07-13 19:17:16 +0200208}
Lorenz Brun09c275b2021-03-30 12:47:09 +0200209
// Validate property names as they are used in path construction and we really
// don't want a path traversal vulnerability.
var safeTracingPropertyNamesRe = regexp.MustCompile("^[a-z0-9_]+$")

// writeTracingProperty writes value, terminated by a newline, to the ftrace
// control file given by name under /sys/kernel/tracing. Names not matching
// safeTracingPropertyNamesRe are rejected before any path is built.
func writeTracingProperty(name string, value string) error {
	if safeTracingPropertyNamesRe.MatchString(name) {
		payload := []byte(value + "\n")
		// Mode 0 is passed through unchanged; it only applies if the file
		// were to be created.
		return ioutil.WriteFile("/sys/kernel/tracing/"+name, payload, 0)
	}
	return fmt.Errorf("disallowed tracing property name received: \"%v\"", name)
}
220
221func (s *debugService) Trace(req *apb.TraceRequest, srv apb.NodeDebugService_TraceServer) error {
222 // Don't allow more than one trace as the kernel doesn't support this.
223 select {
224 case s.traceLock <- struct{}{}:
225 defer func() {
226 <-s.traceLock
227 }()
228 default:
229 return status.Error(codes.FailedPrecondition, "a trace is already in progress")
230 }
231
232 if len(req.FunctionFilter) == 0 {
233 req.FunctionFilter = []string{"*"} // For reset purposes
234 }
235 if len(req.GraphFunctionFilter) == 0 {
236 req.GraphFunctionFilter = []string{"*"} // For reset purposes
237 }
238
239 defer writeTracingProperty("current_tracer", "nop")
240 if err := writeTracingProperty("current_tracer", req.Tracer); err != nil {
241 return status.Errorf(codes.InvalidArgument, "requested tracer not available: %v", err)
242 }
243
244 if err := writeTracingProperty("set_ftrace_filter", strings.Join(req.FunctionFilter, " ")); err != nil {
245 return status.Errorf(codes.InvalidArgument, "setting ftrace filter failed: %v", err)
246 }
247 if err := writeTracingProperty("set_graph_function", strings.Join(req.GraphFunctionFilter, " ")); err != nil {
248 return status.Errorf(codes.InvalidArgument, "setting graph filter failed: %v", err)
249 }
250 tracePipe, err := os.Open("/sys/kernel/tracing/trace_pipe")
251 if err != nil {
252 return status.Errorf(codes.Unavailable, "cannot open trace output pipe: %v", err)
253 }
254 defer tracePipe.Close()
255
256 defer writeTracingProperty("tracing_on", "0")
257 if err := writeTracingProperty("tracing_on", "1"); err != nil {
258 return status.Errorf(codes.InvalidArgument, "requested tracer not available: %v", err)
259 }
260
261 go func() {
262 <-srv.Context().Done()
263 tracePipe.Close()
264 }()
265
266 eventScanner := bufio.NewScanner(tracePipe)
267 for eventScanner.Scan() {
268 if err := eventScanner.Err(); err != nil {
269 return status.Errorf(codes.Unavailable, "event pipe read error: %v", err)
270 }
271 err := srv.Send(&apb.TraceEvent{
272 RawLine: eventScanner.Text(),
273 })
274 if err != nil {
275 return err
276 }
277 }
278 return nil
279}
Lorenz Brun9d6c4c72021-07-20 21:16:27 +0200280
281// imageReader is an adapter converting a gRPC stream into an io.Reader
282type imageReader struct {
283 srv apb.NodeDebugService_LoadImageServer
284 restOfPart []byte
285}
286
287func (i *imageReader) Read(p []byte) (n int, err error) {
288 n1 := copy(p, i.restOfPart)
289 if len(p) > len(i.restOfPart) {
290 part, err := i.srv.Recv()
291 if err != nil {
292 return n1, err
293 }
294 n2 := copy(p[n1:], part.DataPart)
295 i.restOfPart = part.DataPart[n2:]
296 return n1 + n2, nil
297 } else {
298 i.restOfPart = i.restOfPart[n1:]
299 return n1, nil
300 }
301}
302
303// LoadImage loads an OCI image into the image cache of this node
304func (s *debugService) LoadImage(srv apb.NodeDebugService_LoadImageServer) error {
305 client, err := ctr.New(s.ephemeralVolume.ClientSocket.FullPath())
306 if err != nil {
307 return status.Errorf(codes.Unavailable, "failed to connect to containerd: %v", err)
308 }
309 ctxWithNS := namespaces.WithNamespace(srv.Context(), "k8s.io")
310 reader := &imageReader{srv: srv}
311 _, err = client.Import(ctxWithNS, reader)
312 if err != nil {
313 return status.Errorf(codes.Unknown, "failed to import image: %v", err)
314 }
315 return srv.SendAndClose(&apb.LoadImageResponse{})
316}