blob: 9f9c851db76c9d2aafa766b7fb0a0475bbd08c78 [file] [log] [blame]
package kubernetes
import (
"context"
"fmt"
"net"
"source.monogon.dev/go/net/tinylb"
"source.monogon.dev/metropolis/node"
ipb "source.monogon.dev/metropolis/node/core/curator/proto/api"
"source.monogon.dev/metropolis/pkg/event/memory"
)
// updateLoadBalancerAPIServers provides a tinylb BackendSet memory value with
// the currently known nodes running a Kubernetes apiserver as retrieved from the
// given curator client.
func updateLoadbalancerAPIServers(ctx context.Context, val *memory.Value[tinylb.BackendSet], cur ipb.CuratorClient) error {
w, err := cur.Watch(ctx, &ipb.WatchRequest{
Kind: &ipb.WatchRequest_NodesInCluster_{
NodesInCluster: &ipb.WatchRequest_NodesInCluster{},
},
})
if err != nil {
return fmt.Errorf("watch failed: %w", err)
}
defer w.CloseSend()
set := &tinylb.BackendSet{}
val.Set(set.Clone())
for {
ev, err := w.Recv()
if err != nil {
return fmt.Errorf("receive failed: %w", err)
}
for _, n := range ev.Nodes {
if n.Status == nil || n.Status.ExternalAddress == "" {
set.Delete(n.Id)
continue
}
if n.Roles.KubernetesController == nil {
set.Delete(n.Id)
continue
}
set.Insert(n.Id, &tinylb.SimpleTCPBackend{Remote: net.JoinHostPort(n.Status.ExternalAddress, node.KubernetesAPIPort.PortString())})
}
for _, t := range ev.NodeTombstones {
set.Delete(t.NodeId)
}
val.Set(set.Clone())
}
}