c/bmaas/bmdb: add Agent{Started,Heartbeat} tags and queries

This adds the tags and queries required for the first interactions with
the Shepherd subsystem.

Change-Id: I8c663803cfd936b11c59bce7db5abc94b99dd1db
Reviewed-on: https://review.monogon.dev/c/monogon/+/962
Tested-by: Jenkins CI
Reviewed-by: Mateusz Zalega <mateusz@monogon.tech>
diff --git a/cloud/bmaas/bmdb/BUILD.bazel b/cloud/bmaas/bmdb/BUILD.bazel
index 957a884..882d73c 100644
--- a/cloud/bmaas/bmdb/BUILD.bazel
+++ b/cloud/bmaas/bmdb/BUILD.bazel
@@ -20,7 +20,10 @@
 
 go_test(
     name = "bmdb_test",
-    srcs = ["sessions_test.go"],
+    srcs = [
+        "queries_test.go",
+        "sessions_test.go",
+    ],
     data = [
         "@cockroach",
     ],
diff --git a/cloud/bmaas/bmdb/model/migrations/1662136250_initial.down.sql b/cloud/bmaas/bmdb/model/migrations/1662136250_initial.down.sql
index 6a85991..5f336d1 100644
--- a/cloud/bmaas/bmdb/model/migrations/1662136250_initial.down.sql
+++ b/cloud/bmaas/bmdb/model/migrations/1662136250_initial.down.sql
@@ -1,6 +1,3 @@
-DROP TABLE machine_agent_report;
-DROP TABLE machine_agent_installed;
-DROP TABLE machine_provided;
 DROP TABLE work;
 DROP TABLE sessions;
 DROP TABLE machines;
diff --git a/cloud/bmaas/bmdb/model/migrations/1662136250_initial.up.sql b/cloud/bmaas/bmdb/model/migrations/1662136250_initial.up.sql
index b31324f..7b5b812 100644
--- a/cloud/bmaas/bmdb/model/migrations/1662136250_initial.up.sql
+++ b/cloud/bmaas/bmdb/model/migrations/1662136250_initial.up.sql
@@ -49,25 +49,4 @@
     process process NOT NULL,
     UNIQUE (machine_id, process),
     CONSTRAINT "primary" PRIMARY KEY (machine_id, session_id, process)
 );
-
--- The following three tables are for illustrative purposes only.
-
-CREATE TABLE machine_provided (
-    machine_id UUID NOT NULL REFERENCES machines(machine_id) ON DELETE CASCADE,
-    provider STRING NOT NULL,
-    provider_id STRING NOT NULL,
-    CONSTRAINT "primary" PRIMARY KEY (machine_id)
-);
-
-CREATE TABLE machine_agent_installed (
-    machine_id UUID NOT NULL REFERENCES machines(machine_id) ON DELETE CASCADE,
-    CONSTRAINT "primary" PRIMARY KEY (machine_id)
-);
-
-CREATE TABLE machine_agent_report (
-    machine_id UUID NOT NULL REFERENCES machines(machine_id) ON DELETE CASCADE,
-    shape_cpu_count INT NOT NULL,
-    shape_memory_megabytes INT NOT NULL,
-    CONSTRAINT "primary" PRIMARY KEY (machine_id)
-);
diff --git a/cloud/bmaas/bmdb/model/migrations/1667232160_agent_tags.down.sql b/cloud/bmaas/bmdb/model/migrations/1667232160_agent_tags.down.sql
new file mode 100644
index 0000000..90bb586
--- /dev/null
+++ b/cloud/bmaas/bmdb/model/migrations/1667232160_agent_tags.down.sql
@@ -0,0 +1,8 @@
+DROP TABLE machine_provided;
+DROP TABLE machine_agent_started;
+DROP TABLE machine_agent_heartbeat;
+-- The provider type is used by machine_provided.provider, so it can only be
+-- dropped after that table is gone.
+DROP TYPE provider;
+-- The 'ShepherdInstall' value added to the process type by the up migration
+-- is left in place.
diff --git a/cloud/bmaas/bmdb/model/migrations/1667232160_agent_tags.up.sql b/cloud/bmaas/bmdb/model/migrations/1667232160_agent_tags.up.sql
new file mode 100644
index 0000000..14701a6
--- /dev/null
+++ b/cloud/bmaas/bmdb/model/migrations/1667232160_agent_tags.up.sql
@@ -0,0 +1,46 @@
+CREATE TYPE provider AS ENUM (
+    'Equinix'
+    -- More providers will follow in subsequent migrations.
+);
+
+-- tag MachineProvided {
+--     Provider Provider
+--     ProviderID String
+-- }
+-- Represents the fact that a machine is backed by a machine from a given
+-- provider, on which it is identified by the given provider ID.
+CREATE TABLE machine_provided (
+    machine_id UUID NOT NULL REFERENCES machines(machine_id) ON DELETE RESTRICT,
+    provider provider NOT NULL,
+    provider_id STRING(128) NOT NULL,
+    CONSTRAINT "primary" PRIMARY KEY (machine_id),
+    UNIQUE (provider, provider_id)
+);
+
+-- tag AgentStarted {
+--     StartedAt time.Time
+--     PublicKey []byte
+-- }
+-- Represents the fact that a machine has had the Agent started on it at some
+-- given time, and that the agent returned a given public key which it will use
+-- to authenticate itself to the bmdb API server.
+CREATE TABLE machine_agent_started (
+    machine_id UUID NOT NULL REFERENCES machines(machine_id) ON DELETE RESTRICT,
+    agent_started_at TIMESTAMPTZ NOT NULL,
+    agent_public_key BYTES NOT NULL,
+    CONSTRAINT "primary" PRIMARY KEY (machine_id)
+);
+
+-- tag AgentHeartbeat {
+--     At time.Time
+-- }
+-- Represents a successful heartbeat sent by the Agent running on a machine at
+-- some given time.
+CREATE TABLE machine_agent_heartbeat (
+    machine_id UUID NOT NULL REFERENCES machines(machine_id) ON DELETE RESTRICT,
+    agent_heartbeat_at TIMESTAMPTZ NOT NULL,
+    CONSTRAINT "primary" PRIMARY KEY (machine_id)
+);
+
+-- Used by the Shepherd when performing direct actions against a machine.
+ALTER TYPE process ADD VALUE IF NOT EXISTS 'ShepherdInstall';
diff --git a/cloud/bmaas/bmdb/model/queries.sql b/cloud/bmaas/bmdb/model/queries.sql
index ee3f618..4d15a79 100644
--- a/cloud/bmaas/bmdb/model/queries.sql
+++ b/cloud/bmaas/bmdb/model/queries.sql
@@ -51,14 +51,73 @@
     $1, $2, $3
 );
 
--- name: GetMachinesNeedingInstall :many
+-- name: MachineSetAgentStarted :exec
+INSERT INTO machine_agent_started (
+    machine_id, agent_started_at, agent_public_key
+) VALUES (
+    $1, $2, $3
+) ON CONFLICT (machine_id) DO UPDATE SET
+    agent_started_at = $2,
+    agent_public_key = $3
+;
+
+-- name: MachineSetAgentHeartbeat :exec
+INSERT INTO machine_agent_heartbeat (
+    machine_id, agent_heartbeat_at
+) VALUES (
+    $1, $2
+) ON CONFLICT (machine_id) DO UPDATE SET
+    agent_heartbeat_at = $2
+;
+
+-- name: GetMachinesForAgentStart :many
+-- Get machines that need the agent installed for the first time. Machine can
+-- be assumed to be 'new', with no previous attempts or failures.
 SELECT
     machine_provided.*
 FROM machines
 INNER JOIN machine_provided ON machines.machine_id = machine_provided.machine_id
-LEFT JOIN work ON machines.machine_id = work.machine_id AND work.process = 'NecromancerInstall'
-LEFT JOIN machine_agent_installed ON machines.machine_id = machine_agent_installed.machine_id
-WHERE machine_agent_installed.machine_id IS NULL
+LEFT JOIN work ON machines.machine_id = work.machine_id AND work.process = 'ShepherdInstall'
+LEFT JOIN machine_agent_started ON machines.machine_id = machine_agent_started.machine_id
+WHERE
+  machine_agent_started.machine_id IS NULL
+  -- TODO(q3k): exclude machines which are not expected to run the agent (eg.
+  -- are already exposed to a user).
   AND work.machine_id IS NULL
 LIMIT $1;
 
+-- name: GetMachineForAgentRecovery :many
+-- Get machines that need the agent installed after something went wrong.
+-- Either the agent started but never responded, or the agent stopped
+-- responding at some point, or the machine is being reinstalled after failure.
+-- Assume some work needs to be performed on the shepherd side to diagnose and
+-- recover whatever state the machine truly is in.
+SELECT
+    machine_provided.*
+FROM machines
+INNER JOIN machine_provided ON machines.machine_id = machine_provided.machine_id
+LEFT JOIN work ON machines.machine_id = work.machine_id AND work.process = 'ShepherdInstall'
+LEFT JOIN machine_agent_started ON machines.machine_id = machine_agent_started.machine_id
+LEFT JOIN machine_agent_heartbeat ON machines.machine_id = machine_agent_heartbeat.machine_id
+WHERE
+  -- Only act on machines where the agent is expected to have been started.
+  machine_agent_started.machine_id IS NOT NULL
+  AND (
+    -- No heartbeat 30 minutes after starting the agent. A heartbeat older than
+    -- the latest agent start was sent before that start (eg. by a previous
+    -- installation), so it is treated as if there was no heartbeat at all.
+    (
+        (
+            machine_agent_heartbeat.machine_id IS NULL
+            OR machine_agent_heartbeat.agent_heartbeat_at < machine_agent_started.agent_started_at
+        )
+        AND now() > (machine_agent_started.agent_started_at + interval '30 minutes')
+    )
+    -- Heartbeats sent since the latest agent start ceased for 10 minutes.
+    OR (
+        machine_agent_heartbeat.agent_heartbeat_at >= machine_agent_started.agent_started_at
+        AND now() > (machine_agent_heartbeat.agent_heartbeat_at + interval '10 minutes')
+    )
+  )
+  AND work.machine_id IS NULL
+LIMIT $1;
diff --git a/cloud/bmaas/bmdb/queries_test.go b/cloud/bmaas/bmdb/queries_test.go
new file mode 100644
index 0000000..8490930
--- /dev/null
+++ b/cloud/bmaas/bmdb/queries_test.go
@@ -0,0 +1,204 @@
+package bmdb
+
+import (
+	"context"
+	"fmt"
+	"testing"
+	"time"
+
+	"source.monogon.dev/cloud/bmaas/bmdb/model"
+)
+
+// TestAgentStart exercises GetMachinesForAgentStart.
+//
+// Errors are returned from the session.Transact callbacks rather than raised
+// with t.Fatal inside them, so that the test is only aborted after Transact
+// has returned.
+func TestAgentStart(t *testing.T) {
+	b := dut()
+	conn, err := b.Open(true)
+	if err != nil {
+		t.Fatalf("Open failed: %v", err)
+	}
+
+	ctx, ctxC := context.WithCancel(context.Background())
+	defer ctxC()
+
+	session, err := conn.StartSession(ctx)
+	if err != nil {
+		t.Fatalf("Starting session failed: %v", err)
+	}
+
+	// Create a test machine.
+	var machine model.Machine
+	err = session.Transact(ctx, func(q *model.Queries) error {
+		machine, err = q.NewMachine(ctx)
+		return err
+	})
+	if err != nil {
+		t.Fatalf("Creating machine failed: %v", err)
+	}
+
+	// It should be, by default, not a candidate for agent start as it's not yet
+	// provided by any provider.
+	expectNoCandidates := func() {
+		if err := session.Transact(ctx, func(q *model.Queries) error {
+			candidates, err := q.GetMachinesForAgentStart(ctx, 1)
+			if err != nil {
+				return fmt.Errorf("could not retrieve machines for agent start: %w", err)
+			}
+			if want, got := 0, len(candidates); want != got {
+				return fmt.Errorf("wanted %d machines for agent start, got %+v", want, candidates)
+			}
+			return nil
+		}); err != nil {
+			t.Fatal(err)
+		}
+	}
+	expectNoCandidates()
+
+	// Provide machine, and check it is now a candidate.
+	if err := session.Transact(ctx, func(q *model.Queries) error {
+		return q.MachineAddProvided(ctx, model.MachineAddProvidedParams{
+			MachineID:  machine.MachineID,
+			Provider:   model.ProviderEquinix,
+			ProviderID: "123",
+		})
+	}); err != nil {
+		t.Fatalf("could not add provided tag to machine: %v", err)
+	}
+	if err := session.Transact(ctx, func(q *model.Queries) error {
+		candidates, err := q.GetMachinesForAgentStart(ctx, 1)
+		if err != nil {
+			return fmt.Errorf("could not retrieve machines for agent start: %w", err)
+		}
+		if want, got := 1, len(candidates); want != got {
+			return fmt.Errorf("wanted %d machines for agent start, got %+v", want, candidates)
+		}
+		if want, got := machine.MachineID, candidates[0].MachineID; want != got {
+			return fmt.Errorf("wanted %s for agent start, got %s", want, got)
+		}
+		return nil
+	}); err != nil {
+		t.Fatal(err)
+	}
+
+	// Add a start tag. Machine shouldn't be a candidate anymore.
+	if err := session.Transact(ctx, func(q *model.Queries) error {
+		return q.MachineSetAgentStarted(ctx, model.MachineSetAgentStartedParams{
+			MachineID:      machine.MachineID,
+			AgentStartedAt: time.Now(),
+			AgentPublicKey: []byte("fakefakefakefake"),
+		})
+	}); err != nil {
+		t.Fatalf("could not add started tag to machine: %v", err)
+	}
+	expectNoCandidates()
+}
+
+// TestAgentRecovery exercises GetMachineForAgentRecovery through a few
+// different scenarios in which a test machine is present with different tags
+// set.
+//
+// Machines created by earlier scenarios are not removed, so every scenario
+// checks whether its own machine is among the recovery candidates, instead of
+// only checking whether any candidate was returned.
+func TestAgentRecovery(t *testing.T) {
+	b := dut()
+	conn, err := b.Open(true)
+	if err != nil {
+		t.Fatalf("Open failed: %v", err)
+	}
+
+	ctx, ctxC := context.WithCancel(context.Background())
+	defer ctxC()
+
+	session, err := conn.StartSession(ctx)
+	if err != nil {
+		t.Fatalf("Starting session failed: %v", err)
+	}
+
+	for i, scenario := range []struct {
+		// Whether recovery is expected to run.
+		wantRun bool
+		// started will add a AgentStarted tag for a given time, if set.
+		started time.Time
+		// heartbeat will add a AgentHeartbeat tag for a given time, if set.
+		heartbeat time.Time
+	}{
+		// No start, no heartbeat -> no recovery expected.
+		{false, time.Time{}, time.Time{}},
+		// Started recently, no heartbeat -> no recovery expected.
+		{false, time.Now(), time.Time{}},
+		// Started a while ago, heartbeat active -> no recovery expected.
+		{false, time.Now().Add(-40 * time.Minute), time.Now()},
+
+		// Started a while ago, no heartbeat -> recovery expected.
+		{true, time.Now().Add(-40 * time.Minute), time.Time{}},
+		// Started a while ago, no recent heartbeat -> recovery expected.
+		{true, time.Now().Add(-40 * time.Minute), time.Now().Add(-20 * time.Minute)},
+	} {
+		var machine model.Machine
+		if err := session.Transact(ctx, func(q *model.Queries) error {
+			var err error
+			machine, err = q.NewMachine(ctx)
+			if err != nil {
+				return fmt.Errorf("NewMachine: %w", err)
+			}
+			if err := q.MachineAddProvided(ctx, model.MachineAddProvidedParams{
+				MachineID:  machine.MachineID,
+				Provider:   model.ProviderEquinix,
+				ProviderID: fmt.Sprintf("test-%d", i),
+			}); err != nil {
+				return fmt.Errorf("MachineAddProvided: %w", err)
+			}
+			if !scenario.started.IsZero() {
+				if err := q.MachineSetAgentStarted(ctx, model.MachineSetAgentStartedParams{
+					MachineID:      machine.MachineID,
+					AgentStartedAt: scenario.started,
+					AgentPublicKey: []byte("fake"),
+				}); err != nil {
+					return fmt.Errorf("MachineSetAgentStarted: %w", err)
+				}
+			}
+			if !scenario.heartbeat.IsZero() {
+				if err := q.MachineSetAgentHeartbeat(ctx, model.MachineSetAgentHeartbeatParams{
+					MachineID:        machine.MachineID,
+					AgentHeartbeatAt: scenario.heartbeat,
+				}); err != nil {
+					return fmt.Errorf("MachineSetAgentHeartbeat: %w", err)
+				}
+			}
+			return nil
+		}); err != nil {
+			t.Errorf("%d: setup failed: %v", i, err)
+			continue
+		}
+
+		if err := session.Transact(ctx, func(q *model.Queries) error {
+			// Machines from earlier scenarios may be candidates too, so use a
+			// limit well above the number of scenarios and look for this
+			// scenario's machine specifically.
+			candidates, err := q.GetMachineForAgentRecovery(ctx, 100)
+			if err != nil {
+				return fmt.Errorf("GetMachineForAgentRecovery: %w", err)
+			}
+			scheduled := false
+			for _, candidate := range candidates {
+				if candidate.MachineID == machine.MachineID {
+					scheduled = true
+					break
+				}
+			}
+			if scenario.wantRun && !scheduled {
+				return fmt.Errorf("machine unscheduled for recovery")
+			}
+			if !scenario.wantRun && scheduled {
+				return fmt.Errorf("machine scheduled for recovery")
+			}
+			return nil
+		}); err != nil {
+			t.Errorf("%d: test failed: %v", i, err)
+		}
+	}
+}