This commit is contained in:
Gabe Kangas 2023-07-21 22:25:59 -07:00
parent 32b1dbeaf3
commit b80ccc4966
No known key found for this signature in database
GPG key ID: 4345B2060657F330
170 changed files with 2573 additions and 2026 deletions

View file

@ -1,61 +0,0 @@
package activitypub
import (
"net/http"
"github.com/owncast/owncast/activitypub/crypto"
"github.com/owncast/owncast/activitypub/inbox"
"github.com/owncast/owncast/activitypub/outbox"
"github.com/owncast/owncast/activitypub/persistence"
"github.com/owncast/owncast/activitypub/workerpool"
"github.com/owncast/owncast/storage/configrepository"
"github.com/owncast/owncast/storage/data"
"github.com/owncast/owncast/models"
log "github.com/sirupsen/logrus"
)
var configRepository = configrepository.Get()
// Start will initialize and start the federation support.
func Start(datastore *data.Store, router *http.ServeMux) {
persistence.Setup(datastore)
workerpool.InitOutboundWorkerPool()
inbox.InitInboxWorkerPool()
StartRouter(router)
// Generate the keys for signing federated activity if needed.
if configRepository.GetPrivateKey() == "" {
privateKey, publicKey, err := crypto.GenerateKeys()
_ = configRepository.SetPrivateKey(string(privateKey))
_ = configRepository.SetPublicKey(string(publicKey))
if err != nil {
log.Errorln("Unable to get private key", err)
}
}
}
// SendLive will send a "Go Live" message to followers.
func SendLive() error {
return outbox.SendLive()
}
// SendPublicFederatedMessage will send an arbitrary provided message to followers.
func SendPublicFederatedMessage(message string) error {
return outbox.SendPublicMessage(message)
}
// SendDirectFederatedMessage will send a direct message to a single account.
func SendDirectFederatedMessage(message, account string) error {
return outbox.SendDirectMessageToAccount(message, account)
}
// GetFollowerCount will return the local tracked follower count.
func GetFollowerCount() (int64, error) {
return persistence.GetFollowerCount()
}
// GetPendingFollowRequests will return the pending follow requests.
func GetPendingFollowRequests() ([]models.Follower, error) {
return persistence.GetPendingFollowRequests()
}

View file

@ -1,64 +0,0 @@
package inbox
import (
"fmt"
"github.com/go-fed/activity/streams/vocab"
"github.com/owncast/owncast/activitypub/resolvers"
"github.com/owncast/owncast/core/chat"
"github.com/owncast/owncast/core/chat/events"
"github.com/owncast/owncast/storage/configrepository"
)
var configRepository = configrepository.Get()
func handleEngagementActivity(eventType events.EventType, isLiveNotification bool, actorReference vocab.ActivityStreamsActorProperty, action string) error {
// Do nothing if displaying engagement actions has been turned off.
if !configRepository.GetFederationShowEngagement() {
return nil
}
// Do nothing if chat is disabled
if configRepository.GetChatDisabled() {
return nil
}
// Get actor of the action
actor, _ := resolvers.GetResolvedActorFromActorProperty(actorReference)
// Send chat message
actorName := actor.Name
if actorName == "" {
actorName = actor.Username
}
actorIRI := actorReference.Begin().GetIRI().String()
userPrefix := fmt.Sprintf("%s ", actorName)
var suffix string
if isLiveNotification && action == events.FediverseEngagementLike {
suffix = "liked that this stream went live."
} else if action == events.FediverseEngagementLike {
suffix = fmt.Sprintf("liked a post from %s.", configRepository.GetServerName())
} else if isLiveNotification && action == events.FediverseEngagementRepost {
suffix = "shared this stream with their followers."
} else if action == events.FediverseEngagementRepost {
suffix = fmt.Sprintf("shared a post from %s.", configRepository.GetServerName())
} else if action == events.FediverseEngagementFollow {
suffix = "followed this stream."
} else {
return fmt.Errorf("could not handle event for sending to chat: %s", action)
}
body := fmt.Sprintf("%s %s", userPrefix, suffix)
var image *string
if actor.Image != nil {
s := actor.Image.String()
image = &s
}
if err := chat.SendFediverseAction(eventType, actor.FullUsername, image, body, actorIRI); err != nil {
return err
}
return nil
}

View file

@ -1,25 +0,0 @@
package inbox
import (
"context"
"github.com/go-fed/activity/streams/vocab"
"github.com/owncast/owncast/activitypub/persistence"
"github.com/owncast/owncast/activitypub/resolvers"
log "github.com/sirupsen/logrus"
)
func handleUpdateRequest(c context.Context, activity vocab.ActivityStreamsUpdate) error {
// We only care about update events to followers.
if !activity.GetActivityStreamsObject().At(0).IsActivityStreamsPerson() {
return nil
}
actor, err := resolvers.GetResolvedActorFromActorProperty(activity.GetActivityStreamsActor())
if err != nil {
log.Errorln(err)
return err
}
return persistence.UpdateFollower(actor.ActorIri.String(), actor.Inbox.String(), actor.Name, actor.FullUsername, actor.Image.String())
}

View file

@ -1,35 +0,0 @@
package activitypub
import (
"net/http"
"github.com/owncast/owncast/activitypub/controllers"
"github.com/owncast/owncast/webserver/middleware"
)
// StartRouter will start the federation specific http router.
func StartRouter(router *http.ServeMux) {
// WebFinger
router.HandleFunc("/.well-known/webfinger", controllers.WebfingerHandler)
// Host Metadata
router.HandleFunc("/.well-known/host-meta", controllers.HostMetaController)
// Nodeinfo v1
router.HandleFunc("/.well-known/nodeinfo", controllers.NodeInfoController)
// x-nodeinfo v2
router.HandleFunc("/.well-known/x-nodeinfo2", controllers.XNodeInfo2Controller)
// Nodeinfo v2
router.HandleFunc("/nodeinfo/2.0", controllers.NodeInfoV2Controller)
// Instance details
router.HandleFunc("/api/v1/instance", controllers.InstanceV1Controller)
// Single ActivityPub Actor
router.HandleFunc("/federation/user/", middleware.RequireActivityPubOrRedirect(controllers.ActorHandler))
// Single AP object
router.HandleFunc("/federation/", middleware.RequireActivityPubOrRedirect(controllers.ObjectHandler))
}

48
cmd/application.go Normal file
View file

@ -0,0 +1,48 @@
package cmd
import (
"github.com/owncast/owncast/services/config"
"github.com/owncast/owncast/services/metrics"
"github.com/owncast/owncast/storage/configrepository"
log "github.com/sirupsen/logrus"
)
type Application struct {
configservice *config.Config
metricsservice *metrics.Metrics
configRepository *configrepository.SqlConfigRepository
maximumConcurrentConnectionLimit int64
}
/*
The order of this setup matters.
- Parse flags
- Set the session runtime values
- Use the session values to configure data persistence
*/
func (app *Application) Start() {
app.configservice = config.Get()
app.parseFlags()
app.configureLogging(*enableDebugOptions, *enableVerboseLogging, app.configservice.LogDirectory)
app.showStartupMessage()
app.setSessionConfig()
app.createDirectories()
app.maximumConcurrentConnectionLimit = getMaximumConcurrentConnectionLimit()
setSystemConcurrentConnectionLimit(app.maximumConcurrentConnectionLimit)
// If we're restoring a backup, do that and exit.
if *restoreDatabaseFile != "" {
app.handleRestoreBackup(restoreDatabaseFile)
log.Exit(0)
}
if *backupDirectory != "" {
app.configservice.BackupDirectory = *backupDirectory
}
app.startServices()
}

20
cmd/backuprestore.go Normal file
View file

@ -0,0 +1,20 @@
package cmd
import (
"github.com/owncast/owncast/utils"
log "github.com/sirupsen/logrus"
)
func (app *Application) handleRestoreBackup(restoreDatabaseFile *string) {
// Allows a user to restore a specific database backup
databaseFile := app.configservice.DatabaseFilePath
if *dbFile != "" {
databaseFile = *dbFile
}
if err := utils.Restore(*restoreDatabaseFile, databaseFile); err != nil {
log.Fatalln(err)
}
log.Println("Database has been restored. Restart Owncast.")
}

View file

@ -2,7 +2,7 @@
//go:build !freebsd && !windows
// +build !freebsd,!windows
package chat
package cmd
import (
"syscall"
@ -24,3 +24,15 @@ func setSystemConcurrentConnectionLimit(limit int64) {
log.Traceln("Max process connection count changed from system limit of", originalLimit, "to", limit)
}
func getMaximumConcurrentConnectionLimit() int64 {
var rLimit syscall.Rlimit
if err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rLimit); err != nil {
log.Fatalln(err)
}
// Return the limit to 70% of max so the machine doesn't die even if it's maxed out for some reason.
proposedLimit := int64(float32(rLimit.Max) * 0.7)
return proposedLimit
}

View file

@ -1,7 +1,7 @@
//go:build freebsd
// +build freebsd
package chat
package cmd
import (
"syscall"

View file

@ -1,6 +1,6 @@
//go:build windows
// +build windows
package chat
package cmd
func setSystemConcurrentConnectionLimit(limit int64) {}

71
cmd/config.go Normal file
View file

@ -0,0 +1,71 @@
package cmd
import (
"strconv"
"github.com/owncast/owncast/storage/configrepository"
log "github.com/sirupsen/logrus"
)
func (app *Application) setSessionConfig() {
// Stream key
if *newStreamKey != "" {
log.Println("Temporary stream key is set for this session.")
app.configservice.TemporaryStreamKey = *newStreamKey
}
app.configservice.EnableDebugFeatures = *enableDebugOptions
if *dbFile != "" {
app.configservice.DatabaseFilePath = *dbFile
}
if *logDirectory != "" {
app.configservice.LogDirectory = *logDirectory
}
}
func (app *Application) saveUpdatedConfig() {
configRepository := configrepository.Get()
if *newAdminPassword != "" {
if err := configRepository.SetAdminPassword(*newAdminPassword); err != nil {
log.Errorln("Error setting your admin password.", err)
log.Exit(1)
} else {
log.Infoln("Admin password changed")
}
}
// Set the web server port
if *webServerPortOverride != "" {
portNumber, err := strconv.Atoi(*webServerPortOverride)
if err != nil {
log.Warnln(err)
return
}
log.Println("Saving new web server port number to", portNumber)
if err := configRepository.SetHTTPPortNumber(float64(portNumber)); err != nil {
log.Errorln(err)
}
}
app.configservice.WebServerPort = configRepository.GetHTTPPortNumber()
// Set the web server ip
if *webServerIPOverride != "" {
log.Println("Saving new web server listen IP address to", *webServerIPOverride)
if err := configRepository.SetHTTPListenAddress(*webServerIPOverride); err != nil {
log.Errorln(err)
}
}
app.configservice.WebServerIP = configRepository.GetHTTPListenAddress()
// Set the rtmp server port
if *rtmpPortOverride > 0 {
log.Println("Saving new RTMP server port number to", *rtmpPortOverride)
if err := configRepository.SetRTMPPortNumber(float64(*rtmpPortOverride)); err != nil {
log.Errorln(err)
}
}
}

7
cmd/console.go Normal file
View file

@ -0,0 +1,7 @@
package cmd
import log "github.com/sirupsen/logrus"
func (app *Application) showStartupMessage() {
log.Infoln(app.configservice.GetReleaseString())
}

4
cmd/data.go Normal file
View file

@ -0,0 +1,4 @@
package cmd
func initializeData() {
}

23
cmd/flags.go Normal file
View file

@ -0,0 +1,23 @@
package cmd
import (
"flag"
)
var (
dbFile = flag.String("database", "", "Path to the database file.")
logDirectory = flag.String("logdir", "", "Directory where logs will be written to")
backupDirectory = flag.String("backupdir", "", "Directory where backups will be written to")
enableDebugOptions = flag.Bool("enableDebugFeatures", false, "Enable additional debugging options.")
enableVerboseLogging = flag.Bool("enableVerboseLogging", false, "Enable additional logging.")
restoreDatabaseFile = flag.String("restoreDatabase", "", "Restore an Owncast database backup")
newAdminPassword = flag.String("adminpassword", "", "Set your admin password")
newStreamKey = flag.String("streamkey", "", "Set a temporary stream key for this session")
webServerPortOverride = flag.String("webserverport", "", "Force the web server to listen on a specific port")
webServerIPOverride = flag.String("webserverip", "", "Force web server to listen on this IP address")
rtmpPortOverride = flag.Int("rtmpport", 0, "Set listen port for the RTMP server")
)
func (app *Application) parseFlags() {
flag.Parse()
}

4
cmd/services.go Normal file
View file

@ -0,0 +1,4 @@
package cmd
func (app *Application) startServices() {
}

162
cmd/setup.go Normal file
View file

@ -0,0 +1,162 @@
package cmd
import (
"fmt"
"io"
"io/fs"
"os"
"path"
"path/filepath"
"github.com/owncast/owncast/logging"
"github.com/owncast/owncast/services/config"
"github.com/owncast/owncast/static"
"github.com/owncast/owncast/utils"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
)
func (app *Application) createDirectories() {
// Create the data directory if needed
if !utils.DoesFileExists("data") {
if err := os.Mkdir("./data", 0o700); err != nil {
log.Fatalln("Cannot create data directory", err)
}
}
// Recreate the temp dir
if utils.DoesFileExists(app.configservice.TempDir) {
err := os.RemoveAll(app.configservice.TempDir)
if err != nil {
log.Fatalln("Unable to remove temp dir! Check permissions.", app.configservice.TempDir, err)
}
}
if err := os.Mkdir(app.configservice.TempDir, 0o700); err != nil {
log.Fatalln("Unable to create temp dir!", err)
}
}
func (app *Application) configureLogging(enableDebugFeatures bool, enableVerboseLogging bool, logDirectory string) {
logging.Setup(enableDebugFeatures, enableVerboseLogging, logDirectory)
log.SetFormatter(&log.TextFormatter{
FullTimestamp: true,
})
}
// setupEmojiDirectory sets up the custom emoji directory by copying all built-in
// emojis if the directory does not yet exist.
func (app *Application) setupEmojiDirectory() (err error) {
type emojiDirectory struct {
path string
isDir bool
}
// Migrate old (pre 0.1.0) emoji to new location if they exist.
app.migrateCustomEmojiLocations()
if utils.DoesFileExists(app.configservice.CustomEmojiPath) {
return nil
}
if err = os.MkdirAll(app.configservice.CustomEmojiPath, 0o750); err != nil {
return fmt.Errorf("unable to create custom emoji directory: %w", err)
}
staticFS := static.GetEmoji()
files := []emojiDirectory{}
walkFunction := func(path string, d os.DirEntry, err error) error {
if path == "." {
return nil
}
if d.Name() == "LICENSE.md" {
return nil
}
files = append(files, emojiDirectory{path: path, isDir: d.IsDir()})
return nil
}
if err := fs.WalkDir(staticFS, ".", walkFunction); err != nil {
log.Errorln("unable to fetch emojis: " + err.Error())
return errors.Wrap(err, "unable to fetch embedded emoji files")
}
if err != nil {
return fmt.Errorf("unable to read built-in emoji files: %w", err)
}
// Now copy all built-in emojis to the custom emoji directory
for _, path := range files {
emojiPath := filepath.Join(app.configservice.CustomEmojiPath, path.path)
if path.isDir {
if err := os.Mkdir(emojiPath, 0o700); err != nil {
return errors.Wrap(err, "unable to create emoji directory, check permissions?: "+path.path)
}
continue
}
memFile, staticOpenErr := staticFS.Open(path.path)
if staticOpenErr != nil {
return errors.Wrap(staticOpenErr, "unable to open emoji file from embedded filesystem")
}
// nolint:gosec
diskFile, err := os.Create(emojiPath)
if err != nil {
return fmt.Errorf("unable to create custom emoji file on disk: %w", err)
}
if err != nil {
_ = diskFile.Close()
return fmt.Errorf("unable to open built-in emoji file: %w", err)
}
if _, err = io.Copy(diskFile, memFile); err != nil {
_ = diskFile.Close()
_ = os.Remove(emojiPath)
return fmt.Errorf("unable to copy built-in emoji file to disk: %w", err)
}
if err = diskFile.Close(); err != nil {
_ = os.Remove(emojiPath)
return fmt.Errorf("unable to close custom emoji file on disk: %w", err)
}
}
return nil
}
// MigrateCustomEmojiLocations migrates custom emoji from the old location to the new location.
func (app *Application) migrateCustomEmojiLocations() {
oldLocation := path.Join("webroot", "img", "emoji")
newLocation := path.Join("data", "emoji")
if !utils.DoesFileExists(oldLocation) {
return
}
log.Println("Moving custom emoji directory from", oldLocation, "to", newLocation)
if err := utils.Move(oldLocation, newLocation); err != nil {
log.Errorln("error moving custom emoji directory", err)
}
}
func (app *Application) resetDirectories() {
log.Trace("Resetting file directories to a clean slate.")
// Wipe hls data directory
utils.CleanupDirectory(app.configservice.HLSStoragePath)
// Remove the previous thumbnail
logo := app.configRepository.GetLogoPath()
if utils.DoesFileExists(logo) {
err := utils.Copy(path.Join("data", logo), filepath.Join(config.DataDirectory, "thumbnail.jpg"))
if err != nil {
log.Warnln(err)
}
}
}

View file

@ -1,187 +0,0 @@
package chat
import (
"errors"
"net/http"
"sort"
"github.com/owncast/owncast/core/chat/events"
"github.com/owncast/owncast/models"
"github.com/owncast/owncast/services/config"
"github.com/owncast/owncast/storage/configrepository"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
log "github.com/sirupsen/logrus"
)
var (
getStatus func() models.Status
chatMessagesSentCounter prometheus.Gauge
)
var configRepository = configrepository.Get()
// Start begins the chat server.
func Start(getStatusFunc func() models.Status) error {
setupPersistence()
getStatus = getStatusFunc
_server = NewChat()
go _server.Run()
log.Traceln("Chat server started with max connection count of", _server.maxSocketConnectionLimit)
c := config.GetConfig()
chatMessagesSentCounter = promauto.NewGauge(prometheus.GaugeOpts{
Name: "total_chat_message_count",
Help: "The number of chat messages incremented over time.",
ConstLabels: map[string]string{
"version": c.VersionNumber,
"host": configRepository.GetServerURL(),
},
})
return nil
}
// GetClientsForUser will return chat connections that are owned by a specific user.
func GetClientsForUser(userID string) ([]*Client, error) {
_server.mu.Lock()
defer _server.mu.Unlock()
clients := map[string][]*Client{}
for _, client := range _server.clients {
clients[client.User.ID] = append(clients[client.User.ID], client)
}
if _, exists := clients[userID]; !exists {
return nil, errors.New("no connections for user found")
}
return clients[userID], nil
}
// FindClientByID will return a single connected client by ID.
func FindClientByID(clientID uint) (*Client, bool) {
client, found := _server.clients[clientID]
return client, found
}
// GetClients will return all the current chat clients connected.
func GetClients() []*Client {
clients := []*Client{}
if _server == nil {
return clients
}
// Convert the keyed map to a slice.
for _, client := range _server.clients {
clients = append(clients, client)
}
sort.Slice(clients, func(i, j int) bool {
return clients[i].ConnectedAt.Before(clients[j].ConnectedAt)
})
return clients
}
// SendSystemMessage will send a message string as a system message to all clients.
func SendSystemMessage(text string, ephemeral bool) error {
message := events.SystemMessageEvent{
MessageEvent: events.MessageEvent{
Body: text,
},
}
message.SetDefaults()
message.RenderBody()
if err := Broadcast(&message); err != nil {
log.Errorln("error sending system message", err)
}
if !ephemeral {
saveEvent(message.ID, nil, message.Body, message.GetMessageType(), nil, message.Timestamp, nil, nil, nil, nil)
}
return nil
}
// SendFediverseAction will send a message indicating some Fediverse engagement took place.
func SendFediverseAction(eventType string, userAccountName string, image *string, body string, link string) error {
message := events.FediverseEngagementEvent{
Event: events.Event{
Type: eventType,
},
MessageEvent: events.MessageEvent{
Body: body,
},
UserAccountName: userAccountName,
Image: image,
Link: link,
}
message.SetDefaults()
message.RenderBody()
if err := Broadcast(&message); err != nil {
log.Errorln("error sending system message", err)
return err
}
saveFederatedAction(message)
return nil
}
// SendSystemAction will send a system action string as an action event to all clients.
func SendSystemAction(text string, ephemeral bool) error {
message := events.ActionEvent{
MessageEvent: events.MessageEvent{
Body: text,
},
}
message.SetDefaults()
message.RenderBody()
if err := Broadcast(&message); err != nil {
log.Errorln("error sending system chat action")
}
if !ephemeral {
saveEvent(message.ID, nil, message.Body, message.GetMessageType(), nil, message.Timestamp, nil, nil, nil, nil)
}
return nil
}
// SendAllWelcomeMessage will send the chat message to all connected clients.
func SendAllWelcomeMessage() {
_server.sendAllWelcomeMessage()
}
// SendSystemMessageToClient will send a single message to a single connected chat client.
func SendSystemMessageToClient(clientID uint, text string) {
if client, foundClient := FindClientByID(clientID); foundClient {
_server.sendSystemMessageToClient(client, text)
}
}
// Broadcast will send all connected clients the outbound object provided.
func Broadcast(event events.OutboundEvent) error {
return _server.Broadcast(event.GetBroadcastPayload())
}
// HandleClientConnection handles a single inbound websocket connection.
func HandleClientConnection(w http.ResponseWriter, r *http.Request) {
_server.HandleClientConnection(w, r)
}
// DisconnectClients will forcefully disconnect all clients belonging to a user by ID.
func DisconnectClients(clients []*Client) {
_server.DisconnectClients(clients)
}

View file

@ -1,25 +0,0 @@
package events
// SystemMessageEvent is a message displayed in chat on behalf of the server.
type SystemMessageEvent struct {
Event
MessageEvent
}
// GetBroadcastPayload will return the object to send to all chat users.
func (e *SystemMessageEvent) GetBroadcastPayload() EventPayload {
return EventPayload{
"id": e.ID,
"timestamp": e.Timestamp,
"body": e.Body,
"type": SystemMessageSent,
"user": EventPayload{
"displayName": configRepository.GetServerName(),
},
}
}
// GetMessageType will return the event type for this message.
func (e *SystemMessageEvent) GetMessageType() EventType {
return SystemMessageSent
}

View file

@ -1,17 +0,0 @@
package events
// UserJoinedEvent is the event fired when a user joins chat.
type UserJoinedEvent struct {
Event
UserEvent
}
// GetBroadcastPayload will return the object to send to all chat users.
func (e *UserJoinedEvent) GetBroadcastPayload() EventPayload {
return EventPayload{
"type": UserJoined,
"id": e.ID,
"timestamp": e.Timestamp,
"user": e.User,
}
}

View file

@ -1,25 +0,0 @@
package events
// UserMessageEvent is an inbound message from a user.
type UserMessageEvent struct {
Event
UserEvent
MessageEvent
}
// GetBroadcastPayload will return the object to send to all chat users.
func (e *UserMessageEvent) GetBroadcastPayload() EventPayload {
return EventPayload{
"id": e.ID,
"timestamp": e.Timestamp,
"body": e.Body,
"user": e.User,
"type": MessageSent,
"visible": e.HiddenAt == nil,
}
}
// GetMessageType will return the event type for this message.
func (e *UserMessageEvent) GetMessageType() EventType {
return MessageSent
}

View file

@ -1,22 +0,0 @@
//go:build !windows
// +build !windows
package chat
import (
"syscall"
log "github.com/sirupsen/logrus"
)
func getMaximumConcurrentConnectionLimit() int64 {
var rLimit syscall.Rlimit
if err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rLimit); err != nil {
log.Fatalln(err)
}
// Return the limit to 70% of max so the machine doesn't die even if it's maxed out for some reason.
proposedLimit := int64(float32(rLimit.Max) * 0.7)
return proposedLimit
}

View file

@ -9,9 +9,9 @@ import (
"github.com/owncast/owncast/core/chat"
"github.com/owncast/owncast/models"
"github.com/owncast/owncast/services/auth"
"github.com/owncast/owncast/services/config"
"github.com/owncast/owncast/services/notifications"
"github.com/owncast/owncast/services/status"
"github.com/owncast/owncast/services/webhooks"
"github.com/owncast/owncast/services/yp"
"github.com/owncast/owncast/storage/configrepository"
@ -31,11 +31,10 @@ var (
fileWriter = transcoder.FileWriterReceiverService{}
)
var configRepository = configrepository.Get()
// Start starts up the core processing.
func Start() error {
resetDirectories()
configRepository := configrepository.Get()
configRepository.PopulateDefaults()
@ -59,7 +58,7 @@ func Start() error {
}
// user.SetupUsers()
auth.Setup(data.GetDatastore())
// auth.Setup(data.GetDatastore())
fileWriter.SetupFileWriterReceiverService(&handler)
@ -68,77 +67,29 @@ func Start() error {
return err
}
_yp = yp.NewYP(GetStatus)
s := status.Get()
gsf := func() *models.Status {
s := status.Get()
return &s.Status
}
if err := chat.Start(GetStatus); err != nil {
_yp = yp.NewYP(gsf)
if err := chat.Start(gsf); err != nil {
log.Errorln(err)
}
// start the rtmp server
go rtmp.Start(setStreamAsConnected, setBroadcaster)
go rtmp.Start(setStreamAsConnected, s.SetBroadcaster)
rtmpPort := configRepository.GetRTMPPortNumber()
if rtmpPort != 1935 {
log.Infof("RTMP is accepting inbound streams on port %d.", rtmpPort)
}
webhooks.InitTemporarySingleton(GetStatus)
webhooks.InitTemporarySingleton(gsf)
notifications.Setup(data.GetDatastore())
return nil
}
func createInitialOfflineState() error {
transitionToOfflineVideoStreamContent()
return nil
}
// transitionToOfflineVideoStreamContent will overwrite the current stream with the
// offline video stream state only. No live stream HLS segments will continue to be
// referenced.
func transitionToOfflineVideoStreamContent() {
log.Traceln("Firing transcoder with offline stream state")
_transcoder := transcoder.NewTranscoder()
_transcoder.SetIdentifier("offline")
_transcoder.SetLatencyLevel(models.GetLatencyLevel(4))
_transcoder.SetIsEvent(true)
offlineFilePath, err := saveOfflineClipToDisk("offline.ts")
if err != nil {
log.Fatalln("unable to save offline clip:", err)
}
_transcoder.SetInput(offlineFilePath)
go _transcoder.Start(false)
// Copy the logo to be the thumbnail
logo := configRepository.GetLogoPath()
c := config.GetConfig()
dst := filepath.Join(c.TempDir, "thumbnail.jpg")
if err = utils.Copy(filepath.Join("data", logo), dst); err != nil {
log.Warnln(err)
}
// Delete the preview Gif
_ = os.Remove(path.Join(config.DataDirectory, "preview.gif"))
}
func resetDirectories() {
log.Trace("Resetting file directories to a clean slate.")
// Wipe hls data directory
c := config.GetConfig()
utils.CleanupDirectory(c.HLSStoragePath)
// Remove the previous thumbnail
logo := configRepository.GetLogoPath()
if utils.DoesFileExists(logo) {
err := utils.Copy(path.Join("data", logo), filepath.Join(config.DataDirectory, "thumbnail.jpg"))
if err != nil {
log.Warnln(err)
}
}
}

View file

@ -19,7 +19,7 @@ func appendOfflineToVariantPlaylist(index int, playlistFilePath string) {
return
}
c := config.GetConfig()
c := config.Get()
tmpFileName := fmt.Sprintf("tmp-stream-%d.m3u8", index)
atomicWriteTmpPlaylistFile, err := os.CreateTemp(c.TempDir, tmpFileName)
if err != nil {
@ -50,7 +50,7 @@ func appendOfflineToVariantPlaylist(index int, playlistFilePath string) {
}
func makeVariantIndexOffline(index int, offlineFilePath string, offlineFilename string) {
c := config.GetConfig()
c := config.Get()
playlistFilePath := fmt.Sprintf(filepath.Join(c.HLSStoragePath, "%d/stream.m3u8"), index)
segmentFilePath := fmt.Sprintf(filepath.Join(c.HLSStoragePath, "%d/%s"), index, offlineFilename)
@ -96,7 +96,7 @@ func createEmptyOfflinePlaylist(playlistFilePath string, offlineFilename string)
func saveOfflineClipToDisk(offlineFilename string) (string, error) {
offlineFileData := static.GetOfflineSegment()
c := config.GetConfig()
c := config.Get()
offlineTmpFile, err := os.CreateTemp(c.TempDir, offlineFilename)
if err != nil {
log.Errorln("unable to create temp file for offline video segment", err)

View file

@ -9,6 +9,7 @@ import (
"github.com/owncast/owncast/models"
"github.com/owncast/owncast/services/geoip"
"github.com/owncast/owncast/storage/configrepository"
)
var (
@ -44,6 +45,8 @@ func IsStreamConnected() bool {
return false
}
configRepository := configrepository.Get()
// Kind of a hack. It takes a handful of seconds between a RTMP connection and when HLS data is available.
// So account for that with an artificial buffer of four segments.
timeSinceLastConnected := time.Since(_stats.LastConnectTime.Time).Seconds()
@ -110,6 +113,8 @@ func pruneViewerCount() {
}
func saveStats() {
configRepository := configrepository.Get()
if err := configRepository.SetPeakOverallViewerCount(_stats.OverallMaxViewerCount); err != nil {
log.Errorln("error saving viewer count", err)
}
@ -124,6 +129,8 @@ func saveStats() {
}
func getSavedStats() models.Stats {
configRepository := configrepository.Get()
savedLastDisconnectTime, _ := configRepository.GetLastDisconnectTime()
result := models.Stats{

View file

@ -1,46 +0,0 @@
package core
import (
"github.com/owncast/owncast/models"
"github.com/owncast/owncast/services/config"
)
// GetStatus gets the status of the system.
func GetStatus() models.Status {
if _stats == nil {
return models.Status{}
}
viewerCount := 0
if IsStreamConnected() {
viewerCount = len(_stats.Viewers)
}
c := config.GetConfig()
return models.Status{
Online: IsStreamConnected(),
ViewerCount: viewerCount,
OverallMaxViewerCount: _stats.OverallMaxViewerCount,
SessionMaxViewerCount: _stats.SessionMaxViewerCount,
LastDisconnectTime: _stats.LastDisconnectTime,
LastConnectTime: _stats.LastConnectTime,
VersionNumber: c.VersionNumber,
StreamTitle: configRepository.GetStreamTitle(),
}
}
// GetCurrentBroadcast will return the currently active broadcast.
func GetCurrentBroadcast() *models.CurrentBroadcast {
return _currentBroadcast
}
// setBroadcaster will store the current inbound broadcasting details.
func setBroadcaster(broadcaster models.Broadcaster) {
_broadcaster = &broadcaster
}
// GetBroadcaster will return the details of the currently active broadcaster.
func GetBroadcaster() *models.Broadcaster {
return _broadcaster
}

View file

@ -1,203 +0,0 @@
package core
import (
"context"
"io"
"time"
log "github.com/sirupsen/logrus"
"github.com/owncast/owncast/activitypub"
"github.com/owncast/owncast/core/chat"
"github.com/owncast/owncast/models"
"github.com/owncast/owncast/services/config"
"github.com/owncast/owncast/services/notifications"
"github.com/owncast/owncast/services/webhooks"
"github.com/owncast/owncast/storage/data"
"github.com/owncast/owncast/utils"
"github.com/owncast/owncast/video/rtmp"
"github.com/owncast/owncast/video/transcoder"
)
// After the stream goes offline this timer fires a full cleanup after N min.
var _offlineCleanupTimer *time.Timer
// While a stream takes place cleanup old HLS content every N min.
var _onlineCleanupTicker *time.Ticker
var _currentBroadcast *models.CurrentBroadcast
var _onlineTimerCancelFunc context.CancelFunc
var _lastNotified *time.Time
// setStreamAsConnected sets the stream as connected.
func setStreamAsConnected(rtmpOut *io.PipeReader) {
now := utils.NullTime{Time: time.Now(), Valid: true}
_stats.StreamConnected = true
_stats.LastDisconnectTime = nil
_stats.LastConnectTime = &now
_stats.SessionMaxViewerCount = 0
_currentBroadcast = &models.CurrentBroadcast{
LatencyLevel: configRepository.GetStreamLatencyLevel(),
OutputSettings: configRepository.GetStreamOutputVariants(),
}
StopOfflineCleanupTimer()
startOnlineCleanupTimer()
if _yp != nil {
go _yp.Start()
}
c := config.GetConfig()
segmentPath := c.HLSStoragePath
if err := setupStorage(); err != nil {
log.Fatalln("failed to setup the storage", err)
}
go func() {
_transcoder = transcoder.NewTranscoder()
_transcoder.TranscoderCompleted = func(error) {
SetStreamAsDisconnected()
_transcoder = nil
_currentBroadcast = nil
}
_transcoder.SetStdin(rtmpOut)
_transcoder.Start(true)
}()
webhookManager := webhooks.Get()
go webhookManager.SendStreamStatusEvent(models.StreamStarted)
transcoder.StartThumbnailGenerator(segmentPath, configRepository.FindHighestVideoQualityIndex(_currentBroadcast.OutputSettings))
_ = chat.SendSystemAction("Stay tuned, the stream is **starting**!", true)
chat.SendAllWelcomeMessage()
// Send delayed notification messages.
_onlineTimerCancelFunc = startLiveStreamNotificationsTimer()
}
// SetStreamAsDisconnected sets the stream as disconnected.
func SetStreamAsDisconnected() {
_ = chat.SendSystemAction("The stream is ending.", true)
now := utils.NullTime{Time: time.Now(), Valid: true}
if _onlineTimerCancelFunc != nil {
_onlineTimerCancelFunc()
}
_stats.StreamConnected = false
_stats.LastDisconnectTime = &now
_stats.LastConnectTime = nil
_broadcaster = nil
offlineFilename := "offline.ts"
offlineFilePath, err := saveOfflineClipToDisk(offlineFilename)
if err != nil {
log.Errorln(err)
return
}
transcoder.StopThumbnailGenerator()
rtmp.Disconnect()
if _yp != nil {
_yp.Stop()
}
// If there is no current broadcast available the previous stream
// likely failed for some reason. Don't try to append to it.
// Just transition to offline.
if _currentBroadcast == nil {
stopOnlineCleanupTimer()
transitionToOfflineVideoStreamContent()
log.Errorln("unexpected nil _currentBroadcast")
return
}
for index := range _currentBroadcast.OutputSettings {
makeVariantIndexOffline(index, offlineFilePath, offlineFilename)
}
StartOfflineCleanupTimer()
stopOnlineCleanupTimer()
saveStats()
webhookManager := webhooks.Get()
go webhookManager.SendStreamStatusEvent(models.StreamStopped)
}
// StartOfflineCleanupTimer will fire a cleanup after n minutes being disconnected.
func StartOfflineCleanupTimer() {
_offlineCleanupTimer = time.NewTimer(5 * time.Minute)
go func() {
for range _offlineCleanupTimer.C {
// Set video to offline state
resetDirectories()
transitionToOfflineVideoStreamContent()
}
}()
}
// StopOfflineCleanupTimer will stop the previous cleanup timer.
func StopOfflineCleanupTimer() {
if _offlineCleanupTimer != nil {
_offlineCleanupTimer.Stop()
}
}
func startOnlineCleanupTimer() {
_onlineCleanupTicker = time.NewTicker(1 * time.Minute)
go func() {
for range _onlineCleanupTicker.C {
if err := _storage.Cleanup(); err != nil {
log.Errorln(err)
}
}
}()
}
func stopOnlineCleanupTimer() {
if _onlineCleanupTicker != nil {
_onlineCleanupTicker.Stop()
}
}
func startLiveStreamNotificationsTimer() context.CancelFunc {
// Send delayed notification messages.
c, cancelFunc := context.WithCancel(context.Background())
_onlineTimerCancelFunc = cancelFunc
go func(c context.Context) {
select {
case <-time.After(time.Minute * 2.0):
if _lastNotified != nil && time.Since(*_lastNotified) < 10*time.Minute {
return
}
// Send Fediverse message.
if configRepository.GetFederationEnabled() {
log.Traceln("Sending Federated Go Live message.")
if err := activitypub.SendLive(); err != nil {
log.Errorln(err)
}
}
// Send notification to those who have registered for them.
if notifier, err := notifications.New(data.GetDatastore()); err != nil {
log.Errorln(err)
} else {
notifier.Notify()
}
now := time.Now()
_lastNotified = &now
case <-c.Done():
}
}(c)
return cancelFunc
}

View file

@ -30,9 +30,9 @@ type OCLogger struct {
var Logger *OCLogger
// Setup configures our custom logging destinations.
func Setup(enableDebugOptions bool, enableVerboseLogging bool) {
func Setup(enableDebugOptions bool, enableVerboseLogging bool, logDirectory string) {
// Create the logging directory if needed
loggingDirectory := filepath.Dir(getLogFilePath())
loggingDirectory := filepath.Dir(logDirectory)
if !utils.DoesFileExists(loggingDirectory) {
if err := os.Mkdir(loggingDirectory, 0o700); err != nil {
logger.Errorln("unable to create logs directory", loggingDirectory, err)
@ -40,10 +40,10 @@ func Setup(enableDebugOptions bool, enableVerboseLogging bool) {
}
// Write logs to a file
path := getLogFilePath()
logFile := filepath.Join(logDirectory, "owncast.log")
writer, _ := rotatelogs.New(
path+".%Y%m%d%H%M",
rotatelogs.WithLinkName(path),
logFile+".%Y%m%d%H%M",
rotatelogs.WithLinkName(logFile),
rotatelogs.WithMaxAge(time.Duration(86400)*time.Second),
rotatelogs.WithRotationTime(time.Duration(604800)*time.Second),
)

View file

@ -1,18 +0,0 @@
package logging
import (
"path/filepath"
"github.com/owncast/owncast/services/config"
)
// GetTranscoderLogFilePath returns the logging path for the transcoder log output.
func GetTranscoderLogFilePath() string {
c := config.GetConfig()
return filepath.Join(c.LogDirectory, "transcoder.log")
}
func getLogFilePath() string {
c := config.GetConfig()
return filepath.Join(c.LogDirectory, "owncast.log")
}

192
main.go
View file

@ -1,173 +1,43 @@
package main
import (
"flag"
"os"
"strconv"
import "github.com/owncast/owncast/cmd"
"github.com/owncast/owncast/logging"
"github.com/owncast/owncast/storage/configrepository"
"github.com/owncast/owncast/webserver"
log "github.com/sirupsen/logrus"
"github.com/owncast/owncast/core"
configservice "github.com/owncast/owncast/services/config"
"github.com/owncast/owncast/services/metrics"
"github.com/owncast/owncast/utils"
)
var (
dbFile = flag.String("database", "", "Path to the database file.")
logDirectory = flag.String("logdir", "", "Directory where logs will be written to")
backupDirectory = flag.String("backupdir", "", "Directory where backups will be written to")
enableDebugOptions = flag.Bool("enableDebugFeatures", false, "Enable additional debugging options.")
enableVerboseLogging = flag.Bool("enableVerboseLogging", false, "Enable additional logging.")
restoreDatabaseFile = flag.String("restoreDatabase", "", "Restore an Owncast database backup")
newAdminPassword = flag.String("adminpassword", "", "Set your admin password")
newStreamKey = flag.String("streamkey", "", "Set a temporary stream key for this session")
webServerPortOverride = flag.String("webserverport", "", "Force the web server to listen on a specific port")
webServerIPOverride = flag.String("webserverip", "", "Force web server to listen on this IP address")
rtmpPortOverride = flag.Int("rtmpport", 0, "Set listen port for the RTMP server")
config *configservice.Config
)
var configRepository = configrepository.Get()
// nolint:cyclop
func main() {
flag.Parse()
config = configservice.NewConfig()
if *logDirectory != "" {
config.LogDirectory = *logDirectory
}
if *backupDirectory != "" {
config.BackupDirectory = *backupDirectory
}
// Create the data directory if needed
if !utils.DoesFileExists("data") {
if err := os.Mkdir("./data", 0o700); err != nil {
log.Fatalln("Cannot create data directory", err)
}
}
// Migrate old (pre 0.1.0) emoji to new location if they exist.
utils.MigrateCustomEmojiLocations()
// Otherwise save the default emoji to the data directory.
if err := data.SetupEmojiDirectory(); err != nil {
log.Fatalln("Cannot set up emoji directory", err)
}
// Recreate the temp dir
if utils.DoesFileExists(config.TempDir) {
err := os.RemoveAll(config.TempDir)
if err != nil {
log.Fatalln("Unable to remove temp dir! Check permissions.", config.TempDir, err)
}
}
if err := os.Mkdir(config.TempDir, 0o700); err != nil {
log.Fatalln("Unable to create temp dir!", err)
}
configureLogging(*enableDebugOptions, *enableVerboseLogging)
log.Infoln(config.GetReleaseString())
// Allows a user to restore a specific database backup
if *restoreDatabaseFile != "" {
databaseFile := config.DatabaseFilePath
if *dbFile != "" {
databaseFile = *dbFile
}
if err := utils.Restore(*restoreDatabaseFile, databaseFile); err != nil {
log.Fatalln(err)
}
log.Println("Database has been restored. Restart Owncast.")
log.Exit(0)
}
config.EnableDebugFeatures = *enableDebugOptions
if *dbFile != "" {
config.DatabaseFilePath = *dbFile
}
if err := data.SetupPersistence(config.DatabaseFilePath); err != nil {
log.Fatalln("failed to open database", err)
}
handleCommandLineFlags()
// starts the core
if err := core.Start(); err != nil {
log.Fatalln("failed to start the core package", err)
}
go metrics.Start(core.GetStatus)
webserver := webserver.New()
if err := webserver.Start(config.WebServerIP, config.WebServerPort); err != nil {
log.Fatalln("failed to start/run the web server", err)
}
app := &cmd.Application{}
app.Start()
}
func handleCommandLineFlags() {
if *newAdminPassword != "" {
if err := configRepository.SetAdminPassword(*newAdminPassword); err != nil {
log.Errorln("Error setting your admin password.", err)
log.Exit(1)
} else {
log.Infoln("Admin password changed")
}
}
// var configRepository = configrepository.Get()
if *newStreamKey != "" {
log.Println("Temporary stream key is set for this session.")
config.TemporaryStreamKey = *newStreamKey
}
// // nolint:cyclop
// func main() {
// flag.Parse()
// Set the web server port
if *webServerPortOverride != "" {
portNumber, err := strconv.Atoi(*webServerPortOverride)
if err != nil {
log.Warnln(err)
return
}
// config = configservice.NewConfig()
log.Println("Saving new web server port number to", portNumber)
if err := configRepository.SetHTTPPortNumber(float64(portNumber)); err != nil {
log.Errorln(err)
}
}
config.WebServerPort = configRepository.GetHTTPPortNumber()
// // Otherwise save the default emoji to the data directory.
// if err := data.SetupEmojiDirectory(); err != nil {
// log.Fatalln("Cannot set up emoji directory", err)
// }
// Set the web server ip
if *webServerIPOverride != "" {
log.Println("Saving new web server listen IP address to", *webServerIPOverride)
if err := configRepository.SetHTTPListenAddress(*webServerIPOverride); err != nil {
log.Errorln(err)
}
}
config.WebServerIP = configRepository.GetHTTPListenAddress()
// if err := data.SetupPersistence(config.DatabaseFilePath); err != nil {
// log.Fatalln("failed to open database", err)
// }
// Set the rtmp server port
if *rtmpPortOverride > 0 {
log.Println("Saving new RTMP server port number to", *rtmpPortOverride)
if err := configRepository.SetRTMPPortNumber(float64(*rtmpPortOverride)); err != nil {
log.Errorln(err)
}
}
}
// handleCommandLineFlags()
func configureLogging(enableDebugFeatures bool, enableVerboseLogging bool) {
logging.Setup(enableDebugFeatures, enableVerboseLogging)
log.SetFormatter(&log.TextFormatter{
FullTimestamp: true,
})
}
// // starts the core
// if err := core.Start(); err != nil {
// log.Fatalln("failed to start the core package", err)
// }
// go metrics.Start(core.GetStatus)
// webserver := webserver.New()
// if err := webserver.Start(config.WebServerIP, config.WebServerPort); err != nil {
// log.Fatalln("failed to start/run the web server", err)
// }
// }
// func handleCommandLineFlags() {
// }

View file

@ -0,0 +1,12 @@
package models
const (
// ScopeCanSendChatMessages will allow sending chat messages as itself.
ScopeCanSendChatMessages = "CAN_SEND_MESSAGES"
// ScopeCanSendSystemMessages will allow sending chat messages as the system.
ScopeCanSendSystemMessages = "CAN_SEND_SYSTEM_MESSAGES"
// ScopeHasAdminAccess will allow performing administrative actions on the server.
ScopeHasAdminAccess = "HAS_ADMIN_ACCESS"
ModeratorScopeKey = "MODERATOR"
)

View file

@ -1,4 +1,4 @@
package events
package models
// EventType is the type of a websocket event.
type EventType = string
@ -16,6 +16,8 @@ const (
UserColorChanged EventType = "COLOR_CHANGE"
// VisibiltyUpdate is the event sent when a chat message's visibility changes.
VisibiltyUpdate EventType = "VISIBILITY-UPDATE"
// VisibiltyToggled is the event sent when a chat message's visibility changes.
VisibiltyToggled EventType = "VISIBILITY-UPDATE"
// PING is a ping message.
PING EventType = "PING"
// PONG is a pong message.

View file

@ -1,9 +1,7 @@
package events
import "github.com/owncast/owncast/models"
package models
// ConnectedClientInfo represents the information about a connected client.
type ConnectedClientInfo struct {
Event
User *models.User `json:"user"`
User *User `json:"user"`
}

20
models/event.go Normal file
View file

@ -0,0 +1,20 @@
package models
import (
"time"
"github.com/teris-io/shortid"
)
// Event is any kind of event. A type is required to be specified.
type Event struct {
Timestamp time.Time `json:"timestamp"`
Type EventType `json:"type,omitempty"`
ID string `json:"id"`
}
// SetDefaults will set default properties of all inbound events.
func (e *Event) SetDefaults() {
e.ID = shortid.MustGenerate()
e.Timestamp = time.Now()
}

4
models/eventPayload.go Normal file
View file

@ -0,0 +1,4 @@
package models
// EventPayload is a generic key/value map for sending out to chat clients.
type EventPayload map[string]interface{}

View file

@ -1,29 +0,0 @@
package models
// EventType is the type of a websocket event.
type EventType = string
const (
// MessageSent is the event sent when a chat event takes place.
MessageSent EventType = "CHAT"
// UserJoined is the event sent when a chat user join action takes place.
UserJoined EventType = "USER_JOINED"
// UserNameChanged is the event sent when a chat username change takes place.
UserNameChanged EventType = "NAME_CHANGE"
// VisibiltyToggled is the event sent when a chat message's visibility changes.
VisibiltyToggled EventType = "VISIBILITY-UPDATE"
// PING is a ping message.
PING EventType = "PING"
// PONG is a pong message.
PONG EventType = "PONG"
// StreamStarted represents a stream started event.
StreamStarted EventType = "STREAM_STARTED"
// StreamStopped represents a stream stopped event.
StreamStopped EventType = "STREAM_STOPPED"
// StreamTitleUpdated is the event sent when a stream's title changes.
StreamTitleUpdated EventType = "STREAM_TITLE_UPDATED"
// SystemMessageSent is the event sent when a system message is sent.
SystemMessageSent EventType = "SYSTEM"
// ChatActionSent is a generic chat action that can be used for anything that doesn't need specific handling or formatting.
ChatActionSent EventType = "CHAT_ACTION"
)

View file

@ -1,4 +1,4 @@
package events
package models
// NameChangeEvent is received when a user changes their chat display name.
type NameChangeEvent struct {

View file

@ -1,4 +1,4 @@
package events
package models
// SetMessageVisibilityEvent is the event fired when one or more message
// visibilities are changed.

View file

@ -6,9 +6,9 @@ import (
// Stats holds the stats for the system.
type Stats struct {
LastConnectTime *utils.NullTime `json:"lastConnectTime"`
LastDisconnectTime *utils.NullTime `json:"lastDisconnectTime"`
LastConnectTime *utils.NullTime `json:"-"`
ChatClients map[string]Client `json:"-"`
Viewers map[string]*Viewer `json:"-"`
SessionMaxViewerCount int `json:"sessionMaxViewerCount"`

View file

@ -1,4 +1,4 @@
package events
package models
// UserDisabledEvent is the event fired when a user is banned/blocked and disconnected from chat.
type UserDisabledEvent struct {

10
models/userEvent.go Normal file
View file

@ -0,0 +1,10 @@
package models
import "time"
// UserEvent is an event with an associated user.
type UserEvent struct {
User *User `json:"user"`
HiddenAt *time.Time `json:"hiddenAt,omitempty"`
ClientID uint `json:"clientId,omitempty"`
}

View file

@ -1,11 +1,17 @@
package models
import "time"
// UserJoinedEvent represents an event when a user joins the chat.
// UserJoinedEvent is the event fired when a user joins chat.
type UserJoinedEvent struct {
Timestamp time.Time `json:"timestamp,omitempty"`
Username string `json:"username"`
Type EventType `json:"type"`
ID string `json:"id"`
Event
UserEvent
}
// GetBroadcastPayload will return the object to send to all chat users.
func (e *UserJoinedEvent) GetBroadcastPayload() EventPayload {
return EventPayload{
"type": UserJoined,
"id": e.ID,
"timestamp": e.Timestamp,
"user": e.User,
}
}

View file

@ -0,0 +1,82 @@
package apfederation
import (
"github.com/owncast/owncast/services/apfederation/crypto"
"github.com/owncast/owncast/services/apfederation/outbox"
"github.com/owncast/owncast/services/apfederation/workerpool"
"github.com/owncast/owncast/storage/configrepository"
"github.com/owncast/owncast/storage/data"
"github.com/owncast/owncast/storage/federationrepository"
"github.com/owncast/owncast/models"
log "github.com/sirupsen/logrus"
)
type APFederation struct {
workers *workerpool.WorkerPool
outbox *outbox.APOutbox
}
func New() *APFederation {
ds := data.GetDatastore()
apf := &APFederation{
outbox: outbox.Get(),
}
apf.Start(ds)
return apf
}
var temporaryGlobalInstance *APFederation
func Get() *APFederation {
if temporaryGlobalInstance == nil {
temporaryGlobalInstance = New()
}
return temporaryGlobalInstance
}
// Start will initialize and start the federation support.
func (ap *APFederation) Start(datastore *data.Store) {
configRepository := configrepository.Get()
// workerpool.InitOutboundWorkerPool()
// ap.InitInboxWorkerPool()
// Generate the keys for signing federated activity if needed.
if configRepository.GetPrivateKey() == "" {
privateKey, publicKey, err := crypto.GenerateKeys()
_ = configRepository.SetPrivateKey(string(privateKey))
_ = configRepository.SetPublicKey(string(publicKey))
if err != nil {
log.Errorln("Unable to get private key", err)
}
}
}
// SendLive will send a "Go Live" message to followers.
func (ap *APFederation) SendLive() error {
return ap.SendLive()
}
// SendPublicFederatedMessage will send an arbitrary provided message to followers.
func (ap *APFederation) SendPublicFederatedMessage(message string) error {
return ap.outbox.SendPublicMessage(message)
}
// SendDirectFederatedMessage will send a direct message to a single account.
func (ap *APFederation) SendDirectFederatedMessage(message, account string) error {
return ap.outbox.SendDirectMessageToAccount(message, account)
}
// GetFollowerCount will return the local tracked follower count.
func (ap *APFederation) GetFollowerCount() (int64, error) {
federationRepository := federationrepository.Get()
return federationRepository.GetFollowerCount()
}
// GetPendingFollowRequests will return the pending follow requests.
func (ap *APFederation) GetPendingFollowRequests() ([]models.Follower, error) {
federationRepository := federationrepository.Get()
return federationRepository.GetPendingFollowRequests()
}

View file

@ -8,8 +8,8 @@ import (
"github.com/go-fed/activity/streams"
"github.com/go-fed/activity/streams/vocab"
"github.com/owncast/owncast/activitypub/crypto"
"github.com/owncast/owncast/models"
"github.com/owncast/owncast/services/apfederation/crypto"
"github.com/owncast/owncast/storage/configrepository"
log "github.com/sirupsen/logrus"
)

View file

@ -77,7 +77,7 @@ func CreateSignedRequest(payload []byte, url *url.URL, fromActorIRI *url.URL) (*
req, _ := http.NewRequest("POST", url.String(), bytes.NewBuffer(payload))
c := config.GetConfig()
c := config.Get()
ua := fmt.Sprintf("%s; https://owncast.online", c.GetReleaseString())
req.Header.Set("User-Agent", ua)

View file

@ -5,23 +5,22 @@ import (
"time"
"github.com/go-fed/activity/streams/vocab"
"github.com/owncast/owncast/activitypub/persistence"
"github.com/owncast/owncast/core/chat/events"
"github.com/owncast/owncast/models"
"github.com/pkg/errors"
)
func handleAnnounceRequest(c context.Context, activity vocab.ActivityStreamsAnnounce) error {
func (api *APInbox) handleAnnounceRequest(c context.Context, activity vocab.ActivityStreamsAnnounce) error {
object := activity.GetActivityStreamsObject()
actorReference := activity.GetActivityStreamsActor()
objectIRI := object.At(0).GetIRI().String()
actorIRI := actorReference.At(0).GetIRI().String()
if hasPreviouslyhandled, err := persistence.HasPreviouslyHandledInboundActivity(objectIRI, actorIRI, events.FediverseEngagementRepost); hasPreviouslyhandled || err != nil {
if hasPreviouslyhandled, err := api.federationRepository.HasPreviouslyHandledInboundActivity(objectIRI, actorIRI, models.FediverseEngagementRepost); hasPreviouslyhandled || err != nil {
return errors.Wrap(err, "inbound activity of share/re-post has already been handled")
}
// Shares need to match a post we had already sent.
_, isLiveNotification, timestamp, err := persistence.GetObjectByIRI(objectIRI)
_, isLiveNotification, timestamp, err := api.federationRepository.GetObjectByIRI(objectIRI)
if err != nil {
return errors.Wrap(err, "Could not find post locally")
}
@ -32,9 +31,9 @@ func handleAnnounceRequest(c context.Context, activity vocab.ActivityStreamsAnno
}
// Save as an accepted activity
if err := persistence.SaveInboundFediverseActivity(objectIRI, actorIRI, events.FediverseEngagementRepost, time.Now()); err != nil {
if err := api.federationRepository.SaveInboundFediverseActivity(objectIRI, actorIRI, models.FediverseEngagementRepost, time.Now()); err != nil {
return errors.Wrap(err, "unable to save inbound share/re-post activity")
}
return handleEngagementActivity(events.FediverseEngagementRepost, isLiveNotification, actorReference, events.FediverseEngagementRepost)
return api.handleEngagementActivity(models.FediverseEngagementRepost, isLiveNotification, actorReference, models.FediverseEngagementRepost)
}

View file

@ -0,0 +1,59 @@
package inbox
import (
"fmt"
"github.com/go-fed/activity/streams/vocab"
"github.com/owncast/owncast/models"
)
func (api *APInbox) handleEngagementActivity(eventType models.EventType, isLiveNotification bool, actorReference vocab.ActivityStreamsActorProperty, action string) error {
// Do nothing if displaying engagement actions has been turned off.
if !api.configRepository.GetFederationShowEngagement() {
return nil
}
// Do nothing if chat is disabled
if api.configRepository.GetChatDisabled() {
return nil
}
// Get actor of the action
actor, _ := api.resolvers.GetResolvedActorFromActorProperty(actorReference)
// Send chat message
actorName := actor.Name
if actorName == "" {
actorName = actor.Username
}
actorIRI := actorReference.Begin().GetIRI().String()
userPrefix := fmt.Sprintf("%s ", actorName)
var suffix string
if isLiveNotification && action == models.FediverseEngagementLike {
suffix = "liked that this stream went live."
} else if action == models.FediverseEngagementLike {
suffix = fmt.Sprintf("liked a post from %s.", api.configRepository.GetServerName())
} else if isLiveNotification && action == models.FediverseEngagementRepost {
suffix = "shared this stream with their followers."
} else if action == models.FediverseEngagementRepost {
suffix = fmt.Sprintf("shared a post from %s.", api.configRepository.GetServerName())
} else if action == models.FediverseEngagementFollow {
suffix = "followed this stream."
} else {
return fmt.Errorf("could not handle event for sending to chat: %s", action)
}
body := fmt.Sprintf("%s %s", userPrefix, suffix)
var image *string
if actor.Image != nil {
s := actor.Image.String()
image = &s
}
if err := api.chatService.SendFediverseAction(eventType, actor.FullUsername, image, body, actorIRI); err != nil {
return err
}
return nil
}

View file

@ -7,7 +7,7 @@ import (
"github.com/pkg/errors"
)
func handleCreateRequest(c context.Context, activity vocab.ActivityStreamsCreate) error {
func (api *APInbox) handleCreateRequest(c context.Context, activity vocab.ActivityStreamsCreate) error {
iri := activity.GetJSONLDId().GetIRI().String()
return errors.New("not handling create request of: " + iri)
}

View file

@ -6,17 +6,15 @@ import (
"time"
"github.com/go-fed/activity/streams/vocab"
"github.com/owncast/owncast/activitypub/persistence"
"github.com/owncast/owncast/activitypub/requests"
"github.com/owncast/owncast/activitypub/resolvers"
"github.com/owncast/owncast/core/chat/events"
"github.com/owncast/owncast/models"
"github.com/owncast/owncast/services/apfederation/outbox"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
)
func handleFollowInboxRequest(c context.Context, activity vocab.ActivityStreamsFollow) error {
follow, err := resolvers.MakeFollowRequest(c, activity)
func (api *APInbox) handleFollowInboxRequest(c context.Context, activity vocab.ActivityStreamsFollow) error {
follow, err := api.resolvers.MakeFollowRequest(c, activity)
if err != nil {
log.Errorln("unable to create follow inbox request", err)
return err
@ -26,19 +24,21 @@ func handleFollowInboxRequest(c context.Context, activity vocab.ActivityStreamsF
return fmt.Errorf("unable to handle request")
}
approved := !configRepository.GetFederationIsPrivate()
approved := !api.configRepository.GetFederationIsPrivate()
followRequest := *follow
if err := persistence.AddFollow(followRequest, approved); err != nil {
if err := api.federationRepository.AddFollow(followRequest, approved); err != nil {
log.Errorln("unable to save follow request", err)
return err
}
localAccountName := configRepository.GetDefaultFederationUsername()
localAccountName := api.configRepository.GetDefaultFederationUsername()
ob := outbox.Get()
if approved {
if err := requests.SendFollowAccept(follow.Inbox, activity, localAccountName); err != nil {
if err := ob.SendFollowAccept(follow.Inbox, activity, localAccountName); err != nil {
log.Errorln("unable to send follow accept", err)
return err
}
@ -54,27 +54,27 @@ func handleFollowInboxRequest(c context.Context, activity vocab.ActivityStreamsF
// chat due to a previous follow request, then do so.
hasPreviouslyhandled := true // Default so we don't send anything if it fails.
if approved {
hasPreviouslyhandled, err = persistence.HasPreviouslyHandledInboundActivity(objectIRI, actorIRI, events.FediverseEngagementFollow)
hasPreviouslyhandled, err = api.federationRepository.HasPreviouslyHandledInboundActivity(objectIRI, actorIRI, models.FediverseEngagementFollow)
if err != nil {
log.Errorln("error checking for previously handled follow activity", err)
}
}
// Save this follow action to our activities table.
if err := persistence.SaveInboundFediverseActivity(objectIRI, actorIRI, events.FediverseEngagementFollow, time.Now()); err != nil {
if err := api.federationRepository.SaveInboundFediverseActivity(objectIRI, actorIRI, models.FediverseEngagementFollow, time.Now()); err != nil {
return errors.Wrap(err, "unable to save inbound share/re-post activity")
}
// Send action to chat if it has not been previously handled.
if !hasPreviouslyhandled {
return handleEngagementActivity(events.FediverseEngagementFollow, false, actorReference, events.FediverseEngagementFollow)
return api.handleEngagementActivity(models.FediverseEngagementFollow, false, actorReference, models.FediverseEngagementFollow)
}
return nil
}
func handleUnfollowRequest(c context.Context, activity vocab.ActivityStreamsUndo) error {
request := resolvers.MakeUnFollowRequest(c, activity)
func (api *APInbox) handleUnfollowRequest(c context.Context, activity vocab.ActivityStreamsUndo) error {
request := api.resolvers.MakeUnFollowRequest(c, activity)
if request == nil {
log.Errorf("unable to handle unfollow request")
return errors.New("unable to handle unfollow request")
@ -83,5 +83,5 @@ func handleUnfollowRequest(c context.Context, activity vocab.ActivityStreamsUndo
unfollowRequest := *request
log.Traceln("unfollow request:", unfollowRequest)
return persistence.RemoveFollow(unfollowRequest)
return api.federationRepository.RemoveFollow(unfollowRequest)
}

View file

@ -0,0 +1,37 @@
package inbox
import (
"github.com/owncast/owncast/services/apfederation/requests"
"github.com/owncast/owncast/services/apfederation/resolvers"
"github.com/owncast/owncast/services/chat"
"github.com/owncast/owncast/storage/configrepository"
"github.com/owncast/owncast/storage/federationrepository"
)
type APInbox struct {
configRepository configrepository.ConfigRepository
federationRepository *federationrepository.FederationRepository
resolvers *resolvers.APResolvers
requests *requests.Requests
chatService *chat.Chat
}
func New() *APInbox {
return &APInbox{
configRepository: configrepository.Get(),
federationRepository: federationrepository.Get(),
resolvers: resolvers.Get(),
requests: requests.Get(),
chatService: chat.Get(),
}
}
var temporaryGlobalInstance *APInbox
func Get() *APInbox {
if temporaryGlobalInstance == nil {
temporaryGlobalInstance = New()
}
return temporaryGlobalInstance
}

View file

@ -5,12 +5,11 @@ import (
"time"
"github.com/go-fed/activity/streams/vocab"
"github.com/owncast/owncast/activitypub/persistence"
"github.com/owncast/owncast/core/chat/events"
"github.com/owncast/owncast/models"
"github.com/pkg/errors"
)
func handleLikeRequest(c context.Context, activity vocab.ActivityStreamsLike) error {
func (api *APInbox) handleLikeRequest(c context.Context, activity vocab.ActivityStreamsLike) error {
object := activity.GetActivityStreamsObject()
actorReference := activity.GetActivityStreamsActor()
if object.Len() < 1 {
@ -24,12 +23,12 @@ func handleLikeRequest(c context.Context, activity vocab.ActivityStreamsLike) er
objectIRI := object.At(0).GetIRI().String()
actorIRI := actorReference.At(0).GetIRI().String()
if hasPreviouslyhandled, err := persistence.HasPreviouslyHandledInboundActivity(objectIRI, actorIRI, events.FediverseEngagementLike); hasPreviouslyhandled || err != nil {
if hasPreviouslyhandled, err := api.federationRepository.HasPreviouslyHandledInboundActivity(objectIRI, actorIRI, models.FediverseEngagementLike); hasPreviouslyhandled || err != nil {
return errors.Wrap(err, "inbound activity of like has already been handled")
}
// Likes need to match a post we had already sent.
_, isLiveNotification, timestamp, err := persistence.GetObjectByIRI(objectIRI)
_, isLiveNotification, timestamp, err := api.federationRepository.GetObjectByIRI(objectIRI)
if err != nil {
return errors.Wrap(err, "Could not find post locally")
}
@ -40,9 +39,9 @@ func handleLikeRequest(c context.Context, activity vocab.ActivityStreamsLike) er
}
// Save as an accepted activity
if err := persistence.SaveInboundFediverseActivity(objectIRI, actorIRI, events.FediverseEngagementLike, time.Now()); err != nil {
if err := api.federationRepository.SaveInboundFediverseActivity(objectIRI, actorIRI, models.FediverseEngagementLike, time.Now()); err != nil {
return errors.Wrap(err, "unable to save inbound like activity")
}
return handleEngagementActivity(events.FediverseEngagementLike, isLiveNotification, actorReference, events.FediverseEngagementLike)
return api.handleEngagementActivity(models.FediverseEngagementLike, isLiveNotification, actorReference, models.FediverseEngagementLike)
}

View file

@ -8,13 +8,13 @@ import (
"github.com/go-fed/activity/streams/vocab"
)
func handleUndoInboxRequest(c context.Context, activity vocab.ActivityStreamsUndo) error {
func (api *APInbox) handleUndoInboxRequest(c context.Context, activity vocab.ActivityStreamsUndo) error {
// Determine if this is an undo of a follow, favorite, announce, etc.
o := activity.GetActivityStreamsObject()
for iter := o.Begin(); iter != o.End(); iter = iter.Next() {
if iter.IsActivityStreamsFollow() {
// This is an Unfollow request
if err := handleUnfollowRequest(c, activity); err != nil {
if err := api.handleUnfollowRequest(c, activity); err != nil {
return err
}
} else {

View file

@ -0,0 +1,23 @@
package inbox
import (
"context"
"github.com/go-fed/activity/streams/vocab"
log "github.com/sirupsen/logrus"
)
func (api *APInbox) handleUpdateRequest(c context.Context, activity vocab.ActivityStreamsUpdate) error {
// We only care about update events to followers.
if !activity.GetActivityStreamsObject().At(0).IsActivityStreamsPerson() {
return nil
}
actor, err := api.resolvers.GetResolvedActorFromActorProperty(activity.GetActivityStreamsActor())
if err != nil {
log.Errorln(err)
return err
}
return api.federationRepository.UpdateFollower(actor.ActorIri.String(), actor.Inbox.String(), actor.Name, actor.FullUsername, actor.Image.String())
}

View file

@ -12,15 +12,13 @@ import (
"github.com/pkg/errors"
"github.com/go-fed/httpsig"
"github.com/owncast/owncast/activitypub/apmodels"
"github.com/owncast/owncast/activitypub/persistence"
"github.com/owncast/owncast/activitypub/resolvers"
"github.com/owncast/owncast/services/apfederation/apmodels"
log "github.com/sirupsen/logrus"
)
func handle(request apmodels.InboxRequest) {
if verified, err := Verify(request.Request); err != nil {
func (api *APInbox) handle(request apmodels.InboxRequest) {
if verified, err := api.Verify(request.Request); err != nil {
log.Debugln("Error in attempting to verify request", err)
return
} else if !verified {
@ -28,7 +26,7 @@ func handle(request apmodels.InboxRequest) {
return
}
if err := resolvers.Resolve(context.Background(), request.Body, handleUpdateRequest, handleFollowInboxRequest, handleLikeRequest, handleAnnounceRequest, handleUndoInboxRequest, handleCreateRequest); err != nil {
if err := api.resolvers.Resolve(context.Background(), request.Body, api.handleUpdateRequest, api.handleFollowInboxRequest, api.handleLikeRequest, api.handleAnnounceRequest, api.handleUndoInboxRequest, api.handleCreateRequest); err != nil {
log.Debugln("resolver error:", err)
}
}
@ -36,7 +34,7 @@ func handle(request apmodels.InboxRequest) {
// Verify will Verify the http signature of an inbound request as well as
// check it against the list of blocked domains.
// nolint: cyclop
func Verify(request *http.Request) (bool, error) {
func (api *APInbox) Verify(request *http.Request) (bool, error) {
verifier, err := httpsig.NewVerifier(request)
if err != nil {
return false, errors.Wrap(err, "failed to create key verifier for request")
@ -71,7 +69,7 @@ func Verify(request *http.Request) (bool, error) {
return false, errors.New("Unable to determine algorithm to verify request")
}
publicKey, err := resolvers.GetResolvedPublicKeyFromIRI(pubKeyID.String())
publicKey, err := api.resolvers.GetResolvedPublicKeyFromIRI(pubKeyID.String())
if err != nil {
return false, errors.Wrap(err, "failed to resolve actor from IRI to fetch key")
}
@ -86,12 +84,12 @@ func Verify(request *http.Request) (bool, error) {
}
// Test to see if the actor is in the list of blocked federated domains.
if isBlockedDomain(publicKeyActorIRI.Hostname()) {
if api.isBlockedDomain(publicKeyActorIRI.Hostname()) {
return false, errors.New("domain is blocked")
}
// If actor is specifically blocked, then fail validation.
if blocked, err := isBlockedActor(publicKeyActorIRI); err != nil || blocked {
if blocked, err := api.isBlockedActor(publicKeyActorIRI); err != nil || blocked {
return false, err
}
@ -129,8 +127,8 @@ func Verify(request *http.Request) (bool, error) {
return false, fmt.Errorf("http signature verification error(s) for: %s: %+v", pubKeyID.String(), triedAlgos)
}
func isBlockedDomain(domain string) bool {
blockedDomains := configRepository.GetBlockedFederatedDomains()
func (api *APInbox) isBlockedDomain(domain string) bool {
blockedDomains := api.configRepository.GetBlockedFederatedDomains()
for _, blockedDomain := range blockedDomains {
if strings.Contains(domain, blockedDomain) {
@ -141,8 +139,8 @@ func isBlockedDomain(domain string) bool {
return false
}
func isBlockedActor(actorIRI *url.URL) (bool, error) {
blockedactor, err := persistence.GetFollower(actorIRI.String())
func (api *APInbox) isBlockedActor(actorIRI *url.URL) (bool, error) {
blockedactor, err := api.federationRepository.GetFollower(actorIRI.String())
if blockedactor != nil && blockedactor.DisabledAt != nil {
return true, errors.Wrap(err, "remote actor is blocked")

View file

@ -6,10 +6,10 @@ import (
"github.com/go-fed/activity/streams"
"github.com/go-fed/activity/streams/vocab"
"github.com/owncast/owncast/activitypub/apmodels"
"github.com/owncast/owncast/activitypub/persistence"
"github.com/owncast/owncast/services/apfederation/apmodels"
"github.com/owncast/owncast/storage/configrepository"
"github.com/owncast/owncast/storage/data"
"github.com/owncast/owncast/storage/federationrepository"
)
func makeFakePerson() vocab.ActivityStreamsPerson {
@ -54,6 +54,7 @@ func TestMain(m *testing.M) {
panic(err)
}
_ = federationrepository.New(ds)
configRepository := configrepository.New(ds)
configRepository.PopulateDefaults()
configRepository.SetServerURL("https://my.cool.site.biz")
@ -82,17 +83,20 @@ func TestBlockedDomains(t *testing.T) {
}
func TestBlockedActors(t *testing.T) {
federationRepository := federationrepository.Get()
person := makeFakePerson()
fakeRequest := streams.NewActivityStreamsFollow()
persistence.AddFollow(apmodels.ActivityPubActor{
ib := Get()
federationRepository.AddFollow(apmodels.ActivityPubActor{
ActorIri: person.GetJSONLDId().GetIRI(),
Inbox: person.GetJSONLDId().GetIRI(),
FollowRequestIri: person.GetJSONLDId().GetIRI(),
RequestObject: fakeRequest,
}, false)
persistence.BlockOrRejectFollower(person.GetJSONLDId().GetIRI().String())
federationRepository.BlockOrRejectFollower(person.GetJSONLDId().GetIRI().String())
blocked, err := isBlockedActor(person.GetJSONLDId().GetIRI())
blocked, err := ib.isBlockedActor(person.GetJSONLDId().GetIRI())
if err != nil {
t.Error(err)
return
@ -103,7 +107,7 @@ func TestBlockedActors(t *testing.T) {
}
failedBlockIRI, _ := url.Parse("https://freedom.eagle/user/mrbar")
failedBlock, err := isBlockedActor(failedBlockIRI)
failedBlock, err := ib.isBlockedActor(failedBlockIRI)
if failedBlock {
t.Error("Invalid blocking of unblocked actor IRI")

View file

@ -4,6 +4,7 @@ import (
"runtime"
"github.com/owncast/owncast/activitypub/apmodels"
"github.com/owncast/owncast/services/apfederation/apmodels"
log "github.com/sirupsen/logrus"
)
@ -18,7 +19,7 @@ type Job struct {
var queue chan Job
// InitInboxWorkerPool starts n go routines that await ActivityPub jobs.
func InitInboxWorkerPool() {
func (api *APInbox) InitInboxWorkerPool() {
queue = make(chan Job)
// start workers
@ -28,16 +29,16 @@ func InitInboxWorkerPool() {
}
// AddToQueue will queue up an outbound http request.
func AddToQueue(req apmodels.InboxRequest) {
func (api *APInbox) AddToQueue(req apmodels.InboxRequest) {
log.Tracef("Queued request for ActivityPub inbox handler")
queue <- Job{req}
}
func worker(workerID int, queue <-chan Job) {
func (api *APInbox) worker(workerID int, queue <-chan Job) {
log.Debugf("Started ActivityPub worker %d", workerID)
for job := range queue {
handle(job.request)
api.handle(job.request)
log.Tracef("Done with ActivityPub inbox handler using worker %d", workerID)
}

View file

@ -1,4 +1,4 @@
package requests
package outbox
import (
"encoding/json"
@ -6,16 +6,15 @@ import (
"github.com/go-fed/activity/streams"
"github.com/go-fed/activity/streams/vocab"
"github.com/owncast/owncast/activitypub/apmodels"
"github.com/owncast/owncast/activitypub/crypto"
"github.com/owncast/owncast/activitypub/workerpool"
"github.com/owncast/owncast/services/apfederation/apmodels"
"github.com/owncast/owncast/services/apfederation/crypto"
"github.com/teris-io/shortid"
)
// SendFollowAccept will send an accept activity to a follow request from a specified local user.
func SendFollowAccept(inbox *url.URL, originalFollowActivity vocab.ActivityStreamsFollow, fromLocalAccountName string) error {
followAccept := makeAcceptFollow(originalFollowActivity, fromLocalAccountName)
func (apo *APOutbox) SendFollowAccept(inbox *url.URL, originalFollowActivity vocab.ActivityStreamsFollow, fromLocalAccountName string) error {
followAccept := apo.makeAcceptFollow(originalFollowActivity, fromLocalAccountName)
localAccountIRI := apmodels.MakeLocalIRIForAccount(fromLocalAccountName)
var jsonmap map[string]interface{}
@ -26,12 +25,12 @@ func SendFollowAccept(inbox *url.URL, originalFollowActivity vocab.ActivityStrea
return err
}
workerpool.AddToOutboundQueue(req)
apo.workerpool.AddToOutboundQueue(req)
return nil
}
func makeAcceptFollow(originalFollowActivity vocab.ActivityStreamsFollow, fromAccountName string) vocab.ActivityStreamsAccept {
func (r *APOutbox) makeAcceptFollow(originalFollowActivity vocab.ActivityStreamsFollow, fromAccountName string) vocab.ActivityStreamsAccept {
acceptIDString := shortid.MustGenerate()
acceptID := apmodels.MakeLocalIRIForResource(acceptIDString)
actorID := apmodels.MakeLocalIRIForAccount(fromAccountName)

View file

@ -9,6 +9,7 @@ import (
"github.com/go-fed/activity/streams"
"github.com/go-fed/activity/streams/vocab"
<<<<<<< HEAD:activitypub/outbox/outbox.go
"github.com/owncast/owncast/activitypub/apmodels"
"github.com/owncast/owncast/activitypub/crypto"
"github.com/owncast/owncast/activitypub/persistence"
@ -17,20 +18,55 @@ import (
"github.com/owncast/owncast/activitypub/webfinger"
"github.com/owncast/owncast/activitypub/workerpool"
"github.com/owncast/owncast/core/data"
=======
>>>>>>> 4f9fbfba1 (WIP):services/apfederation/outbox/outbox.go
"github.com/owncast/owncast/storage/configrepository"
"github.com/owncast/owncast/storage/federationrepository"
"github.com/pkg/errors"
"github.com/owncast/owncast/services/apfederation/apmodels"
"github.com/owncast/owncast/services/apfederation/crypto"
"github.com/owncast/owncast/services/apfederation/requests"
"github.com/owncast/owncast/services/apfederation/resolvers"
"github.com/owncast/owncast/services/apfederation/webfinger"
"github.com/owncast/owncast/services/apfederation/workerpool"
"github.com/owncast/owncast/services/config"
"github.com/owncast/owncast/utils"
log "github.com/sirupsen/logrus"
"github.com/teris-io/shortid"
)
var configRepository = configrepository.Get()
type APOutbox struct {
configRepository configrepository.ConfigRepository
federationRepository *federationrepository.FederationRepository
resolvers *resolvers.APResolvers
workerpool *workerpool.WorkerPool
requests *requests.Requests
}
func New() *APOutbox {
return &APOutbox{
configRepository: configrepository.Get(),
federationRepository: federationrepository.Get(),
resolvers: resolvers.Get(),
workerpool: workerpool.Get(),
requests: requests.Get(),
}
}
var temporaryGlobalInstance *APOutbox
func Get() *APOutbox {
if temporaryGlobalInstance == nil {
temporaryGlobalInstance = New()
}
return temporaryGlobalInstance
}
// SendLive will send all followers the message saying you started a live stream.
func SendLive() error {
textContent := configRepository.GetFederationGoLiveMessage()
func (apo *APOutbox) SendLive() error {
textContent := apo.configRepository.GetFederationGoLiveMessage()
// If the message is empty then do not send it.
if textContent == "" {
@ -41,11 +77,11 @@ func SendLive() error {
reg := regexp.MustCompile("[^a-zA-Z0-9]+")
tagProp := streams.NewActivityStreamsTagProperty()
for _, tagString := range configRepository.GetServerMetadataTags() {
for _, tagString := range apo.configRepository.GetServerMetadataTags() {
tagWithoutSpecialCharacters := reg.ReplaceAllString(tagString, "")
hashtag := apmodels.MakeHashtag(tagWithoutSpecialCharacters)
tagProp.AppendTootHashtag(hashtag)
tagString := getHashtagLinkHTMLFromTagString(tagWithoutSpecialCharacters)
tagString := apo.getHashtagLinkHTMLFromTagString(tagWithoutSpecialCharacters)
tagStrings = append(tagStrings, tagString)
}
@ -60,15 +96,19 @@ func SendLive() error {
tagsString := strings.Join(tagStrings, " ")
var streamTitle string
if title := configRepository.GetStreamTitle(); title != "" {
if title := apo.configRepository.GetStreamTitle(); title != "" {
streamTitle = fmt.Sprintf("<p>%s</p>", title)
}
<<<<<<< HEAD:activitypub/outbox/outbox.go
textContent = fmt.Sprintf("<p>%s</p>%s<p>%s</p><p><a href=\"%s\">%s</a></p>", textContent, streamTitle, tagsString, data.GetServerURL(), data.GetServerURL())
=======
textContent = fmt.Sprintf("<p>%s</p>%s<p>%s</p><a href=\"%s\">%s</a>", textContent, streamTitle, tagsString, apo.configRepository.GetServerURL(), apo.configRepository.GetServerURL())
>>>>>>> 4f9fbfba1 (WIP):services/apfederation/outbox/outbox.go
activity, _, note, noteID := createBaseOutboundMessage(textContent)
activity, _, note, noteID := apo.createBaseOutboundMessage(textContent)
// To the public if we're not treating ActivityPub as "private".
if !configRepository.GetFederationIsPrivate() {
if !apo.configRepository.GetFederationIsPrivate() {
note = apmodels.MakeNotePublic(note)
activity = apmodels.MakeActivityPublic(activity)
}
@ -76,11 +116,11 @@ func SendLive() error {
note.SetActivityStreamsTag(tagProp)
// Attach an image along with the Federated message.
previewURL, err := url.Parse(configRepository.GetServerURL())
previewURL, err := url.Parse(apo.configRepository.GetServerURL())
if err == nil {
var imageToAttach string
var mediaType string
c := config.GetConfig()
c := config.Get()
previewGif := filepath.Join(c.TempDir, "preview.gif")
thumbnailJpg := filepath.Join(c.TempDir, "thumbnail.jpg")
uniquenessString := shortid.MustGenerate()
@ -98,7 +138,7 @@ func SendLive() error {
}
}
if configRepository.GetNSFW() {
if apo.configRepository.GetNSFW() {
// Mark content as sensitive.
sensitive := streams.NewActivityStreamsSensitiveProperty()
sensitive.AppendXMLSchemaBoolean(true)
@ -111,11 +151,11 @@ func SendLive() error {
return errors.New("unable to serialize go live message activity " + err.Error())
}
if err := SendToFollowers(b); err != nil {
if err := apo.SendToFollowers(b); err != nil {
return err
}
if err := Add(note, noteID, true); err != nil {
if err := apo.Add(note, noteID, true); err != nil {
return err
}
@ -123,20 +163,22 @@ func SendLive() error {
}
// SendDirectMessageToAccount will send a direct message to a single account.
func SendDirectMessageToAccount(textContent, account string) error {
links, err := webfinger.GetWebfingerLinks(account)
func (apo *APOutbox) SendDirectMessageToAccount(textContent, account string) error {
wf := webfinger.Get()
links, err := wf.GetWebfingerLinks(account)
if err != nil {
return errors.Wrap(err, "unable to get webfinger links when sending private message")
}
user := apmodels.MakeWebFingerRequestResponseFromData(links)
iri := user.Self
actor, err := resolvers.GetResolvedActorFromIRI(iri)
actor, err := apo.resolvers.GetResolvedActorFromIRI(iri)
if err != nil {
return errors.Wrap(err, "unable to resolve actor to send message to")
}
activity, _, note, _ := createBaseOutboundMessage(textContent)
activity, _, note, _ := apo.createBaseOutboundMessage(textContent)
// Set direct message visibility
activity = apmodels.MakeActivityDirect(activity, actor.ActorIri)
@ -150,11 +192,11 @@ func SendDirectMessageToAccount(textContent, account string) error {
return errors.Wrap(err, "unable to serialize custom fediverse message activity")
}
return SendToUser(actor.Inbox, b)
return apo.SendToUser(actor.Inbox, b)
}
// SendPublicMessage will send a public message to all followers.
func SendPublicMessage(textContent string) error {
func (apo *APOutbox) SendPublicMessage(textContent string) error {
originalContent := textContent
textContent = utils.RenderSimpleMarkdown(textContent)
@ -166,7 +208,7 @@ func SendPublicMessage(textContent string) error {
tagWithoutHashtag := strings.TrimPrefix(hashtag, "#")
// Replace the instances of the tag with a link to the tag page.
tagHTML := getHashtagLinkHTMLFromTagString(tagWithoutHashtag)
tagHTML := apo.getHashtagLinkHTMLFromTagString(tagWithoutHashtag)
textContent = strings.ReplaceAll(textContent, hashtag, tagHTML)
// Create Hashtag object for the tag.
@ -174,10 +216,10 @@ func SendPublicMessage(textContent string) error {
tagProp.AppendTootHashtag(hashtag)
}
activity, _, note, noteID := createBaseOutboundMessage(textContent)
activity, _, note, noteID := apo.createBaseOutboundMessage(textContent)
note.SetActivityStreamsTag(tagProp)
if !configRepository.GetFederationIsPrivate() {
if !apo.configRepository.GetFederationIsPrivate() {
note = apmodels.MakeNotePublic(note)
activity = apmodels.MakeActivityPublic(activity)
}
@ -188,11 +230,11 @@ func SendPublicMessage(textContent string) error {
return errors.New("unable to serialize custom fediverse message activity " + err.Error())
}
if err := SendToFollowers(b); err != nil {
if err := apo.SendToFollowers(b); err != nil {
return err
}
if err := Add(note, noteID, false); err != nil {
if err := apo.Add(note, noteID, false); err != nil {
return err
}
@ -200,8 +242,8 @@ func SendPublicMessage(textContent string) error {
}
// nolint: unparam
func createBaseOutboundMessage(textContent string) (vocab.ActivityStreamsCreate, string, vocab.ActivityStreamsNote, string) {
localActor := apmodels.MakeLocalIRIForAccount(configRepository.GetDefaultFederationUsername())
func (apo *APOutbox) createBaseOutboundMessage(textContent string) (vocab.ActivityStreamsCreate, string, vocab.ActivityStreamsNote, string) {
localActor := apmodels.MakeLocalIRIForAccount(apo.configRepository.GetDefaultFederationUsername())
noteID := shortid.MustGenerate()
noteIRI := apmodels.MakeLocalIRIForResource(noteID)
id := shortid.MustGenerate()
@ -216,15 +258,15 @@ func createBaseOutboundMessage(textContent string) (vocab.ActivityStreamsCreate,
}
// Get Hashtag HTML link for a given tag (without # prefix).
func getHashtagLinkHTMLFromTagString(baseHashtag string) string {
func (apo *APOutbox) getHashtagLinkHTMLFromTagString(baseHashtag string) string {
return fmt.Sprintf("<a class=\"hashtag\" href=\"https://directory.owncast.online/tags/%s\">#%s</a>", baseHashtag, baseHashtag)
}
// SendToFollowers will send an arbitrary payload to all follower inboxes.
func SendToFollowers(payload []byte) error {
localActor := apmodels.MakeLocalIRIForAccount(configRepository.GetDefaultFederationUsername())
func (apo *APOutbox) SendToFollowers(payload []byte) error {
localActor := apmodels.MakeLocalIRIForAccount(apo.configRepository.GetDefaultFederationUsername())
followers, _, err := persistence.GetFederationFollowers(-1, 0)
followers, _, err := apo.federationRepository.GetFederationFollowers(-1, 0)
if err != nil {
log.Errorln("unable to fetch followers to send to", err)
return errors.New("unable to fetch followers to send payload to")
@ -238,29 +280,29 @@ func SendToFollowers(payload []byte) error {
return errors.New("unable to create outbox request: " + follower.Inbox)
}
workerpool.AddToOutboundQueue(req)
apo.workerpool.AddToOutboundQueue(req)
}
return nil
}
// SendToUser will send a payload to a single specific inbox.
func SendToUser(inbox *url.URL, payload []byte) error {
localActor := apmodels.MakeLocalIRIForAccount(configRepository.GetDefaultFederationUsername())
func (apo *APOutbox) SendToUser(inbox *url.URL, payload []byte) error {
localActor := apmodels.MakeLocalIRIForAccount(apo.configRepository.GetDefaultFederationUsername())
req, err := requests.CreateSignedRequest(payload, inbox, localActor)
req, err := apo.requests.CreateSignedRequest(payload, inbox, localActor)
if err != nil {
return errors.Wrap(err, "unable to create outbox request")
}
workerpool.AddToOutboundQueue(req)
apo.workerpool.AddToOutboundQueue(req)
return nil
}
// UpdateFollowersWithAccountUpdates will send an update to all followers alerting of a profile update.
func UpdateFollowersWithAccountUpdates() error {
func (apo *APOutbox) UpdateFollowersWithAccountUpdates() error {
// Don't do anything if federation is disabled.
if !configRepository.GetFederationEnabled() {
if !apo.configRepository.GetFederationEnabled() {
return nil
}
@ -269,7 +311,7 @@ func UpdateFollowersWithAccountUpdates() error {
activity := apmodels.MakeUpdateActivity(objectID)
actor := streams.NewActivityStreamsPerson()
actorID := apmodels.MakeLocalIRIForAccount(configRepository.GetDefaultFederationUsername())
actorID := apmodels.MakeLocalIRIForAccount(apo.configRepository.GetDefaultFederationUsername())
actorIDProperty := streams.NewJSONLDIdProperty()
actorIDProperty.Set(actorID)
actor.SetJSONLDId(actorIDProperty)
@ -287,11 +329,11 @@ func UpdateFollowersWithAccountUpdates() error {
log.Errorln("unable to serialize send update actor activity", err)
return errors.New("unable to serialize send update actor activity")
}
return SendToFollowers(b)
return apo.SendToFollowers(b)
}
// Add will save an ActivityPub object to the datastore.
func Add(item vocab.Type, id string, isLiveNotification bool) error {
func (apo *APOutbox) Add(item vocab.Type, id string, isLiveNotification bool) error {
iri := item.GetJSONLDId().GetIRI().String()
typeString := item.GetTypeName()
@ -306,5 +348,5 @@ func Add(item vocab.Type, id string, isLiveNotification bool) error {
return err
}
return persistence.AddToOutbox(iri, b, typeString, isLiveNotification)
return apo.federationRepository.AddToOutbox(iri, b, typeString, isLiveNotification)
}

View file

@ -9,14 +9,14 @@ import (
"github.com/go-fed/activity/streams"
"github.com/go-fed/activity/streams/vocab"
"github.com/owncast/owncast/activitypub/crypto"
"github.com/owncast/owncast/services/apfederation/crypto"
"github.com/owncast/owncast/services/config"
log "github.com/sirupsen/logrus"
)
// WriteStreamResponse will write a ActivityPub object to the provided ResponseWriter and sign with the provided key.
func WriteStreamResponse(item vocab.Type, w http.ResponseWriter, publicKey crypto.PublicKey) error {
func (r *Requests) WriteStreamResponse(item vocab.Type, w http.ResponseWriter, publicKey crypto.PublicKey) error {
var jsonmap map[string]interface{}
jsonmap, _ = streams.Serialize(item)
b, err := json.Marshal(jsonmap)
@ -24,21 +24,21 @@ func WriteStreamResponse(item vocab.Type, w http.ResponseWriter, publicKey crypt
return err
}
return WriteResponse(b, w, publicKey)
return r.WriteResponse(b, w, publicKey)
}
// WritePayloadResponse will write any arbitrary object to the provided ResponseWriter and sign with the provided key.
func WritePayloadResponse(payload interface{}, w http.ResponseWriter, publicKey crypto.PublicKey) error {
func (r *Requests) WritePayloadResponse(payload interface{}, w http.ResponseWriter, publicKey crypto.PublicKey) error {
b, err := json.Marshal(payload)
if err != nil {
return err
}
return WriteResponse(b, w, publicKey)
return r.WriteResponse(b, w, publicKey)
}
// WriteResponse will write any arbitrary payload to the provided ResponseWriter and sign with the provided key.
func WriteResponse(payload []byte, w http.ResponseWriter, publicKey crypto.PublicKey) error {
func (r *Requests) WriteResponse(payload []byte, w http.ResponseWriter, publicKey crypto.PublicKey) error {
w.Header().Set("Content-Type", "application/activity+json")
if err := crypto.SignResponse(w, payload, publicKey); err != nil {
@ -56,11 +56,11 @@ func WriteResponse(payload []byte, w http.ResponseWriter, publicKey crypto.Publi
}
// CreateSignedRequest will create a signed POST request of a payload to the provided destination.
func CreateSignedRequest(payload []byte, url *url.URL, fromActorIRI *url.URL) (*http.Request, error) {
func (r *Requests) CreateSignedRequest(payload []byte, url *url.URL, fromActorIRI *url.URL) (*http.Request, error) {
log.Debugln("Sending", string(payload), "to", url)
req, _ := http.NewRequest(http.MethodPost, url.String(), bytes.NewBuffer(payload))
c := config.GetConfig()
c := config.Get()
ua := fmt.Sprintf("%s; https://owncast.online", c.GetReleaseString())
req.Header.Set("User-Agent", ua)
req.Header.Set("Content-Type", "application/activity+json")

View file

@ -0,0 +1,20 @@
package requests
import "github.com/owncast/owncast/services/apfederation/workerpool"
type Requests struct {
outboundWorkerPool *workerpool.WorkerPool
}
func New() *Requests {
return &Requests{}
}
var temporaryGlobalInstance *Requests
func Get() *Requests {
if temporaryGlobalInstance == nil {
temporaryGlobalInstance = New()
}
return temporaryGlobalInstance
}

View file

@ -5,18 +5,18 @@ import (
"fmt"
"github.com/go-fed/activity/streams/vocab"
"github.com/owncast/owncast/activitypub/apmodels"
"github.com/owncast/owncast/services/apfederation/apmodels"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
)
func getPersonFromFollow(activity vocab.ActivityStreamsFollow) (apmodels.ActivityPubActor, error) {
return GetResolvedActorFromActorProperty(activity.GetActivityStreamsActor())
func (apr *APResolvers) getPersonFromFollow(activity vocab.ActivityStreamsFollow) (apmodels.ActivityPubActor, error) {
return apr.GetResolvedActorFromActorProperty(activity.GetActivityStreamsActor())
}
// MakeFollowRequest will convert an inbound Follow request to our internal actor model.
func MakeFollowRequest(c context.Context, activity vocab.ActivityStreamsFollow) (*apmodels.ActivityPubActor, error) {
person, err := getPersonFromFollow(activity)
func (apr *APResolvers) MakeFollowRequest(c context.Context, activity vocab.ActivityStreamsFollow) (*apmodels.ActivityPubActor, error) {
person, err := apr.getPersonFromFollow(activity)
if err != nil {
return nil, errors.New("unable to resolve person from follow request: " + err.Error())
}
@ -39,8 +39,8 @@ func MakeFollowRequest(c context.Context, activity vocab.ActivityStreamsFollow)
}
// MakeUnFollowRequest will convert an inbound Unfollow request to our internal actor model.
func MakeUnFollowRequest(c context.Context, activity vocab.ActivityStreamsUndo) *apmodels.ActivityPubActor {
person, err := GetResolvedActorFromActorProperty(activity.GetActivityStreamsActor())
func (apr *APResolvers) MakeUnFollowRequest(c context.Context, activity vocab.ActivityStreamsUndo) *apmodels.ActivityPubActor {
person, err := apr.GetResolvedActorFromActorProperty(activity.GetActivityStreamsActor())
if err != nil {
log.Errorln("unable to resolve person from actor iri", person.ActorIri, err)
return nil

View file

@ -8,15 +8,15 @@ import (
"github.com/go-fed/activity/streams"
"github.com/go-fed/activity/streams/vocab"
"github.com/owncast/owncast/activitypub/apmodels"
"github.com/owncast/owncast/activitypub/crypto"
"github.com/owncast/owncast/services/apfederation/apmodels"
"github.com/owncast/owncast/services/apfederation/crypto"
"github.com/owncast/owncast/storage/configrepository"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
)
// Resolve will translate a raw ActivityPub payload and fire the callback associated with that activity type.
func Resolve(c context.Context, data []byte, callbacks ...interface{}) error {
func (apr *APResolvers) Resolve(c context.Context, data []byte, callbacks ...interface{}) error {
jsonResolver, err := streams.NewJSONResolver(callbacks...)
if err != nil {
// Something in the setup was wrong. For example, a callback has an
@ -46,7 +46,7 @@ func Resolve(c context.Context, data []byte, callbacks ...interface{}) error {
}
// ResolveIRI will resolve an IRI ahd call the correct callback for the resolved type.
func ResolveIRI(c context.Context, iri string, callbacks ...interface{}) error {
func (apr *APResolvers) ResolveIRI(c context.Context, iri string, callbacks ...interface{}) error {
log.Debugln("Resolving", iri)
req, _ := http.NewRequest(http.MethodGet, iri, nil)
@ -71,12 +71,12 @@ func ResolveIRI(c context.Context, iri string, callbacks ...interface{}) error {
}
// fmt.Println(string(data))
return Resolve(c, data, callbacks...)
return apr.Resolve(c, data, callbacks...)
}
// GetResolvedActorFromActorProperty resolve an external actor property to a
// fully populated internal actor representation.
func GetResolvedActorFromActorProperty(actor vocab.ActivityStreamsActorProperty) (apmodels.ActivityPubActor, error) {
func (apr *APResolvers) GetResolvedActorFromActorProperty(actor vocab.ActivityStreamsActorProperty) (apmodels.ActivityPubActor, error) {
var err error
var apActor apmodels.ActivityPubActor
resolved := false
@ -89,7 +89,7 @@ func GetResolvedActorFromActorProperty(actor vocab.ActivityStreamsActorProperty)
// If the actor is an unresolved IRI then we need to resolve it.
if actorObjectOrIRI.IsIRI() {
iri := actorObjectOrIRI.GetIRI().String()
return GetResolvedActorFromIRI(iri)
return apr.GetResolvedActorFromIRI(iri)
}
if actorObjectOrIRI.IsActivityStreamsPerson() {
@ -125,7 +125,7 @@ func GetResolvedActorFromActorProperty(actor vocab.ActivityStreamsActorProperty)
}
// GetResolvedPublicKeyFromIRI will resolve a publicKey IRI string to a vocab.W3IDSecurityV1PublicKey.
func GetResolvedPublicKeyFromIRI(publicKeyIRI string) (vocab.W3IDSecurityV1PublicKey, error) {
func (apr *APResolvers) GetResolvedPublicKeyFromIRI(publicKeyIRI string) (vocab.W3IDSecurityV1PublicKey, error) {
var err error
var pubkey vocab.W3IDSecurityV1PublicKey
resolved := false
@ -175,7 +175,7 @@ func GetResolvedPublicKeyFromIRI(publicKeyIRI string) (vocab.W3IDSecurityV1Publi
return nil
}
if e := ResolveIRI(context.Background(), publicKeyIRI, personCallback, serviceCallback, applicationCallback, pubkeyCallback); e != nil {
if e := apr.ResolveIRI(context.Background(), publicKeyIRI, personCallback, serviceCallback, applicationCallback, pubkeyCallback); e != nil {
err = e
}
@ -191,7 +191,7 @@ func GetResolvedPublicKeyFromIRI(publicKeyIRI string) (vocab.W3IDSecurityV1Publi
}
// GetResolvedActorFromIRI will resolve an IRI string to a fully populated actor.
func GetResolvedActorFromIRI(personOrServiceIRI string) (apmodels.ActivityPubActor, error) {
func (apr *APResolvers) GetResolvedActorFromIRI(personOrServiceIRI string) (apmodels.ActivityPubActor, error) {
var err error
var apActor apmodels.ActivityPubActor
resolved := false
@ -222,7 +222,7 @@ func GetResolvedActorFromIRI(personOrServiceIRI string) (apmodels.ActivityPubAct
return e
}
if e := ResolveIRI(context.Background(), personOrServiceIRI, personCallback, serviceCallback, applicationCallback); e != nil {
if e := apr.ResolveIRI(context.Background(), personOrServiceIRI, personCallback, serviceCallback, applicationCallback); e != nil {
err = e
}

View file

@ -0,0 +1,24 @@
package resolvers
import (
"github.com/owncast/owncast/storage/configrepository"
)
type APResolvers struct {
configRepository configrepository.ConfigRepository
}
func New() *APResolvers {
return &APResolvers{
configRepository: configrepository.Get(),
}
}
var temporaryGlobalInstance *APResolvers
func Get() *APResolvers {
if temporaryGlobalInstance == nil {
temporaryGlobalInstance = New()
}
return temporaryGlobalInstance
}

View file

@ -11,8 +11,23 @@ import (
"github.com/owncast/owncast/utils"
)
type Webfinger struct{}
var temporaryGlobalInstance *Webfinger
func Get() *Webfinger {
if temporaryGlobalInstance == nil {
temporaryGlobalInstance = New()
}
return temporaryGlobalInstance
}
func New() *Webfinger {
return &Webfinger{}
}
// GetWebfingerLinks will return webfinger data for an account.
func GetWebfingerLinks(account string) ([]map[string]interface{}, error) {
func (w *Webfinger) GetWebfingerLinks(account string) ([]map[string]interface{}, error) {
type webfingerResponse struct {
Links []map[string]interface{} `json:"links"`
}

View file

@ -15,12 +15,29 @@ type Job struct {
request *http.Request
}
var queue chan Job
type WorkerPool struct {
queue chan Job
}
func New() *WorkerPool {
wp := &WorkerPool{
queue: make(chan Job),
}
wp.initOutboundWorkerPool()
return wp
}
var temporaryGlobalInstance *WorkerPool
func Get() *WorkerPool {
if temporaryGlobalInstance == nil {
temporaryGlobalInstance = New()
}
return temporaryGlobalInstance
}
// InitOutboundWorkerPool starts n go routines that await ActivityPub jobs.
func InitOutboundWorkerPool() {
queue = make(chan Job)
func (wp *WorkerPool) initOutboundWorkerPool() {
// start workers
for i := 1; i <= workerPoolSize; i++ {
go worker(i, queue)
@ -28,23 +45,23 @@ func InitOutboundWorkerPool() {
}
// AddToOutboundQueue will queue up an outbound http request.
func AddToOutboundQueue(req *http.Request) {
func (wp *WorkerPool) AddToOutboundQueue(req *http.Request) {
log.Tracef("Queued request for ActivityPub destination %s", req.RequestURI)
queue <- Job{req}
wp.queue <- Job{req}
}
func worker(workerID int, queue <-chan Job) {
func (wp *WorkerPool) worker(workerID int, queue <-chan Job) {
log.Debugf("Started ActivityPub worker %d", workerID)
for job := range queue {
if err := sendActivityPubMessageToInbox(job); err != nil {
if err := wp.sendActivityPubMessageToInbox(job); err != nil {
log.Errorf("ActivityPub destination %s failed to send Error: %s", job.request.RequestURI, err)
}
log.Tracef("Done with ActivityPub destination %s using worker %d", job.request.RequestURI, workerID)
}
}
func sendActivityPubMessageToInbox(job Job) error {
func (wp *WorkerPool) sendActivityPubMessageToInbox(job Job) error {
client := &http.Client{}
resp, err := client.Do(job.request)

View file

@ -10,12 +10,8 @@ import (
"strings"
"time"
<<<<<<< HEAD
"github.com/owncast/owncast/core/data"
"github.com/owncast/owncast/utils"
=======
"github.com/owncast/owncast/storage/configrepository"
>>>>>>> 659a19bf2 (WIP refactored all storage into repos. Tests pass.)
"github.com/owncast/owncast/utils"
"github.com/pkg/errors"
log "github.com/sirupsen/logrus"
)
@ -45,7 +41,6 @@ func (c *IndieAuthClient) StartAuthFlow(authHost, userID, accessToken, displayNa
return nil, errors.New("Please try again later. Too many pending requests.")
}
<<<<<<< HEAD
// Reject any requests to our internal network or loopback
if utils.IsHostnameInternal(authHost) {
return nil, errors.New("unable to use provided host")
@ -62,11 +57,8 @@ func (c *IndieAuthClient) StartAuthFlow(authHost, userID, accessToken, displayNa
return nil, errors.New("only servers secured with https are supported")
}
serverURL := data.GetServerURL()
=======
configRepository := configrepository.Get()
serverURL := configRepository.GetServerURL()
>>>>>>> 659a19bf2 (WIP refactored all storage into repos. Tests pass.)
if serverURL == "" {
return nil, errors.New("Owncast server URL must be set when using auth")
}

View file

@ -1,14 +1,16 @@
package events
package chat
import "github.com/owncast/owncast/models"
// ActionEvent represents an action that took place, not a chat message.
type ActionEvent struct {
Event
models.Event
MessageEvent
}
// GetBroadcastPayload will return the object to send to all chat users.
func (e *ActionEvent) GetBroadcastPayload() EventPayload {
return EventPayload{
func (e *ActionEvent) GetBroadcastPayload() models.EventPayload {
return models.EventPayload{
"id": e.ID,
"timestamp": e.Timestamp,
"body": e.Body,
@ -17,6 +19,6 @@ func (e *ActionEvent) GetBroadcastPayload() EventPayload {
}
// GetMessageType will return the type of message.
func (e *ActionEvent) GetMessageType() EventType {
return ChatActionSent
func (e *ActionEvent) GetMessageType() models.EventType {
return models.ChatActionSent
}

227
services/chat/chat.go Normal file
View file

@ -0,0 +1,227 @@
package chat
import (
"fmt"
"net/http"
"sort"
"github.com/owncast/owncast/models"
"github.com/owncast/owncast/storage/chatrepository"
"github.com/owncast/owncast/storage/configrepository"
"github.com/owncast/owncast/storage/userrepository"
log "github.com/sirupsen/logrus"
)
type Chat struct {
getStatus func() *models.Status
server *Server
configRepository *configrepository.SqlConfigRepository
emojis *emojis
}
func New() *Chat {
return &Chat{
configRepository: configrepository.Get(),
emojis: newEmojis(),
}
}
var temporaryGlobalInstance *Chat
// GetConfig returns the temporary global instance.
// Remove this after dependency injection is implemented.
func Get() *Chat {
if temporaryGlobalInstance == nil {
temporaryGlobalInstance = New()
}
return temporaryGlobalInstance
}
// Start begins the chat server.
func (c *Chat) Start(getStatusFunc func() *models.Status) error {
c.getStatus = getStatusFunc
c.server = NewChat()
go c.server.Run()
log.Traceln("Chat server started with max connection count of", c.server.maxSocketConnectionLimit)
return nil
}
// FindClientByID will return a single connected client by ID.
func (c *Chat) FindClientByID(clientID uint) (*Client, bool) {
client, found := c.server.clients[clientID]
return client, found
}
// GetClients will return all the current chat clients connected.
func (c *Chat) GetClients() []*Client {
clients := []*Client{}
if c.server == nil {
return clients
}
// Convert the keyed map to a slice.
for _, client := range c.server.clients {
clients = append(clients, client)
}
sort.Slice(clients, func(i, j int) bool {
return clients[i].ConnectedAt.Before(clients[j].ConnectedAt)
})
return clients
}
// SendSystemMessage will send a message string as a system message to all clients.
func (c *Chat) SendSystemMessage(text string, ephemeral bool) error {
message := SystemMessageEvent{
MessageEvent: MessageEvent{
Body: text,
},
}
message.SetDefaults()
message.RenderBody()
message.DisplayName = c.configRepository.GetServerName()
if err := c.Broadcast(&message); err != nil {
log.Errorln("error sending system message", err)
}
if !ephemeral {
cr := chatrepository.Get()
cr.SaveEvent(message.ID, nil, message.Body, message.GetMessageType(), nil, message.Timestamp, nil, nil, nil, nil)
}
return nil
}
// SendFediverseAction will send a message indicating some Fediverse engagement took place.
func (c *Chat) SendFediverseAction(eventType string, userAccountName string, image *string, body string, link string) error {
message := FediverseEngagementEvent{
Event: models.Event{
Type: eventType,
},
MessageEvent: MessageEvent{
Body: body,
},
UserAccountName: userAccountName,
Image: image,
Link: link,
}
message.SetDefaults()
message.RenderBody()
if err := c.Broadcast(&message); err != nil {
log.Errorln("error sending system message", err)
return err
}
cr := chatrepository.Get()
cr.SaveFederatedAction(message)
return nil
}
// SendSystemAction will send a system action string as an action event to all clients.
func (c *Chat) SendSystemAction(text string, ephemeral bool) error {
message := ActionEvent{
MessageEvent: MessageEvent{
Body: text,
},
}
message.SetDefaults()
message.RenderBody()
if err := c.Broadcast(&message); err != nil {
log.Errorln("error sending system chat action")
}
if !ephemeral {
cr := chatrepository.Get()
cr.SaveEvent(message.ID, nil, message.Body, message.GetMessageType(), nil, message.Timestamp, nil, nil, nil, nil)
}
return nil
}
// SendAllWelcomeMessage will send the chat message to all connected clients.
func (c *Chat) SendAllWelcomeMessage() {
c.server.sendAllWelcomeMessage()
}
// SendSystemMessageToClient will send a single message to a single connected chat client.
func (c *Chat) SendSystemMessageToClient(clientID uint, text string) {
if client, foundClient := c.FindClientByID(clientID); foundClient {
c.server.sendSystemMessageToClient(client, text)
}
}
// Broadcast will send all connected clients the outbound object provided.
func (c *Chat) Broadcast(event OutboundEvent) error {
return c.server.Broadcast(event.GetBroadcastPayload())
}
// HandleClientConnection handles a single inbound websocket connection.
func (c *Chat) HandleClientConnection(w http.ResponseWriter, r *http.Request) {
c.server.HandleClientConnection(w, r)
}
// DisconnectClients will forcefully disconnect all clients belonging to a user by ID.
func (c *Chat) DisconnectClients(clients []*Client) {
c.server.DisconnectClients(clients)
}
func (c *Chat) GetClientsForUser(userID string) ([]*Client, error) {
return c.server.GetClientsForUser(userID)
}
// SendConnectedClientInfoToUser will find all the connected clients assigned to a user
// and re-send each the connected client info.
func (c *Chat) SendConnectedClientInfoToUser(userID string) error {
clients, err := c.server.GetClientsForUser(userID)
if err != nil {
return err
}
userRepository := userrepository.Get()
// Get an updated reference to the user.
user := userRepository.GetUserByID(userID)
if user == nil {
return fmt.Errorf("user not found")
}
if err != nil {
return err
}
for _, client := range clients {
// Update the client's reference to its user.
client.User = user
// Send the update to the client.
client.sendConnectedClientInfo()
}
return nil
}
// SendActionToUser will send system action text to all connected clients
// assigned to a user ID.
func (c *Chat) SendActionToUser(userID string, text string) error {
clients, err := c.server.GetClientsForUser(userID)
if err != nil {
return err
}
for _, client := range clients {
c.server.sendActionToClient(client, text)
}
return nil
}

View file

@ -11,7 +11,6 @@ import (
"golang.org/x/time/rate"
"github.com/gorilla/websocket"
"github.com/owncast/owncast/core/chat/events"
"github.com/owncast/owncast/models"
"github.com/owncast/owncast/services/config"
"github.com/owncast/owncast/services/geoip"
@ -75,9 +74,9 @@ var (
)
func (c *Client) sendConnectedClientInfo() {
payload := events.ConnectedClientInfo{
Event: events.Event{
Type: events.ConnectedUserInfo,
payload := models.ConnectedClientInfo{
Event: models.Event{
Type: models.ConnectedUserInfo,
},
User: c.User,
}
@ -237,8 +236,8 @@ func (c *Client) sendPayload(payload interface{}) {
}
func (c *Client) sendAction(message string) {
clientMessage := events.ActionEvent{
MessageEvent: events.MessageEvent{
clientMessage := ActionEvent{
MessageEvent: MessageEvent{
Body: message,
},
}

131
services/chat/emoji.go Normal file
View file

@ -0,0 +1,131 @@
package chat
import (
"bytes"
"strings"
"sync"
"text/template"
"time"
emojiDef "github.com/yuin/goldmark-emoji/definition"
)
// implements the emojiDef.Emojis interface but uses case-insensitive search.
// the .children field isn't currently used, but could be used in a future
// implementation of say, emoji packs where a child represents a pack.
type emojis struct {
list []emojiDef.Emoji
names map[string]*emojiDef.Emoji
children []emojiDef.Emojis
emojiMu sync.Mutex
emojiDefs emojiDef.Emojis
emojiHTML map[string]string
emojiModTime time.Time
emojiHTMLFormat string
emojiHTMLTemplate *template.Template
}
// return a new Emojis set.
func newEmojis(emotes ...emojiDef.Emoji) *emojis {
loadEmoji()
e := &emojis{
list: emotes,
names: map[string]*emojiDef.Emoji{},
children: []emojiDef.Emojis{},
emojiMu: sync.Mutex{},
emojiHTML: make(map[string]string),
emojiModTime: time.Now(),
emojiHTMLFormat: `<img src="{{ .URL }}" class="emoji" alt=":{{ .Name }}:" title=":{{ .Name }}:">`,
emojiHTMLTemplate: template.Must(template.New("emojiHTML").Parse(emojiHTMLFormat)),
}
for i := range e.list {
emoji := &e.list[i]
for _, s := range emoji.ShortNames {
e.names[s] = emoji
}
}
return e
}
func (self *emojis) Get(shortName string) (*emojiDef.Emoji, bool) {
v, ok := self.names[strings.ToLower(shortName)]
if ok {
return v, ok
}
for _, child := range self.children {
v, ok := child.Get(shortName)
if ok {
return v, ok
}
}
return nil, false
}
func (self *emojis) Add(emotes emojiDef.Emojis) {
self.children = append(self.children, emotes)
}
func (self *emojis) Clone() emojiDef.Emojis {
clone := &emojis{
list: self.list,
names: self.names,
children: make([]emojiDef.Emojis, len(self.children)),
}
copy(clone.children, self.children)
return clone
}
var (
emojiMu sync.Mutex
// emojiDefs = newEmojis()
// emojiHTML = make(map[string]string)
emojiModTime time.Time
emojiHTMLFormat = `<img src="{{ .URL }}" class="emoji" alt=":{{ .Name }}:" title=":{{ .Name }}:">`
emojiHTMLTemplate = template.Must(template.New("emojiHTML").Parse(emojiHTMLFormat))
)
type emojiMeta struct {
emojiDefs emojiDef.Emojis
emojiHTML map[string]string
}
func loadEmoji() emojiDef.Emojis {
modTime, err := data.UpdateEmojiList(false)
if err != nil {
return
}
emojiArr := make([]emojiDef.Emoji, 0)
if modTime.After(emojiModTime) {
emojiMu.Lock()
defer emojiMu.Unlock()
emojiHTML := make(map[string]string)
emojiList := data.GetEmojiList()
for i := 0; i < len(emojiList); i++ {
var buf bytes.Buffer
err := emojiHTMLTemplate.Execute(&buf, emojiList[i])
if err != nil {
return
}
emojiHTML[strings.ToLower(emojiList[i].Name)] = buf.String()
emoji := emojiDef.NewEmoji(emojiList[i].Name, nil, strings.ToLower(emojiList[i].Name))
emojiArr = append(emojiArr, emoji)
}
}
emojiDefs := newEmojis(emojiArr...)
return emojiDefs
}

View file

@ -6,16 +6,18 @@ import (
"strings"
"time"
"github.com/owncast/owncast/core/chat/events"
"github.com/owncast/owncast/models"
"github.com/owncast/owncast/services/config"
"github.com/owncast/owncast/services/status"
"github.com/owncast/owncast/services/webhooks"
"github.com/owncast/owncast/storage"
"github.com/owncast/owncast/storage/chatrepository"
"github.com/owncast/owncast/storage/userrepository"
"github.com/owncast/owncast/utils"
log "github.com/sirupsen/logrus"
)
func (s *Server) userNameChanged(eventData chatClientEvent) {
var receivedEvent events.NameChangeEvent
var receivedEvent models.NameChangeEvent
if err := json.Unmarshal(eventData.data, &receivedEvent); err != nil {
log.Errorln("error unmarshalling to NameChangeEvent", err)
return
@ -24,8 +26,8 @@ func (s *Server) userNameChanged(eventData chatClientEvent) {
proposedUsername := receivedEvent.NewName
// Check if name is on the blocklist
blocklist := configRepository.GetForbiddenUsernameList()
userRepository := storage.GetUserRepository()
blocklist := s.configRepository.GetForbiddenUsernameList()
userRepository := userrepository.Get()
// Names have a max length
proposedUsername = utils.MakeSafeStringOfLength(proposedUsername, config.MaxChatDisplayNameLength)
@ -82,7 +84,7 @@ func (s *Server) userNameChanged(eventData chatClientEvent) {
// Send chat event letting everyone about about the name change
savedUser.DisplayName = proposedUsername
broadcastEvent := events.NameChangeBroadcast{
broadcastEvent := models.NameChangeBroadcast{
Oldname: oldName,
}
broadcastEvent.User = savedUser
@ -104,7 +106,7 @@ func (s *Server) userNameChanged(eventData chatClientEvent) {
}
func (s *Server) userColorChanged(eventData chatClientEvent) {
var receivedEvent events.ColorChangeEvent
var receivedEvent models.ColorChangeEvent
if err := json.Unmarshal(eventData.data, &receivedEvent); err != nil {
log.Errorln("error unmarshalling to ColorChangeEvent", err)
return
@ -115,7 +117,7 @@ func (s *Server) userColorChanged(eventData chatClientEvent) {
log.Errorln("invalid color requested when changing user display color")
return
}
userRepository := storage.GetUserRepository()
userRepository := userrepository.Get()
// Save the new color
if err := userRepository.ChangeUserColor(eventData.client.User.ID, receivedEvent.NewColor); err != nil {
@ -128,7 +130,7 @@ func (s *Server) userColorChanged(eventData chatClientEvent) {
}
func (s *Server) userMessageSent(eventData chatClientEvent) {
var event events.UserMessageEvent
var event UserMessageEvent
if err := json.Unmarshal(eventData.data, &event); err != nil {
log.Errorln("error unmarshalling to UserMessageEvent", err)
return
@ -142,15 +144,17 @@ func (s *Server) userMessageSent(eventData chatClientEvent) {
return
}
st := status.Get()
// Ignore if the stream has been offline
if !getStatus().Online && getStatus().LastDisconnectTime != nil {
disconnectedTime := getStatus().LastDisconnectTime.Time
if st.Online && st.Status.LastDisconnectTime != nil {
disconnectedTime := st.Status.LastDisconnectTime.Time
if time.Since(disconnectedTime) > 5*time.Minute {
return
}
}
userRepository := storage.GetUserRepository()
userRepository := userrepository.Get()
event.User = userRepository.GetUserByToken(eventData.client.accessToken)
@ -168,9 +172,10 @@ func (s *Server) userMessageSent(eventData chatClientEvent) {
// Send chat message sent webhook
webhookManager := webhooks.Get()
webhookManager.SendChatEvent(&event)
chatMessagesSentCounter.Inc()
s.chatMessagesSentCounter.Inc()
SaveUserMessage(event)
cr := chatrepository.Get()
cr.SaveUserMessage(event)
eventData.client.MessageCount++
}

View file

@ -1,23 +1,19 @@
package events
package chat
import (
"github.com/owncast/owncast/storage/configrepository"
)
import "github.com/owncast/owncast/models"
// FediverseEngagementEvent is a message displayed in chat on representing an action on the Fediverse.
type FediverseEngagementEvent struct {
Event
models.Event
MessageEvent
Image *string `json:"image"`
Link string `json:"link"`
UserAccountName string `json:"title"`
}
var configRepository = configrepository.Get()
// GetBroadcastPayload will return the object to send to all chat users.
func (e *FediverseEngagementEvent) GetBroadcastPayload() EventPayload {
return EventPayload{
func (e *FediverseEngagementEvent) GetBroadcastPayload() models.EventPayload {
return models.EventPayload{
"id": e.ID,
"timestamp": e.Timestamp,
"body": e.Body,
@ -25,13 +21,13 @@ func (e *FediverseEngagementEvent) GetBroadcastPayload() EventPayload {
"type": e.Event.Type,
"title": e.UserAccountName,
"link": e.Link,
"user": EventPayload{
"displayName": configRepository.GetServerName(),
"user": models.EventPayload{
"displayName": "Owncast",
},
}
}
// GetMessageType will return the event type for this message.
func (e *FediverseEngagementEvent) GetMessageType() EventType {
func (e *FediverseEngagementEvent) GetMessageType() models.EventType {
return e.Event.Type
}

View file

@ -1,50 +1,27 @@
package events
package chat
import (
"bytes"
"regexp"
"strings"
"sync"
"text/template"
"time"
"github.com/microcosm-cc/bluemonday"
"github.com/owncast/owncast/models"
"github.com/teris-io/shortid"
"github.com/yuin/goldmark"
emoji "github.com/yuin/goldmark-emoji"
emojiAst "github.com/yuin/goldmark-emoji/ast"
emojiDef "github.com/yuin/goldmark-emoji/definition"
"github.com/yuin/goldmark/extension"
"github.com/yuin/goldmark/renderer/html"
"github.com/yuin/goldmark/util"
"mvdan.cc/xurls"
"github.com/owncast/owncast/core/data"
log "github.com/sirupsen/logrus"
)
// EventPayload is a generic key/value map for sending out to chat clients.
type EventPayload map[string]interface{}
// OutboundEvent represents an event that is sent out to all listeners of the chat server.
type OutboundEvent interface {
GetBroadcastPayload() EventPayload
GetMessageType() EventType
}
// Event is any kind of event. A type is required to be specified.
type Event struct {
Timestamp time.Time `json:"timestamp"`
Type EventType `json:"type,omitempty"`
ID string `json:"id"`
}
// UserEvent is an event with an associated user.
type UserEvent struct {
User *models.User `json:"user"`
HiddenAt *time.Time `json:"hiddenAt,omitempty"`
ClientID uint `json:"clientId,omitempty"`
GetBroadcastPayload() models.EventPayload
GetMessageType() models.EventType
}
// MessageEvent is an event that has a message body.
@ -56,122 +33,10 @@ type MessageEvent struct {
// SystemActionEvent is an event that represents an action that took place, not a chat message.
type SystemActionEvent struct {
Event
models.Event
MessageEvent
}
// SetDefaults will set default properties of all inbound events.
func (e *Event) SetDefaults() {
e.ID = shortid.MustGenerate()
e.Timestamp = time.Now()
}
// SetDefaults will set default properties of all inbound events.
func (e *UserMessageEvent) SetDefaults() {
e.ID = shortid.MustGenerate()
e.Timestamp = time.Now()
e.RenderAndSanitizeMessageBody()
}
// implements the emojiDef.Emojis interface but uses case-insensitive search.
// the .children field isn't currently used, but could be used in a future
// implementation of say, emoji packs where a child represents a pack.
type emojis struct {
list []emojiDef.Emoji
names map[string]*emojiDef.Emoji
children []emojiDef.Emojis
}
// return a new Emojis set.
func newEmojis(emotes ...emojiDef.Emoji) emojiDef.Emojis {
self := &emojis{
list: emotes,
names: map[string]*emojiDef.Emoji{},
children: []emojiDef.Emojis{},
}
for i := range self.list {
emoji := &self.list[i]
for _, s := range emoji.ShortNames {
self.names[s] = emoji
}
}
return self
}
func (self *emojis) Get(shortName string) (*emojiDef.Emoji, bool) {
v, ok := self.names[strings.ToLower(shortName)]
if ok {
return v, ok
}
for _, child := range self.children {
v, ok := child.Get(shortName)
if ok {
return v, ok
}
}
return nil, false
}
func (self *emojis) Add(emotes emojiDef.Emojis) {
self.children = append(self.children, emotes)
}
func (self *emojis) Clone() emojiDef.Emojis {
clone := &emojis{
list: self.list,
names: self.names,
children: make([]emojiDef.Emojis, len(self.children)),
}
copy(clone.children, self.children)
return clone
}
var (
emojiMu sync.Mutex
emojiDefs = newEmojis()
emojiHTML = make(map[string]string)
emojiModTime time.Time
emojiHTMLFormat = `<img src="{{ .URL }}" class="emoji" alt=":{{ .Name }}:" title=":{{ .Name }}:">`
emojiHTMLTemplate = template.Must(template.New("emojiHTML").Parse(emojiHTMLFormat))
)
func loadEmoji() {
modTime, err := data.UpdateEmojiList(false)
if err != nil {
return
}
if modTime.After(emojiModTime) {
emojiMu.Lock()
defer emojiMu.Unlock()
emojiHTML = make(map[string]string)
emojiList := data.GetEmojiList()
emojiArr := make([]emojiDef.Emoji, 0)
for i := 0; i < len(emojiList); i++ {
var buf bytes.Buffer
err := emojiHTMLTemplate.Execute(&buf, emojiList[i])
if err != nil {
return
}
emojiHTML[strings.ToLower(emojiList[i].Name)] = buf.String()
emoji := emojiDef.NewEmoji(emojiList[i].Name, nil, strings.ToLower(emojiList[i].Name))
emojiArr = append(emojiArr, emoji)
}
emojiDefs = newEmojis(emojiArr...)
}
}
// RenderAndSanitizeMessageBody will turn markdown into HTML, sanitize raw user-supplied HTML and standardize
// the message into something safe and renderable for clients.
func (m *MessageEvent) RenderAndSanitizeMessageBody() {
@ -204,10 +69,8 @@ func RenderAndSanitize(raw string) string {
// RenderMarkdown will return HTML rendered from the string body of a chat message.
func RenderMarkdown(raw string) string {
loadEmoji()
emojiMu.Lock()
defer emojiMu.Unlock()
// emojiMu.Lock()
// defer emojiMu.Unlock()
markdown := goldmark.New(
goldmark.WithRendererOptions(

View file

@ -4,6 +4,7 @@ import (
"testing"
"github.com/owncast/owncast/core/chat/events"
"github.com/owncast/owncast/models"
)
// Test a bunch of arbitrary markup and markdown to make sure we get sanitized
@ -54,7 +55,7 @@ func TestAllowEmojiImages(t *testing.T) {
func TestAllowHTML(t *testing.T) {
messageContent := `<img src="/img/emoji/beerparrot.gif"><ul><li>**test thing**</li></ul>`
expected := "<p><img src=\"/img/emoji/beerparrot.gif\"><ul><li><strong>test thing</strong></li></ul></p>\n"
result := events.RenderMarkdown(messageContent)
result := models.RenderMarkdown(messageContent)
if result != expected {
t.Errorf("message rendering does not match expected. Got\n%s, \n\n want:\n%s", result, expected)

View file

@ -3,29 +3,32 @@ package chat
import (
"errors"
"github.com/owncast/owncast/core/chat/events"
"github.com/owncast/owncast/models"
"github.com/owncast/owncast/services/webhooks"
"github.com/owncast/owncast/storage/chatrepository"
log "github.com/sirupsen/logrus"
)
// SetMessagesVisibility will set the visibility of multiple messages by ID.
func SetMessagesVisibility(messageIDs []string, visibility bool) error {
func (c *Chat) SetMessagesVisibility(messageIDs []string, visibility bool) error {
cr := chatrepository.Get()
// Save new message visibility
if err := saveMessageVisibility(messageIDs, visibility); err != nil {
if err := cr.SaveMessageVisibility(messageIDs, visibility); err != nil {
log.Errorln(err)
return err
}
// Send an event letting the chat clients know to hide or show
// the messages.
event := events.SetMessageVisibilityEvent{
event := models.SetMessageVisibilityEvent{
MessageIDs: messageIDs,
Visible: visibility,
}
event.Event.SetDefaults()
payload := event.GetBroadcastPayload()
if err := _server.Broadcast(payload); err != nil {
if err := c.server.Broadcast(payload); err != nil {
return errors.New("error broadcasting message visibility payload " + err.Error())
}

View file

@ -2,21 +2,26 @@ package chat
import (
"encoding/json"
"errors"
"fmt"
"net/http"
"sync"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
log "github.com/sirupsen/logrus"
"github.com/gorilla/websocket"
"github.com/owncast/owncast/core/chat/events"
"github.com/owncast/owncast/models"
"github.com/owncast/owncast/services/config"
"github.com/owncast/owncast/services/geoip"
"github.com/owncast/owncast/services/status"
"github.com/owncast/owncast/services/webhooks"
"github.com/owncast/owncast/storage"
"github.com/owncast/owncast/storage/chatrepository"
"github.com/owncast/owncast/storage/configrepository"
"github.com/owncast/owncast/storage/userrepository"
"github.com/owncast/owncast/utils"
)
@ -41,25 +46,42 @@ type Server struct {
userPartedTimers map[string]*time.Ticker
seq uint
maxSocketConnectionLimit int64
chatMessagesSentCounter prometheus.Gauge
mu sync.RWMutex
// a map of user IDs and when they last were active.
lastSeenCache map[string]time.Time
mu sync.RWMutex
config *config.Config
configRepository *configrepository.SqlConfigRepository
chatRepository *chatrepository.ChatRepository
}
// NewChat will return a new instance of the chat server.
func NewChat() *Server {
maximumConcurrentConnectionLimit := getMaximumConcurrentConnectionLimit()
setSystemConcurrentConnectionLimit(maximumConcurrentConnectionLimit)
server := &Server{
clients: map[uint]*Client{},
outbound: make(chan []byte),
inbound: make(chan chatClientEvent),
unregister: make(chan uint),
maxSocketConnectionLimit: maximumConcurrentConnectionLimit,
maxSocketConnectionLimit: 100, // TODO: Set this properly!
lastSeenCache: map[string]time.Time{},
geoipClient: geoip.NewClient(),
userPartedTimers: map[string]*time.Ticker{},
config: config.Get(),
configRepository: configrepository.Get(),
chatRepository: chatrepository.Get(),
}
server.chatMessagesSentCounter = promauto.NewGauge(prometheus.GaugeOpts{
Name: "total_chat_message_count",
Help: "The number of chat messages incremented over time.",
ConstLabels: map[string]string{
"version": server.config.VersionNumber,
"host": server.configRepository.GetServerURL(),
},
})
return server
}
@ -94,11 +116,7 @@ func (s *Server) Addclient(conn *websocket.Conn, user *models.User, accessToken
ConnectedAt: time.Now(),
}
// Do not send user re-joined broadcast message if they've been active within 10 minutes.
shouldSendJoinedMessages := configRepository.GetChatJoinPartMessagesEnabled()
if previouslyLastSeen, ok := _lastSeenCache[user.ID]; ok && time.Since(previouslyLastSeen) < time.Minute*10 {
shouldSendJoinedMessages = false
}
shouldSendJoinedMessages := s.configRepository.GetChatJoinPartMessagesEnabled()
s.mu.Lock()
{
@ -108,7 +126,6 @@ func (s *Server) Addclient(conn *websocket.Conn, user *models.User, accessToken
if ticker, ok := s.userPartedTimers[user.ID]; ok {
ticker.Stop()
delete(s.userPartedTimers, user.ID)
shouldSendJoinedMessages = false
}
client.Id = s.seq
@ -124,7 +141,9 @@ func (s *Server) Addclient(conn *websocket.Conn, user *models.User, accessToken
client.sendConnectedClientInfo()
if getStatus().Online {
st := status.Get()
if st.Online {
if shouldSendJoinedMessages {
s.sendUserJoinedMessage(client)
}
@ -140,7 +159,7 @@ func (s *Server) Addclient(conn *websocket.Conn, user *models.User, accessToken
}
func (s *Server) sendUserJoinedMessage(c *Client) {
userJoinedEvent := events.UserJoinedEvent{}
userJoinedEvent := models.UserJoinedEvent{}
userJoinedEvent.SetDefaults()
userJoinedEvent.User = c.User
userJoinedEvent.ClientID = c.Id
@ -154,13 +173,31 @@ func (s *Server) sendUserJoinedMessage(c *Client) {
webhookManager.SendChatEventUserJoined(userJoinedEvent)
}
// getClientsForUser will return chat connections that are owned by a specific user.
func (s *Server) GetClientsForUser(userID string) ([]*Client, error) {
s.mu.Lock()
defer s.mu.Unlock()
clients := map[string][]*Client{}
for _, client := range s.clients {
clients[client.User.ID] = append(clients[client.User.ID], client)
}
if _, exists := clients[userID]; !exists {
return nil, errors.New("no connections for user found")
}
return clients[userID], nil
}
func (s *Server) handleClientDisconnected(c *Client) {
if _, ok := s.clients[c.Id]; ok {
log.Debugln("Deleting", c.Id)
delete(s.clients, c.Id)
}
additionalClientCheck, _ := GetClientsForUser(c.User.ID)
additionalClientCheck, _ := s.GetClientsForUser(c.User.ID)
if len(additionalClientCheck) > 0 {
// This user is still connected to chat with another client.
return
@ -178,13 +215,13 @@ func (s *Server) sendUserPartedMessage(c *Client) {
s.userPartedTimers[c.User.ID].Stop()
delete(s.userPartedTimers, c.User.ID)
userPartEvent := events.UserPartEvent{}
userPartEvent := UserPartEvent{}
userPartEvent.SetDefaults()
userPartEvent.User = c.User
userPartEvent.ClientID = c.Id
// If part messages are disabled.
if data.GetChatJoinPartMessagesEnabled() {
if s.configRepository.GetChatJoinPartMessagesEnabled() {
if err := s.Broadcast(userPartEvent.GetBroadcastPayload()); err != nil {
log.Errorln("error sending chat part message", err)
}
@ -195,14 +232,17 @@ func (s *Server) sendUserPartedMessage(c *Client) {
// HandleClientConnection is fired when a single client connects to the websocket.
func (s *Server) HandleClientConnection(w http.ResponseWriter, r *http.Request) {
if configRepository.GetChatDisabled() {
_, _ = w.Write([]byte(events.ChatDisabled))
cr := configrepository.Get()
chatRepository := chatrepository.Get()
if cr.GetChatDisabled() {
_, _ = w.Write([]byte(models.ChatDisabled))
return
}
ipAddress := utils.GetIPAddressFromRequest(r)
// Check if this client's IP address is banned. If so send a rejection.
if blocked, err := configRepository.IsIPAddressBanned(ipAddress); blocked {
if blocked, err := chatRepository.IsIPAddressBanned(ipAddress); blocked {
log.Debugln("Client ip address has been blocked. Rejecting.")
w.WriteHeader(http.StatusForbidden)
@ -214,7 +254,7 @@ func (s *Server) HandleClientConnection(w http.ResponseWriter, r *http.Request)
// Limit concurrent chat connections
if int64(len(s.clients)) >= s.maxSocketConnectionLimit {
log.Warnln("rejecting incoming client connection as it exceeds the max client count of", s.maxSocketConnectionLimit)
_, _ = w.Write([]byte(events.ErrorMaxConnectionsExceeded))
_, _ = w.Write([]byte(models.ErrorMaxConnectionsExceeded))
return
}
@ -237,14 +277,14 @@ func (s *Server) HandleClientConnection(w http.ResponseWriter, r *http.Request)
return
}
userRepository := storage.GetUserRepository()
userRepository := userrepository.Get()
// A user is required to use the websocket
user := userRepository.GetUserByToken(accessToken)
if user == nil {
// Send error that registration is required
_ = conn.WriteJSON(events.EventPayload{
"type": events.ErrorNeedsRegistration,
_ = conn.WriteJSON(models.EventPayload{
"type": models.ErrorNeedsRegistration,
})
_ = conn.Close()
return
@ -253,8 +293,8 @@ func (s *Server) HandleClientConnection(w http.ResponseWriter, r *http.Request)
// User is disabled therefore we should disconnect.
if user.DisabledAt != nil {
log.Traceln("Disabled user", user.ID, user.DisplayName, "rejected")
_ = conn.WriteJSON(events.EventPayload{
"type": events.ErrorUserDisabled,
_ = conn.WriteJSON(models.EventPayload{
"type": models.ErrorUserDisabled,
})
_ = conn.Close()
return
@ -266,7 +306,7 @@ func (s *Server) HandleClientConnection(w http.ResponseWriter, r *http.Request)
}
// Broadcast sends message to all connected clients.
func (s *Server) Broadcast(payload events.EventPayload) error {
func (s *Server) Broadcast(payload models.EventPayload) error {
data, err := json.Marshal(payload)
if err != nil {
return err
@ -291,7 +331,7 @@ func (s *Server) Broadcast(payload events.EventPayload) error {
}
// Send will send a single payload to a single connected client.
func (s *Server) Send(payload events.EventPayload, client *Client) {
func (s *Server) Send(payload models.EventPayload, client *Client) {
data, err := json.Marshal(payload)
if err != nil {
log.Errorln(err)
@ -307,12 +347,12 @@ func (s *Server) DisconnectClients(clients []*Client) {
log.Traceln("Disconnecting client", client.User.ID, "owned by", client.User.DisplayName)
go func(client *Client) {
event := events.UserDisabledEvent{}
event := models.UserDisabledEvent{}
event.SetDefaults()
// Send this disabled event specifically to this single connected client
// to let them know they've been banned.
_server.Send(event.GetBroadcastPayload(), client)
s.Send(event.GetBroadcastPayload(), client)
// Give the socket time to send out the above message.
// Unfortunately I don't know of any way to get a real callback to know when
@ -327,58 +367,15 @@ func (s *Server) DisconnectClients(clients []*Client) {
}
}
// SendConnectedClientInfoToUser will find all the connected clients assigned to a user
// and re-send each the connected client info.
func SendConnectedClientInfoToUser(userID string) error {
clients, err := GetClientsForUser(userID)
if err != nil {
return err
}
userRepository := storage.GetUserRepository()
// Get an updated reference to the user.
user := userRepository.GetUserByID(userID)
if user == nil {
return fmt.Errorf("user not found")
}
if err != nil {
return err
}
for _, client := range clients {
// Update the client's reference to its user.
client.User = user
// Send the update to the client.
client.sendConnectedClientInfo()
}
return nil
}
// SendActionToUser will send system action text to all connected clients
// assigned to a user ID.
func SendActionToUser(userID string, text string) error {
clients, err := GetClientsForUser(userID)
if err != nil {
return err
}
for _, client := range clients {
_server.sendActionToClient(client, text)
}
return nil
}
func (s *Server) eventReceived(event chatClientEvent) {
c := event.client
u := c.User
cr := configrepository.Get()
// If established chat user only mode is enabled and the user is not old
// enough then reject this event and send them an informative message.
if u != nil && configRepository.GetChatEstbalishedUsersOnlyMode() && time.Since(event.client.User.CreatedAt) < config.GetDefaults().ChatEstablishedUserModeTimeDuration && !u.IsModerator() {
if u != nil && cr.GetChatEstbalishedUsersOnlyMode() && time.Since(event.client.User.CreatedAt) < config.GetDefaults().ChatEstablishedUserModeTimeDuration && !u.IsModerator() {
s.sendActionToClient(c, "You have not been an established chat participant long enough to take part in chat. Please enjoy the stream and try again later.")
return
}
@ -391,13 +388,13 @@ func (s *Server) eventReceived(event chatClientEvent) {
eventType := typecheck["type"]
switch eventType {
case events.MessageSent:
case models.MessageSent:
s.userMessageSent(event)
case events.UserNameChanged:
case models.UserNameChanged:
s.userNameChanged(event)
case events.UserColorChanged:
case models.UserColorChanged:
s.userColorChanged(event)
default:
log.Debugln(logSanitize(fmt.Sprint(eventType)), "event not found:", logSanitize(fmt.Sprint(typecheck)))
@ -407,8 +404,9 @@ func (s *Server) eventReceived(event chatClientEvent) {
func (s *Server) sendWelcomeMessageToClient(c *Client) {
// Add an artificial delay so people notice this message come in.
time.Sleep(7 * time.Second)
cr := configrepository.Get()
welcomeMessage := utils.RenderSimpleMarkdown(configRepository.GetServerWelcomeMessage())
welcomeMessage := utils.RenderSimpleMarkdown(cr.GetServerWelcomeMessage())
if welcomeMessage != "" {
s.sendSystemMessageToClient(c, welcomeMessage)
@ -416,39 +414,42 @@ func (s *Server) sendWelcomeMessageToClient(c *Client) {
}
func (s *Server) sendAllWelcomeMessage() {
welcomeMessage := utils.RenderSimpleMarkdown(configRepository.GetServerWelcomeMessage())
cr := configrepository.Get()
welcomeMessage := utils.RenderSimpleMarkdown(cr.GetServerWelcomeMessage())
if welcomeMessage != "" {
clientMessage := events.SystemMessageEvent{
Event: events.Event{},
MessageEvent: events.MessageEvent{
clientMessage := SystemMessageEvent{
Event: models.Event{},
MessageEvent: MessageEvent{
Body: welcomeMessage,
},
}
clientMessage.SetDefaults()
clientMessage.DisplayName = s.configRepository.GetServerName()
_ = s.Broadcast(clientMessage.GetBroadcastPayload())
}
}
func (s *Server) sendSystemMessageToClient(c *Client, message string) {
clientMessage := events.SystemMessageEvent{
Event: events.Event{},
MessageEvent: events.MessageEvent{
clientMessage := SystemMessageEvent{
Event: models.Event{},
MessageEvent: MessageEvent{
Body: message,
},
}
clientMessage.SetDefaults()
clientMessage.RenderBody()
clientMessage.DisplayName = s.configRepository.GetServerName()
s.Send(clientMessage.GetBroadcastPayload(), c)
}
func (s *Server) sendActionToClient(c *Client, message string) {
clientMessage := events.ActionEvent{
MessageEvent: events.MessageEvent{
clientMessage := ActionEvent{
MessageEvent: MessageEvent{
Body: message,
},
Event: events.Event{
Type: events.ChatActionSent,
Event: models.Event{
Type: models.ChatActionSent,
},
}
clientMessage.SetDefaults()

View file

@ -0,0 +1,28 @@
package chat
import "github.com/owncast/owncast/models"
// SystemMessageEvent is a message displayed in chat on behalf of the server.
type SystemMessageEvent struct {
models.Event
MessageEvent
DisplayName string
}
// GetBroadcastPayload will return the object to send to all chat users.
func (e *SystemMessageEvent) GetBroadcastPayload() models.EventPayload {
return models.EventPayload{
"id": e.ID,
"timestamp": e.Timestamp,
"body": e.Body,
"type": models.SystemMessageSent,
"user": models.EventPayload{
"displayName": e.DisplayName,
},
}
}
// GetMessageType will return the event type for this message.
func (e *SystemMessageEvent) GetMessageType() models.EventType {
return models.SystemMessageSent
}

View file

@ -0,0 +1,39 @@
package chat
import (
"time"
"github.com/owncast/owncast/models"
"github.com/teris-io/shortid"
)
// UserMessageEvent is an inbound message from a user.
type UserMessageEvent struct {
models.Event
models.UserEvent
MessageEvent
}
// GetBroadcastPayload will return the object to send to all chat users.
func (e *UserMessageEvent) GetBroadcastPayload() models.EventPayload {
return models.EventPayload{
"id": e.ID,
"timestamp": e.Timestamp,
"body": e.Body,
"user": e.User,
"type": models.MessageSent,
"visible": e.HiddenAt == nil,
}
}
// GetMessageType will return the event type for this message.
func (e *UserMessageEvent) GetMessageType() models.EventType {
return models.MessageSent
}
// SetDefaults will set default properties of all inbound events.
func (e *UserMessageEvent) SetDefaults() {
e.ID = shortid.MustGenerate()
e.Timestamp = time.Now()
e.RenderAndSanitizeMessageBody()
}

View file

@ -1,15 +1,17 @@
package events
package chat
import "github.com/owncast/owncast/models"
// UserPartEvent is the event fired when a user leaves chat.
type UserPartEvent struct {
Event
UserEvent
models.Event
models.UserEvent
}
// GetBroadcastPayload will return the object to send to all chat users.
func (e *UserPartEvent) GetBroadcastPayload() EventPayload {
return EventPayload{
"type": UserParted,
func (e *UserPartEvent) GetBroadcastPayload() models.EventPayload {
return models.EventPayload{
"type": models.UserParted,
"id": e.ID,
"timestamp": e.Timestamp,
"user": e.User,

View file

@ -44,7 +44,7 @@ type Config struct {
}
// NewFediAuth creates a new FediAuth instance.
func NewConfig() *Config {
func New() *Config {
// Default config values.
c := &Config{
DatabaseFilePath: "data/owncast.db",
@ -71,9 +71,9 @@ var temporaryGlobalInstance *Config
// GetConfig returns the temporary global instance.
// Remove this after dependency injection is implemented.
func GetConfig() *Config {
func Get() *Config {
if temporaryGlobalInstance == nil {
temporaryGlobalInstance = NewConfig()
temporaryGlobalInstance = New()
}
return temporaryGlobalInstance
@ -104,3 +104,8 @@ func (c *Config) GetReleaseString() string {
return fmt.Sprintf("Owncast v%s-%s (%s)", versionNumber, buildPlatform, gitCommit)
}
// GetTranscoderLogFilePath returns the logging path for the transcoder log output.
func (c *Config) GetTranscoderLogFilePath() string {
return filepath.Join(c.LogDirectory, "transcoder.log")
}

View file

@ -6,6 +6,7 @@ import (
"github.com/owncast/owncast/core"
"github.com/owncast/owncast/models"
"github.com/owncast/owncast/services/status"
"github.com/owncast/owncast/storage/configrepository"
"github.com/owncast/owncast/utils"
)
@ -16,8 +17,6 @@ const (
minClientCountForDetails = 3
)
var configRepository = configrepository.Get()
// GetStreamHealthOverview will return the stream health overview.
func (m *Metrics) GetStreamHealthOverview() *models.StreamHealthOverview {
return m.metrics.streamHealthOverview
@ -71,6 +70,8 @@ func (m *Metrics) networkSpeedHealthOverviewMessage() string {
bitrate int
}
configRepository := configrepository.Get()
outputVariants := configRepository.GetStreamOutputVariants()
streamSortVariants := make([]singleVariant, len(outputVariants))
@ -137,12 +138,14 @@ func (m *Metrics) wastefulBitrateOverviewMessage() string {
return ""
}
currentBroadcast := core.GetCurrentBroadcast()
stat := status.Get()
currentBroadcast := stat.GetCurrentBroadcast()
if currentBroadcast == nil {
return ""
}
currentBroadcaster := core.GetBroadcaster()
currentBroadcaster := stat.GetBroadcaster()
if currentBroadcast == nil {
return ""
}
@ -156,6 +159,7 @@ func (m *Metrics) wastefulBitrateOverviewMessage() string {
if inboundBitrate == 0 {
return ""
}
configRepository := configrepository.Get()
outputVariants := configRepository.GetStreamOutputVariants()
@ -230,6 +234,8 @@ func (m *Metrics) errorCountHealthOverviewMessage() string {
if totalNumberOfClients >= minClientCountForDetails {
healthyPercentage := utils.IntPercentage(clientsWithErrors, totalNumberOfClients)
configRepository := configrepository.Get()
isUsingPassthrough := false
outputVariants := configRepository.GetStreamOutputVariants()
for _, variant := range outputVariants {
@ -242,7 +248,8 @@ func (m *Metrics) errorCountHealthOverviewMessage() string {
return fmt.Sprintf("%d of %d viewers (%d%%) are experiencing errors. You're currently using a video passthrough output, often known for causing playback issues for people. It is suggested you turn it off.", clientsWithErrors, totalNumberOfClients, healthyPercentage)
}
currentBroadcast := core.GetCurrentBroadcast()
stat := status.Get()
currentBroadcast := stat.GetCurrentBroadcast()
if currentBroadcast != nil && currentBroadcast.LatencyLevel.SecondsPerSegment < 3 {
return fmt.Sprintf("%d of %d viewers (%d%%) may be experiencing some issues. You may want to increase your latency buffer level in your video configuration to see if it helps.", clientsWithErrors, totalNumberOfClients, healthyPercentage)
}

View file

@ -5,7 +5,9 @@ import (
"time"
"github.com/owncast/owncast/models"
"github.com/owncast/owncast/services/chat"
"github.com/owncast/owncast/services/config"
"github.com/owncast/owncast/storage/configrepository"
"github.com/prometheus/client_golang/prometheus"
)
@ -26,6 +28,8 @@ type Metrics struct {
chatUserCount prometheus.Gauge
currentChatMessageCount prometheus.Gauge
playbackErrorCount prometheus.Gauge
chatService *chat.Chat
}
// How often we poll for updates.
@ -76,6 +80,7 @@ func New() *Metrics {
windowedBandwidths: map[string]float64{},
windowedLatencies: map[string]float64{},
windowedDownloadDurations: map[string]float64{},
chatService: chat.Get(),
}
}
@ -84,12 +89,14 @@ func New() *Metrics {
// Start will begin the metrics collection and alerting.
func (m *Metrics) Start(getStatus func() models.Status) {
m.getStatus = getStatus
configRepository := configrepository.Get()
host := configRepository.GetServerURL()
if host == "" {
host = "unknown"
}
c := config.GetConfig()
c := config.Get()
m.labels = map[string]string{
"version": c.VersionNumber,

View file

@ -6,6 +6,7 @@ import (
"github.com/owncast/owncast/core"
"github.com/owncast/owncast/models"
"github.com/owncast/owncast/services/status"
"github.com/owncast/owncast/utils"
)
@ -13,8 +14,10 @@ func (m *Metrics) handlePlaybackPolling() {
m.metrics.m.Lock()
defer m.metrics.m.Unlock()
s := status.Get()
// Make sure this is fired first before all the values get cleared below.
if m.getStatus().Online {
if s.Online {
m.generateStreamHealthOverview()
}

View file

@ -4,9 +4,8 @@ import (
"time"
"github.com/nakabonne/tstorage"
"github.com/owncast/owncast/core"
"github.com/owncast/owncast/core/chat"
"github.com/owncast/owncast/models"
"github.com/owncast/owncast/services/status"
"github.com/owncast/owncast/storage/chatrepository"
"github.com/owncast/owncast/storage/userrepository"
log "github.com/sirupsen/logrus"
@ -30,12 +29,14 @@ func (m *Metrics) startViewerCollectionMetrics() {
}
func (m *Metrics) collectViewerCount() {
s := status.Get()
// Don't collect metrics for viewers if there's no stream active.
if !core.GetStatus().Online {
if !s.Online {
return
}
count := core.GetStatus().ViewerCount
count := s.ViewerCount
// Save active viewer count to our Prometheus collector.
m.activeViewerCount.Set(float64(count))
@ -52,9 +53,9 @@ func (m *Metrics) collectViewerCount() {
}
func (m *Metrics) collectChatClientCount() {
count := len(chat.GetClients())
count := len(m.chatService.GetClients())
m.activeChatClientCount.Set(float64(count))
chatRepository := chatrepository.GetChatRepository()
chatRepository := chatrepository.Get()
usersRepository := userrepository.Get()
// Total message count

View file

@ -22,12 +22,15 @@ type Notifier struct {
}
var (
configRepository = configrepository.Get()
notificationsRepository = notificationsrepository.Get()
configRepository *configrepository.SqlConfigRepository
notificationsRepository *notificationsrepository.SqlNotificationsRepository
)
// Setup will perform any pre-use setup for the notifier.
func Setup(datastore *data.Store) {
configRepository = configrepository.Get()
notificationsRepository = notificationsrepository.Get()
initializeBrowserPushIfNeeded()
}

61
services/status/status.go Normal file
View file

@ -0,0 +1,61 @@
package status
import (
"github.com/owncast/owncast/models"
"github.com/owncast/owncast/storage/configrepository"
)
type Status struct {
models.Stats
models.Status
broadcast *models.CurrentBroadcast
broadcaster *models.Broadcaster
StreamConnected bool
VersionNumber string `json:"versionNumber"`
StreamTitle string `json:"streamTitle"`
ViewerCount int `json:"viewerCount"`
OverallMaxViewerCount int `json:"overallMaxViewerCount"`
SessionMaxViewerCount int `json:"sessionMaxViewerCount"`
Online bool `json:"online"`
}
var temporaryGlobalInstance *Status
func New() *Status {
configRepository := configrepository.Get()
return &Status{
StreamTitle: configRepository.GetStreamTitle(),
}
}
// Get will return the global instance of the status service.
func Get() *Status {
if temporaryGlobalInstance == nil {
temporaryGlobalInstance = &Status{}
}
return temporaryGlobalInstance
}
// GetCurrentBroadcast will return the currently active broadcast.
func (s *Status) GetCurrentBroadcast() *models.CurrentBroadcast {
return s.broadcast
}
func (s *Status) SetCurrentBroadcast(broadcast *models.CurrentBroadcast) {
s.broadcast = broadcast
}
// SetBroadcaster will store the current inbound broadcasting details.
func (s *Status) SetBroadcaster(broadcaster *models.Broadcaster) {
s.broadcaster = broadcaster
}
// GetBroadcaster will return the details of the currently active broadcaster.
func (s *Status) GetBroadcaster() *models.Broadcaster {
return s.broadcaster
}

View file

@ -1,12 +1,11 @@
package webhooks
import (
"github.com/owncast/owncast/core/chat/events"
"github.com/owncast/owncast/models"
)
// SendChatEvent will send a chat event to webhook destinations.
func (w *LiveWebhookManager) SendChatEvent(chatEvent *events.UserMessageEvent) {
func (w *LiveWebhookManager) SendChatEvent(chatEvent *models.UserMessageEvent) {
webhookEvent := WebhookEvent{
Type: chatEvent.GetMessageType(),
EventData: &WebhookChatMessage{
@ -24,7 +23,7 @@ func (w *LiveWebhookManager) SendChatEvent(chatEvent *events.UserMessageEvent) {
}
// SendChatEventUsernameChanged will send a username changed event to webhook destinations.
func (w *LiveWebhookManager) SendChatEventUsernameChanged(event events.NameChangeEvent) {
func (w *LiveWebhookManager) SendChatEventUsernameChanged(event models.NameChangeEvent) {
webhookEvent := WebhookEvent{
Type: models.UserNameChanged,
EventData: event,
@ -34,7 +33,7 @@ func (w *LiveWebhookManager) SendChatEventUsernameChanged(event events.NameChang
}
// SendChatEventUserJoined sends a webhook notifying that a user has joined.
func (w *LiveWebhookManager) SendChatEventUserJoined(event events.UserJoinedEvent) {
func (w *LiveWebhookManager) SendChatEventUserJoined(event models.UserJoinedEvent) {
webhookEvent := WebhookEvent{
Type: models.UserJoined,
EventData: event,
@ -55,7 +54,7 @@ func SendChatEventUserParted(event events.UserPartEvent) {
// SendChatEventSetMessageVisibility sends a webhook notifying that the visibility of one or more
// messages has changed.
func (w *LiveWebhookManager) SendChatEventSetMessageVisibility(event events.SetMessageVisibilityEvent) {
func (w *LiveWebhookManager) SendChatEventSetMessageVisibility(event models.SetMessageVisibilityEvent) {
webhookEvent := WebhookEvent{
Type: models.VisibiltyToggled,
EventData: event,

View file

@ -4,14 +4,12 @@ import (
"testing"
"time"
"github.com/owncast/owncast/core/chat/events"
"github.com/owncast/owncast/core/user"
"github.com/owncast/owncast/models"
)
func TestSendChatEvent(t *testing.T) {
timestamp := time.Unix(72, 6).UTC()
user := user.User{
user := models.User{
ID: "user id",
DisplayName: "display name",
DisplayColor: 4,
@ -26,18 +24,18 @@ func TestSendChatEvent(t *testing.T) {
}
checkPayload(t, models.MessageSent, func() {
manager.SendChatEvent(&events.UserMessageEvent{
Event: events.Event{
Type: events.MessageSent,
manager.SendChatEvent(&models.UserMessageEvent{
Event: models.Event{
Type: models.MessageSent,
ID: "id",
Timestamp: timestamp,
},
UserEvent: events.UserEvent{
UserEvent: models.UserEvent{
User: &user,
ClientID: 51,
HiddenAt: nil,
},
MessageEvent: events.MessageEvent{
MessageEvent: models.MessageEvent{
OutboundEvent: nil,
Body: "body",
RawBody: "raw body",
@ -64,7 +62,7 @@ func TestSendChatEvent(t *testing.T) {
func TestSendChatEventUsernameChanged(t *testing.T) {
timestamp := time.Unix(72, 6).UTC()
user := user.User{
user := models.User{
ID: "user id",
DisplayName: "display name",
DisplayColor: 4,
@ -79,13 +77,13 @@ func TestSendChatEventUsernameChanged(t *testing.T) {
}
checkPayload(t, models.UserNameChanged, func() {
manager.SendChatEventUsernameChanged(events.NameChangeEvent{
Event: events.Event{
Type: events.UserNameChanged,
manager.SendChatEventUsernameChanged(models.NameChangeEvent{
Event: models.Event{
Type: models.UserNameChanged,
ID: "id",
Timestamp: timestamp,
},
UserEvent: events.UserEvent{
UserEvent: models.UserEvent{
User: &user,
ClientID: 51,
HiddenAt: nil,
@ -112,7 +110,7 @@ func TestSendChatEventUsernameChanged(t *testing.T) {
func TestSendChatEventUserJoined(t *testing.T) {
timestamp := time.Unix(72, 6).UTC()
user := user.User{
user := models.User{
ID: "user id",
DisplayName: "display name",
DisplayColor: 4,
@ -127,13 +125,13 @@ func TestSendChatEventUserJoined(t *testing.T) {
}
checkPayload(t, models.UserJoined, func() {
manager.SendChatEventUserJoined(events.UserJoinedEvent{
Event: events.Event{
Type: events.UserJoined,
manager.SendChatEventUserJoined(models.UserJoinedEvent{
Event: models.Event{
Type: models.UserJoined,
ID: "id",
Timestamp: timestamp,
},
UserEvent: events.UserEvent{
UserEvent: models.UserEvent{
User: &user,
ClientID: 51,
HiddenAt: nil,
@ -160,13 +158,13 @@ func TestSendChatEventSetMessageVisibility(t *testing.T) {
timestamp := time.Unix(72, 6).UTC()
checkPayload(t, models.VisibiltyToggled, func() {
manager.SendChatEventSetMessageVisibility(events.SetMessageVisibilityEvent{
Event: events.Event{
Type: events.VisibiltyUpdate,
manager.SendChatEventSetMessageVisibility(models.SetMessageVisibilityEvent{
Event: models.Event{
Type: models.VisibiltyUpdate,
ID: "id",
Timestamp: timestamp,
},
UserMessageEvent: events.UserMessageEvent{},
UserMessageEvent: models.UserMessageEvent{},
MessageIDs: []string{"message1", "message2"},
Visible: false,
})

View file

@ -10,11 +10,11 @@ type Manager interface {
// to be sent out to all registered webhook destinations.
type LiveWebhookManager struct {
queue chan Job
getStatus func() models.Status
getStatus func() *models.Status
}
// New creates a new webhook manager.
func New(getStatusFunc func() models.Status) *LiveWebhookManager {
func New(getStatusFunc func() *models.Status) *LiveWebhookManager {
m := &LiveWebhookManager{
getStatus: getStatusFunc,
}
@ -24,7 +24,7 @@ func New(getStatusFunc func() models.Status) *LiveWebhookManager {
// InitTemporarySingleton initializes the the temporary global instance of the webhook manager
// to be deleted once dependency injection is implemented.
func InitTemporarySingleton(getStatusFunc func() models.Status) {
func InitTemporarySingleton(getStatusFunc func() *models.Status) {
temporaryGlobalInstance = New(getStatusFunc)
}

View file

@ -8,14 +8,14 @@ import (
"github.com/teris-io/shortid"
)
var configRepository = configrepository.Get()
// SendStreamStatusEvent will send all webhook destinations the current stream status.
func (w *LiveWebhookManager) SendStreamStatusEvent(eventType models.EventType) {
w.sendStreamStatusEvent(eventType, shortid.MustGenerate(), time.Now())
}
func (w *LiveWebhookManager) sendStreamStatusEvent(eventType models.EventType, id string, timestamp time.Time) {
configRepository := configrepository.Get()
w.SendEventToWebhooks(WebhookEvent{
Type: eventType,
EventData: map[string]interface{}{

View file

@ -4,17 +4,19 @@ import (
"testing"
"time"
"github.com/owncast/owncast/core/chat/events"
"github.com/owncast/owncast/models"
"github.com/owncast/owncast/storage/configrepository"
)
func TestSendStreamStatusEvent(t *testing.T) {
configRepository := configrepository.Get()
configRepository.SetServerName("my server")
configRepository.SetServerSummary("my server where I stream")
configRepository.SetStreamTitle("my stream")
checkPayload(t, models.StreamStarted, func() {
manager.sendStreamStatusEvent(events.StreamStarted, "id", time.Unix(72, 6).UTC())
manager.sendStreamStatusEvent(models.StreamStarted, "id", time.Unix(72, 6).UTC())
}, `{
"id": "id",
"name": "my server",

Some files were not shown because too many files have changed in this diff Show more