From faae2505c05edbaec27a89d9964b41415694421f Mon Sep 17 00:00:00 2001 From: kim <89579420+NyaaaWhatsUpDoc@users.noreply.github.com> Date: Sun, 1 May 2022 15:45:15 +0100 Subject: [PATCH] Add logging to the new generic worker package (#516) * add logging to generic worker type --- internal/worker/workers.go | 27 ++++++++++++++++++++++++--- 1 file changed, 24 insertions(+), 3 deletions(-) diff --git a/internal/worker/workers.go b/internal/worker/workers.go index d3d6197ed..ac329f8f7 100644 --- a/internal/worker/workers.go +++ b/internal/worker/workers.go @@ -3,6 +3,7 @@ package worker import ( "context" "errors" + "reflect" "runtime" "codeberg.org/gruf/go-runners" @@ -13,6 +14,7 @@ import ( type Worker[MsgType any] struct { workers runners.WorkerPool process func(context.Context, MsgType) error + prefix string // contains type prefix for logging } // New returns a new Worker[MsgType] with given number of workers and queue size @@ -20,47 +22,66 @@ type Worker[MsgType any] struct { // defaults are determined from the runtime's GOMAXPROCS variable. func New[MsgType any](workers int, queue int) *Worker[MsgType] { if workers < 1 { + // ensure sensible workers workers = runtime.GOMAXPROCS(0) } if queue < 1 { + // ensure sensible queue queue = workers * 100 } - return &Worker[MsgType]{ + + w := &Worker[MsgType]{ workers: runners.NewWorkerPool(workers, queue), process: nil, + prefix: reflect.TypeOf(Worker[MsgType]{}).String(), //nolint } + + // Log new worker creation with type prefix + logrus.Infof("%s created with workers=%d queue=%d", w.prefix, workers, queue) + + return w } // Start will attempt to start the underlying worker pool, or return error. func (w *Worker[MsgType]) Start() error { + logrus.Info(w.prefix, "starting") + + // Check processor was set if w.process == nil { return errors.New("nil Worker.process function") } + + // Attempt to start pool if !w.workers.Start() { return errors.New("failed to start Worker pool") } + return nil } // Stop will attempt to stop the underlying worker pool, or return error. func (w *Worker[MsgType]) Stop() error { + logrus.Info(w.prefix, "stopping") + + // Attempt to stop pool if !w.workers.Stop() { return errors.New("failed to stop Worker pool") } + return nil } // SetProcessor will set the Worker's processor function, which is called for each queued message. func (w *Worker[MsgType]) SetProcessor(fn func(context.Context, MsgType) error) { if w.process != nil { - logrus.Panic("Worker.process is already set") + logrus.Panic(w.prefix, "Worker.process is already set") } w.process = fn } // Queue will queue provided message to be processed with there's a free worker. func (w *Worker[MsgType]) Queue(msg MsgType) { - logrus.Tracef("queueing %[1]T message; %+[1]v", msg) + logrus.Tracef("%s queueing message: %+v", w.prefix, msg) w.workers.Enqueue(func(ctx context.Context) { if err := w.process(ctx, msg); err != nil { logrus.Error(err)