blob: b6e0e71d95feb28bd9e46ff170e8543c49f8ec43 [file] [log] [blame]
Serge Bazanski4abeb132022-10-11 11:32:19 +02001package server
2
3import (
4 "context"
5 "crypto/ed25519"
Serge Bazanski6c9535b2023-01-03 13:17:42 +01006 "encoding/hex"
Serge Bazanski4abeb132022-10-11 11:32:19 +02007 "errors"
8 "fmt"
9 "time"
10
11 "github.com/google/uuid"
12 "google.golang.org/grpc/codes"
13 "google.golang.org/grpc/status"
14 "google.golang.org/protobuf/proto"
15 "k8s.io/klog"
16
17 "source.monogon.dev/cloud/bmaas/bmdb/model"
18 apb "source.monogon.dev/cloud/bmaas/server/api"
19 "source.monogon.dev/metropolis/node/core/rpc"
20)
21
22type agentCallbackService struct {
23 s *Server
24}
25
26var (
27 errAgentUnauthenticated = errors.New("machine id or public key unknown")
28)
29
30func (a *agentCallbackService) Heartbeat(ctx context.Context, req *apb.AgentHeartbeatRequest) (*apb.AgentHeartbeatResponse, error) {
31 // Extract ED25519 self-signed certificate from client connection.
32 cert, err := rpc.GetPeerCertificate(ctx)
33 if err != nil {
34 return nil, err
35 }
36 pk := cert.PublicKey.(ed25519.PublicKey)
37 machineId, err := uuid.Parse(req.MachineId)
38 if err != nil {
39 return nil, status.Error(codes.InvalidArgument, "machine_id invalid")
40 }
41
42 // TODO(q3k): don't start a session for every RPC.
43 session, err := a.s.bmdb.StartSession(ctx)
44 if err != nil {
45 klog.Errorf("Could not start session: %v", err)
46 return nil, status.Error(codes.Unavailable, "could not start session")
47 }
48
49 // Verify that machine ID and connection public key match up to a machine in the
50 // BMDB. Prevent leaking information about a machine's existence to unauthorized
51 // agents.
52 err = session.Transact(ctx, func(q *model.Queries) error {
53 agents, err := q.AuthenticateAgentConnection(ctx, model.AuthenticateAgentConnectionParams{
54 MachineID: machineId,
55 AgentPublicKey: pk,
56 })
57 if err != nil {
58 return fmt.Errorf("AuthenticateAgentConnection: %w", err)
59 }
60 if len(agents) < 1 {
Serge Bazanski6c9535b2023-01-03 13:17:42 +010061 klog.Errorf("No agent for %s/%s", machineId.String(), hex.EncodeToString(pk))
Serge Bazanski4abeb132022-10-11 11:32:19 +020062 return errAgentUnauthenticated
63 }
64 return nil
65 })
66 if err != nil {
67 if errors.Is(err, errAgentUnauthenticated) {
68 return nil, status.Error(codes.Unauthenticated, err.Error())
69 }
70 klog.Errorf("Could not authenticate agent: %v", err)
71 return nil, status.Error(codes.Unavailable, "could not authenticate agent")
72 }
73
74 // Request is now authenticated.
75
76 // Serialize hardware report if submitted alongside heartbeat.
77 var hwraw []byte
78 if req.HardwareReport != nil {
79 hwraw, err = proto.Marshal(req.HardwareReport)
80 if err != nil {
Serge Bazanski6c9535b2023-01-03 13:17:42 +010081 return nil, status.Errorf(codes.InvalidArgument, "could not serialize hardware report: %v", err)
Serge Bazanski4abeb132022-10-11 11:32:19 +020082 }
83 }
84
85 // Upsert heartbeat time and hardware report.
86 err = session.Transact(ctx, func(q *model.Queries) error {
87 // Upsert hardware report if submitted.
88 if hwraw != nil {
89 err = q.MachineSetHardwareReport(ctx, model.MachineSetHardwareReportParams{
90 MachineID: machineId,
91 HardwareReportRaw: hwraw,
92 })
93 if err != nil {
94 return fmt.Errorf("hardware report upsert: %w", err)
95 }
96 }
Serge Bazanski6c9535b2023-01-03 13:17:42 +010097 // Upsert os installation report if submitted.
98 if req.InstallationReport != nil {
99 err = q.MachineSetOSInstallationReport(ctx, model.MachineSetOSInstallationReportParams{
100 MachineID: machineId,
101 Generation: req.InstallationReport.Generation,
102 })
103 }
Serge Bazanski4abeb132022-10-11 11:32:19 +0200104 return q.MachineSetAgentHeartbeat(ctx, model.MachineSetAgentHeartbeatParams{
105 MachineID: machineId,
106 AgentHeartbeatAt: time.Now(),
107 })
108 })
109 if err != nil {
110 klog.Errorf("Could not submit heartbeat: %v", err)
111 return nil, status.Error(codes.Unavailable, "could not submit heartbeat")
112 }
Serge Bazanski6c9535b2023-01-03 13:17:42 +0100113 klog.Infof("Heartbeat from %s/%s", machineId.String(), hex.EncodeToString(pk))
Serge Bazanski4abeb132022-10-11 11:32:19 +0200114
Serge Bazanski6c9535b2023-01-03 13:17:42 +0100115 // Get installation request for machine if present.
116 var installRequest *apb.OSInstallationRequest
117 err = session.Transact(ctx, func(q *model.Queries) error {
118 reqs, err := q.GetExactMachineForOSInstallation(ctx, model.GetExactMachineForOSInstallationParams{
119 MachineID: machineId,
120 Limit: 1,
121 })
122 if err != nil {
123 return fmt.Errorf("GetExactMachineForOSInstallation: %w", err)
124 }
125 if len(reqs) > 0 {
126 raw := reqs[0].OsInstallationRequestRaw
127 var preq apb.OSInstallationRequest
128 if err := proto.Unmarshal(raw, &preq); err != nil {
129 return fmt.Errorf("could not decode stored OS installation request: %w", err)
130 }
131 installRequest = &preq
132 }
133 return nil
134 })
135 if err != nil {
136 // Do not fail entire request. Instead, just log an error.
137 // TODO(q3k): alert on this
138 klog.Errorf("Failure during OS installation request retrieval: %v", err)
139 }
140
141 return &apb.AgentHeartbeatResponse{
142 InstallationRequest: installRequest,
143 }, nil
Serge Bazanski4abeb132022-10-11 11:32:19 +0200144}