cloud: split shepherd up
Change-Id: I8e386d9eaaf17543743e1e8a37a8d71426910d59
Reviewed-on: https://review.monogon.dev/c/monogon/+/2213
Reviewed-by: Serge Bazanski <serge@monogon.tech>
Tested-by: Jenkins CI
diff --git a/cloud/shepherd/provider/equinix/BUILD.bazel b/cloud/shepherd/provider/equinix/BUILD.bazel
new file mode 100644
index 0000000..3363d7f
--- /dev/null
+++ b/cloud/shepherd/provider/equinix/BUILD.bazel
@@ -0,0 +1,77 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_binary", "go_library", "go_test")
+load("@io_bazel_rules_docker//container:container.bzl", "container_image")
+load("//build/static_binary_tarball:def.bzl", "static_binary_tarball")
+
+go_library(
+ name = "equinix_lib",
+ srcs = [
+ "main.go",
+ "provider.go",
+ "provider_config.go",
+ "updater.go",
+ ],
+ importpath = "source.monogon.dev/cloud/shepherd/provider/equinix",
+ visibility = ["//visibility:private"],
+ deps = [
+ "//cloud/bmaas/bmdb",
+ "//cloud/bmaas/bmdb/metrics",
+ "//cloud/bmaas/bmdb/model",
+ "//cloud/bmaas/bmdb/webug",
+ "//cloud/equinix/wrapngo",
+ "//cloud/lib/component",
+ "//cloud/lib/sinbin",
+ "//cloud/shepherd",
+ "//cloud/shepherd/manager",
+ "//metropolis/cli/pkg/context",
+ "@com_github_packethost_packngo//:packngo",
+ "@io_k8s_klog_v2//:klog",
+ "@org_golang_x_crypto//ssh",
+ ],
+)
+
+go_test(
+ name = "equinix_test",
+ srcs = [
+ "fakequinix_test.go",
+ "initializer_test.go",
+ "provisioner_test.go",
+ "recoverer_test.go",
+ "updater_test.go",
+ ],
+ data = [
+ "@cockroach",
+ ],
+ embed = [":equinix_lib"],
+ deps = [
+ "//cloud/bmaas/bmdb",
+ "//cloud/bmaas/bmdb/model",
+ "//cloud/lib/component",
+ "//cloud/shepherd/manager",
+ "@com_github_google_uuid//:uuid",
+ "@com_github_packethost_packngo//:packngo",
+ "@org_golang_x_time//rate",
+ ],
+)
+
+go_binary(
+ name = "equinix",
+ embed = [":equinix_lib"],
+ visibility = ["//visibility:public"],
+)
+
+static_binary_tarball(
+ name = "equinix_layer",
+ executable = ":equinix",
+)
+
+container_image(
+ name = "equinix_container",
+ base = "@go_image_base//image",
+ entrypoint = ["/app/cloud/shepherd/provider/equinix/equinix_/equinix"],
+ tars = [
+ ":equinix_layer",
+ "//cloud/takeover:takeover_layer",
+ ],
+ visibility = ["//visibility:public"],
+ workdir = "/app",
+)
diff --git a/cloud/shepherd/provider/equinix/fakequinix_test.go b/cloud/shepherd/provider/equinix/fakequinix_test.go
new file mode 100644
index 0000000..bd0df4a
--- /dev/null
+++ b/cloud/shepherd/provider/equinix/fakequinix_test.go
@@ -0,0 +1,221 @@
+package main
+
+import (
+ "context"
+ "fmt"
+ "net/http"
+ "sync"
+
+ "github.com/google/uuid"
+ "github.com/packethost/packngo"
+)
+
+// fakequinix implements a wrapngo.Client for testing. It starts out with a
+// number of made up hardware reservations, and allows for creating devices and
+// SSH keys.
+type fakequinix struct {
+ mu sync.Mutex
+
+ pid string
+ devices map[string]*packngo.Device
+ reservations map[string]*packngo.HardwareReservation
+ sshKeys map[string]*packngo.SSHKey
+ reboots map[string]int
+}
+
+// newFakequinix makes a fakequinix with a given fake project ID and number of
+// hardware reservations to create.
+func newFakequinix(pid string, numReservations int) *fakequinix {
+ f := fakequinix{
+ pid: pid,
+ devices: make(map[string]*packngo.Device),
+ reservations: make(map[string]*packngo.HardwareReservation),
+ sshKeys: make(map[string]*packngo.SSHKey),
+ reboots: make(map[string]int),
+ }
+
+ for i := 0; i < numReservations; i++ {
+ uid := uuid.New()
+ f.reservations[uid.String()] = &packngo.HardwareReservation{
+ ID: uid.String(),
+ ShortID: uid.String(),
+ Provisionable: true,
+ }
+ }
+
+ return &f
+}
+
+func (f *fakequinix) notFound() error {
+ return &packngo.ErrorResponse{
+ Response: &http.Response{
+ StatusCode: http.StatusNotFound,
+ },
+ }
+}
+
+func (f *fakequinix) GetDevice(_ context.Context, pid, did string, _ *packngo.ListOptions) (*packngo.Device, error) {
+ f.mu.Lock()
+ defer f.mu.Unlock()
+
+ val := f.devices[did]
+ if val == nil {
+ return nil, f.notFound()
+ }
+ return val, nil
+}
+
+func (f *fakequinix) ListDevices(_ context.Context, pid string) ([]packngo.Device, error) {
+ f.mu.Lock()
+ defer f.mu.Unlock()
+
+ if pid != f.pid {
+ return nil, nil
+ }
+ var res []packngo.Device
+ for _, dev := range f.devices {
+ res = append(res, *dev)
+ }
+ return res, nil
+}
+
+func (f *fakequinix) UpdateDevice(ctx context.Context, id string, r *packngo.DeviceUpdateRequest) (*packngo.Device, error) {
+ return nil, fmt.Errorf("not implemented")
+}
+
+// MoveReservation is not implemented in fakequinix
+func (f *fakequinix) MoveReservation(_ context.Context, hardwareReservationDID, projectID string) (*packngo.HardwareReservation, error) {
+ return nil, &packngo.ErrorResponse{
+ Response: &http.Response{
+ StatusCode: http.StatusNotImplemented,
+ },
+ }
+}
+
+func (f *fakequinix) DeleteDevice(_ context.Context, id string) error {
+ f.mu.Lock()
+ defer f.mu.Unlock()
+
+ if _, ok := f.devices[id]; !ok {
+ return f.notFound()
+ }
+
+ delete(f.devices, id)
+
+ return nil
+}
+
+func (f *fakequinix) CreateDevice(_ context.Context, request *packngo.DeviceCreateRequest) (*packngo.Device, error) {
+ f.mu.Lock()
+ defer f.mu.Unlock()
+
+ rid := request.HardwareReservationID
+ res := f.reservations[rid]
+ if res == nil {
+ return nil, f.notFound()
+ }
+ if res.Device != nil {
+ return nil, f.notFound()
+ }
+
+ dev := &packngo.Device{
+ ID: uuid.New().String(),
+ State: "active",
+ HardwareReservation: &packngo.HardwareReservation{
+ ID: rid,
+ },
+ Network: []*packngo.IPAddressAssignment{
+ {
+ IpAddressCommon: packngo.IpAddressCommon{
+ Public: true,
+ Address: "1.2.3.4",
+ },
+ },
+ },
+ Facility: &packngo.Facility{
+ Code: "wad",
+ },
+ Hostname: request.Hostname,
+ OS: &packngo.OS{
+ Name: request.OS,
+ Slug: request.OS,
+ },
+ }
+ res.Device = dev
+ res.Provisionable = false
+
+ f.devices[dev.ID] = dev
+ return dev, nil
+}
+
+func (f *fakequinix) ListReservations(_ context.Context, pid string) ([]packngo.HardwareReservation, error) {
+ f.mu.Lock()
+ defer f.mu.Unlock()
+
+ var res []packngo.HardwareReservation
+ for _, r := range f.reservations {
+ res = append(res, *r)
+ }
+
+ return res, nil
+}
+
+func (f *fakequinix) ListSSHKeys(_ context.Context) ([]packngo.SSHKey, error) {
+ f.mu.Lock()
+ defer f.mu.Unlock()
+
+ var res []packngo.SSHKey
+ for _, key := range f.sshKeys {
+ res = append(res, *key)
+ }
+
+ return res, nil
+}
+
+func (f *fakequinix) CreateSSHKey(_ context.Context, req *packngo.SSHKeyCreateRequest) (*packngo.SSHKey, error) {
+ f.mu.Lock()
+ defer f.mu.Unlock()
+
+ for _, k := range f.sshKeys {
+ if k.Key == req.Key {
+ return nil, f.notFound()
+ }
+ if k.Label == req.Label {
+ return nil, f.notFound()
+ }
+ }
+
+ uid := uuid.New().String()
+ f.sshKeys[uid] = &packngo.SSHKey{
+ ID: uid,
+ Label: req.Label,
+ Key: req.Key,
+ }
+
+ return f.sshKeys[uid], nil
+}
+
+func (f *fakequinix) UpdateSSHKey(_ context.Context, kid string, req *packngo.SSHKeyUpdateRequest) (*packngo.SSHKey, error) {
+ f.mu.Lock()
+ defer f.mu.Unlock()
+
+ key := f.sshKeys[kid]
+ if key == nil {
+ return nil, f.notFound()
+ }
+ key.Key = *req.Key
+
+ return key, nil
+}
+
+func (f *fakequinix) RebootDevice(_ context.Context, did string) error {
+ f.mu.Lock()
+ defer f.mu.Unlock()
+
+ f.reboots[did]++
+
+ return nil
+}
+
+func (f *fakequinix) Close() {
+}
diff --git a/cloud/shepherd/provider/equinix/initializer_test.go b/cloud/shepherd/provider/equinix/initializer_test.go
new file mode 100644
index 0000000..3100ad2
--- /dev/null
+++ b/cloud/shepherd/provider/equinix/initializer_test.go
@@ -0,0 +1,169 @@
+package main
+
+import (
+ "context"
+ "crypto/ed25519"
+ "crypto/rand"
+ "fmt"
+ "testing"
+ "time"
+
+ "github.com/packethost/packngo"
+ "golang.org/x/time/rate"
+
+ "source.monogon.dev/cloud/bmaas/bmdb"
+ "source.monogon.dev/cloud/bmaas/bmdb/model"
+ "source.monogon.dev/cloud/lib/component"
+ "source.monogon.dev/cloud/shepherd/manager"
+)
+
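+// initializerDut bundles everything needed to exercise the Initializer: a
+// fakequinix backend, the Initializer under test, and an in-memory BMDB.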
+type initializerDut struct {
+ f *fakequinix
+ i *manager.Initializer
+ bmdb *bmdb.Connection
+ ctx context.Context
+ provider *equinixProvider
+}
+
+func newInitializerDut(t *testing.T) *initializerDut {
+ t.Helper()
+
+ sc := providerConfig{
+ ProjectId: "noproject",
+ KeyLabel: "somekey",
+ DevicePrefix: "test-",
+ }
+ _, key, _ := ed25519.GenerateKey(rand.Reader)
+ k := manager.SSHKey{
+ Key: key,
+ }
+
+ f := newFakequinix(sc.ProjectId, 100)
+ provider, err := sc.New(&k, f)
+ if err != nil {
+ t.Fatalf("Could not create Provider: %v", err)
+ }
+
+ ic := manager.InitializerConfig{
+ ControlLoopConfig: manager.ControlLoopConfig{
+ DBQueryLimiter: rate.NewLimiter(rate.Every(time.Second), 10),
+ },
+ Executable: []byte("beep boop i'm a real program"),
+ TargetPath: "/fake/path",
+ Endpoint: "example.com:1234",
+ SSHConnectTimeout: time.Second,
+ SSHExecTimeout: time.Second,
+ }
+
+ i, err := manager.NewInitializer(provider, &manager.FakeSSHClient{}, ic)
+ if err != nil {
+ t.Fatalf("Could not create Initializer: %v", err)
+ }
+
+ b := bmdb.BMDB{
+ Config: bmdb.Config{
+ Database: component.CockroachConfig{
+ InMemory: true,
+ },
+ ComponentName: "test",
+ RuntimeInfo: "test",
+ },
+ }
+ conn, err := b.Open(true)
+ if err != nil {
+ t.Fatalf("Could not create in-memory BMDB: %v", err)
+ }
+
+ ctx, ctxC := context.WithCancel(context.Background())
+ t.Cleanup(ctxC)
+
+ if err := provider.SSHEquinixEnsure(ctx); err != nil {
+ t.Fatalf("Failed to ensure SSH key: %v", err)
+ }
+ go manager.RunControlLoop(ctx, conn, i)
+
+ return &initializerDut{
+ f: f,
+ i: i,
+ bmdb: conn,
+ ctx: ctx,
+ provider: provider,
+ }
+}
+
+// TestInitializerSmokes makes sure the Initializer doesn't go up in flames on
+// the happy path.
+func TestInitializerSmokes(t *testing.T) {
+ dut := newInitializerDut(t)
+ f := dut.f
+ ctx := dut.ctx
+ conn := dut.bmdb
+
+ reservations, _ := f.ListReservations(ctx, f.pid)
+ kid, err := dut.provider.sshEquinixId(ctx)
+ if err != nil {
+ t.Fatalf("Failed to retrieve equinix key ID: %v", err)
+ }
+ sess, err := conn.StartSession(ctx)
+ if err != nil {
+ t.Fatalf("Failed to create BMDB session for verifiaction: %v", err)
+ }
+
+ // Create 10 provided machines for testing.
+ for i := 0; i < 10; i++ {
+ res := reservations[i]
+ dev, _ := f.CreateDevice(ctx, &packngo.DeviceCreateRequest{
+ Hostname: fmt.Sprintf("test-%d", i),
+ OS: "fake",
+ ProjectID: f.pid,
+ HardwareReservationID: res.ID,
+ ProjectSSHKeys: []string{kid},
+ })
+ f.devices[dev.ID].Network = []*packngo.IPAddressAssignment{
+ {
+ IpAddressCommon: packngo.IpAddressCommon{
+ ID: "fake",
+ Address: "1.2.3.4",
+ Management: true,
+ AddressFamily: 4,
+ Public: true,
+ },
+ },
+ }
+ err = sess.Transact(ctx, func(q *model.Queries) error {
+ machine, err := q.NewMachine(ctx)
+ if err != nil {
+ return err
+ }
+ return q.MachineAddProvided(ctx, model.MachineAddProvidedParams{
+ MachineID: machine.MachineID,
+ Provider: model.ProviderEquinix,
+ ProviderID: dev.ID,
+ })
+ })
+ if err != nil {
+ t.Fatalf("Failed to create BMDB machine: %v", err)
+ }
+ }
+
+ // Wait until no machines are left needing agent start.
+ for {
+ time.Sleep(100 * time.Millisecond)
+
+ var machines []model.MachineProvided
+ err = sess.Transact(ctx, func(q *model.Queries) error {
+ var err error
+ machines, err = q.GetMachinesForAgentStart(ctx, model.GetMachinesForAgentStartParams{
+ Limit: 100,
+ Provider: model.ProviderEquinix,
+ })
+ return err
+ })
+ if err != nil {
+ t.Fatalf("Failed to run Transaction: %v", err)
+ }
+ if len(machines) == 0 {
+ break
+ }
+ }
+}
diff --git a/cloud/shepherd/provider/equinix/main.go b/cloud/shepherd/provider/equinix/main.go
new file mode 100644
index 0000000..3a402e8
--- /dev/null
+++ b/cloud/shepherd/provider/equinix/main.go
@@ -0,0 +1,154 @@
+package main
+
+import (
+ "context"
+ "flag"
+ "fmt"
+ "os"
+
+ "golang.org/x/crypto/ssh"
+ "k8s.io/klog/v2"
+
+ "source.monogon.dev/cloud/bmaas/bmdb"
+ "source.monogon.dev/cloud/bmaas/bmdb/webug"
+ "source.monogon.dev/cloud/equinix/wrapngo"
+ "source.monogon.dev/cloud/lib/component"
+ "source.monogon.dev/cloud/shepherd/manager"
+ clicontext "source.monogon.dev/metropolis/cli/pkg/context"
+)
+
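+// Config bundles all configuration for the Equinix shepherd binary: shared
+// component, BMDB and webug settings, the per-component (provisioner,
+// initializer, recoverer, updater) options, and Equinix API/provider options.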
+type Config struct {
+ Component component.ComponentConfig
+ BMDB bmdb.BMDB
+ WebugConfig webug.Config
+
+ SSHKey manager.SSHKey
+ InitializerConfig manager.InitializerConfig
+ ProvisionerConfig manager.ProvisionerConfig
+ RecovererConfig manager.RecovererConfig
+
+ API wrapngo.Opts
+ Provider providerConfig
+ UpdaterConfig UpdaterConfig
+}
+
+// TODO(q3k): factor this out to BMDB library?
+func runtimeInfo() string {
+ hostname, _ := os.Hostname()
+ if hostname == "" {
+ hostname = "UNKNOWN"
+ }
+ return fmt.Sprintf("host %s", hostname)
+}
+
+func (c *Config) RegisterFlags() {
+ c.Component.RegisterFlags("shepherd")
+ c.BMDB.ComponentName = "shepherd-equinix"
+ c.BMDB.RuntimeInfo = runtimeInfo()
+ c.BMDB.Database.RegisterFlags("bmdb")
+ c.WebugConfig.RegisterFlags()
+
+ c.SSHKey.RegisterFlags()
+ c.InitializerConfig.RegisterFlags()
+ c.ProvisionerConfig.RegisterFlags()
+ c.RecovererConfig.RegisterFlags()
+
+ c.API.RegisterFlags()
+ c.Provider.RegisterFlags()
+ c.UpdaterConfig.RegisterFlags()
+}
+
+func main() {
+ var c Config
+ c.RegisterFlags()
+
+ flag.Parse()
+ if flag.NArg() > 0 {
+ klog.Exitf("unexpected positional arguments: %v", flag.Args())
+ }
+
+ registry := c.Component.PrometheusRegistry()
+ c.BMDB.EnableMetrics(registry)
+
+ ctx := clicontext.WithInterrupt(context.Background())
+ c.Component.StartPrometheus(ctx)
+
+ if c.API.APIKey == "" || c.API.User == "" {
+ klog.Exitf("-equinix_api_username and -equinix_api_key must be set")
+ }
+ c.API.MetricsRegistry = registry
+ api := wrapngo.New(&c.API)
+
+ provider, err := c.Provider.New(&c.SSHKey, api)
+ if err != nil {
+ klog.Exitf("%v", err)
+ }
+
+ sshSigner, err := c.SSHKey.Signer()
+ if err != nil {
+ klog.Exitf("%v", err)
+ }
+
+ sshClient := &manager.PlainSSHClient{
+ AuthMethod: ssh.PublicKeys(sshSigner),
+ // Equinix OS installations always use root.
+ Username: "root",
+ }
+
+ provisioner, err := manager.NewProvisioner(provider, c.ProvisionerConfig)
+ if err != nil {
+ klog.Exitf("%v", err)
+ }
+
+ initializer, err := manager.NewInitializer(provider, sshClient, c.InitializerConfig)
+ if err != nil {
+ klog.Exitf("%v", err)
+ }
+
+ recoverer, err := manager.NewRecoverer(provider, c.RecovererConfig)
+ if err != nil {
+ klog.Exitf("%v", err)
+ }
+
+ updater, err := c.UpdaterConfig.New(api)
+ if err != nil {
+ klog.Exitf("%v", err)
+ }
+
+ conn, err := c.BMDB.Open(true)
+ if err != nil {
+ klog.Exitf("Failed to open BMDB connection: %v", err)
+ }
+
+ // Run each component in its own goroutine. Each uses a local err so the
+ // goroutines don't race on a shared error variable.
+ go func() {
+ if err := provisioner.Run(ctx, conn); err != nil {
+ klog.Exit(err)
+ }
+ }()
+ go func() {
+ if err := manager.RunControlLoop(ctx, conn, initializer); err != nil {
+ klog.Exit(err)
+ }
+ }()
+ go func() {
+ if err := manager.RunControlLoop(ctx, conn, recoverer); err != nil {
+ klog.Exit(err)
+ }
+ }()
+ go func() {
+ if err := updater.Run(ctx, conn); err != nil {
+ klog.Exit(err)
+ }
+ }()
+ go func() {
+ if err := c.WebugConfig.Start(ctx, conn); err != nil && err != ctx.Err() {
+ klog.Exitf("Failed to start webug: %v", err)
+ }
+ }()
+
+ <-ctx.Done()
+}
diff --git a/cloud/shepherd/provider/equinix/provider.go b/cloud/shepherd/provider/equinix/provider.go
new file mode 100644
index 0000000..edc8f3f
--- /dev/null
+++ b/cloud/shepherd/provider/equinix/provider.go
@@ -0,0 +1,369 @@
+package main
+
+import (
+ "context"
+ "fmt"
+ "net/netip"
+ "slices"
+ "strings"
+ "time"
+
+ "github.com/packethost/packngo"
+ "k8s.io/klog/v2"
+
+ "source.monogon.dev/cloud/bmaas/bmdb"
+ "source.monogon.dev/cloud/bmaas/bmdb/model"
+ "source.monogon.dev/cloud/equinix/wrapngo"
+ "source.monogon.dev/cloud/lib/sinbin"
+ "source.monogon.dev/cloud/shepherd"
+ "source.monogon.dev/cloud/shepherd/manager"
+)
+
+type equinixProvider struct {
+ config *providerConfig
+ api wrapngo.Client
+ sshKey *manager.SSHKey
+
+ // badReservations is a holiday resort for Equinix hardware reservations which
+ // failed to be provisioned for some reason or another. We keep a list of them in
+ // memory just so that we don't repeatedly try to provision the same known bad
+ // machines.
+ badReservations sinbin.Sinbin[string]
+
+ reservationDeadline time.Time
+ reservationCache []packngo.HardwareReservation
+}
+
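+// RebootMachine reboots the given device through the Equinix API. Because the
+// API call returns before the reboot actually happens, it then waits
+// RebootWaitSeconds to give the machine time to actually go down.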
+func (ep *equinixProvider) RebootMachine(ctx context.Context, id shepherd.ProviderID) error {
+ if err := ep.api.RebootDevice(ctx, string(id)); err != nil {
+ return fmt.Errorf("failed to reboot device: %w", err)
+ }
+
+ // TODO(issue/215): replace this
+ // This is required as Equinix doesn't reboot the machines synchronously
+ // during the API call.
+ select {
+ case <-time.After(time.Duration(ep.config.RebootWaitSeconds) * time.Second):
+ case <-ctx.Done():
+ return fmt.Errorf("while waiting for reboot: %w", ctx.Err())
+ }
+ return nil
+}
+
+func (ep *equinixProvider) ReinstallMachine(ctx context.Context, id shepherd.ProviderID) error {
+ return shepherd.ErrNotImplemented
+}
+
+func (ep *equinixProvider) GetMachine(ctx context.Context, id shepherd.ProviderID) (shepherd.Machine, error) {
+ machines, err := ep.ListMachines(ctx)
+ if err != nil {
+ return nil, err
+ }
+
+ for _, machine := range machines {
+ if machine.ID() == id {
+ return machine, nil
+ }
+ }
+
+ return nil, shepherd.ErrMachineNotFound
+}
+
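+// ListMachines returns all devices managed by the shepherd together with all
+// provisionable hardware reservations. Reservations are served from a cache
+// that is refreshed at most once per ReservationCacheTimeout, as listing them
+// through the Equinix API is slow.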
+func (ep *equinixProvider) ListMachines(ctx context.Context) ([]shepherd.Machine, error) {
+ if ep.reservationDeadline.Before(time.Now()) {
+ reservations, err := ep.listReservations(ctx)
+ if err != nil {
+ return nil, err
+ }
+ ep.reservationCache = reservations
+ ep.reservationDeadline = time.Now().Add(ep.config.ReservationCacheTimeout)
+ }
+
+ devices, err := ep.managedDevices(ctx)
+ if err != nil {
+ return nil, err
+ }
+
+ machines := make([]shepherd.Machine, 0, len(ep.reservationCache)+len(devices))
+ for _, device := range devices {
+ machines = append(machines, &machine{device})
+ }
+
+ for _, res := range ep.reservationCache {
+ machines = append(machines, reservation{res})
+ }
+
+ return machines, nil
+}
+
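+// CreateMachine provisions a new device on the hardware reservation passed in
+// request.UnusedMachine. Reservations which fail to provision are penalized
+// for an hour so that they are not immediately retried.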
+func (ep *equinixProvider) CreateMachine(ctx context.Context, session *bmdb.Session, request shepherd.CreateMachineRequest) (shepherd.Machine, error) {
+ if request.UnusedMachine == nil {
+ return nil, fmt.Errorf("parameter UnusedMachine is missing")
+ }
+
+ // TODO: Do we just trust the implementation to be correct?
+ res, ok := request.UnusedMachine.(reservation)
+ if !ok {
+ return nil, fmt.Errorf("invalid type for parameter UnusedMachine")
+ }
+
+ d, err := ep.provision(ctx, session, res.HardwareReservation)
+ if err != nil {
+ klog.Errorf("Failed to provision reservation %s: %v", res.HardwareReservation.ID, err)
+ until := time.Now().Add(time.Hour)
+ klog.Errorf("Adding hardware reservation %s to sinbin until %s", res.HardwareReservation.ID, until)
+ ep.badReservations.Add(res.HardwareReservation.ID, until)
+ return nil, err
+ }
+
+ return &machine{*d}, nil
+}
+
+func (ep *equinixProvider) Type() model.Provider {
+ return model.ProviderEquinix
+}
+
+type reservation struct {
+ packngo.HardwareReservation
+}
+
+func (e reservation) ID() shepherd.ProviderID {
+ return shepherd.InvalidProviderID
+}
+
+func (e reservation) Addr() netip.Addr {
+ return netip.Addr{}
+}
+
+func (e reservation) State() shepherd.State {
+ return shepherd.StateKnownUnused
+}
+
+type machine struct {
+ packngo.Device
+}
+
+func (e *machine) ID() shepherd.ProviderID {
+ return shepherd.ProviderID(e.Device.ID)
+}
+
+func (e *machine) Addr() netip.Addr {
+ ni := e.GetNetworkInfo()
+
+ var addr string
+ if ni.PublicIPv4 != "" {
+ addr = ni.PublicIPv4
+ } else if ni.PublicIPv6 != "" {
+ addr = ni.PublicIPv6
+ } else {
+ klog.Errorf("missing address for machine: %v", e.ID())
+ return netip.Addr{}
+ }
+
+ a, err := netip.ParseAddr(addr)
+ if err != nil {
+ klog.Errorf("failed parsing address %q: %v", addr, err)
+ return netip.Addr{}
+ }
+
+ return a
+}
+
+func (e *machine) State() shepherd.State {
+ return shepherd.StateKnownUsed
+}
+
+// listReservations doesn't lock the mutex and expects the caller to lock.
+func (ep *equinixProvider) listReservations(ctx context.Context) ([]packngo.HardwareReservation, error) {
+ klog.Infof("Retrieving hardware reservations, this will take a while...")
+ reservations, err := ep.api.ListReservations(ctx, ep.config.ProjectId)
+ if err != nil {
+ return nil, fmt.Errorf("failed to list reservations: %w", err)
+ }
+
+ var available []packngo.HardwareReservation
+ var inUse, notProvisionable, penalized int
+ for _, reservation := range reservations {
+ if reservation.Device != nil {
+ inUse++
+ continue
+ }
+ if !reservation.Provisionable {
+ notProvisionable++
+ continue
+ }
+ if ep.badReservations.Penalized(reservation.ID) {
+ penalized++
+ continue
+ }
+ available = append(available, reservation)
+ }
+ klog.Infof("Retrieved hardware reservations: %d (total), %d (available), %d (in use), %d (not provisionable), %d (penalized)", len(reservations), len(available), inUse, notProvisionable, penalized)
+
+ return available, nil
+}
+
+// provision attempts to create a device within Equinix using given Hardware
+// Reservation rsv. The resulting device is registered with BMDB, and tagged as
+// "provided" in the process.
+func (ep *equinixProvider) provision(ctx context.Context, sess *bmdb.Session, rsv packngo.HardwareReservation) (*packngo.Device, error) {
+ klog.Infof("Creating a new device using reservation ID %s.", rsv.ID)
+ hostname := ep.config.DevicePrefix + rsv.ID[:18]
+ kid, err := ep.sshEquinixId(ctx)
+ if err != nil {
+ return nil, err
+ }
+ req := &packngo.DeviceCreateRequest{
+ Hostname: hostname,
+ OS: ep.config.OS,
+ Plan: rsv.Plan.Slug,
+ ProjectID: ep.config.ProjectId,
+ HardwareReservationID: rsv.ID,
+ ProjectSSHKeys: []string{kid},
+ }
+ if ep.config.UseProjectKeys {
+ klog.Warningf("INSECURE: Machines will be created with ALL PROJECT SSH KEYS!")
+ req.ProjectSSHKeys = nil
+ }
+
+ nd, err := ep.api.CreateDevice(ctx, req)
+ if err != nil {
+ return nil, fmt.Errorf("while creating new device within Equinix: %w", err)
+ }
+ klog.Infof("Created a new device within Equinix (RID: %s, PID: %s, HOST: %s)", rsv.ID, nd.ID, hostname)
+
+ // Drop the now-used reservation from the cache. DeleteFunc returns the
+ // shortened slice, which must be assigned back.
+ ep.reservationCache = slices.DeleteFunc(ep.reservationCache, func(v packngo.HardwareReservation) bool {
+ return rsv.ID == v.ID
+ })
+
+ err = ep.assimilate(ctx, sess, nd.ID)
+ if err != nil {
+ // TODO(serge@monogon.tech) at this point the device at Equinix isn't
+ // matched by a BMDB record. Schedule device deletion or make sure this
+ // case is being handled elsewhere.
+ return nil, err
+ }
+ return nd, nil
+}
+
+// assimilate registers an existing Equinix device in the BMDB by creating a
+// new machine record and tagging it "provided".
+func (ep *equinixProvider) assimilate(ctx context.Context, sess *bmdb.Session, deviceID string) error {
+ return sess.Transact(ctx, func(q *model.Queries) error {
+ // Create a new machine record within BMDB.
+ m, err := q.NewMachine(ctx)
+ if err != nil {
+ return fmt.Errorf("while creating a new machine record in BMDB: %w", err)
+ }
+
+ // Link the new machine with the Equinix device, and tag it "provided".
+ p := model.MachineAddProvidedParams{
+ MachineID: m.MachineID,
+ ProviderID: deviceID,
+ Provider: model.ProviderEquinix,
+ }
+ klog.Infof("Setting \"provided\" tag (ID: %s, PID: %s, Provider: %s).", p.MachineID, p.ProviderID, p.Provider)
+ if err := q.MachineAddProvided(ctx, p); err != nil {
+ return fmt.Errorf("while tagging machine active: %w", err)
+ }
+ return nil
+ })
+}
+
+// sshEquinix looks up the Equinix key matching providerConfig.KeyLabel,
+// returning its packngo.SSHKey instance.
+func (ep *equinixProvider) sshEquinix(ctx context.Context) (*packngo.SSHKey, error) {
+ ks, err := ep.api.ListSSHKeys(ctx)
+ if err != nil {
+ return nil, fmt.Errorf("while listing SSH keys: %w", err)
+ }
+
+ for _, k := range ks {
+ if k.Label == ep.config.KeyLabel {
+ return &k, nil
+ }
+ }
+ return nil, NoSuchKey
+}
+
+// sshEquinixId looks up the Equinix key identified by providerConfig.KeyLabel,
+// returning its Equinix-assigned UUID.
+func (ep *equinixProvider) sshEquinixId(ctx context.Context) (string, error) {
+ k, err := ep.sshEquinix(ctx)
+ if err != nil {
+ return "", err
+ }
+ return k.ID, nil
+}
+
+// sshEquinixUpdate makes sure the existing SSH key registered with Equinix
+// matches the locally managed public key.
+func (ep *equinixProvider) sshEquinixUpdate(ctx context.Context, kid string) error {
+ pub, err := ep.sshKey.PublicKey()
+ if err != nil {
+ return err
+ }
+ _, err = ep.api.UpdateSSHKey(ctx, kid, &packngo.SSHKeyUpdateRequest{
+ Key: &pub,
+ })
+ if err != nil {
+ return fmt.Errorf("while updating the SSH key: %w", err)
+ }
+ return nil
+}
+
+// sshEquinixUpload registers the locally managed public key with Equinix
+// under the configured label.
+func (ep *equinixProvider) sshEquinixUpload(ctx context.Context) error {
+ pub, err := ep.sshKey.PublicKey()
+ if err != nil {
+ return fmt.Errorf("while generating public key: %w", err)
+ }
+ _, err = ep.api.CreateSSHKey(ctx, &packngo.SSHKeyCreateRequest{
+ Label: ep.config.KeyLabel,
+ Key: pub,
+ ProjectID: ep.config.ProjectId,
+ })
+ if err != nil {
+ return fmt.Errorf("while creating an SSH key: %w", err)
+ }
+ return nil
+}
+
+// SSHEquinixEnsure makes sure the locally managed SSH key is registered with
+// Equinix: it is uploaded if no key with the configured label exists, and
+// updated if the registered key differs from the local one.
+func (ep *equinixProvider) SSHEquinixEnsure(ctx context.Context) error {
+ k, err := ep.sshEquinix(ctx)
+ switch err {
+ case NoSuchKey:
+ if err := ep.sshEquinixUpload(ctx); err != nil {
+ return fmt.Errorf("while uploading key: %w", err)
+ }
+ return nil
+ case nil:
+ if err := ep.sshEquinixUpdate(ctx, k.ID); err != nil {
+ return fmt.Errorf("while updating key: %w", err)
+ }
+ return nil
+ default:
+ return err
+ }
+}
+
+// managedDevices provides a map of device provider IDs to matching
+// packngo.Device instances. It calls the Equinix API's ListDevices. The
+// returned devices are filtered according to the configured DevicePrefix. The
+// returned error value, if not nil, will originate in wrapngo.
+func (ep *equinixProvider) managedDevices(ctx context.Context) (map[string]packngo.Device, error) {
+ ds, err := ep.api.ListDevices(ctx, ep.config.ProjectId)
+ if err != nil {
+ return nil, err
+ }
+ dm := map[string]packngo.Device{}
+ for _, d := range ds {
+ if strings.HasPrefix(d.Hostname, ep.config.DevicePrefix) {
+ dm[d.ID] = d
+ }
+ }
+ return dm, nil
+}
diff --git a/cloud/shepherd/provider/equinix/provider_config.go b/cloud/shepherd/provider/equinix/provider_config.go
new file mode 100644
index 0000000..be3bc27
--- /dev/null
+++ b/cloud/shepherd/provider/equinix/provider_config.go
@@ -0,0 +1,97 @@
+package main
+
+import (
+ "errors"
+ "flag"
+ "fmt"
+ "strings"
+ "time"
+
+ "source.monogon.dev/cloud/equinix/wrapngo"
+ "source.monogon.dev/cloud/shepherd/manager"
+)
+
+var (
+ NoSuchKey = errors.New("no such key")
+)
+
+// providerConfig contains configuration options used by both the Initializer and
+// Provisioner components of the Shepherd. In CLI scenarios, RegisterFlags should
+// be called to configure this struct from CLI flags. Otherwise, this structure
+// should be explicitly configured, as the default values are not valid.
+type providerConfig struct {
+ // ProjectId is the Equinix project UUID used by the manager. See Equinix API
+ // documentation for details. Must be set.
+ ProjectId string
+
+ // KeyLabel specifies the ID to use when handling the Equinix-registered SSH
+ // key used to authenticate to newly created servers. Must be set.
+ KeyLabel string
+
+ // DevicePrefix is applied to all devices (machines) created by the
+ // Provisioner, and used by the Provisioner to identify the machines it
+ // manages. Must be set.
+ DevicePrefix string
+
+ // OS defines the operating system new devices are created with. Its format
+ // is specified by Equinix API.
+ OS string
+
+ // UseProjectKeys defines whether the provisioner adds all SSH keys defined
+ // in the project to every new machine. This is only used for debugging.
+ UseProjectKeys bool
+
+ // RebootWaitSeconds defines how many seconds to sleep after a reboot call
+ // to ensure a reboot actually happened.
+ RebootWaitSeconds int
+
+ // ReservationCacheTimeout defines after how much time the cached hardware
+ // reservations should be refreshed.
+ ReservationCacheTimeout time.Duration
+}
+
+func (pc *providerConfig) check() error {
+ if pc.ProjectId == "" {
+ return fmt.Errorf("-equinix_project_id must be set")
+ }
+ if pc.KeyLabel == "" {
+ return fmt.Errorf("-equinix_ssh_key_label must be set")
+ }
+ if pc.DevicePrefix == "" {
+ return fmt.Errorf("-equinix_device_prefix must be set")
+ }
+
+ // These variables are _very_ important to configure correctly, otherwise someone
+ // running this locally with prod creds will actually destroy production
+ // data.
+ if strings.Contains(pc.KeyLabel, "FIXME") {
+ return fmt.Errorf("refusing to run with -equinix_ssh_key_label %q, please set it to something unique", pc.KeyLabel)
+ }
+ if strings.Contains(pc.DevicePrefix, "FIXME") {
+ return fmt.Errorf("refusing to run with -equinix_device_prefix %q, please set it to something unique", pc.DevicePrefix)
+ }
+
+ return nil
+}
+
+func (pc *providerConfig) RegisterFlags() {
+ flag.StringVar(&pc.ProjectId, "equinix_project_id", "", "Equinix project ID where resources will be managed")
+ flag.StringVar(&pc.KeyLabel, "equinix_ssh_key_label", "shepherd-FIXME", "Label used to identify managed SSH key in Equinix project")
+ flag.StringVar(&pc.DevicePrefix, "equinix_device_prefix", "shepherd-FIXME-", "Prefix applied to all devices (machines) in Equinix project, used to identify managed machines")
+ flag.StringVar(&pc.OS, "equinix_os", "ubuntu_20_04", "OS that provisioner will deploy on Equinix machines. Not the target OS for cluster customers.")
+ flag.BoolVar(&pc.UseProjectKeys, "equinix_use_project_keys", false, "Add all Equinix project keys to newly provisioned machines, not just the provisioner's managed key. Debug/development only.")
+ flag.IntVar(&pc.RebootWaitSeconds, "equinix_reboot_wait_seconds", 30, "How many seconds to sleep to ensure a reboot happend")
+ flag.DurationVar(&pc.ReservationCacheTimeout, "equinix_reservation_cache_timeout", time.Minute*15, "Reservation cache validity timeo")
+}
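+
+// An illustrative invocation wiring up the flags above (all values are
+// placeholders, not real project data):
+//
+// equinix \
+//   -equinix_project_id=00000000-0000-0000-0000-000000000000 \
+//   -equinix_ssh_key_label=shepherd-prod \
+//   -equinix_device_prefix=shepherd-prod- \
+//   -equinix_os=ubuntu_20_04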
+
+func (pc *providerConfig) New(sshKey *manager.SSHKey, api wrapngo.Client) (*equinixProvider, error) {
+ if err := pc.check(); err != nil {
+ return nil, err
+ }
+
+ return &equinixProvider{
+ config: pc,
+ sshKey: sshKey,
+ api: api,
+ }, nil
+}
diff --git a/cloud/shepherd/provider/equinix/provisioner_test.go b/cloud/shepherd/provider/equinix/provisioner_test.go
new file mode 100644
index 0000000..b57546a
--- /dev/null
+++ b/cloud/shepherd/provider/equinix/provisioner_test.go
@@ -0,0 +1,103 @@
+package main
+
+import (
+ "context"
+ "crypto/ed25519"
+ "crypto/rand"
+ "testing"
+ "time"
+
+ "golang.org/x/time/rate"
+
+ "source.monogon.dev/cloud/bmaas/bmdb"
+ "source.monogon.dev/cloud/bmaas/bmdb/model"
+ "source.monogon.dev/cloud/lib/component"
+ "source.monogon.dev/cloud/shepherd/manager"
+)
+
+// TestProvisionerSmokes makes sure the Provisioner doesn't go up in flames on
+// the happy path.
+func TestProvisionerSmokes(t *testing.T) {
+ pc := manager.ProvisionerConfig{
+ MaxCount: 10,
+ // We need 3 iterations to provide 10 machines with a chunk size of 4.
+ ReconcileLoopLimiter: rate.NewLimiter(rate.Every(10*time.Second), 3),
+ DeviceCreationLimiter: rate.NewLimiter(rate.Every(time.Second), 10),
+ ChunkSize: 4,
+ }
+ sc := providerConfig{
+ ProjectId: "noproject",
+ KeyLabel: "somekey",
+ DevicePrefix: "test-",
+ }
+
+ _, key, _ := ed25519.GenerateKey(rand.Reader)
+ k := manager.SSHKey{
+ Key: key,
+ }
+
+ f := newFakequinix(sc.ProjectId, 100)
+ provider, err := sc.New(&k, f)
+ if err != nil {
+ t.Fatalf("Could not create Provider: %v", err)
+ }
+
+ p, err := manager.NewProvisioner(provider, pc)
+ if err != nil {
+ t.Fatalf("Could not create Provisioner: %v", err)
+ }
+
+ ctx, ctxC := context.WithCancel(context.Background())
+ defer ctxC()
+
+ b := bmdb.BMDB{
+ Config: bmdb.Config{
+ Database: component.CockroachConfig{
+ InMemory: true,
+ },
+ ComponentName: "test",
+ RuntimeInfo: "test",
+ },
+ }
+ conn, err := b.Open(true)
+ if err != nil {
+ t.Fatalf("Could not create in-memory BMDB: %v", err)
+ }
+
+ if err := provider.SSHEquinixEnsure(ctx); err != nil {
+ t.Fatalf("Failed to ensure SSH key: %v", err)
+ }
+ go p.Run(ctx, conn)
+
+ sess, err := conn.StartSession(ctx)
+ if err != nil {
+ t.Fatalf("Failed to create BMDB session for verification: %v", err)
+ }
+ for {
+ time.Sleep(100 * time.Millisecond)
+
+ var provided []model.MachineProvided
+ err = sess.Transact(ctx, func(q *model.Queries) error {
+ var err error
+ provided, err = q.GetProvidedMachines(ctx, model.ProviderEquinix)
+ return err
+ })
+ if err != nil {
+ t.Errorf("Transact failed: %v", err)
+ }
+ if len(provided) < 10 {
+ continue
+ }
+ if len(provided) > 10 {
+ t.Errorf("%d machines provided (limit: 10)", len(provided))
+ }
+
+ for _, mp := range provided {
+ if f.devices[mp.ProviderID] == nil {
+ t.Errorf("BMDB machine %q has unknown provider ID %q", mp.MachineID, mp.ProviderID)
+ }
+ }
+
+ return
+ }
+}
diff --git a/cloud/shepherd/provider/equinix/recoverer_test.go b/cloud/shepherd/provider/equinix/recoverer_test.go
new file mode 100644
index 0000000..109c375
--- /dev/null
+++ b/cloud/shepherd/provider/equinix/recoverer_test.go
@@ -0,0 +1,181 @@
+package main
+
+import (
+ "context"
+ "crypto/ed25519"
+ "crypto/rand"
+ "testing"
+ "time"
+
+ "github.com/packethost/packngo"
+ "golang.org/x/time/rate"
+
+ "source.monogon.dev/cloud/bmaas/bmdb"
+ "source.monogon.dev/cloud/bmaas/bmdb/model"
+ "source.monogon.dev/cloud/lib/component"
+ "source.monogon.dev/cloud/shepherd/manager"
+)
+
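+// recovererDut bundles everything needed to exercise the Recoverer: a
+// fakequinix backend, the Recoverer under test, and an in-memory BMDB.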
+type recovererDut struct {
+ f *fakequinix
+ r *manager.Recoverer
+ bmdb *bmdb.Connection
+ ctx context.Context
+}
+
+func newRecovererDut(t *testing.T) *recovererDut {
+ t.Helper()
+
+ rc := manager.RecovererConfig{
+ ControlLoopConfig: manager.ControlLoopConfig{
+ DBQueryLimiter: rate.NewLimiter(rate.Every(time.Second), 10),
+ },
+ }
+
+ sc := providerConfig{
+ ProjectId: "noproject",
+ KeyLabel: "somekey",
+ DevicePrefix: "test-",
+ }
+
+ _, key, _ := ed25519.GenerateKey(rand.Reader)
+ k := manager.SSHKey{
+ Key: key,
+ }
+
+ f := newFakequinix(sc.ProjectId, 100)
+ provider, err := sc.New(&k, f)
+ if err != nil {
+ t.Fatalf("Could not create Provider: %v", err)
+ }
+
+ r, err := manager.NewRecoverer(provider, rc)
+ if err != nil {
+ t.Fatalf("Could not create Initializer: %v", err)
+ }
+
+ b := bmdb.BMDB{
+ Config: bmdb.Config{
+ Database: component.CockroachConfig{
+ InMemory: true,
+ },
+ ComponentName: "test",
+ RuntimeInfo: "test",
+ },
+ }
+ conn, err := b.Open(true)
+ if err != nil {
+ t.Fatalf("Could not create in-memory BMDB: %v", err)
+ }
+
+ ctx, ctxC := context.WithCancel(context.Background())
+ t.Cleanup(ctxC)
+
+ go manager.RunControlLoop(ctx, conn, r)
+
+ return &recovererDut{
+ f: f,
+ r: r,
+ bmdb: conn,
+ ctx: ctx,
+ }
+}
+
+// TestRecoverySmokes makes sure that the Recoverer doesn't go up in flames on
+// the happy path.
+func TestRecoverySmokes(t *testing.T) {
+ dut := newRecovererDut(t)
+ f := dut.f
+ ctx := dut.ctx
+ conn := dut.bmdb
+
+ reservations, _ := f.ListReservations(ctx, "fake")
+
+ sess, err := conn.StartSession(ctx)
+ if err != nil {
+ t.Fatalf("Failed to create BMDB session: %v", err)
+ }
+
+ // Create test machine that should be selected for recovery.
+ // First in Fakequinix...
+ dev, _ := f.CreateDevice(ctx, &packngo.DeviceCreateRequest{
+ Hostname: "test-devices",
+ OS: "fake",
+ ProjectID: "fake",
+ HardwareReservationID: reservations[0].ID,
+ ProjectSSHKeys: []string{},
+ })
+ // ... and in BMDB.
+ err = sess.Transact(ctx, func(q *model.Queries) error {
+ machine, err := q.NewMachine(ctx)
+ if err != nil {
+ return err
+ }
+ err = q.MachineAddProvided(ctx, model.MachineAddProvidedParams{
+ MachineID: machine.MachineID,
+ Provider: model.ProviderEquinix,
+ ProviderID: dev.ID,
+ })
+ if err != nil {
+ return err
+ }
+ return q.MachineSetAgentStarted(ctx, model.MachineSetAgentStartedParams{
+ MachineID: machine.MachineID,
+ AgentStartedAt: time.Now().Add(time.Hour * -10),
+ AgentPublicKey: []byte("fakefakefakefake"),
+ })
+ })
+ if err != nil {
+ t.Fatalf("Failed to create test machine: %v", err)
+ }
+
+ // Wait until no machines are left needing recovery.
+ deadline := time.Now().Add(10 * time.Second)
+ for {
+ if time.Now().After(deadline) {
+ t.Fatalf("Machines did not get processed in time")
+ }
+ time.Sleep(100 * time.Millisecond)
+
+ var machines []model.MachineProvided
+ err = sess.Transact(ctx, func(q *model.Queries) error {
+ var err error
+ machines, err = q.GetMachineForAgentRecovery(ctx, model.GetMachineForAgentRecoveryParams{
+ Limit: 100,
+ Provider: model.ProviderEquinix,
+ })
+ return err
+ })
+ if err != nil {
+ t.Fatalf("Failed to run Transaction: %v", err)
+ }
+ if len(machines) == 0 {
+ break
+ }
+ }
+
+ // Expect the target machine to have been rebooted.
+ dut.f.mu.Lock()
+ reboots := dut.f.reboots[dev.ID]
+ dut.f.mu.Unlock()
+ if want, got := 1, reboots; want != got {
+ t.Fatalf("Wanted %d reboot, got %d", want, got)
+ }
+
+ // Expect machine to now be available again for agent start.
+ var machines []model.MachineProvided
+ err = sess.Transact(ctx, func(q *model.Queries) error {
+ var err error
+ machines, err = q.GetMachinesForAgentStart(ctx, model.GetMachinesForAgentStartParams{
+ Limit: 100,
+ Provider: model.ProviderEquinix,
+ })
+ return err
+ })
+ if err != nil {
+ t.Fatalf("Failed to run Transaction: %v", err)
+ }
+ if want, got := 1, len(machines); want != got {
+ t.Fatalf("Wanted %d machine ready for agent start, got %d", want, got)
+ }
+}
diff --git a/cloud/shepherd/provider/equinix/updater.go b/cloud/shepherd/provider/equinix/updater.go
new file mode 100644
index 0000000..b053f26
--- /dev/null
+++ b/cloud/shepherd/provider/equinix/updater.go
@@ -0,0 +1,263 @@
+package main
+
+import (
+ "context"
+ "database/sql"
+ "errors"
+ "flag"
+ "fmt"
+ "time"
+
+ "github.com/packethost/packngo"
+ "k8s.io/klog/v2"
+
+ "source.monogon.dev/cloud/bmaas/bmdb"
+ "source.monogon.dev/cloud/bmaas/bmdb/metrics"
+ "source.monogon.dev/cloud/bmaas/bmdb/model"
+ ecl "source.monogon.dev/cloud/equinix/wrapngo"
+ "source.monogon.dev/cloud/lib/sinbin"
+)
+
+type UpdaterConfig struct {
+ // Enable starts the updater.
+ Enable bool
+ // IterationRate is the minimum time taken between subsequent iterations of
+ // the updater.
+ IterationRate time.Duration
+}
+
+func (u *UpdaterConfig) RegisterFlags() {
+ flag.BoolVar(&u.Enable, "updater_enable", true, "Enable the updater, which periodically scans equinix machines and updates their status in the BMDB")
+ flag.DurationVar(&u.IterationRate, "updater_iteration_rate", time.Minute, "Rate limiting for updater iteration loop")
+}
+
+// The Updater periodically scans all machines backed by the Equinix provider
+// and updates their Provided status fields based on data retrieved from the
+// Equinix API.
+type Updater struct {
+ config *UpdaterConfig
+ sinbin sinbin.Sinbin[string]
+
+ cl ecl.Client
+}
+
+func (c *UpdaterConfig) New(cl ecl.Client) (*Updater, error) {
+ return &Updater{
+ config: c,
+ cl: cl,
+ }, nil
+}
+
+func (u *Updater) Run(ctx context.Context, conn *bmdb.Connection) error {
+ var sess *bmdb.Session
+ var err error
+
+ if !u.config.Enable {
+ return nil
+ }
+
+ for {
+ if sess == nil {
+ sess, err = conn.StartSession(ctx, bmdb.SessionOption{Processor: metrics.ProcessorShepherdUpdater})
+ if err != nil {
+ return fmt.Errorf("could not start BMDB session: %w", err)
+ }
+ }
+ limit := time.After(u.config.IterationRate)
+
+ err = u.runInSession(ctx, sess)
+ switch {
+ case err == nil:
+ <-limit
+ case errors.Is(err, ctx.Err()):
+ return err
+ case errors.Is(err, bmdb.ErrSessionExpired):
+ klog.Errorf("Session expired, restarting...")
+ sess = nil
+ time.Sleep(time.Second)
+ case err != nil:
+ klog.Errorf("Processing failed: %v", err)
+ // TODO(q3k): close session
+ time.Sleep(time.Second)
+ }
+ }
+}
+
+// applyNullStringUpdate returns true if 'up' supersedes 'cur'. Otherwise, it
+// returns false and zeroes out up.
+func applyNullStringUpdate(up, cur *sql.NullString) bool {
+ if up.Valid {
+ if !cur.Valid {
+ return true
+ }
+ if up.String != cur.String {
+ return true
+ }
+ }
+ up.String = ""
+ up.Valid = false
+ return false
+}
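+
+// For example (illustrative): with cur = {Valid: true, String: "a"}, an update
+// of {Valid: true, String: "b"} supersedes it and is kept (true is returned),
+// while an update of {Valid: true, String: "a"} carries no new information and
+// is zeroed out (false is returned).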
+
+// applyNullProviderStatusUpdate returns true if 'up' supersedes 'cur'.
+// Otherwise, it returns false and zeroes out up.
+func applyNullProviderStatusUpdate(up, cur *model.NullProviderStatus) bool {
+ if up.Valid {
+ if !cur.Valid {
+ return true
+ }
+ if up.ProviderStatus != cur.ProviderStatus {
+ return true
+ }
+ }
+ up.ProviderStatus = model.ProviderStatusUnknown
+ up.Valid = false
+ return false
+}
+
+// applyUpdate returns true if any field of 'up' supersedes the corresponding
+// field of 'cur'; fields which don't supersede are zeroed out. Every apply
+// function must run unconditionally, so the || operands are ordered to avoid
+// short-circuiting away the zeroing.
+func applyUpdate(up *model.MachineUpdateProviderStatusParams, cur *model.MachineProvided) bool {
+ res := applyNullStringUpdate(&up.ProviderReservationID, &cur.ProviderReservationID)
+ res = applyNullStringUpdate(&up.ProviderIpAddress, &cur.ProviderIpAddress) || res
+ res = applyNullStringUpdate(&up.ProviderLocation, &cur.ProviderLocation) || res
+ res = applyNullProviderStatusUpdate(&up.ProviderStatus, &cur.ProviderStatus) || res
+ return res
+}
+
+// updateLog logs information about the given update as calculated by applyUpdate.
+func updateLog(up *model.MachineUpdateProviderStatusParams) {
+ if up.ProviderReservationID.Valid {
+ klog.Infof(" Machine %s: new reservation ID %s", up.ProviderID, up.ProviderReservationID.String)
+ }
+ if up.ProviderIpAddress.Valid {
+ klog.Infof(" Machine %s: new IP address %s", up.ProviderID, up.ProviderIpAddress.String)
+ }
+ if up.ProviderLocation.Valid {
+ klog.Infof(" Machine %s: new location %s", up.ProviderID, up.ProviderLocation.String)
+ }
+ if up.ProviderStatus.Valid {
+ klog.Infof(" Machine %s: new status %s", up.ProviderID, up.ProviderStatus.ProviderStatus)
+ }
+}
+
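+// runInSession performs a single updater iteration: it loads all
+// Equinix-provided machines from the BMDB, fetches fresh device data for
+// those not currently sinbinned, and writes any changed fields back to the
+// BMDB.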
+func (u *Updater) runInSession(ctx context.Context, sess *bmdb.Session) error {
+ // Get all machines provided by us into the BMDB.
+ // TODO(q3k): do not load all machines into memory.
+
+ var machines []model.MachineProvided
+ err := sess.Transact(ctx, func(q *model.Queries) error {
+ var err error
+ machines, err = q.GetProvidedMachines(ctx, model.ProviderEquinix)
+ return err
+ })
+ if err != nil {
+ return fmt.Errorf("when fetching provided machines: %w", err)
+ }
+
+ // Limit how many machines we check by skipping those that were recently
+ // checked and sinbinned as unlikely to change soon.
+ penalized := 0
+ var check []model.MachineProvided
+ for _, m := range machines {
+ if u.sinbin.Penalized(m.ProviderID) {
+ penalized += 1
+ } else {
+ check = append(check, m)
+ }
+ }
+
+ klog.Infof("Machines to check %d, skipping: %d", len(check), penalized)
+ for _, m := range check {
+ dev, err := u.cl.GetDevice(ctx, "", m.ProviderID, &packngo.ListOptions{
+ Includes: []string{
+ "hardware_reservation",
+ },
+ Excludes: []string{
+ "created_by", "customdata", "network_ports", "operating_system", "actions",
+ "plan", "provisioning_events", "ssh_keys", "tags", "volumes",
+ },
+ })
+ if err != nil {
+ klog.Warningf("Fetching device %s failed: %v", m.ProviderID, err)
+ continue
+ }
+
+ // nextCheck will be used to sinbin the machine for some given time if there is
+ // no difference between the current state and new state.
+ //
+ // Some conditions override this to be shorter (when the machine doesn't yet have
+ // all data available or is in an otherwise unstable state).
+ nextCheck := time.Minute * 30
+
+ up := model.MachineUpdateProviderStatusParams{
+ Provider: m.Provider,
+ ProviderID: m.ProviderID,
+ }
+
+ if dev.HardwareReservation != nil {
+ up.ProviderReservationID.Valid = true
+ up.ProviderReservationID.String = dev.HardwareReservation.ID
+ } else {
+ nextCheck = time.Minute
+ }
+
+ for _, addr := range dev.Network {
+ if !addr.Public {
+ continue
+ }
+ up.ProviderIpAddress.Valid = true
+ up.ProviderIpAddress.String = addr.Address
+ break
+ }
+ if !up.ProviderIpAddress.Valid {
+ nextCheck = time.Minute
+ }
+
+ if dev.Facility != nil {
+ up.ProviderLocation.Valid = true
+ up.ProviderLocation.String = dev.Facility.Code
+ } else {
+ nextCheck = time.Minute
+ }
+
+ up.ProviderStatus.Valid = true
+ switch dev.State {
+ case "active":
+ up.ProviderStatus.ProviderStatus = model.ProviderStatusRunning
+ case "deleted":
+ up.ProviderStatus.ProviderStatus = model.ProviderStatusMissing
+ case "failed":
+ up.ProviderStatus.ProviderStatus = model.ProviderStatusProvisioningFailedPermanent
+ case "inactive":
+ up.ProviderStatus.ProviderStatus = model.ProviderStatusStopped
+ case "powering_on", "powering_off":
+ nextCheck = time.Minute
+ up.ProviderStatus.ProviderStatus = model.ProviderStatusStopped
+ case "queued", "provisioning", "reinstalling", "post_provisioning":
+ nextCheck = time.Minute
+ up.ProviderStatus.ProviderStatus = model.ProviderStatusProvisioning
+ default:
+ klog.Warningf("Device %s has unexpected status: %q", m.ProviderID, dev.State)
+ nextCheck = time.Minute
+ up.ProviderStatus.ProviderStatus = model.ProviderStatusUnknown
+ }
+
+ if !applyUpdate(&up, &m) {
+ u.sinbin.Add(m.ProviderID, time.Now().Add(nextCheck))
+ continue
+ }
+
+ klog.Infof("Device %s has new data:", m.ProviderID)
+ updateLog(&up)
+ err = sess.Transact(ctx, func(q *model.Queries) error {
+ return q.MachineUpdateProviderStatus(ctx, up)
+ })
+ if err != nil {
+ klog.Warningf("Device %s failed to update: %v", m.ProviderID, err)
+ }
+ u.sinbin.Add(m.ProviderID, time.Now().Add(time.Minute))
+ }
+ return nil
+}
diff --git a/cloud/shepherd/provider/equinix/updater_test.go b/cloud/shepherd/provider/equinix/updater_test.go
new file mode 100644
index 0000000..9b23295
--- /dev/null
+++ b/cloud/shepherd/provider/equinix/updater_test.go
@@ -0,0 +1,140 @@
+package main
+
+import (
+ "context"
+ "testing"
+ "time"
+
+ "github.com/packethost/packngo"
+
+ "source.monogon.dev/cloud/bmaas/bmdb"
+ "source.monogon.dev/cloud/bmaas/bmdb/model"
+ "source.monogon.dev/cloud/lib/component"
+)
+
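+// updaterDut bundles everything needed to exercise the Updater: a fakequinix
+// backend, the Updater under test, and an in-memory BMDB.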
+type updaterDut struct {
+ f *fakequinix
+ u *Updater
+ bmdb *bmdb.Connection
+ ctx context.Context
+}
+
+func newUpdaterDut(t *testing.T) *updaterDut {
+ t.Helper()
+
+ uc := UpdaterConfig{
+ Enable: true,
+ IterationRate: time.Second,
+ }
+
+ f := newFakequinix("fake", 100)
+ u, err := uc.New(f)
+ if err != nil {
+ t.Fatalf("Could not create Updater: %v", err)
+ }
+
+ b := bmdb.BMDB{
+ Config: bmdb.Config{
+ Database: component.CockroachConfig{
+ InMemory: true,
+ },
+ ComponentName: "test",
+ RuntimeInfo: "test",
+ },
+ }
+ conn, err := b.Open(true)
+ if err != nil {
+ t.Fatalf("Could not create in-memory BMDB: %v", err)
+ }
+
+ ctx, ctxC := context.WithCancel(context.Background())
+ t.Cleanup(ctxC)
+
+ go u.Run(ctx, conn)
+
+ return &updaterDut{
+ f: f,
+ u: u,
+ bmdb: conn,
+ ctx: ctx,
+ }
+}
+
+func TestUpdater(t *testing.T) {
+ dut := newUpdaterDut(t)
+ f := dut.f
+ ctx := dut.ctx
+ conn := dut.bmdb
+
+ reservations, _ := f.ListReservations(ctx, "fake")
+
+ sess, err := conn.StartSession(ctx)
+ if err != nil {
+ t.Fatalf("Failed to create BMDB session: %v", err)
+ }
+
+ // Create test machine that should be selected for updating.
+ // First in Fakequinix...
+ dev, _ := f.CreateDevice(ctx, &packngo.DeviceCreateRequest{
+ Hostname: "test-devices",
+ OS: "fake",
+ ProjectID: "fake",
+ HardwareReservationID: reservations[0].ID,
+ ProjectSSHKeys: []string{},
+ })
+ // ... and in BMDB.
+ err = sess.Transact(ctx, func(q *model.Queries) error {
+ machine, err := q.NewMachine(ctx)
+ if err != nil {
+ return err
+ }
+ err = q.MachineAddProvided(ctx, model.MachineAddProvidedParams{
+ MachineID: machine.MachineID,
+ Provider: model.ProviderEquinix,
+ ProviderID: dev.ID,
+ })
+ if err != nil {
+ return err
+ }
+ return q.MachineSetAgentStarted(ctx, model.MachineSetAgentStartedParams{
+ MachineID: machine.MachineID,
+ AgentStartedAt: time.Now().Add(time.Hour * -10),
+ AgentPublicKey: []byte("fakefakefakefake"),
+ })
+ })
+ if err != nil {
+ t.Fatalf("Failed to create test machine: %v", err)
+ }
+
+ deadline := time.Now().Add(time.Second * 10)
+ for {
+ time.Sleep(100 * time.Millisecond)
+ if time.Now().After(deadline) {
+ t.Fatalf("Deadline exceeded")
+ }
+
+ var provided []model.MachineProvided
+ err = sess.Transact(ctx, func(q *model.Queries) error {
+ var err error
+ provided, err = q.GetProvidedMachines(ctx, model.ProviderEquinix)
+ return err
+ })
+ if err != nil {
+ t.Fatalf("Transact: %v", err)
+ }
+ if len(provided) < 1 {
+ continue
+ }
+ p := provided[0]
+ if p.ProviderStatus.ProviderStatus != model.ProviderStatusRunning {
+ continue
+ }
+ if p.ProviderLocation.String != "wad" {
+ continue
+ }
+ if p.ProviderIpAddress.String != "1.2.3.4" {
+ continue
+ }
+ if p.ProviderReservationID.String != reservations[0].ID {
+ continue
+ }
+ break
+ }
+}