cloud/shepherd/equinix: implement recoverer

This implements basic recovery functionality for 'stuck' agents. The
shepherd will notice machines with an agent that either never sent a
heartbeat, or stopped sending heartbeats, reboot them, and remove their
AgentStarted/AgentHeartbeat tags. Then, the main agent start logic
should kick in again.

More complex recovery flows can be implemented later; this will do for
now.

Change-Id: I2c1b0d0465e4e302cdecce950a041581c2dc8548
Reviewed-on: https://review.monogon.dev/c/monogon/+/1560
Tested-by: Jenkins CI
Reviewed-by: Tim Windelschmidt <tim@monogon.tech>
diff --git a/cloud/bmaas/bmdb/model/queries_tags.sql b/cloud/bmaas/bmdb/model/queries_tags.sql
index 27f3a4c..a781509 100644
--- a/cloud/bmaas/bmdb/model/queries_tags.sql
+++ b/cloud/bmaas/bmdb/model/queries_tags.sql
@@ -53,4 +53,10 @@
 ;
 
 
--- name: MachineAgentReset :exec
\ No newline at end of file
+-- name: MachineDeleteAgentStarted :exec
+DELETE FROM machine_agent_started
+WHERE machine_id = $1;
+
+-- name: MachineDeleteAgentHeartbeat :exec
+DELETE FROM machine_agent_heartbeat
+WHERE machine_id = $1;
diff --git a/cloud/shepherd/equinix/manager/BUILD.bazel b/cloud/shepherd/equinix/manager/BUILD.bazel
index d6554e0..a4e5255 100644
--- a/cloud/shepherd/equinix/manager/BUILD.bazel
+++ b/cloud/shepherd/equinix/manager/BUILD.bazel
@@ -7,6 +7,7 @@
         "initializer.go",
         "manager.go",
         "provisioner.go",
+        "recoverer.go",
         "shared_config.go",
         "ssh.go",
     ],
@@ -36,6 +37,7 @@
         "fakequinix_test.go",
         "initializer_test.go",
         "provisioner_test.go",
+        "recoverer_test.go",
     ],
     data = [
         "//cloud/shepherd/equinix/manager/test_agent",
diff --git a/cloud/shepherd/equinix/manager/fakequinix_test.go b/cloud/shepherd/equinix/manager/fakequinix_test.go
index 3970373..d9e5683 100644
--- a/cloud/shepherd/equinix/manager/fakequinix_test.go
+++ b/cloud/shepherd/equinix/manager/fakequinix_test.go
@@ -19,6 +19,7 @@
 	devices      map[string]*packngo.Device
 	reservations map[string]*packngo.HardwareReservation
 	sshKeys      map[string]*packngo.SSHKey
+	reboots      map[string]int
 }
 
 // newFakequinix makes a fakequinix with a given fake project ID and number of
@@ -29,6 +30,7 @@
 		devices:      make(map[string]*packngo.Device),
 		reservations: make(map[string]*packngo.HardwareReservation),
 		sshKeys:      make(map[string]*packngo.SSHKey),
+		reboots:      make(map[string]int),
 	}
 
 	for i := 0; i < numReservations; i++ {
@@ -168,5 +170,14 @@
 	return key, nil
 }
 
+func (f *fakequinix) RebootDevice(_ context.Context, did string) error {
+	f.mu.Lock()
+	defer f.mu.Unlock()
+
+	f.reboots[did]++
+
+	return nil
+}
+
 func (f *fakequinix) Close() {
 }
diff --git a/cloud/shepherd/equinix/manager/recoverer.go b/cloud/shepherd/equinix/manager/recoverer.go
new file mode 100644
index 0000000..f323d03
--- /dev/null
+++ b/cloud/shepherd/equinix/manager/recoverer.go
@@ -0,0 +1,64 @@
+package manager
+
+import (
+	"context"
+	"fmt"
+
+	"k8s.io/klog/v2"
+
+	"source.monogon.dev/cloud/bmaas/bmdb/model"
+	ecl "source.monogon.dev/cloud/shepherd/equinix/wrapngo"
+)
+
+type RecovererConfig struct {
+	ControlLoopConfig
+}
+
+func (r *RecovererConfig) RegisterFlags() {
+	r.ControlLoopConfig.RegisterFlags("recoverer")
+}
+
+// The Recoverer reboots machines whose agent has stopped sending heartbeats or
+// has not sent any heartbeats at all.
+type Recoverer struct {
+	RecovererConfig
+
+	cl ecl.Client
+}
+
+func NewRecoverer(cl ecl.Client, rc RecovererConfig) (*Recoverer, error) {
+	if err := rc.ControlLoopConfig.Check(); err != nil {
+		return nil, err
+	}
+	return &Recoverer{
+		RecovererConfig: rc,
+		cl:              cl,
+	}, nil
+}
+
+func (r *Recoverer) getMachines(ctx context.Context, q *model.Queries, limit int32) ([]model.MachineProvided, error) {
+	return q.GetMachineForAgentRecovery(ctx, limit)
+}
+
+func (r *Recoverer) processMachine(ctx context.Context, t *task) error {
+	klog.Infof("Starting recovery of device (ID: %s, PID %s)", t.machine.MachineID, t.machine.ProviderID)
+
+	if err := r.cl.RebootDevice(ctx, t.machine.ProviderID); err != nil {
+		return fmt.Errorf("failed to reboot device: %w", err)
+	}
+
+	klog.Infof("Removing AgentStarted/AgentHeartbeat (ID: %s, PID: %s)...", t.machine.MachineID, t.machine.ProviderID)
+	err := t.work.Finish(ctx, func(q *model.Queries) error {
+		if err := q.MachineDeleteAgentStarted(ctx, t.machine.MachineID); err != nil {
+			return fmt.Errorf("while deleting AgentStarted: %w", err)
+		}
+		if err := q.MachineDeleteAgentHeartbeat(ctx, t.machine.MachineID); err != nil {
+			return fmt.Errorf("while deleting AgentHeartbeat: %w", err)
+		}
+		return nil
+	})
+	if err != nil {
+		return fmt.Errorf("while deleting AgentStarted/AgentHeartbeat tags: %w", err)
+	}
+	return nil
+}
diff --git a/cloud/shepherd/equinix/manager/recoverer_test.go b/cloud/shepherd/equinix/manager/recoverer_test.go
new file mode 100644
index 0000000..8583efd
--- /dev/null
+++ b/cloud/shepherd/equinix/manager/recoverer_test.go
@@ -0,0 +1,156 @@
+package manager
+
+import (
+	"context"
+	"testing"
+	"time"
+
+	"github.com/packethost/packngo"
+	"golang.org/x/time/rate"
+
+	"source.monogon.dev/cloud/bmaas/bmdb"
+	"source.monogon.dev/cloud/bmaas/bmdb/model"
+	"source.monogon.dev/cloud/lib/component"
+)
+
+type recovererDut struct {
+	f    *fakequinix
+	r    *Recoverer
+	bmdb *bmdb.Connection
+	ctx  context.Context
+}
+
+func newRecovererDut(t *testing.T) *recovererDut {
+	t.Helper()
+
+	rc := RecovererConfig{
+		ControlLoopConfig: ControlLoopConfig{
+			DBQueryLimiter: rate.NewLimiter(rate.Every(time.Second), 10),
+		},
+	}
+
+	f := newFakequinix("fake", 100)
+	r, err := NewRecoverer(f, rc)
+	if err != nil {
+		t.Fatalf("Could not create Initializer: %v", err)
+	}
+
+	b := bmdb.BMDB{
+		Config: bmdb.Config{
+			Database: component.CockroachConfig{
+				InMemory: true,
+			},
+			ComponentName: "test",
+			RuntimeInfo:   "test",
+		},
+	}
+	conn, err := b.Open(true)
+	if err != nil {
+		t.Fatalf("Could not create in-memory BMDB: %v", err)
+	}
+
+	ctx, ctxC := context.WithCancel(context.Background())
+	t.Cleanup(ctxC)
+
+	go RunControlLoop(ctx, conn, r)
+
+	return &recovererDut{
+		f:    f,
+		r:    r,
+		bmdb: conn,
+		ctx:  ctx,
+	}
+}
+
+// TestRecoverySmokes makes sure that the Recoverer doesn't go up in flames on
+// the happy path.
+func TestRecoverySmokes(t *testing.T) {
+	dut := newRecovererDut(t)
+	f := dut.f
+	ctx := dut.ctx
+	conn := dut.bmdb
+
+	reservations, _ := f.ListReservations(ctx, "fake")
+
+	sess, err := conn.StartSession(ctx)
+	if err != nil {
+		t.Fatalf("Failed to create BMDB session: %v", err)
+	}
+
+	// Create test machine that should be selected for recovery.
+	// First in Fakequinix...
+	dev, _ := f.CreateDevice(ctx, &packngo.DeviceCreateRequest{
+		Hostname:              "test-devices",
+		OS:                    "fake",
+		ProjectID:             "fake",
+		HardwareReservationID: reservations[0].ID,
+		ProjectSSHKeys:        []string{},
+	})
+	// ... and in BMDB.
+	err = sess.Transact(ctx, func(q *model.Queries) error {
+		machine, err := q.NewMachine(ctx)
+		if err != nil {
+			return err
+		}
+		err = q.MachineAddProvided(ctx, model.MachineAddProvidedParams{
+			MachineID:  machine.MachineID,
+			Provider:   model.ProviderEquinix,
+			ProviderID: dev.ID,
+		})
+		if err != nil {
+			return err
+		}
+		return q.MachineSetAgentStarted(ctx, model.MachineSetAgentStartedParams{
+			MachineID:      machine.MachineID,
+			AgentStartedAt: time.Now().Add(time.Hour * -10),
+			AgentPublicKey: []byte("fakefakefakefake"),
+		})
+	})
+	if err != nil {
+		t.Fatalf("Failed to create test machine: %v", err)
+	}
+
+	// Wait (up to 10 seconds) until no machines need recovery anymore.
+	deadline := time.Now().Add(10 * time.Second)
+	for {
+		if time.Now().After(deadline) {
+			t.Fatalf("Machines did not get processed in time")
+		}
+		time.Sleep(100 * time.Millisecond)
+
+		var machines []model.MachineProvided
+		err = sess.Transact(ctx, func(q *model.Queries) error {
+			var err error
+			machines, err = q.GetMachineForAgentRecovery(ctx, 100)
+			return err
+		})
+		if err != nil {
+			t.Fatalf("Failed to run Transaction: %v", err)
+		}
+		if len(machines) == 0 {
+			break
+		}
+	}
+
+	// Expect the target machine to have been rebooted.
+	dut.f.mu.Lock()
+	reboots := dut.f.reboots[dev.ID]
+	dut.f.mu.Unlock()
+	if want, got := 1, reboots; want != got {
+		t.Fatalf("Wanted %d reboot, got %d", want, got)
+	}
+
+	// Expect machine to now be available again for agent start.
+	var machines []model.MachineProvided
+	err = sess.Transact(ctx, func(q *model.Queries) error {
+		var err error
+		machines, err = q.GetMachinesForAgentStart(ctx, 100)
+		return err
+	})
+	if err != nil {
+		t.Fatalf("Failed to run Transaction: %v", err)
+	}
+	if want, got := 1, len(machines); want != got {
+		t.Fatalf("Wanted %d machine ready for agent start, got %d", want, got)
+	}
+}
diff --git a/cloud/shepherd/equinix/manager/server/main.go b/cloud/shepherd/equinix/manager/server/main.go
index e9b9289..2a293dc 100644
--- a/cloud/shepherd/equinix/manager/server/main.go
+++ b/cloud/shepherd/equinix/manager/server/main.go
@@ -24,6 +24,7 @@
 	SharedConfig      manager.SharedConfig
 	ProvisionerConfig manager.ProvisionerConfig
 	InitializerConfig manager.InitializerConfig
+	RecovererConfig   manager.RecovererConfig
 	WebugConfig       webug.Config
 	API               wrapngo.Opts
 }
@@ -46,6 +47,7 @@
 	c.SharedConfig.RegisterFlags("")
 	c.ProvisionerConfig.RegisterFlags()
 	c.InitializerConfig.RegisterFlags()
+	c.RecovererConfig.RegisterFlags()
 	c.WebugConfig.RegisterFlags()
 	c.API.RegisterFlags()
 }
@@ -87,6 +89,11 @@
 		klog.Exitf("%v", err)
 	}
 
+	recoverer, err := manager.NewRecoverer(api, c.RecovererConfig)
+	if err != nil {
+		klog.Exitf("%v", err)
+	}
+
 	conn, err := c.BMDB.Open(true)
 	if err != nil {
 		klog.Exitf("Failed to open BMDB connection: %v", err)
@@ -105,6 +112,12 @@
 		}
 	}()
 	go func() {
+		err = manager.RunControlLoop(ctx, conn, recoverer)
+		if err != nil {
+			klog.Exit(err)
+		}
+	}()
+	go func() {
 		if err := c.WebugConfig.Start(ctx, conn); err != nil && err != ctx.Err() {
 			klog.Exitf("Failed to start webug: %v", err)
 		}
diff --git a/cloud/shepherd/equinix/wrapngo/wrapn.go b/cloud/shepherd/equinix/wrapngo/wrapn.go
index 400556b..dc54340 100644
--- a/cloud/shepherd/equinix/wrapngo/wrapn.go
+++ b/cloud/shepherd/equinix/wrapngo/wrapn.go
@@ -113,6 +113,7 @@
 	// package comment for information on this method's behavior and returned error
 	// values.
 	UpdateSSHKey(ctx context.Context, kid string, req *packngo.SSHKeyUpdateRequest) (*packngo.SSHKey, error)
+	RebootDevice(ctx context.Context, did string) error
 
 	Close()
 }
@@ -314,3 +315,11 @@
 		return k, nil
 	})
 }
+
+func (e *client) RebootDevice(ctx context.Context, did string) error {
+	_, err := wrap(ctx, e, func(cl *packngo.Client) (struct{}, error) {
+		_, err := cl.Devices.Reboot(did)
+		return struct{}{}, err
+	})
+	return err
+}