2021-11-14 21:02:52 +03:00
|
|
|
package webhooks
|
|
|
|
|
|
|
|
import (
|
|
|
|
"bytes"
|
|
|
|
"encoding/json"
|
|
|
|
"net/http"
|
2023-07-19 06:26:44 +03:00
|
|
|
"runtime"
|
2022-10-10 08:55:54 +03:00
|
|
|
"sync"
|
2021-11-14 21:02:52 +03:00
|
|
|
|
|
|
|
log "github.com/sirupsen/logrus"
|
|
|
|
|
|
|
|
"github.com/owncast/owncast/core/data"
|
|
|
|
"github.com/owncast/owncast/models"
|
|
|
|
)
|
|
|
|
|
2023-07-19 06:26:44 +03:00
|
|
|
// webhookWorkerPoolSize defines the number of concurrent HTTP webhook requests.
|
|
|
|
var webhookWorkerPoolSize = runtime.GOMAXPROCS(0)
|
2021-11-14 21:02:52 +03:00
|
|
|
|
|
|
|
// Job struct bundling the webhook and the payload in one struct.
|
|
|
|
type Job struct {
|
2022-10-10 08:55:54 +03:00
|
|
|
wg *sync.WaitGroup
|
2023-10-09 00:22:28 +03:00
|
|
|
payload WebhookEvent
|
|
|
|
webhook models.Webhook
|
2021-11-14 21:02:52 +03:00
|
|
|
}
|
|
|
|
|
2023-05-30 21:32:05 +03:00
|
|
|
var (
|
|
|
|
queue chan Job
|
|
|
|
getStatus func() models.Status
|
|
|
|
)
|
|
|
|
|
|
|
|
// SetupWebhooks initializes the webhook worker pool and sets the function to get the current status.
|
|
|
|
func SetupWebhooks(getStatusFunc func() models.Status) {
|
|
|
|
getStatus = getStatusFunc
|
|
|
|
initWorkerPool()
|
|
|
|
}
|
2021-11-14 21:02:52 +03:00
|
|
|
|
2023-05-30 21:32:05 +03:00
|
|
|
// initWorkerPool starts n go routines that await webhook jobs.
|
|
|
|
func initWorkerPool() {
|
2021-11-14 21:02:52 +03:00
|
|
|
queue = make(chan Job)
|
|
|
|
|
|
|
|
// start workers
|
|
|
|
for i := 1; i <= webhookWorkerPoolSize; i++ {
|
|
|
|
go worker(i, queue)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2022-10-10 08:55:54 +03:00
|
|
|
func addToQueue(webhook models.Webhook, payload WebhookEvent, wg *sync.WaitGroup) {
|
2021-11-14 21:02:52 +03:00
|
|
|
log.Tracef("Queued Event %s for Webhook %s", payload.Type, webhook.URL)
|
2023-10-09 00:22:28 +03:00
|
|
|
queue <- Job{wg, payload, webhook}
|
2021-11-14 21:02:52 +03:00
|
|
|
}
|
|
|
|
|
|
|
|
func worker(workerID int, queue <-chan Job) {
|
|
|
|
log.Debugf("Started Webhook worker %d", workerID)
|
|
|
|
|
|
|
|
for job := range queue {
|
|
|
|
log.Debugf("Event %s sent to Webhook %s using worker %d", job.payload.Type, job.webhook.URL, workerID)
|
|
|
|
|
|
|
|
if err := sendWebhook(job); err != nil {
|
|
|
|
log.Errorf("Event: %s failed to send to webhook: %s Error: %s", job.payload.Type, job.webhook.URL, err)
|
|
|
|
}
|
|
|
|
log.Tracef("Done with Event %s to Webhook %s using worker %d", job.payload.Type, job.webhook.URL, workerID)
|
2022-10-10 08:55:54 +03:00
|
|
|
if job.wg != nil {
|
|
|
|
job.wg.Done()
|
|
|
|
}
|
2021-11-14 21:02:52 +03:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
func sendWebhook(job Job) error {
|
|
|
|
jsonText, err := json.Marshal(job.payload)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
req, err := http.NewRequest("POST", job.webhook.URL, bytes.NewReader(jsonText))
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
req.Header.Set("Content-Type", "application/json")
|
|
|
|
|
|
|
|
client := &http.Client{}
|
|
|
|
|
|
|
|
resp, err := client.Do(req)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
defer resp.Body.Close()
|
|
|
|
|
|
|
|
if err := data.SetWebhookAsUsed(job.webhook); err != nil {
|
|
|
|
log.Warnln(err)
|
|
|
|
}
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|