m/n/c/curator: inject Spans into RPCs, log events

This uses the new Span/Trace API in the RPC library to inject spans into
all Curator RPC handlers, and converts a number of 'TODO: log' comments
into rpc.Trace(ctx).Printf calls.

Change-Id: Ie480fa7020246b60befa024e000f9e452daabe0c
Reviewed-on: https://review.monogon.dev/c/monogon/+/542
Reviewed-by: Leopold Schabel <leo@nexantic.com>
diff --git a/metropolis/node/core/curator/impl_leader.go b/metropolis/node/core/curator/impl_leader.go
index 08772af..a1b9941 100644
--- a/metropolis/node/core/curator/impl_leader.go
+++ b/metropolis/node/core/curator/impl_leader.go
@@ -4,6 +4,7 @@
 	"context"
 	"errors"
 	"fmt"
+	"strings"
 	"sync"
 
 	"go.etcd.io/etcd/clientv3"
@@ -12,6 +13,7 @@
 
 	"source.monogon.dev/metropolis/node/core/consensus/client"
 	"source.monogon.dev/metropolis/node/core/identity"
+	"source.monogon.dev/metropolis/node/core/rpc"
 )
 
 // leadership represents the curator leader's ability to perform actions as a
@@ -50,15 +52,32 @@
 // txnAsLeader performs an etcd transaction guarded by continued leadership.
 // lostLeadership will be returned as an error in case the leadership is lost.
 func (l *leadership) txnAsLeader(ctx context.Context, ops ...clientv3.Op) (*clientv3.TxnResponse, error) {
+	var opsStr []string
+	for _, op := range ops {
+		opstr := "unk"
+		switch {
+		case op.IsGet():
+			opstr = "get"
+		case op.IsDelete():
+			opstr = "delete"
+		case op.IsPut():
+			opstr = "put"
+		}
+		opsStr = append(opsStr, fmt.Sprintf("%s: %s", opstr, op.KeyBytes()))
+	}
+	rpc.Trace(ctx).Printf("txnAsLeader(%s)...", strings.Join(opsStr, ","))
 	resp, err := l.etcd.Txn(ctx).If(
 		clientv3.Compare(clientv3.CreateRevision(l.lockKey), "=", l.lockRev),
 	).Then(ops...).Commit()
 	if err != nil {
+		rpc.Trace(ctx).Printf("txnAsLeader(...): failed: %v", err)
 		return nil, fmt.Errorf("when running leader transaction: %w", err)
 	}
 	if !resp.Succeeded {
+		rpc.Trace(ctx).Printf("txnAsLeader(...): rejected (lost leadership)")
 		return nil, lostLeadership
 	}
+	rpc.Trace(ctx).Printf("txnAsLeader(...): ok")
 	return resp, nil
 }
 
diff --git a/metropolis/node/core/curator/impl_leader_aaa.go b/metropolis/node/core/curator/impl_leader_aaa.go
index a66c249..c9eb08f 100644
--- a/metropolis/node/core/curator/impl_leader_aaa.go
+++ b/metropolis/node/core/curator/impl_leader_aaa.go
@@ -33,7 +33,6 @@
 	res, err := a.etcd.Get(ctx, initialOwnerEtcdPath)
 	if err != nil {
 		if !errors.Is(err, ctx.Err()) {
-			// TODO(issues/85): log
 			return nil, status.Error(codes.Unavailable, "could not retrieve initial owner status in etcd")
 		}
 		return nil, err
diff --git a/metropolis/node/core/curator/impl_leader_curator.go b/metropolis/node/core/curator/impl_leader_curator.go
index 6241827..348ec2c 100644
--- a/metropolis/node/core/curator/impl_leader_curator.go
+++ b/metropolis/node/core/curator/impl_leader_curator.go
@@ -72,7 +72,7 @@
 			if rpcErr, ok := rpcError(err); ok {
 				return rpcErr
 			}
-			// TODO(issues/85): log err
+			rpc.Trace(ctx).Printf("etcd watch failed: %v", err)
 			return status.Error(codes.Unavailable, "internal error")
 		}
 
@@ -105,7 +105,7 @@
 			break
 		}
 		if err != nil {
-			// TODO(issues/85): log err
+			rpc.Trace(ctx).Printf("etcd watch failed (initial fetch): %v", err)
 			return status.Error(codes.Unavailable, "internal error during initial fetch")
 		}
 		nodeKV := v.(nodeAtID)
@@ -143,7 +143,7 @@
 	for {
 		v, err := w.Get(ctx)
 		if err != nil {
-			// TODO(issues/85): log err
+			rpc.Trace(ctx).Printf("etcd watch failed (update): %v", err)
 			return status.Errorf(codes.Unavailable, "internal error during update")
 		}
 		we := &ipb.WatchEvent{}
@@ -261,7 +261,7 @@
 	// valid.
 	wantTicket, err := l.ensureRegisterTicket(ctx)
 	if err != nil {
-		// TODO(issues/85): log err
+		rpc.Trace(ctx).Printf("could not ensure register ticket: %v", err)
 		return nil, status.Error(codes.Unavailable, "could not retrieve register ticket")
 	}
 	gotTicket := req.RegisterTicket
@@ -291,7 +291,7 @@
 		// cluster, it should have access to the status of the node without danger of
 		// leaking data about other nodes.
 		//
-		// TODO(issues/85): log this
+		rpc.Trace(ctx).Printf("node %s already exists in cluster, failing", id)
 		return nil, status.Errorf(codes.FailedPrecondition, "node already exists in cluster, state %s", node.state.String())
 	}
 	if err != errNodeNotFound {
diff --git a/metropolis/node/core/curator/impl_leader_management.go b/metropolis/node/core/curator/impl_leader_management.go
index 710eb93..c24a53d 100644
--- a/metropolis/node/core/curator/impl_leader_management.go
+++ b/metropolis/node/core/curator/impl_leader_management.go
@@ -10,6 +10,7 @@
 	"google.golang.org/grpc/status"
 
 	"source.monogon.dev/metropolis/node/core/identity"
+	"source.monogon.dev/metropolis/node/core/rpc"
 	apb "source.monogon.dev/metropolis/proto/api"
 	cpb "source.monogon.dev/metropolis/proto/common"
 )
@@ -72,7 +73,7 @@
 	for _, kv := range kvs {
 		node, err := nodeUnmarshal(kv.Value)
 		if err != nil {
-			// TODO(issues/85): log this
+			rpc.Trace(ctx).Printf("Unmarshalling node %q failed: %v", kv.Value, err)
 			continue
 		}
 		if node.state != cpb.NodeState_NODE_STATE_UP {
@@ -130,7 +131,7 @@
 	for _, kv := range kvs {
 		node, err := nodeUnmarshal(kv.Value)
 		if err != nil {
-			// TODO(issues/85): log this
+			rpc.Trace(ctx).Printf("Unmarshalling node %q failed: %v", kv.Value, err)
 			continue
 		}
 
diff --git a/metropolis/node/core/curator/listener.go b/metropolis/node/core/curator/listener.go
index 3298d12..b0b8d5d 100644
--- a/metropolis/node/core/curator/listener.go
+++ b/metropolis/node/core/curator/listener.go
@@ -218,7 +218,7 @@
 		}
 		defer lisLocal.Close()
 
-		runnable := supervisor.GRPCServer(ls.SetupLocalGRPC(nil, l), lisLocal, true)
+		runnable := supervisor.GRPCServer(ls.SetupLocalGRPC(supervisor.MustSubLogger(ctx, "rpc"), l), lisLocal, true)
 		return runnable(ctx)
 	})
 	if err != nil {
@@ -232,7 +232,7 @@
 		}
 		defer lisExternal.Close()
 
-		runnable := supervisor.GRPCServer(es.SetupExternalGRPC(nil, l), lisExternal, true)
+		runnable := supervisor.GRPCServer(es.SetupExternalGRPC(supervisor.MustSubLogger(ctx, "rpc"), l), lisExternal, true)
 		return runnable(ctx)
 	})
 	if err != nil {
diff --git a/metropolis/node/core/curator/state_node.go b/metropolis/node/core/curator/state_node.go
index c391e73..e4012cd 100644
--- a/metropolis/node/core/curator/state_node.go
+++ b/metropolis/node/core/curator/state_node.go
@@ -27,6 +27,7 @@
 
 	ppb "source.monogon.dev/metropolis/node/core/curator/proto/private"
 	"source.monogon.dev/metropolis/node/core/identity"
+	"source.monogon.dev/metropolis/node/core/rpc"
 	cpb "source.monogon.dev/metropolis/proto/common"
 )
 
@@ -165,6 +166,7 @@
 // untrusted callers. If the given node is not found, errNodeNotFound will be
 // returned.
 func nodeLoad(ctx context.Context, l *leadership, id string) (*Node, error) {
+	rpc.Trace(ctx).Printf("loadNode(%s)...", id)
 	key, err := nodeEtcdPrefix.Key(id)
 	if err != nil {
 		// TODO(issues/85): log err
@@ -179,6 +181,7 @@
 		return nil, status.Errorf(codes.Unavailable, "could not retrieve node %s: %v", id, err)
 	}
 	kvs := res.Responses[0].GetResponseRange().Kvs
+	rpc.Trace(ctx).Printf("loadNode(%s): %d KVs", id, len(kvs))
 	if len(kvs) != 1 {
 		return nil, errNodeNotFound
 	}
@@ -187,6 +190,7 @@
 		// TODO(issues/85): log this
 		return nil, status.Errorf(codes.Unavailable, "could not unmarshal node")
 	}
+	rpc.Trace(ctx).Printf("loadNode(%s): unmarshal ok", id)
 	return node, nil
 }
 
@@ -194,6 +198,7 @@
 // All returned errors are gRPC statuses that safe to return to untrusted callers.
 func nodeSave(ctx context.Context, l *leadership, n *Node) error {
 	id := n.ID()
+	rpc.Trace(ctx).Printf("nodeSave(%s)...", id)
 	key, err := nodeEtcdPrefix.Key(id)
 	if err != nil {
 		// TODO(issues/85): log err
@@ -212,5 +217,6 @@
 		// TODO(issues/85): log this
 		return status.Error(codes.Unavailable, "could not save updated node")
 	}
+	rpc.Trace(ctx).Printf("nodeSave(%s): write ok", id)
 	return nil
 }
diff --git a/metropolis/node/core/curator/state_registerticket.go b/metropolis/node/core/curator/state_registerticket.go
index e516661..4b674aa 100644
--- a/metropolis/node/core/curator/state_registerticket.go
+++ b/metropolis/node/core/curator/state_registerticket.go
@@ -10,6 +10,7 @@
 	"google.golang.org/protobuf/proto"
 
 	ppb "source.monogon.dev/metropolis/node/core/curator/proto/private"
+	"source.monogon.dev/metropolis/node/core/rpc"
 )
 
 // ensureRegisterTicket returns the cluster's current RegisterTicket, creating
@@ -18,6 +19,8 @@
 	l.muRegisterTicket.Lock()
 	defer l.muRegisterTicket.Unlock()
 
+	rpc.Trace(ctx).Printf("ensureRegisterTicket()...")
+
 	// Retrieve existing ticket, if any.
 	res, err := l.txnAsLeader(ctx, clientv3.OpGet(registerTicketEtcdPath))
 	if err != nil {
@@ -26,6 +29,7 @@
 	kvs := res.Responses[0].GetResponseRange().Kvs
 	if len(kvs) > 0 {
 		// Ticket already generated, return.
+		rpc.Trace(ctx).Printf("ensureRegisterTicket(): ticket already exists")
 		return kvs[0].Value, nil
 	}
 
@@ -48,5 +52,7 @@
 		return nil, status.Errorf(codes.Unavailable, "could not save new ticket: %v", err)
 	}
 
+	rpc.Trace(ctx).Printf("ensureRegisterTicket(): generated and saved new ticket")
+
 	return ticketBytes, nil
 }
diff --git a/metropolis/pkg/supervisor/supervisor.go b/metropolis/pkg/supervisor/supervisor.go
index ef7b909..9865279 100644
--- a/metropolis/pkg/supervisor/supervisor.go
+++ b/metropolis/pkg/supervisor/supervisor.go
@@ -187,3 +187,14 @@
 	dn := fmt.Sprintf("%s.%s", node.dn(), name)
 	return node.sup.logtree.LeveledFor(logtree.DN(dn))
 }
+
+// MustSubLogger is a wrapper around SubLogger which panics on error. Errors
+// should only happen due to invalid names, so as long as the given name is a
+// valid compile-time constant, this function is safe to use.
+func MustSubLogger(ctx context.Context, name string) logtree.LeveledLogger {
+	l, err := SubLogger(ctx, name)
+	if err != nil {
+		panic(err)
+	}
+	return l
+}