metropolis/cli/metroctl: refactor to use RunE instead of log.Fatal
Change-Id: Id5ca65980816e1715a8f08afcdf712292117012a
Reviewed-on: https://review.monogon.dev/c/monogon/+/3441
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
Tested-by: Jenkins CI
diff --git a/metropolis/cli/metroctl/cmd_certs.go b/metropolis/cli/metroctl/cmd_certs.go
index 8475546..9aa0953 100644
--- a/metropolis/cli/metroctl/cmd_certs.go
+++ b/metropolis/cli/metroctl/cmd_certs.go
@@ -4,6 +4,7 @@
"crypto/x509"
"encoding/pem"
"errors"
+ "fmt"
"log"
"os"
@@ -27,10 +28,15 @@
Short: "Exports certificates for use in other programs",
Use: "export",
Example: "metroctl cert export",
- Run: func(cmd *cobra.Command, args []string) {
+ RunE: func(cmd *cobra.Command, args []string) error {
ocert, opkey, err := core.GetOwnerCredentials(flags.configPath)
if errors.Is(err, core.ErrNoCredentials) {
- log.Fatalf("You have to take ownership of the cluster first: %v", err)
+ return fmt.Errorf("you have to take ownership of the cluster first: %w", err)
}
+ // Any other credential error must not fall through: ocert/opkey would
+ // be unusable for the export below.
+ if err != nil {
+ return fmt.Errorf("while reading owner credentials: %w", err)
+ }
pkcs8Key, err := x509.MarshalPKCS8PrivateKey(opkey)
@@ -40,13 +46,17 @@
}
- if err := os.WriteFile("owner.crt", pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: ocert.Raw}), 0755); err != nil {
- log.Fatal(err)
+ if err := os.WriteFile("owner.crt", pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: ocert.Raw}), 0644); err != nil {
+ return err
}
- if err := os.WriteFile("owner.key", pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: pkcs8Key}), 0755); err != nil {
- log.Fatal(err)
+ // owner.key is the cluster owner's private key: keep it readable by
+ // the current user only.
+ if err := os.WriteFile("owner.key", pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: pkcs8Key}), 0600); err != nil {
+ return err
}
- log.Println("Wrote files to current dir: cert.pem, key.pem")
+
+ log.Println("Wrote files to current dir: owner.crt, owner.key")
+ return nil
},
Args: PrintUsageOnWrongArgs(cobra.NoArgs),
}
diff --git a/metropolis/cli/metroctl/cmd_install.go b/metropolis/cli/metroctl/cmd_install.go
index cf43747..fff13e9 100644
--- a/metropolis/cli/metroctl/cmd_install.go
+++ b/metropolis/cli/metroctl/cmd_install.go
@@ -5,7 +5,7 @@
"context"
"crypto/ed25519"
_ "embed"
- "log"
+ "fmt"
"os"
"os/signal"
@@ -35,11 +35,15 @@
var bootstrapStorageSecurityPolicy = flagdefs.StorageSecurityPolicyPflag(installCmd.PersistentFlags(), "bootstrap-storage-security", cpb.ClusterConfiguration_STORAGE_SECURITY_POLICY_NEEDS_ENCRYPTION_AND_AUTHENTICATION, "Storage security policy to set on cluster")
var bundlePath = installCmd.PersistentFlags().StringP("bundle", "b", "", "Path to the Metropolis bundle to be installed")
-func makeNodeParams() *api.NodeParameters {
+func makeNodeParams() (*api.NodeParameters, error) {
- ctx, _ := signal.NotifyContext(context.Background(), os.Interrupt)
+ // Callers keep running after this returns (genusb then builds the whole
+ // image), so the SIGINT registration must be released here; otherwise
+ // Ctrl-C stays captured for the rest of the process.
+ ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
+ defer stop()
if err := os.MkdirAll(flags.configPath, 0700); err != nil && !os.IsExist(err) {
- log.Fatalf("Failed to create config directory: %v", err)
+ return nil, fmt.Errorf("failed to create config directory: %w", err)
}
var params *api.NodeParameters
@@ -47,7 +51,7 @@
// TODO(lorenz): Have a key management story for this
priv, err := core.GetOrMakeOwnerKey(flags.configPath)
if err != nil {
- log.Fatalf("Failed to generate or get owner key: %v", err)
+ return nil, fmt.Errorf("failed to generate or get owner key: %w", err)
}
pub := priv.Public().(ed25519.PublicKey)
params = &api.NodeParameters{
@@ -62,15 +66,18 @@
},
}
} else {
- cc := dialAuthenticated(ctx)
+ cc, err := dialAuthenticated(ctx)
+ if err != nil {
+ return nil, fmt.Errorf("while dialing node: %w", err)
+ }
mgmt := api.NewManagementClient(cc)
resT, err := mgmt.GetRegisterTicket(ctx, &api.GetRegisterTicketRequest{})
if err != nil {
- log.Fatalf("While receiving register ticket: %v", err)
+ return nil, fmt.Errorf("while receiving register ticket: %w", err)
}
resI, err := mgmt.GetClusterInfo(ctx, &api.GetClusterInfoRequest{})
if err != nil {
- log.Fatalf("While receiving cluster directory: %v", err)
+ return nil, fmt.Errorf("while receiving cluster directory: %w", err)
}
params = &api.NodeParameters{
@@ -83,28 +90,28 @@
},
}
}
- return params
+ return params, nil
}
-func external(name, datafilePath string, flag *string) fat32.SizedReader {
+func external(name, datafilePath string, flag *string) (fat32.SizedReader, error) {
if flag == nil || *flag == "" {
rPath, err := runfiles.Rlocation(datafilePath)
if err != nil {
- log.Fatalf("No %s specified", name)
+ return nil, fmt.Errorf("no %s specified", name)
}
df, err := os.ReadFile(rPath)
if err != nil {
- log.Fatalf("Cant read file: %v", err)
+ return nil, fmt.Errorf("can't read file: %w", err)
}
- return bytes.NewReader(df)
+ return bytes.NewReader(df), nil
}
f, err := blkio.NewFileReader(*flag)
if err != nil {
- log.Fatalf("Failed to open specified %s: %v", name, err)
+ return nil, fmt.Errorf("failed to open specified %s: %w", name, err)
}
- return f
+ return f, nil
}
func init() {
diff --git a/metropolis/cli/metroctl/cmd_install_ssh.go b/metropolis/cli/metroctl/cmd_install_ssh.go
index 56bab4c..8c6f6ad 100644
--- a/metropolis/cli/metroctl/cmd_install_ssh.go
+++ b/metropolis/cli/metroctl/cmd_install_ssh.go
@@ -30,7 +30,147 @@
Short: "Installs Metropolis on a Linux system accessible via SSH.",
Example: "metroctl install --bundle=metropolis-v0.1.zip --takeover=takeover ssh --disk=nvme0n1 root@ssh-enabled-server.example",
Args: cobra.ExactArgs(1), // One positional argument: the target
- RunE: doSSH,
+ RunE: func(cmd *cobra.Command, args []string) error {
+ user, address, err := parseSSHAddr(args[0])
+ if err != nil {
+ return err
+ }
+
+ diskName, err := cmd.Flags().GetString("disk")
+ if err != nil {
+ return err
+ }
+
+ if len(diskName) == 0 {
+ return fmt.Errorf("flag disk is required")
+ }
+
+ var authMethods []xssh.AuthMethod
+ if aconn, err := net.Dial("unix", os.Getenv("SSH_AUTH_SOCK")); err == nil {
+ defer aconn.Close()
+ a := agent.NewClient(aconn)
+ authMethods = append(authMethods, xssh.PublicKeysCallback(a.Signers))
+ } else {
+ log.Printf("error while establishing ssh agent connection: %v", err)
+ log.Println("ssh agent authentication will not be available.")
+ }
+
+ if term.IsTerminal(int(os.Stdin.Fd())) {
+ authMethods = append(authMethods,
+ xssh.PasswordCallback(func() (string, error) {
+ fmt.Printf("%s@%s's password: ", user, address)
+ b, err := terminal.ReadPassword(syscall.Stdin)
+ if err != nil {
+ return "", err
+ }
+ fmt.Println()
+ return string(b), nil
+ }),
+ xssh.KeyboardInteractive(func(name, instruction string, questions []string, echos []bool) ([]string, error) {
+ answers := make([]string, 0, len(questions))
+ for i, q := range questions {
+ fmt.Print(q)
+ if echos[i] {
+ // Echoed prompt: the typed answer must be returned to the
+ // server too, one answer per question.
+ var answer string
+ if _, err := fmt.Scan(&answer); err != nil {
+ return nil, err
+ }
+ answers = append(answers, answer)
+ } else {
+ b, err := terminal.ReadPassword(syscall.Stdin)
+ if err != nil {
+ return nil, err
+ }
+ fmt.Println()
+ answers = append(answers, string(b))
+ }
+ }
+ return answers, nil
+ }),
+ )
+ } else {
+ log.Println("stdin is not interactive. password authentication will not be available.")
+ }
+
+ cl := ssh.DirectClient{
+ Username: user,
+ AuthMethods: authMethods,
+ }
+
+ ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
+ defer stop()
+ conn, err := cl.Dial(ctx, address, 5*time.Second)
+ if err != nil {
+ return fmt.Errorf("error while establishing ssh connection: %w", err)
+ }
+
+ params, err := makeNodeParams()
+ if err != nil {
+ return err
+ }
+ rawParams, err := proto.Marshal(params)
+ if err != nil {
+ return fmt.Errorf("error while marshaling node params: %w", err)
+ }
+
+ const takeoverTargetPath = "/root/takeover"
+ const bundleTargetPath = "/root/bundle.zip"
+ bundle, err := external("bundle", "_main/metropolis/node/bundle.zip", bundlePath)
+ if err != nil {
+ return err
+ }
+ // The takeover binary is selected by this command's own --takeover
+ // flag, not by --bundle.
+ takeoverPath, err := cmd.Flags().GetString("takeover")
+ if err != nil {
+ return err
+ }
+ takeover, err := external("takeover", "_main/metropolis/cli/takeover/takeover_bin_/takeover_bin", &takeoverPath)
+ if err != nil {
+ return err
+ }
+
+ // barUploader copies r to targetPath on the remote host while showing
+ // a progress bar, reporting failures to the caller instead of exiting.
+ barUploader := func(r fat32.SizedReader, targetPath string) error {
+ bar := progressbar.DefaultBytes(
+ r.Size(),
+ targetPath,
+ )
+ defer bar.Close()
+
+ proxyReader := progressbar.NewReader(r, bar)
+ defer proxyReader.Close()
+
+ if err := conn.Upload(ctx, targetPath, &proxyReader); err != nil {
+ return fmt.Errorf("error while uploading %q: %w", targetPath, err)
+ }
+ return nil
+ }
+
+ log.Println("Uploading required binaries to target host.")
+ if err := barUploader(takeover, takeoverTargetPath); err != nil {
+ return err
+ }
+ if err := barUploader(bundle, bundleTargetPath); err != nil {
+ return err
+ }
+
+ // Start the agent and wait for the agent's output to arrive.
+ log.Printf("Starting the takeover executable at path %q.", takeoverTargetPath)
+ _, stderr, err := conn.Execute(ctx, fmt.Sprintf("%s -disk %s", takeoverTargetPath, diskName), rawParams)
+ stderrStr := strings.TrimSpace(string(stderr))
+ if stderrStr != "" {
+ log.Printf("Agent stderr: %q", stderrStr)
+ }
+ if err != nil {
+ return fmt.Errorf("while starting the takeover executable: %w", err)
+ }
+
+ return nil
+ },
}
func parseAddrOptionalPort(addr string) (string, string, error) {
@@ -79,121 +219,6 @@
return user, net.JoinHostPort(addr, port), nil
}
-func doSSH(cmd *cobra.Command, args []string) error {
- user, address, err := parseSSHAddr(args[0])
- if err != nil {
- return err
- }
-
- diskName, err := cmd.Flags().GetString("disk")
- if err != nil {
- return err
- }
-
- if len(diskName) == 0 {
- return fmt.Errorf("flag disk is required")
- }
-
- var authMethods []xssh.AuthMethod
- if aconn, err := net.Dial("unix", os.Getenv("SSH_AUTH_SOCK")); err == nil {
- defer aconn.Close()
- a := agent.NewClient(aconn)
- authMethods = append(authMethods, xssh.PublicKeysCallback(a.Signers))
- } else {
- log.Printf("error while establishing ssh agent connection: %v", err)
- log.Println("ssh agent authentication will not be available.")
- }
-
- if term.IsTerminal(int(os.Stdin.Fd())) {
- authMethods = append(authMethods,
- xssh.PasswordCallback(func() (string, error) {
- fmt.Printf("%s@%s's password: ", user, address)
- b, err := terminal.ReadPassword(syscall.Stdin)
- if err != nil {
- return "", err
- }
- fmt.Println()
- return string(b), nil
- }),
- xssh.KeyboardInteractive(func(name, instruction string, questions []string, echos []bool) ([]string, error) {
- answers := make([]string, 0, len(questions))
- for i, q := range questions {
- fmt.Print(q)
- if echos[i] {
- if _, err := fmt.Scan(&questions[i]); err != nil {
- return nil, err
- }
- } else {
- b, err := terminal.ReadPassword(syscall.Stdin)
- if err != nil {
- return nil, err
- }
- fmt.Println()
- answers = append(answers, string(b))
- }
- }
- return answers, nil
- }),
- )
- } else {
- log.Println("stdin is not interactive. password authentication will not be available.")
- }
-
- cl := ssh.DirectClient{
- Username: user,
- AuthMethods: authMethods,
- }
-
- ctx, _ := signal.NotifyContext(context.Background(), os.Interrupt)
- conn, err := cl.Dial(ctx, address, 5*time.Second)
- if err != nil {
- return fmt.Errorf("error while establishing ssh connection: %w", err)
- }
-
- params := makeNodeParams()
- rawParams, err := proto.Marshal(params)
- if err != nil {
- return fmt.Errorf("error while marshaling node params: %w", err)
- }
-
- const takeoverTargetPath = "/root/takeover"
- const bundleTargetPath = "/root/bundle.zip"
- bundle := external("bundle", "_main/metropolis/node/bundle.zip", bundlePath)
- takeover := external("takeover", "_main/metropolis/cli/takeover/takeover_bin_/takeover_bin", bundlePath)
-
- barUploader := func(r fat32.SizedReader, targetPath string) {
- bar := progressbar.DefaultBytes(
- r.Size(),
- targetPath,
- )
- defer bar.Close()
-
- proxyReader := progressbar.NewReader(r, bar)
- defer proxyReader.Close()
-
- if err := conn.Upload(ctx, targetPath, &proxyReader); err != nil {
- log.Fatalf("error while uploading %q: %v", targetPath, err)
- }
- }
-
- log.Println("Uploading required binaries to target host.")
- barUploader(takeover, takeoverTargetPath)
- barUploader(bundle, bundleTargetPath)
-
- // Start the agent and wait for the agent's output to arrive.
- log.Printf("Starting the takeover executable at path %q.", takeoverTargetPath)
- _, stderr, err := conn.Execute(ctx, fmt.Sprintf("%s -disk %s", takeoverTargetPath, diskName), rawParams)
- stderrStr := strings.TrimSpace(string(stderr))
- if stderrStr != "" {
- log.Printf("Agent stderr: %q", stderrStr)
- }
- if err != nil {
- return fmt.Errorf("while starting the takeover executable: %w", err)
- }
-
- return nil
-}
-
func init() {
sshCmd.Flags().String("disk", "", "Which disk Metropolis should be installed to")
sshCmd.Flags().String("takeover", "", "Path to the Metropolis takeover binary")
diff --git a/metropolis/cli/metroctl/cmd_install_usb.go b/metropolis/cli/metroctl/cmd_install_usb.go
index fcbcfa2..e1a22a4 100644
--- a/metropolis/cli/metroctl/cmd_install_usb.go
+++ b/metropolis/cli/metroctl/cmd_install_usb.go
@@ -2,6 +2,7 @@
import (
_ "embed"
+ "fmt"
"log"
"github.com/spf13/cobra"
@@ -14,31 +15,39 @@
Short: "Generates a Metropolis installer disk or image.",
Example: "metroctl install --bundle=metropolis-v0.1.zip genusb /dev/sdx",
Args: PrintUsageOnWrongArgs(cobra.ExactArgs(1)), // One positional argument: the target
- Run: doGenUSB,
-}
+ RunE: func(cmd *cobra.Command, args []string) error {
+ params, err := makeNodeParams()
+ if err != nil {
+ return err
+ }
-func doGenUSB(cmd *cobra.Command, args []string) {
- params := makeNodeParams()
+ installerPath, err := cmd.Flags().GetString("installer")
+ if err != nil {
+ return err
+ }
- installerPath, err := cmd.Flags().GetString("installer")
- if err != nil {
- log.Fatal(err)
- }
+ installer, err := external("installer", "_main/metropolis/installer/kernel.efi", &installerPath)
+ if err != nil {
+ return err
+ }
+ bundle, err := external("bundle", "_main/metropolis/node/bundle.zip", bundlePath)
+ if err != nil {
+ return err
+ }
- installer := external("installer", "_main/metropolis/installer/kernel.efi", &installerPath)
- bundle := external("bundle", "_main/metropolis/node/bundle.zip", bundlePath)
+ installerImageArgs := core.MakeInstallerImageArgs{
+ TargetPath: args[0],
+ Installer: installer,
+ NodeParams: params,
+ Bundle: bundle,
+ }
- installerImageArgs := core.MakeInstallerImageArgs{
- TargetPath: args[0],
- Installer: installer,
- NodeParams: params,
- Bundle: bundle,
- }
-
- log.Printf("Generating installer image (this can take a while, see issues/92).")
- if err := core.MakeInstallerImage(installerImageArgs); err != nil {
- log.Fatalf("Failed to create installer: %v", err)
- }
+ log.Printf("Generating installer image (this can take a while, see issues/92).")
+ if err := core.MakeInstallerImage(installerImageArgs); err != nil {
+ return fmt.Errorf("failed to create installer: %w", err)
+ }
+ return nil
+ },
}
func init() {
diff --git a/metropolis/cli/metroctl/cmd_k8s_configure.go b/metropolis/cli/metroctl/cmd_k8s_configure.go
index cd90416..5ce4032 100644
--- a/metropolis/cli/metroctl/cmd_k8s_configure.go
+++ b/metropolis/cli/metroctl/cmd_k8s_configure.go
@@ -2,6 +2,7 @@
import (
"context"
+ "fmt"
"log"
"os"
"os/exec"
@@ -24,36 +25,39 @@
to connect to a Metropolis cluster. A cluster endpoint must be provided with the
--endpoints parameter.`,
Args: PrintUsageOnWrongArgs(cobra.ExactArgs(0)),
- Run: doK8sConfigure,
-}
-
-func doK8sConfigure(cmd *cobra.Command, _ []string) {
- ctx, _ := signal.NotifyContext(context.Background(), os.Interrupt)
- if len(flags.clusterEndpoints) < 1 {
- log.Fatalf("k8s configure requires at least one cluster endpoint to be provided with the --endpoints parameter.")
- }
-
- contextName, err := cmd.Flags().GetString("context")
- if err != nil || contextName == "" {
- log.Fatalf("k8s configure requires a valid context name to be provided with the --context parameter.")
- }
-
- // If the user has metroctl in their path, use the metroctl from path as
- // a credential plugin. Otherwise use the path to the currently-running
- // metroctl.
- metroctlPath := "metroctl"
- if _, err := exec.LookPath("metroctl"); err != nil {
- metroctlPath, err = os.Executable()
- if err != nil {
- log.Fatalf("Failed to create kubectl entry as metroctl is neither in PATH nor can its absolute path be determined: %v", err)
+ RunE: func(cmd *cobra.Command, _ []string) error {
+ ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
+ defer stop()
+ if len(flags.clusterEndpoints) < 1 {
+ return fmt.Errorf("k8s configure requires at least one cluster endpoint to be provided with the --endpoints parameter")
}
- }
- // TODO(q3k, issues/144): this only works as long as all nodes are kubernetes controller
- // nodes. This won't be the case for too long. Figure this out.
- if err := core.InstallKubeletConfig(ctx, metroctlPath, connectOptions(), contextName, flags.clusterEndpoints[0]); err != nil {
- log.Fatalf("Failed to install metroctl/k8s integration: %v", err)
- }
- log.Printf("Success! kubeconfig is set up. You can now run kubectl --context=%s ... to access the Kubernetes cluster.", contextName)
+
+ contextName, err := cmd.Flags().GetString("context")
+ if err != nil {
+ return fmt.Errorf("while reading --context flag: %w", err)
+ }
+ if contextName == "" {
+ return fmt.Errorf("k8s configure requires a valid context name to be provided with the --context parameter")
+ }
+
+ // If the user has metroctl in their path, use the metroctl from path as
+ // a credential plugin. Otherwise use the path to the currently-running
+ // metroctl.
+ metroctlPath := "metroctl"
+ if _, err := exec.LookPath("metroctl"); err != nil {
+ metroctlPath, err = os.Executable()
+ if err != nil {
+ return fmt.Errorf("failed to create kubectl entry as metroctl is neither in PATH nor can its absolute path be determined: %w", err)
+ }
+ }
+ // TODO(q3k, issues/144): this only works as long as all nodes are kubernetes controller
+ // nodes. This won't be the case for too long. Figure this out.
+ if err := core.InstallKubeletConfig(ctx, metroctlPath, connectOptions(), contextName, flags.clusterEndpoints[0]); err != nil {
+ return fmt.Errorf("failed to install metroctl/k8s integration: %w", err)
+ }
+ log.Printf("Success! kubeconfig is set up. You can now run kubectl --context=%s ... to access the Kubernetes cluster.", contextName)
+ return nil
+ },
}
func init() {
diff --git a/metropolis/cli/metroctl/cmd_k8scredplugin.go b/metropolis/cli/metroctl/cmd_k8scredplugin.go
index 8d84a9e..50bf474 100644
--- a/metropolis/cli/metroctl/cmd_k8scredplugin.go
+++ b/metropolis/cli/metroctl/cmd_k8scredplugin.go
@@ -5,7 +5,7 @@
"encoding/json"
"encoding/pem"
"errors"
- "log"
+ "fmt"
"os"
"github.com/spf13/cobra"
@@ -23,37 +23,36 @@
cluster. This should never be directly called by end users.`,
Args: PrintUsageOnWrongArgs(cobra.ExactArgs(0)),
Hidden: true,
- Run: doK8sCredPlugin,
-}
+ RunE: func(cmd *cobra.Command, args []string) error {
+ cert, key, err := core.GetOwnerCredentials(flags.configPath)
+ if errors.Is(err, core.ErrNoCredentials) {
+ return fmt.Errorf("no credentials found on your machine")
+ }
+ if err != nil {
+ return fmt.Errorf("failed to get Metropolis credentials: %w", err)
+ }
-func doK8sCredPlugin(cmd *cobra.Command, args []string) {
- cert, key, err := core.GetOwnerCredentials(flags.configPath)
- if errors.Is(err, core.ErrNoCredentials) {
- log.Fatal("No credentials found on your machine")
- }
- if err != nil {
- log.Fatalf("failed to get Metropolis credentials: %v", err)
- }
+ pkcs8Key, err := x509.MarshalPKCS8PrivateKey(key)
+ if err != nil {
+ // We explicitly pass an Ed25519 private key in, so this can't happen
+ panic(err)
+ }
- pkcs8Key, err := x509.MarshalPKCS8PrivateKey(key)
- if err != nil {
- // We explicitly pass an Ed25519 private key in, so this can't happen
- panic(err)
- }
-
- cred := clientauthentication.ExecCredential{
- TypeMeta: metav1.TypeMeta{
- APIVersion: clientauthentication.SchemeGroupVersion.String(),
- Kind: "ExecCredential",
- },
- Status: &clientauthentication.ExecCredentialStatus{
- ClientCertificateData: string(pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: cert.Raw})),
- ClientKeyData: string(pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: pkcs8Key})),
- },
- }
- if err := json.NewEncoder(os.Stdout).Encode(cred); err != nil {
- log.Fatalf("failed to encode ExecCredential: %v", err)
- }
+ cred := clientauthentication.ExecCredential{
+ TypeMeta: metav1.TypeMeta{
+ APIVersion: clientauthentication.SchemeGroupVersion.String(),
+ Kind: "ExecCredential",
+ },
+ Status: &clientauthentication.ExecCredentialStatus{
+ ClientCertificateData: string(pem.EncodeToMemory(&pem.Block{Type: "CERTIFICATE", Bytes: cert.Raw})),
+ ClientKeyData: string(pem.EncodeToMemory(&pem.Block{Type: "PRIVATE KEY", Bytes: pkcs8Key})),
+ },
+ }
+ if err := json.NewEncoder(os.Stdout).Encode(cred); err != nil {
+ return fmt.Errorf("failed to encode ExecCredential: %w", err)
+ }
+ return nil
+ },
}
func init() {
diff --git a/metropolis/cli/metroctl/cmd_node.go b/metropolis/cli/metroctl/cmd_node.go
index 30dd965..941d6f6 100644
--- a/metropolis/cli/metroctl/cmd_node.go
+++ b/metropolis/cli/metroctl/cmd_node.go
@@ -2,7 +2,6 @@
import (
"context"
- "errors"
"fmt"
"io"
"log"
@@ -31,14 +30,17 @@
Short: "Describes cluster nodes.",
Use: "describe [node-id] [--filter] [--output] [--format] [--columns]",
Example: "metroctl node describe metropolis-c556e31c3fa2bf0a36e9ccb9fd5d6056",
- Run: func(cmd *cobra.Command, args []string) {
+ RunE: func(cmd *cobra.Command, args []string) error {
ctx, _ := signal.NotifyContext(context.Background(), os.Interrupt)
- cc := dialAuthenticated(ctx)
+ cc, err := dialAuthenticated(ctx)
+ if err != nil {
+ return fmt.Errorf("while dialing node: %w", err)
+ }
mgmt := apb.NewManagementClient(cc)
nodes, err := core.GetNodes(ctx, mgmt, flags.filter)
if err != nil {
- log.Fatalf("While calling Management.GetNodes: %v", err)
+ return fmt.Errorf("while calling Management.GetNodes: %w", err)
}
var columns map[string]bool
@@ -50,7 +52,8 @@
columns[p] = true
}
}
- printNodes(nodes, args, columns)
+
+ return printNodes(nodes, args, columns)
},
Args: PrintUsageOnWrongArgs(cobra.ArbitraryArgs),
}
@@ -59,17 +62,20 @@
Short: "Lists cluster nodes.",
Use: "list [node-id] [--filter] [--output] [--format]",
Example: "metroctl node list --filter node.status.external_address==\"10.8.0.2\"",
- Run: func(cmd *cobra.Command, args []string) {
+ RunE: func(cmd *cobra.Command, args []string) error {
ctx, _ := signal.NotifyContext(context.Background(), os.Interrupt)
- cc := dialAuthenticated(ctx)
+ cc, err := dialAuthenticated(ctx)
+ if err != nil {
+ return fmt.Errorf("while dialing node: %w", err)
+ }
mgmt := apb.NewManagementClient(cc)
nodes, err := core.GetNodes(ctx, mgmt, flags.filter)
if err != nil {
- log.Fatalf("While calling Management.GetNodes: %v", err)
+ return fmt.Errorf("while calling Management.GetNodes: %w", err)
}
- printNodes(nodes, args, map[string]bool{"node id": true})
+ return printNodes(nodes, args, map[string]bool{"node id": true})
},
Args: PrintUsageOnWrongArgs(cobra.ArbitraryArgs),
}
@@ -110,7 +116,7 @@
return err
}
if maxUnavailable == 0 {
- return errors.New("unable to update notes with max-unavailable set to zero")
+ return fmt.Errorf("unable to update notes with max-unavailable set to zero")
}
unavailableSemaphore := semaphore.NewWeighted(int64(maxUnavailable))
@@ -121,7 +127,11 @@
return fmt.Errorf("could not get CA certificate: %w", err)
}
- mgmt := apb.NewManagementClient(dialAuthenticated(ctx))
+ conn, err := dialAuthenticated(ctx)
+ if err != nil {
+ return fmt.Errorf("while dialing node: %w", err)
+ }
+ mgmt := apb.NewManagementClient(conn)
nodes, err := core.GetNodes(ctx, mgmt, "")
if err != nil {
@@ -169,11 +179,14 @@
go func(n *apb.Node) {
defer wg.Done()
- cc := dialAuthenticatedNode(ctx, n.Id, n.Status.ExternalAddress, cacert)
+ cc, err := dialAuthenticatedNode(ctx, n.Id, n.Status.ExternalAddress, cacert)
+ if err != nil {
+ log.Fatalf("failed to dial node: %v", err)
+ }
nodeMgmt := apb.NewNodeManagementClient(cc)
log.Printf("sending update request to: %s (%s)", n.Id, n.Status.ExternalAddress)
start := time.Now()
- _, err := nodeMgmt.UpdateNode(ctx, updateReq)
+ _, err = nodeMgmt.UpdateNode(ctx, updateReq)
if err != nil {
log.Printf("update request to node %s failed: %v", n.Id, err)
// A failed UpdateNode does not mean that the node is now unavailable as it
@@ -242,7 +255,11 @@
}
ctx, _ := signal.NotifyContext(context.Background(), os.Interrupt)
- mgmt := apb.NewManagementClient(dialAuthenticated(ctx))
+ conn, err := dialAuthenticated(ctx)
+ if err != nil {
+ return fmt.Errorf("while dialing node: %w", err)
+ }
+ mgmt := apb.NewManagementClient(conn)
nodes, err := core.GetNodes(ctx, mgmt, fmt.Sprintf("node.id==%q", args[0]))
if err != nil {
@@ -287,7 +304,10 @@
func dialNode(ctx context.Context, node string) (apb.NodeManagementClient, error) {
// First connect to the main management service and figure out the node's IP
// address.
- cc := dialAuthenticated(ctx)
+ cc, err := dialAuthenticated(ctx)
+ if err != nil {
+ return nil, fmt.Errorf("while dialing node: %w", err)
+ }
mgmt := apb.NewManagementClient(cc)
nodes, err := core.GetNodes(ctx, mgmt, fmt.Sprintf("node.id == %q", node))
if err != nil {
@@ -311,7 +331,10 @@
}
// Dial the actual node at its management port.
- cl := dialAuthenticatedNode(ctx, n.Id, n.Status.ExternalAddress, cacert)
+ cl, err := dialAuthenticatedNode(ctx, n.Id, n.Status.ExternalAddress, cacert)
+ if err != nil {
+ return nil, fmt.Errorf("while dialing node: %w", err)
+ }
nmgmt := apb.NewNodeManagementClient(cl)
return nmgmt, nil
}
@@ -348,10 +371,10 @@
}
if kexecFlag && firmwareFlag {
- return errors.New("--kexec cannot be used with --firmware as firmware is not involved when using kexec")
+ return fmt.Errorf("--kexec cannot be used with --firmware as firmware is not involved when using kexec")
}
if firmwareFlag && rollbackFlag {
- return errors.New("--firmware cannot be used with --rollback as the next boot won't be into the OS")
+ return fmt.Errorf("--firmware cannot be used with --rollback as the next boot won't be into the OS")
}
var req apb.RebootRequest
if kexecFlag {
@@ -374,7 +397,7 @@
if _, err := nmgmt.Reboot(ctx, &req); err != nil {
return fmt.Errorf("reboot RPC failed: %w", err)
}
- fmt.Printf("Node %v is being rebooted", args[0])
+ log.Printf("Node %v is being rebooted", args[0])
return nil
},
@@ -398,7 +421,7 @@
}); err != nil {
return fmt.Errorf("reboot RPC failed: %w", err)
}
- fmt.Printf("Node %v is being powered off", args[0])
+ log.Printf("Node %v is being powered off", args[0])
return nil
},
@@ -426,12 +449,12 @@
rootCmd.AddCommand(nodeCmd)
}
-func printNodes(nodes []*apb.Node, args []string, onlyColumns map[string]bool) {
+func printNodes(nodes []*apb.Node, args []string, onlyColumns map[string]bool) error {
o := io.WriteCloser(os.Stdout)
if flags.output != "" {
of, err := os.Create(flags.output)
if err != nil {
- log.Fatalf("Couldn't create the output file at %s: %v", flags.output, err)
+ return fmt.Errorf("couldn't create the output file at %s: %w", flags.output, err)
}
o = of
}
@@ -456,4 +479,5 @@
}
t.Print(o, onlyColumns)
+ return nil
}
diff --git a/metropolis/cli/metroctl/cmd_node_approve.go b/metropolis/cli/metroctl/cmd_node_approve.go
index a70f6b3..f935bbc 100644
--- a/metropolis/cli/metroctl/cmd_node_approve.go
+++ b/metropolis/cli/metroctl/cmd_node_approve.go
@@ -17,7 +17,51 @@
Short: "Approves a candidate node, if specified; lists nodes pending approval otherwise.",
Use: "approve [node-id]",
Args: PrintUsageOnWrongArgs(cobra.MaximumNArgs(1)), // One positional argument: node ID
- Run: doApprove,
+ RunE: func(cmd *cobra.Command, args []string) error {
+ ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
+ defer stop()
+ cc, err := dialAuthenticated(ctx)
+ if err != nil {
+ return fmt.Errorf("while dialing node: %w", err)
+ }
+ mgmt := api.NewManagementClient(cc)
+
+ // Get a list of all nodes pending approval by calling Management.GetNodes.
+ // We need this list regardless of whether we're actually approving nodes, or
+ // just listing them.
+ nodes, err := core.GetNodes(ctx, mgmt, "node.state == NODE_STATE_NEW")
+ if err != nil {
+ return fmt.Errorf("while fetching a list of nodes pending approval: %w", err)
+ }
+
+ if len(args) == 0 {
+ // If no id was given, just list the nodes pending approval.
+ if len(nodes) != 0 {
+ for _, n := range nodes {
+ fmt.Println(n.Id)
+ }
+ } else {
+ log.Print("There are no nodes pending approval at this time.")
+ }
+ } else {
+ // Otherwise, try to approve the nodes matching the supplied ids.
+ for _, tgtNodeId := range args {
+ n := nodeById(nodes, tgtNodeId)
+ if n == nil {
+ return fmt.Errorf("couldn't find a new node matching id %s", tgtNodeId)
+ }
+ // nolint:SA5011
+ _, err := mgmt.ApproveNode(ctx, &api.ApproveNodeRequest{
+ Pubkey: n.Pubkey,
+ })
+ if err != nil {
+ return fmt.Errorf("while approving node %s: %w", tgtNodeId, err)
+ }
+ log.Printf("Approved node %s.", tgtNodeId)
+ }
+ }
+ return nil
+ },
}
func init() {
@@ -33,44 +77,3 @@
}
return nil
}
-
-func doApprove(cmd *cobra.Command, args []string) {
- ctx, _ := signal.NotifyContext(context.Background(), os.Interrupt)
- cc := dialAuthenticated(ctx)
- mgmt := api.NewManagementClient(cc)
-
- // Get a list of all nodes pending approval by calling Management.GetNodes.
- // We need this list regardless of whether we're actually approving nodes, or
- // just listing them.
- nodes, err := core.GetNodes(ctx, mgmt, "node.state == NODE_STATE_NEW")
- if err != nil {
- log.Fatalf("While fetching a list of nodes pending approval: %v", err)
- }
-
- if len(args) == 0 {
- // If no id was given, just list the nodes pending approval.
- if len(nodes) != 0 {
- for _, n := range nodes {
- fmt.Println(n.Id)
- }
- } else {
- log.Print("There are no nodes pending approval at this time.")
- }
- } else {
- // Otherwise, try to approve the nodes matching the supplied ids.
- for _, tgtNodeId := range args {
- n := nodeById(nodes, tgtNodeId)
- if n == nil {
- log.Fatalf("Couldn't find a new node matching id %s", tgtNodeId)
- }
- // nolint:SA5011
- _, err := mgmt.ApproveNode(ctx, &api.ApproveNodeRequest{
- Pubkey: n.Pubkey,
- })
- if err != nil {
- log.Fatalf("While approving node %s: %v", tgtNodeId, err)
- }
- log.Printf("Approved node %s.", tgtNodeId)
- }
- }
-}
diff --git a/metropolis/cli/metroctl/cmd_node_logs.go b/metropolis/cli/metroctl/cmd_node_logs.go
index d21df7b..0b732de 100644
--- a/metropolis/cli/metroctl/cmd_node_logs.go
+++ b/metropolis/cli/metroctl/cmd_node_logs.go
@@ -60,7 +60,10 @@
// First connect to the main management service and figure out the node's IP
// address.
- cc := dialAuthenticated(ctx)
+ cc, err := dialAuthenticated(ctx)
+ if err != nil {
+ return fmt.Errorf("while dialing node: %w", err)
+ }
mgmt := api.NewManagementClient(cc)
nodes, err := core.GetNodes(ctx, mgmt, fmt.Sprintf("node.id == %q", args[0]))
if err != nil {
@@ -85,7 +88,10 @@
fmt.Printf("=== Logs from %s (%s):\n", n.Id, n.Status.ExternalAddress)
// Dial the actual node at its management port.
- cl := dialAuthenticatedNode(ctx, n.Id, n.Status.ExternalAddress, cacert)
+ cl, err := dialAuthenticatedNode(ctx, n.Id, n.Status.ExternalAddress, cacert)
+ if err != nil {
+ return fmt.Errorf("while dialing node: %w", err)
+ }
nmgmt := api.NewNodeManagementClient(cl)
streamMode := api.GetLogsRequest_STREAM_DISABLE
diff --git a/metropolis/cli/metroctl/cmd_node_metrics.go b/metropolis/cli/metroctl/cmd_node_metrics.go
index 0842c24..1572147 100644
--- a/metropolis/cli/metroctl/cmd_node_metrics.go
+++ b/metropolis/cli/metroctl/cmd_node_metrics.go
@@ -45,7 +45,10 @@
// First connect to the main management service and figure out the node's IP
// address.
- cc := dialAuthenticated(ctx)
+ cc, err := dialAuthenticated(ctx)
+ if err != nil {
+ return fmt.Errorf("while dialing node: %w", err)
+ }
mgmt := api.NewManagementClient(cc)
nodes, err := core.GetNodes(ctx, mgmt, fmt.Sprintf("node.id == %q", args[0]))
if err != nil {
@@ -63,8 +66,12 @@
return fmt.Errorf("node has no external address")
}
+ transport, err := newAuthenticatedNodeHTTPTransport(ctx, n.Id)
+ if err != nil {
+ return fmt.Errorf("while creating authenticated node transport: %w", err)
+ }
client := http.Client{
- Transport: newAuthenticatedNodeHTTPTransport(ctx, n.Id),
+ Transport: transport,
}
res, err := client.Get(fmt.Sprintf("https://%s/metrics/%s", net.JoinHostPort(n.Status.ExternalAddress, common.MetricsPort.PortString()), args[1]))
if err != nil {
diff --git a/metropolis/cli/metroctl/cmd_node_set.go b/metropolis/cli/metroctl/cmd_node_set.go
index 59df43e..bdba8a9 100644
--- a/metropolis/cli/metroctl/cmd_node_set.go
+++ b/metropolis/cli/metroctl/cmd_node_set.go
@@ -2,6 +2,7 @@
import (
"context"
+ "fmt"
"log"
"os"
"os/signal"
@@ -26,8 +27,45 @@
Short: "Updates node roles.",
Use: "role <KubernetesController|KubernetesWorker|ConsensusMember> [NodeID, ...]",
Example: "metroctl node add role KubernetesWorker metropolis-25fa5f5e9349381d4a5e9e59de0215e3",
- Args: PrintUsageOnWrongArgs(cobra.ArbitraryArgs),
- Run: doAdd,
+ Args: PrintUsageOnWrongArgs(cobra.MinimumNArgs(2)),
+ RunE: func(cmd *cobra.Command, args []string) error {
+ ctx, _ := signal.NotifyContext(context.Background(), os.Interrupt)
+ cc, err := dialAuthenticated(ctx)
+ if err != nil {
+ return fmt.Errorf("while connecting to cluster: %w", err)
+ }
+ mgmt := api.NewManagementClient(cc)
+
+ role := strings.ToLower(args[0])
+ nodes := args[1:]
+
+ opt := func(v bool) *bool { return &v }
+ for _, node := range nodes {
+ req := &api.UpdateNodeRolesRequest{
+ Node: &api.UpdateNodeRolesRequest_Id{
+ Id: node,
+ },
+ }
+ switch role {
+ case "kubernetescontroller", "kc":
+ req.KubernetesController = opt(true)
+ case "kubernetesworker", "kw":
+ req.KubernetesWorker = opt(true)
+ case "consensusmember", "cm":
+ req.ConsensusMember = opt(true)
+ default:
+ return fmt.Errorf("unknown role: %s. Must be one of: KubernetesController, KubernetesWorker, ConsensusMember", role)
+ }
+
+ _, err := mgmt.UpdateNodeRoles(ctx, req)
+ if err != nil {
+ log.Printf("Couldn't update node \"%s\": %v", node, err)
+ } else {
+ log.Printf("Updated node %s.", node)
+ }
+ }
+ return nil
+ },
}
var removeRoleCmd = &cobra.Command{
@@ -35,7 +73,45 @@
Use: "role <KubernetesController|KubernetesWorker|ConsensusMember> [NodeID, ...]",
Example: "metroctl node remove role KubernetesWorker metropolis-25fa5f5e9349381d4a5e9e59de0215e3",
- Args: PrintUsageOnWrongArgs(cobra.ArbitraryArgs),
- Run: doRemove,
+ Args: PrintUsageOnWrongArgs(cobra.MinimumNArgs(2)),
+ RunE: func(cmd *cobra.Command, args []string) error {
+ ctx, _ := signal.NotifyContext(context.Background(), os.Interrupt)
+ cc, err := dialAuthenticated(ctx)
+ if err != nil {
+ return fmt.Errorf("while connecting to cluster: %w", err)
+ }
+ mgmt := api.NewManagementClient(cc)
+
+ role := strings.ToLower(args[0])
+ nodes := args[1:]
+
+ opt := func(v bool) *bool { return &v }
+ for _, node := range nodes {
+ req := &api.UpdateNodeRolesRequest{
+ Node: &api.UpdateNodeRolesRequest_Id{
+ Id: node,
+ },
+ }
+
+ switch role {
+ case "kubernetescontroller", "kc":
+ req.KubernetesController = opt(false)
+ case "kubernetesworker", "kw":
+ req.KubernetesWorker = opt(false)
+ case "consensusmember", "cm":
+ req.ConsensusMember = opt(false)
+ default:
+ return fmt.Errorf("unknown role: %s. Must be one of: KubernetesController, KubernetesWorker, ConsensusMember", role)
+ }
+
+ _, err := mgmt.UpdateNodeRoles(ctx, req)
+ if err != nil {
+ log.Printf("Couldn't update node \"%s\": %v", node, err)
+ } else {
+ log.Printf("Updated node %s.", node)
+ }
+ }
+ return nil
+ },
}
func init() {
@@ -45,82 +121,3 @@
removeCmd.AddCommand(removeRoleCmd)
nodeCmd.AddCommand(removeCmd)
}
-
-func doAdd(cmd *cobra.Command, args []string) {
- ctx, _ := signal.NotifyContext(context.Background(), os.Interrupt)
- cc := dialAuthenticated(ctx)
- mgmt := api.NewManagementClient(cc)
-
- if len(args) < 2 {
- log.Fatal("Provide the role parameter together with at least one node ID.")
- }
-
- role := strings.ToLower(args[0])
- nodes := args[1:]
-
- opt := func(v bool) *bool { return &v }
- for _, node := range nodes {
- req := &api.UpdateNodeRolesRequest{
- Node: &api.UpdateNodeRolesRequest_Id{
- Id: node,
- },
- }
- switch role {
- case "kubernetescontroller", "kc":
- req.KubernetesController = opt(true)
- case "kubernetesworker", "kw":
- req.KubernetesWorker = opt(true)
- case "consensusmember", "cm":
- req.ConsensusMember = opt(true)
- default:
- log.Fatalf("Unknown role: %s", role)
- }
-
- _, err := mgmt.UpdateNodeRoles(ctx, req)
- if err != nil {
- log.Printf("Couldn't update node \"%s\": %v", node, err)
- } else {
- log.Printf("Updated node %s.", node)
- }
- }
-}
-
-func doRemove(cmd *cobra.Command, args []string) {
- ctx, _ := signal.NotifyContext(context.Background(), os.Interrupt)
- cc := dialAuthenticated(ctx)
- mgmt := api.NewManagementClient(cc)
-
- if len(args) < 2 {
- log.Fatal("Provide the role parameter together with at least one node ID.")
- }
-
- role := strings.ToLower(args[0])
- nodes := args[1:]
-
- opt := func(v bool) *bool { return &v }
- for _, node := range nodes {
- req := &api.UpdateNodeRolesRequest{
- Node: &api.UpdateNodeRolesRequest_Id{
- Id: node,
- },
- }
-
- switch role {
- case "kubernetescontroller", "kc":
- req.KubernetesController = opt(false)
- case "kubernetesworker", "kw":
- req.KubernetesWorker = opt(false)
- case "consensusmember", "cm":
- req.ConsensusMember = opt(false)
- default:
- log.Fatalf("Unknown role: %s. Must be one of: KubernetesController, KubernetesWorker, ConsensusMember.", role)
- }
-
- _, err := mgmt.UpdateNodeRoles(ctx, req)
- if err != nil {
- log.Printf("Couldn't update node \"%s\": %v", node, err)
- } else {
- log.Printf("Updated node %s.", node)
- }
- }
-}
diff --git a/metropolis/cli/metroctl/cmd_takeownership.go b/metropolis/cli/metroctl/cmd_takeownership.go
index d421f71..a1a205a 100644
--- a/metropolis/cli/metroctl/cmd_takeownership.go
+++ b/metropolis/cli/metroctl/cmd_takeownership.go
@@ -3,6 +3,7 @@
import (
"context"
"errors"
+ "fmt"
"log"
"os"
"os/exec"
@@ -25,78 +26,76 @@
previous invocation of metroctl install on this machine. A single cluster
endpoint must be provided with the --endpoints parameter.`,
Args: PrintUsageOnWrongArgs(cobra.ExactArgs(0)),
- Run: doTakeOwnership,
-}
-
-func doTakeOwnership(cmd *cobra.Command, _ []string) {
- ctx, _ := signal.NotifyContext(context.Background(), os.Interrupt)
- if len(flags.clusterEndpoints) != 1 {
- log.Fatalf("takeownership requires a single cluster endpoint to be provided with the --endpoints parameter.")
- }
-
- contextName, err := cmd.Flags().GetString("context")
- if err != nil || contextName == "" {
- log.Fatalf("takeownership requires a valid context name to be provided with the --context parameter.")
- }
-
- ca, err := core.GetClusterCAWithTOFU(ctx, connectOptions())
- if err != nil {
- log.Fatalf("Could not retrieve cluster CA: %v", err)
- }
-
- // Retrieve the cluster owner's private key, and use it to construct
- // ephemeral credentials. Then, dial the cluster.
- opk, err := core.GetOwnerKey(flags.configPath)
- if errors.Is(err, core.ErrNoCredentials) {
- log.Fatalf("Owner key does not exist. takeownership needs to be executed on the same system that has previously installed the cluster using metroctl install.")
- }
- if err != nil {
- log.Fatalf("Couldn't get owner's key: %v", err)
- }
- opts, err := core.DialOpts(ctx, connectOptions())
- if err != nil {
- log.Fatalf("While configuring cluster dial opts: %v", err)
- }
- creds, err := rpc.NewEphemeralCredentials(opk, rpc.WantRemoteCluster(ca))
- if err != nil {
- log.Fatalf("While generating ephemeral credentials: %v", err)
- }
- opts = append(opts, grpc.WithTransportCredentials(creds))
-
- cc, err := grpc.Dial(resolver.MetropolisControlAddress, opts...)
- if err != nil {
- log.Fatalf("While dialing the cluster: %v", err)
- }
- aaa := apb.NewAAAClient(cc)
-
- ownerCert, err := rpc.RetrieveOwnerCertificate(ctx, aaa, opk)
- if err != nil {
- log.Fatalf("Failed to retrive owner certificate from cluster: %v", err)
- }
-
- if err := core.WriteOwnerCertificate(flags.configPath, ownerCert.Certificate[0]); err != nil {
- log.Printf("Failed to store retrieved owner certificate: %v", err)
- log.Fatalln("Sorry, the cluster has been lost as taking ownership cannot be repeated. Fix the reason the file couldn't be written and reinstall the node.")
- }
- log.Print("Successfully retrieved owner credentials! You now own this cluster. Setting up kubeconfig now...")
-
- // If the user has metroctl in their path, use the metroctl from path as
- // a credential plugin. Otherwise use the path to the currently-running
- // metroctl.
- metroctlPath := "metroctl"
- if _, err := exec.LookPath("metroctl"); err != nil {
- metroctlPath, err = os.Executable()
- if err != nil {
- log.Fatalf("Failed to create kubectl entry as metroctl is neither in PATH nor can its absolute path be determined: %v", err)
+ RunE: func(cmd *cobra.Command, _ []string) error {
+ ctx, _ := signal.NotifyContext(context.Background(), os.Interrupt)
+ if len(flags.clusterEndpoints) != 1 {
+ return fmt.Errorf("takeownership requires a single cluster endpoint to be provided with the --endpoints parameter")
}
- }
- // TODO(q3k, issues/144): this only works as long as all nodes are kubernetes controller
- // nodes. This won't be the case for too long. Figure this out.
- configName := "metroctl"
- if err := core.InstallKubeletConfig(ctx, metroctlPath, connectOptions(), configName, flags.clusterEndpoints[0]); err != nil {
- log.Fatalf("Failed to install metroctl/k8s integration: %v", err)
- }
- log.Printf("Success! kubeconfig is set up. You can now run kubectl --context=%s ... to access the Kubernetes cluster.", configName)
+
+ contextName, err := cmd.Flags().GetString("context")
+ if err != nil || contextName == "" {
+ return fmt.Errorf("takeownership requires a valid context name to be provided with the --context parameter")
+ }
+
+ ca, err := core.GetClusterCAWithTOFU(ctx, connectOptions())
+ if err != nil {
+ return fmt.Errorf("could not retrieve cluster CA: %w", err)
+ }
+
+ // Retrieve the cluster owner's private key, and use it to construct
+ // ephemeral credentials. Then, dial the cluster.
+ opk, err := core.GetOwnerKey(flags.configPath)
+ if errors.Is(err, core.ErrNoCredentials) {
+ return fmt.Errorf("owner key does not exist. takeownership needs to be executed on the same system that has previously installed the cluster using metroctl install")
+ }
+ if err != nil {
+ return fmt.Errorf("couldn't get owner's key: %w", err)
+ }
+ opts, err := core.DialOpts(ctx, connectOptions())
+ if err != nil {
+ return fmt.Errorf("while configuring cluster dial opts: %w", err)
+ }
+ creds, err := rpc.NewEphemeralCredentials(opk, rpc.WantRemoteCluster(ca))
+ if err != nil {
+ return fmt.Errorf("while generating ephemeral credentials: %w", err)
+ }
+ opts = append(opts, grpc.WithTransportCredentials(creds))
+
+ cc, err := grpc.Dial(resolver.MetropolisControlAddress, opts...)
+ if err != nil {
+ return fmt.Errorf("while dialing the cluster: %w", err)
+ }
+ aaa := apb.NewAAAClient(cc)
+
+ ownerCert, err := rpc.RetrieveOwnerCertificate(ctx, aaa, opk)
+ if err != nil {
+ return fmt.Errorf("failed to retrieve owner certificate from cluster: %w", err)
+ }
+
+ if err := core.WriteOwnerCertificate(flags.configPath, ownerCert.Certificate[0]); err != nil {
+ return fmt.Errorf("failed to store retrieved owner certificate: %w; sorry, the cluster has been lost as taking ownership cannot be repeated. Fix the reason the file couldn't be written and reinstall the node", err)
+ }
+ log.Print("Successfully retrieved owner credentials! You now own this cluster. Setting up kubeconfig now...")
+
+ // If the user has metroctl in their path, use the metroctl from path as
+ // a credential plugin. Otherwise use the path to the currently-running
+ // metroctl.
+ metroctlPath := "metroctl"
+ if _, err := exec.LookPath("metroctl"); err != nil {
+ metroctlPath, err = os.Executable()
+ if err != nil {
+ return fmt.Errorf("failed to create kubectl entry as metroctl is neither in PATH nor can its absolute path be determined: %w", err)
+ }
+ }
+ // TODO(q3k, issues/144): this only works as long as all nodes are kubernetes controller
+ // nodes. This won't be the case for too long. Figure this out.
+ configName := "metroctl"
+ if err := core.InstallKubeletConfig(ctx, metroctlPath, connectOptions(), configName, flags.clusterEndpoints[0]); err != nil {
+ return fmt.Errorf("failed to install metroctl/k8s integration: %w", err)
+ }
+ log.Printf("Success! kubeconfig is set up. You can now run kubectl --context=%s ... to access the Kubernetes cluster.", configName)
+ return nil
+ },
}
func init() {
diff --git a/metropolis/cli/metroctl/rpc.go b/metropolis/cli/metroctl/rpc.go
index 512cb98..cfe1b41 100644
--- a/metropolis/cli/metroctl/rpc.go
+++ b/metropolis/cli/metroctl/rpc.go
@@ -5,7 +5,7 @@
"crypto/tls"
"crypto/x509"
"errors"
- "log"
+ "fmt"
"net"
"net/http"
@@ -17,23 +17,23 @@
"source.monogon.dev/metropolis/node/core/rpc/resolver"
)
-func dialAuthenticated(ctx context.Context) *grpc.ClientConn {
+func dialAuthenticated(ctx context.Context) (*grpc.ClientConn, error) {
// Collect credentials, validate command parameters, and try dialing the
// cluster.
ocert, opkey, err := core.GetOwnerCredentials(flags.configPath)
if errors.Is(err, core.ErrNoCredentials) {
- log.Fatalf("You have to take ownership of the cluster first: %v", err)
+ return nil, fmt.Errorf("you have to take ownership of the cluster first: %w", err)
}
if err != nil {
- log.Fatalf("Failed to get owner credentials: %v", err)
+ return nil, fmt.Errorf("failed to get owner credentials: %w", err)
}
if len(flags.clusterEndpoints) == 0 {
- log.Fatal("Please provide at least one cluster endpoint using the --endpoint parameter.")
+ return nil, fmt.Errorf("please provide at least one cluster endpoint using the --endpoints parameter")
}
ca, err := core.GetClusterCAWithTOFU(ctx, connectOptions())
if err != nil {
- log.Fatalf("Failed to get cluster CA: %v", err)
+ return nil, fmt.Errorf("failed to get cluster CA: %w", err)
}
tlsc := tls.Certificate{
@@ -43,39 +43,45 @@
creds := rpc.NewAuthenticatedCredentials(tlsc, rpc.WantRemoteCluster(ca))
opts, err := core.DialOpts(ctx, connectOptions())
if err != nil {
- log.Fatalf("While configuring dial options: %v", err)
+ return nil, fmt.Errorf("while configuring dial options: %w", err)
}
opts = append(opts, grpc.WithTransportCredentials(creds))
cc, err := grpc.Dial(resolver.MetropolisControlAddress, opts...)
if err != nil {
- log.Fatalf("While dialing cluster: %v", err)
+ return nil, fmt.Errorf("while dialing cluster: %w", err)
}
- return cc
+ return cc, nil
}
-func dialAuthenticatedNode(ctx context.Context, id, address string, cacert *x509.Certificate) *grpc.ClientConn {
+func dialAuthenticatedNode(ctx context.Context, id, address string, cacert *x509.Certificate) (*grpc.ClientConn, error) {
// Collect credentials, validate command parameters, and try dialing the
// cluster.
ocert, opkey, err := core.GetOwnerCredentials(flags.configPath)
if errors.Is(err, core.ErrNoCredentials) {
- log.Fatalf("You have to take ownership of the cluster first: %v", err)
+ return nil, fmt.Errorf("you have to take ownership of the cluster first: %w", err)
}
+ if err != nil {
+ return nil, fmt.Errorf("failed to get owner credentials: %w", err)
+ }
cc, err := core.DialNode(ctx, opkey, ocert, cacert, flags.proxyAddr, id, address)
if err != nil {
- log.Fatalf("While dialing node: %v", err)
+ return nil, fmt.Errorf("while dialing node: %w", err)
}
- return cc
+ return cc, nil
}
-func newAuthenticatedNodeHTTPTransport(ctx context.Context, id string) *http.Transport {
+func newAuthenticatedNodeHTTPTransport(ctx context.Context, id string) (*http.Transport, error) {
cacert, err := core.GetClusterCAWithTOFU(ctx, connectOptions())
if err != nil {
- log.Fatalf("Could not get CA certificate: %v", err)
+ return nil, fmt.Errorf("could not get CA certificate: %w", err)
}
ocert, opkey, err := core.GetOwnerCredentials(flags.configPath)
if errors.Is(err, core.ErrNoCredentials) {
- log.Fatalf("You have to take ownership of the cluster first: %v", err)
+ return nil, fmt.Errorf("you have to take ownership of the cluster first: %w", err)
}
+ if err != nil {
+ return nil, fmt.Errorf("failed to get owner credentials: %w", err)
+ }
tlsc := tls.Certificate{
Certificate: [][]byte{ocert.Raw},
@@ -88,12 +94,12 @@
if flags.proxyAddr != "" {
dialer, err := proxy.SOCKS5("tcp", flags.proxyAddr, nil, proxy.Direct)
if err != nil {
- log.Fatalf("Failed to create proxy dialer: %v", err)
+ return nil, fmt.Errorf("failed to create proxy dialer: %w", err)
}
transport.DialContext = func(ctx context.Context, network, addr string) (net.Conn, error) {
// TODO(q3k): handle context
return dialer.Dial(network, addr)
}
}
- return transport
+ return transport, nil
}