cloud: split shepherd up

Change-Id: I8e386d9eaaf17543743e1e8a37a8d71426910d59
Reviewed-on: https://review.monogon.dev/c/monogon/+/2213
Reviewed-by: Serge Bazanski <serge@monogon.tech>
Tested-by: Jenkins CI
diff --git a/cloud/shepherd/manager/recoverer.go b/cloud/shepherd/manager/recoverer.go
new file mode 100644
index 0000000..a94700a
--- /dev/null
+++ b/cloud/shepherd/manager/recoverer.go
@@ -0,0 +1,81 @@
+package manager
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	"k8s.io/klog/v2"
+
+	"source.monogon.dev/cloud/bmaas/bmdb"
+	"source.monogon.dev/cloud/bmaas/bmdb/metrics"
+	"source.monogon.dev/cloud/bmaas/bmdb/model"
+	"source.monogon.dev/cloud/shepherd"
+)
+
+type RecovererConfig struct {
+	ControlLoopConfig
+}
+
+// RegisterFlags registers the control loop flags under the name "recoverer".
+// The embedded field is spelled out since this method shadows the promoted one.
+func (rc *RecovererConfig) RegisterFlags() { rc.ControlLoopConfig.RegisterFlags("recoverer") }
+
+// The Recoverer reboots machines whose agent has stopped sending heartbeats or
+// has not sent any heartbeats at all, then removes their AgentStarted/AgentHeartbeat tags.
+type Recoverer struct {
+	RecovererConfig
+	r shepherd.Recoverer // issues RebootMachine calls; r.Type() picks the provider whose machines are fetched
+}
+
+// NewRecoverer validates the control loop part of rc and, if it passes,
+// returns a Recoverer that drives machine reboots through r.
+func NewRecoverer(r shepherd.Recoverer, rc RecovererConfig) (*Recoverer, error) {
+	err := rc.ControlLoopConfig.Check()
+	if err != nil {
+		return nil, err
+	}
+	return &Recoverer{RecovererConfig: rc, r: r}, nil
+}
+
+// getProcessInfo returns the static description of the recovery process: the
+// BMDB process it runs as, the metrics processor label, and its default
+// backoff (starting at 1m, capped at 1h, exponent 1.2).
+func (r *Recoverer) getProcessInfo() processInfo {
+	backoff := bmdb.Backoff{Initial: time.Minute, Maximum: time.Hour, Exponent: 1.2}
+	return processInfo{
+		process:        model.ProcessShepherdRecovery,
+		processor:      metrics.ProcessorShepherdRecoverer,
+		defaultBackoff: backoff,
+	}
+}
+
+// getMachines runs the GetMachineForAgentRecovery query for r's provider type,
+// passing limit as the query's Limit, and returns the machines it selects.
+func (r *Recoverer) getMachines(ctx context.Context, q *model.Queries, limit int32) ([]model.MachineProvided, error) {
+	params := model.GetMachineForAgentRecoveryParams{Provider: r.r.Type(), Limit: limit}
+	return q.GetMachineForAgentRecovery(ctx, params)
+}
+
+// processMachine asks the provider to reboot t's machine and, only if that call
+// succeeds, finishes t's work by deleting the machine's AgentStarted/AgentHeartbeat tags.
+func (r *Recoverer) processMachine(ctx context.Context, t *task) error {
+	klog.Infof("Starting recovery of machine (ID: %s, PID %s)", t.machine.MachineID, t.machine.ProviderID)
+	if err := r.r.RebootMachine(ctx, shepherd.ProviderID(t.machine.ProviderID)); err != nil {
+		return fmt.Errorf("failed to reboot machine: %w", err)
+	}
+	klog.Infof("Removing AgentStarted/AgentHeartbeat (ID: %s, PID: %s)...", t.machine.MachineID, t.machine.ProviderID)
+	clearAgentTags := func(q *model.Queries) error {
+		if err := q.MachineDeleteAgentStarted(ctx, t.machine.MachineID); err != nil {
+			return fmt.Errorf("while deleting AgentStarted: %w", err)
+		}
+		if err := q.MachineDeleteAgentHeartbeat(ctx, t.machine.MachineID); err != nil {
+			return fmt.Errorf("while deleting AgentHeartbeat: %w", err)
+		}
+		return nil
+	}
+	if err := t.work.Finish(ctx, clearAgentTags); err != nil {
+		return fmt.Errorf("while deleting AgentStarted/AgentHeartbeat tags: %w", err)
+	}
+	return nil
+}