m/test: implement SOCKS proxy in cluster tests
This uses the new socksproxy package to run a SOCKS proxy server inside
nanoswitch, which the tests then use to access the test cluster's nodes.
The cluster test code (and nanoswitch) still forwards traffic to the
first node, but this will be gradually removed as SOCKS support is
implemented in metroctl and the debug tool. Forwards from host ports to
different nodes can then be implemented as part of the dbg tool (instead
of the cluster launch code) to keep the interface simple during
debugging and development.
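
For illustration, a test can then reach any node by its ID through the
cluster's SOCKS-backed dialer. A minimal sketch, assuming a *Cluster
value "cl" returned by LaunchCluster and the same imports cluster.go
already uses:

    // Dial the first node's Curator port by node ID; DialNode resolves
    // the ID and tunnels the connection through the nanoswitch SOCKS
    // proxy.
    creds := rpc.NewAuthenticatedCredentials(cl.Owner, nil)
    addr := net.JoinHostPort(cl.NodeIDs[0], node.CuratorServicePort.PortString())
    conn, err := grpc.Dial(addr,
        grpc.WithTransportCredentials(creds),
        grpc.WithContextDialer(cl.DialNode))
    if err != nil {
        // handle dial error
    }
    defer conn.Close()
    mgmt := apb.NewManagementClient(conn)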
We also take this opportunity to make the non-cluster launch code
non-Metropolis-specific (by removing the assumption that all ports on
all nodes are Metropolis ports). In the long term, we will probably
remove non-cluster launches entirely (or reduce this code to a plain
'launch qemu' wrapper).
Change-Id: I9b321bde95ba74fbfaa695eaaad8f9974aba5372
Reviewed-on: https://review.monogon.dev/c/monogon/+/648
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
diff --git a/metropolis/test/launch/cluster/BUILD.bazel b/metropolis/test/launch/cluster/BUILD.bazel
index 2ca9816..b70428c 100644
--- a/metropolis/test/launch/cluster/BUILD.bazel
+++ b/metropolis/test/launch/cluster/BUILD.bazel
@@ -26,11 +26,11 @@
"//metropolis/proto/common",
"//metropolis/test/launch",
"@com_github_cenkalti_backoff_v4//:backoff",
- "@com_github_grpc_ecosystem_go_grpc_middleware//retry",
"@org_golang_google_grpc//:go_default_library",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//status",
"@org_golang_google_protobuf//proto",
+ "@org_golang_x_net//proxy",
"@org_uber_go_multierr//:multierr",
],
)
diff --git a/metropolis/test/launch/cluster/cluster.go b/metropolis/test/launch/cluster/cluster.go
index cf9cd07..da7862f 100644
--- a/metropolis/test/launch/cluster/cluster.go
+++ b/metropolis/test/launch/cluster/cluster.go
@@ -21,14 +21,15 @@
"time"
"github.com/cenkalti/backoff/v4"
- grpcretry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
"go.uber.org/multierr"
+ "golang.org/x/net/proxy"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
"source.monogon.dev/metropolis/node"
+ common "source.monogon.dev/metropolis/node"
"source.monogon.dev/metropolis/node/core/identity"
"source.monogon.dev/metropolis/node/core/rpc"
apb "source.monogon.dev/metropolis/proto/api"
@@ -282,14 +283,18 @@
return &mac, nil
}
-// ClusterPorts contains all ports forwarded by Nanoswitch to the first VM in a
-// launched Metropolis cluster.
-var ClusterPorts = []node.Port{
- node.CuratorServicePort,
- node.DebugServicePort,
+const SOCKSPort uint16 = 1080
- node.KubernetesAPIPort,
- node.KubernetesAPIWrappedPort,
+// ClusterPorts contains all ports handled by Nanoswitch.
+var ClusterPorts = []uint16{
+ // Forwarded to the first node.
+ uint16(node.CuratorServicePort),
+ uint16(node.DebugServicePort),
+ uint16(node.KubernetesAPIPort),
+ uint16(node.KubernetesAPIWrappedPort),
+
+ // SOCKS proxy to the switch network
+ SOCKSPort,
}
// ClusterOptions contains all options for launching a Metropolis cluster.
@@ -301,19 +306,19 @@
// Cluster is the running Metropolis cluster launched using the LaunchCluster
// function.
type Cluster struct {
- // Debug is the NodeDebugService gRPC service, allowing for debug
- // unauthenticated cluster to the access.
- Debug apb.NodeDebugServiceClient
- // Management is the Management gRPC service, authenticated as the owner of the
- // cluster.
- Management apb.ManagementClient
// Owner is the TLS Certificate of the owner of the test cluster. This can be
// used to authenticate further clients to the running cluster.
Owner tls.Certificate
// Ports is the PortMap used to access the first nodes' services (defined in
- // ClusterPorts).
+ // ClusterPorts) and the SOCKS proxy (at SOCKSPort).
Ports launch.PortMap
+ // Nodes is a map from Node ID to its runtime information.
+ Nodes map[string]*NodeInCluster
+ // NodeIDs is a list of node IDs that are backing this cluster, in order of
+ // creation.
+ NodeIDs []string
+
// nodesDone is a list of channels populated with the return codes from all the
// nodes' qemu instances. It's used by Close to ensure all nodes have
// succesfully been stopped.
@@ -321,6 +326,96 @@
// ctxC is used by Close to cancel the context under which the nodes are
// running.
ctxC context.CancelFunc
+ // socksDialer is used by DialNode to establish connections to nodes via the
+ // SOCKS server run by nanoswitch.
+ socksDialer proxy.Dialer
+}
+
+// NodeInCluster represents information about a node that's part of a Cluster.
+type NodeInCluster struct {
+ // ID of the node, which can be used to dial this node's services via DialNode.
+ ID string
+ // Address of the node on the network run by nanoswitch. Not reachable from the
+ // host unless dialed via DialNode or via the nanoswitch SOCKS proxy (reachable
+ // on Cluster.Ports[SOCKSPort]).
+ ManagementAddress string
+}
+
+// firstConnection performs the initial owner credential escrow with a newly
+// started nanoswitch-backed cluster over SOCKS. It expects the first node to be
+// running at 10.1.0.2, which is always the case with the current nanoswitch
+// implementation.
+//
+// It returns the newly escrowed credentials as well as the first node's
+// information as NodeInCluster.
+func firstConnection(ctx context.Context, socksDialer proxy.Dialer) (*tls.Certificate, *NodeInCluster, error) {
+ // Dial external service.
+ remote := fmt.Sprintf("10.1.0.2:%s", node.CuratorServicePort.PortString())
+ initCreds, err := rpc.NewEphemeralCredentials(InsecurePrivateKey, nil)
+ if err != nil {
+ return nil, nil, fmt.Errorf("NewEphemeralCredentials: %w", err)
+ }
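+ // initDialer routes the gRPC connection through the nanoswitch SOCKS proxy
+ // instead of dialing the node directly.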
+ initDialer := func(_ context.Context, addr string) (net.Conn, error) {
+ return socksDialer.Dial("tcp", addr)
+ }
+ initClient, err := grpc.Dial(remote, grpc.WithContextDialer(initDialer), grpc.WithTransportCredentials(initCreds))
+ if err != nil {
+ return nil, nil, fmt.Errorf("dialing with ephemeral credentials failed: %w", err)
+ }
+ defer initClient.Close()
+
+ // Retrieve owner certificate - this can take a while because the node is still
+ // coming up, so do it in a backoff loop.
+ log.Printf("Cluster: retrieving owner certificate (this can take a few seconds while the first node boots)...")
+ aaa := apb.NewAAAClient(initClient)
+ var cert *tls.Certificate
+ err = backoff.Retry(func() error {
+ cert, err = rpc.RetrieveOwnerCertificate(ctx, aaa, InsecurePrivateKey)
+ if st, ok := status.FromError(err); ok {
+ if st.Code() == codes.Unavailable {
+ return err
+ }
+ }
+ return backoff.Permanent(err)
+ }, backoff.WithContext(backoff.NewExponentialBackOff(), ctx))
+ if err != nil {
+ return nil, nil, err
+ }
+ log.Printf("Cluster: retrieved owner certificate.")
+
+ // Now connect authenticated and get the node ID.
+ creds := rpc.NewAuthenticatedCredentials(*cert, nil)
+ authClient, err := grpc.Dial(remote, grpc.WithContextDialer(initDialer), grpc.WithTransportCredentials(creds))
+ if err != nil {
+ return nil, nil, fmt.Errorf("dialing with owner credentials failed: %w", err)
+ }
+ defer authClient.Close()
+ mgmt := apb.NewManagementClient(authClient)
+
+ var node *NodeInCluster
+ err = backoff.Retry(func() error {
+ nodes, err := getNodes(ctx, mgmt)
+ if err != nil {
+ return fmt.Errorf("retrieving nodes failed: %w", err)
+ }
+ if len(nodes) != 1 {
+ return fmt.Errorf("expected one node, got %d", len(nodes))
+ }
+ n := nodes[0]
+ if n.Status == nil || n.Status.ExternalAddress == "" {
+ return fmt.Errorf("node has no status and/or address")
+ }
+ node = &NodeInCluster{
+ ID: identity.NodeID(n.Pubkey),
+ ManagementAddress: n.Status.ExternalAddress,
+ }
+ return nil
+ }, backoff.WithContext(backoff.NewExponentialBackOff(), ctx))
+ if err != nil {
+ return nil, nil, err
+ }
+
+ return cert, node, nil
}
// LaunchCluster launches a cluster of Metropolis node VMs together with a
@@ -392,57 +487,48 @@
}
}()
- // Dial debug service.
- copts := []grpcretry.CallOption{
- grpcretry.WithBackoff(grpcretry.BackoffExponential(100 * time.Millisecond)),
- }
- debugConn, err := portMap.DialGRPC(node.DebugServicePort, grpc.WithInsecure(),
- grpc.WithUnaryInterceptor(grpcretry.UnaryClientInterceptor(copts...)))
+ // Build SOCKS dialer.
+ socksRemote := fmt.Sprintf("localhost:%v", portMap[SOCKSPort])
+ socksDialer, err := proxy.SOCKS5("tcp", socksRemote, nil, proxy.Direct)
if err != nil {
ctxC()
- return nil, fmt.Errorf("failed to dial debug service: %w", err)
+ return nil, fmt.Errorf("failed to build SOCKS dialer: %w", err)
}
- // Dial external service.
- remote := fmt.Sprintf("localhost:%v", portMap[node.CuratorServicePort])
- initCreds, err := rpc.NewEphemeralCredentials(InsecurePrivateKey, nil)
- if err != nil {
- ctxC()
- return nil, fmt.Errorf("NewEphemeralCredentials: %w", err)
- }
- initClient, err := grpc.Dial(remote, grpc.WithTransportCredentials(initCreds))
- if err != nil {
- ctxC()
- return nil, fmt.Errorf("dialing with ephemeral credentials failed: %w", err)
- }
-
- // Retrieve owner certificate - this can take a while because the node is still
- // coming up, so do it in a backoff loop.
- log.Printf("Cluster: retrieving owner certificate (this can take a few seconds while the first node boots)...")
- aaa := apb.NewAAAClient(initClient)
- var cert *tls.Certificate
- err = backoff.Retry(func() error {
- cert, err = rpc.RetrieveOwnerCertificate(ctxT, aaa, InsecurePrivateKey)
- if st, ok := status.FromError(err); ok {
- if st.Code() == codes.Unavailable {
- return err
- }
- }
- return backoff.Permanent(err)
- }, backoff.WithContext(backoff.NewExponentialBackOff(), ctxT))
+ // Retrieve owner credentials and first node.
+ cert, firstNode, err := firstConnection(ctxT, socksDialer)
if err != nil {
ctxC()
return nil, err
}
- log.Printf("Cluster: retrieved owner certificate.")
- // Build authenticated owner client to new node.
+ cluster := &Cluster{
+ Owner: *cert,
+ Ports: portMap,
+ Nodes: map[string]*NodeInCluster{
+ firstNode.ID: firstNode,
+ },
+ NodeIDs: []string{
+ firstNode.ID,
+ },
+
+ nodesDone: done,
+ socksDialer: socksDialer,
+
+ ctxC: ctxC,
+ }
+
+ // Now start the rest of the nodes and register them into the cluster.
+
+ // Build authenticated owner client to first node.
authCreds := rpc.NewAuthenticatedCredentials(*cert, nil)
- authClient, err := grpc.Dial(remote, grpc.WithTransportCredentials(authCreds))
+ remote := net.JoinHostPort(cluster.NodeIDs[0], common.CuratorServicePort.PortString())
+ authClient, err := grpc.Dial(remote, grpc.WithTransportCredentials(authCreds), grpc.WithContextDialer(cluster.DialNode))
if err != nil {
ctxC()
return nil, fmt.Errorf("dialing with owner credentials failed: %w", err)
}
+ defer authClient.Close()
mgmt := apb.NewManagementClient(authClient)
// Retrieve register ticket to register further nodes.
@@ -463,8 +549,6 @@
return nil, fmt.Errorf("GetClusterInfo: %w", err)
}
- // Now run the rest of the nodes.
- //
// TODO(q3k): parallelize this
for i := 1; i < opts.NumNodes; i++ {
log.Printf("Cluster: Starting node %d...", i+1)
@@ -515,7 +599,7 @@
ctxC()
return nil, fmt.Errorf("ApproveNode(%s): %w", id, err)
}
- log.Printf("Cluster: node %d approved, waiting for it to appear as UP...", i)
+ log.Printf("Cluster: node %d approved, waiting for it to appear as UP and with a network address...", i)
for {
nodes, err := getNodes(ctx, mgmt)
if err != nil {
@@ -527,31 +611,37 @@
if !bytes.Equal(n.Pubkey, newNode.Pubkey) {
continue
}
- if n.State == cpb.NodeState_NODE_STATE_UP {
- found = true
+ if n.Status == nil || n.Status.ExternalAddress == "" {
break
}
- time.Sleep(time.Second)
+ if n.State != cpb.NodeState_NODE_STATE_UP {
+ break
+ }
+ found = true
+ cluster.Nodes[identity.NodeID(n.Pubkey)] = &NodeInCluster{
+ ID: identity.NodeID(n.Pubkey),
+ ManagementAddress: n.Status.ExternalAddress,
+ }
+ cluster.NodeIDs = append(cluster.NodeIDs, identity.NodeID(n.Pubkey))
+ break
}
if found {
break
}
+ time.Sleep(time.Second)
}
log.Printf("Cluster: node %d (%s) UP!", i, id)
}
- return &Cluster{
- Debug: apb.NewNodeDebugServiceClient(debugConn),
- Management: mgmt,
- Owner: *cert,
- Ports: portMap,
- nodesDone: done,
+ log.Printf("Cluster: all nodes up:")
+ for _, node := range cluster.Nodes {
+ log.Printf("Cluster: - %s at %s", node.ID, node.ManagementAddress)
+ }
- ctxC: ctxC,
- }, nil
+ return cluster, nil
}
-// Close cancels the running clusters' context and waits for all virtualized
+// Close cancels the running cluster's context and waits for all virtualized
// nodes to stop. It returns an error if stopping the nodes failed, or one of
// the nodes failed to fully start in the first place.
func (c *Cluster) Close() error {
@@ -569,3 +659,30 @@
log.Printf("Cluster: done")
return multierr.Combine(errors...)
}
+
+// DialNode is a grpc.WithContextDialer compatible dialer which dials nodes by
+// their ID. This is performed by connecting to the cluster nanoswitch via its
+// SOCKS proxy, and using the cluster node list for name resolution.
+//
+// For example:
+//
+// grpc.Dial("metropolis-deadbeef:1234", grpc.WithContextDialer(c.DialNode))
+//
+func (c *Cluster) DialNode(_ context.Context, addr string) (net.Conn, error) {
+ host, port, err := net.SplitHostPort(addr)
+ if err != nil {
+ return nil, fmt.Errorf("invalid host:port: %w", err)
+ }
+ // Already an IP address?
+ if net.ParseIP(host) != nil {
+ return c.socksDialer.Dial("tcp", addr)
+ }
+
+ // Otherwise, expect a node name.
+ node, ok := c.Nodes[host]
+ if !ok {
+ return nil, fmt.Errorf("unknown node %q", host)
+ }
+ addr = net.JoinHostPort(node.ManagementAddress, port)
+ return c.socksDialer.Dial("tcp", addr)
+}