m/cli/metroctl: use one progress bar for all files

Previously, there was a separate progress bar for each uploaded file,
now there is just one which shows the total number of bytes transferred.

Change-Id: Id4cba63889077a076cb63d437e3fe4b17cfc3786
Reviewed-on: https://review.monogon.dev/c/monogon/+/4049
Reviewed-by: Tim Windelschmidt <tim@monogon.tech>
Tested-by: Jenkins CI
diff --git a/metropolis/cli/metroctl/cmd_install_ssh.go b/metropolis/cli/metroctl/cmd_install_ssh.go
index 410a88d..b52c353 100644
--- a/metropolis/cli/metroctl/cmd_install_ssh.go
+++ b/metropolis/cli/metroctl/cmd_install_ssh.go
@@ -24,9 +24,72 @@
 	"google.golang.org/protobuf/proto"
 
 	"source.monogon.dev/osbase/net/sshtakeover"
-	"source.monogon.dev/osbase/structfs"
 )
 
+// progressbarUpdater wraps a [progressbar.ProgressBar] with an improved
+// interface for updating progress. It updates the progress bar in a separate
+// goroutine and at most 60 times per second. The stop function stops the
+// updates and can be safely called multiple times.
+type progressbarUpdater struct {
+	bar    *progressbar.ProgressBar
+	update chan int64
+	close  chan struct{}
+}
+
+func startProgressbarUpdater(bar *progressbar.ProgressBar) *progressbarUpdater {
+	updater := &progressbarUpdater{
+		bar:    bar,
+		update: make(chan int64, 1),
+		close:  make(chan struct{}),
+	}
+	go updater.run()
+	return updater
+}
+
+func (p *progressbarUpdater) add(num int64) {
+	for {
+		select {
+		case p.update <- num:
+			return
+		case oldNum := <-p.update:
+			num += oldNum
+		}
+	}
+}
+
+func (p *progressbarUpdater) run() {
+	for {
+		select {
+		case num := <-p.update:
+			p.bar.Add64(num)
+		case <-p.close:
+			return
+		}
+		select {
+		case <-time.After(time.Second / 60):
+		case <-p.close:
+			return
+		}
+	}
+}
+
+func (p *progressbarUpdater) stop() {
+	if p.close == nil {
+		return
+	}
+	p.close <- struct{}{}
+	p.close = nil
+	select {
+	case num := <-p.update:
+		// Do one last update to make the bar reach 100%.
+		p.bar.Add64(num)
+	default:
+	}
+	if !p.bar.IsFinished() {
+		p.bar.Exit()
+	}
+}
+
 var sshCmd = &cobra.Command{
 	Use:     "ssh --disk=<disk> <target>",
 	Short:   "Installs Metropolis on a Linux system accessible via SSH.",
@@ -136,30 +199,33 @@
 			return err
 		}
 
-		barUploader := func(blob structfs.Blob, targetPath string) {
-			content, err := blob.Open()
-			if err != nil {
-				log.Fatalf("error while uploading %q: %v", targetPath, err)
-			}
-			defer content.Close()
+		log.Println("Uploading files to target host.")
+		totalSize := takeover.Size() + bundle.Size()
+		barUpdater := startProgressbarUpdater(progressbar.DefaultBytes(totalSize))
+		defer barUpdater.stop()
+		conn.SetProgress(barUpdater.add)
 
-			bar := progressbar.DefaultBytes(
-				blob.Size(),
-				targetPath,
-			)
-			defer bar.Close()
-
-			proxyReader := progressbar.NewReader(content, bar)
-			defer proxyReader.Close()
-
-			if err := conn.UploadExecutable(ctx, targetPath, &proxyReader); err != nil {
-				log.Fatalf("error while uploading %q: %v", targetPath, err)
-			}
+		takeoverContent, err := takeover.Open()
+		if err != nil {
+			return err
+		}
+		err = conn.UploadExecutable(ctx, takeoverTargetPath, takeoverContent)
+		takeoverContent.Close()
+		if err != nil {
+			return fmt.Errorf("error while uploading %q: %w", takeoverTargetPath, err)
 		}
 
-		log.Println("Uploading required binaries to target host.")
-		barUploader(takeover, takeoverTargetPath)
-		barUploader(bundle, bundleTargetPath)
+		bundleContent, err := bundle.Open()
+		if err != nil {
+			return err
+		}
+		err = conn.Upload(ctx, bundleTargetPath, bundleContent)
+		bundleContent.Close()
+		if err != nil {
+			return fmt.Errorf("error while uploading %q: %w", bundleTargetPath, err)
+		}
+
+		barUpdater.stop()
 
 		// Start the agent and wait for the agent's output to arrive.
 		log.Printf("Starting the takeover executable at path %q.", takeoverTargetPath)
diff --git a/osbase/net/sshtakeover/sshtakeover.go b/osbase/net/sshtakeover/sshtakeover.go
index b8fd8b2..fdb63f3 100644
--- a/osbase/net/sshtakeover/sshtakeover.go
+++ b/osbase/net/sshtakeover/sshtakeover.go
@@ -18,8 +18,9 @@
 )
 
 type Client struct {
-	cl *ssh.Client
-	sc *sftp.Client
+	cl       *ssh.Client
+	sc       *sftp.Client
+	progress func(int64)
 }
 
 // Dial starts an ssh client connection.
@@ -79,21 +80,26 @@
 	}
 }
 
-type contextReader struct {
-	r   io.Reader
-	ctx context.Context
+type wrappedReader struct {
+	r        io.Reader
+	ctx      context.Context
+	progress func(int64)
 }
 
-func (r *contextReader) Read(p []byte) (n int, err error) {
+func (r *wrappedReader) Read(p []byte) (n int, err error) {
 	if r.ctx.Err() != nil {
 		return 0, r.ctx.Err()
 	}
-	return r.r.Read(p)
+	n, err = r.r.Read(p)
+	if r.progress != nil {
+		r.progress(int64(n))
+	}
+	return
 }
 
 // Upload a given blob to a targetPath on the system.
 func (p *Client) Upload(ctx context.Context, targetPath string, src io.Reader) error {
-	src = &contextReader{r: src, ctx: ctx}
+	src = &wrappedReader{r: src, ctx: ctx, progress: p.progress}
 
 	df, err := p.sc.Create(targetPath)
 	if err != nil {
@@ -119,6 +125,12 @@
 	return nil
 }
 
+// SetProgress sets a callback which will be called repeatedly during uploads
+// with a number of bytes that have been read.
+func (p *Client) SetProgress(callback func(int64)) {
+	p.progress = callback
+}
+
 func (p *Client) Close() error {
 	scErr := p.sc.Close()
 	clErr := p.cl.Close()