m/n/core/consensus: log etcd into sub-DN and wait for DNS resolvability
This lets us distrnguish between things that etcd logs (which is often
extremely verbose) what our own consensus service says.
It also makes the consensus service wait for DNS resolvability before
attempting to join an existing cluster, which makes etcd startup much
cleaner (as etcd will itself crash if it cannot immediately resolve its
ExistingPeers in startup).
Change-Id: Icc6a5a40fc56733cc24ccd88af0a73feba4f6922
Reviewed-on: https://review.monogon.dev/c/monogon/+/1356
Tested-by: Jenkins CI
Reviewed-by: Lorenz Brun <lorenz@monogon.tech>
diff --git a/metropolis/node/core/consensus/consensus.go b/metropolis/node/core/consensus/consensus.go
index fff95b0..c000bb0 100644
--- a/metropolis/node/core/consensus/consensus.go
+++ b/metropolis/node/core/consensus/consensus.go
@@ -92,6 +92,8 @@
"crypto/x509/pkix"
"fmt"
"math/big"
+ "net"
+ "net/url"
"time"
clientv3 "go.etcd.io/etcd/client/v3"
@@ -168,18 +170,31 @@
//
// TODO(q3k): add support for streaming to a sub-logger in the tree to get
// cleaner logs.
- converter := unraw.Converter{
- Parser: parseEtcdLogEntry,
- MaximumLineLength: 8192,
- LeveledLogger: supervisor.Logger(ctx),
- }
+
fifoPath := s.config.Ephemeral.ServerLogsFIFO.FullPath()
- pipe, err := converter.NamedPipeReader(fifoPath)
+
+ // This is not where etcd will run, but where its log ingestion machinery lives.
+ // This ensures that the (annoying verbose) etcd logs are contained into just
+ // .etcd.
+ err := supervisor.Run(ctx, "etcd", func(ctx context.Context) error {
+ converter := unraw.Converter{
+ Parser: parseEtcdLogEntry,
+ MaximumLineLength: 8192,
+ LeveledLogger: supervisor.Logger(ctx),
+ }
+ pipe, err := converter.NamedPipeReader(fifoPath)
+ if err != nil {
+ return fmt.Errorf("when creating pipe reader: %w", err)
+ }
+ if err := supervisor.Run(ctx, "piper", pipe); err != nil {
+ return fmt.Errorf("when starting log piper: %w", err)
+ }
+ supervisor.Signal(ctx, supervisor.SignalHealthy)
+ <-ctx.Done()
+ return ctx.Err()
+ })
if err != nil {
- return fmt.Errorf("when creating pipe reader: %w", err)
- }
- if err := supervisor.Run(ctx, "piper", pipe); err != nil {
- return fmt.Errorf("when starting log piper: %w", err)
+ return fmt.Errorf("when starting etcd logger: %w", err)
}
// Create autopromoter, which will automatically promote all learners to full
@@ -228,7 +243,45 @@
}
}
+ // If we're joining a cluster, make sure that our peers are actually DNS
+ // resolvable. This prevents us from immediately failing due to transient DNS
+ // issues.
+ if jc := s.config.JoinCluster; jc != nil {
+ supervisor.Logger(ctx).Infof("Waiting for initial peers to be DNS resolvable...")
+ startLogging := time.Now().Add(5 * time.Second)
+ for {
+ allOkay := true
+ shouldLog := time.Now().After(startLogging)
+ for _, node := range jc.ExistingNodes {
+ u, _ := url.Parse(node.URL)
+ if err != nil {
+ // Just pretend this node is up. If the URL is really bad, etcd will complain
+ // more clearly than us. This shouldn't happen, anyway.
+ }
+ host := u.Hostname()
+ _, err := net.LookupIP(host)
+ if err == nil {
+ continue
+ }
+ if shouldLog {
+ supervisor.Logger(ctx).Errorf("Still can't resolve peer %s (%s): %v", node.Name, host, err)
+ }
+ allOkay = false
+ }
+ if allOkay {
+ supervisor.Logger(ctx).Infof("All peers resolvable, continuing startup.")
+ break
+ }
+
+ time.Sleep(100 * time.Millisecond)
+ if shouldLog {
+ startLogging = time.Now().Add(5 * time.Second)
+ }
+ }
+ }
+
// Start etcd ...
+ supervisor.Logger(ctx).Infof("Starting etcd...")
cfg := s.config.build(true)
server, err := embed.StartEtcd(cfg)
if err != nil {