cloud: split shepherd up
Change-Id: I8e386d9eaaf17543743e1e8a37a8d71426910d59
Reviewed-on: https://review.monogon.dev/c/monogon/+/2213
Reviewed-by: Serge Bazanski <serge@monogon.tech>
Tested-by: Jenkins CI
diff --git a/cloud/shepherd/mini/BUILD.bazel b/cloud/shepherd/mini/BUILD.bazel
new file mode 100644
index 0000000..eb949ee
--- /dev/null
+++ b/cloud/shepherd/mini/BUILD.bazel
@@ -0,0 +1,48 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_binary", "go_library")
+load("@io_bazel_rules_docker//container:container.bzl", "container_image")
+load("//build/static_binary_tarball:def.bzl", "static_binary_tarball")
+
+# Library target holding the Go sources of the mini shepherd (main.go,
+# provider.go and ssh.go). It is embedded into the :mini binary.
+go_library(
+ name = "mini_lib",
+ srcs = [
+ "main.go",
+ "provider.go",
+ "ssh.go",
+ ],
+ importpath = "source.monogon.dev/cloud/shepherd/mini",
+ visibility = ["//visibility:public"],
+ deps = [
+ "//cloud/bmaas/bmdb",
+ "//cloud/bmaas/bmdb/model",
+ "//cloud/bmaas/bmdb/webug",
+ "//cloud/lib/component",
+ "//cloud/shepherd",
+ "//cloud/shepherd/manager",
+ "//metropolis/cli/pkg/context",
+ "@io_k8s_klog_v2//:klog",
+ "@org_golang_x_crypto//ssh",
+ ],
+)
+
+# The mini shepherd executable, built by embedding :mini_lib.
+go_binary(
+ name = "mini",
+ embed = [":mini_lib"],
+ visibility = ["//visibility:public"],
+)
+
+# Passes the :mini executable through the static_binary_tarball rule loaded
+# from //build/static_binary_tarball. The result is listed in the tars of
+# :mini_container.
+# NOTE(review): presumably this produces a tarball containing the binary at
+# its runfiles-style path - confirm against the rule definition.
+static_binary_tarball(
+ name = "mini_layer",
+ executable = ":mini",
+)
+
+# Container image for the mini shepherd. It is based on @go_image_base and
+# adds two tarballs: the :mini_layer with the shepherd binary and the
+# takeover layer from //cloud/takeover. The image starts the mini binary
+# with /app as its working directory.
+container_image(
+ name = "mini_container",
+ base = "@go_image_base//image",
+ entrypoint = ["/app/cloud/shepherd/mini/mini_/mini"],
+ tars = [
+ ":mini_layer",
+ "//cloud/takeover:takeover_layer",
+ ],
+ visibility = ["//visibility:public"],
+ workdir = "/app",
+)
diff --git a/cloud/shepherd/mini/main.go b/cloud/shepherd/mini/main.go
new file mode 100644
index 0000000..67231c0
--- /dev/null
+++ b/cloud/shepherd/mini/main.go
@@ -0,0 +1,191 @@
+package main
+
+import (
+ "context"
+ "encoding/json"
+ "flag"
+ "fmt"
+ "io"
+ "net/http"
+ "net/url"
+ "os"
+ "strings"
+
+ "k8s.io/klog/v2"
+
+ "source.monogon.dev/cloud/bmaas/bmdb"
+ "source.monogon.dev/cloud/bmaas/bmdb/model"
+ "source.monogon.dev/cloud/bmaas/bmdb/webug"
+ "source.monogon.dev/cloud/lib/component"
+ "source.monogon.dev/cloud/shepherd"
+ "source.monogon.dev/cloud/shepherd/manager"
+ clicontext "source.monogon.dev/metropolis/cli/pkg/context"
+)
+
+// Config is the complete configuration of the mini shepherd binary. Most of
+// it is filled in from command line flags registered by RegisterFlags.
+type Config struct {
+ // Component, BMDB and WebugConfig configure the shared component
+ // infrastructure, the BMDB connection and the webug interface.
+ Component component.ComponentConfig
+ BMDB bmdb.BMDB
+ WebugConfig webug.Config
+
+ // Configuration of the individual shepherd manager parts.
+ InitializerConfig manager.InitializerConfig
+ ProvisionerConfig manager.ProvisionerConfig
+ RecovererConfig manager.RecovererConfig
+
+ // SSHConfig holds the credentials used to build the SSH client.
+ SSHConfig sshConfig
+ // DeviceListSource is the URL the device list is fetched from, set by
+ // the -mini_device_list_url flag.
+ DeviceListSource string
+ // ProviderType is the provider this shepherd acts as, set by the
+ // -mini_provider flag.
+ ProviderType model.Provider
+}
+
+// runtimeInfo returns a short description of where this process runs, in the
+// form "host <hostname>". When the hostname is empty (for example because it
+// could not be determined), "UNKNOWN" is used in its place.
+//
+// TODO(q3k): factor this out to BMDB library?
+func runtimeInfo() string {
+	host := "UNKNOWN"
+	// The lookup error is deliberately ignored; only a non-empty name
+	// replaces the fallback.
+	if name, _ := os.Hostname(); name != "" {
+		host = name
+	}
+	return fmt.Sprintf("host %s", host)
+}
+
+// RegisterFlags asks every sub-configuration to register its flags, sets the
+// fixed BMDB component name and runtime info, and registers the mini-specific
+// flags -mini_device_list_url and -mini_provider on the default flag set.
+func (c *Config) RegisterFlags() {
+	c.Component.RegisterFlags("shepherd")
+	c.BMDB.ComponentName = "shepherd-mini"
+	c.BMDB.RuntimeInfo = runtimeInfo()
+	c.BMDB.Database.RegisterFlags("bmdb")
+	c.WebugConfig.RegisterFlags()
+
+	c.InitializerConfig.RegisterFlags()
+	c.ProvisionerConfig.RegisterFlags()
+	c.RecovererConfig.RegisterFlags()
+
+	c.SSHConfig.RegisterFlags()
+	flag.StringVar(&c.DeviceListSource, "mini_device_list_url", "", "The url from where to fetch the device list. For local paths use file:// as scheme")
+	flag.Func("mini_provider", "The provider this mini shepherd should emulate. Supported values are: lumen,equinix", func(name string) error {
+		// The flag value has to equal the lower-cased name of one of the
+		// providers below; the first match is stored.
+		for _, known := range []model.Provider{model.ProviderEquinix, model.ProviderLumen} {
+			if name == strings.ToLower(string(known)) {
+				c.ProviderType = known
+				return nil
+			}
+		}
+		return fmt.Errorf("invalid provider name")
+	})
+}
+
+// deviceList is the list of machines as decoded from the device list source.
+type deviceList []machine
+
+// asMap returns the list indexed by provider ID. If several entries share an
+// ID, the last one in the list is kept.
+func (dl deviceList) asMap() map[shepherd.ProviderID]machine {
+	byID := make(map[shepherd.ProviderID]machine, len(dl))
+	for i := range dl {
+		byID[dl[i].ProviderID] = dl[i]
+	}
+	return byID
+}
+
+// fetchDeviceList loads the device list from the URL s and decodes it as a
+// JSON list of machines.
+//
+// A URL with the "file" scheme is read from the local filesystem, using the
+// path component of the URL. Every other URL is requested with an HTTP GET
+// and has to be answered with status 200. JSON objects containing fields
+// that machine does not declare are rejected.
+//
+// All returned errors wrap the underlying cause and say which step failed.
+func fetchDeviceList(s string) (deviceList, error) {
+	u, err := url.Parse(s)
+	if err != nil {
+		return nil, fmt.Errorf("failed parsing device list url: %w", err)
+	}
+
+	var r io.Reader
+	if u.Scheme == "file" {
+		f, err := os.Open(u.Path)
+		if err != nil {
+			return nil, fmt.Errorf("opening device list file: %w", err)
+		}
+		defer f.Close()
+		r = f
+	} else {
+		// NOTE(review): http.Get uses http.DefaultClient, which has no
+		// timeout, so a stalled server blocks startup indefinitely.
+		resp, err := http.Get(u.String())
+		if err != nil {
+			return nil, fmt.Errorf("requesting device list: %w", err)
+		}
+		defer resp.Body.Close()
+
+		if resp.StatusCode != http.StatusOK {
+			return nil, fmt.Errorf("requesting device list: unexpected status code %d, want %d", resp.StatusCode, http.StatusOK)
+		}
+		r = resp.Body
+	}
+
+	var d deviceList
+	dec := json.NewDecoder(r)
+	// Reject unknown fields so that misspelled keys are reported instead of
+	// being silently dropped.
+	dec.DisallowUnknownFields()
+	if err := dec.Decode(&d); err != nil {
+		return nil, fmt.Errorf("decoding device list: %w", err)
+	}
+
+	klog.Infof("Fetched device list with %d entries", len(d))
+
+	return d, nil
+}
+
+// main runs the mini shepherd. It registers and parses the flags, validates
+// the mini-specific ones, starts the Prometheus endpoint, opens the BMDB
+// connection, builds the SSH client and loads the device list. It then runs
+// the provisioner, the initializer control loop and webug in background
+// goroutines and blocks until the context returned by
+// clicontext.WithInterrupt is done.
+func main() {
+	var c Config
+	c.RegisterFlags()
+
+	flag.Parse()
+	if flag.NArg() > 0 {
+		klog.Exitf("unexpected positional arguments: %v", flag.Args())
+	}
+
+	// Validate the mini-specific flags up front, before any connection is
+	// opened.
+	if c.DeviceListSource == "" {
+		klog.Exitf("-mini_device_list_url must be set")
+	}
+	if c.ProviderType == "" {
+		klog.Exitf("-mini_provider must be set")
+	}
+
+	registry := c.Component.PrometheusRegistry()
+	c.BMDB.EnableMetrics(registry)
+
+	ctx := clicontext.WithInterrupt(context.Background())
+	c.Component.StartPrometheus(ctx)
+
+	conn, err := c.BMDB.Open(true)
+	if err != nil {
+		klog.Exitf("Failed to open BMDB connection: %v", err)
+	}
+
+	sshClient, err := c.SSHConfig.NewClient()
+	if err != nil {
+		klog.Exitf("Failed to create SSH client: %v", err)
+	}
+
+	list, err := fetchDeviceList(c.DeviceListSource)
+	if err != nil {
+		klog.Exitf("Failed to fetch device list: %v", err)
+	}
+
+	mini := &provider{
+		providerType: c.ProviderType,
+		machines:     list.asMap(),
+	}
+
+	provisioner, err := manager.NewProvisioner(mini, c.ProvisionerConfig)
+	if err != nil {
+		klog.Exitf("Failed to create provisioner: %v", err)
+	}
+
+	initializer, err := manager.NewInitializer(mini, sshClient, c.InitializerConfig)
+	if err != nil {
+		klog.Exitf("Failed to create initializer: %v", err)
+	}
+
+	// Each goroutine keeps its own err. Assigning to the err of main from
+	// several goroutines would be a data race.
+	go func() {
+		if err := provisioner.Run(ctx, conn); err != nil {
+			klog.Exit(err)
+		}
+	}()
+	go func() {
+		if err := manager.RunControlLoop(ctx, conn, initializer); err != nil {
+			klog.Exit(err)
+		}
+	}()
+	go func() {
+		if err := c.WebugConfig.Start(ctx, conn); err != nil && err != ctx.Err() {
+			klog.Exitf("Failed to start webug: %v", err)
+		}
+	}()
+
+	<-ctx.Done()
+}
diff --git a/cloud/shepherd/mini/provider.go b/cloud/shepherd/mini/provider.go
new file mode 100644
index 0000000..05b628f
--- /dev/null
+++ b/cloud/shepherd/mini/provider.go
@@ -0,0 +1,126 @@
+package main
+
+import (
+ "context"
+ "database/sql"
+ "fmt"
+ "net/netip"
+
+ "k8s.io/klog/v2"
+
+ "source.monogon.dev/cloud/bmaas/bmdb"
+ "source.monogon.dev/cloud/bmaas/bmdb/model"
+ "source.monogon.dev/cloud/shepherd"
+)
+
+// provider represents a shepherd.Provider that works entirely on a
+// static device list. It requires a provider type and a device list.
+type provider struct {
+ // providerType is returned by Type and written to BMDB for every
+ // machine this provider assimilates.
+ providerType model.Provider
+ // machines holds all known machines, indexed by their provider ID.
+ machines map[shepherd.ProviderID]machine
+}
+
+// machine is a single entry of the static device list. It is decoded from
+// JSON using the keys "ID", "Addr" and "Location" and is handed out as a
+// shepherd.Machine by the provider.
+type machine struct {
+ ProviderID shepherd.ProviderID `json:"ID"`
+ Address netip.Addr `json:"Addr"`
+ Location string `json:"Location"`
+}
+
+// ID returns the provider ID of the machine.
+func (m machine) ID() shepherd.ProviderID {
+	return m.ProviderID
+}
+
+// Addr returns the address of the machine as given in the device list.
+func (m machine) Addr() netip.Addr {
+	return m.Address
+}
+
+// State always reports shepherd.StatePossiblyUsed, no other state is ever
+// returned for a machine of the static device list.
+func (m machine) State() shepherd.State {
+	return shepherd.StatePossiblyUsed
+}
+
+// ListMachines returns all machines of the device list. The order of the
+// result follows map iteration and is therefore not stable. The returned
+// error is always nil.
+func (p *provider) ListMachines(ctx context.Context) ([]shepherd.Machine, error) {
+	all := make([]shepherd.Machine, len(p.machines))
+	next := 0
+	for id := range p.machines {
+		all[next] = p.machines[id]
+		next++
+	}
+	return all, nil
+}
+
+// GetMachine looks up the machine with the given provider ID in the device
+// list. Requesting an ID that is not part of the list is unsupported and
+// yields an error.
+func (p *provider) GetMachine(ctx context.Context, id shepherd.ProviderID) (shepherd.Machine, error) {
+	if found, known := p.machines[id]; known {
+		return found, nil
+	}
+	return nil, fmt.Errorf("unknown provided machine requested")
+}
+
+func (p *provider) CreateMachine(ctx context.Context, session *bmdb.Session, request shepherd.CreateMachineRequest) (shepherd.Machine, error) {
+ if request.UnusedMachine == nil {
+ return nil, fmt.Errorf("parameter UnusedMachine is missing")
+ }
+
+ //TODO: Do we just trust the implementation to be correct?
+ m, ok := request.UnusedMachine.(machine)
+ if !ok {
+ return nil, fmt.Errorf("invalid type for parameter UnusedMachine")
+ }
+
+ if err := p.assimilate(ctx, session, m); err != nil {
+ klog.Errorf("Failed to provision machine %s: %v", m.ProviderID, err)
+ return nil, err
+ }
+
+ return m, nil
+}
+
+// assimilate records the given machine in BMDB inside one sess.Transact
+// call. It creates a new machine record, tags it as "provided" by this
+// provider and then stores the provider status known from the device list
+// (address, location and an unknown provider status).
+//
+// The address is only marked valid when the machine carries a valid
+// netip.Addr. The zero Addr formats as the placeholder "invalid IP", which
+// must not be stored as if it were an address. An empty location is likewise
+// stored as an invalid (NULL) string.
+func (p *provider) assimilate(ctx context.Context, sess *bmdb.Session, machine machine) error {
+	return sess.Transact(ctx, func(q *model.Queries) error {
+		// Create a new machine record within BMDB.
+		m, err := q.NewMachine(ctx)
+		if err != nil {
+			return fmt.Errorf("while creating a new machine record in BMDB: %w", err)
+		}
+
+		// Link the new machine with the device, and tag it "provided".
+		addParams := model.MachineAddProvidedParams{
+			MachineID:  m.MachineID,
+			ProviderID: string(machine.ProviderID),
+			Provider:   p.providerType,
+		}
+		klog.Infof("Setting \"provided\" tag (ID: %s, PID: %s, Provider: %s).", addParams.MachineID, addParams.ProviderID, addParams.Provider)
+		if err := q.MachineAddProvided(ctx, addParams); err != nil {
+			return fmt.Errorf("while tagging machine active: %w", err)
+		}
+
+		upParams := model.MachineUpdateProviderStatusParams{
+			ProviderID: string(machine.ProviderID),
+			Provider:   p.providerType,
+			ProviderIpAddress: sql.NullString{
+				String: machine.Address.String(),
+				// A device list entry without an address decodes to the
+				// zero Addr; do not present its placeholder as a value.
+				Valid: machine.Address.IsValid(),
+			},
+			ProviderLocation: sql.NullString{
+				String: machine.Location,
+				Valid:  machine.Location != "",
+			},
+			ProviderStatus: model.NullProviderStatus{
+				ProviderStatus: model.ProviderStatusUnknown,
+				Valid:          true,
+			},
+		}
+
+		klog.Infof("Setting \"provided\" tag status parameter (ID: %s, PID: %s, Provider: %s).", addParams.MachineID, upParams.ProviderID, upParams.Provider)
+		if err := q.MachineUpdateProviderStatus(ctx, upParams); err != nil {
+			return fmt.Errorf("while setting machine params: %w", err)
+		}
+
+		return nil
+	})
+}
+
+// Type returns the provider type this provider was configured with.
+func (p *provider) Type() model.Provider {
+ return p.providerType
+}
diff --git a/cloud/shepherd/mini/ssh.go b/cloud/shepherd/mini/ssh.go
new file mode 100644
index 0000000..99f3e90
--- /dev/null
+++ b/cloud/shepherd/mini/ssh.go
@@ -0,0 +1,67 @@
+package main
+
+import (
+ "flag"
+ "fmt"
+
+ "golang.org/x/crypto/ssh"
+ "k8s.io/klog/v2"
+
+ "source.monogon.dev/cloud/shepherd/manager"
+)
+
+// sshConfig holds the credentials used to log into machines via SSH. A user
+// is mandatory; in addition either a password or an SSH key has to be
+// configured (see check).
+type sshConfig struct {
+ // User is the SSH username, set by the -ssh_user flag.
+ User string
+ // Pass is the SSH password, set by the -ssh_pass flag.
+ Pass string
+ // SSHKey is the key used for public key authentication when no password
+ // is set.
+ SSHKey manager.SSHKey
+}
+
+// check verifies that the configuration is usable: a user has to be set, and
+// at least one of password or key persist path has to be non-empty.
+func (sc *sshConfig) check() error {
+	if sc.User == "" {
+		return fmt.Errorf("-ssh_user must be set")
+	}
+
+	hasAuth := sc.Pass != "" || sc.SSHKey.KeyPersistPath != ""
+	if hasAuth {
+		return nil
+	}
+
+	//TODO: The flag name -ssh_key_path could change, which would make this
+	// error very confusing.
+	return fmt.Errorf("-ssh_pass or -ssh_key_path must be set")
+}
+
+// RegisterFlags registers the -ssh_user and -ssh_pass flags on the default
+// flag set and then lets the embedded SSH key register its own flags.
+func (sc *sshConfig) RegisterFlags() {
+	flag.StringVar(
+		&sc.User, "ssh_user", "",
+		"SSH username to log into the machines",
+	)
+	flag.StringVar(
+		&sc.Pass, "ssh_pass", "",
+		"SSH password to log into the machines",
+	)
+	sc.SSHKey.RegisterFlags()
+}
+
+// NewClient validates the configuration and builds a PlainSSHClient for the
+// configured user. A configured password takes precedence and results in
+// password authentication; otherwise, if a key persist path is set, the
+// signer of the SSH key is used for public key authentication and the public
+// key is logged.
+func (sc *sshConfig) NewClient() (*manager.PlainSSHClient, error) {
+	if err := sc.check(); err != nil {
+		return nil, err
+	}
+
+	client := &manager.PlainSSHClient{Username: sc.User}
+
+	if sc.Pass != "" {
+		client.AuthMethod = ssh.Password(sc.Pass)
+		return client, nil
+	}
+	if sc.SSHKey.KeyPersistPath == "" {
+		// Neither method is configured: return the client without an
+		// AuthMethod.
+		return client, nil
+	}
+
+	signer, err := sc.SSHKey.Signer()
+	if err != nil {
+		return nil, err
+	}
+	pubKey, err := sc.SSHKey.PublicKey()
+	if err != nil {
+		return nil, err
+	}
+	klog.Infof("Using ssh key auth with public key: %s", pubKey)
+
+	client.AuthMethod = ssh.PublicKeys(signer)
+	return client, nil
+}