next: upd more

This commit is contained in:
Ainar Garipov 2024-11-12 19:58:56 +03:00
parent d729aa150f
commit 586b0eb180
4 changed files with 51 additions and 33 deletions

View file

@ -49,6 +49,9 @@ func Main(embeddedFrontend fs.FS) {
frontend, err := frontendFromOpts(ctx, baseLogger, opts, embeddedFrontend) frontend, err := frontendFromOpts(ctx, baseLogger, opts, embeddedFrontend)
errors.Check(err) errors.Check(err)
startCtx, startCancel := context.WithTimeout(ctx, defaultTimeoutStart)
defer startCancel()
confMgrConf := &configmgr.Config{ confMgrConf := &configmgr.Config{
BaseLogger: baseLogger, BaseLogger: baseLogger,
Logger: baseLogger.With(slogutil.KeyPrefix, "configmgr"), Logger: baseLogger.With(slogutil.KeyPrefix, "configmgr"),
@ -58,15 +61,15 @@ func Main(embeddedFrontend fs.FS) {
FileName: opts.confFile, FileName: opts.confFile,
} }
confMgr, err := newConfigMgr(confMgrConf) confMgr, err := configmgr.New(startCtx, confMgrConf)
errors.Check(err) errors.Check(err)
web := confMgr.Web() web := confMgr.Web()
err = web.Start(ctx) err = web.Start(startCtx)
errors.Check(err) errors.Check(err)
dns := confMgr.DNS() dns := confMgr.DNS()
err = dns.Start(ctx) err = dns.Start(startCtx)
errors.Check(err) errors.Check(err)
sigHdlr := newSignalHandler( sigHdlr := newSignalHandler(
@ -80,21 +83,16 @@ func Main(embeddedFrontend fs.FS) {
os.Exit(sigHdlr.handle(ctx)) os.Exit(sigHdlr.handle(ctx))
} }
// defaultTimeout is the timeout used for some operations where another timeout // Default timeouts.
// hasn't been defined yet. //
const defaultTimeout = 5 * time.Second // TODO(a.garipov): Make configurable.
const (
// ctxWithDefaultTimeout is a helper function that returns a context with defaultTimeoutStart = 1 * time.Minute
// timeout set to defaultTimeout. defaultTimeoutShutdown = 5 * time.Second
func ctxWithDefaultTimeout() (ctx context.Context, cancel context.CancelFunc) { )
return context.WithTimeout(context.Background(), defaultTimeout)
}
// newConfigMgr returns a new configuration manager using defaultTimeout as the // newConfigMgr returns a new configuration manager using defaultTimeout as the
// context timeout. // context timeout.
func newConfigMgr(c *configmgr.Config) (m *configmgr.Manager, err error) { func newConfigMgr(ctx context.Context, c *configmgr.Config) (m *configmgr.Manager, err error) {
ctx, cancel := ctxWithDefaultTimeout()
defer cancel()
return configmgr.New(ctx, c) return configmgr.New(ctx, c)
} }

View file

@ -109,7 +109,10 @@ func (h *signalHandler) reconfigure(ctx context.Context) (err error) {
var errs []error var errs []error
confMgr, err := newConfigMgr(h.confMgrConf) ctx, cancel := context.WithTimeout(ctx, defaultTimeoutStart)
defer cancel()
confMgr, err := newConfigMgr(ctx, h.confMgrConf)
if err != nil { if err != nil {
errs = append(errs, fmt.Errorf("configuration manager: %w", err)) errs = append(errs, fmt.Errorf("configuration manager: %w", err))
} }
@ -142,7 +145,7 @@ func (h *signalHandler) reconfigure(ctx context.Context) (err error) {
// shutdown gracefully shuts down all services. // shutdown gracefully shuts down all services.
func (h *signalHandler) shutdown(ctx context.Context) (status int) { func (h *signalHandler) shutdown(ctx context.Context) (status int) {
ctx, cancel := context.WithTimeout(ctx, defaultTimeout) ctx, cancel := context.WithTimeout(ctx, h.shutdownTimeout)
defer cancel() defer cancel()
status = osutil.ExitCodeSuccess status = osutil.ExitCodeSuccess
@ -173,7 +176,7 @@ func newSignalHandler(
signal: make(chan os.Signal, 1), signal: make(chan os.Signal, 1),
pidFile: pidFile, pidFile: pidFile,
services: svcs, services: svcs,
shutdownTimeout: defaultTimeout, shutdownTimeout: defaultTimeoutShutdown,
} }
notifier := osutil.DefaultSignalNotifier{} notifier := osutil.DefaultSignalNotifier{}

View file

@ -37,6 +37,9 @@ type server struct {
initialAddr netip.AddrPort initialAddr netip.AddrPort
} }
// loggerKeyServer is the key used by [server] to identify itself.
const loggerKeyServer = "server"
// newServer returns a *server that is ready to serve HTTP queries. The TCP // newServer returns a *server that is ready to serve HTTP queries. The TCP
// listener is not started. handler must not be nil. // listener is not started. handler must not be nil.
func newServer( func newServer(
@ -55,7 +58,7 @@ func newServer(
u.Scheme = urlutil.SchemeHTTPS u.Scheme = urlutil.SchemeHTTPS
} }
logger := baseLogger.With("server", u) logger := baseLogger.With(loggerKeyServer, u)
return &server{ return &server{
mu: &sync.Mutex{}, mu: &sync.Mutex{},
@ -96,11 +99,9 @@ func (s *server) localAddr() (addr net.Addr) {
func (s *server) serve(ctx context.Context, baseLogger *slog.Logger) { func (s *server) serve(ctx context.Context, baseLogger *slog.Logger) {
l, err := net.ListenTCP("tcp", net.TCPAddrFromAddrPort(s.initialAddr)) l, err := net.ListenTCP("tcp", net.TCPAddrFromAddrPort(s.initialAddr))
if err != nil { if err != nil {
err = fmt.Errorf("listening tcp: %w", err)
s.logger.ErrorContext(ctx, "listening tcp", slogutil.KeyError, err) s.logger.ErrorContext(ctx, "listening tcp", slogutil.KeyError, err)
panic(fmt.Errorf("websvc: %s", err)) panic(fmt.Errorf("websvc: listening tcp: %w", err))
} }
func() { func() {
@ -111,7 +112,7 @@ func (s *server) serve(ctx context.Context, baseLogger *slog.Logger) {
// Reassign the address in case the port was zero. // Reassign the address in case the port was zero.
s.url.Host = l.Addr().String() s.url.Host = l.Addr().String()
s.logger = baseLogger.With("server", s.url) s.logger = baseLogger.With(loggerKeyServer, s.url)
s.http.ErrorLog = slog.NewLogLogger(s.logger.Handler(), slog.LevelError) s.http.ErrorLog = slog.NewLogLogger(s.logger.Handler(), slog.LevelError)
}() }()
@ -123,9 +124,9 @@ func (s *server) serve(ctx context.Context, baseLogger *slog.Logger) {
return return
} }
err = fmt.Errorf("serving: %w", err) s.logger.ErrorContext(ctx, "serving", slogutil.KeyError, err)
s.logger.ErrorContext(ctx, "serve failed", slogutil.KeyError, err)
panic(fmt.Errorf("websvc: %s", err)) panic(fmt.Errorf("websvc: serving: %w", err))
} }
// shutdown shuts s down. // shutdown shuts s down.

View file

@ -163,22 +163,38 @@ func (svc *Service) Start(ctx context.Context) (err error) {
go svc.pprof.serve(ctx, svc.logger) go svc.pprof.serve(ctx, svc.logger)
} }
started := false return svc.wait(ctx)
for !started { }
// wait waits until either the context is canceled or all servers have started.
func (svc *Service) wait(ctx context.Context) (err error) {
for !svc.serversHaveStarted() {
select { select {
case <-ctx.Done(): case <-ctx.Done():
return ctx.Err() return ctx.Err()
default: default:
started = true // Wait and let the other goroutines do their job.
for _, srv := range svc.servers { runtime.Gosched()
started = started && srv.localAddr() != nil
}
} }
} }
return nil return nil
} }
// serversHaveStarted returns true if all servers have started serving.
func (svc *Service) serversHaveStarted() (started bool) {
started = len(svc.servers) != 0
for _, srv := range svc.servers {
started = started && srv.localAddr() != nil
}
if svc.pprof != nil {
started = started && svc.pprof.localAddr() != nil
}
return started
}
// Shutdown implements the [agh.Service] interface for *Service. svc may be // Shutdown implements the [agh.Service] interface for *Service. svc may be
// nil. // nil.
func (svc *Service) Shutdown(ctx context.Context) (err error) { func (svc *Service) Shutdown(ctx context.Context) (err error) {