mc/c/metroctl: implement simple update sequencing
Adds support for the --max-unavailable flag, which specifies the maximum
number of nodes that may be in the update process concurrently.
To implement this, we now wait for the node to report as healthy again
before considering the update complete.
This allows simple gradual cluster updates.
Change-Id: If6aa44d1b8f12aadd77e35f16513a4af521fa356
Reviewed-on: https://review.monogon.dev/c/monogon/+/2770
Reviewed-by: Tim Windelschmidt <tim@monogon.tech>
Tested-by: Jenkins CI
diff --git a/metropolis/cli/metroctl/BUILD.bazel b/metropolis/cli/metroctl/BUILD.bazel
index af520e1..60c59b6 100644
--- a/metropolis/cli/metroctl/BUILD.bazel
+++ b/metropolis/cli/metroctl/BUILD.bazel
@@ -58,6 +58,7 @@
"@io_k8s_apimachinery//pkg/apis/meta/v1:meta",
"@io_k8s_client_go//pkg/apis/clientauthentication/v1:clientauthentication",
"@org_golang_google_grpc//:go_default_library",
+ "@org_golang_x_sync//semaphore",
],
)
diff --git a/metropolis/cli/metroctl/cmd_node.go b/metropolis/cli/metroctl/cmd_node.go
index a582f02..18cec83 100644
--- a/metropolis/cli/metroctl/cmd_node.go
+++ b/metropolis/cli/metroctl/cmd_node.go
@@ -2,19 +2,25 @@
import (
"context"
+ "errors"
"fmt"
"io"
"log"
"os"
"strings"
+ "sync"
+ "time"
"github.com/spf13/cobra"
+ "golang.org/x/sync/semaphore"
"source.monogon.dev/go/clitable"
"source.monogon.dev/metropolis/cli/metroctl/core"
clicontext "source.monogon.dev/metropolis/cli/pkg/context"
"source.monogon.dev/metropolis/node/core/identity"
+ "source.monogon.dev/version"
+ "source.monogon.dev/metropolis/proto/api"
apb "source.monogon.dev/metropolis/proto/api"
)
@@ -63,7 +69,7 @@
var nodeUpdateCmd = &cobra.Command{
Short: "Updates the operating system of a cluster node.",
- Use: "update [NodeID] --bundle-url bundleURL [--activation-mode <none|reboot|kexec>]",
+ Use: "update [NodeIDs]",
Example: "metroctl node update --bundle-url https://example.com/bundle.zip --activation-mode reboot metropolis-25fa5f5e9349381d4a5e9e59de0215e3",
RunE: func(cmd *cobra.Command, args []string) error {
bundleUrl, err := cmd.Flags().GetString("bundle-url")
@@ -92,6 +98,15 @@
return fmt.Errorf("invalid value for flag activation-mode")
}
+		maxUnavailable, err := cmd.Flags().GetUint64("max-unavailable")
+		if err != nil {
+			return err
+		}
+		if maxUnavailable == 0 { // A zero budget could never admit any node.
+			return errors.New("unable to update nodes with max-unavailable set to zero")
+		}
+		unavailableSemaphore := semaphore.NewWeighted(int64(maxUnavailable)) // One unit per node being updated.
+
ctx := clicontext.WithInterrupt(context.Background())
cacert, err := core.GetClusterCAWithTOFU(ctx, connectOptions())
@@ -118,6 +133,8 @@
ActivationMode: am,
}
+ var wg sync.WaitGroup
+
for _, n := range nodes {
// Filter the information we want client-side.
if len(qids) != 0 {
@@ -127,18 +144,65 @@
}
}
- cc := dialAuthenticatedNode(ctx, n.Id, n.Status.ExternalAddress, cacert)
- nodeMgmt := apb.NewNodeManagementClient(cc)
- log.Printf("sending update request to: %s (%s)", n.Id, n.Status.ExternalAddress)
- _, err := nodeMgmt.UpdateNode(ctx, updateReq)
- if err != nil {
+ if err := unavailableSemaphore.Acquire(ctx, 1); err != nil {
return err
}
+ wg.Add(1)
+
+			go func(n *api.Node) {
+				defer wg.Done()
+				cc := dialAuthenticatedNode(ctx, n.Id, n.Status.ExternalAddress, cacert)
+				nodeMgmt := apb.NewNodeManagementClient(cc)
+				log.Printf("sending update request to: %s (%s)", n.Id, n.Status.ExternalAddress)
+				start := time.Now()
+				_, err := nodeMgmt.UpdateNode(ctx, updateReq)
+				if err != nil {
+					log.Printf("update request to node %s failed: %v", n.Id, err)
+					// The node never started activating, so it is still available: free
+					// the slot and stop here so it is not released a second time below.
+					unavailableSemaphore.Release(1)
+					return
+				}
+				// Wait for the internal activation sleep plus the heartbeat interval so
+				// the node has missed one heartbeat (or is back up already).
+				time.Sleep((5 + 10) * time.Second)
+				for {
+					select {
+					case <-time.After(10 * time.Second):
+						nodes, err := core.GetNodes(ctx, mgmt, fmt.Sprintf("node.id == %q", n.Id))
+						if err != nil {
+							log.Printf("while getting node status for %s: %v", n.Id, err)
+							continue // No usable result: retry on the next poll.
+						}
+						if len(nodes) == 0 {
+							log.Printf("node status for %s returned no node", n.Id)
+							continue
+						}
+						if len(nodes) > 1 {
+							log.Printf("node status for %s returned too many nodes (%d)", n.Id, len(nodes))
+							continue
+						}
+						s := nodes[0]
+						if s.Health == api.Node_HEALTHY {
+							if s.Status != nil && s.Status.Version != nil {
+								log.Printf("node %s updated in %v to version %s", s.Id, time.Since(start), version.Semver(s.Status.Version))
+							} else {
+								log.Printf("node %s updated in %v to unknown version", s.Id, time.Since(start))
+							}
+							unavailableSemaphore.Release(1)
+							return
+						}
+					case <-ctx.Done():
+						log.Printf("update to node %s incomplete", n.Id)
+						return
+					}
+				}
+			}(n)
}
+ // Wait for all update processes to finish
+ wg.Wait()
+
return nil
},
- Args: cobra.ExactArgs(1),
+ Args: cobra.MinimumNArgs(1),
}
var nodeDeleteCmd = &cobra.Command{
@@ -198,6 +262,7 @@
func init() {
nodeUpdateCmd.Flags().String("bundle-url", "", "The URL to the new version")
nodeUpdateCmd.Flags().String("activation-mode", "reboot", "How the update should be activated (kexec, reboot, none)")
+ nodeUpdateCmd.Flags().Uint64("max-unavailable", 1, "Maximum nodes which can be unavailable during the update process")
nodeDeleteCmd.Flags().Bool("bypass-has-roles", false, "Allows to bypass the HasRoles check")
nodeDeleteCmd.Flags().Bool("bypass-not-decommissioned", false, "Allows to bypass the NotDecommissioned check")