cloud/bmaas/server: init
This adds the BMaaS server alongside its first functionality: serving an
Agent heartbeat API.
This allows (untrusted) Agents to communicate with the rest of the
system by submitting heartbeats which may include a hardware report.
The BMaaS server will likely grow to implement further functionality as
described in its README.
Change-Id: I1ede02121b3700079cbb11295525f4c167ee1e7d
Reviewed-on: https://review.monogon.dev/c/monogon/+/988
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
Tested-by: Jenkins CI
diff --git a/cloud/bmaas/server/BUILD.bazel b/cloud/bmaas/server/BUILD.bazel
new file mode 100644
index 0000000..ee491d1
--- /dev/null
+++ b/cloud/bmaas/server/BUILD.bazel
@@ -0,0 +1,44 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")
+
+go_library(
+ name = "server",
+ srcs = [
+ "agent_callback_service.go",
+ "server.go",
+ ],
+ importpath = "source.monogon.dev/cloud/bmaas/server",
+ visibility = ["//visibility:public"],
+ deps = [
+ "//cloud/bmaas/bmdb",
+ "//cloud/bmaas/bmdb/model",
+ "//cloud/bmaas/server/api",
+ "//cloud/lib/component",
+ "//metropolis/node/core/rpc",
+ "@com_github_google_uuid//:uuid",
+ "@io_k8s_klog//:klog",
+ "@io_k8s_klog_v2//:klog",
+ "@org_golang_google_grpc//:go_default_library",
+ "@org_golang_google_grpc//codes",
+ "@org_golang_google_grpc//reflection",
+ "@org_golang_google_grpc//status",
+ "@org_golang_google_protobuf//proto",
+ ],
+)
+
+go_test(
+ name = "server_test",
+ srcs = ["agent_callback_service_test.go"],
+ data = [
+ "@cockroach",
+ ],
+ embed = [":server"],
+ deps = [
+ "//cloud/bmaas/bmdb",
+ "//cloud/bmaas/bmdb/model",
+ "//cloud/bmaas/server/api",
+ "//cloud/lib/component",
+ "//metropolis/node/core/rpc",
+ "@com_github_google_uuid//:uuid",
+ "@org_golang_google_grpc//:go_default_library",
+ ],
+)
diff --git a/cloud/bmaas/server/README.md b/cloud/bmaas/server/README.md
new file mode 100644
index 0000000..b84ae25
--- /dev/null
+++ b/cloud/bmaas/server/README.md
@@ -0,0 +1,27 @@
+BMaaS Server
+===
+
+Background
+---
+
+This server provides an interface to the BMaaS database/state over a gRPC API. Most components of the BMaaS system talk to the database directly whenever possible. Everything else communicates through this server.
+
+Currently this is:
+
+1. Agents running on machines, as they should only be allowed to access/update information about the machine they're running on, and they're generally considered untrusted since they run on end-user available machines.
+
+In the future this server will likely also take care of:
+
+1. A debug web API for developers/administrators to inspect database/BMDB state.
+2. Periodic batch jobs across the entire BMDB, like consistency checks.
+3. Exporting BMDB state into monitoring systems.
+4. Coordinating access to the BMDB systems if the current direct-access-to-database architecture stops scaling.
+
+Running
+---
+
+ bazel run //cloud/bmaas/server/cmd -- -srv_dev_certs -bmdb_eat_my_data
+
+That is not very useful on its own yet, though: most functionality is currently exercised through automated tests instead.
+
+TODO(q3k): document complete BMaaS dev deployment (multi-component, single BMDB).
\ No newline at end of file
diff --git a/cloud/bmaas/server/agent_callback_service.go b/cloud/bmaas/server/agent_callback_service.go
new file mode 100644
index 0000000..f058213
--- /dev/null
+++ b/cloud/bmaas/server/agent_callback_service.go
@@ -0,0 +1,106 @@
+package server
+
+import (
+ "context"
+ "crypto/ed25519"
+ "errors"
+ "fmt"
+ "time"
+
+ "github.com/google/uuid"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+ "google.golang.org/protobuf/proto"
+ "k8s.io/klog"
+
+ "source.monogon.dev/cloud/bmaas/bmdb/model"
+ apb "source.monogon.dev/cloud/bmaas/server/api"
+ "source.monogon.dev/metropolis/node/core/rpc"
+)
+
+// agentCallbackService implements the AgentCallback gRPC service (see the
+// Heartbeat method) on behalf of a Server.
+type agentCallbackService struct {
+ // s is the Server whose BMDB connection is used to serve requests.
+ s *Server
+}
+
+var (
+ // errAgentUnauthenticated is returned from the authentication transaction in
+ // Heartbeat when no agent matches the given machine ID and public key. It is
+ // translated there into a codes.Unauthenticated gRPC status.
+ errAgentUnauthenticated = errors.New("machine id or public key unknown")
+)
+
+// Heartbeat implements the AgentCallback.Heartbeat RPC.
+//
+// The caller is authenticated by checking that the ED25519 public key of its
+// client certificate and the machine ID from the request match an agent known
+// to the BMDB. For authenticated callers, the hardware report (if submitted)
+// and the heartbeat timestamp are then recorded in the BMDB.
+//
+// Returned errors are gRPC statuses: InvalidArgument for a malformed machine
+// ID or an unserializable hardware report, Unauthenticated for unknown agents
+// or non-ED25519 client keys, and Unavailable for BMDB failures. Errors from
+// rpc.GetPeerCertificate are returned unmodified.
+func (a *agentCallbackService) Heartbeat(ctx context.Context, req *apb.AgentHeartbeatRequest) (*apb.AgentHeartbeatResponse, error) {
+	// Extract ED25519 self-signed certificate from client connection.
+	cert, err := rpc.GetPeerCertificate(ctx)
+	if err != nil {
+		return nil, err
+	}
+	// Agents are untrusted, so never assume the type of the presented key: an
+	// unchecked type assertion would panic on a non-ED25519 certificate.
+	pk, ok := cert.PublicKey.(ed25519.PublicKey)
+	if !ok {
+		return nil, status.Error(codes.Unauthenticated, "client certificate must use an ED25519 key")
+	}
+	machineID, err := uuid.Parse(req.MachineId)
+	if err != nil {
+		return nil, status.Error(codes.InvalidArgument, "machine_id invalid")
+	}
+
+	// TODO(q3k): don't start a session for every RPC.
+	session, err := a.s.bmdb.StartSession(ctx)
+	if err != nil {
+		klog.Errorf("Could not start session: %v", err)
+		return nil, status.Error(codes.Unavailable, "could not start session")
+	}
+
+	// Verify that machine ID and connection public key match up to a machine in the
+	// BMDB. Prevent leaking information about a machine's existence to unauthorized
+	// agents.
+	err = session.Transact(ctx, func(q *model.Queries) error {
+		agents, err := q.AuthenticateAgentConnection(ctx, model.AuthenticateAgentConnectionParams{
+			MachineID:      machineID,
+			AgentPublicKey: pk,
+		})
+		if err != nil {
+			return fmt.Errorf("AuthenticateAgentConnection: %w", err)
+		}
+		if len(agents) < 1 {
+			return errAgentUnauthenticated
+		}
+		return nil
+	})
+	if err != nil {
+		if errors.Is(err, errAgentUnauthenticated) {
+			return nil, status.Error(codes.Unauthenticated, err.Error())
+		}
+		// Log the details server-side only; the agent gets a generic status.
+		klog.Errorf("Could not authenticate agent: %v", err)
+		return nil, status.Error(codes.Unavailable, "could not authenticate agent")
+	}
+
+	// Request is now authenticated.
+
+	// Serialize hardware report if submitted alongside heartbeat.
+	var hwraw []byte
+	if req.HardwareReport != nil {
+		hwraw, err = proto.Marshal(req.HardwareReport)
+		if err != nil {
+			return nil, status.Errorf(codes.InvalidArgument, "could not serialize hardware report: %v", err)
+		}
+	}
+
+	// Upsert heartbeat time and hardware report.
+	err = session.Transact(ctx, func(q *model.Queries) error {
+		// Upsert hardware report if submitted.
+		if hwraw != nil {
+			// Use a closure-local error instead of writing to the captured outer
+			// err.
+			if err := q.MachineSetHardwareReport(ctx, model.MachineSetHardwareReportParams{
+				MachineID:         machineID,
+				HardwareReportRaw: hwraw,
+			}); err != nil {
+				return fmt.Errorf("hardware report upsert: %w", err)
+			}
+		}
+		return q.MachineSetAgentHeartbeat(ctx, model.MachineSetAgentHeartbeatParams{
+			MachineID:        machineID,
+			AgentHeartbeatAt: time.Now(),
+		})
+	})
+	if err != nil {
+		klog.Errorf("Could not submit heartbeat: %v", err)
+		return nil, status.Error(codes.Unavailable, "could not submit heartbeat")
+	}
+
+	return &apb.AgentHeartbeatResponse{}, nil
+}
diff --git a/cloud/bmaas/server/agent_callback_service_test.go b/cloud/bmaas/server/agent_callback_service_test.go
new file mode 100644
index 0000000..bc3201a
--- /dev/null
+++ b/cloud/bmaas/server/agent_callback_service_test.go
@@ -0,0 +1,116 @@
+package server
+
+import (
+ "context"
+ "crypto/ed25519"
+ "crypto/rand"
+ "testing"
+ "time"
+
+ "github.com/google/uuid"
+ "google.golang.org/grpc"
+
+ "source.monogon.dev/cloud/bmaas/bmdb"
+ "source.monogon.dev/cloud/bmaas/bmdb/model"
+ apb "source.monogon.dev/cloud/bmaas/server/api"
+ "source.monogon.dev/cloud/lib/component"
+ "source.monogon.dev/metropolis/node/core/rpc"
+)
+
+// dut returns an unstarted Server (the device under test) configured for
+// tests: both gRPC listeners bind to any free port (":0"), development
+// certificates are enabled and the BMDB is backed by an in-memory database.
+func dut() *Server {
+	srv := &Server{}
+
+	// Component-level settings: internal gRPC listener and dev certificates.
+	srv.Config.Component.GRPCListenAddress = ":0"
+	srv.Config.Component.DevCerts = true
+	srv.Config.Component.DevCertsPath = "/tmp/foo"
+
+	// In-memory database, so the test needs no external BMDB.
+	srv.Config.BMDB.Config.Database.InMemory = true
+
+	// Agent-facing listener.
+	srv.Config.PublicListenAddress = ":0"
+
+	return srv
+}
+
+// TestAgentCallbackService exercises the basic flow for submitting an agent
+// heartbeat and hardware report: a heartbeat for an unknown machine ID must be
+// rejected, while a heartbeat for a machine whose AgentStarted tag carries the
+// client's public key must succeed.
+func TestAgentCallbackService(t *testing.T) {
+	s := dut()
+	ctx, ctxC := context.WithCancel(context.Background())
+	defer ctxC()
+	s.Start(ctx)
+
+	pub, priv, err := ed25519.GenerateKey(rand.Reader)
+	if err != nil {
+		t.Fatalf("could not generate keypair: %v", err)
+	}
+
+	sess, err := s.bmdb.StartSession(ctx)
+	if err != nil {
+		// Include the underlying error, otherwise failures here are undebuggable.
+		t.Fatalf("could not start session: %v", err)
+	}
+
+	// heartbeat dials the public listener with ephemeral credentials derived from
+	// priv and submits a single heartbeat for the given machine ID, returning the
+	// RPC error (if any).
+	heartbeat := func(mid uuid.UUID) error {
+		creds, err := rpc.NewEphemeralCredentials(priv, nil)
+		if err != nil {
+			t.Fatalf("could not generate ephemeral credentials: %v", err)
+		}
+		conn, err := grpc.Dial(s.ListenPublic, grpc.WithTransportCredentials(creds))
+		if err != nil {
+			t.Fatalf("Dial failed: %v", err)
+		}
+		defer conn.Close()
+
+		stub := apb.NewAgentCallbackClient(conn)
+		_, err = stub.Heartbeat(ctx, &apb.AgentHeartbeatRequest{
+			MachineId:      mid.String(),
+			HardwareReport: &apb.AgentHardwareReport{},
+		})
+		return err
+	}
+
+	// First, attempt to heartbeat for some totally made up machine ID. That should
+	// fail.
+	if err := heartbeat(uuid.New()); err == nil {
+		t.Errorf("heartbeat for made up UUID should've failed")
+	}
+
+	// Create an actual machine in the BMDB alongside the expected pubkey within an
+	// AgentStarted tag.
+	var machine model.Machine
+	err = sess.Transact(ctx, func(q *model.Queries) error {
+		// Closure-local error, so that the outer err is only ever set by
+		// Transact itself.
+		var err error
+		machine, err = q.NewMachine(ctx)
+		if err != nil {
+			return err
+		}
+		err = q.MachineAddProvided(ctx, model.MachineAddProvidedParams{
+			MachineID:  machine.MachineID,
+			Provider:   model.ProviderEquinix,
+			ProviderID: "123",
+		})
+		if err != nil {
+			return err
+		}
+		return q.MachineSetAgentStarted(ctx, model.MachineSetAgentStartedParams{
+			MachineID:      machine.MachineID,
+			AgentStartedAt: time.Now(),
+			AgentPublicKey: pub,
+		})
+	})
+	if err != nil {
+		t.Fatalf("could not create machine: %v", err)
+	}
+
+	// Now heartbeat with correct machine ID and key. This should succeed.
+	if err := heartbeat(machine.MachineID); err != nil {
+		t.Errorf("heartbeat should've succeeded, got: %v", err)
+	}
+
+	// TODO(q3k): test hardware report being attached once we have some debug API
+	// for tags.
+}
diff --git a/cloud/bmaas/server/api/BUILD.bazel b/cloud/bmaas/server/api/BUILD.bazel
new file mode 100644
index 0000000..01f3e3a
--- /dev/null
+++ b/cloud/bmaas/server/api/BUILD.bazel
@@ -0,0 +1,24 @@
+load("@rules_proto//proto:defs.bzl", "proto_library")
+load("@io_bazel_rules_go//go:def.bzl", "go_library")
+load("@io_bazel_rules_go//proto:def.bzl", "go_proto_library")
+
+proto_library(
+ name = "api_proto",
+ srcs = ["agent.proto"],
+ visibility = ["//visibility:public"],
+)
+
+go_proto_library(
+ name = "api_go_proto",
+ compilers = ["@io_bazel_rules_go//proto:go_grpc"],
+ importpath = "source.monogon.dev/cloud/bmaas/server/api",
+ proto = ":api_proto",
+ visibility = ["//visibility:public"],
+)
+
+go_library(
+ name = "api",
+ embed = [":api_go_proto"],
+ importpath = "source.monogon.dev/cloud/bmaas/server/api",
+ visibility = ["//visibility:public"],
+)
diff --git a/cloud/bmaas/server/api/agent.proto b/cloud/bmaas/server/api/agent.proto
new file mode 100644
index 0000000..c08c767
--- /dev/null
+++ b/cloud/bmaas/server/api/agent.proto
@@ -0,0 +1,36 @@
+syntax = "proto3";
+package cloud.bmaas.server.api;
+option go_package = "source.monogon.dev/cloud/bmaas/server/api";
+
+// AgentCallback runs on the BMDB Server and exposes a gRPC interface to agents
+// running on machines. These APIs are served over TLS using component-style
+// server certificates, but clients are authenticated using ephemeral
+// certificates proving ownership of an agent keypair.
+service AgentCallback {
+ // Heartbeat is called by agents repeatedly to upload a hardware report, signal
+ // liveness and retrieve actions to be performed on a host.
+ //
+ // This isn't a streaming RPC as the current server implementation actually
+ // isn't reactive, so it would have to do its own inner polling to create
+ // a stream of updates. To keep things simple, we instead let the agent decide
+ // on the cadence of updates it wants to keep up with.
+ rpc Heartbeat(AgentHeartbeatRequest) returns (AgentHeartbeatResponse);
+}
+
+// AgentHardwareReport is the hardware report that an agent can attach to a
+// heartbeat request. It currently has no fields.
+message AgentHardwareReport {
+ // TODO(lorenz): implement
+}
+
+// AgentHeartbeatRequest is the request message of AgentCallback.Heartbeat.
+message AgentHeartbeatRequest {
+ // MachineID that this agent represents. Technically not necessary since
+ // keypairs between agents should be unique, but this provides an extra layer
+ // of protection against programming bugs.
+ string machine_id = 1;
+ // Optional hardware report to be upserted for this machine. An agent should
+ // submit one at least once after it's started, as early as it can.
+ AgentHardwareReport hardware_report = 2;
+}
+
+// AgentHeartbeatResponse is the response message of AgentCallback.Heartbeat. It
+// currently has no fields.
+message AgentHeartbeatResponse {
+ // Agent actions (like install, reboot, etc) go here.
+}
\ No newline at end of file
diff --git a/cloud/bmaas/server/cmd/BUILD.bazel b/cloud/bmaas/server/cmd/BUILD.bazel
new file mode 100644
index 0000000..dc43afc
--- /dev/null
+++ b/cloud/bmaas/server/cmd/BUILD.bazel
@@ -0,0 +1,15 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_binary", "go_library")
+
+go_library(
+ name = "cmd_lib",
+ srcs = ["main.go"],
+ importpath = "source.monogon.dev/cloud/bmaas/server/cmd",
+ visibility = ["//visibility:private"],
+ deps = ["//cloud/bmaas/server"],
+)
+
+go_binary(
+ name = "cmd",
+ embed = [":cmd_lib"],
+ visibility = ["//visibility:public"],
+)
diff --git a/cloud/bmaas/server/cmd/main.go b/cloud/bmaas/server/cmd/main.go
new file mode 100644
index 0000000..59f6ca6
--- /dev/null
+++ b/cloud/bmaas/server/cmd/main.go
@@ -0,0 +1,21 @@
+package main
+
+import (
+ "context"
+ "flag"
+
+ "source.monogon.dev/cloud/bmaas/server"
+)
+
+// main registers and parses the server's command line flags, starts the BMaaS
+// server in the background and then blocks forever.
+func main() {
+	srv := new(server.Server)
+	srv.Config.RegisterFlags()
+	flag.Parse()
+
+	// TODO: context cancel on interrupt.
+	ctx, cancel := context.WithCancel(context.Background())
+	_ = cancel
+
+	srv.Start(ctx)
+
+	// Start only spawns background goroutines, so park the main goroutine to keep
+	// the process alive.
+	select {}
+}
diff --git a/cloud/bmaas/server/server.go b/cloud/bmaas/server/server.go
new file mode 100644
index 0000000..97fb393
--- /dev/null
+++ b/cloud/bmaas/server/server.go
@@ -0,0 +1,111 @@
+package server
+
+import (
+ "context"
+ "flag"
+ "fmt"
+ "net"
+ "os"
+
+ "google.golang.org/grpc"
+ "google.golang.org/grpc/reflection"
+ "k8s.io/klog/v2"
+
+ "source.monogon.dev/cloud/bmaas/bmdb"
+ apb "source.monogon.dev/cloud/bmaas/server/api"
+ "source.monogon.dev/cloud/lib/component"
+)
+
+// Config is the configuration of the BMaaS Server. RegisterFlags registers the
+// command line flags that populate it.
+type Config struct {
+ // Component is the component-level configuration. It provides the internal
+ // gRPC listen address and the gRPC server options for both listeners.
+ Component component.ComponentConfig
+ // BMDB is the BMDB configuration from which the server's database connection
+ // is opened on Start.
+ BMDB bmdb.BMDB
+
+ // PublicListenAddress is the address at which the 'public' (agent-facing) gRPC
+ // server listener will run.
+ PublicListenAddress string
+}
+
+// runtimeInfo returns a short description of where this process runs, in the
+// form "host <hostname>". If the hostname cannot be determined, "UNKNOWN" is
+// used in its place.
+//
+// TODO(q3k): factor this out to BMDB library?
+func runtimeInfo() string {
+	host := "UNKNOWN"
+	// A lookup error is deliberately ignored: it is treated the same as an empty
+	// hostname and the placeholder above is kept.
+	if name, _ := os.Hostname(); name != "" {
+		host = name
+	}
+	return fmt.Sprintf("host %s", host)
+}
+
+// RegisterFlags registers the command line flags of the server ("srv_"
+// prefixed component flags, "bmdb_" prefixed database flags and the public
+// listen address) and fills in the BMDB component name and runtime info. It
+// must be called before flag.Parse.
+func (c *Config) RegisterFlags() {
+	// Static identification of this component towards the BMDB.
+	c.BMDB.RuntimeInfo = runtimeInfo()
+	c.BMDB.ComponentName = "srv"
+
+	// Flag-backed configuration.
+	c.Component.RegisterFlags("srv")
+	c.BMDB.Database.RegisterFlags("bmdb")
+	flag.StringVar(
+		&c.PublicListenAddress,
+		"srv_public_grpc_listen_address",
+		":8080",
+		"Address to listen at for public/user gRPC connections for bmdbsrv",
+	)
+}
+
+// Server is the BMaaS Server. Populate Config (eg. via Config.RegisterFlags and
+// flag parsing) and then call Start to run it.
+type Server struct {
+ // Config is the server configuration, read when Start is called.
+ Config Config
+
+ // ListenGRPC will contain the address at which the internal gRPC server is
+ // listening after .Start() has been called. This can differ from the configured
+ // value if the configuration requests any port (via :0).
+ ListenGRPC string
+ // ListenPublic will contain the address at which the 'public' (agent-facing)
+ // gRPC server is listening after .Start() has been called.
+ ListenPublic string
+
+ // bmdb is the BMDB connection opened by Start.
+ bmdb *bmdb.Connection
+ // acsvc is the AgentCallback service implementation registered on the public
+ // gRPC server.
+ acsvc *agentCallbackService
+}
+
+// startPublic starts the 'public' (agent-facing) gRPC server in a background
+// goroutine. It listens on Config.PublicListenAddress, records the resulting
+// address in ListenPublic and serves the AgentCallback service plus gRPC
+// reflection.
+//
+// The server is stopped when ctx is canceled. The process exits if the
+// listener cannot be created or if serving fails for any reason other than
+// that shutdown.
+func (s *Server) startPublic(ctx context.Context) {
+	g := grpc.NewServer(s.Config.Component.GRPCServerOptionsPublic()...)
+	lis, err := net.Listen("tcp", s.Config.PublicListenAddress)
+	if err != nil {
+		klog.Exitf("Could not listen: %v", err)
+	}
+	s.ListenPublic = lis.Addr().String()
+	apb.RegisterAgentCallbackServer(g, s.acsvc)
+	reflection.Register(g)
+
+	klog.Infof("Public API listening on %s", s.ListenPublic)
+
+	// Tie the server's lifetime to ctx. Without this nothing ever stops the
+	// server, so it (and its listener) would outlive a canceled context.
+	go func() {
+		<-ctx.Done()
+		g.Stop()
+	}()
+	go func() {
+		err := g.Serve(lis)
+		if ctx.Err() != nil {
+			// Shutdown was requested through ctx; whatever Serve returned is a
+			// consequence of Stop and not a failure.
+			return
+		}
+		if err != nil {
+			klog.Exitf("Public gRPC serve failed: %v", err)
+		}
+	}()
+}
+
+// startInternalGRPC starts the internal gRPC server in a background goroutine.
+// It listens on Config.Component.GRPCListenAddress, records the resulting
+// address in ListenGRPC and currently only serves gRPC reflection.
+//
+// The server is stopped when ctx is canceled. The process exits if the
+// listener cannot be created or if serving fails for any reason other than
+// that shutdown.
+func (s *Server) startInternalGRPC(ctx context.Context) {
+	g := grpc.NewServer(s.Config.Component.GRPCServerOptions()...)
+	lis, err := net.Listen("tcp", s.Config.Component.GRPCListenAddress)
+	if err != nil {
+		klog.Exitf("Could not listen: %v", err)
+	}
+	s.ListenGRPC = lis.Addr().String()
+
+	reflection.Register(g)
+	klog.Infof("Internal gRPC listening on %s", s.ListenGRPC)
+
+	// Tie the server's lifetime to ctx. Without this nothing ever stops the
+	// server, so it (and its listener) would outlive a canceled context.
+	go func() {
+		<-ctx.Done()
+		g.Stop()
+	}()
+	go func() {
+		err := g.Serve(lis)
+		if ctx.Err() != nil {
+			// Shutdown was requested through ctx; whatever Serve returned is a
+			// consequence of Stop and not a failure.
+			return
+		}
+		if err != nil {
+			klog.Exitf("Internal gRPC serve failed: %v", err)
+		}
+	}()
+}
+
+// Start the BMaaS Server in background goroutines. This should only be called
+// once. The process will exit with debug logs if starting the server failed.
+func (s *Server) Start(ctx context.Context) {
+ conn, err := s.Config.BMDB.Open(true)
+ if err != nil {
+ klog.Exitf("Failed to connect to BMDB: %v", err)
+ }
+ s.acsvc = &agentCallbackService{
+ s: s,
+ }
+ s.bmdb = conn
+ s.startInternalGRPC(ctx)
+ s.startPublic(ctx)
+}