m/n/c/curator: inject Spans into RPCs, log events
This uses the new Span/Trace API in the RPC library to inject spans into all
Curator RPC handlers, and converts a number of "TODO: add logging" comments
into rpc.Trace(ctx).Printf calls.
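
For illustration, the pattern this introduces on a handler path looks
roughly like the sketch below. The service, method and message type
names in it are made up for this example (only rpc.Trace, status and
codes come from the change itself), and it assumes the grpc status/codes
and rpc imports already used by the curator package:

    // ExampleRPC is a hypothetical handler. The gRPC servers wired up in
    // listener.go (SetupLocalGRPC / SetupExternalGRPC, now given an "rpc"
    // sublogger) inject a Span into ctx; rpc.Trace(ctx) retrieves it so
    // events can be logged against the in-flight RPC.
    func (s *exampleService) ExampleRPC(ctx context.Context, req *epb.ExampleRequest) (*epb.ExampleResponse, error) {
        rpc.Trace(ctx).Printf("ExampleRPC()...")
        res, err := s.doWork(ctx, req)
        if err != nil {
            rpc.Trace(ctx).Printf("ExampleRPC(): doWork failed: %v", err)
            return nil, status.Error(codes.Unavailable, "internal error")
        }
        rpc.Trace(ctx).Printf("ExampleRPC(): ok")
        return res, nil
    }
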
Change-Id: Ie480fa7020246b60befa024e000f9e452daabe0c
Reviewed-on: https://review.monogon.dev/c/monogon/+/542
Reviewed-by: Leopold Schabel <leo@nexantic.com>
diff --git a/metropolis/node/core/curator/impl_leader.go b/metropolis/node/core/curator/impl_leader.go
index 08772af..a1b9941 100644
--- a/metropolis/node/core/curator/impl_leader.go
+++ b/metropolis/node/core/curator/impl_leader.go
@@ -4,6 +4,7 @@
"context"
"errors"
"fmt"
+ "strings"
"sync"
"go.etcd.io/etcd/clientv3"
@@ -12,6 +13,7 @@
"source.monogon.dev/metropolis/node/core/consensus/client"
"source.monogon.dev/metropolis/node/core/identity"
+ "source.monogon.dev/metropolis/node/core/rpc"
)
// leadership represents the curator leader's ability to perform actions as a
@@ -50,15 +52,32 @@
// txnAsLeader performs an etcd transaction guarded by continued leadership.
// lostLeadership will be returned as an error in case the leadership is lost.
func (l *leadership) txnAsLeader(ctx context.Context, ops ...clientv3.Op) (*clientv3.TxnResponse, error) {
+ var opsStr []string
+ for _, op := range ops {
+ opstr := "unk"
+ switch {
+ case op.IsGet():
+ opstr = "get"
+ case op.IsDelete():
+ opstr = "delete"
+ case op.IsPut():
+ opstr = "put"
+ }
+ opsStr = append(opsStr, fmt.Sprintf("%s: %s", opstr, op.KeyBytes()))
+ }
+ rpc.Trace(ctx).Printf("txnAsLeader(%s)...", strings.Join(opsStr, ","))
resp, err := l.etcd.Txn(ctx).If(
clientv3.Compare(clientv3.CreateRevision(l.lockKey), "=", l.lockRev),
).Then(ops...).Commit()
if err != nil {
+ rpc.Trace(ctx).Printf("txnAsLeader(...): failed: %v", err)
return nil, fmt.Errorf("when running leader transaction: %w", err)
}
if !resp.Succeeded {
+ rpc.Trace(ctx).Printf("txnAsLeader(...): rejected (lost leadership)")
return nil, lostLeadership
}
+ rpc.Trace(ctx).Printf("txnAsLeader(...): ok")
return resp, nil
}
diff --git a/metropolis/node/core/curator/impl_leader_aaa.go b/metropolis/node/core/curator/impl_leader_aaa.go
index a66c249..c9eb08f 100644
--- a/metropolis/node/core/curator/impl_leader_aaa.go
+++ b/metropolis/node/core/curator/impl_leader_aaa.go
@@ -33,7 +33,6 @@
res, err := a.etcd.Get(ctx, initialOwnerEtcdPath)
if err != nil {
if !errors.Is(err, ctx.Err()) {
- // TODO(issues/85): log
return nil, status.Error(codes.Unavailable, "could not retrieve initial owner status in etcd")
}
return nil, err
diff --git a/metropolis/node/core/curator/impl_leader_curator.go b/metropolis/node/core/curator/impl_leader_curator.go
index 6241827..348ec2c 100644
--- a/metropolis/node/core/curator/impl_leader_curator.go
+++ b/metropolis/node/core/curator/impl_leader_curator.go
@@ -72,7 +72,7 @@
if rpcErr, ok := rpcError(err); ok {
return rpcErr
}
- // TODO(issues/85): log err
+ rpc.Trace(ctx).Printf("etcd watch failed: %v", err)
return status.Error(codes.Unavailable, "internal error")
}
@@ -105,7 +105,7 @@
break
}
if err != nil {
- // TODO(issues/85): log err
+ rpc.Trace(ctx).Printf("etcd watch failed (initial fetch): %v", err)
return status.Error(codes.Unavailable, "internal error during initial fetch")
}
nodeKV := v.(nodeAtID)
@@ -143,7 +143,7 @@
for {
v, err := w.Get(ctx)
if err != nil {
- // TODO(issues/85): log err
+ rpc.Trace(ctx).Printf("etcd watch failed (update): %v", err)
return status.Errorf(codes.Unavailable, "internal error during update")
}
we := &ipb.WatchEvent{}
@@ -261,7 +261,7 @@
// valid.
wantTicket, err := l.ensureRegisterTicket(ctx)
if err != nil {
- // TODO(issues/85): log err
+ rpc.Trace(ctx).Printf("could not ensure register ticket: %v", err)
return nil, status.Error(codes.Unavailable, "could not retrieve register ticket")
}
gotTicket := req.RegisterTicket
@@ -291,7 +291,7 @@
// cluster, it should have access to the status of the node without danger of
// leaking data about other nodes.
//
- // TODO(issues/85): log this
+ rpc.Trace(ctx).Printf("node %s already exists in cluster, failing", id)
return nil, status.Errorf(codes.FailedPrecondition, "node already exists in cluster, state %s", node.state.String())
}
if err != errNodeNotFound {
diff --git a/metropolis/node/core/curator/impl_leader_management.go b/metropolis/node/core/curator/impl_leader_management.go
index 710eb93..c24a53d 100644
--- a/metropolis/node/core/curator/impl_leader_management.go
+++ b/metropolis/node/core/curator/impl_leader_management.go
@@ -10,6 +10,7 @@
"google.golang.org/grpc/status"
"source.monogon.dev/metropolis/node/core/identity"
+ "source.monogon.dev/metropolis/node/core/rpc"
apb "source.monogon.dev/metropolis/proto/api"
cpb "source.monogon.dev/metropolis/proto/common"
)
@@ -72,7 +73,7 @@
for _, kv := range kvs {
node, err := nodeUnmarshal(kv.Value)
if err != nil {
- // TODO(issues/85): log this
+ rpc.Trace(ctx).Printf("Unmarshalling node %q failed: %v", kv.Value, err)
continue
}
if node.state != cpb.NodeState_NODE_STATE_UP {
@@ -130,7 +131,7 @@
for _, kv := range kvs {
node, err := nodeUnmarshal(kv.Value)
if err != nil {
- // TODO(issues/85): log this
+ rpc.Trace(ctx).Printf("Unmarshalling node %q failed: %v", kv.Value, err)
continue
}
diff --git a/metropolis/node/core/curator/listener.go b/metropolis/node/core/curator/listener.go
index 3298d12..b0b8d5d 100644
--- a/metropolis/node/core/curator/listener.go
+++ b/metropolis/node/core/curator/listener.go
@@ -218,7 +218,7 @@
}
defer lisLocal.Close()
- runnable := supervisor.GRPCServer(ls.SetupLocalGRPC(nil, l), lisLocal, true)
+ runnable := supervisor.GRPCServer(ls.SetupLocalGRPC(supervisor.MustSubLogger(ctx, "rpc"), l), lisLocal, true)
return runnable(ctx)
})
if err != nil {
@@ -232,7 +232,7 @@
}
defer lisExternal.Close()
- runnable := supervisor.GRPCServer(es.SetupExternalGRPC(nil, l), lisExternal, true)
+ runnable := supervisor.GRPCServer(es.SetupExternalGRPC(supervisor.MustSubLogger(ctx, "rpc"), l), lisExternal, true)
return runnable(ctx)
})
if err != nil {
diff --git a/metropolis/node/core/curator/state_node.go b/metropolis/node/core/curator/state_node.go
index c391e73..e4012cd 100644
--- a/metropolis/node/core/curator/state_node.go
+++ b/metropolis/node/core/curator/state_node.go
@@ -27,6 +27,7 @@
ppb "source.monogon.dev/metropolis/node/core/curator/proto/private"
"source.monogon.dev/metropolis/node/core/identity"
+ "source.monogon.dev/metropolis/node/core/rpc"
cpb "source.monogon.dev/metropolis/proto/common"
)
@@ -165,6 +166,7 @@
// untrusted callers. If the given node is not found, errNodeNotFound will be
// returned.
func nodeLoad(ctx context.Context, l *leadership, id string) (*Node, error) {
+ rpc.Trace(ctx).Printf("nodeLoad(%s)...", id)
key, err := nodeEtcdPrefix.Key(id)
if err != nil {
// TODO(issues/85): log err
@@ -179,6 +181,7 @@
return nil, status.Errorf(codes.Unavailable, "could not retrieve node %s: %v", id, err)
}
kvs := res.Responses[0].GetResponseRange().Kvs
+ rpc.Trace(ctx).Printf("nodeLoad(%s): %d KVs", id, len(kvs))
if len(kvs) != 1 {
return nil, errNodeNotFound
}
@@ -187,6 +190,7 @@
// TODO(issues/85): log this
return nil, status.Errorf(codes.Unavailable, "could not unmarshal node")
}
+ rpc.Trace(ctx).Printf("nodeLoad(%s): unmarshal ok", id)
return node, nil
}
@@ -194,6 +198,7 @@
// All returned errors are gRPC statuses that are safe to return to untrusted callers.
func nodeSave(ctx context.Context, l *leadership, n *Node) error {
id := n.ID()
+ rpc.Trace(ctx).Printf("nodeSave(%s)...", id)
key, err := nodeEtcdPrefix.Key(id)
if err != nil {
// TODO(issues/85): log err
@@ -212,5 +217,6 @@
// TODO(issues/85): log this
return status.Error(codes.Unavailable, "could not save updated node")
}
+ rpc.Trace(ctx).Printf("nodeSave(%s): write ok", id)
return nil
}
diff --git a/metropolis/node/core/curator/state_registerticket.go b/metropolis/node/core/curator/state_registerticket.go
index e516661..4b674aa 100644
--- a/metropolis/node/core/curator/state_registerticket.go
+++ b/metropolis/node/core/curator/state_registerticket.go
@@ -10,6 +10,7 @@
"google.golang.org/protobuf/proto"
ppb "source.monogon.dev/metropolis/node/core/curator/proto/private"
+ "source.monogon.dev/metropolis/node/core/rpc"
)
// ensureRegisterTicket returns the cluster's current RegisterTicket, creating
@@ -18,6 +19,8 @@
l.muRegisterTicket.Lock()
defer l.muRegisterTicket.Unlock()
+ rpc.Trace(ctx).Printf("ensureRegisterTicket()...")
+
// Retrieve existing ticket, if any.
res, err := l.txnAsLeader(ctx, clientv3.OpGet(registerTicketEtcdPath))
if err != nil {
@@ -26,6 +29,7 @@
kvs := res.Responses[0].GetResponseRange().Kvs
if len(kvs) > 0 {
// Ticket already generated, return.
+ rpc.Trace(ctx).Printf("ensureRegisterTicket(): ticket already exists")
return kvs[0].Value, nil
}
@@ -48,5 +52,7 @@
return nil, status.Errorf(codes.Unavailable, "could not save new ticket: %v", err)
}
+ rpc.Trace(ctx).Printf("ensureRegisterTicket(): generated and saved new ticket")
+
return ticketBytes, nil
}
diff --git a/metropolis/pkg/supervisor/supervisor.go b/metropolis/pkg/supervisor/supervisor.go
index ef7b909..9865279 100644
--- a/metropolis/pkg/supervisor/supervisor.go
+++ b/metropolis/pkg/supervisor/supervisor.go
@@ -187,3 +187,14 @@
dn := fmt.Sprintf("%s.%s", node.dn(), name)
return node.sup.logtree.LeveledFor(logtree.DN(dn))
}
+
+// MustSubLogger is a wrapper around SubLogger which panics on error. Errors
+// should only happen due to invalid names, so as long as the given name is a
+// compile-time constant and valid, this function is safe to use.
+func MustSubLogger(ctx context.Context, name string) logtree.LeveledLogger {
+ l, err := SubLogger(ctx, name)
+ if err != nil {
+ panic(err)
+ }
+ return l
+}