2020-06-23 04:11:56 +03:00
package chat
import (
2021-07-20 05:22:29 +03:00
"encoding/json"
2021-11-03 05:27:41 +03:00
"fmt"
2020-06-23 04:11:56 +03:00
"net/http"
2020-12-22 06:42:47 +03:00
"sync"
2020-06-23 04:11:56 +03:00
"time"
log "github.com/sirupsen/logrus"
2020-06-23 09:52:50 +03:00
2021-07-20 05:22:29 +03:00
"github.com/gorilla/websocket"
2022-03-07 10:26:24 +03:00
"github.com/owncast/owncast/config"
2021-07-20 05:22:29 +03:00
"github.com/owncast/owncast/core/chat/events"
2021-02-19 10:05:52 +03:00
"github.com/owncast/owncast/core/data"
"github.com/owncast/owncast/core/webhooks"
2024-07-02 04:58:50 +03:00
"github.com/owncast/owncast/models"
"github.com/owncast/owncast/persistence/userrepository"
2024-07-03 07:26:33 +03:00
"github.com/owncast/owncast/services/geoip"
2021-07-20 05:22:29 +03:00
"github.com/owncast/owncast/utils"
2020-06-23 04:11:56 +03:00
)
2021-09-12 10:18:15 +03:00
var _server * Server
2020-06-23 23:11:01 +03:00
2021-09-12 10:18:15 +03:00
// Server represents an instance of the chat server.
type Server struct {
2023-05-30 20:31:43 +03:00
clients map [ uint ] * Client
2020-12-22 06:42:47 +03:00
2021-07-20 05:22:29 +03:00
// send outbound message payload to all clients
outbound chan [ ] byte
2020-06-23 04:11:56 +03:00
2021-07-20 05:22:29 +03:00
// receive inbound message payload from all clients
inbound chan chatClientEvent
2020-06-23 23:11:01 +03:00
2021-07-20 05:22:29 +03:00
// unregister requests from clients.
2021-07-27 03:30:28 +03:00
unregister chan uint // the ChatClient id
2021-10-12 23:21:37 +03:00
2023-10-09 00:22:28 +03:00
geoipClient * geoip . Client
// a map of user IDs and timers that fire for chat part messages.
userPartedTimers map [ string ] * time . Ticker
2023-05-30 20:31:43 +03:00
seq uint
maxSocketConnectionLimit int64
mu sync . RWMutex
2020-06-23 04:11:56 +03:00
}
2021-09-12 10:18:15 +03:00
// NewChat will return a new instance of the chat server.
func NewChat ( ) * Server {
2021-08-14 01:26:44 +03:00
maximumConcurrentConnectionLimit := getMaximumConcurrentConnectionLimit ( )
setSystemConcurrentConnectionLimit ( maximumConcurrentConnectionLimit )
2021-09-12 10:18:15 +03:00
server := & Server {
clients : map [ uint ] * Client { } ,
2021-08-14 01:26:44 +03:00
outbound : make ( chan [ ] byte ) ,
inbound : make ( chan chatClientEvent ) ,
unregister : make ( chan uint ) ,
maxSocketConnectionLimit : maximumConcurrentConnectionLimit ,
2021-10-12 23:21:37 +03:00
geoipClient : geoip . NewClient ( ) ,
2023-09-10 20:58:11 +03:00
userPartedTimers : map [ string ] * time . Ticker { } ,
2021-07-20 05:22:29 +03:00
}
2020-06-23 04:11:56 +03:00
2021-07-20 05:22:29 +03:00
return server
2020-06-23 04:11:56 +03:00
}
2021-09-12 10:18:15 +03:00
// Run will start the chat server.
func ( s * Server ) Run ( ) {
2021-07-20 05:22:29 +03:00
for {
select {
2021-09-12 10:18:15 +03:00
case clientID := <- s . unregister :
2023-09-10 20:58:11 +03:00
if client , ok := s . clients [ clientID ] ; ok {
s . handleClientDisconnected ( client )
2021-07-20 05:22:29 +03:00
s . mu . Lock ( )
2021-09-12 10:18:15 +03:00
delete ( s . clients , clientID )
2021-07-20 05:22:29 +03:00
s . mu . Unlock ( )
}
case message := <- s . inbound :
s . eventReceived ( message )
}
}
2020-06-23 04:11:56 +03:00
}
2021-07-20 05:22:29 +03:00
// Addclient registers new connection as a User.
2024-07-02 04:58:50 +03:00
func ( s * Server ) Addclient ( conn * websocket . Conn , user * models . User , accessToken string , userAgent string , ipAddress string ) * Client {
2021-09-12 10:18:15 +03:00
client := & Client {
2021-07-20 05:22:29 +03:00
server : s ,
conn : conn ,
User : user ,
2022-03-07 07:34:49 +03:00
IPAddress : ipAddress ,
2021-07-20 05:22:29 +03:00
accessToken : accessToken ,
2021-07-31 22:48:42 +03:00
send : make ( chan [ ] byte , 256 ) ,
2021-07-20 05:22:29 +03:00
UserAgent : userAgent ,
ConnectedAt : time . Now ( ) ,
}
2023-09-10 20:58:11 +03:00
shouldSendJoinedMessages := data . GetChatJoinPartMessagesEnabled ( )
2021-09-22 00:06:23 +03:00
2024-02-19 02:49:50 +03:00
// If there are existing clients connected for this user do not send
// a user joined message. Do not put this under a mutex, as
// GetClientsForUser already has a lock.
if existingConnectedClients , _ := GetClientsForUser ( user . ID ) ; len ( existingConnectedClients ) > 0 {
shouldSendJoinedMessages = false
}
2021-07-20 05:22:29 +03:00
s . mu . Lock ( )
{
2023-09-10 20:58:11 +03:00
// If there is a pending disconnect timer then clear it.
// Do not send user joined message if enough time hasn't passed where the
// user chat part message hasn't been sent yet.
if ticker , ok := s . userPartedTimers [ user . ID ] ; ok {
ticker . Stop ( )
delete ( s . userPartedTimers , user . ID )
shouldSendJoinedMessages = false
}
2022-12-14 06:17:32 +03:00
client . Id = s . seq
s . clients [ client . Id ] = client
2021-07-20 05:22:29 +03:00
s . seq ++
}
s . mu . Unlock ( )
2022-12-14 06:17:32 +03:00
log . Traceln ( "Adding client" , client . Id , "total count:" , len ( s . clients ) )
2021-07-20 05:22:29 +03:00
go client . writePump ( )
go client . readPump ( )
client . sendConnectedClientInfo ( )
if getStatus ( ) . Online {
2021-09-22 00:06:23 +03:00
if shouldSendJoinedMessages {
s . sendUserJoinedMessage ( client )
}
2021-07-20 05:22:29 +03:00
s . sendWelcomeMessageToClient ( client )
}
2021-08-13 00:55:21 +03:00
// Asynchronously, optionally, fetch GeoIP data.
2021-09-12 10:18:15 +03:00
go func ( client * Client ) {
2021-10-12 23:21:37 +03:00
client . Geo = s . geoipClient . GetGeoFromIP ( ipAddress )
2021-08-13 00:55:21 +03:00
} ( client )
2021-07-20 05:22:29 +03:00
return client
2020-06-23 04:11:56 +03:00
}
2021-09-12 10:18:15 +03:00
func ( s * Server ) sendUserJoinedMessage ( c * Client ) {
2021-07-20 05:22:29 +03:00
userJoinedEvent := events . UserJoinedEvent { }
userJoinedEvent . SetDefaults ( )
userJoinedEvent . User = c . User
2022-12-14 06:17:32 +03:00
userJoinedEvent . ClientID = c . Id
2021-07-20 05:22:29 +03:00
if err := s . Broadcast ( userJoinedEvent . GetBroadcastPayload ( ) ) ; err != nil {
log . Errorln ( "error adding client to chat server" , err )
2020-06-23 04:11:56 +03:00
}
2021-07-20 05:22:29 +03:00
// Send chat user joined webhook
webhooks . SendChatEventUserJoined ( userJoinedEvent )
2020-06-23 04:11:56 +03:00
}
2023-09-10 20:58:11 +03:00
func ( s * Server ) handleClientDisconnected ( c * Client ) {
2022-12-14 06:17:32 +03:00
if _ , ok := s . clients [ c . Id ] ; ok {
log . Debugln ( "Deleting" , c . Id )
delete ( s . clients , c . Id )
2020-06-23 04:11:56 +03:00
}
2023-09-10 20:58:11 +03:00
additionalClientCheck , _ := GetClientsForUser ( c . User . ID )
if len ( additionalClientCheck ) > 0 {
// This user is still connected to chat with another client.
return
}
s . userPartedTimers [ c . User . ID ] = time . NewTicker ( 10 * time . Second )
go func ( ) {
<- s . userPartedTimers [ c . User . ID ] . C
s . sendUserPartedMessage ( c )
} ( )
}
func ( s * Server ) sendUserPartedMessage ( c * Client ) {
s . userPartedTimers [ c . User . ID ] . Stop ( )
delete ( s . userPartedTimers , c . User . ID )
userPartEvent := events . UserPartEvent { }
userPartEvent . SetDefaults ( )
userPartEvent . User = c . User
userPartEvent . ClientID = c . Id
// If part messages are disabled.
if data . GetChatJoinPartMessagesEnabled ( ) {
if err := s . Broadcast ( userPartEvent . GetBroadcastPayload ( ) ) ; err != nil {
log . Errorln ( "error sending chat part message" , err )
}
}
// Send chat user joined webhook
webhooks . SendChatEventUserParted ( userPartEvent )
2020-06-23 04:11:56 +03:00
}
2021-09-12 10:18:15 +03:00
// HandleClientConnection is fired when a single client connects to the websocket.
func ( s * Server ) HandleClientConnection ( w http . ResponseWriter , r * http . Request ) {
2021-07-20 05:22:29 +03:00
if data . GetChatDisabled ( ) {
_ , _ = w . Write ( [ ] byte ( events . ChatDisabled ) )
return
2020-07-29 07:30:03 +03:00
}
2021-02-19 10:05:52 +03:00
2022-03-07 07:34:49 +03:00
ipAddress := utils . GetIPAddressFromRequest ( r )
// Check if this client's IP address is banned. If so send a rejection.
if blocked , err := data . IsIPAddressBanned ( ipAddress ) ; blocked {
log . Debugln ( "Client ip address has been blocked. Rejecting." )
w . WriteHeader ( http . StatusForbidden )
return
} else if err != nil {
log . Errorln ( "error determining if IP address is blocked: " , err )
}
2021-07-20 05:22:29 +03:00
// Limit concurrent chat connections
2021-08-14 01:26:44 +03:00
if int64 ( len ( s . clients ) ) >= s . maxSocketConnectionLimit {
log . Warnln ( "rejecting incoming client connection as it exceeds the max client count of" , s . maxSocketConnectionLimit )
2021-07-20 05:22:29 +03:00
_ , _ = w . Write ( [ ] byte ( events . ErrorMaxConnectionsExceeded ) )
return
}
2022-05-03 23:01:50 +03:00
// To allow dev web environments to connect.
upgrader . CheckOrigin = func ( r * http . Request ) bool {
return true
}
2021-07-20 05:22:29 +03:00
conn , err := upgrader . Upgrade ( w , r , nil )
if err != nil {
log . Debugln ( err )
return
}
accessToken := r . URL . Query ( ) . Get ( "accessToken" )
if accessToken == "" {
log . Errorln ( "Access token is required" )
// Return HTTP status code
2021-09-12 10:18:15 +03:00
_ = conn . Close ( )
2021-07-20 05:22:29 +03:00
return
}
2024-07-02 04:58:50 +03:00
userRepository := userrepository . Get ( )
2021-07-20 05:22:29 +03:00
// A user is required to use the websocket
2024-07-02 04:58:50 +03:00
user := userRepository . GetUserByToken ( accessToken )
2021-07-20 05:22:29 +03:00
if user == nil {
2022-04-22 00:55:26 +03:00
// Send error that registration is required
2021-07-20 05:22:29 +03:00
_ = conn . WriteJSON ( events . EventPayload {
"type" : events . ErrorNeedsRegistration ,
} )
2021-09-12 10:18:15 +03:00
_ = conn . Close ( )
2021-07-20 05:22:29 +03:00
return
}
// User is disabled therefore we should disconnect.
if user . DisabledAt != nil {
2021-09-12 10:18:15 +03:00
log . Traceln ( "Disabled user" , user . ID , user . DisplayName , "rejected" )
2021-07-20 05:22:29 +03:00
_ = conn . WriteJSON ( events . EventPayload {
"type" : events . ErrorUserDisabled ,
} )
2021-09-12 10:18:15 +03:00
_ = conn . Close ( )
2021-07-20 05:22:29 +03:00
return
2021-02-19 10:05:52 +03:00
}
2021-03-04 07:44:13 +03:00
2021-07-20 05:22:29 +03:00
userAgent := r . UserAgent ( )
2021-08-13 00:55:21 +03:00
s . Addclient ( conn , user , accessToken , userAgent , ipAddress )
2020-07-29 07:30:03 +03:00
}
2021-07-20 05:22:29 +03:00
// Broadcast sends message to all connected clients.
2021-09-12 10:18:15 +03:00
func ( s * Server ) Broadcast ( payload events . EventPayload ) error {
2021-07-20 05:22:29 +03:00
data , err := json . Marshal ( payload )
if err != nil {
return err
}
2022-04-16 05:30:05 +03:00
s . mu . RLock ( )
defer s . mu . RUnlock ( )
2020-06-23 09:52:50 +03:00
2021-07-20 05:22:29 +03:00
for _ , client := range s . clients {
if client == nil {
continue
}
2020-06-23 09:52:50 +03:00
2021-07-20 05:22:29 +03:00
select {
case client . send <- data :
default :
2022-04-16 05:30:05 +03:00
go client . close ( )
2020-06-23 09:52:50 +03:00
}
2021-07-20 05:22:29 +03:00
}
2020-06-23 09:52:50 +03:00
2021-07-20 05:22:29 +03:00
return nil
2020-06-23 09:52:50 +03:00
}
2021-09-12 10:18:15 +03:00
// Send will send a single payload to a single connected client.
func ( s * Server ) Send ( payload events . EventPayload , client * Client ) {
2021-07-20 05:22:29 +03:00
data , err := json . Marshal ( payload )
if err != nil {
log . Errorln ( err )
return
}
2020-06-23 04:11:56 +03:00
2021-07-20 05:22:29 +03:00
client . send <- data
}
2020-06-23 04:11:56 +03:00
2022-03-07 07:34:49 +03:00
// DisconnectClients will forcefully disconnect all clients belonging to a user by ID.
func ( s * Server ) DisconnectClients ( clients [ ] * Client ) {
2021-07-20 05:22:29 +03:00
for _ , client := range clients {
2021-09-12 10:18:15 +03:00
log . Traceln ( "Disconnecting client" , client . User . ID , "owned by" , client . User . DisplayName )
2021-03-14 21:46:27 +03:00
2021-09-12 10:18:15 +03:00
go func ( client * Client ) {
2021-07-20 05:22:29 +03:00
event := events . UserDisabledEvent { }
event . SetDefaults ( )
2021-03-10 22:07:47 +03:00
2021-07-20 05:22:29 +03:00
// Send this disabled event specifically to this single connected client
// to let them know they've been banned.
_server . Send ( event . GetBroadcastPayload ( ) , client )
2020-10-14 02:45:52 +03:00
2021-07-20 05:22:29 +03:00
// Give the socket time to send out the above message.
// Unfortunately I don't know of any way to get a real callback to know when
// the message was successfully sent, so give it a couple seconds.
time . Sleep ( 2 * time . Second )
2021-02-19 10:05:52 +03:00
2021-07-20 05:22:29 +03:00
// Forcefully disconnect if still valid.
if client != nil {
client . close ( )
2021-02-01 01:57:50 +03:00
}
2021-07-20 05:22:29 +03:00
} ( client )
}
}
2020-06-23 04:11:56 +03:00
2021-11-03 05:27:41 +03:00
// SendConnectedClientInfoToUser will find all the connected clients assigned to a user
// and re-send each the connected client info.
func SendConnectedClientInfoToUser ( userID string ) error {
clients , err := GetClientsForUser ( userID )
if err != nil {
return err
}
2024-07-02 04:58:50 +03:00
userRepository := userrepository . Get ( )
2021-11-03 05:27:41 +03:00
// Get an updated reference to the user.
2024-07-02 04:58:50 +03:00
user := userRepository . GetUserByID ( userID )
2021-11-03 05:27:41 +03:00
if user == nil {
return fmt . Errorf ( "user not found" )
}
if err != nil {
return err
}
for _ , client := range clients {
// Update the client's reference to its user.
client . User = user
// Send the update to the client.
client . sendConnectedClientInfo ( )
}
return nil
}
// SendActionToUser will send system action text to all connected clients
// assigned to a user ID.
func SendActionToUser ( userID string , text string ) error {
clients , err := GetClientsForUser ( userID )
if err != nil {
return err
}
for _ , client := range clients {
_server . sendActionToClient ( client , text )
}
return nil
}
2021-09-12 10:18:15 +03:00
func ( s * Server ) eventReceived ( event chatClientEvent ) {
2022-03-07 10:26:24 +03:00
c := event . client
u := c . User
// If established chat user only mode is enabled and the user is not old
// enough then reject this event and send them an informative message.
if u != nil && data . GetChatEstbalishedUsersOnlyMode ( ) && time . Since ( event . client . User . CreatedAt ) < config . GetDefaults ( ) . ChatEstablishedUserModeTimeDuration && ! u . IsModerator ( ) {
s . sendActionToClient ( c , "You have not been an established chat participant long enough to take part in chat. Please enjoy the stream and try again later." )
return
}
2021-07-20 05:22:29 +03:00
var typecheck map [ string ] interface { }
if err := json . Unmarshal ( event . data , & typecheck ) ; err != nil {
log . Debugln ( err )
}
2020-06-23 04:11:56 +03:00
2021-07-20 05:22:29 +03:00
eventType := typecheck [ "type" ]
switch eventType {
case events . MessageSent :
s . userMessageSent ( event )
case events . UserNameChanged :
s . userNameChanged ( event )
2022-08-10 05:56:45 +03:00
case events . UserColorChanged :
s . userColorChanged ( event )
2021-07-20 05:22:29 +03:00
default :
2022-09-21 20:03:16 +03:00
log . Debugln ( logSanitize ( fmt . Sprint ( eventType ) ) , "event not found:" , logSanitize ( fmt . Sprint ( typecheck ) ) )
2020-06-23 04:11:56 +03:00
}
}
2020-07-20 01:14:51 +03:00
2021-09-12 10:18:15 +03:00
func ( s * Server ) sendWelcomeMessageToClient ( c * Client ) {
2021-07-20 05:22:29 +03:00
// Add an artificial delay so people notice this message come in.
time . Sleep ( 7 * time . Second )
welcomeMessage := utils . RenderSimpleMarkdown ( data . GetServerWelcomeMessage ( ) )
2020-12-22 06:42:47 +03:00
2021-07-20 05:22:29 +03:00
if welcomeMessage != "" {
s . sendSystemMessageToClient ( c , welcomeMessage )
2020-12-22 06:42:47 +03:00
}
}
2021-09-12 10:18:15 +03:00
func ( s * Server ) sendAllWelcomeMessage ( ) {
2021-07-20 05:22:29 +03:00
welcomeMessage := utils . RenderSimpleMarkdown ( data . GetServerWelcomeMessage ( ) )
2020-07-20 01:14:51 +03:00
2021-07-20 05:22:29 +03:00
if welcomeMessage != "" {
clientMessage := events . SystemMessageEvent {
Event : events . Event { } ,
MessageEvent : events . MessageEvent {
Body : welcomeMessage ,
} ,
2021-03-22 00:10:56 +03:00
}
2021-07-20 05:22:29 +03:00
clientMessage . SetDefaults ( )
_ = s . Broadcast ( clientMessage . GetBroadcastPayload ( ) )
}
}
2021-09-12 10:18:15 +03:00
func ( s * Server ) sendSystemMessageToClient ( c * Client , message string ) {
2021-07-20 05:22:29 +03:00
clientMessage := events . SystemMessageEvent {
Event : events . Event { } ,
MessageEvent : events . MessageEvent {
Body : message ,
} ,
}
clientMessage . SetDefaults ( )
2021-09-13 11:26:28 +03:00
clientMessage . RenderBody ( )
2021-07-20 05:22:29 +03:00
s . Send ( clientMessage . GetBroadcastPayload ( ) , c )
}
2021-09-12 10:18:15 +03:00
func ( s * Server ) sendActionToClient ( c * Client , message string ) {
2021-07-20 05:22:29 +03:00
clientMessage := events . ActionEvent {
MessageEvent : events . MessageEvent {
Body : message ,
} ,
2021-11-03 05:27:41 +03:00
Event : events . Event {
Type : events . ChatActionSent ,
} ,
2021-07-20 05:22:29 +03:00
}
clientMessage . SetDefaults ( )
clientMessage . RenderBody ( )
s . Send ( clientMessage . GetBroadcastPayload ( ) , c )
2020-10-07 09:14:33 +03:00
}