blob: 0aeefad0b97abf81b7cd7f1c708771cb524b59d1 [file] [log] [blame]
Serge Bazanski4abeb132022-10-11 11:32:19 +02001package server
2
3import (
4 "context"
5 "flag"
6 "fmt"
7 "net"
8 "os"
9
Serge Bazanski42f13462023-04-19 15:00:06 +020010 "github.com/cenkalti/backoff/v4"
Serge Bazanski4abeb132022-10-11 11:32:19 +020011 "google.golang.org/grpc"
12 "google.golang.org/grpc/reflection"
13 "k8s.io/klog/v2"
14
15 "source.monogon.dev/cloud/bmaas/bmdb"
Serge Bazanski77628312023-02-15 23:33:22 +010016 "source.monogon.dev/cloud/bmaas/bmdb/webug"
Serge Bazanski4abeb132022-10-11 11:32:19 +020017 apb "source.monogon.dev/cloud/bmaas/server/api"
18 "source.monogon.dev/cloud/lib/component"
19)
20
21type Config struct {
22 Component component.ComponentConfig
23 BMDB bmdb.BMDB
Serge Bazanski77628312023-02-15 23:33:22 +010024 Webug webug.Config
Serge Bazanski4abeb132022-10-11 11:32:19 +020025
26 // PublicListenAddress is the address at which the 'public' (agent-facing) gRPC
27 // server listener will run.
28 PublicListenAddress string
29}
30
31// TODO(q3k): factor this out to BMDB library?
32func runtimeInfo() string {
33 hostname, _ := os.Hostname()
34 if hostname == "" {
35 hostname = "UNKNOWN"
36 }
37 return fmt.Sprintf("host %s", hostname)
38}
39
40func (c *Config) RegisterFlags() {
41 c.Component.RegisterFlags("srv")
42 c.BMDB.ComponentName = "srv"
43 c.BMDB.RuntimeInfo = runtimeInfo()
44 c.BMDB.Database.RegisterFlags("bmdb")
Serge Bazanski77628312023-02-15 23:33:22 +010045 c.Webug.RegisterFlags()
Serge Bazanski4abeb132022-10-11 11:32:19 +020046
47 flag.StringVar(&c.PublicListenAddress, "srv_public_grpc_listen_address", ":8080", "Address to listen at for public/user gRPC connections for bmdbsrv")
48}
49
50type Server struct {
51 Config Config
52
53 // ListenGRPC will contain the address at which the internal gRPC server is
54 // listening after .Start() has been called. This can differ from the configured
55 // value if the configuration requests any port (via :0).
56 ListenGRPC string
57 // ListenPublic will contain the address at which the 'public' (agent-facing)
58 // gRPC server is lsitening after .Start() has been called.
59 ListenPublic string
60
61 bmdb *bmdb.Connection
62 acsvc *agentCallbackService
Serge Bazanski42f13462023-04-19 15:00:06 +020063
64 sessionC chan *bmdb.Session
65}
66
67// sessionWorker emits a valid BMDB session to sessionC as long as ctx is active.
68func (s *Server) sessionWorker(ctx context.Context) {
69 var session *bmdb.Session
70 for {
71 if session == nil || session.Expired() {
72 klog.Infof("Starting new session...")
73 bo := backoff.NewExponentialBackOff()
74 err := backoff.Retry(func() error {
75 var err error
76 session, err = s.bmdb.StartSession(ctx)
77 if err != nil {
78 klog.Errorf("Failed to start session: %v", err)
79 return err
80 } else {
81 return nil
82 }
83 }, backoff.WithContext(bo, ctx))
84 if err != nil {
85 // If something's really wrong just crash.
86 klog.Exitf("Gave up on starting session: %v", err)
87 }
88 klog.Infof("New session: %s", session.UUID)
89 }
90
91 select {
92 case <-ctx.Done():
93 return
94 case s.sessionC <- session:
95 }
96 }
97}
98
99func (s *Server) session(ctx context.Context) (*bmdb.Session, error) {
100 select {
101 case sess := <-s.sessionC:
102 return sess, nil
103 case <-ctx.Done():
104 return nil, ctx.Err()
105 }
Serge Bazanski4abeb132022-10-11 11:32:19 +0200106}
107
108func (s *Server) startPublic(ctx context.Context) {
109 g := grpc.NewServer(s.Config.Component.GRPCServerOptionsPublic()...)
110 lis, err := net.Listen("tcp", s.Config.PublicListenAddress)
111 if err != nil {
112 klog.Exitf("Could not listen: %v", err)
113 }
114 s.ListenPublic = lis.Addr().String()
115 apb.RegisterAgentCallbackServer(g, s.acsvc)
116 reflection.Register(g)
117
118 klog.Infof("Public API listening on %s", s.ListenPublic)
119 go func() {
120 err := g.Serve(lis)
121 if err != ctx.Err() {
122 klog.Exitf("Public gRPC serve failed: %v", err)
123 }
124 }()
125}
126
127func (s *Server) startInternalGRPC(ctx context.Context) {
128 g := grpc.NewServer(s.Config.Component.GRPCServerOptions()...)
129 lis, err := net.Listen("tcp", s.Config.Component.GRPCListenAddress)
130 if err != nil {
131 klog.Exitf("Could not listen: %v", err)
132 }
133 s.ListenGRPC = lis.Addr().String()
134
135 reflection.Register(g)
136 klog.Infof("Internal gRPC listening on %s", s.ListenGRPC)
137 go func() {
138 err := g.Serve(lis)
139 if err != ctx.Err() {
140 klog.Exitf("Internal gRPC serve failed: %v", err)
141 }
142 }()
143}
144
145// Start the BMaaS Server in background goroutines. This should only be called
146// once. The process will exit with debug logs if starting the server failed.
147func (s *Server) Start(ctx context.Context) {
148 conn, err := s.Config.BMDB.Open(true)
149 if err != nil {
150 klog.Exitf("Failed to connect to BMDB: %v", err)
151 }
152 s.acsvc = &agentCallbackService{
153 s: s,
154 }
155 s.bmdb = conn
Serge Bazanski42f13462023-04-19 15:00:06 +0200156 s.sessionC = make(chan *bmdb.Session)
157 go s.sessionWorker(ctx)
Serge Bazanski4abeb132022-10-11 11:32:19 +0200158 s.startInternalGRPC(ctx)
159 s.startPublic(ctx)
Serge Bazanski77628312023-02-15 23:33:22 +0100160 go func() {
161 if err := s.Config.Webug.Start(ctx, conn); err != nil && err != ctx.Err() {
162 klog.Exitf("Failed to start webug: %v", err)
163 }
164 }()
Serge Bazanski4abeb132022-10-11 11:32:19 +0200165}