cloud/bmaas/bmdb: Filter for machines based on provider
Currently the equinix shepherd tries to initialize our nodes from lumen,
which of course is not correct. This change adds another parameter to
the queries and prevents that.
Change-Id: Ib3f65e68403cb1b1282b80c1d494fb030a6d17b1
Reviewed-on: https://review.monogon.dev/c/monogon/+/2039
Reviewed-by: Serge Bazanski <serge@monogon.tech>
Tested-by: Jenkins CI
diff --git a/cloud/bmaas/bmdb/model/queries_workflows.sql b/cloud/bmaas/bmdb/model/queries_workflows.sql
index 8400d4e..49622d8 100644
--- a/cloud/bmaas/bmdb/model/queries_workflows.sql
+++ b/cloud/bmaas/bmdb/model/queries_workflows.sql
@@ -27,6 +27,7 @@
)
AND work.machine_id IS NULL
AND work_backoff.machine_id IS NULL
+ AND machine_provided.provider = $2
LIMIT $1;
-- name: GetMachineForAgentRecovery :many
@@ -66,6 +67,7 @@
)
AND work.machine_id IS NULL
AND work_backoff.machine_id IS NULL
+ AND machine_provided.provider = $2
LIMIT $1;
-- name: AuthenticateAgentConnection :many
diff --git a/cloud/bmaas/bmdb/queries_test.go b/cloud/bmaas/bmdb/queries_test.go
index 96e8dbe..7ceae89 100644
--- a/cloud/bmaas/bmdb/queries_test.go
+++ b/cloud/bmaas/bmdb/queries_test.go
@@ -42,7 +42,10 @@
expectCandidates := func(want int) {
t.Helper()
if err := session.Transact(ctx, func(q *model.Queries) error {
- candidates, err := q.GetMachinesForAgentStart(ctx, 1)
+ candidates, err := q.GetMachinesForAgentStart(ctx, model.GetMachinesForAgentStartParams{
+ Limit: 1,
+ Provider: model.ProviderEquinix,
+ })
if err != nil {
t.Fatalf("Could not retrieve machines for agent start: %v", err)
}
@@ -246,7 +249,10 @@
found := false
if err := session.Transact(ctx, func(q *model.Queries) error {
- candidates, err := q.GetMachineForAgentRecovery(ctx, 100)
+ candidates, err := q.GetMachineForAgentRecovery(ctx, model.GetMachineForAgentRecoveryParams{
+ Limit: 100,
+ Provider: model.ProviderEquinix,
+ })
if err != nil {
return fmt.Errorf("GetMachinesForAgentRecovery: %w", err)
}
diff --git a/cloud/bmaas/bmdb/sessions_test.go b/cloud/bmaas/bmdb/sessions_test.go
index b8625a9..d3c80bd 100644
--- a/cloud/bmaas/bmdb/sessions_test.go
+++ b/cloud/bmaas/bmdb/sessions_test.go
@@ -202,7 +202,10 @@
t.Fatalf("Deadline expired")
}
work, err := session.Work(ctx, model.ProcessShepherdAgentStart, func(q *model.Queries) ([]uuid.UUID, error) {
- machines, err := q.GetMachinesForAgentStart(ctx, 1)
+ machines, err := q.GetMachinesForAgentStart(ctx, model.GetMachinesForAgentStartParams{
+ Limit: 1,
+ Provider: model.ProviderEquinix,
+ })
if err != nil {
return nil, err
}
@@ -238,7 +241,10 @@
var machines []model.MachineProvided
var err error
err = session.Transact(ctx, func(q *model.Queries) error {
- machines, err = q.GetMachinesForAgentStart(ctx, 1)
+ machines, err = q.GetMachinesForAgentStart(ctx, model.GetMachinesForAgentStartParams{
+ Limit: 1,
+ Provider: model.ProviderEquinix,
+ })
if err != nil {
return err
}
@@ -351,7 +357,10 @@
errC := make(chan error)
go func() {
work, err := session.Work(ctx, model.ProcessShepherdAgentStart, func(q *model.Queries) ([]uuid.UUID, error) {
- machines, err := q.GetMachinesForAgentStart(ctx, 1)
+ machines, err := q.GetMachinesForAgentStart(ctx, model.GetMachinesForAgentStartParams{
+ Limit: 1,
+ Provider: model.ProviderEquinix,
+ })
if err != nil {
return nil, err
}
@@ -390,7 +399,10 @@
// Mutual exclusion with AgentStart:
err = session.Transact(ctx, func(q *model.Queries) error {
- machines, err := q.GetMachinesForAgentStart(ctx, 1)
+ machines, err := q.GetMachinesForAgentStart(ctx, model.GetMachinesForAgentStartParams{
+ Limit: 1,
+ Provider: model.ProviderEquinix,
+ })
if err != nil {
return err
}
@@ -405,7 +417,10 @@
// Mutual exclusion with Recovery:
err = session.Transact(ctx, func(q *model.Queries) error {
- machines, err := q.GetMachineForAgentRecovery(ctx, 1)
+ machines, err := q.GetMachineForAgentRecovery(ctx, model.GetMachineForAgentRecoveryParams{
+ Limit: 1,
+ Provider: model.ProviderEquinix,
+ })
if err != nil {
return err
}
@@ -427,7 +442,10 @@
// That machine has its agent started, so we still expect no work to have to be
// done.
err = session.Transact(ctx, func(q *model.Queries) error {
- machines, err := q.GetMachinesForAgentStart(ctx, 1)
+ machines, err := q.GetMachinesForAgentStart(ctx, model.GetMachinesForAgentStartParams{
+ Limit: 1,
+ Provider: model.ProviderEquinix,
+ })
if err != nil {
return err
}
@@ -518,7 +536,10 @@
workOnce := func(ctx context.Context, workerID int, session *Session) error {
work, err := session.Work(ctx, model.ProcessShepherdAgentStart, func(q *model.Queries) ([]uuid.UUID, error) {
- machines, err := q.GetMachinesForAgentStart(ctx, 1)
+ machines, err := q.GetMachinesForAgentStart(ctx, model.GetMachinesForAgentStartParams{
+ Limit: 1,
+ Provider: model.ProviderEquinix,
+ })
if err != nil {
return nil, err
}
diff --git a/cloud/shepherd/equinix/manager/initializer.go b/cloud/shepherd/equinix/manager/initializer.go
index 2f1721b..272df20 100644
--- a/cloud/shepherd/equinix/manager/initializer.go
+++ b/cloud/shepherd/equinix/manager/initializer.go
@@ -20,6 +20,7 @@
"k8s.io/klog/v2"
apb "source.monogon.dev/cloud/agent/api"
+
"source.monogon.dev/cloud/bmaas/bmdb"
"source.monogon.dev/cloud/bmaas/bmdb/metrics"
"source.monogon.dev/cloud/bmaas/bmdb/model"
@@ -162,7 +163,10 @@
}
func (c *Initializer) getMachines(ctx context.Context, q *model.Queries, limit int32) ([]model.MachineProvided, error) {
- return q.GetMachinesForAgentStart(ctx, limit)
+ return q.GetMachinesForAgentStart(ctx, model.GetMachinesForAgentStartParams{
+ Limit: limit,
+ Provider: model.ProviderEquinix,
+ })
}
func (c *Initializer) processMachine(ctx context.Context, t *task) error {
diff --git a/cloud/shepherd/equinix/manager/initializer_test.go b/cloud/shepherd/equinix/manager/initializer_test.go
index 20c2f16..82e1f90 100644
--- a/cloud/shepherd/equinix/manager/initializer_test.go
+++ b/cloud/shepherd/equinix/manager/initializer_test.go
@@ -14,6 +14,7 @@
"google.golang.org/protobuf/proto"
apb "source.monogon.dev/cloud/agent/api"
+
"source.monogon.dev/cloud/bmaas/bmdb"
"source.monogon.dev/cloud/bmaas/bmdb/model"
"source.monogon.dev/cloud/lib/component"
@@ -193,7 +194,10 @@
var machines []model.MachineProvided
err = sess.Transact(ctx, func(q *model.Queries) error {
var err error
- machines, err = q.GetMachinesForAgentStart(ctx, 100)
+ machines, err = q.GetMachinesForAgentStart(ctx, model.GetMachinesForAgentStartParams{
+ Limit: 100,
+ Provider: model.ProviderEquinix,
+ })
return err
})
if err != nil {
diff --git a/cloud/shepherd/equinix/manager/recoverer.go b/cloud/shepherd/equinix/manager/recoverer.go
index 72b5588..3779b02 100644
--- a/cloud/shepherd/equinix/manager/recoverer.go
+++ b/cloud/shepherd/equinix/manager/recoverer.go
@@ -55,7 +55,10 @@
}
func (r *Recoverer) getMachines(ctx context.Context, q *model.Queries, limit int32) ([]model.MachineProvided, error) {
- return q.GetMachineForAgentRecovery(ctx, limit)
+ return q.GetMachineForAgentRecovery(ctx, model.GetMachineForAgentRecoveryParams{
+ Limit: limit,
+ Provider: model.ProviderEquinix,
+ })
}
func (r *Recoverer) processMachine(ctx context.Context, t *task) error {
diff --git a/cloud/shepherd/equinix/manager/recoverer_test.go b/cloud/shepherd/equinix/manager/recoverer_test.go
index 8583efd..63e244e 100644
--- a/cloud/shepherd/equinix/manager/recoverer_test.go
+++ b/cloud/shepherd/equinix/manager/recoverer_test.go
@@ -121,7 +121,10 @@
var machines []model.MachineProvided
err = sess.Transact(ctx, func(q *model.Queries) error {
var err error
- machines, err = q.GetMachineForAgentRecovery(ctx, 100)
+ machines, err = q.GetMachineForAgentRecovery(ctx, model.GetMachineForAgentRecoveryParams{
+ Limit: 100,
+ Provider: model.ProviderEquinix,
+ })
return err
})
if err != nil {
@@ -144,7 +147,10 @@
var machines []model.MachineProvided
err = sess.Transact(ctx, func(q *model.Queries) error {
var err error
- machines, err = q.GetMachinesForAgentStart(ctx, 100)
+ machines, err = q.GetMachinesForAgentStart(ctx, model.GetMachinesForAgentStartParams{
+ Limit: 100,
+ Provider: model.ProviderEquinix,
+ })
return err
})
if err != nil {