m/test: implement SOCKS proxy in cluster tests
This uses the new socksproxy package to run a proxy server in the
nanoswitch, which tests then use to access the test cluster's nodes.
The cluster test code (and nanoswitch) still forwards traffic to the
first node, but this will be gradually removed as SOCKS support is
implemented in metroctl and the debug tool. Forwards from host ports to
different nodes can then be implemented as part of the dbg tool
(instead of the cluster launch code) to maintain a simple interface
during debug and development.
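For example, tests can now reach a node's gRPC services by node ID
through the proxy; a minimal sketch of the pattern added to
main_test.go below:

    creds := rpc.NewAuthenticatedCredentials(cluster.Owner, nil)
    remote := net.JoinHostPort(cluster.NodeIDs[0], common.CuratorServicePort.PortString())
    cl, err := grpc.Dial(remote,
        grpc.WithContextDialer(cluster.DialNode),
        grpc.WithTransportCredentials(creds))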
We also use the opportunity to make the non-cluster launch code not
Metropolis-specific (by removing the assumption that all ports on all
nodes are Metropolis ports). In the long term, we will probably remove
non-cluster launches entirely (or further turn this code into just a
'launch qemu' wrapper).
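Concretely, launch.PortMap is now keyed by plain uint16 ports, so
callers that still think in terms of node.Port convert explicitly, as
the launch CLI below does:

    var ports []uint16
    for _, p := range cluster.NodePorts {
        ports = append(ports, uint16(p))
    }
    // ports can then be passed to launch.IdentityPortMap or
    // launch.ConflictFreePortMap.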
Change-Id: I9b321bde95ba74fbfaa695eaaad8f9974aba5372
Reviewed-on: https://review.monogon.dev/c/monogon/+/648
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
diff --git a/metropolis/test/e2e/BUILD.bazel b/metropolis/test/e2e/BUILD.bazel
index 30932de..4d04857 100644
--- a/metropolis/test/e2e/BUILD.bazel
+++ b/metropolis/test/e2e/BUILD.bazel
@@ -34,11 +34,13 @@
deps = [
"//metropolis/node",
"//metropolis/node/core/identity",
+ "//metropolis/node/core/rpc",
"//metropolis/proto/api",
"//metropolis/test/launch/cluster",
"@io_k8s_api//core/v1:core",
"@io_k8s_apimachinery//pkg/api/resource",
"@io_k8s_apimachinery//pkg/apis/meta/v1:meta",
"@io_k8s_kubernetes//pkg/api/v1/pod",
+ "@org_golang_google_grpc//:go_default_library",
],
)
diff --git a/metropolis/test/e2e/k8s_cts/main.go b/metropolis/test/e2e/k8s_cts/main.go
index 15be332..c0adc75 100644
--- a/metropolis/test/e2e/k8s_cts/main.go
+++ b/metropolis/test/e2e/k8s_cts/main.go
@@ -106,7 +106,8 @@
log.Fatalf("Failed to launch cluster: %v", err)
}
log.Println("Cluster initialized")
- clientSet, err := e2e.GetKubeClientSet(cl, cl.Ports[common.KubernetesAPIWrappedPort])
+ // TODO(q3k): use SOCKS proxy instead.
+ clientSet, err := e2e.GetKubeClientSet(cl, cl.Ports[uint16(common.KubernetesAPIWrappedPort)])
if err != nil {
log.Fatalf("Failed to get clientSet: %v", err)
}
diff --git a/metropolis/test/e2e/main_test.go b/metropolis/test/e2e/main_test.go
index f7dfff8..51dbe4b 100644
--- a/metropolis/test/e2e/main_test.go
+++ b/metropolis/test/e2e/main_test.go
@@ -30,6 +30,7 @@
"testing"
"time"
+ "google.golang.org/grpc"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/resource"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -37,6 +38,7 @@
common "source.monogon.dev/metropolis/node"
"source.monogon.dev/metropolis/node/core/identity"
+ "source.monogon.dev/metropolis/node/core/rpc"
apb "source.monogon.dev/metropolis/proto/api"
"source.monogon.dev/metropolis/test/launch/cluster"
)
@@ -96,13 +98,23 @@
log.Printf("E2E: Cluster running, starting tests...")
+ // Dial first node's curator.
+ creds := rpc.NewAuthenticatedCredentials(cluster.Owner, nil)
+ remote := net.JoinHostPort(cluster.NodeIDs[0], common.CuratorServicePort.PortString())
+ cl, err := grpc.Dial(remote, grpc.WithContextDialer(cluster.DialNode), grpc.WithTransportCredentials(creds))
+ if err != nil {
+ t.Fatalf("failed to dial first node's curator: %v", err)
+ }
+ defer cl.Close()
+ mgmt := apb.NewManagementClient(cl)
+
// This exists to keep the parent around while all the children race.
// It currently tests both a set of OS-level conditions and Kubernetes
// Deployments and StatefulSets
t.Run("RunGroup", func(t *testing.T) {
t.Run("Cluster", func(t *testing.T) {
testEventual(t, "Retrieving cluster directory successful", ctx, 60*time.Second, func(ctx context.Context) error {
- res, err := cluster.Management.GetClusterInfo(ctx, &apb.GetClusterInfoRequest{})
+ res, err := mgmt.GetClusterInfo(ctx, &apb.GetClusterInfoRequest{})
if err != nil {
return fmt.Errorf("GetClusterInfo: %w", err)
}
@@ -133,7 +145,8 @@
})
t.Run("Kubernetes", func(t *testing.T) {
t.Parallel()
- clientSet, err := GetKubeClientSet(cluster, cluster.Ports[common.KubernetesAPIWrappedPort])
+ // TODO(q3k): use SOCKS proxy.
+ clientSet, err := GetKubeClientSet(cluster, cluster.Ports[uint16(common.KubernetesAPIWrappedPort)])
if err != nil {
t.Fatal(err)
}
diff --git a/metropolis/test/launch/BUILD.bazel b/metropolis/test/launch/BUILD.bazel
index 4508dd5..ff476c6 100644
--- a/metropolis/test/launch/BUILD.bazel
+++ b/metropolis/test/launch/BUILD.bazel
@@ -6,9 +6,7 @@
importpath = "source.monogon.dev/metropolis/test/launch",
visibility = ["//metropolis:__subpackages__"],
deps = [
- "//metropolis/node",
"//metropolis/pkg/freeport",
- "@org_golang_google_grpc//:go_default_library",
"@org_golang_x_sys//unix",
],
)
diff --git a/metropolis/test/launch/cli/launch/main.go b/metropolis/test/launch/cli/launch/main.go
index 5567379..a826ea0 100644
--- a/metropolis/test/launch/cli/launch/main.go
+++ b/metropolis/test/launch/cli/launch/main.go
@@ -28,9 +28,13 @@
)
func main() {
+ var ports []uint16
+ for _, p := range cluster.NodePorts {
+ ports = append(ports, uint16(p))
+ }
ctx := clicontext.WithInterrupt(context.Background())
err := cluster.LaunchNode(ctx, cluster.NodeOptions{
- Ports: launch.IdentityPortMap(cluster.NodePorts),
+ Ports: launch.IdentityPortMap(ports),
SerialPort: os.Stdout,
NodeParameters: &apb.NodeParameters{
Cluster: &apb.NodeParameters_ClusterBootstrap_{
diff --git a/metropolis/test/launch/cluster/BUILD.bazel b/metropolis/test/launch/cluster/BUILD.bazel
index 2ca9816..b70428c 100644
--- a/metropolis/test/launch/cluster/BUILD.bazel
+++ b/metropolis/test/launch/cluster/BUILD.bazel
@@ -26,11 +26,11 @@
"//metropolis/proto/common",
"//metropolis/test/launch",
"@com_github_cenkalti_backoff_v4//:backoff",
- "@com_github_grpc_ecosystem_go_grpc_middleware//retry",
"@org_golang_google_grpc//:go_default_library",
"@org_golang_google_grpc//codes",
"@org_golang_google_grpc//status",
"@org_golang_google_protobuf//proto",
+ "@org_golang_x_net//proxy",
"@org_uber_go_multierr//:multierr",
],
)
diff --git a/metropolis/test/launch/cluster/cluster.go b/metropolis/test/launch/cluster/cluster.go
index cf9cd07..da7862f 100644
--- a/metropolis/test/launch/cluster/cluster.go
+++ b/metropolis/test/launch/cluster/cluster.go
@@ -21,14 +21,15 @@
"time"
"github.com/cenkalti/backoff/v4"
- grpcretry "github.com/grpc-ecosystem/go-grpc-middleware/retry"
"go.uber.org/multierr"
+ "golang.org/x/net/proxy"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/proto"
"source.monogon.dev/metropolis/node"
+ common "source.monogon.dev/metropolis/node"
"source.monogon.dev/metropolis/node/core/identity"
"source.monogon.dev/metropolis/node/core/rpc"
apb "source.monogon.dev/metropolis/proto/api"
@@ -282,14 +283,18 @@
return &mac, nil
}
-// ClusterPorts contains all ports forwarded by Nanoswitch to the first VM in a
-// launched Metropolis cluster.
-var ClusterPorts = []node.Port{
- node.CuratorServicePort,
- node.DebugServicePort,
+const SOCKSPort uint16 = 1080
- node.KubernetesAPIPort,
- node.KubernetesAPIWrappedPort,
+// ClusterPorts contains all ports handled by Nanoswitch.
+var ClusterPorts = []uint16{
+ // Forwarded to the first node.
+ uint16(node.CuratorServicePort),
+ uint16(node.DebugServicePort),
+ uint16(node.KubernetesAPIPort),
+ uint16(node.KubernetesAPIWrappedPort),
+
+ // SOCKS proxy to the switch network.
+ SOCKSPort,
}
// ClusterOptions contains all options for launching a Metropolis cluster.
@@ -301,19 +306,19 @@
// Cluster is the running Metropolis cluster launched using the LaunchCluster
// function.
type Cluster struct {
- // Debug is the NodeDebugService gRPC service, allowing for debug
- // unauthenticated cluster to the access.
- Debug apb.NodeDebugServiceClient
- // Management is the Management gRPC service, authenticated as the owner of the
- // cluster.
- Management apb.ManagementClient
// Owner is the TLS Certificate of the owner of the test cluster. This can be
// used to authenticate further clients to the running cluster.
Owner tls.Certificate
// Ports is the PortMap used to access the first node's services (defined in
- // ClusterPorts).
+ // ClusterPorts) and the SOCKS proxy (at SOCKSPort).
Ports launch.PortMap
+ // Nodes is a map from Node ID to its runtime information.
+ Nodes map[string]*NodeInCluster
+ // NodeIDs is a list of node IDs that are backing this cluster, in order of
+ // creation.
+ NodeIDs []string
+
// nodesDone is a list of channels populated with the return codes from all the
// nodes' qemu instances. It's used by Close to ensure all nodes have
// successfully been stopped.
@@ -321,6 +326,96 @@
// ctxC is used by Close to cancel the context under which the nodes are
// running.
ctxC context.CancelFunc
+ // socksDialer is used by DialNode to establish connections to nodes via the
+ // SOCKS server run by nanoswitch.
+ socksDialer proxy.Dialer
+}
+
+// NodeInCluster represents information about a node that's part of a Cluster.
+type NodeInCluster struct {
+ // ID of the node, which can be used to dial this node's services via DialNode.
+ ID string
+ // Address of the node on the network run by nanoswitch. Not reachable from the
+ // host unless dialed via DialNode or via the nanoswitch SOCKS proxy (reachable
+ // on Cluster.Ports[SOCKSPort]).
+ ManagementAddress string
+}
+
+// firstConnection performs the initial owner credential escrow with a newly
+// started nanoswitch-backed cluster over SOCKS. It expects the first node to be
+// running at 10.1.0.2, which is always the case with the current nanoswitch
+// implementation.
+//
+// It returns the newly escrowed credentials as well as the first node's
+// information as NodeInCluster.
+func firstConnection(ctx context.Context, socksDialer proxy.Dialer) (*tls.Certificate, *NodeInCluster, error) {
+ // Dial external service.
+ remote := fmt.Sprintf("10.1.0.2:%s", node.CuratorServicePort.PortString())
+ initCreds, err := rpc.NewEphemeralCredentials(InsecurePrivateKey, nil)
+ if err != nil {
+ return nil, nil, fmt.Errorf("NewEphemeralCredentials: %w", err)
+ }
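+ // Tunnel all connections to the cluster through the nanoswitch SOCKS proxy.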
+ initDialer := func(_ context.Context, addr string) (net.Conn, error) {
+ return socksDialer.Dial("tcp", addr)
+ }
+ initClient, err := grpc.Dial(remote, grpc.WithContextDialer(initDialer), grpc.WithTransportCredentials(initCreds))
+ if err != nil {
+ return nil, nil, fmt.Errorf("dialing with ephemeral credentials failed: %w", err)
+ }
+ defer initClient.Close()
+
+ // Retrieve owner certificate - this can take a while because the node is still
+ // coming up, so do it in a backoff loop.
+ log.Printf("Cluster: retrieving owner certificate (this can take a few seconds while the first node boots)...")
+ aaa := apb.NewAAAClient(initClient)
+ var cert *tls.Certificate
+ err = backoff.Retry(func() error {
+ cert, err = rpc.RetrieveOwnerCertificate(ctx, aaa, InsecurePrivateKey)
+ if st, ok := status.FromError(err); ok {
+ if st.Code() == codes.Unavailable {
+ return err
+ }
+ }
+ return backoff.Permanent(err)
+ }, backoff.WithContext(backoff.NewExponentialBackOff(), ctx))
+ if err != nil {
+ return nil, nil, err
+ }
+ log.Printf("Cluster: retrieved owner certificate.")
+
+ // Now connect authenticated and get the node ID.
+ creds := rpc.NewAuthenticatedCredentials(*cert, nil)
+ authClient, err := grpc.Dial(remote, grpc.WithContextDialer(initDialer), grpc.WithTransportCredentials(creds))
+ if err != nil {
+ return nil, nil, fmt.Errorf("dialing with owner credentials failed: %w", err)
+ }
+ defer authClient.Close()
+ mgmt := apb.NewManagementClient(authClient)
+
+ var node *NodeInCluster
+ err = backoff.Retry(func() error {
+ nodes, err := getNodes(ctx, mgmt)
+ if err != nil {
+ return fmt.Errorf("retrieving nodes failed: %w", err)
+ }
+ if len(nodes) != 1 {
+ return fmt.Errorf("expected one node, got %d", len(nodes))
+ }
+ n := nodes[0]
+ if n.Status == nil || n.Status.ExternalAddress == "" {
+ return fmt.Errorf("node has no status and/or address")
+ }
+ node = &NodeInCluster{
+ ID: identity.NodeID(n.Pubkey),
+ ManagementAddress: n.Status.ExternalAddress,
+ }
+ return nil
+ }, backoff.WithContext(backoff.NewExponentialBackOff(), ctx))
+ if err != nil {
+ return nil, nil, err
+ }
+
+ return cert, node, nil
}
// LaunchCluster launches a cluster of Metropolis node VMs together with a
@@ -392,57 +487,48 @@
}
}()
- // Dial debug service.
- copts := []grpcretry.CallOption{
- grpcretry.WithBackoff(grpcretry.BackoffExponential(100 * time.Millisecond)),
- }
- debugConn, err := portMap.DialGRPC(node.DebugServicePort, grpc.WithInsecure(),
- grpc.WithUnaryInterceptor(grpcretry.UnaryClientInterceptor(copts...)))
+ // Build SOCKS dialer.
+ socksRemote := fmt.Sprintf("localhost:%v", portMap[SOCKSPort])
+ socksDialer, err := proxy.SOCKS5("tcp", socksRemote, nil, proxy.Direct)
if err != nil {
ctxC()
- return nil, fmt.Errorf("failed to dial debug service: %w", err)
+ return nil, fmt.Errorf("failed to build SOCKS dialer: %w", err)
}
- // Dial external service.
- remote := fmt.Sprintf("localhost:%v", portMap[node.CuratorServicePort])
- initCreds, err := rpc.NewEphemeralCredentials(InsecurePrivateKey, nil)
- if err != nil {
- ctxC()
- return nil, fmt.Errorf("NewEphemeralCredentials: %w", err)
- }
- initClient, err := grpc.Dial(remote, grpc.WithTransportCredentials(initCreds))
- if err != nil {
- ctxC()
- return nil, fmt.Errorf("dialing with ephemeral credentials failed: %w", err)
- }
-
- // Retrieve owner certificate - this can take a while because the node is still
- // coming up, so do it in a backoff loop.
- log.Printf("Cluster: retrieving owner certificate (this can take a few seconds while the first node boots)...")
- aaa := apb.NewAAAClient(initClient)
- var cert *tls.Certificate
- err = backoff.Retry(func() error {
- cert, err = rpc.RetrieveOwnerCertificate(ctxT, aaa, InsecurePrivateKey)
- if st, ok := status.FromError(err); ok {
- if st.Code() == codes.Unavailable {
- return err
- }
- }
- return backoff.Permanent(err)
- }, backoff.WithContext(backoff.NewExponentialBackOff(), ctxT))
+ // Retrieve owner credentials and first node.
+ cert, firstNode, err := firstConnection(ctxT, socksDialer)
if err != nil {
ctxC()
return nil, err
}
- log.Printf("Cluster: retrieved owner certificate.")
- // Build authenticated owner client to new node.
+ cluster := &Cluster{
+ Owner: *cert,
+ Ports: portMap,
+ Nodes: map[string]*NodeInCluster{
+ firstNode.ID: firstNode,
+ },
+ NodeIDs: []string{
+ firstNode.ID,
+ },
+
+ nodesDone: done,
+ socksDialer: socksDialer,
+
+ ctxC: ctxC,
+ }
+
+ // Now start the rest of the nodes and register them into the cluster.
+
+ // Build authenticated owner client to first node.
authCreds := rpc.NewAuthenticatedCredentials(*cert, nil)
- authClient, err := grpc.Dial(remote, grpc.WithTransportCredentials(authCreds))
+ remote := net.JoinHostPort(cluster.NodeIDs[0], common.CuratorServicePort.PortString())
+ authClient, err := grpc.Dial(remote, grpc.WithTransportCredentials(authCreds), grpc.WithContextDialer(cluster.DialNode))
if err != nil {
ctxC()
return nil, fmt.Errorf("dialing with owner credentials failed: %w", err)
}
+ defer authClient.Close()
mgmt := apb.NewManagementClient(authClient)
// Retrieve register ticket to register further nodes.
@@ -463,8 +549,6 @@
return nil, fmt.Errorf("GetClusterInfo: %w", err)
}
- // Now run the rest of the nodes.
- //
// TODO(q3k): parallelize this
for i := 1; i < opts.NumNodes; i++ {
log.Printf("Cluster: Starting node %d...", i+1)
@@ -515,7 +599,7 @@
ctxC()
return nil, fmt.Errorf("ApproveNode(%s): %w", id, err)
}
- log.Printf("Cluster: node %d approved, waiting for it to appear as UP...", i)
+ log.Printf("Cluster: node %d approved, waiting for it to appear as UP and with a network address...", i)
for {
nodes, err := getNodes(ctx, mgmt)
if err != nil {
@@ -527,31 +611,37 @@
if !bytes.Equal(n.Pubkey, newNode.Pubkey) {
continue
}
- if n.State == cpb.NodeState_NODE_STATE_UP {
- found = true
+ if n.Status == nil || n.Status.ExternalAddress == "" {
break
}
- time.Sleep(time.Second)
+ if n.State != cpb.NodeState_NODE_STATE_UP {
+ break
+ }
+ found = true
+ cluster.Nodes[identity.NodeID(n.Pubkey)] = &NodeInCluster{
+ ID: identity.NodeID(n.Pubkey),
+ ManagementAddress: n.Status.ExternalAddress,
+ }
+ cluster.NodeIDs = append(cluster.NodeIDs, identity.NodeID(n.Pubkey))
+ break
}
if found {
break
}
+ time.Sleep(time.Second)
}
log.Printf("Cluster: node %d (%s) UP!", i, id)
}
- return &Cluster{
- Debug: apb.NewNodeDebugServiceClient(debugConn),
- Management: mgmt,
- Owner: *cert,
- Ports: portMap,
- nodesDone: done,
+ log.Printf("Cluster: all nodes up:")
+ for _, node := range cluster.Nodes {
+ log.Printf("Cluster: - %s at %s", node.ID, node.ManagementAddress)
+ }
- ctxC: ctxC,
- }, nil
+ return cluster, nil
}
-// Close cancels the running clusters' context and waits for all virtualized
+// Close cancels the running cluster's context and waits for all virtualized
// nodes to stop. It returns an error if stopping the nodes failed, or one of
// the nodes failed to fully start in the first place.
func (c *Cluster) Close() error {
@@ -569,3 +659,30 @@
log.Printf("Cluster: done")
return multierr.Combine(errors...)
}
+
+// DialNode is a grpc.WithContextDialer compatible dialer which dials nodes by
+// their ID. This is performed by connecting to the cluster nanoswitch via its
+// SOCKS proxy, and using the cluster node list for name resolution.
+//
+// For example:
+//
+// grpc.Dial("metropolis-deadbeef:1234", grpc.WithContextDialer(c.DialNode))
+//
+func (c *Cluster) DialNode(_ context.Context, addr string) (net.Conn, error) {
+ host, port, err := net.SplitHostPort(addr)
+ if err != nil {
+ return nil, fmt.Errorf("invalid host:port: %w", err)
+ }
+ // Already an IP address?
+ if net.ParseIP(host) != nil {
+ return c.socksDialer.Dial("tcp", addr)
+ }
+
+ // Otherwise, expect a node name.
+ node, ok := c.Nodes[host]
+ if !ok {
+ return nil, fmt.Errorf("unknown node %q", host)
+ }
+ addr = net.JoinHostPort(node.ManagementAddress, port)
+ return c.socksDialer.Dial("tcp", addr)
+}
diff --git a/metropolis/test/launch/launch.go b/metropolis/test/launch/launch.go
index 219e787..8f72434 100644
--- a/metropolis/test/launch/launch.go
+++ b/metropolis/test/launch/launch.go
@@ -31,9 +31,7 @@
"syscall"
"golang.org/x/sys/unix"
- "google.golang.org/grpc"
- "source.monogon.dev/metropolis/node"
"source.monogon.dev/metropolis/pkg/freeport"
)
@@ -59,7 +57,7 @@
// PortMap represents where VM ports are mapped to on the host. It maps from the VM
// port number to the host port number.
-type PortMap map[node.Port]uint16
+type PortMap map[uint16]uint16
// ToQemuForwards generates QEMU hostfwd values (https://qemu.weilnetz.de/doc/qemu-
// doc.html#:~:text=hostfwd=) for all mapped ports.
@@ -71,24 +69,10 @@
return hostfwdOptions
}
-// DialGRPC creates a gRPC client for a VM port that's forwarded/mapped to the
-// host. The given port is automatically resolved to the host-mapped port.
-func (p PortMap) DialGRPC(port node.Port, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
- mappedPort, ok := p[port]
- if !ok {
- return nil, fmt.Errorf("cannot dial port: port %d is not mapped/forwarded", port)
- }
- grpcClient, err := grpc.Dial(fmt.Sprintf("localhost:%d", mappedPort), opts...)
- if err != nil {
- return nil, fmt.Errorf("failed to dial port %d: %w", port, err)
- }
- return grpcClient, nil
-}
-
// IdentityPortMap returns a port map where each given port is mapped onto itself
// on the host. This is mainly useful for development against Metropolis. The dbg
// command requires this mapping.
-func IdentityPortMap(ports []node.Port) PortMap {
+func IdentityPortMap(ports []uint16) PortMap {
portMap := make(PortMap)
for _, port := range ports {
portMap[port] = uint16(port)
@@ -101,7 +85,7 @@
// multiple instances of Metropolis nodes might be running. Please call this
// function for each Launch command separately and as close to it as possible since
// it cannot guarantee that the ports will remain free.
-func ConflictFreePortMap(ports []node.Port) (PortMap, error) {
+func ConflictFreePortMap(ports []uint16) (PortMap, error) {
portMap := make(PortMap)
for _, port := range ports {
mappedPort, listenCloser, err := freeport.AllocateTCPPort()
diff --git a/metropolis/test/nanoswitch/BUILD b/metropolis/test/nanoswitch/BUILD
index 74f2ddf..a3163f5 100644
--- a/metropolis/test/nanoswitch/BUILD
+++ b/metropolis/test/nanoswitch/BUILD
@@ -3,7 +3,10 @@
go_library(
name = "nanoswitch_lib",
- srcs = ["nanoswitch.go"],
+ srcs = [
+ "nanoswitch.go",
+ "socks.go",
+ ],
importpath = "source.monogon.dev/metropolis/test/nanoswitch",
visibility = ["//visibility:private"],
deps = [
@@ -11,6 +14,7 @@
"//metropolis/node/core/network/dhcp4c",
"//metropolis/node/core/network/dhcp4c/callback",
"//metropolis/pkg/logtree",
+ "//metropolis/pkg/socksproxy",
"//metropolis/pkg/supervisor",
"//metropolis/test/launch",
"@com_github_google_nftables//:nftables",
diff --git a/metropolis/test/nanoswitch/nanoswitch.go b/metropolis/test/nanoswitch/nanoswitch.go
index de04a42..5cc2077 100644
--- a/metropolis/test/nanoswitch/nanoswitch.go
+++ b/metropolis/test/nanoswitch/nanoswitch.go
@@ -20,7 +20,10 @@
// served by a built-in DHCP server. Traffic from that network to the
// SLIRP/external network is SNATed as the host-side SLIRP ignores routed
// packets.
-// It also has built-in userspace proxying support for debugging.
+//
+// It also has built-in userspace proxying support for accessing the first
+// node's services, as well as a SOCKS proxy to access all nodes within the
+// network.
package main
import (
@@ -312,6 +315,7 @@
supervisor.Run(ctx, "proxy-dbg1", userspaceProxy(net.IPv4(10, 1, 0, 2), common.DebugServicePort))
supervisor.Run(ctx, "proxy-k8s-api1", userspaceProxy(net.IPv4(10, 1, 0, 2), common.KubernetesAPIPort))
supervisor.Run(ctx, "proxy-k8s-api-wrapped1", userspaceProxy(net.IPv4(10, 1, 0, 2), common.KubernetesAPIWrappedPort))
+ supervisor.Run(ctx, "socks", runSOCKSProxy)
supervisor.Signal(ctx, supervisor.SignalHealthy)
supervisor.Signal(ctx, supervisor.SignalDone)
return nil
diff --git a/metropolis/test/nanoswitch/socks.go b/metropolis/test/nanoswitch/socks.go
new file mode 100644
index 0000000..7b0278a
--- /dev/null
+++ b/metropolis/test/nanoswitch/socks.go
@@ -0,0 +1,70 @@
+package main
+
+import (
+ "context"
+ "fmt"
+ "net"
+
+ "source.monogon.dev/metropolis/pkg/socksproxy"
+ "source.monogon.dev/metropolis/pkg/supervisor"
+)
+
+// ONCHANGE(//metropolis/test/launch/cluster:cluster.go): port must be kept in sync
+const SOCKSPort uint16 = 1080
+
+// socksHandler implements a socksproxy.Handler which permits and logs
+// connections to the nanoswitch network.
+type socksHandler struct{}
+
+func (s *socksHandler) Connect(ctx context.Context, req *socksproxy.ConnectRequest) *socksproxy.ConnectResponse {
+ logger := supervisor.Logger(ctx)
+ target := net.JoinHostPort(req.Address.String(), fmt.Sprintf("%d", req.Port))
+
+ if len(req.Address) != 4 {
+ logger.Warningf("Connect %s: wrong address type", target)
+ return &socksproxy.ConnectResponse{
+ Error: socksproxy.ReplyAddressTypeNotSupported,
+ }
+ }
+
+ addr := req.Address
+ switchCIDR := net.IPNet{
+ IP: switchIP.Mask(switchSubnetMask),
+ Mask: switchSubnetMask,
+ }
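+ // Reject targets outside the nanoswitch subnet, as well as the subnet's
+ // network address itself.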
+ if !switchCIDR.Contains(addr) || switchCIDR.IP.Equal(addr) {
+ logger.Warningf("Connect %s: not in switch network", target)
+ return &socksproxy.ConnectResponse{
+ Error: socksproxy.ReplyNetworkUnreachable,
+ }
+ }
+
+ con, err := net.Dial("tcp", target)
+ if err != nil {
+ logger.Warningf("Connect %s: dial failed: %v", target, err)
+ return &socksproxy.ConnectResponse{
+ Error: socksproxy.ReplyHostUnreachable,
+ }
+ }
+ res, err := socksproxy.ConnectResponseFromConn(con)
+ if err != nil {
+ logger.Warningf("Connect %s: could not make SOCKS response: %v", target, err)
+ return &socksproxy.ConnectResponse{
+ Error: socksproxy.ReplyGeneralFailure,
+ }
+ }
+ logger.Infof("Connect %s: established", target)
+ return res
+}
+
+// runSOCKSProxy starts a SOCKS proxy to the nanoswitch network at SOCKSPort.
+func runSOCKSProxy(ctx context.Context) error {
+ lis, err := net.Listen("tcp", fmt.Sprintf(":%d", SOCKSPort))
+ if err != nil {
+ return fmt.Errorf("failed to listen on :%d : %v", SOCKSPort, err)
+ }
+
+ h := &socksHandler{}
+ supervisor.Signal(ctx, supervisor.SignalHealthy)
+ return socksproxy.Serve(ctx, h, lis)
+}