metropolis: stub out log service
The server-side and client-side implementations are not quite ready yet,
but we're committing this early so that we can start implementing more
node-local management RPCs.
Change-Id: I81b615b0f77dc7750cc738d60ee4923c3182721b
Reviewed-on: https://review.monogon.dev/c/monogon/+/1429
Tested-by: Jenkins CI
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
diff --git a/metropolis/cli/metroctl/BUILD.bazel b/metropolis/cli/metroctl/BUILD.bazel
index b2000fb..73a0a6e 100644
--- a/metropolis/cli/metroctl/BUILD.bazel
+++ b/metropolis/cli/metroctl/BUILD.bazel
@@ -7,6 +7,7 @@
"cmd_k8scredplugin.go",
"cmd_node.go",
"cmd_node_approve.go",
+ "cmd_node_logs.go",
"cmd_node_set.go",
"cmd_takeownership.go",
"main.go",
@@ -29,6 +30,7 @@
"//metropolis/node",
"//metropolis/node/core/identity",
"//metropolis/node/core/rpc",
+ "//metropolis/pkg/logtree",
"//metropolis/proto/api",
"@com_github_adrg_xdg//:xdg",
"@com_github_spf13_cobra//:cobra",
diff --git a/metropolis/cli/metroctl/cmd_node.go b/metropolis/cli/metroctl/cmd_node.go
index 97198d8..0f841ef 100644
--- a/metropolis/cli/metroctl/cmd_node.go
+++ b/metropolis/cli/metroctl/cmd_node.go
@@ -60,6 +60,7 @@
func init() {
nodeCmd.AddCommand(nodeDescribeCmd)
nodeCmd.AddCommand(nodeListCmd)
+ nodeCmd.AddCommand(nodeLogsCmd)
rootCmd.AddCommand(nodeCmd)
}
diff --git a/metropolis/cli/metroctl/cmd_node_logs.go b/metropolis/cli/metroctl/cmd_node_logs.go
new file mode 100644
index 0000000..21d5637
--- /dev/null
+++ b/metropolis/cli/metroctl/cmd_node_logs.go
@@ -0,0 +1,94 @@
+package main
+
+import (
+ "crypto/x509"
+ "errors"
+ "fmt"
+ "io"
+
+ "github.com/spf13/cobra"
+
+ "source.monogon.dev/metropolis/cli/metroctl/core"
+ "source.monogon.dev/metropolis/pkg/logtree"
+ "source.monogon.dev/metropolis/proto/api"
+)
+
+// nodeLogsCmd implements the "logs" node subcommand. It looks up the node by
+// ID through the cluster management API, dials that node's management
+// endpoint directly and prints every backlog log entry it receives to stdout.
+var nodeLogsCmd = &cobra.Command{
+	Short: "Get/stream logs from node",
+	Use:   "logs [node-id]",
+	Args:  cobra.MinimumNArgs(1),
+	RunE: func(cmd *cobra.Command, args []string) error {
+		ctx := cmd.Context()
+
+		// First connect to the main management service and figure out the node's IP
+		// address.
+		cc := dialAuthenticated(ctx)
+		// Release the cluster connection once the command finishes.
+		defer cc.Close()
+		mgmt := api.NewManagementClient(cc)
+		nodes, err := core.GetNodes(ctx, mgmt, fmt.Sprintf("node.id == %q", args[0]))
+		if err != nil {
+			return fmt.Errorf("when getting node info: %w", err)
+		}
+
+		// The filter expression must resolve to exactly one node.
+		if len(nodes) == 0 {
+			return errors.New("no such node")
+		}
+		if len(nodes) > 1 {
+			return errors.New("expression matched more than one node")
+		}
+		n := nodes[0]
+		if n.Status == nil || n.Status.ExternalAddress == "" {
+			return errors.New("node has no external address")
+		}
+
+		// The cluster CA certificate is fetched on every invocation and passed to
+		// the node dialer below.
+		// TODO(q3k): save CA certificate on takeover
+		info, err := mgmt.GetClusterInfo(ctx, &api.GetClusterInfoRequest{})
+		if err != nil {
+			return fmt.Errorf("couldn't get cluster info: %w", err)
+		}
+		cacert, err := x509.ParseCertificate(info.CaCertificate)
+		if err != nil {
+			return fmt.Errorf("remote CA certificate invalid: %w", err)
+		}
+
+		fmt.Printf("Getting logs from %s (%s)...\n", n.Id, n.Status.ExternalAddress)
+		// Dial the actual node at its management port.
+		cl := dialAuthenticatedNode(ctx, n.Id, n.Status.ExternalAddress, cacert)
+		// Release the node connection once the command finishes.
+		defer cl.Close()
+		nmgmt := api.NewNodeManagementClient(cl)
+
+		// Request the full backlog with streaming disabled.
+		// NOTE(review): an empty DN combined with the WithChildren filter
+		// presumably selects the entire log tree - confirm against the API.
+		srv, err := nmgmt.Logs(ctx, &api.GetLogsRequest{
+			Dn:          "",
+			BacklogMode: api.GetLogsRequest_BACKLOG_ALL,
+			StreamMode:  api.GetLogsRequest_STREAM_DISABLE,
+			Filters: []*api.LogFilter{
+				{
+					Filter: &api.LogFilter_WithChildren_{
+						WithChildren: &api.LogFilter_WithChildren{},
+					},
+				},
+			},
+		})
+		if err != nil {
+			return fmt.Errorf("failed to get logs: %w", err)
+		}
+		for {
+			res, err := srv.Recv()
+			// io.EOF marks the regular end of the server stream.
+			if errors.Is(err, io.EOF) {
+				fmt.Println("Done.")
+				break
+			}
+			if err != nil {
+				return fmt.Errorf("log stream failed: %w", err)
+			}
+			// Only backlog entries are printed; streaming was disabled in the
+			// request above.
+			for _, pb := range res.BacklogEntries {
+				entry, err := logtree.LogEntryFromProto(pb)
+				if err != nil {
+					// A malformed entry is reported and skipped rather than
+					// aborting the whole dump.
+					fmt.Printf("invalid entry: %v\n", err)
+					continue
+				}
+				fmt.Println(entry.String())
+			}
+		}
+
+		return nil
+	},
+}
diff --git a/metropolis/cli/metroctl/core/rpc.go b/metropolis/cli/metroctl/core/rpc.go
index 00f1f01..4aeda75 100644
--- a/metropolis/cli/metroctl/core/rpc.go
+++ b/metropolis/cli/metroctl/core/rpc.go
@@ -81,6 +81,33 @@
return c, nil
}
+// DialNode dials the node management endpoint of a single node at nodeAddr.
+//
+// The connection presents the owner certificate ocert together with the
+// owner private key opkey as the client TLS certificate. The credentials are
+// built with rpc.WantRemoteCluster(ca) and rpc.WantRemoteNode(nodeId).
+// If proxyAddr is non-empty, the connection is made through a SOCKS5 proxy
+// at that address.
+//
+// ctx is accepted for API symmetry but is not currently used when dialing.
+//
+// It returns an error if opkey or ocert is missing, if the SOCKS dialer
+// cannot be built, or if grpc.Dial fails.
+func DialNode(ctx context.Context, opkey ed25519.PrivateKey, ocert, ca *x509.Certificate, proxyAddr, nodeId, nodeAddr string) (*grpc.ClientConn, error) {
+	var dialOpts []grpc.DialOption
+
+	if opkey == nil {
+		return nil, fmt.Errorf("an owner's private key must be provided")
+	}
+	// ocert.Raw is dereferenced below, so reject a missing certificate here
+	// instead of panicking.
+	if ocert == nil {
+		return nil, fmt.Errorf("an owner's certificate must be provided")
+	}
+	if proxyAddr != "" {
+		socksDialer, err := proxy.SOCKS5("tcp", proxyAddr, nil, proxy.Direct)
+		if err != nil {
+			return nil, fmt.Errorf("failed to build a SOCKS dialer: %w", err)
+		}
+		// Route every gRPC connection attempt through the SOCKS5 proxy.
+		grpcd := func(_ context.Context, addr string) (net.Conn, error) {
+			return socksDialer.Dial("tcp", addr)
+		}
+		dialOpts = append(dialOpts, grpc.WithContextDialer(grpcd))
+	}
+	tlsc := tls.Certificate{
+		Certificate: [][]byte{ocert.Raw},
+		PrivateKey:  opkey,
+	}
+	creds := rpc.NewAuthenticatedCredentials(tlsc, rpc.WantRemoteCluster(ca), rpc.WantRemoteNode(nodeId))
+	dialOpts = append(dialOpts, grpc.WithTransportCredentials(creds))
+
+	endpoint := net.JoinHostPort(nodeAddr, node.NodeManagement.PortString())
+	return grpc.Dial(endpoint, dialOpts...)
+}
+
// GetNodes retrieves node records, filtered by the supplied node filter
// expression fexp.
func GetNodes(ctx context.Context, mgmt api.ManagementClient, fexp string) ([]*api.Node, error) {
diff --git a/metropolis/cli/metroctl/rpc.go b/metropolis/cli/metroctl/rpc.go
index 8e8ee48..6d7beab 100644
--- a/metropolis/cli/metroctl/rpc.go
+++ b/metropolis/cli/metroctl/rpc.go
@@ -2,6 +2,7 @@
import (
"context"
+ "crypto/x509"
"log"
"google.golang.org/grpc"
@@ -25,3 +26,17 @@
}
return cc
}
+
+// dialAuthenticatedNode loads the owner credentials from the configured
+// config path and dials the node with the given id at the given address via
+// core.DialNode, passing cacert as the cluster CA certificate. Any failure
+// terminates the process via log.Fatalf, so the returned connection is always
+// usable.
+func dialAuthenticatedNode(ctx context.Context, id, address string, cacert *x509.Certificate) *grpc.ClientConn {
+	// Collect credentials and try dialing the node.
+	ocert, opkey, err := core.GetOwnerCredentials(flags.configPath)
+	if err == core.NoCredentialsError {
+		log.Fatalf("You have to take ownership of the cluster first: %v", err)
+	}
+	// Any other credential loading failure must not be silently dropped,
+	// otherwise it would resurface as a misleading dial error below.
+	if err != nil {
+		log.Fatalf("Could not load owner credentials: %v", err)
+	}
+	cc, err := core.DialNode(ctx, opkey, ocert, cacert, flags.proxyAddr, id, address)
+	if err != nil {
+		log.Fatalf("While dialing node: %v", err)
+	}
+	return cc
+}