m/c/metroctl: move common impl to m/c/m/core

This moves the implementation shared between CLI commands (cluster
dialing and node retrieval) into the metroctl core package.

Change-Id: I93624a07356accf3441f02e6ecd8e91d5b71e66e
Reviewed-on: https://review.monogon.dev/c/monogon/+/843
Tested-by: Jenkins CI
Reviewed-by: Sergiusz Bazanski <serge@monogon.tech>
diff --git a/metropolis/cli/metroctl/BUILD.bazel b/metropolis/cli/metroctl/BUILD.bazel
index 4ecdd60..d6a0e47 100644
--- a/metropolis/cli/metroctl/BUILD.bazel
+++ b/metropolis/cli/metroctl/BUILD.bazel
@@ -26,7 +26,6 @@
         "//metropolis/node",
         "//metropolis/node/core/identity",
         "//metropolis/node/core/rpc",
-        "//metropolis/node/core/rpc/resolver",
         "//metropolis/proto/api",
         "@com_github_adrg_xdg//:xdg",
         "@com_github_spf13_cobra//:cobra",
@@ -35,7 +34,6 @@
         "@io_k8s_client_go//tools/clientcmd",
         "@io_k8s_client_go//tools/clientcmd/api",
         "@org_golang_google_grpc//:go_default_library",
-        "@org_golang_x_net//proxy",
     ],
 )
 
diff --git a/metropolis/cli/metroctl/approve.go b/metropolis/cli/metroctl/approve.go
index adb7b8f..892a756 100644
--- a/metropolis/cli/metroctl/approve.go
+++ b/metropolis/cli/metroctl/approve.go
@@ -3,11 +3,11 @@
 import (
 	"context"
 	"fmt"
-	"io"
 	"log"
 
 	"github.com/spf13/cobra"
 
+	"source.monogon.dev/metropolis/cli/metroctl/core"
 	clicontext "source.monogon.dev/metropolis/cli/pkg/context"
 	"source.monogon.dev/metropolis/node/core/identity"
 	"source.monogon.dev/metropolis/proto/api"
@@ -24,29 +24,6 @@
 	rootCmd.AddCommand(approveCmd)
 }
 
-// getNewNodes returns all nodes pending approval within the cluster.
-func getNewNodes(ctx context.Context, mgmt api.ManagementClient) ([]*api.Node, error) {
-	resN, err := mgmt.GetNodes(ctx, &api.GetNodesRequest{
-		Filter: "node.state == NODE_STATE_NEW",
-	})
-	if err != nil {
-		return nil, err
-	}
-
-	var nodes []*api.Node
-	for {
-		node, err := resN.Recv()
-		if err == io.EOF {
-			break
-		}
-		if err != nil {
-			return nil, err
-		}
-		nodes = append(nodes, node)
-	}
-	return nodes, nil
-}
-
 // nodeById returns the node matching id, if it exists within nodes.
 func nodeById(nodes []*api.Node, id string) *api.Node {
 	for _, n := range nodes {
@@ -58,26 +35,14 @@
 }
 
 func doApprove(cmd *cobra.Command, args []string) {
-	// Collect credentials, validate command parameters, and try dialing the
-	// cluster.
-	ocert, opkey, err := getCredentials()
-	if err == noCredentialsError {
-		log.Fatalf("You have to take ownership of the cluster first: %v", err)
-	}
-	if len(flags.clusterEndpoints) == 0 {
-		log.Fatal("Please provide at least one cluster endpoint using the --endpoint parameter.")
-	}
-	ctx := clicontext.WithInterrupt(context.Background())
-	cc, err := dialCluster(ctx, opkey, ocert, flags.proxyAddr, flags.clusterEndpoints)
-	if err != nil {
-		log.Fatalf("While dialing the cluster: %v", err)
-	}
+	cc := dialAuthenticated()
 	mgmt := api.NewManagementClient(cc)
 
 	// Get a list of all nodes pending approval by calling Management.GetNodes.
 	// We need this list regardless of whether we're actually approving nodes, or
 	// just listing them.
-	nodes, err := getNewNodes(ctx, mgmt)
+	ctx := clicontext.WithInterrupt(context.Background())
+	nodes, err := core.GetNodes(ctx, mgmt, "node.state == NODE_STATE_NEW")
 	if err != nil {
 		log.Fatalf("While fetching a list of nodes pending approval: %v", err)
 	}
diff --git a/metropolis/cli/metroctl/core/BUILD.bazel b/metropolis/cli/metroctl/core/BUILD.bazel
index cdde06b..af948af 100644
--- a/metropolis/cli/metroctl/core/BUILD.bazel
+++ b/metropolis/cli/metroctl/core/BUILD.bazel
@@ -5,15 +5,21 @@
     srcs = [
         "core.go",
         "install.go",
+        "rpc.go",
     ],
     importpath = "source.monogon.dev/metropolis/cli/metroctl/core",
     visibility = ["//visibility:public"],
     deps = [
+        "//metropolis/node",
+        "//metropolis/node/core/rpc",
+        "//metropolis/node/core/rpc/resolver",
         "//metropolis/proto/api",
         "@com_github_diskfs_go_diskfs//:go-diskfs",
         "@com_github_diskfs_go_diskfs//disk",
         "@com_github_diskfs_go_diskfs//filesystem",
         "@com_github_diskfs_go_diskfs//partition/gpt",
+        "@org_golang_google_grpc//:go_default_library",
         "@org_golang_google_protobuf//proto",
+        "@org_golang_x_net//proxy",
     ],
 )
diff --git a/metropolis/cli/metroctl/core/rpc.go b/metropolis/cli/metroctl/core/rpc.go
new file mode 100644
index 0000000..8d7565f
--- /dev/null
+++ b/metropolis/cli/metroctl/core/rpc.go
@@ -0,0 +1,106 @@
+package core
+
+import (
+	"context"
+	"crypto/ed25519"
+	"crypto/tls"
+	"crypto/x509"
+	"fmt"
+	"io"
+	"net"
+
+	"golang.org/x/net/proxy"
+	"google.golang.org/grpc"
+
+	"source.monogon.dev/metropolis/node"
+	"source.monogon.dev/metropolis/node/core/rpc"
+	"source.monogon.dev/metropolis/node/core/rpc/resolver"
+	"source.monogon.dev/metropolis/proto/api"
+)
+
+type ResolverLogger func(format string, args ...interface{})
+
+// DialCluster dials the cluster control address. The owner certificate, and
+// proxy address parameters are optional and can be left nil, and empty,
+// respectively. At least one cluster endpoint must be provided. A missing
+// owner certificate will result in a connection that is authenticated with
+// ephemeral credentials, restricting the available API surface. proxyAddr
+// must point at a SOCKS5 endpoint.
+func DialCluster(ctx context.Context, opkey ed25519.PrivateKey, ocert *x509.Certificate, proxyAddr string, clusterEndpoints []string, rlf ResolverLogger) (*grpc.ClientConn, error) {
+	var dialOpts []grpc.DialOption
+
+	if opkey == nil {
+		return nil, fmt.Errorf("an owner's private key must be provided")
+	}
+	if len(clusterEndpoints) == 0 {
+		return nil, fmt.Errorf("at least one cluster endpoint must be provided")
+	}
+
+	if proxyAddr != "" {
+		socksDialer, err := proxy.SOCKS5("tcp", proxyAddr, nil, proxy.Direct)
+		if err != nil {
+			return nil, fmt.Errorf("failed to build a SOCKS dialer: %v", err)
+		}
+		grpcd := func(_ context.Context, addr string) (net.Conn, error) {
+			return socksDialer.Dial("tcp", addr)
+		}
+		dialOpts = append(dialOpts, grpc.WithContextDialer(grpcd))
+	}
+
+	if ocert == nil {
+		creds, err := rpc.NewEphemeralCredentials(opkey, nil)
+		if err != nil {
+			return nil, fmt.Errorf("while building ephemeral credentials: %v", err)
+		}
+		dialOpts = append(dialOpts, grpc.WithTransportCredentials(creds))
+	} else {
+		tlsc := tls.Certificate{
+			Certificate: [][]byte{ocert.Raw},
+			PrivateKey:  opkey,
+		}
+		creds := rpc.NewAuthenticatedCredentials(tlsc, nil)
+		dialOpts = append(dialOpts, grpc.WithTransportCredentials(creds))
+	}
+
+	var resolverOpts []resolver.ResolverOption
+	if rlf != nil {
+		resolverOpts = append(resolverOpts, resolver.WithLogger(rlf))
+	}
+	r := resolver.New(ctx, resolverOpts...)
+
+	for _, eps := range clusterEndpoints {
+		ep := resolver.NodeByHostPort(eps, uint16(node.CuratorServicePort))
+		r.AddEndpoint(ep)
+	}
+	dialOpts = append(dialOpts, grpc.WithResolvers(r))
+
+	c, err := grpc.Dial(resolver.MetropolisControlAddress, dialOpts...)
+	if err != nil {
+		return nil, fmt.Errorf("could not dial: %v", err)
+	}
+	return c, nil
+}
+
+// GetNodes retrieves node records, filtered by the supplied node filter
+// expression fexp.
+func GetNodes(ctx context.Context, mgmt api.ManagementClient, fexp string) ([]*api.Node, error) {
+	resN, err := mgmt.GetNodes(ctx, &api.GetNodesRequest{
+		Filter: fexp,
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	var nodes []*api.Node
+	for {
+		node, err := resN.Recv()
+		if err == io.EOF {
+			break
+		}
+		if err != nil {
+			return nil, err
+		}
+		nodes = append(nodes, node)
+	}
+	return nodes, nil
+}
diff --git a/metropolis/cli/metroctl/install.go b/metropolis/cli/metroctl/install.go
index 38b82bf..07f5d3e 100644
--- a/metropolis/cli/metroctl/install.go
+++ b/metropolis/cli/metroctl/install.go
@@ -118,20 +118,7 @@
 			},
 		}
 	} else {
-		ocert, opkey, err := getCredentials()
-		if err == noCredentialsError {
-			log.Fatalf("In order to create a non-bootstrap node installer, you have to take ownership of the cluster first: %v", err)
-		}
-		if err != nil {
-			log.Fatalf("While retrieving owner credentials: %v", err)
-		}
-		if len(flags.clusterEndpoints) == 0 {
-			log.Fatal("At least one cluster endpoint is required while generating non-bootstrap installer images.")
-		}
-		cc, err := dialCluster(ctx, opkey, ocert, flags.proxyAddr, flags.clusterEndpoints)
-		if err != nil {
-			log.Fatalf("While dialing the cluster: %v", err)
-		}
+		cc := dialAuthenticated()
 		mgmt := api.NewManagementClient(cc)
 		resT, err := mgmt.GetRegisterTicket(ctx, &api.GetRegisterTicketRequest{})
 		if err != nil {
diff --git a/metropolis/cli/metroctl/main.go b/metropolis/cli/metroctl/main.go
index d616714..949eb63 100644
--- a/metropolis/cli/metroctl/main.go
+++ b/metropolis/cli/metroctl/main.go
@@ -1,6 +1,7 @@
 package main
 
 import (
+	"log"
 	"path/filepath"
 
 	"github.com/adrg/xdg"
@@ -35,6 +36,14 @@
 	rootCmd.PersistentFlags().BoolVar(&flags.verbose, "verbose", false, "Log additional runtime information")
 }
 
+// rpcLogger passes through the cluster resolver logs, if "--verbose" flag was
+// used.
+func rpcLogger(f string, args ...interface{}) {
+	if flags.verbose {
+		log.Printf("resolver: " + f, args...)
+	}
+}
+
 func main() {
 	cobra.CheckErr(rootCmd.Execute())
 }
diff --git a/metropolis/cli/metroctl/rpc.go b/metropolis/cli/metroctl/rpc.go
index 7cb8d7c..a050e9b 100644
--- a/metropolis/cli/metroctl/rpc.go
+++ b/metropolis/cli/metroctl/rpc.go
@@ -2,80 +2,30 @@
 
 import (
 	"context"
-	"crypto/ed25519"
-	"crypto/tls"
-	"crypto/x509"
-	"fmt"
 	"log"
-	"net"
 
-	"golang.org/x/net/proxy"
 	"google.golang.org/grpc"
 
-	"source.monogon.dev/metropolis/node"
-	"source.monogon.dev/metropolis/node/core/rpc"
-	"source.monogon.dev/metropolis/node/core/rpc/resolver"
+	"source.monogon.dev/metropolis/cli/metroctl/core"
+	clicontext "source.monogon.dev/metropolis/cli/pkg/context"
 )
 
-// dialCluster dials the cluster control address. The owner certificate, and
-// proxy address parameters are optional and can be left nil, and empty,
-// respectively. At least one cluster endpoint must be provided. A missing
-// owner certificate will result in a connection that is authenticated with
-// ephemeral credentials, restricting the available API surface. proxyAddr
-// must point at a SOCKS5 endpoint.
-func dialCluster(ctx context.Context, opkey ed25519.PrivateKey, ocert *x509.Certificate, proxyAddr string, clusterEndpoints []string) (*grpc.ClientConn, error) {
-	var dialOpts []grpc.DialOption
-
-	if opkey == nil {
-		return nil, fmt.Errorf("an owner's private key must be provided")
-	}
-	if len(clusterEndpoints) == 0 {
-		return nil, fmt.Errorf("at least one cluster endpoint must be provided")
+func dialAuthenticated() *grpc.ClientConn {
+	if len(flags.clusterEndpoints) == 0 {
+		log.Fatal("Please provide at least one cluster endpoint using the --endpoint parameter.")
 	}
 
-	if proxyAddr != "" {
-		socksDialer, err := proxy.SOCKS5("tcp", proxyAddr, nil, proxy.Direct)
-		if err != nil {
-			return nil, fmt.Errorf("failed to build a SOCKS dialer: %v", err)
-		}
-		grpcd := func(_ context.Context, addr string) (net.Conn, error) {
-			return socksDialer.Dial("tcp", addr)
-		}
-		dialOpts = append(dialOpts, grpc.WithContextDialer(grpcd))
+	// Collect credentials, validate command parameters, and try dialing the
+	// cluster.
+	ocert, opkey, err := getCredentials()
+	if err == noCredentialsError {
+		log.Fatalf("You have to take ownership of the cluster first: %v", err)
 	}
 
-	if ocert == nil {
-		creds, err := rpc.NewEphemeralCredentials(opkey, nil)
-		if err != nil {
-			return nil, fmt.Errorf("while building ephemeral credentials: %v", err)
-		}
-		dialOpts = append(dialOpts, grpc.WithTransportCredentials(creds))
-	} else {
-		tlsc := tls.Certificate{
-			Certificate: [][]byte{ocert.Raw},
-			PrivateKey:  opkey,
-		}
-		creds := rpc.NewAuthenticatedCredentials(tlsc, nil)
-		dialOpts = append(dialOpts, grpc.WithTransportCredentials(creds))
-	}
-
-	var resolverOpts []resolver.ResolverOption
-	if flags.verbose {
-		l := func(f string, args ...interface{}) {
-			log.Printf("resolver: " + f, args...)
-		}
-		resolverOpts = append(resolverOpts, resolver.WithLogger(l))
-	}
-	r := resolver.New(ctx, resolverOpts...)
-
-	for _, ep := range clusterEndpoints {
-		r.AddEndpoint(resolver.NodeByHostPort(ep, uint16(node.CuratorServicePort)))
-	}
-	dialOpts = append(dialOpts, grpc.WithResolvers(r))
-
-	c, err := grpc.Dial(resolver.MetropolisControlAddress, dialOpts...)
+	ctx := clicontext.WithInterrupt(context.Background())
+	cc, err := core.DialCluster(ctx, opkey, ocert, flags.proxyAddr, flags.clusterEndpoints, rpcLogger)
 	if err != nil {
-		return nil, fmt.Errorf("could not dial: %v", err)
+		log.Fatalf("While dialing the cluster: %v", err)
 	}
-	return c, nil
+	return cc
 }
diff --git a/metropolis/cli/metroctl/takeownership.go b/metropolis/cli/metroctl/takeownership.go
index 1b05500..6102cf1 100644
--- a/metropolis/cli/metroctl/takeownership.go
+++ b/metropolis/cli/metroctl/takeownership.go
@@ -14,6 +14,7 @@
 	"k8s.io/client-go/tools/clientcmd"
 	clientapi "k8s.io/client-go/tools/clientcmd/api"
 
+	"source.monogon.dev/metropolis/cli/metroctl/core"
 	clicontext "source.monogon.dev/metropolis/cli/pkg/context"
 	"source.monogon.dev/metropolis/node"
 	"source.monogon.dev/metropolis/node/core/rpc"
@@ -46,7 +47,7 @@
 		log.Fatalf("Couldn't get owner's key: %v", err)
 	}
 	ctx := clicontext.WithInterrupt(context.Background())
-	cc, err := dialCluster(ctx, opk, nil, flags.proxyAddr, flags.clusterEndpoints)
+	cc, err := core.DialCluster(ctx, opk, nil, flags.proxyAddr, flags.clusterEndpoints, rpcLogger)
 	if err != nil {
 		log.Fatalf("While dialing the cluster: %v", err)
 	}