c/bmaas/bmdb: add Agent{Started,Heartbeat} tags and queries
This should be the required tags and queries for the first interactions
with the Shepherd subsystem.
Change-Id: I8c663803cfd936b11c59bce7db5abc94b99dd1db
Reviewed-on: https://review.monogon.dev/c/monogon/+/962
Tested-by: Jenkins CI
Reviewed-by: Mateusz Zalega <mateusz@monogon.tech>
diff --git a/cloud/bmaas/bmdb/BUILD.bazel b/cloud/bmaas/bmdb/BUILD.bazel
index 957a884..882d73c 100644
--- a/cloud/bmaas/bmdb/BUILD.bazel
+++ b/cloud/bmaas/bmdb/BUILD.bazel
@@ -20,7 +20,10 @@
go_test(
name = "bmdb_test",
- srcs = ["sessions_test.go"],
+ srcs = [
+ "queries_test.go",
+ "sessions_test.go",
+ ],
data = [
"@cockroach",
],
diff --git a/cloud/bmaas/bmdb/model/migrations/1662136250_initial.down.sql b/cloud/bmaas/bmdb/model/migrations/1662136250_initial.down.sql
index 6a85991..5f336d1 100644
--- a/cloud/bmaas/bmdb/model/migrations/1662136250_initial.down.sql
+++ b/cloud/bmaas/bmdb/model/migrations/1662136250_initial.down.sql
@@ -1,6 +1,3 @@
-DROP TABLE machine_agent_report;
-DROP TABLE machine_agent_installed;
-DROP TABLE machine_provided;
DROP TABLE work;
DROP TABLE sessions;
DROP TABLE machines;
diff --git a/cloud/bmaas/bmdb/model/migrations/1662136250_initial.up.sql b/cloud/bmaas/bmdb/model/migrations/1662136250_initial.up.sql
index b31324f..7b5b812 100644
--- a/cloud/bmaas/bmdb/model/migrations/1662136250_initial.up.sql
+++ b/cloud/bmaas/bmdb/model/migrations/1662136250_initial.up.sql
@@ -49,25 +49,4 @@
process process NOT NULL,
UNIQUE (machine_id, process),
CONSTRAINT "primary" PRIMARY KEY (machine_id, session_id, process)
-);
-
--- The following three tables are for illustrative purposes only.
-
-CREATE TABLE machine_provided (
- machine_id UUID NOT NULL REFERENCES machines(machine_id) ON DELETE CASCADE,
- provider STRING NOT NULL,
- provider_id STRING NOT NULL,
- CONSTRAINT "primary" PRIMARY KEY (machine_id)
-);
-
-CREATE TABLE machine_agent_installed (
- machine_id UUID NOT NULL REFERENCES machines(machine_id) ON DELETE CASCADE,
- CONSTRAINT "primary" PRIMARY KEY (machine_id)
-);
-
-CREATE TABLE machine_agent_report (
- machine_id UUID NOT NULL REFERENCES machines(machine_id) ON DELETE CASCADE,
- shape_cpu_count INT NOT NULL,
- shape_memory_megabytes INT NOT NULL,
- CONSTRAINT "primary" PRIMARY KEY (machine_id)
-);
+);
\ No newline at end of file
diff --git a/cloud/bmaas/bmdb/model/migrations/1667232160_agent_tags.down.sql b/cloud/bmaas/bmdb/model/migrations/1667232160_agent_tags.down.sql
new file mode 100644
index 0000000..90bb586
--- /dev/null
+++ b/cloud/bmaas/bmdb/model/migrations/1667232160_agent_tags.down.sql
@@ -0,0 +1,9 @@
+DROP TABLE machine_provided;
+DROP TABLE machine_agent_started;
+DROP TABLE machine_agent_heartbeat;
+-- The up migration creates the provider type with a plain CREATE TYPE, so it
+-- must be dropped here (after machine_provided, which uses it) for the up
+-- migration to be re-appliable.
+DROP TYPE provider;
+-- The 'ShepherdInstall' process value is intentionally left in place: the up
+-- migration adds it with IF NOT EXISTS, so re-applying is safe.
diff --git a/cloud/bmaas/bmdb/model/migrations/1667232160_agent_tags.up.sql b/cloud/bmaas/bmdb/model/migrations/1667232160_agent_tags.up.sql
new file mode 100644
index 0000000..14701a6
--- /dev/null
+++ b/cloud/bmaas/bmdb/model/migrations/1667232160_agent_tags.up.sql
@@ -0,0 +1,46 @@
+CREATE TYPE provider AS ENUM (
+ 'Equinix'
+ -- More providers will follow in subsequent migrations.
+);
+
+-- tag MachineProvided {
+-- Provider Provider
+-- ProviderID String
+-- }
+-- Represents the fact that a machine is backed by a machine from a given
+-- provider identified there with a given provider id.
+CREATE TABLE machine_provided (
+ machine_id UUID NOT NULL REFERENCES machines(machine_id) ON DELETE RESTRICT,
+ provider provider NOT NULL,
+ provider_id STRING(128) NOT NULL,
+ CONSTRAINT "primary" PRIMARY KEY (machine_id),
+ UNIQUE (provider, provider_id)
+);
+
+-- tag AgentStarted {
+-- StartedAt time.Time
+-- PublicKey []byte
+-- }
+-- Represents the fact that a machine has had the Agent started on it at some
+-- given time, and that the agent returned a given public key which it will use
+-- to authenticate itself to the bmdb API server.
+CREATE TABLE machine_agent_started (
+ machine_id UUID NOT NULL REFERENCES machines(machine_id) ON DELETE RESTRICT,
+ agent_started_at TIMESTAMPTZ NOT NULL,
+ agent_public_key BYTES NOT NULL,
+ CONSTRAINT "primary" PRIMARY KEY(machine_id)
+);
+
+-- tag AgentHeartbeat {
+-- At time.Time
+-- }
+-- Represents a successful heartbeat sent by the Agent running on a machine at
+-- some given time.
+CREATE TABLE machine_agent_heartbeat (
+ machine_id UUID NOT NULL REFERENCES machines(machine_id) ON DELETE RESTRICT,
+ agent_heartbeat_at TIMESTAMPTZ NOT NULL,
+ CONSTRAINT "primary" PRIMARY KEY(machine_id)
+);
+
+-- Used by the Shepherd when performing direct actions against a machine.
+ALTER TYPE process ADD VALUE IF NOT EXISTS 'ShepherdInstall';
\ No newline at end of file
diff --git a/cloud/bmaas/bmdb/model/queries.sql b/cloud/bmaas/bmdb/model/queries.sql
index ee3f618..4d15a79 100644
--- a/cloud/bmaas/bmdb/model/queries.sql
+++ b/cloud/bmaas/bmdb/model/queries.sql
@@ -51,14 +51,68 @@
$1, $2, $3
);
--- name: GetMachinesNeedingInstall :many
+-- name: MachineSetAgentStarted :exec
+INSERT INTO machine_agent_started (
+ machine_id, agent_started_at, agent_public_key
+) VALUES (
+ $1, $2, $3
+) ON CONFLICT (machine_id) DO UPDATE SET
+ agent_started_at = $2,
+ agent_public_key = $3
+;
+
+-- name: MachineSetAgentHeartbeat :exec
+INSERT INTO machine_agent_heartbeat (
+ machine_id, agent_heartbeat_at
+) VALUES (
+ $1, $2
+) ON CONFLICT (machine_id) DO UPDATE SET
+ agent_heartbeat_at = $2
+;
+
+-- name: GetMachinesForAgentStart :many
+-- Get machines that need agent installed for the first time. Machine can be
+-- assumed to be 'new', with no previous attempts or failures.
SELECT
machine_provided.*
FROM machines
INNER JOIN machine_provided ON machines.machine_id = machine_provided.machine_id
-LEFT JOIN work ON machines.machine_id = work.machine_id AND work.process = 'NecromancerInstall'
-LEFT JOIN machine_agent_installed ON machines.machine_id = machine_agent_installed.machine_id
-WHERE machine_agent_installed.machine_id IS NULL
+LEFT JOIN work ON machines.machine_id = work.machine_id AND work.process = 'ShepherdInstall'
+LEFT JOIN machine_agent_started ON machines.machine_id = machine_agent_started.machine_id
+WHERE
+ machine_agent_started.machine_id IS NULL
+ -- TODO(q3k): exclude machines which are not expected to run the agent (eg.
+ -- are already exposed to a user).
AND work.machine_id IS NULL
LIMIT $1;
+-- name: GetMachineForAgentRecovery :many
+-- Get machines that need agent installed after something went wrong. Either
+-- the agent started but never responded, or the agent stopped responding at
+-- some point, or the machine is being reinstalled after failure. Assume some
+-- work needs to be performed on the shepherd side to diagnose and recover
+-- whatever state the machine truly is in.
+SELECT
+ machine_provided.*
+FROM machines
+INNER JOIN machine_provided ON machines.machine_id = machine_provided.machine_id
+LEFT JOIN work ON machines.machine_id = work.machine_id AND work.process = 'ShepherdInstall'
+LEFT JOIN machine_agent_started ON machines.machine_id = machine_agent_started.machine_id
+LEFT JOIN machine_agent_heartbeat ON machines.machine_id = machine_agent_heartbeat.machine_id
+WHERE
+ -- Only act on machines where the agent is expected to have been started.
+ machine_agent_started.machine_id IS NOT NULL
+ AND (
+ -- No heartbeat 30 minutes after starting the agent.
+ (
+ machine_agent_heartbeat.machine_id IS NULL
+ AND now() > (machine_agent_started.agent_started_at + interval '30 minutes')
+ )
+ -- Heartbeats ceased for 10 minutes.
+ OR (
+ machine_agent_heartbeat.machine_id IS NOT NULL
+ AND now() > (machine_agent_heartbeat.agent_heartbeat_at + interval '10 minutes')
+ )
+ )
+ AND work.machine_id IS NULL
+LIMIT $1;
diff --git a/cloud/bmaas/bmdb/queries_test.go b/cloud/bmaas/bmdb/queries_test.go
new file mode 100644
index 0000000..8490930
--- /dev/null
+++ b/cloud/bmaas/bmdb/queries_test.go
@@ -0,0 +1,184 @@
+package bmdb
+
+import (
+ "context"
+ "fmt"
+ "testing"
+ "time"
+
+ "source.monogon.dev/cloud/bmaas/bmdb/model"
+)
+
+// TestAgentStart exercises GetMachinesForAgentStart as a machine gains tags.
+func TestAgentStart(t *testing.T) {
+ b := dut()
+ conn, err := b.Open(true)
+ if err != nil {
+ t.Fatalf("Open failed: %v", err)
+ }
+
+ ctx, ctxC := context.WithCancel(context.Background())
+ defer ctxC()
+
+ session, err := conn.StartSession(ctx)
+ if err != nil {
+ t.Fatalf("Starting session failed: %v", err)
+ }
+
+ // Create a test machine.
+ var machine model.Machine
+ err = session.Transact(ctx, func(q *model.Queries) error {
+ machine, err = q.NewMachine(ctx)
+ return err
+ })
+ if err != nil {
+ t.Fatalf("Creating machine failed: %v", err)
+ }
+
+ // It should be, by default, not a candidate for agent start as it's not yet
+ // provided by any provider.
+ expectNoCandidates := func() {
+ if err := session.Transact(ctx, func(q *model.Queries) error {
+ candidates, err := q.GetMachinesForAgentStart(ctx, 1)
+ if err != nil {
+ t.Fatalf("Could not retrieve machines for agent start: %v", err)
+ }
+ if want, got := 0, len(candidates); want != got {
+ t.Fatalf("Wanted %d machines for agent start, got %+v", want, candidates)
+ }
+ return nil
+ }); err != nil {
+ t.Fatal(err)
+ }
+ }
+ expectNoCandidates()
+
+ // Provide machine, and check it is now a candidate.
+ if err := session.Transact(ctx, func(q *model.Queries) error {
+ return q.MachineAddProvided(ctx, model.MachineAddProvidedParams{
+ MachineID: machine.MachineID,
+ Provider: model.ProviderEquinix,
+ ProviderID: "123",
+ })
+ }); err != nil {
+ t.Fatalf("could not add provided tag to machine: %v", err)
+ }
+ if err := session.Transact(ctx, func(q *model.Queries) error {
+ candidates, err := q.GetMachinesForAgentStart(ctx, 1)
+ if err != nil {
+ t.Fatalf("Could not retrieve machines for agent start: %v", err)
+ }
+ if want, got := 1, len(candidates); want != got {
+ t.Fatalf("Wanted %d machines for agent start, got %+v", want, candidates)
+ }
+ if want, got := machine.MachineID, candidates[0].MachineID; want != got {
+ t.Fatalf("Wanted %s for agent start, got %s", want, got)
+ }
+ return nil
+ }); err != nil {
+ t.Fatal(err)
+ }
+
+ // Add a start tag. Machine shouldn't be a candidate anymore.
+ if err := session.Transact(ctx, func(q *model.Queries) error {
+ return q.MachineSetAgentStarted(ctx, model.MachineSetAgentStartedParams{
+ MachineID: machine.MachineID,
+ AgentStartedAt: time.Now(),
+ AgentPublicKey: []byte("fakefakefakefake"),
+ })
+ }); err != nil {
+ t.Fatalf("could not add agent started tag to machine: %v", err)
+ }
+ expectNoCandidates()
+}
+
+// TestAgentRecovery exercises GetMachineForAgentRecovery through a few
+// different scenarios in which a test machine is present with different tags
+// set.
+func TestAgentRecovery(t *testing.T) {
+ b := dut()
+ conn, err := b.Open(true)
+ if err != nil {
+ t.Fatalf("Open failed: %v", err)
+ }
+
+ ctx, ctxC := context.WithCancel(context.Background())
+ defer ctxC()
+
+ session, err := conn.StartSession(ctx)
+ if err != nil {
+ t.Fatalf("Starting session failed: %v", err)
+ }
+
+ for i, scenario := range []struct {
+ // Whether recovery is expected to run.
+ wantRun bool
+ // started will add an AgentStarted tag for a given time, if set.
+ started time.Time
+ // heartbeat will add an AgentHeartbeat tag for a given time, if set.
+ heartbeat time.Time
+ }{
+ // No start, no heartbeat -> no recovery expected.
+ {false, time.Time{}, time.Time{}},
+ // Started recently, no heartbeat -> no recovery expected.
+ {false, time.Now(), time.Time{}},
+ // Started a while ago, heartbeat active -> no recovery expected.
+ {false, time.Now().Add(-40 * time.Minute), time.Now()},
+
+ // Started a while ago, no heartbeat -> recovery expected.
+ {true, time.Now().Add(-40 * time.Minute), time.Time{}},
+ // Started a while ago, no recent heartbeat -> recovery expected.
+ {true, time.Now().Add(-40 * time.Minute), time.Now().Add(-20 * time.Minute)},
+ } {
+ if err := session.Transact(ctx, func(q *model.Queries) error {
+ machine, err := q.NewMachine(ctx)
+ if err != nil {
+ return fmt.Errorf("NewMachine: %w", err)
+ }
+ if err := q.MachineAddProvided(ctx, model.MachineAddProvidedParams{
+ MachineID: machine.MachineID,
+ Provider: model.ProviderEquinix,
+ ProviderID: fmt.Sprintf("test-%d", i),
+ }); err != nil {
+ return fmt.Errorf("MachineAddProvided: %w", err)
+ }
+ if !scenario.started.IsZero() {
+ if err := q.MachineSetAgentStarted(ctx, model.MachineSetAgentStartedParams{
+ MachineID: machine.MachineID,
+ AgentStartedAt: scenario.started,
+ AgentPublicKey: []byte("fake"),
+ }); err != nil {
+ return fmt.Errorf("MachineSetAgentStarted: %w", err)
+ }
+ }
+ if !scenario.heartbeat.IsZero() {
+ if err := q.MachineSetAgentHeartbeat(ctx, model.MachineSetAgentHeartbeatParams{
+ MachineID: machine.MachineID,
+ AgentHeartbeatAt: scenario.heartbeat,
+ }); err != nil {
+ return fmt.Errorf("MachineSetAgentHeartbeat: %w", err)
+ }
+ }
+ return nil
+ }); err != nil {
+ t.Errorf("%d: setup failed: %v", i, err)
+ continue
+ }
+ // NOTE(review): candidates may include machines from earlier scenarios.
+ if err := session.Transact(ctx, func(q *model.Queries) error {
+ candidates, err := q.GetMachineForAgentRecovery(ctx, 1)
+ if err != nil {
+ return fmt.Errorf("GetMachinesForAgentRecovery: %w", err)
+ }
+ if scenario.wantRun && len(candidates) == 0 {
+ return fmt.Errorf("machine unscheduled for recovery")
+ }
+ if !scenario.wantRun && len(candidates) != 0 {
+ return fmt.Errorf("machine scheduled for recovery")
+ }
+ return nil
+ }); err != nil {
+ t.Errorf("%d: test failed: %v", i, err)
+ }
+ }
+}