// Copyright 2020 The Monogon Project Authors.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package main

import (
	"context"
	"crypto/x509"
	"fmt"
	"net"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"

	"git.monogon.dev/source/nexantic.git/core/internal/cluster"
	"git.monogon.dev/source/nexantic.git/core/internal/common"
	"git.monogon.dev/source/nexantic.git/core/internal/consensus/ca"
	"git.monogon.dev/source/nexantic.git/core/internal/kubernetes"
	"git.monogon.dev/source/nexantic.git/core/pkg/logtree"
	apb "git.monogon.dev/source/nexantic.git/core/proto/api"
)

const (
	// logFilterMax caps the number of filters accepted in a single GetLogs
	// request.
	logFilterMax = 1000
)

// debugService implements the Smalltown node debug API.
type debugService struct {
	cluster    *cluster.Manager
	kubernetes *kubernetes.Service
	logtree    *logtree.LogTree
}

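// GetGoldenTicket issues a 'golden ticket' for a new node at the given
// external IP address: a fresh node certificate and key, etcd peer
// credentials signed by the cluster CA, and registration of the new node as
// an etcd learner member.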
func (s *debugService) GetGoldenTicket(ctx context.Context, req *apb.GetGoldenTicketRequest) (*apb.GetGoldenTicketResponse, error) {
	ip := net.ParseIP(req.ExternalIp)
	if ip == nil {
		return nil, status.Errorf(codes.InvalidArgument, "could not parse IP %q", req.ExternalIp)
	}
	this := s.cluster.Node()

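	// Generate a certificate and key for the new node.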
	certRaw, key, err := s.nodeCertificate()
	if err != nil {
		return nil, status.Errorf(codes.Unavailable, "failed to generate node certificate: %v", err)
	}
	cert, err := x509.ParseCertificate(certRaw)
	if err != nil {
		// The certificate was just generated above, so a parse failure is a
		// programming error.
		panic(err)
	}
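	// Load the cluster CA from consensus storage and use it to issue an etcd
	// peer certificate for the new node, alongside the current CRL.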
	kv := s.cluster.ConsensusKVRoot()
	ca, err := ca.Load(ctx, kv)
	if err != nil {
		return nil, status.Errorf(codes.Unavailable, "could not load CA: %v", err)
	}
	etcdCert, etcdKey, err := ca.Issue(ctx, kv, cert.Subject.CommonName, ip)
	if err != nil {
		return nil, status.Errorf(codes.Unavailable, "could not generate etcd peer certificate: %v", err)
	}
	etcdCRL, err := ca.GetCurrentCRL(ctx, kv)
	if err != nil {
		return nil, status.Errorf(codes.Unavailable, "could not get etcd CRL: %v", err)
	}

	// Add the new node to the etcd cluster as a learner (non-voting) member.
	etcd := s.cluster.ConsensusCluster()
	etcdAddr := fmt.Sprintf("https://%s:%d", ip.String(), common.ConsensusPort)
	_, err = etcd.MemberAddAsLearner(ctx, []string{etcdAddr})
	if err != nil {
		return nil, status.Errorf(codes.Unavailable, "could not add as new etcd consensus member: %v", err)
	}

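	// Return everything the new node needs to join the cluster: the etcd CA
	// certificate and CRL, its freshly issued etcd peer credentials, the
	// list of existing etcd peers, and its own identity, certificate and key.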
	return &apb.GetGoldenTicketResponse{
		Ticket: &apb.GoldenTicket{
			EtcdCaCert:     ca.CACertRaw,
			EtcdClientCert: etcdCert,
			EtcdClientKey:  etcdKey,
			EtcdCrl:        etcdCRL,
			Peers: []*apb.GoldenTicket_EtcdPeer{
				{Name: this.ID(), Address: this.Address().String()},
			},
			This: &apb.GoldenTicket_EtcdPeer{Name: cert.Subject.CommonName, Address: ip.String()},

			NodeId:   cert.Subject.CommonName,
			NodeCert: certRaw,
			NodeKey:  key,
		},
	}, nil
}

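// GetDebugKubeconfig returns a kubeconfig for debugging, as issued by the
// node's Kubernetes service.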
func (s *debugService) GetDebugKubeconfig(ctx context.Context, req *apb.GetDebugKubeconfigRequest) (*apb.GetDebugKubeconfigResponse, error) {
	return s.kubernetes.GetDebugKubeconfig(ctx, req)
}

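// GetLogs streams log entries from the node's logtree to the caller: first
// the requested backlog (sent in chunks), then, if streaming was requested,
// new entries as they are emitted.
//
// A minimal client-side sketch, assuming the standard gRPC stubs generated
// for NodeDebugService (the DN and mode values are illustrative):
//
//   c := apb.NewNodeDebugServiceClient(conn)
//   stream, err := c.GetLogs(ctx, &apb.GetLogsRequest{
//       Dn:          "init",
//       BacklogMode: apb.GetLogsRequest_BACKLOG_ALL,
//       StreamMode:  apb.GetLogsRequest_STREAM_UNBUFFERED,
//   })
//   // ... handle err, then read responses:
//   for {
//       res, err := stream.Recv()
//       // ... handle err; res.BacklogEntries / res.StreamEntries hold
//       // *apb.LogEntry messages.
//   }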
func (s *debugService) GetLogs(req *apb.GetLogsRequest, srv apb.NodeDebugService_GetLogsServer) error {
	if len(req.Filters) > logFilterMax {
		return status.Errorf(codes.InvalidArgument, "requested %d filters, maximum permitted is %d", len(req.Filters), logFilterMax)
	}
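	// Validate the DN from which logs are requested.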
	dn := logtree.DN(req.Dn)
	_, err := dn.Path()
	switch err {
	case nil:
	case logtree.ErrInvalidDN:
		return status.Error(codes.InvalidArgument, "invalid DN")
	default:
		return status.Errorf(codes.Unavailable, "could not parse DN: %v", err)
	}

	var options []logtree.LogReadOption

	// Turn backlog mode into logtree option(s).
	switch req.BacklogMode {
	case apb.GetLogsRequest_BACKLOG_DISABLE:
	case apb.GetLogsRequest_BACKLOG_ALL:
		options = append(options, logtree.WithBacklog(logtree.BacklogAllAvailable))
	case apb.GetLogsRequest_BACKLOG_COUNT:
		count := int(req.BacklogCount)
		if count <= 0 {
			return status.Error(codes.InvalidArgument, "backlog_count must be > 0 if backlog_mode is BACKLOG_COUNT")
		}
		options = append(options, logtree.WithBacklog(count))
	default:
		return status.Errorf(codes.InvalidArgument, "unknown backlog_mode %d", req.BacklogMode)
	}

	// Turn stream mode into logtree option(s).
	streamEnable := false
	switch req.StreamMode {
	case apb.GetLogsRequest_STREAM_DISABLE:
	case apb.GetLogsRequest_STREAM_UNBUFFERED:
		streamEnable = true
		options = append(options, logtree.WithStream())
	default:
		return status.Errorf(codes.InvalidArgument, "unknown stream_mode %d", req.StreamMode)
	}

	// Parse proto filters into logtree options.
	for i, filter := range req.Filters {
		switch inner := filter.Filter.(type) {
		case *apb.LogFilter_WithChildren_:
			options = append(options, logtree.WithChildren())
		case *apb.LogFilter_OnlyRaw_:
			options = append(options, logtree.OnlyRaw())
		case *apb.LogFilter_OnlyLeveled_:
			options = append(options, logtree.OnlyLeveled())
		case *apb.LogFilter_LeveledWithMinimumSeverity_:
			severity, err := logtree.SeverityFromProto(inner.LeveledWithMinimumSeverity.Minimum)
			if err != nil {
				return status.Errorf(codes.InvalidArgument, "filter %d has invalid severity: %v", i, err)
			}
			options = append(options, logtree.LeveledWithMinimumSeverity(severity))
		}
	}

	reader, err := s.logtree.Read(dn, options...)
	switch err {
	case nil:
	case logtree.ErrRawAndLeveled:
		return status.Error(codes.InvalidArgument, "requested only raw and only leveled logs simultaneously")
	default:
		return status.Errorf(codes.Unavailable, "could not retrieve logs: %v", err)
	}
	defer reader.Close()

	// The default protobuf message size limit is 64MB. We want to limit
	// ourselves to 10MB.
	// Currently each raw log line can be at most 1024 unicode codepoints (or
	// 4096 bytes). To cover extra metadata and proto overhead, let's round
	// this up to 4500 bytes. This in turn means we can store a maximum of
	// (10e6/4500) == 2222 entries.
	// Currently each leveled log line can also be at most 1024 unicode
	// codepoints (or 4096 bytes). To cover extra metadata and proto
	// overhead, let's round this up to 5000 bytes. This in turn means we can
	// store a maximum of (10e6/5000) == 2000 entries.
	// The lower of these numbers, i.e. the worst case scenario, is 2000
	// maximum entries.
	maxChunkSize := 2000

	// Serve all backlog entries in chunks.
	chunk := make([]*apb.LogEntry, 0, maxChunkSize)
	for _, entry := range reader.Backlog {
		p := entry.Proto()
		if p == nil {
			// TODO(q3k): log this once we have logtree/gRPC compatibility.
			continue
		}
		chunk = append(chunk, p)

		if len(chunk) >= maxChunkSize {
			err := srv.Send(&apb.GetLogsResponse{
				BacklogEntries: chunk,
			})
			if err != nil {
				return err
			}
			chunk = make([]*apb.LogEntry, 0, maxChunkSize)
		}
	}

	// Send the last chunk of backlog, if present.
	if len(chunk) > 0 {
		err := srv.Send(&apb.GetLogsResponse{
			BacklogEntries: chunk,
		})
		if err != nil {
			return err
		}
	}

	// Start serving streaming data, if streaming has been requested.
	if !streamEnable {
		return nil
	}

	for {
		entry, ok := <-reader.Stream
		if !ok {
			// Streaming has been ended by logtree - tell the client and
			// return.
			return status.Error(codes.Unavailable, "log streaming aborted by system")
		}
		p := entry.Proto()
		if p == nil {
			// TODO(q3k): log this once we have logtree/gRPC compatibility.
			continue
		}
		err := srv.Send(&apb.GetLogsResponse{
			StreamEntries: []*apb.LogEntry{p},
		})
		if err != nil {
			return err
		}
	}
}