m/c/metroctl: move common impl to m/c/m/core
This moves the implementation shared between CLI commands into the
metroctl core package.
Change-Id: I93624a07356accf3441f02e6ecd8e91d5b71e66e
Reviewed-on: https://review.monogon.dev/c/monogon/+/843
Tested-by: Jenkins CI
Reviewed-by: Sergiusz Bazanski <serge@monogon.tech>
diff --git a/metropolis/cli/metroctl/BUILD.bazel b/metropolis/cli/metroctl/BUILD.bazel
index 4ecdd60..d6a0e47 100644
--- a/metropolis/cli/metroctl/BUILD.bazel
+++ b/metropolis/cli/metroctl/BUILD.bazel
@@ -26,7 +26,6 @@
"//metropolis/node",
"//metropolis/node/core/identity",
"//metropolis/node/core/rpc",
- "//metropolis/node/core/rpc/resolver",
"//metropolis/proto/api",
"@com_github_adrg_xdg//:xdg",
"@com_github_spf13_cobra//:cobra",
@@ -35,7 +34,6 @@
"@io_k8s_client_go//tools/clientcmd",
"@io_k8s_client_go//tools/clientcmd/api",
"@org_golang_google_grpc//:go_default_library",
- "@org_golang_x_net//proxy",
],
)
diff --git a/metropolis/cli/metroctl/approve.go b/metropolis/cli/metroctl/approve.go
index adb7b8f..892a756 100644
--- a/metropolis/cli/metroctl/approve.go
+++ b/metropolis/cli/metroctl/approve.go
@@ -3,11 +3,11 @@
import (
"context"
"fmt"
- "io"
"log"
"github.com/spf13/cobra"
+ "source.monogon.dev/metropolis/cli/metroctl/core"
clicontext "source.monogon.dev/metropolis/cli/pkg/context"
"source.monogon.dev/metropolis/node/core/identity"
"source.monogon.dev/metropolis/proto/api"
@@ -24,29 +24,6 @@
rootCmd.AddCommand(approveCmd)
}
-// getNewNodes returns all nodes pending approval within the cluster.
-func getNewNodes(ctx context.Context, mgmt api.ManagementClient) ([]*api.Node, error) {
- resN, err := mgmt.GetNodes(ctx, &api.GetNodesRequest{
- Filter: "node.state == NODE_STATE_NEW",
- })
- if err != nil {
- return nil, err
- }
-
- var nodes []*api.Node
- for {
- node, err := resN.Recv()
- if err == io.EOF {
- break
- }
- if err != nil {
- return nil, err
- }
- nodes = append(nodes, node)
- }
- return nodes, nil
-}
-
// nodeById returns the node matching id, if it exists within nodes.
func nodeById(nodes []*api.Node, id string) *api.Node {
for _, n := range nodes {
@@ -58,26 +35,14 @@
}
func doApprove(cmd *cobra.Command, args []string) {
- // Collect credentials, validate command parameters, and try dialing the
- // cluster.
- ocert, opkey, err := getCredentials()
- if err == noCredentialsError {
- log.Fatalf("You have to take ownership of the cluster first: %v", err)
- }
- if len(flags.clusterEndpoints) == 0 {
- log.Fatal("Please provide at least one cluster endpoint using the --endpoint parameter.")
- }
- ctx := clicontext.WithInterrupt(context.Background())
- cc, err := dialCluster(ctx, opkey, ocert, flags.proxyAddr, flags.clusterEndpoints)
- if err != nil {
- log.Fatalf("While dialing the cluster: %v", err)
- }
+ cc := dialAuthenticated()
mgmt := api.NewManagementClient(cc)
// Get a list of all nodes pending approval by calling Management.GetNodes.
// We need this list regardless of whether we're actually approving nodes, or
// just listing them.
- nodes, err := getNewNodes(ctx, mgmt)
+ ctx := clicontext.WithInterrupt(context.Background())
+ nodes, err := core.GetNodes(ctx, mgmt, "node.state == NODE_STATE_NEW")
if err != nil {
log.Fatalf("While fetching a list of nodes pending approval: %v", err)
}
diff --git a/metropolis/cli/metroctl/core/BUILD.bazel b/metropolis/cli/metroctl/core/BUILD.bazel
index cdde06b..af948af 100644
--- a/metropolis/cli/metroctl/core/BUILD.bazel
+++ b/metropolis/cli/metroctl/core/BUILD.bazel
@@ -5,15 +5,21 @@
srcs = [
"core.go",
"install.go",
+ "rpc.go",
],
importpath = "source.monogon.dev/metropolis/cli/metroctl/core",
visibility = ["//visibility:public"],
deps = [
+ "//metropolis/node",
+ "//metropolis/node/core/rpc",
+ "//metropolis/node/core/rpc/resolver",
"//metropolis/proto/api",
"@com_github_diskfs_go_diskfs//:go-diskfs",
"@com_github_diskfs_go_diskfs//disk",
"@com_github_diskfs_go_diskfs//filesystem",
"@com_github_diskfs_go_diskfs//partition/gpt",
+ "@org_golang_google_grpc//:go_default_library",
"@org_golang_google_protobuf//proto",
+ "@org_golang_x_net//proxy",
],
)
diff --git a/metropolis/cli/metroctl/core/rpc.go b/metropolis/cli/metroctl/core/rpc.go
new file mode 100644
index 0000000..8d7565f
--- /dev/null
+++ b/metropolis/cli/metroctl/core/rpc.go
@@ -0,0 +1,106 @@
+package core
+
+import (
+ "context"
+ "crypto/ed25519"
+ "crypto/tls"
+ "crypto/x509"
+ "fmt"
+ "io"
+ "net"
+
+ "golang.org/x/net/proxy"
+ "google.golang.org/grpc"
+
+ "source.monogon.dev/metropolis/node"
+ "source.monogon.dev/metropolis/node/core/rpc"
+ "source.monogon.dev/metropolis/node/core/rpc/resolver"
+ "source.monogon.dev/metropolis/proto/api"
+)
+
+type ResolverLogger func(format string, args ...interface{})
+
+// DialCluster dials the cluster control address. The owner certificate, and
+// proxy address parameters are optional and can be left nil, and empty,
+// respectively. At least one cluster endpoint must be provided. A missing
+// owner certificate will result in a connection that is authenticated with
+// ephemeral credentials, restricting the available API surface. proxyAddr
+// must point at a SOCKS5 endpoint.
+func DialCluster(ctx context.Context, opkey ed25519.PrivateKey, ocert *x509.Certificate, proxyAddr string, clusterEndpoints []string, rlf ResolverLogger) (*grpc.ClientConn, error) {
+ var dialOpts []grpc.DialOption
+
+ if opkey == nil {
+ return nil, fmt.Errorf("an owner's private key must be provided")
+ }
+ if len(clusterEndpoints) == 0 {
+ return nil, fmt.Errorf("at least one cluster endpoint must be provided")
+ }
+
+ if proxyAddr != "" {
+ socksDialer, err := proxy.SOCKS5("tcp", proxyAddr, nil, proxy.Direct)
+ if err != nil {
+ return nil, fmt.Errorf("failed to build a SOCKS dialer: %v", err)
+ }
+ grpcd := func(_ context.Context, addr string) (net.Conn, error) {
+ return socksDialer.Dial("tcp", addr)
+ }
+ dialOpts = append(dialOpts, grpc.WithContextDialer(grpcd))
+ }
+
+ if ocert == nil {
+ creds, err := rpc.NewEphemeralCredentials(opkey, nil)
+ if err != nil {
+ return nil, fmt.Errorf("while building ephemeral credentials: %v", err)
+ }
+ dialOpts = append(dialOpts, grpc.WithTransportCredentials(creds))
+ } else {
+ tlsc := tls.Certificate{
+ Certificate: [][]byte{ocert.Raw},
+ PrivateKey: opkey,
+ }
+ creds := rpc.NewAuthenticatedCredentials(tlsc, nil)
+ dialOpts = append(dialOpts, grpc.WithTransportCredentials(creds))
+ }
+
+ var resolverOpts []resolver.ResolverOption
+ if rlf != nil {
+ resolverOpts = append(resolverOpts, resolver.WithLogger(rlf))
+ }
+ r := resolver.New(ctx, resolverOpts...)
+
+ for _, eps := range clusterEndpoints {
+ ep := resolver.NodeByHostPort(eps, uint16(node.CuratorServicePort))
+ r.AddEndpoint(ep)
+ }
+ dialOpts = append(dialOpts, grpc.WithResolvers(r))
+
+ c, err := grpc.Dial(resolver.MetropolisControlAddress, dialOpts...)
+ if err != nil {
+ return nil, fmt.Errorf("could not dial: %v", err)
+ }
+ return c, nil
+}
+
+// GetNodes retrieves node records, filtered by the supplied node filter
+// expression fexp.
+func GetNodes(ctx context.Context, mgmt api.ManagementClient, fexp string) ([]*api.Node, error) {
+ resN, err := mgmt.GetNodes(ctx, &api.GetNodesRequest{
+ Filter: fexp,
+ })
+ if err != nil {
+ return nil, err
+ }
+
+ var nodes []*api.Node
+ for {
+ node, err := resN.Recv()
+ if err == io.EOF {
+ break
+ }
+ if err != nil {
+ return nil, err
+ }
+ nodes = append(nodes, node)
+ }
+ return nodes, nil
+}
diff --git a/metropolis/cli/metroctl/install.go b/metropolis/cli/metroctl/install.go
index 38b82bf..07f5d3e 100644
--- a/metropolis/cli/metroctl/install.go
+++ b/metropolis/cli/metroctl/install.go
@@ -118,20 +118,7 @@
},
}
} else {
- ocert, opkey, err := getCredentials()
- if err == noCredentialsError {
- log.Fatalf("In order to create a non-bootstrap node installer, you have to take ownership of the cluster first: %v", err)
- }
- if err != nil {
- log.Fatalf("While retrieving owner credentials: %v", err)
- }
- if len(flags.clusterEndpoints) == 0 {
- log.Fatal("At least one cluster endpoint is required while generating non-bootstrap installer images.")
- }
- cc, err := dialCluster(ctx, opkey, ocert, flags.proxyAddr, flags.clusterEndpoints)
- if err != nil {
- log.Fatalf("While dialing the cluster: %v", err)
- }
+ cc := dialAuthenticated()
mgmt := api.NewManagementClient(cc)
resT, err := mgmt.GetRegisterTicket(ctx, &api.GetRegisterTicketRequest{})
if err != nil {
diff --git a/metropolis/cli/metroctl/main.go b/metropolis/cli/metroctl/main.go
index d616714..949eb63 100644
--- a/metropolis/cli/metroctl/main.go
+++ b/metropolis/cli/metroctl/main.go
@@ -1,6 +1,7 @@
package main
import (
+ "log"
"path/filepath"
"github.com/adrg/xdg"
@@ -35,6 +36,14 @@
rootCmd.PersistentFlags().BoolVar(&flags.verbose, "verbose", false, "Log additional runtime information")
}
+// rpcLogger passes through the cluster resolver logs if the "--verbose" flag
+// was used.
+func rpcLogger(f string, args ...interface{}) {
+ if flags.verbose {
+ log.Printf("resolver: " + f, args...)
+ }
+}
+
func main() {
cobra.CheckErr(rootCmd.Execute())
}
diff --git a/metropolis/cli/metroctl/rpc.go b/metropolis/cli/metroctl/rpc.go
index 7cb8d7c..a050e9b 100644
--- a/metropolis/cli/metroctl/rpc.go
+++ b/metropolis/cli/metroctl/rpc.go
@@ -2,80 +2,30 @@
import (
"context"
- "crypto/ed25519"
- "crypto/tls"
- "crypto/x509"
- "fmt"
"log"
- "net"
- "golang.org/x/net/proxy"
"google.golang.org/grpc"
- "source.monogon.dev/metropolis/node"
- "source.monogon.dev/metropolis/node/core/rpc"
- "source.monogon.dev/metropolis/node/core/rpc/resolver"
+ "source.monogon.dev/metropolis/cli/metroctl/core"
+ clicontext "source.monogon.dev/metropolis/cli/pkg/context"
)
-// dialCluster dials the cluster control address. The owner certificate, and
-// proxy address parameters are optional and can be left nil, and empty,
-// respectively. At least one cluster endpoint must be provided. A missing
-// owner certificate will result in a connection that is authenticated with
-// ephemeral credentials, restricting the available API surface. proxyAddr
-// must point at a SOCKS5 endpoint.
-func dialCluster(ctx context.Context, opkey ed25519.PrivateKey, ocert *x509.Certificate, proxyAddr string, clusterEndpoints []string) (*grpc.ClientConn, error) {
- var dialOpts []grpc.DialOption
-
- if opkey == nil {
- return nil, fmt.Errorf("an owner's private key must be provided")
- }
- if len(clusterEndpoints) == 0 {
- return nil, fmt.Errorf("at least one cluster endpoint must be provided")
+func dialAuthenticated() *grpc.ClientConn {
+ if len(flags.clusterEndpoints) == 0 {
+ log.Fatal("Please provide at least one cluster endpoint using the --endpoint parameter.")
}
- if proxyAddr != "" {
- socksDialer, err := proxy.SOCKS5("tcp", proxyAddr, nil, proxy.Direct)
- if err != nil {
- return nil, fmt.Errorf("failed to build a SOCKS dialer: %v", err)
- }
- grpcd := func(_ context.Context, addr string) (net.Conn, error) {
- return socksDialer.Dial("tcp", addr)
- }
- dialOpts = append(dialOpts, grpc.WithContextDialer(grpcd))
+ // Collect credentials, validate command parameters, and try dialing the
+ // cluster.
+ ocert, opkey, err := getCredentials()
+ if err == noCredentialsError {
+ log.Fatalf("You have to take ownership of the cluster first: %v", err)
}
- if ocert == nil {
- creds, err := rpc.NewEphemeralCredentials(opkey, nil)
- if err != nil {
- return nil, fmt.Errorf("while building ephemeral credentials: %v", err)
- }
- dialOpts = append(dialOpts, grpc.WithTransportCredentials(creds))
- } else {
- tlsc := tls.Certificate{
- Certificate: [][]byte{ocert.Raw},
- PrivateKey: opkey,
- }
- creds := rpc.NewAuthenticatedCredentials(tlsc, nil)
- dialOpts = append(dialOpts, grpc.WithTransportCredentials(creds))
- }
-
- var resolverOpts []resolver.ResolverOption
- if flags.verbose {
- l := func(f string, args ...interface{}) {
- log.Printf("resolver: " + f, args...)
- }
- resolverOpts = append(resolverOpts, resolver.WithLogger(l))
- }
- r := resolver.New(ctx, resolverOpts...)
-
- for _, ep := range clusterEndpoints {
- r.AddEndpoint(resolver.NodeByHostPort(ep, uint16(node.CuratorServicePort)))
- }
- dialOpts = append(dialOpts, grpc.WithResolvers(r))
-
- c, err := grpc.Dial(resolver.MetropolisControlAddress, dialOpts...)
+ ctx := clicontext.WithInterrupt(context.Background())
+ cc, err := core.DialCluster(ctx, opkey, ocert, flags.proxyAddr, flags.clusterEndpoints, rpcLogger)
if err != nil {
- return nil, fmt.Errorf("could not dial: %v", err)
+ log.Fatalf("While dialing the cluster: %v", err)
}
- return c, nil
+ return cc
}
diff --git a/metropolis/cli/metroctl/takeownership.go b/metropolis/cli/metroctl/takeownership.go
index 1b05500..6102cf1 100644
--- a/metropolis/cli/metroctl/takeownership.go
+++ b/metropolis/cli/metroctl/takeownership.go
@@ -14,6 +14,7 @@
"k8s.io/client-go/tools/clientcmd"
clientapi "k8s.io/client-go/tools/clientcmd/api"
+ "source.monogon.dev/metropolis/cli/metroctl/core"
clicontext "source.monogon.dev/metropolis/cli/pkg/context"
"source.monogon.dev/metropolis/node"
"source.monogon.dev/metropolis/node/core/rpc"
@@ -46,7 +47,7 @@
log.Fatalf("Couldn't get owner's key: %v", err)
}
ctx := clicontext.WithInterrupt(context.Background())
- cc, err := dialCluster(ctx, opk, nil, flags.proxyAddr, flags.clusterEndpoints)
+ cc, err := core.DialCluster(ctx, opk, nil, flags.proxyAddr, flags.clusterEndpoints, rpcLogger)
if err != nil {
log.Fatalf("While dialing the cluster: %v", err)
}