blob: 8d7565fccfd1ed63106879f1a20f8b8b69006808 [file] [log] [blame]
Mateusz Zalega18a67b02022-08-02 13:37:50 +02001package core
2
3import (
4 "context"
5 "crypto/ed25519"
6 "crypto/tls"
7 "crypto/x509"
8 "fmt"
9 "io"
10 "net"
11
12 "golang.org/x/net/proxy"
13 "google.golang.org/grpc"
14
15 "source.monogon.dev/metropolis/node"
16 "source.monogon.dev/metropolis/node/core/rpc"
17 "source.monogon.dev/metropolis/node/core/rpc/resolver"
18 "source.monogon.dev/metropolis/proto/api"
19)
20
21type ResolverLogger func(format string, args ...interface{})
22
23// DialCluster dials the cluster control address. The owner certificate, and
24// proxy address parameters are optional and can be left nil, and empty,
25// respectively. At least one cluster endpoint must be provided. A missing
26// owner certificate will result in a connection that is authenticated with
27// ephemeral credentials, restricting the available API surface. proxyAddr
28// must point at a SOCKS5 endpoint.
29func DialCluster(ctx context.Context, opkey ed25519.PrivateKey, ocert *x509.Certificate, proxyAddr string, clusterEndpoints []string, rlf ResolverLogger) (*grpc.ClientConn, error) {
30 var dialOpts []grpc.DialOption
31
32 if opkey == nil {
33 return nil, fmt.Errorf("an owner's private key must be provided")
34 }
35 if len(clusterEndpoints) == 0 {
36 return nil, fmt.Errorf("at least one cluster endpoint must be provided")
37 }
38
39 if proxyAddr != "" {
40 socksDialer, err := proxy.SOCKS5("tcp", proxyAddr, nil, proxy.Direct)
41 if err != nil {
42 return nil, fmt.Errorf("failed to build a SOCKS dialer: %v", err)
43 }
44 grpcd := func(_ context.Context, addr string) (net.Conn, error) {
45 return socksDialer.Dial("tcp", addr)
46 }
47 dialOpts = append(dialOpts, grpc.WithContextDialer(grpcd))
48 }
49
50 if ocert == nil {
51 creds, err := rpc.NewEphemeralCredentials(opkey, nil)
52 if err != nil {
53 return nil, fmt.Errorf("while building ephemeral credentials: %v", err)
54 }
55 dialOpts = append(dialOpts, grpc.WithTransportCredentials(creds))
56 } else {
57 tlsc := tls.Certificate{
58 Certificate: [][]byte{ocert.Raw},
59 PrivateKey: opkey,
60 }
61 creds := rpc.NewAuthenticatedCredentials(tlsc, nil)
62 dialOpts = append(dialOpts, grpc.WithTransportCredentials(creds))
63 }
64
65 var resolverOpts []resolver.ResolverOption
66 if rlf != nil {
67 resolverOpts = append(resolverOpts, resolver.WithLogger(rlf))
68 }
69 r := resolver.New(ctx, resolverOpts...)
70
71 for _, eps := range clusterEndpoints {
72 ep := resolver.NodeByHostPort(eps, uint16(node.CuratorServicePort))
73 r.AddEndpoint(ep)
74 }
75 dialOpts = append(dialOpts, grpc.WithResolvers(r))
76
77 c, err := grpc.Dial(resolver.MetropolisControlAddress, dialOpts...)
78 if err != nil {
79 return nil, fmt.Errorf("could not dial: %v", err)
80 }
81 return c, nil
82}
83
84// GetNodes retrieves node records, filtered by the supplied node filter
85// expression fexp.
86func GetNodes(ctx context.Context, mgmt api.ManagementClient, fexp string) ([]*api.Node, error) {
87 resN, err := mgmt.GetNodes(ctx, &api.GetNodesRequest{
88 Filter: fexp,
89 })
90 if err != nil {
91 return nil, err
92 }
93
94 var nodes []*api.Node
95 for {
96 node, err := resN.Recv()
97 if err == io.EOF {
98 break
99 }
100 if err != nil {
101 return nil, err
102 }
103 nodes = append(nodes, node)
104 }
105 return nodes, nil
106}