cloud/bmaas/server: init

This adds the BMaaS server alongside its first functionality: serving an
Agent heartbeat API.

This allows (untrusted) Agents to communicate with the rest of the
system by submitting heartbeats which may include a hardware report.

The BMaaS server will likely grow to implement further functionality as
described in its README.

Change-Id: I1ede02121b3700079cbb11295525f4c167ee1e7d
Reviewed-on: https://review.monogon.dev/c/monogon/+/988
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
Tested-by: Jenkins CI
diff --git a/cloud/bmaas/bmdb/model/migrations/1667232160_agent_tags.up.sql b/cloud/bmaas/bmdb/model/migrations/1667232160_agent_tags.up.sql
index 14701a6..ccf1ab3 100644
--- a/cloud/bmaas/bmdb/model/migrations/1667232160_agent_tags.up.sql
+++ b/cloud/bmaas/bmdb/model/migrations/1667232160_agent_tags.up.sql
@@ -42,5 +42,16 @@
     CONSTRAINT "primary" PRIMARY KEY(machine_id)
 );
 
+-- tag HardwareReport {
+--     Raw []byte
+-- }
+-- Hardware report (serialized AgentHardwareReport proto) submitted by an Agent.
+-- Upserted on heartbeat; agents usually submit one only once, after starting.
+CREATE TABLE machine_hardware_report (
+    machine_id UUID NOT NULL REFERENCES machines(machine_id) ON DELETE RESTRICT,
+    hardware_report_raw BYTES NOT NULL,
+    CONSTRAINT "primary" PRIMARY KEY(machine_id)
+);
+
 -- Used by the Shepherd when performing direct actions against a machine.
 ALTER TYPE process ADD VALUE IF NOT EXISTS 'ShepherdInstall';
\ No newline at end of file
diff --git a/cloud/bmaas/bmdb/model/queries.sql b/cloud/bmaas/bmdb/model/queries.sql
index 4d15a79..4e32cff 100644
--- a/cloud/bmaas/bmdb/model/queries.sql
+++ b/cloud/bmaas/bmdb/model/queries.sql
@@ -70,6 +70,15 @@
     agent_heartbeat_at = $2
 ;
 
+-- name: MachineSetHardwareReport :exec
+-- Stores the machine's hardware report, replacing any previously stored one.
+INSERT INTO machine_hardware_report
+    (machine_id, hardware_report_raw)
+VALUES
+    ($1, $2)
+ON CONFLICT (machine_id) DO UPDATE
+    SET hardware_report_raw = EXCLUDED.hardware_report_raw;
+
 -- name: GetMachinesForAgentStart :many
 -- Get machines that need agent installed for the first time. Machine can be
 -- assumed to be 'new', with no previous attempts or failures.
@@ -116,3 +125,12 @@
   )
   AND work.machine_id IS NULL
 LIMIT $1;
+
+-- name: AuthenticateAgentConnection :many
+-- Returns the AgentStarted tag of the given machine if its agent public key
+-- matches; an empty result means the machine ID / public key pair is unknown.
+SELECT machine_agent_started.*
+FROM machines
+    INNER JOIN machine_agent_started
+        ON machine_agent_started.machine_id = machines.machine_id
+WHERE machines.machine_id = $1 AND machine_agent_started.agent_public_key = $2;
\ No newline at end of file
diff --git a/cloud/bmaas/server/BUILD.bazel b/cloud/bmaas/server/BUILD.bazel
new file mode 100644
index 0000000..ee491d1
--- /dev/null
+++ b/cloud/bmaas/server/BUILD.bazel
@@ -0,0 +1,44 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
+# BMaaS server library: gRPC listeners and the agent callback (heartbeat) service.
+go_library(
+    name = "server",
+    srcs = [
+        "agent_callback_service.go",
+        "server.go",
+    ],
+    importpath = "source.monogon.dev/cloud/bmaas/server",
+    visibility = ["//visibility:public"],
+    deps = [
+        "//cloud/bmaas/bmdb",
+        "//cloud/bmaas/bmdb/model",
+        "//cloud/bmaas/server/api",
+        "//cloud/lib/component",
+        "//metropolis/node/core/rpc",
+        "@com_github_google_uuid//:uuid",
+        "@io_k8s_klog//:klog",
+        "@io_k8s_klog_v2//:klog",
+        "@org_golang_google_grpc//:go_default_library",
+        "@org_golang_google_grpc//codes",
+        "@org_golang_google_grpc//reflection",
+        "@org_golang_google_grpc//status",
+        "@org_golang_google_protobuf//proto",
+    ],
+)
+# Test of the agent callback service against an in-memory BMDB (@cockroach in data).
+go_test(
+    name = "server_test",
+    srcs = ["agent_callback_service_test.go"],
+    data = [
+        "@cockroach",
+    ],
+    embed = [":server"],
+    deps = [
+        "//cloud/bmaas/bmdb",
+        "//cloud/bmaas/bmdb/model",
+        "//cloud/bmaas/server/api",
+        "//cloud/lib/component",
+        "//metropolis/node/core/rpc",
+        "@com_github_google_uuid//:uuid",
+        "@org_golang_google_grpc//:go_default_library",
+    ],
+)
diff --git a/cloud/bmaas/server/README.md b/cloud/bmaas/server/README.md
new file mode 100644
index 0000000..b84ae25
--- /dev/null
+++ b/cloud/bmaas/server/README.md
@@ -0,0 +1,27 @@
+BMaaS Server
+===
+
+Background
+---
+
+This server provides an interface to the BMaaS database/state over a gRPC API. Most components of the BMaaS system talk to the database directly whenever possible. Everything else communicates through this server.
+
+Currently this is:
+
+1. Agents running on machines. They should only be allowed to access/update information about the machine they're running on, and are generally considered untrusted since they run on machines available to end users.
+
+In the future this server will likely also take care of:
+
+1. A debug web API for developers/administrators to inspect database/BMDB state.
+2. Periodic batch jobs across the entire BMDB, like consistency checks.
+3. Exporting BMDB state into monitoring systems.
+4. Coordinating access to the BMDB systems if the current direct-access-to-database architecture stops scaling.
+
+Running
+---
+
+    bazel run //cloud/bmaas/server/cmd -- -srv_dev_certs -bmdb_eat_my_data
+
+Running the server on its own isn't very useful yet; most functionality is currently exercised through automated tests instead.
+
+TODO(q3k): document complete BMaaS dev deployment (multi-component, single BMDB).
\ No newline at end of file
diff --git a/cloud/bmaas/server/agent_callback_service.go b/cloud/bmaas/server/agent_callback_service.go
new file mode 100644
index 0000000..f058213
--- /dev/null
+++ b/cloud/bmaas/server/agent_callback_service.go
@@ -0,0 +1,119 @@
+package server
+
+import (
+	"context"
+	"crypto/ed25519"
+	"errors"
+	"fmt"
+	"time"
+
+	"github.com/google/uuid"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+	"google.golang.org/protobuf/proto"
+	"k8s.io/klog/v2"
+
+	"source.monogon.dev/cloud/bmaas/bmdb/model"
+	apb "source.monogon.dev/cloud/bmaas/server/api"
+	"source.monogon.dev/metropolis/node/core/rpc"
+)
+
+// agentCallbackService implements the AgentCallback gRPC service, which the
+// Server exposes to (untrusted) agents on its public listener.
+type agentCallbackService struct {
+	s *Server
+}
+
+var (
+	// errAgentUnauthenticated is returned when no machine matches the requested
+	// machine ID and connection public key. Its message ends up in the status
+	// sent to the client, so it deliberately doesn't say which one was wrong.
+	errAgentUnauthenticated = errors.New("machine id or public key unknown")
+)
+
+// Heartbeat authenticates the calling agent by its machine ID and the ED25519
+// public key of its TLS client certificate, then records the heartbeat time and
+// upserts the machine's hardware report if one was submitted.
+func (a *agentCallbackService) Heartbeat(ctx context.Context, req *apb.AgentHeartbeatRequest) (*apb.AgentHeartbeatResponse, error) {
+	// Extract ED25519 self-signed certificate from client connection.
+	cert, err := rpc.GetPeerCertificate(ctx)
+	if err != nil {
+		return nil, err
+	}
+	// Agents are untrusted: check the key type rather than asserting it, as a
+	// failed unchecked type assertion would panic inside the RPC handler.
+	pk, ok := cert.PublicKey.(ed25519.PublicKey)
+	if !ok {
+		return nil, status.Error(codes.Unauthenticated, "peer certificate must be for an ed25519 public key")
+	}
+	machineID, err := uuid.Parse(req.MachineId)
+	if err != nil {
+		return nil, status.Error(codes.InvalidArgument, "machine_id invalid")
+	}
+
+	// TODO(q3k): don't start a session for every RPC.
+	session, err := a.s.bmdb.StartSession(ctx)
+	if err != nil {
+		klog.Errorf("Could not start session: %v", err)
+		return nil, status.Error(codes.Unavailable, "could not start session")
+	}
+
+	// Verify that machine ID and connection public key match up to a machine in the
+	// BMDB. Unknown machines and wrong keys yield the same error, which prevents
+	// leaking information about a machine's existence to unauthorized agents.
+	err = session.Transact(ctx, func(q *model.Queries) error {
+		agents, err := q.AuthenticateAgentConnection(ctx, model.AuthenticateAgentConnectionParams{
+			MachineID:      machineID,
+			AgentPublicKey: pk,
+		})
+		if err != nil {
+			return fmt.Errorf("AuthenticateAgentConnection: %w", err)
+		}
+		if len(agents) < 1 {
+			return errAgentUnauthenticated
+		}
+		return nil
+	})
+	if err != nil {
+		if errors.Is(err, errAgentUnauthenticated) {
+			return nil, status.Error(codes.Unauthenticated, err.Error())
+		}
+		klog.Errorf("Could not authenticate agent: %v", err)
+		return nil, status.Error(codes.Unavailable, "could not authenticate agent")
+	}
+
+	// Request is now authenticated.
+
+	// Serialize hardware report if submitted alongside heartbeat.
+	var hwraw []byte
+	if req.HardwareReport != nil {
+		hwraw, err = proto.Marshal(req.HardwareReport)
+		if err != nil {
+			return nil, status.Errorf(codes.InvalidArgument, "could not serialize hardware report: %v", err)
+		}
+	}
+
+	// Upsert heartbeat time and hardware report.
+	err = session.Transact(ctx, func(q *model.Queries) error {
+		// Upsert hardware report if submitted.
+		if hwraw != nil {
+			err := q.MachineSetHardwareReport(ctx, model.MachineSetHardwareReportParams{
+				MachineID:         machineID,
+				HardwareReportRaw: hwraw,
+			})
+			if err != nil {
+				return fmt.Errorf("hardware report upsert: %w", err)
+			}
+		}
+		return q.MachineSetAgentHeartbeat(ctx, model.MachineSetAgentHeartbeatParams{
+			MachineID:        machineID,
+			AgentHeartbeatAt: time.Now(),
+		})
+	})
+	if err != nil {
+		klog.Errorf("Could not submit heartbeat: %v", err)
+		return nil, status.Error(codes.Unavailable, "could not submit heartbeat")
+	}
+
+	return &apb.AgentHeartbeatResponse{}, nil
+}
diff --git a/cloud/bmaas/server/agent_callback_service_test.go b/cloud/bmaas/server/agent_callback_service_test.go
new file mode 100644
index 0000000..bc3201a
--- /dev/null
+++ b/cloud/bmaas/server/agent_callback_service_test.go
@@ -0,0 +1,120 @@
+package server
+
+import (
+	"context"
+	"crypto/ed25519"
+	"crypto/rand"
+	"testing"
+	"time"
+
+	"github.com/google/uuid"
+	"google.golang.org/grpc"
+
+	"source.monogon.dev/cloud/bmaas/bmdb"
+	"source.monogon.dev/cloud/bmaas/bmdb/model"
+	apb "source.monogon.dev/cloud/bmaas/server/api"
+	"source.monogon.dev/cloud/lib/component"
+	"source.monogon.dev/metropolis/node/core/rpc"
+)
+
+// dut returns a Server (device under test) configured with dev certs, an
+// in-memory BMDB and listeners on arbitrary free ports (:0).
+func dut() *Server {
+	return &Server{
+		Config: Config{
+			Component: component.ComponentConfig{
+				GRPCListenAddress: ":0",
+				DevCerts:          true,
+				DevCertsPath:      "/tmp/foo",
+			},
+			BMDB: bmdb.BMDB{
+				Config: bmdb.Config{
+					Database: component.CockroachConfig{
+						InMemory: true,
+					},
+				},
+			},
+			PublicListenAddress: ":0",
+		},
+	}
+}
+
+// TestAgentCallbackService exercises the basic flow for submitting an agent
+// heartbeat and hardware report.
+func TestAgentCallbackService(t *testing.T) {
+	s := dut()
+	// Keep dev certs in a per-test directory rather than dut's fixed shared path.
+	s.Config.Component.DevCertsPath = t.TempDir()
+	ctx, ctxC := context.WithCancel(context.Background())
+	defer ctxC()
+	s.Start(ctx)
+
+	pub, priv, err := ed25519.GenerateKey(rand.Reader)
+	if err != nil {
+		t.Fatalf("could not generate keypair: %v", err)
+	}
+
+	sess, err := s.bmdb.StartSession(ctx)
+	if err != nil {
+		t.Fatalf("could not start session: %v", err)
+	}
+
+	heartbeat := func(mid uuid.UUID) error {
+		creds, err := rpc.NewEphemeralCredentials(priv, nil)
+		if err != nil {
+			t.Fatalf("could not generate ephemeral credentials: %v", err)
+		}
+		conn, err := grpc.Dial(s.ListenPublic, grpc.WithTransportCredentials(creds))
+		if err != nil {
+			t.Fatalf("Dial failed: %v", err)
+		}
+		defer conn.Close()
+
+		stub := apb.NewAgentCallbackClient(conn)
+		_, err = stub.Heartbeat(ctx, &apb.AgentHeartbeatRequest{
+			MachineId:      mid.String(),
+			HardwareReport: &apb.AgentHardwareReport{},
+		})
+		return err
+	}
+
+	// First, attempt to heartbeat for some totally made up machine ID. That should
+	// fail.
+	if err := heartbeat(uuid.New()); err == nil {
+		t.Errorf("heartbeat for made up UUID should've failed")
+	}
+
+	// Create an actual machine in the BMDB alongside the expected pubkey within an
+	// AgentStarted tag.
+	var machine model.Machine
+	err = sess.Transact(ctx, func(q *model.Queries) error {
+		machine, err = q.NewMachine(ctx)
+		if err != nil {
+			return err
+		}
+		err = q.MachineAddProvided(ctx, model.MachineAddProvidedParams{
+			MachineID:  machine.MachineID,
+			Provider:   model.ProviderEquinix,
+			ProviderID: "123",
+		})
+		if err != nil {
+			return err
+		}
+		return q.MachineSetAgentStarted(ctx, model.MachineSetAgentStartedParams{
+			MachineID:      machine.MachineID,
+			AgentStartedAt: time.Now(),
+			AgentPublicKey: pub,
+		})
+	})
+	if err != nil {
+		t.Fatalf("could not create machine: %v", err)
+	}
+
+	// Now heartbeat with correct machine ID and key. This should succeed.
+	if err := heartbeat(machine.MachineID); err != nil {
+		t.Errorf("heartbeat should've succeeded, got: %v", err)
+	}
+
+	// TODO(q3k): test hardware report being attached once we have some debug API
+	// for tags.
+}
diff --git a/cloud/bmaas/server/api/BUILD.bazel b/cloud/bmaas/server/api/BUILD.bazel
new file mode 100644
index 0000000..01f3e3a
--- /dev/null
+++ b/cloud/bmaas/server/api/BUILD.bazel
@@ -0,0 +1,24 @@
+load("@rules_proto//proto:defs.bzl", "proto_library")
+load("@io_bazel_rules_go//go:def.bzl", "go_library")
+load("@io_bazel_rules_go//proto:def.bzl", "go_proto_library")
+
+proto_library(
+    name = "api_proto",
+    srcs = ["agent.proto"],
+    visibility = ["//visibility:public"],
+)
+# Go message types and gRPC stubs (go_grpc compiler) generated from api_proto.
+go_proto_library(
+    name = "api_go_proto",
+    compilers = ["@io_bazel_rules_go//proto:go_grpc"],
+    importpath = "source.monogon.dev/cloud/bmaas/server/api",
+    proto = ":api_proto",
+    visibility = ["//visibility:public"],
+)
+# Go library embedding the generated code; depended on by //cloud/bmaas/server.
+go_library(
+    name = "api",
+    embed = [":api_go_proto"],
+    importpath = "source.monogon.dev/cloud/bmaas/server/api",
+    visibility = ["//visibility:public"],
+)
diff --git a/cloud/bmaas/server/api/agent.proto b/cloud/bmaas/server/api/agent.proto
new file mode 100644
index 0000000..c08c767
--- /dev/null
+++ b/cloud/bmaas/server/api/agent.proto
@@ -0,0 +1,36 @@
+syntax = "proto3";
+package cloud.bmaas.server.api;
+option go_package = "source.monogon.dev/cloud/bmaas/server/api";
+
+// AgentCallback runs on the BMaaS Server and exposes a gRPC interface to agents
+// running on machines. These APIs are served over TLS using component-style
+// server certificates, but clients are authenticated using ephemeral
+// certificates proving ownership of an agent keypair.
+service AgentCallback {
+  // Heartbeat is called by agents repeatedly to upload a hardware report, signal
+  // liveness and retrieve actions to be performed on a host.
+  //
+  // This isn't a streaming RPC as the current server implementation actually
+  // isn't reactive, so it would have to do its own inner polling to create
+  // a stream of updates. To keep things simple, we instead let the agent decide
+  // on the cadence of updates it wants to keep up with.
+  rpc Heartbeat(AgentHeartbeatRequest) returns (AgentHeartbeatResponse);
+}
+
+message AgentHardwareReport {
+  // TODO(lorenz): implement. Empty for now; the server stores it serialized.
+}
+
+message AgentHeartbeatRequest {
+  // MachineID that this agent represents. Technically not necessary since
+  // keypairs between agents should be unique, but this provides an extra layer
+  // of protection against programming bugs.
+  string machine_id = 1;
+  // Optional hardware report to be upserted for this machine. An agent should
+  // submit one at least once after it's started, as early as it can.
+  AgentHardwareReport hardware_report = 2;
+}
+
+message AgentHeartbeatResponse {
+  // Agent actions (like install, reboot, etc) go here.
+}
\ No newline at end of file
diff --git a/cloud/bmaas/server/cmd/BUILD.bazel b/cloud/bmaas/server/cmd/BUILD.bazel
new file mode 100644
index 0000000..dc43afc
--- /dev/null
+++ b/cloud/bmaas/server/cmd/BUILD.bazel
@@ -0,0 +1,15 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_binary", "go_library")
+
+go_library(
+    name = "cmd_lib",
+    srcs = ["main.go"],
+    importpath = "source.monogon.dev/cloud/bmaas/server/cmd",
+    visibility = ["//visibility:private"],
+    deps = ["//cloud/bmaas/server"],
+)
+# The BMaaS server binary; see the server README for an example invocation.
+go_binary(
+    name = "cmd",
+    embed = [":cmd_lib"],
+    visibility = ["//visibility:public"],
+)
diff --git a/cloud/bmaas/server/cmd/main.go b/cloud/bmaas/server/cmd/main.go
new file mode 100644
index 0000000..59f6ca6
--- /dev/null
+++ b/cloud/bmaas/server/cmd/main.go
@@ -0,0 +1,24 @@
+package main
+
+import (
+	"context"
+	"flag"
+	"os"
+	"os/signal"
+
+	"source.monogon.dev/cloud/bmaas/server"
+)
+
+// main runs the BMaaS server until the process is interrupted (SIGINT).
+func main() {
+	s := &server.Server{}
+	s.Config.RegisterFlags()
+	flag.Parse()
+
+	// Cancel ctx on interrupt, then return from main, which ends the process.
+	ctx, ctxC := signal.NotifyContext(context.Background(), os.Interrupt)
+	defer ctxC()
+
+	s.Start(ctx)
+	<-ctx.Done()
+}
diff --git a/cloud/bmaas/server/server.go b/cloud/bmaas/server/server.go
new file mode 100644
index 0000000..97fb393
--- /dev/null
+++ b/cloud/bmaas/server/server.go
@@ -0,0 +1,133 @@
+package server
+
+import (
+	"context"
+	"flag"
+	"fmt"
+	"net"
+	"os"
+
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/reflection"
+	"k8s.io/klog/v2"
+
+	"source.monogon.dev/cloud/bmaas/bmdb"
+	apb "source.monogon.dev/cloud/bmaas/server/api"
+	"source.monogon.dev/cloud/lib/component"
+)
+
+// Config is the configuration of a Server, usually populated from command line
+// flags registered by RegisterFlags.
+type Config struct {
+	Component component.ComponentConfig
+	BMDB      bmdb.BMDB
+
+	// PublicListenAddress is the address at which the 'public' (agent-facing) gRPC
+	// server listener will run.
+	PublicListenAddress string
+}
+
+// runtimeInfo describes where this process runs (currently just the hostname),
+// for use as the BMDB's RuntimeInfo.
+//
+// TODO(q3k): factor this out to BMDB library?
+func runtimeInfo() string {
+	// Lookup errors are ignored: an empty hostname is reported as UNKNOWN.
+	hostname, _ := os.Hostname()
+	if hostname == "" {
+		hostname = "UNKNOWN"
+	}
+	return fmt.Sprintf("host %s", hostname)
+}
+
+// RegisterFlags registers command line flags for the component, BMDB database
+// and public listener configuration, and sets the BMDB component/runtime info.
+func (c *Config) RegisterFlags() {
+	c.Component.RegisterFlags("srv")
+	c.BMDB.ComponentName = "srv"
+	c.BMDB.RuntimeInfo = runtimeInfo()
+	c.BMDB.Database.RegisterFlags("bmdb")
+
+	flag.StringVar(&c.PublicListenAddress, "srv_public_grpc_listen_address", ":8080", "Address to listen at for public/user gRPC connections for bmdbsrv")
+}
+
+// Server is the BMaaS Server. Fill in Config, then call Start.
+type Server struct {
+	Config Config
+
+	// ListenGRPC will contain the address at which the internal gRPC server is
+	// listening after .Start() has been called. This can differ from the configured
+	// value if the configuration requests any port (via :0).
+	ListenGRPC string
+	// ListenPublic will contain the address at which the 'public' (agent-facing)
+	// gRPC server is listening after .Start() has been called.
+	ListenPublic string
+
+	bmdb  *bmdb.Connection
+	acsvc *agentCallbackService
+}
+
+// serve runs g on lis in a background goroutine and stops g once ctx is
+// canceled. Any other serving failure terminates the process; name only
+// prefixes that failure's log message.
+func serve(ctx context.Context, g *grpc.Server, lis net.Listener, name string) {
+	go func() {
+		<-ctx.Done()
+		g.Stop()
+	}()
+	go func() {
+		// Serve also returns when g.Stop() is called, which is not a failure.
+		if err := g.Serve(lis); err != nil && ctx.Err() == nil {
+			klog.Exitf("%s gRPC serve failed: %v", name, err)
+		}
+	}()
+}
+
+// startPublic starts the 'public' (agent-facing) gRPC server serving the
+// AgentCallback service, and records its address in s.ListenPublic.
+func (s *Server) startPublic(ctx context.Context) {
+	g := grpc.NewServer(s.Config.Component.GRPCServerOptionsPublic()...)
+	lis, err := net.Listen("tcp", s.Config.PublicListenAddress)
+	if err != nil {
+		klog.Exitf("Could not listen: %v", err)
+	}
+	s.ListenPublic = lis.Addr().String()
+	apb.RegisterAgentCallbackServer(g, s.acsvc)
+	// NOTE(review): reflection exposes the service schema to untrusted
+	// clients on this listener; confirm that is intended.
+	reflection.Register(g)
+
+	klog.Infof("Public API listening on %s", s.ListenPublic)
+	serve(ctx, g, lis, "Public")
+}
+
+// startInternalGRPC starts the internal gRPC server (currently serving only
+// reflection) and records its address in s.ListenGRPC.
+func (s *Server) startInternalGRPC(ctx context.Context) {
+	g := grpc.NewServer(s.Config.Component.GRPCServerOptions()...)
+	lis, err := net.Listen("tcp", s.Config.Component.GRPCListenAddress)
+	if err != nil {
+		klog.Exitf("Could not listen: %v", err)
+	}
+	s.ListenGRPC = lis.Addr().String()
+
+	reflection.Register(g)
+	klog.Infof("Internal gRPC listening on %s", s.ListenGRPC)
+	serve(ctx, g, lis, "Internal")
+}
+
+// Start the BMaaS Server in background goroutines. This should only be called
+// once. The process exits (via klog.Exitf) if starting the server fails. Both
+// gRPC servers are stopped once ctx is canceled.
+func (s *Server) Start(ctx context.Context) {
+	conn, err := s.Config.BMDB.Open(true)
+	if err != nil {
+		klog.Exitf("Failed to connect to BMDB: %v", err)
+	}
+	s.acsvc = &agentCallbackService{
+		s: s,
+	}
+	s.bmdb = conn
+	s.startInternalGRPC(ctx)
+	s.startPublic(ctx)
+}
diff --git a/cloud/lib/component/component.go b/cloud/lib/component/component.go
index 831d099..dee4703 100644
--- a/cloud/lib/component/component.go
+++ b/cloud/lib/component/component.go
@@ -106,3 +106,37 @@
 		grpc.Creds(credentials.NewTLS(tlsConf)),
 	}
 }
+
+// GRPCServerOptionsPublic returns pre-built grpc.ServerOptions that this
+// component should use to serve public gRPC.
+//
+// TLS is served with this component's own certificate: the development
+// certificate if DevCerts is set, otherwise the key/certificate paths given
+// via flags (the process exits if either is missing or cannot be loaded).
+// Client certificates are requested but neither required nor verified, so any
+// client can connect and it's up to the server implementation to authenticate
+// incoming requests.
+func (c *ComponentConfig) GRPCServerOptionsPublic() []grpc.ServerOption {
+	certPath, keyPath := c.GRPCCertificatePath, c.GRPCKeyPath
+	switch {
+	case c.DevCerts:
+		// Development certificates take precedence over flag-provided paths.
+		certPath, keyPath, _ = c.GetDevCerts()
+	case keyPath == "":
+		klog.Exitf("-grpc_key_path must be set")
+	case certPath == "":
+		klog.Exitf("-grpc_certificate_path must be set")
+	}
+
+	pair, err := tls.LoadX509KeyPair(certPath, keyPath)
+	if err != nil {
+		klog.Exitf("Could not load GRPC TLS keypair: %v", err)
+	}
+	// Request (but don't verify) a client certificate, so that handlers can
+	// inspect whatever certificate the peer presented.
+	creds := credentials.NewTLS(&tls.Config{
+		Certificates: []tls.Certificate{pair},
+		ClientAuth:   tls.RequestClientCert,
+	})
+	return []grpc.ServerOption{grpc.Creds(creds)}
+}