Merge remote-tracking branch 'origin/develop' into webv2

This commit is contained in:
Gabe Kangas 2022-10-12 16:52:05 -07:00
commit c844e98a19
No known key found for this signature in database
GPG key ID: 9A56337728BC81EA
15 changed files with 642 additions and 42 deletions

View file

@ -6,6 +6,11 @@ on:
- 'webroot/**'
- 'web/**'
pull_request:
paths-ignore:
- 'webroot/**'
- 'web/**'
jobs:
test:
runs-on: ubuntu-latest

5
.gitpod.yml Normal file
View file

@ -0,0 +1,5 @@
# Automatic workspace preparation for gitpod instances
tasks:
- init: sudo apt-get install ffmpeg -y && go get && go build ./... && go test ./...
command: go run .

View file

@ -44,7 +44,7 @@
</a>
</p>
Owncast is an open source, self-hosted, decentralized, single user live video streaming and chat server for running your own live streams similar in style to the large mainstream options. It offers complete ownership over your content, interface, moderation and audience. <a href="https://watch.owncast.online">Visit the demo</a> for an example.
Owncast is an open source, self-hosted, decentralized, single user live video streaming and chat server for running your own live streams similar in style to the large mainstream options. It offers complete ownership over your content, interface, moderation and audience. <a href="https://watch.owncast.online">Visit the demo</a> for an example.
<div>
<img alt="GitHub all releases" src="https://img.shields.io/github/downloads/owncast/owncast/total?style=for-the-badge">
@ -59,7 +59,6 @@ Owncast is an open source, self-hosted, decentralized, single user live video st
</a>
</div>
---
<!-- GETTING STARTED -->
@ -87,7 +86,9 @@ Owncast consists of two projects.
The Owncast backend is a service written in Go.
1. Ensure you have a c compiler installed.
1. Ensure you have pre-requisites installed.
- C compiler, such as [GCC compiler](https://gcc.gnu.org/install/download.html) or a [Musl-compatible compiler](https://musl.libc.org/)
- [ffmpeg](https://ffmpeg.org/download.html)
1. Install the [Go toolchain](https://golang.org/dl/) (1.16 or above).
1. Clone the repo. `git clone https://github.com/owncast/owncast`
1. `go run main.go` will run from source.
@ -100,8 +101,7 @@ The frontend is the web interface that includes the player, chat, embed componen
1. This project lives in the `web` directory.
1. Run `npm install` to install the Javascript dependencies.
1. Run `npm run dev`
1. Run `npm run dev`
## Contributing
@ -113,9 +113,6 @@ Weve been very lucky to have this so far, so maybe you can help us with your
There is a larger, more detailed, and more up-to-date [guide for helping contribute to Owncast on our website](https://owncast.online/help/).
<!-- LICENSE -->
## License

View file

@ -350,9 +350,9 @@
}
},
"caniuse-lite": {
"version": "1.0.30001412",
"resolved": "https://registry.npmjs.org/caniuse-lite/-/caniuse-lite-1.0.30001412.tgz",
"integrity": "sha512-+TeEIee1gS5bYOiuf+PS/kp2mrXic37Hl66VY6EAfxasIk5fELTktK2oOezYed12H8w7jt3s512PpulQidPjwA=="
"version": "1.0.30001418",
"resolved": "https://registry.npmjs.org/caniuse-lite/-/caniuse-lite-1.0.30001418.tgz",
"integrity": "sha512-oIs7+JL3K9JRQ3jPZjlH6qyYDp+nBTCais7hjh0s+fuBwufc7uZ7hPYMXrDOJhV360KGMTcczMRObk0/iMqZRg=="
},
"chalk": {
"version": "4.1.2",
@ -416,13 +416,13 @@
}
},
"cliui": {
"version": "7.0.4",
"resolved": "https://registry.npmjs.org/cliui/-/cliui-7.0.4.tgz",
"integrity": "sha512-OcRE68cOsVMXp1Yvonl/fzkQOyjLSu/8bhPDfQt0e0/Eb283TKP20Fs2MqoPsr9SwA595rRCA+QMzYc9nBP+JQ==",
"version": "8.0.1",
"resolved": "https://registry.npmjs.org/cliui/-/cliui-8.0.1.tgz",
"integrity": "sha512-BSeNnyus75C4//NQ9gQt1/csTXyo/8Sb+afLAkzAptFuMsod9HFokGNudZpi/oQV73hnVK+sR+5PVRMd+Dr7YQ==",
"dev": true,
"requires": {
"string-width": "^4.2.0",
"strip-ansi": "^6.0.0",
"strip-ansi": "^6.0.1",
"wrap-ansi": "^7.0.0"
}
},
@ -645,9 +645,9 @@
}
},
"electron-to-chromium": {
"version": "1.4.262",
"resolved": "https://registry.npmjs.org/electron-to-chromium/-/electron-to-chromium-1.4.262.tgz",
"integrity": "sha512-Ckn5haqmGh/xS8IbcgK3dnwAVnhDyo/WQnklWn6yaMucYTq7NNxwlGE8ElzEOnonzRLzUCo2Ot3vUb2GYUF2Hw=="
"version": "1.4.276",
"resolved": "https://registry.npmjs.org/electron-to-chromium/-/electron-to-chromium-1.4.276.tgz",
"integrity": "sha512-EpuHPqu8YhonqLBXHoU6hDJCD98FCe6KDoet3/gY1qsQ6usjJoHqBH2YIVs8FXaAtHwVL8Uqa/fsYao/vq9VWQ=="
},
"emoji-regex": {
"version": "8.0.0",
@ -2076,9 +2076,9 @@
"integrity": "sha512-rBJeI5CXAlmy1pV+617WB9J63U6XcazHHF2f2dbJix4XzpUF0RS3Zbj0FGIOCAva5P/d/GBOYaACQ1w+0azUkg=="
},
"update-browserslist-db": {
"version": "1.0.9",
"resolved": "https://registry.npmjs.org/update-browserslist-db/-/update-browserslist-db-1.0.9.tgz",
"integrity": "sha512-/xsqn21EGVdXI3EXSum1Yckj3ZVZugqyOZQ/CxYPBD/R+ko9NSUScf8tFF4dOKY+2pvSSJA/S+5B8s4Zr4kyvg==",
"version": "1.0.10",
"resolved": "https://registry.npmjs.org/update-browserslist-db/-/update-browserslist-db-1.0.10.tgz",
"integrity": "sha512-OztqDenkfFkbSG+tRxBeAnCVPckDBcvibKd35yDONx6OU8N7sqgwc7rCbkJ/WcYtVRZ4ba68d6byhC21GFh7sQ==",
"requires": {
"escalade": "^3.1.1",
"picocolors": "^1.0.0"
@ -2194,12 +2194,12 @@
"dev": true
},
"yargs": {
"version": "17.5.1",
"resolved": "https://registry.npmjs.org/yargs/-/yargs-17.5.1.tgz",
"integrity": "sha512-t6YAJcxDkNX7NFYiVtKvWUz8l+PaKTLiL63mJYWR2GnHq2gjEWISzsLp9wg3aY36dY1j+gfIEL3pIF+XlJJfbA==",
"version": "17.6.0",
"resolved": "https://registry.npmjs.org/yargs/-/yargs-17.6.0.tgz",
"integrity": "sha512-8H/wTDqlSwoSnScvV2N/JHfLWOKuh5MVla9hqLjK3nsfyy6Y4kDSYSvkU5YCUEPOSnRXfIyx3Sq+B/IWudTo4g==",
"dev": true,
"requires": {
"cliui": "^7.0.2",
"cliui": "^8.0.1",
"escalade": "^3.1.1",
"get-caller-file": "^2.0.5",
"require-directory": "^2.1.1",

View file

@ -37,7 +37,7 @@ func GetViewersOverTime(w http.ResponseWriter, r *http.Request) {
// GetActiveViewers returns currently connected clients.
func GetActiveViewers(w http.ResponseWriter, r *http.Request) {
c := core.GetActiveViewers()
viewers := []models.Viewer{}
viewers := make([]models.Viewer, 0, len(c))
for _, v := range c {
viewers = append(viewers, *v)
}

View file

@ -29,13 +29,7 @@ func SetMessagesVisibility(messageIDs []string, visibility bool) error {
return errors.New("error broadcasting message visibility payload " + err.Error())
}
// Send webhook
wh := webhooks.WebhookEvent{
EventData: payload,
Type: event.GetMessageType(),
}
webhooks.SendEventToWebhooks(wh)
webhooks.SendChatEventSetMessageVisibility(event)
return nil
}

View file

@ -42,3 +42,14 @@ func SendChatEventUserJoined(event events.UserJoinedEvent) {
SendEventToWebhooks(webhookEvent)
}
// SendChatEventSetMessageVisibility sends a webhook notifying that the visibility of one or more
// messages has changed.
func SendChatEventSetMessageVisibility(event events.SetMessageVisibilityEvent) {
webhookEvent := WebhookEvent{
Type: models.VisibiltyToggled,
EventData: event,
}
SendEventToWebhooks(webhookEvent)
}

185
core/webhooks/chat_test.go Normal file
View file

@ -0,0 +1,185 @@
package webhooks
import (
"testing"
"time"
"github.com/owncast/owncast/core/chat/events"
"github.com/owncast/owncast/core/user"
"github.com/owncast/owncast/models"
)
func TestSendChatEvent(t *testing.T) {
timestamp := time.Unix(72, 6).UTC()
user := user.User{
ID: "user id",
DisplayName: "display name",
DisplayColor: 4,
CreatedAt: time.Unix(3, 26).UTC(),
DisabledAt: nil,
PreviousNames: []string{"somebody"},
NameChangedAt: nil,
Scopes: []string{},
IsBot: false,
AuthenticatedAt: nil,
Authenticated: false,
}
checkPayload(t, models.MessageSent, func() {
SendChatEvent(&events.UserMessageEvent{
Event: events.Event{
Type: events.MessageSent,
ID: "id",
Timestamp: timestamp,
},
UserEvent: events.UserEvent{
User: &user,
ClientID: 51,
HiddenAt: nil,
},
MessageEvent: events.MessageEvent{
OutboundEvent: nil,
Body: "body",
RawBody: "raw body",
},
})
}, `{
"body": "body",
"clientId": 51,
"id": "id",
"rawBody": "raw body",
"timestamp": "1970-01-01T00:01:12.000000006Z",
"user": {
"authenticated": false,
"createdAt": "1970-01-01T00:00:03.000000026Z",
"displayColor": 4,
"displayName": "display name",
"id": "user id",
"isBot": false,
"previousNames": ["somebody"]
},
"visible": true
}`)
}
func TestSendChatEventUsernameChanged(t *testing.T) {
timestamp := time.Unix(72, 6).UTC()
user := user.User{
ID: "user id",
DisplayName: "display name",
DisplayColor: 4,
CreatedAt: time.Unix(3, 26).UTC(),
DisabledAt: nil,
PreviousNames: []string{"somebody"},
NameChangedAt: nil,
Scopes: []string{},
IsBot: false,
AuthenticatedAt: nil,
Authenticated: false,
}
checkPayload(t, models.UserNameChanged, func() {
SendChatEventUsernameChanged(events.NameChangeEvent{
Event: events.Event{
Type: events.UserNameChanged,
ID: "id",
Timestamp: timestamp,
},
UserEvent: events.UserEvent{
User: &user,
ClientID: 51,
HiddenAt: nil,
},
NewName: "new name",
})
}, `{
"clientId": 51,
"id": "id",
"newName": "new name",
"timestamp": "1970-01-01T00:01:12.000000006Z",
"type": "NAME_CHANGE",
"user": {
"authenticated": false,
"createdAt": "1970-01-01T00:00:03.000000026Z",
"displayColor": 4,
"displayName": "display name",
"id": "user id",
"isBot": false,
"previousNames": ["somebody"]
}
}`)
}
func TestSendChatEventUserJoined(t *testing.T) {
timestamp := time.Unix(72, 6).UTC()
user := user.User{
ID: "user id",
DisplayName: "display name",
DisplayColor: 4,
CreatedAt: time.Unix(3, 26).UTC(),
DisabledAt: nil,
PreviousNames: []string{"somebody"},
NameChangedAt: nil,
Scopes: []string{},
IsBot: false,
AuthenticatedAt: nil,
Authenticated: false,
}
checkPayload(t, models.UserJoined, func() {
SendChatEventUserJoined(events.UserJoinedEvent{
Event: events.Event{
Type: events.UserJoined,
ID: "id",
Timestamp: timestamp,
},
UserEvent: events.UserEvent{
User: &user,
ClientID: 51,
HiddenAt: nil,
},
})
}, `{
"clientId": 51,
"id": "id",
"type": "USER_JOINED",
"timestamp": "1970-01-01T00:01:12.000000006Z",
"user": {
"authenticated": false,
"createdAt": "1970-01-01T00:00:03.000000026Z",
"displayColor": 4,
"displayName": "display name",
"id": "user id",
"isBot": false,
"previousNames": ["somebody"]
}
}`)
}
func TestSendChatEventSetMessageVisibility(t *testing.T) {
timestamp := time.Unix(72, 6).UTC()
checkPayload(t, models.VisibiltyToggled, func() {
SendChatEventSetMessageVisibility(events.SetMessageVisibilityEvent{
Event: events.Event{
Type: events.VisibiltyUpdate,
ID: "id",
Timestamp: timestamp,
},
UserMessageEvent: events.UserMessageEvent{},
MessageIDs: []string{"message1", "message2"},
Visible: false,
})
}, `{
"MessageIDs": [
"message1",
"message2"
],
"Visible": false,
"body": "",
"id": "id",
"timestamp": "1970-01-01T00:01:12.000000006Z",
"type": "VISIBILITY-UPDATE",
"user": null
}`)
}

View file

@ -10,14 +10,18 @@ import (
// SendStreamStatusEvent will send all webhook destinations the current stream status.
func SendStreamStatusEvent(eventType models.EventType) {
sendStreamStatusEvent(eventType, shortid.MustGenerate(), time.Now())
}
func sendStreamStatusEvent(eventType models.EventType, id string, timestamp time.Time) {
SendEventToWebhooks(WebhookEvent{
Type: eventType,
EventData: map[string]interface{}{
"id": shortid.MustGenerate(),
"id": id,
"name": data.GetServerName(),
"summary": data.GetServerSummary(),
"streamTitle": data.GetStreamTitle(),
"timestamp": time.Now(),
"timestamp": timestamp,
},
})
}

View file

@ -0,0 +1,26 @@
package webhooks
import (
"testing"
"time"
"github.com/owncast/owncast/core/chat/events"
"github.com/owncast/owncast/core/data"
"github.com/owncast/owncast/models"
)
func TestSendStreamStatusEvent(t *testing.T) {
data.SetServerName("my server")
data.SetServerSummary("my server where I stream")
data.SetStreamTitle("my stream")
checkPayload(t, models.StreamStarted, func() {
sendStreamStatusEvent(events.StreamStarted, "id", time.Unix(72, 6).UTC())
}, `{
"id": "id",
"name": "my server",
"streamTitle": "my stream",
"summary": "my server where I stream",
"timestamp": "1970-01-01T00:01:12.000000006Z"
}`)
}

View file

@ -1,6 +1,7 @@
package webhooks
import (
"sync"
"time"
"github.com/owncast/owncast/core/data"
@ -27,9 +28,17 @@ type WebhookChatMessage struct {
// SendEventToWebhooks will send a single webhook event to all webhook destinations.
func SendEventToWebhooks(payload WebhookEvent) {
sendEventToWebhooks(payload, nil)
}
func sendEventToWebhooks(payload WebhookEvent, wg *sync.WaitGroup) {
webhooks := data.GetWebhooksForEvent(payload.Type)
for _, webhook := range webhooks {
go addToQueue(webhook, payload)
// Use wg to track the number of notifications to be sent.
if wg != nil {
wg.Add(1)
}
addToQueue(webhook, payload, wg)
}
}

View file

@ -0,0 +1,347 @@
package webhooks
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"os"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/owncast/owncast/core/data"
"github.com/owncast/owncast/models"
jsonpatch "gopkg.in/evanphx/json-patch.v5"
)
func TestMain(m *testing.M) {
dbFile, err := os.CreateTemp(os.TempDir(), "owncast-test-db.db")
if err != nil {
panic(err)
}
dbFile.Close()
defer os.Remove(dbFile.Name())
if err := data.SetupPersistence(dbFile.Name()); err != nil {
panic(err)
}
InitWorkerPool()
defer close(queue)
m.Run()
}
// Because the other tests use `sendEventToWebhooks` with a `WaitGroup` to know when the test completes,
// this test ensures that `SendToWebhooks` without a `WaitGroup` doesn't panic.
func TestPublicSend(t *testing.T) {
// Send enough events to be sure at least one worker delivers a second event.
const eventsCount = webhookWorkerPoolSize + 1
var wg sync.WaitGroup
wg.Add(eventsCount)
svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
wg.Done()
}))
defer svr.Close()
hook, err := data.InsertWebhook(svr.URL, []models.EventType{models.MessageSent})
if err != nil {
t.Fatal(err)
}
defer func() {
if err := data.DeleteWebhook(hook); err != nil {
t.Error(err)
}
}()
for i := 0; i < eventsCount; i++ {
wh := WebhookEvent{
EventData: struct{}{},
Type: models.MessageSent,
}
SendEventToWebhooks(wh)
}
wg.Wait()
}
// Make sure that events are only sent to interested endpoints.
func TestRouting(t *testing.T) {
eventTypes := []models.EventType{models.PING, models.PONG}
calls := map[models.EventType]int{}
var lock sync.Mutex
svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if len(r.URL.Path) < 1 || r.URL.Path[0] != '/' {
t.Fatalf("Got unexpected path %v", r.URL.Path)
}
pathType := r.URL.Path[1:]
var body WebhookEvent
if err := json.NewDecoder(r.Body).Decode(&body); err != nil {
t.Fatal(err)
}
if body.Type != pathType {
t.Fatalf("Got %v payload on %v endpoint", body.Type, pathType)
}
lock.Lock()
defer lock.Unlock()
calls[pathType] += 1
}))
defer svr.Close()
for _, eventType := range eventTypes {
hook, err := data.InsertWebhook(svr.URL+"/"+eventType, []models.EventType{eventType})
if err != nil {
t.Fatal(err)
}
defer func() {
if err := data.DeleteWebhook(hook); err != nil {
t.Error(err)
}
}()
}
var wg sync.WaitGroup
for _, eventType := range eventTypes {
wh := WebhookEvent{
EventData: struct{}{},
Type: eventType,
}
sendEventToWebhooks(wh, &wg)
}
wg.Wait()
for _, eventType := range eventTypes {
if calls[eventType] != 1 {
t.Errorf("Expected %v to be called exactly once but it was called %v times", eventType, calls[eventType])
}
}
}
// Make sure that events are sent to all interested endpoints.
func TestMultiple(t *testing.T) {
const times = 2
var calls uint32
svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
atomic.AddUint32(&calls, 1)
}))
defer svr.Close()
for i := 0; i < times; i++ {
hook, err := data.InsertWebhook(fmt.Sprintf("%v/%v", svr.URL, i), []models.EventType{models.MessageSent})
if err != nil {
t.Fatal(err)
}
defer func() {
if err := data.DeleteWebhook(hook); err != nil {
t.Error(err)
}
}()
}
var wg sync.WaitGroup
wh := WebhookEvent{
EventData: struct{}{},
Type: models.MessageSent,
}
sendEventToWebhooks(wh, &wg)
wg.Wait()
if atomic.LoadUint32(&calls) != times {
t.Errorf("Expected event to be sent exactly %v times but it was sent %v times", times, atomic.LoadUint32(&calls))
}
}
// Make sure when a webhook is used its last used timestamp is updated.
func TestTimestamps(t *testing.T) {
const tolerance = time.Second
start := time.Now()
eventTypes := []models.EventType{models.PING, models.PONG}
handlerIds := []int{0, 0}
handlers := []*models.Webhook{nil, nil}
svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
}))
defer svr.Close()
for i, eventType := range eventTypes {
hook, err := data.InsertWebhook(svr.URL+"/"+eventType, []models.EventType{eventType})
if err != nil {
t.Fatal(err)
}
handlerIds[i] = hook
defer func() {
if err := data.DeleteWebhook(hook); err != nil {
t.Error(err)
}
}()
}
var wg sync.WaitGroup
wh := WebhookEvent{
EventData: struct{}{},
Type: eventTypes[0],
}
sendEventToWebhooks(wh, &wg)
wg.Wait()
hooks, err := data.GetWebhooks()
if err != nil {
t.Fatal(err)
}
for h, hook := range hooks {
for i, handlerId := range handlerIds {
if hook.ID == handlerId {
handlers[i] = &hooks[h]
}
}
}
if handlers[0] == nil {
t.Fatal("First handler was not found in registered handlers")
}
if handlers[1] == nil {
t.Fatal("Second handler was not found in registered handlers")
}
end := time.Now()
if handlers[0].Timestamp.Add(tolerance).Before(start) {
t.Errorf("First handler timestamp %v should not be before start of test %v", handlers[0].Timestamp, start)
}
if handlers[0].Timestamp.Add(tolerance).Before(handlers[1].Timestamp) {
t.Errorf("Second handler timestamp %v should not be before first handler timestamp %v", handlers[1].Timestamp, handlers[0].Timestamp)
}
if end.Add(tolerance).Before(handlers[1].Timestamp) {
t.Errorf("End of test %v should not be before second handler timestamp %v", end, handlers[1].Timestamp)
}
if handlers[0].LastUsed == nil {
t.Error("First handler last used should have been set")
} else if handlers[0].LastUsed.Add(tolerance).Before(handlers[1].Timestamp) {
t.Errorf("First handler last used %v should not be before second handler timestamp %v", handlers[0].LastUsed, handlers[1].Timestamp)
} else if end.Add(tolerance).Before(*handlers[0].LastUsed) {
t.Errorf("End of test %v should not be before first handler last used %v", end, handlers[0].LastUsed)
}
if handlers[1].LastUsed != nil {
t.Error("Second handler last used should not have been set")
}
}
// Make sure up to the expected number of events can be fired in parallel.
func TestParallel(t *testing.T) {
var calls uint32
var wgStart sync.WaitGroup
finished := make(chan int)
wgStart.Add(webhookWorkerPoolSize)
svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
myId := atomic.AddUint32(&calls, 1)
// We made it to the pool size + 1 event, so we're done with the test.
if myId == webhookWorkerPoolSize+1 {
close(finished)
return
}
// Wait until all the handlers are started.
wgStart.Done()
wgStart.Wait()
// The first handler just returns so the pool size + 1 event can be handled.
if myId != 1 {
// The other handlers will wait for pool size + 1.
_ = <-finished
}
}))
defer svr.Close()
hook, err := data.InsertWebhook(svr.URL, []models.EventType{models.MessageSent})
if err != nil {
t.Fatal(err)
}
defer func() {
if err := data.DeleteWebhook(hook); err != nil {
t.Error(err)
}
}()
var wgMessages sync.WaitGroup
for i := 0; i < webhookWorkerPoolSize+1; i++ {
wh := WebhookEvent{
EventData: struct{}{},
Type: models.MessageSent,
}
sendEventToWebhooks(wh, &wgMessages)
}
wgMessages.Wait()
}
// Send an event, capture it, and verify that it has the expected payload.
func checkPayload(t *testing.T, eventType models.EventType, send func(), expectedJson string) {
eventChannel := make(chan WebhookEvent)
// Set up a server.
svr := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
data := WebhookEvent{}
json.NewDecoder(r.Body).Decode(&data)
eventChannel <- data
}))
defer svr.Close()
// Subscribe to the webhook.
hook, err := data.InsertWebhook(svr.URL, []models.EventType{eventType})
if err != nil {
t.Fatal(err)
}
defer func() {
if err := data.DeleteWebhook(hook); err != nil {
t.Error(err)
}
}()
// Send and capture the event.
send()
event := <-eventChannel
if event.Type != eventType {
t.Errorf("Got event type %v but expected %v", event.Type, eventType)
}
// Compare.
payloadJson, err := json.MarshalIndent(event.EventData, "", " ")
if err != nil {
t.Fatal(err)
}
t.Logf("Actual payload:\n%s", payloadJson)
if !jsonpatch.Equal(payloadJson, []byte(expectedJson)) {
diff, err := jsonpatch.CreateMergePatch(payloadJson, []byte(expectedJson))
if err != nil {
t.Fatal(err)
}
var out bytes.Buffer
if err := json.Indent(&out, diff, "", " "); err != nil {
t.Fatal(err)
}
t.Errorf("Expected difference from actual payload:\n%s", out.Bytes())
}
}

View file

@ -4,6 +4,7 @@ import (
"bytes"
"encoding/json"
"net/http"
"sync"
log "github.com/sirupsen/logrus"
@ -20,6 +21,7 @@ const (
type Job struct {
webhook models.Webhook
payload WebhookEvent
wg *sync.WaitGroup
}
var queue chan Job
@ -34,9 +36,9 @@ func InitWorkerPool() {
}
}
func addToQueue(webhook models.Webhook, payload WebhookEvent) {
func addToQueue(webhook models.Webhook, payload WebhookEvent, wg *sync.WaitGroup) {
log.Tracef("Queued Event %s for Webhook %s", payload.Type, webhook.URL)
queue <- Job{webhook, payload}
queue <- Job{webhook, payload, wg}
}
func worker(workerID int, queue <-chan Job) {
@ -49,6 +51,9 @@ func worker(workerID int, queue <-chan Job) {
log.Errorf("Event: %s failed to send to webhook: %s Error: %s", job.payload.Type, job.webhook.URL, err)
}
log.Tracef("Done with Event %s to Webhook %s using worker %d", job.payload.Type, job.webhook.URL, workerID)
if job.wg != nil {
job.wg.Done()
}
}
}

9
go.mod
View file

@ -4,7 +4,7 @@ go 1.17
require (
github.com/amalfra/etag v1.0.0
github.com/aws/aws-sdk-go v1.44.109
github.com/aws/aws-sdk-go v1.44.110
github.com/go-fed/activity v1.0.1-0.20210803212804-d866ba75dd0f
github.com/go-fed/httpsig v1.1.0
github.com/go-ole/go-ole v1.2.6 // indirect
@ -12,7 +12,7 @@ require (
github.com/grafov/m3u8 v0.11.1
github.com/lestrrat-go/file-rotatelogs v2.4.0+incompatible
github.com/mattn/go-sqlite3 v1.14.15
github.com/microcosm-cc/bluemonday v1.0.20
github.com/microcosm-cc/bluemonday v1.0.21
github.com/mssola/user_agent v0.5.3
github.com/nareix/joy5 v0.0.0-20210317075623-2c912ca30590
github.com/oschwald/geoip2-golang v1.8.0
@ -35,7 +35,7 @@ require (
github.com/tklauser/numcpus v0.4.0 // indirect
github.com/yusufpapurcu/wmi v1.2.2 // indirect
golang.org/x/crypto v0.0.0-20211215153901-e495a2d5b3d3 // indirect
golang.org/x/net v0.0.0-20220927171203-f486391704dc
golang.org/x/net v0.0.0-20221002022538-bcab6841153b
golang.org/x/sys v0.0.0-20220804214406-8e32c043e418 // indirect
)
@ -53,11 +53,12 @@ require (
github.com/prometheus/procfs v0.8.0 // indirect
golang.org/x/text v0.3.7 // indirect
google.golang.org/protobuf v1.28.1 // indirect
gopkg.in/evanphx/json-patch.v5 v5.6.0 // indirect
)
require (
github.com/nakabonne/tstorage v0.3.5
github.com/shirou/gopsutil/v3 v3.22.8
github.com/shirou/gopsutil/v3 v3.22.9
)
require github.com/SherClockHolmes/webpush-go v1.2.0

11
go.sum
View file

@ -74,6 +74,8 @@ github.com/aws/aws-sdk-go v1.44.107 h1:VP7Rq3wzsOV7wrfHqjAAKRksD4We58PaoVSDPKhm8
github.com/aws/aws-sdk-go v1.44.107/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX4oIKwKHZo=
github.com/aws/aws-sdk-go v1.44.109 h1:+Na5JPeS0kiEHoBp5Umcuuf+IDqXqD0lXnM920E31YI=
github.com/aws/aws-sdk-go v1.44.109/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX4oIKwKHZo=
github.com/aws/aws-sdk-go v1.44.110 h1:unno3l2FYQo6p0wYCp9gUk8YNzhOxqSktM0Y1vukl9k=
github.com/aws/aws-sdk-go v1.44.110/go.mod h1:y4AeaBuwd2Lk+GepC1E9v0qOiTws0MIWAX4oIKwKHZo=
github.com/aymerick/douceur v0.2.0 h1:Mv+mAeH1Q+n9Fr+oyamOlAkUNPWPlA8PPGR0QAaYuPk=
github.com/aymerick/douceur v0.2.0/go.mod h1:wlT5vV2O3h55X9m7iVYN0TBM0NH/MmbLnd30/FjWUq4=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
@ -167,6 +169,7 @@ github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/go-cmp v0.5.7 h1:81/ik6ipDQS2aGcBfIN5dHDB36BwrStyeAQquSYCV4o=
github.com/google/go-cmp v0.5.7/go.mod h1:n+brtR0CgQNWTVd5ZUFpTBC8YFBDLK/h/bpaJ8/DtOE=
github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0=
@ -236,6 +239,8 @@ github.com/microcosm-cc/bluemonday v1.0.19 h1:OI7hoF5FY4pFz2VA//RN8TfM0YJ2dJcl4P
github.com/microcosm-cc/bluemonday v1.0.19/go.mod h1:QNzV2UbLK2/53oIIwTOyLUSABMkjZ4tqiyC1g/DyqxE=
github.com/microcosm-cc/bluemonday v1.0.20 h1:flpzsq4KU3QIYAYGV/szUat7H+GPOXR0B2JU5A1Wp8Y=
github.com/microcosm-cc/bluemonday v1.0.20/go.mod h1:yfBmMi8mxvaZut3Yytv+jTXRY8mxyjJ0/kQBTElld50=
github.com/microcosm-cc/bluemonday v1.0.21 h1:dNH3e4PSyE4vNX+KlRGHT5KrSvjeUkoNPwEORjffHJg=
github.com/microcosm-cc/bluemonday v1.0.21/go.mod h1:ytNkv4RrDrLJ2pqlsSI46O6IVXmZOBBD4SaJyDwwTkM=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
@ -312,6 +317,8 @@ github.com/shirou/gopsutil/v3 v3.22.7 h1:flKnuCMfUUrO+oAvwAd6GKZgnPzr098VA/UJ14n
github.com/shirou/gopsutil/v3 v3.22.7/go.mod h1:s648gW4IywYzUfE/KjXxUsqrqx/T2xO5VqOXxONeRfI=
github.com/shirou/gopsutil/v3 v3.22.8 h1:a4s3hXogo5mE2PfdfJIonDbstO/P+9JszdfhAHSzD9Y=
github.com/shirou/gopsutil/v3 v3.22.8/go.mod h1:s648gW4IywYzUfE/KjXxUsqrqx/T2xO5VqOXxONeRfI=
github.com/shirou/gopsutil/v3 v3.22.9 h1:yibtJhIVEMcdw+tCTbOPiF1VcsuDeTE4utJ8Dm4c5eA=
github.com/shirou/gopsutil/v3 v3.22.9/go.mod h1:bBYl1kjgEJpWpxeHmLI+dVHWtyAwfcmSBLDsp2TNT8A=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88=
@ -461,6 +468,8 @@ golang.org/x/net v0.0.0-20220909164309-bea034e7d591 h1:D0B/7al0LLrVC8aWF4+oxpv/m
golang.org/x/net v0.0.0-20220909164309-bea034e7d591/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk=
golang.org/x/net v0.0.0-20220927171203-f486391704dc h1:FxpXZdoBqT8RjqTy6i1E8nXHhW21wK7ptQ/EPIGxzPQ=
golang.org/x/net v0.0.0-20220927171203-f486391704dc/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk=
golang.org/x/net v0.0.0-20221002022538-bcab6841153b h1:6e93nYa3hNqAvLr0pD4PN1fFS+gKzp2zAXqrnTCstqU=
golang.org/x/net v0.0.0-20221002022538-bcab6841153b/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
@ -680,6 +689,8 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/evanphx/json-patch.v5 v5.6.0 h1:BMT6KIwBD9CaU91PJCZIe46bDmBWa9ynTQgJIOpfQBk=
gopkg.in/evanphx/json-patch.v5 v5.6.0/go.mod h1:/kvTRh1TVm5wuM6OkHxqXtE/1nUZZpihg29RtuIyfvk=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=