blob: 27ce9af1a1579883bdf00165b88212bbad80cbf6 [file] [log] [blame]
Serge Bazanski4abeb132022-10-11 11:32:19 +02001package server
2
3import (
4 "context"
5 "crypto/ed25519"
Serge Bazanski6c9535b2023-01-03 13:17:42 +01006 "encoding/hex"
Serge Bazanski4abeb132022-10-11 11:32:19 +02007 "errors"
8 "fmt"
9 "time"
10
11 "github.com/google/uuid"
12 "google.golang.org/grpc/codes"
13 "google.golang.org/grpc/status"
14 "google.golang.org/protobuf/proto"
Tim Windelschmidt4264b8c2023-06-12 23:54:58 +020015 "k8s.io/klog/v2"
Serge Bazanski4abeb132022-10-11 11:32:19 +020016
17 "source.monogon.dev/cloud/bmaas/bmdb/model"
18 apb "source.monogon.dev/cloud/bmaas/server/api"
19 "source.monogon.dev/metropolis/node/core/rpc"
20)
21
22type agentCallbackService struct {
23 s *Server
24}
25
// errAgentUnauthenticated is the sentinel returned when no agent in the BMDB
// matches the machine ID / public key pair presented by a caller. Its message
// deliberately does not say which of the two was wrong.
var errAgentUnauthenticated = errors.New("machine id or public key unknown")
29
30func (a *agentCallbackService) Heartbeat(ctx context.Context, req *apb.AgentHeartbeatRequest) (*apb.AgentHeartbeatResponse, error) {
31 // Extract ED25519 self-signed certificate from client connection.
32 cert, err := rpc.GetPeerCertificate(ctx)
33 if err != nil {
34 return nil, err
35 }
36 pk := cert.PublicKey.(ed25519.PublicKey)
37 machineId, err := uuid.Parse(req.MachineId)
38 if err != nil {
39 return nil, status.Error(codes.InvalidArgument, "machine_id invalid")
40 }
41
Serge Bazanski42f13462023-04-19 15:00:06 +020042 session, err := a.s.session(ctx)
Serge Bazanski4abeb132022-10-11 11:32:19 +020043 if err != nil {
44 klog.Errorf("Could not start session: %v", err)
45 return nil, status.Error(codes.Unavailable, "could not start session")
46 }
47
48 // Verify that machine ID and connection public key match up to a machine in the
49 // BMDB. Prevent leaking information about a machine's existence to unauthorized
50 // agents.
51 err = session.Transact(ctx, func(q *model.Queries) error {
52 agents, err := q.AuthenticateAgentConnection(ctx, model.AuthenticateAgentConnectionParams{
53 MachineID: machineId,
54 AgentPublicKey: pk,
55 })
56 if err != nil {
57 return fmt.Errorf("AuthenticateAgentConnection: %w", err)
58 }
59 if len(agents) < 1 {
Serge Bazanski6c9535b2023-01-03 13:17:42 +010060 klog.Errorf("No agent for %s/%s", machineId.String(), hex.EncodeToString(pk))
Serge Bazanski4abeb132022-10-11 11:32:19 +020061 return errAgentUnauthenticated
62 }
63 return nil
64 })
65 if err != nil {
66 if errors.Is(err, errAgentUnauthenticated) {
67 return nil, status.Error(codes.Unauthenticated, err.Error())
68 }
69 klog.Errorf("Could not authenticate agent: %v", err)
70 return nil, status.Error(codes.Unavailable, "could not authenticate agent")
71 }
72
73 // Request is now authenticated.
74
75 // Serialize hardware report if submitted alongside heartbeat.
76 var hwraw []byte
77 if req.HardwareReport != nil {
78 hwraw, err = proto.Marshal(req.HardwareReport)
79 if err != nil {
Serge Bazanski6c9535b2023-01-03 13:17:42 +010080 return nil, status.Errorf(codes.InvalidArgument, "could not serialize hardware report: %v", err)
Serge Bazanski4abeb132022-10-11 11:32:19 +020081 }
82 }
83
84 // Upsert heartbeat time and hardware report.
85 err = session.Transact(ctx, func(q *model.Queries) error {
86 // Upsert hardware report if submitted.
87 if hwraw != nil {
88 err = q.MachineSetHardwareReport(ctx, model.MachineSetHardwareReportParams{
89 MachineID: machineId,
90 HardwareReportRaw: hwraw,
91 })
92 if err != nil {
93 return fmt.Errorf("hardware report upsert: %w", err)
94 }
95 }
Serge Bazanski6c9535b2023-01-03 13:17:42 +010096 // Upsert os installation report if submitted.
97 if req.InstallationReport != nil {
98 err = q.MachineSetOSInstallationReport(ctx, model.MachineSetOSInstallationReportParams{
99 MachineID: machineId,
100 Generation: req.InstallationReport.Generation,
101 })
102 }
Serge Bazanski4abeb132022-10-11 11:32:19 +0200103 return q.MachineSetAgentHeartbeat(ctx, model.MachineSetAgentHeartbeatParams{
104 MachineID: machineId,
105 AgentHeartbeatAt: time.Now(),
106 })
107 })
108 if err != nil {
109 klog.Errorf("Could not submit heartbeat: %v", err)
110 return nil, status.Error(codes.Unavailable, "could not submit heartbeat")
111 }
Serge Bazanski6c9535b2023-01-03 13:17:42 +0100112 klog.Infof("Heartbeat from %s/%s", machineId.String(), hex.EncodeToString(pk))
Serge Bazanski4abeb132022-10-11 11:32:19 +0200113
Serge Bazanski6c9535b2023-01-03 13:17:42 +0100114 // Get installation request for machine if present.
115 var installRequest *apb.OSInstallationRequest
116 err = session.Transact(ctx, func(q *model.Queries) error {
117 reqs, err := q.GetExactMachineForOSInstallation(ctx, model.GetExactMachineForOSInstallationParams{
118 MachineID: machineId,
119 Limit: 1,
120 })
121 if err != nil {
122 return fmt.Errorf("GetExactMachineForOSInstallation: %w", err)
123 }
124 if len(reqs) > 0 {
125 raw := reqs[0].OsInstallationRequestRaw
126 var preq apb.OSInstallationRequest
127 if err := proto.Unmarshal(raw, &preq); err != nil {
128 return fmt.Errorf("could not decode stored OS installation request: %w", err)
129 }
130 installRequest = &preq
131 }
132 return nil
133 })
134 if err != nil {
135 // Do not fail entire request. Instead, just log an error.
136 // TODO(q3k): alert on this
137 klog.Errorf("Failure during OS installation request retrieval: %v", err)
138 }
139
140 return &apb.AgentHeartbeatResponse{
141 InstallationRequest: installRequest,
142 }, nil
Serge Bazanski4abeb132022-10-11 11:32:19 +0200143}