blob: d0c89a4a329bc807b35b13a5d14c1cfb9003cff3 [file] [log] [blame]
Serge Bazanski54e212a2023-06-14 13:45:11 +02001package roleserve
2
3import (
4 "context"
Tim Windelschmidtfd49f222023-07-20 14:27:50 +02005 "crypto/tls"
6 "crypto/x509"
7 "fmt"
Serge Bazanski54e212a2023-06-14 13:45:11 +02008
Tim Windelschmidtb551b652023-07-17 16:01:42 +02009 cpb "source.monogon.dev/metropolis/proto/common"
10
Serge Bazanski54e212a2023-06-14 13:45:11 +020011 "source.monogon.dev/metropolis/node/core/metrics"
Tim Windelschmidtfd49f222023-07-20 14:27:50 +020012 kpki "source.monogon.dev/metropolis/node/kubernetes/pki"
Serge Bazanski54e212a2023-06-14 13:45:11 +020013 "source.monogon.dev/metropolis/pkg/event/memory"
14 "source.monogon.dev/metropolis/pkg/supervisor"
Tim Windelschmidtb551b652023-07-17 16:01:42 +020015
16 ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
Serge Bazanski54e212a2023-06-14 13:45:11 +020017)
18
19// workerMetrics runs the Metrics Service, which runs local Prometheus collectors
20// (themselves usually instances of existing Prometheus Exporters running as
21// sub-processes), and a forwarding service that lets external users access them
22// over HTTPS using the Cluster CA.
23type workerMetrics struct {
24 curatorConnection *memory.Value[*curatorConnection]
Tim Windelschmidtb551b652023-07-17 16:01:42 +020025 localRoles *memory.Value[*cpb.NodeRoles]
Tim Windelschmidtfd49f222023-07-20 14:27:50 +020026 localControlplane *memory.Value[*localControlPlane]
Serge Bazanski54e212a2023-06-14 13:45:11 +020027}
28
29func (s *workerMetrics) run(ctx context.Context) error {
30 w := s.curatorConnection.Watch()
31 defer w.Close()
32
33 supervisor.Logger(ctx).Infof("Waiting for curator connection")
34 cc, err := w.Get(ctx)
35 if err != nil {
36 return err
37 }
38 supervisor.Logger(ctx).Infof("Got curator connection, starting...")
39
Tim Windelschmidtfd49f222023-07-20 14:27:50 +020040 lw := s.localControlplane.Watch()
41 defer lw.Close()
42 cp, err := lw.Get(ctx)
43 if err != nil {
44 return err
45 }
46
47 pki, err := kpki.FromLocalConsensus(ctx, cp.consensus)
48 if err != nil {
49 return err
50 }
51
52 // TODO(q3k): move this to IssueCertificates and replace with dedicated certificate
53 cert, key, err := pki.Certificate(ctx, kpki.Master)
54 if err != nil {
55 return fmt.Errorf("could not load certificate %q from PKI: %w", kpki.Master, err)
56 }
57 parsedKey, err := x509.ParsePKCS8PrivateKey(key)
58 if err != nil {
59 return fmt.Errorf("failed to parse key for cert %q: %w", kpki.Master, err)
60 }
61
62 caCert, _, err := pki.Certificate(ctx, kpki.IdCA)
63 if err != nil {
64 return fmt.Errorf("could not load certificate %q from PKI: %w", kpki.IdCA, err)
65 }
66 parsedCACert, err := x509.ParseCertificate(caCert)
67 if err != nil {
68 return fmt.Errorf("failed to parse cert %q: %w", kpki.IdCA, err)
69 }
70
71 rootCAs := x509.NewCertPool()
72 rootCAs.AddCert(parsedCACert)
73
74 kubeTLSConfig := &tls.Config{
75 RootCAs: rootCAs,
76 Certificates: []tls.Certificate{{
77 Certificate: [][]byte{cert},
78 PrivateKey: parsedKey,
79 }},
80 }
81
Serge Bazanski54e212a2023-06-14 13:45:11 +020082 svc := metrics.Service{
Tim Windelschmidtfd49f222023-07-20 14:27:50 +020083 Credentials: cc.credentials,
84 Curator: ipb.NewCuratorClient(cc.conn),
85 LocalRoles: s.localRoles,
86 KubeTLSConfig: kubeTLSConfig,
Serge Bazanski54e212a2023-06-14 13:45:11 +020087 }
88 return svc.Run(ctx)
89}