From da199e577513861ec74e29e3f9d29e774681fde9 Mon Sep 17 00:00:00 2001 From: Jannik Date: Sun, 14 Nov 2021 19:02:52 +0100 Subject: [PATCH] use worker pool to limit webhooks to 10 concurrent http executions (#1510) (#1525) * refactor: use worker pool to limit webhooks to 10 concurrent http executions (#1510) * chore: try to please go linter --- core/core.go | 3 ++ core/webhooks/webhooks.go | 44 +------------------- core/webhooks/workerpool.go | 82 +++++++++++++++++++++++++++++++++++++ 3 files changed, 87 insertions(+), 42 deletions(-) create mode 100644 core/webhooks/workerpool.go diff --git a/core/core.go b/core/core.go index 08ef25591..5f1e37f56 100644 --- a/core/core.go +++ b/core/core.go @@ -14,6 +14,7 @@ import ( "github.com/owncast/owncast/core/rtmp" "github.com/owncast/owncast/core/transcoder" "github.com/owncast/owncast/core/user" + "github.com/owncast/owncast/core/webhooks" "github.com/owncast/owncast/models" "github.com/owncast/owncast/static" "github.com/owncast/owncast/utils" @@ -77,6 +78,8 @@ func Start() error { rtmpPort := data.GetRTMPPortNumber() log.Infof("RTMP is accepting inbound streams on port %d.", rtmpPort) + webhooks.InitWorkerPool() + return nil } diff --git a/core/webhooks/webhooks.go b/core/webhooks/webhooks.go index 6871f305a..3fa6ee67b 100644 --- a/core/webhooks/webhooks.go +++ b/core/webhooks/webhooks.go @@ -1,16 +1,10 @@ package webhooks import ( - "bytes" - "encoding/json" - "net/http" "time" - log "github.com/sirupsen/logrus" - - "github.com/owncast/owncast/core/user" - "github.com/owncast/owncast/core/data" + "github.com/owncast/owncast/core/user" "github.com/owncast/owncast/models" ) @@ -36,40 +30,6 @@ func SendEventToWebhooks(payload WebhookEvent) { webhooks := data.GetWebhooksForEvent(payload.Type) for _, webhook := range webhooks { - go func(webhook models.Webhook, payload WebhookEvent) { - log.Debugf("Event %s sent to Webhook %s", payload.Type, webhook.URL) - if err := sendWebhook(webhook, payload); err != nil { - log.Errorf("Event: %s failed to send to webhook: %s Error: %s", payload.Type, webhook.URL, err) - } - }(webhook, payload) + go addToQueue(webhook, payload) } } - -func sendWebhook(webhook models.Webhook, payload WebhookEvent) error { - jsonText, err := json.Marshal(payload) - if err != nil { - return err - } - - req, err := http.NewRequest("POST", webhook.URL, bytes.NewReader(jsonText)) - if err != nil { - return err - } - - req.Header.Set("Content-Type", "application/json") - - client := &http.Client{} - - resp, err := client.Do(req) - if err != nil { - return err - } - - defer resp.Body.Close() - - if err := data.SetWebhookAsUsed(webhook); err != nil { - log.Warnln(err) - } - - return nil -} diff --git a/core/webhooks/workerpool.go b/core/webhooks/workerpool.go new file mode 100644 index 000000000..cfca428c4 --- /dev/null +++ b/core/webhooks/workerpool.go @@ -0,0 +1,82 @@ +package webhooks + +import ( + "bytes" + "encoding/json" + "net/http" + + log "github.com/sirupsen/logrus" + + "github.com/owncast/owncast/core/data" + "github.com/owncast/owncast/models" +) + +const ( + // webhookWorkerPoolSize defines the number of concurrent HTTP webhook requests. + webhookWorkerPoolSize = 10 +) + +// Job struct bundling the webhook and the payload in one struct. +type Job struct { + webhook models.Webhook + payload WebhookEvent +} + +var queue chan Job + +// InitWorkerPool starts n go routines that await webhook jobs. +func InitWorkerPool() { + queue = make(chan Job) + + // start workers + for i := 1; i <= webhookWorkerPoolSize; i++ { + go worker(i, queue) + } +} + +func addToQueue(webhook models.Webhook, payload WebhookEvent) { + log.Tracef("Queued Event %s for Webhook %s", payload.Type, webhook.URL) + queue <- Job{webhook, payload} +} + +func worker(workerID int, queue <-chan Job) { + log.Debugf("Started Webhook worker %d", workerID) + + for job := range queue { + log.Debugf("Event %s sent to Webhook %s using worker %d", job.payload.Type, job.webhook.URL, workerID) + + if err := sendWebhook(job); err != nil { + log.Errorf("Event: %s failed to send to webhook: %s Error: %s", job.payload.Type, job.webhook.URL, err) + } + log.Tracef("Done with Event %s to Webhook %s using worker %d", job.payload.Type, job.webhook.URL, workerID) + } +} + +func sendWebhook(job Job) error { + jsonText, err := json.Marshal(job.payload) + if err != nil { + return err + } + + req, err := http.NewRequest("POST", job.webhook.URL, bytes.NewReader(jsonText)) + if err != nil { + return err + } + + req.Header.Set("Content-Type", "application/json") + + client := &http.Client{} + + resp, err := client.Do(req) + if err != nil { + return err + } + + defer resp.Body.Close() + + if err := data.SetWebhookAsUsed(job.webhook); err != nil { + log.Warnln(err) + } + + return nil +}