cloud/shepherd/equinix: implement initializer parallelism

This adds a flag which allows starting multiple initializers in
parallel, sharing the same SSH key/config and API rate limiting.

Change-Id: I415e855d9b649fac258e25d884cac17f895c91c0
Reviewed-on: https://review.monogon.dev/c/monogon/+/1135
Tested-by: Jenkins CI
Reviewed-by: Mateusz Zalega <mateusz@monogon.tech>
diff --git a/cloud/shepherd/equinix/manager/BUILD.bazel b/cloud/shepherd/equinix/manager/BUILD.bazel
index 004cd24..2c702fe 100644
--- a/cloud/shepherd/equinix/manager/BUILD.bazel
+++ b/cloud/shepherd/equinix/manager/BUILD.bazel
@@ -23,6 +23,7 @@
         "@io_k8s_klog_v2//:klog",
         "@org_golang_google_protobuf//proto",
         "@org_golang_x_crypto//ssh",
+        "@org_golang_x_sync//errgroup",
         "@org_golang_x_time//rate",
     ],
 )
diff --git a/cloud/shepherd/equinix/manager/initializer.go b/cloud/shepherd/equinix/manager/initializer.go
index 5b1ea49..6194f0d 100644
--- a/cloud/shepherd/equinix/manager/initializer.go
+++ b/cloud/shepherd/equinix/manager/initializer.go
@@ -16,6 +16,7 @@
 	"github.com/google/uuid"
 	"github.com/packethost/packngo"
 	"golang.org/x/crypto/ssh"
+	"golang.org/x/sync/errgroup"
 	"golang.org/x/time/rate"
 	"google.golang.org/protobuf/proto"
 	"k8s.io/klog/v2"
@@ -76,6 +77,15 @@
 	// DBQueryLimiter limits the rate at which BMDB is queried for servers ready
 	// for BMaaS agent initialization. Must be set.
 	DBQueryLimiter *rate.Limiter
+
+	// Parallelism is how many instances of the Initializer will be allowed to run in
+	// parallel against the BMDB. This speeds up the process of starting/restarting
+	// agents significantly, as one initializer instance can handle at most one agent
+	// (re)starting process.
+	//
+	// If not set (ie. 0), it defaults to 1. A good starting value for production
+	// deployments is 10 or so.
+	Parallelism int
 }
 
 // flagLimiter configures a *rate.Limiter as a flag.
@@ -106,6 +116,7 @@
 
 func (i *InitializerConfig) RegisterFlags() {
 	flagLimiter(&i.DBQueryLimiter, "initializer_db_query_rate", "250ms,8", "Rate limiting for BMDB queries")
+	flag.IntVar(&i.Parallelism, "initializer_parallelism", 1, "How many initializer instances to run in parallel, ie. how many agents to attempt to (re)start at once")
 }
 
 // Initializer implements the BMaaS agent initialization process. Initialization
@@ -158,6 +169,9 @@
 	if c.DBQueryLimiter == nil {
 		return nil, fmt.Errorf("DBQueryLimiter must be configured")
 	}
+	if c.Parallelism == 0 {
+		c.Parallelism = 1
+	}
 	return &Initializer{
 		config:       c,
 		sharedConfig: sc,
@@ -167,9 +181,21 @@
 	}, nil
 }
 
+// Run the initializer(s) (depending on config.Parallelism) blocking the current
+// goroutine until the given context expires and all initializers quit.
+func (i *Initializer) Run(ctx context.Context, conn *bmdb.Connection) error {
+	eg := errgroup.Group{}
+	for j := 0; j < i.config.Parallelism; j += 1 {
+		eg.Go(func() error {
+			return i.runOne(ctx, conn)
+		})
+	}
+	return eg.Wait()
+}
+
 // Run the initializer blocking the current goroutine until the given context
 // expires.
-func (c *Initializer) Run(ctx context.Context, conn *bmdb.Connection) error {
+func (c *Initializer) runOne(ctx context.Context, conn *bmdb.Connection) error {
 	signer, err := c.sharedConfig.sshSigner()
 	if err != nil {
 		return fmt.Errorf("could not initialize signer: %w", err)