Revamp DHCP, add basic context management

This started off as a small change to make the network service DHCP client a bit nicer, and ended up with me making a first, rough start on adding context management within Smalltown.

In my opinion, simple OnStart/OnStop lifecycle management for services will stop working once we have to start handling failing services. I think taking inspiration from Erlang's OTP and implementing some sort of supervision tree is the way to go. I think this also ties in nicely with Go's context system, at least partially. Implementing the full supervision tree system is out of scope for this change, but at least this introduces .Context() on the base service struct that service implementations can use. Currently each service has its own background context, but again, this should tie into some sort of supervision tree in the future. There will be a design document for this.

I also rejigger the init code to have a context available immediately, and use that to acquire (with timeout) information about DHCP addresses from the network service.

I also fix a bug where the network service was started twice (once by init, once by the smalltown node code); the smalltown node code now takes in a dependency-injected network service instead.

I also fix a bug where OnStop would call OnStart. Whoops.

Test Plan: no new functionality, covered by current tests

Bug: T561

X-Origin-Diff: phab/D396
GitOrigin-RevId: adddf3dd2f140b6ea64eb034ff19533d32c4ef23
diff --git a/core/internal/common/service/service.go b/core/internal/common/service/service.go
index e093ff6..1cbedbe 100644
--- a/core/internal/common/service/service.go
+++ b/core/internal/common/service/service.go
@@ -17,9 +17,11 @@
 package service
 
 import (
+	"context"
 	"errors"
-	"go.uber.org/zap"
 	"sync"
+
+	"go.uber.org/zap"
 )
 
 var (
@@ -43,6 +45,16 @@
 
 		mutex   sync.Mutex
 		running bool
+
+		// A context that represents the lifecycle of a service.
+		// It is created right before impl.OnStart is called, and canceled
+		// right after impl.OnStop has been called.
+		// This is a transition mechanism for moving from OnStart/OnStop
+		// based lifecycle management of services to a context-based supervision
+		// tree.
+		// Service implementations should access this via .Context()
+		ctx  *context.Context
+		ctxC *context.CancelFunc
 	}
 )
 
@@ -51,6 +63,8 @@
 		Logger: logger,
 		name:   name,
 		impl:   impl,
+		ctx:    nil,
+		ctxC:   nil,
 	}
 }
 
@@ -63,6 +77,10 @@
 		return ErrAlreadyRunning
 	}
 
+	ctx, ctxC := context.WithCancel(context.Background())
+	b.ctx = &ctx
+	b.ctxC = &ctxC
+
 	err := b.impl.OnStart()
 	if err != nil {
 		b.Logger.Error("Failed to start service", zap.String("service", b.name), zap.Error(err))
@@ -83,7 +101,7 @@
 		return ErrNotRunning
 	}
 
-	err := b.impl.OnStart()
+	err := b.impl.OnStop()
 	if err != nil {
 		b.Logger.Error("Failed to stop service", zap.String("service", b.name), zap.Error(err))
 
@@ -91,6 +109,12 @@
 	}
 
 	b.running = false
+
+	// Kill context
+	(*b.ctxC)()
+	b.ctx = nil
+	b.ctxC = nil
+
 	b.Logger.Info("Stopped service", zap.String("service", b.name))
 	return nil
 }
@@ -102,3 +126,12 @@
 
 	return b.running
 }
+
+// Context returns a context that can be used within OnStart() to create new
+// lightweight subservices that use a context for lifecycle management.
+// This is a transition measure before the Service library is rewritten to use
+// a more advanced context-and-returned-error supervision tree.
+// This context can also be used for blocking operations like IO, etc.
+func (b *BaseService) Context() context.Context {
+	return *b.ctx
+}
diff --git a/core/internal/network/BUILD.bazel b/core/internal/network/BUILD.bazel
index 3208ea8..78e4885 100644
--- a/core/internal/network/BUILD.bazel
+++ b/core/internal/network/BUILD.bazel
@@ -2,7 +2,10 @@
 
 go_library(
     name = "go_default_library",
-    srcs = ["main.go"],
+    srcs = [
+        "dhcp.go",
+        "main.go",
+    ],
     importpath = "git.monogon.dev/source/nexantic.git/core/internal/network",
     visibility = ["//:__subpackages__"],
     deps = [
diff --git a/core/internal/network/dhcp.go b/core/internal/network/dhcp.go
new file mode 100644
index 0000000..f3f25aa
--- /dev/null
+++ b/core/internal/network/dhcp.go
@@ -0,0 +1,116 @@
+// Copyright 2020 The Monogon Project Authors.
+//
+// SPDX-License-Identifier: Apache-2.0
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package network
+
+import (
+	"context"
+
+	"github.com/insomniacslk/dhcp/dhcpv4"
+	"github.com/insomniacslk/dhcp/dhcpv4/nclient4"
+	"github.com/vishvananda/netlink"
+	"go.uber.org/zap"
+)
+
+type dhcpClient struct {
+	reqC   chan *dhcpStatusReq
+	logger *zap.Logger
+}
+
+func newDHCPClient(logger *zap.Logger) *dhcpClient {
+	return &dhcpClient{
+		logger: logger,
+		reqC:   make(chan *dhcpStatusReq),
+	}
+}
+
+type dhcpStatusReq struct {
+	resC chan *dhcpv4.DHCPv4
+	wait bool
+}
+
+func (r *dhcpStatusReq) fulfill(p *dhcpv4.DHCPv4) {
+	go func() {
+		r.resC <- p
+	}()
+}
+
+func (c *dhcpClient) run(ctx context.Context, iface netlink.Link) {
+	// Channel updated with address once one gets assigned/updated
+	newC := make(chan *dhcpv4.DHCPv4)
+	// Status requests waiting for configuration
+	waiters := []*dhcpStatusReq{}
+
+	// Start lease acquisition
+	// TODO(q3k): actually maintain the lease instead of hoping we never get
+	// kicked off.
+	client, err := nclient4.New(iface.Attrs().Name)
+	if err != nil {
+		panic(err)
+	}
+	go func() {
+		_, ack, err := client.Request(ctx)
+		if err != nil {
+			c.logger.Error("DHCP lease request failed", zap.Error(err))
+			// TODO(q3k): implement retry logic with full state machine
+		}
+		newC <- ack
+	}()
+
+	// State machine
+	// Two implicit states: WAITING -> ASSIGNED
+	// We start at WAITING, once we get a current config we move to ASSIGNED
+	// Once this becomes more complex (ie. has to handle link state changes)
+	// this should grow into a real state machine.
+	var current *dhcpv4.DHCPv4
+	c.logger.Info("DHCP client WAITING")
+	for {
+		select {
+		case <-ctx.Done():
+			// TODO(q3k): don't leave waiters hanging
+			return
+
+		case cfg := <-newC:
+			current = cfg
+			c.logger.Info("DHCP client ASSIGNED", zap.String("ip", current.String()))
+			for _, w := range waiters {
+				w.fulfill(current)
+			}
+			waiters = []*dhcpStatusReq{}
+
+		case r := <-c.reqC:
+			if current != nil || !r.wait {
+				r.fulfill(current)
+			} else {
+				waiters = append(waiters, r)
+			}
+		}
+	}
+}
+
+func (c *dhcpClient) status(ctx context.Context, wait bool) (*dhcpv4.DHCPv4, error) {
+	resC := make(chan *dhcpv4.DHCPv4)
+	c.reqC <- &dhcpStatusReq{
+		resC: resC,
+		wait: wait,
+	}
+	select {
+	case <-ctx.Done():
+		return nil, ctx.Err()
+	case r := <-resC:
+		return r, nil
+	}
+}
diff --git a/core/internal/network/main.go b/core/internal/network/main.go
index 04ab159..7c22249 100644
--- a/core/internal/network/main.go
+++ b/core/internal/network/main.go
@@ -21,13 +21,9 @@
 	"fmt"
 	"net"
 	"os"
-	"sync"
-	"time"
 
 	"git.monogon.dev/source/nexantic.git/core/internal/common/service"
 
-	"github.com/insomniacslk/dhcp/dhcpv4"
-	"github.com/insomniacslk/dhcp/dhcpv4/nclient4"
 	"github.com/vishvananda/netlink"
 	"go.uber.org/zap"
 	"golang.org/x/sys/unix"
@@ -40,11 +36,9 @@
 
 type Service struct {
 	*service.BaseService
-	config      Config
-	dhcp4Client *nclient4.Client
-	ip          *net.IP
-	ipNotify    chan struct{}
-	lock        sync.Mutex
+	config Config
+
+	dhcp *dhcpClient
 }
 
 type Config struct {
@@ -52,8 +46,8 @@
 
 func NewNetworkService(config Config, logger *zap.Logger) (*Service, error) {
 	s := &Service{
-		config:   config,
-		ipNotify: make(chan struct{}),
+		config: config,
+		dhcp:   newDHCPClient(logger),
 	}
 	s.BaseService = service.NewBaseService("network", logger, s)
 	return s, nil
@@ -91,79 +85,36 @@
 		Gw:    gw,
 		Scope: netlink.SCOPE_UNIVERSE,
 	}); err != nil {
-		return fmt.Errorf("Failed to add default route: %w", err)
+		return fmt.Errorf("failed to add default route: %w", err)
 	}
 	return nil
 }
 
-const (
-	stateInitialize = 1
-	stateSelect     = 2
-	stateBound      = 3
-	stateRenew      = 4
-	stateRebind     = 5
-)
+func (s *Service) useInterface(iface netlink.Link) error {
+	go s.dhcp.run(s.Context(), iface)
 
-var dhcpBroadcastAddr = &net.UDPAddr{IP: net.IP{255, 255, 255, 255}, Port: 67}
-
-// TODO(lorenz): This is a super terrible DHCP client, but it works for QEMU slirp
-func (s *Service) dhcpClient(iface netlink.Link) error {
-	client, err := nclient4.New(iface.Attrs().Name)
+	status, err := s.dhcp.status(s.Context(), true)
 	if err != nil {
-		panic(err)
-	}
-	var ack *dhcpv4.DHCPv4
-	for {
-		dhcpCtx, dhcpCtxCancel := context.WithTimeout(context.Background(), 10*time.Second)
-		defer dhcpCtxCancel()
-		_, ack, err = client.Request(dhcpCtx)
-		if err == nil {
-			break
-		}
-		s.Logger.Info("DHCP request failed", zap.Error(err))
-	}
-	s.Logger.Info("Network service got IP", zap.String("ip", ack.YourIPAddr.String()))
-	if err := setResolvconf(ack.DNS(), []string{}); err != nil {
-		s.Logger.Warn("Failed to set resolvconf", zap.Error(err))
+		return fmt.Errorf("could not get DHCP status: %v", err)
 	}
 
-	s.lock.Lock()
-	s.ip = &ack.YourIPAddr
-	s.lock.Unlock()
-loop:
-	for {
-		select {
-		case s.ipNotify <- struct{}{}:
-		default:
-			break loop
-		}
+	if err := setResolvconf(status.DNS(), []string{}); err != nil {
+		s.Logger.Warn("failed to set resolvconf", zap.Error(err))
 	}
 
-	if err := addNetworkRoutes(iface, net.IPNet{IP: ack.YourIPAddr, Mask: ack.SubnetMask()}, ack.GatewayIPAddr); err != nil {
-		s.Logger.Warn("Failed to add routes", zap.Error(err))
+	if err := addNetworkRoutes(iface, net.IPNet{IP: status.YourIPAddr, Mask: status.SubnetMask()}, status.GatewayIPAddr); err != nil {
+		s.Logger.Warn("failed to add routes", zap.Error(err))
 	}
 	return nil
 }
 
 // GetIP returns the current IP (and optionally waits for one to be assigned)
-func (s *Service) GetIP(wait bool) *net.IP {
-	s.lock.Lock()
-	if !wait {
-		ip := s.ip
-		s.lock.Unlock()
-		return ip
+func (s *Service) GetIP(ctx context.Context, wait bool) (*net.IP, error) {
+	status, err := s.dhcp.status(ctx, wait)
+	if err != nil {
+		return nil, err
 	}
-
-	for {
-		if s.ip != nil {
-			ip := s.ip
-			s.lock.Unlock()
-			return ip
-		}
-		s.lock.Unlock()
-		<-s.ipNotify
-		s.lock.Lock()
-	}
+	return &status.YourIPAddr, nil
 }
 
 func (s *Service) OnStart() error {
@@ -190,13 +141,14 @@
 			}
 		}
 	}
-	if len(ethernetLinks) == 1 {
-		link := ethernetLinks[0]
-		go s.dhcpClient(link)
-
-	} else {
-		s.Logger.Warn("Network service cannot yet handle more than one interface :(")
+	if len(ethernetLinks) != 1 {
+		s.Logger.Warn("Network service needs exactly one link, bailing")
+		return nil
 	}
+
+	link := ethernetLinks[0]
+	go s.useInterface(link)
+
 	return nil
 }
 
diff --git a/core/internal/node/main.go b/core/internal/node/main.go
index b1d74d6..b0674d2 100644
--- a/core/internal/node/main.go
+++ b/core/internal/node/main.go
@@ -18,6 +18,7 @@
 
 import (
 	"bytes"
+	"context"
 	"crypto/ed25519"
 	"crypto/rand"
 	"crypto/sha512"
@@ -31,9 +32,8 @@
 	"io/ioutil"
 	"math/big"
 	"net"
-	"time"
-
 	"os"
+	"time"
 
 	apipb "git.monogon.dev/source/nexantic.git/core/generated/api"
 	"git.monogon.dev/source/nexantic.git/core/internal/api"
@@ -43,12 +43,12 @@
 	"git.monogon.dev/source/nexantic.git/core/internal/kubernetes"
 	"git.monogon.dev/source/nexantic.git/core/internal/network"
 	"git.monogon.dev/source/nexantic.git/core/internal/storage"
-	"github.com/cenkalti/backoff/v4"
-	"google.golang.org/grpc"
-	"google.golang.org/grpc/credentials"
 
+	"github.com/cenkalti/backoff/v4"
 	"github.com/gogo/protobuf/proto"
 	"go.uber.org/zap"
+	"google.golang.org/grpc"
+	"google.golang.org/grpc/credentials"
 )
 
 var (
@@ -71,32 +71,22 @@
 	}
 )
 
-func NewSmalltownNode(logger *zap.Logger) (*SmalltownNode, error) {
+func NewSmalltownNode(logger *zap.Logger, ntwk *network.Service, strg *storage.Manager) (*SmalltownNode, error) {
 	flag.Parse()
 	logger.Info("Creating Smalltown node")
+	ctx := context.Background()
 
 	hostname, err := os.Hostname()
 	if err != nil {
 		panic(err)
 	}
 
-	networkService, err := network.NewNetworkService(network.Config{}, logger.With(zap.String("component", "network")))
+	// Wait for IP address...
+	ctxT, ctxTC := context.WithTimeout(ctx, time.Second*10)
+	defer ctxTC()
+	externalIP, err := ntwk.GetIP(ctxT, true)
 	if err != nil {
-		panic(err)
-	}
-
-	if err := networkService.Start(); err != nil {
-		logger.Panic("Failed to start network service", zap.Error(err))
-	}
-
-	storageManager, err := storage.Initialize(logger.With(zap.String("component", "storage")))
-	if err != nil {
-		logger.Error("Failed to initialize storage manager", zap.Error(err))
-		return nil, err
-	}
-	externalIP := networkService.GetIP(true)
-	if externalIP == nil {
-		logger.Panic("Waited for IP but didn't get one")
+		logger.Panic("Could not get IP address", zap.Error(err))
 	}
 
 	// Important to know if the GetIP above hangs
@@ -113,8 +103,8 @@
 
 	s := &SmalltownNode{
 		Consensus: consensusService,
-		Storage:   storageManager,
-		Network:   networkService,
+		Storage:   strg,
+		Network:   ntwk,
 		logger:    logger,
 		hostname:  hostname,
 	}
@@ -133,7 +123,7 @@
 	return s, nil
 }
 
-func (s *SmalltownNode) Start() error {
+func (s *SmalltownNode) Start(ctx context.Context) error {
 	s.logger.Info("Starting Smalltown node")
 
 	// TODO(lorenz): Abstracting enrolment sounds like a good idea, but ends up being painful
@@ -156,22 +146,22 @@
 		if len(enrolmentConfig.EnrolmentSecret) == 0 {
 			return s.startFull()
 		}
-		return s.startEnrolling()
+		return s.startEnrolling(ctx)
 	} else if os.IsNotExist(err) {
 		// This is ok like this, once a new cluster has been set up the initial node also generates
 		// its own enrolment config
-		return s.startForSetup()
+		return s.startForSetup(ctx)
 	}
 	// Unknown error reading enrolment config (disk issues/invalid configuration format/...)
 	s.logger.Panic("Invalid enrolment configuration provided", zap.Error(err))
 	panic("Unreachable")
 }
 
-func (s *SmalltownNode) startEnrolling() error {
+func (s *SmalltownNode) startEnrolling(ctx context.Context) error {
 	s.logger.Info("Initializing subsystems for enrolment")
 	s.state = common.StateEnrollMode
 
-	nodeInfo, nodeID, err := s.InitializeNode()
+	nodeInfo, nodeID, err := s.InitializeNode(ctx)
 	if err != nil {
 		return err
 	}
@@ -211,9 +201,9 @@
 	return nil
 }
 
-func (s *SmalltownNode) startForSetup() error {
+func (s *SmalltownNode) startForSetup(ctx context.Context) error {
 	s.logger.Info("Setting up a new cluster")
-	initData, nodeID, err := s.InitializeNode()
+	initData, nodeID, err := s.InitializeNode(ctx)
 	if err != nil {
 		return err
 	}
@@ -276,10 +266,14 @@
 		return err
 	}
 
+	ip, err := s.Network.GetIP(ctx, true)
+	if err != nil {
+		return fmt.Errorf("could not get node IP: %v", err)
+	}
 	enrolmentConfig := &apipb.EnrolmentConfig{
 		EnrolmentSecret: []byte{}, // First node is always already enrolled
 		MastersCert:     masterCert,
-		MasterIps:       [][]byte{[]byte(*s.Network.GetIP(true))},
+		MasterIps:       [][]byte{[]byte(*ip)},
 		NodeId:          nodeID,
 	}
 	enrolmentConfigRaw, err := proto.Marshal(enrolmentConfig)
diff --git a/core/internal/node/setup.go b/core/internal/node/setup.go
index 23eb24c..cbbfd4d 100644
--- a/core/internal/node/setup.go
+++ b/core/internal/node/setup.go
@@ -18,15 +18,15 @@
 
 import (
 	"context"
+	"errors"
+	"fmt"
 
-	"git.monogon.dev/source/nexantic.git/core/generated/api"
-	"git.monogon.dev/source/nexantic.git/core/internal/common"
+	"go.uber.org/zap"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
 
-	"errors"
-
-	"go.uber.org/zap"
+	"git.monogon.dev/source/nexantic.git/core/generated/api"
+	"git.monogon.dev/source/nexantic.git/core/internal/common"
 )
 
 var (
@@ -43,13 +43,16 @@
 
 // InitializeNode contains functionality that needs to be executed regardless of what the node does
 // later on
-func (s *SmalltownNode) InitializeNode() (*api.NewNodeInfo, string, error) {
+func (s *SmalltownNode) InitializeNode(ctx context.Context) (*api.NewNodeInfo, string, error) {
 	globalUnlockKey, err := s.Storage.InitializeData()
 	if err != nil {
 		return nil, "", err
 	}
 
-	nodeIP := s.Network.GetIP(true)
+	nodeIP, err := s.Network.GetIP(ctx, true)
+	if err != nil {
+		return nil, "", fmt.Errorf("could not get IP: %v", err)
+	}
 
 	nodeCert, nodeID, err := s.generateNodeID()
 	if err != nil {