cloud/shepherd/equinix: implement recoverer

This implements basic recovery functionality for 'stuck' agents. The
shepherd will notice machines with an agent that either never sent a
heartbeat, or stopped sending heartbeats, and will remove their agent
started tags and reboot the machine. Then, the main agent start logic
should kick in again.

More complex recovery flows can be implemented later; this will do for
now.

Change-Id: I2c1b0d0465e4e302cdecce950a041581c2dc8548
Reviewed-on: https://review.monogon.dev/c/monogon/+/1560
Tested-by: Jenkins CI
Reviewed-by: Tim Windelschmidt <tim@monogon.tech>
diff --git a/cloud/shepherd/equinix/manager/recoverer.go b/cloud/shepherd/equinix/manager/recoverer.go
new file mode 100644
index 0000000..f323d03
--- /dev/null
+++ b/cloud/shepherd/equinix/manager/recoverer.go
@@ -0,0 +1,64 @@
+package manager
+
+import (
+	"context"
+	"fmt"
+
+	"k8s.io/klog/v2"
+
+	"source.monogon.dev/cloud/bmaas/bmdb/model"
+	ecl "source.monogon.dev/cloud/shepherd/equinix/wrapngo"
+)
+
+type RecovererConfig struct {
+	ControlLoopConfig
+}
+
+// RegisterFlags calls RegisterFlags on the embedded ControlLoopConfig with the
+// argument "recoverer" (presumably the flag name prefix; confirm there).
+func (r *RecovererConfig) RegisterFlags() { r.ControlLoopConfig.RegisterFlags("recoverer") }
+
+// Recoverer reboots machines whose agent either never sent a heartbeat or has
+// stopped sending heartbeats.
+type Recoverer struct {
+	RecovererConfig
+	// cl is the Equinix client through which device reboots are requested.
+	cl ecl.Client
+}
+
+// NewRecoverer builds a Recoverer that uses cl and the configuration rc. It
+// returns the error from rc.ControlLoopConfig.Check, unmodified, if the check
+// fails.
+func NewRecoverer(cl ecl.Client, rc RecovererConfig) (*Recoverer, error) {
+	if err := rc.ControlLoopConfig.Check(); err != nil {
+		return nil, err
+	}
+	return &Recoverer{RecovererConfig: rc, cl: cl}, nil
+}
+
+// getMachines returns the result of the GetMachineForAgentRecovery query,
+// passing limit through unchanged (presumably a cap on returned machines).
+func (r *Recoverer) getMachines(ctx context.Context, q *model.Queries, limit int32) ([]model.MachineProvided, error) { return q.GetMachineForAgentRecovery(ctx, limit) }
+
+// processMachine reboots the task's device and, if that succeeds, deletes the
+// machine's AgentStarted and AgentHeartbeat tags while finishing the work.
+func (r *Recoverer) processMachine(ctx context.Context, t *task) error {
+	m := t.machine
+	klog.Infof("Starting recovery of device (ID: %s, PID %s)", m.MachineID, m.ProviderID)
+	if err := r.cl.RebootDevice(ctx, m.ProviderID); err != nil {
+		return fmt.Errorf("failed to reboot device: %w", err)
+	}
+	klog.Infof("Removing AgentStarted/AgentHeartbeat (ID: %s, PID: %s)...", m.MachineID, m.ProviderID)
+	if err := t.work.Finish(ctx, func(q *model.Queries) error {
+		if err := q.MachineDeleteAgentStarted(ctx, m.MachineID); err != nil {
+			return fmt.Errorf("while deleting AgentStarted: %w", err)
+		}
+		if err := q.MachineDeleteAgentHeartbeat(ctx, m.MachineID); err != nil {
+			return fmt.Errorf("while deleting AgentHeartbeat: %w", err)
+		}
+		return nil
+	}); err != nil {
+		return fmt.Errorf("while deleting AgentStarted/AgentHeartbeat tags: %w", err)
+	}
+	return nil
+}