blob: 9f9c851db76c9d2aafa766b7fb0a0475bbd08c78 [file] [log] [blame]
Serge Bazanski6fdca3f2023-03-20 17:47:07 +01001package kubernetes
2
3import (
4 "context"
5 "fmt"
6 "net"
7
8 "source.monogon.dev/go/net/tinylb"
9 "source.monogon.dev/metropolis/node"
10 ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
11 "source.monogon.dev/metropolis/pkg/event/memory"
12)
13
14// updateLoadBalancerAPIServers provides a tinylb BackendSet memory value with
15// the currently known nodes running a Kubernetes apiserver as retrieved from the
16// given curator client.
17func updateLoadbalancerAPIServers(ctx context.Context, val *memory.Value[tinylb.BackendSet], cur ipb.CuratorClient) error {
18 w, err := cur.Watch(ctx, &ipb.WatchRequest{
19 Kind: &ipb.WatchRequest_NodesInCluster_{
20 NodesInCluster: &ipb.WatchRequest_NodesInCluster{},
21 },
22 })
23 if err != nil {
24 return fmt.Errorf("watch failed: %w", err)
25 }
26 defer w.CloseSend()
27
28 set := &tinylb.BackendSet{}
29 val.Set(set.Clone())
30 for {
31 ev, err := w.Recv()
32 if err != nil {
33 return fmt.Errorf("receive failed: %w", err)
34 }
35
36 for _, n := range ev.Nodes {
37 if n.Status == nil || n.Status.ExternalAddress == "" {
38 set.Delete(n.Id)
39 continue
40 }
41 if n.Roles.KubernetesController == nil {
42 set.Delete(n.Id)
43 continue
44 }
45 set.Insert(n.Id, &tinylb.SimpleTCPBackend{Remote: net.JoinHostPort(n.Status.ExternalAddress, node.KubernetesAPIPort.PortString())})
46 }
47 for _, t := range ev.NodeTombstones {
48 set.Delete(t.NodeId)
49 }
50 val.Set(set.Clone())
51 }
52}