// Mirror of https://github.com/owncast/owncast.git
// Synced 2024-11-23 21:28:29 +03:00 · 190 lines · 4 KiB · Go

package chat
|
|
|
|
import (
|
|
"fmt"
|
|
"net/http"
|
|
"sync"
|
|
"time"
|
|
|
|
log "github.com/sirupsen/logrus"
|
|
"golang.org/x/net/websocket"
|
|
|
|
"github.com/owncast/owncast/core/data"
|
|
"github.com/owncast/owncast/core/webhooks"
|
|
"github.com/owncast/owncast/models"
|
|
)
|
|
|
|
var (
|
|
_server *server
|
|
)
|
|
|
|
var l = &sync.RWMutex{}
|
|
|
|
// Server represents the server which handles the chat.
|
|
type server struct {
|
|
Clients map[string]*Client
|
|
|
|
pattern string
|
|
listener models.ChatListener
|
|
|
|
addCh chan *Client
|
|
delCh chan *Client
|
|
sendAllCh chan models.ChatEvent
|
|
pingCh chan models.PingMessage
|
|
doneCh chan bool
|
|
errCh chan error
|
|
}
|
|
|
|
// Add adds a client to the server.
|
|
func (s *server) add(c *Client) {
|
|
s.addCh <- c
|
|
}
|
|
|
|
// Remove removes a client from the server.
|
|
func (s *server) remove(c *Client) {
|
|
s.delCh <- c
|
|
}
|
|
|
|
// SendToAll sends a message to all of the connected clients.
|
|
func (s *server) SendToAll(msg models.ChatEvent) {
|
|
s.sendAllCh <- msg
|
|
}
|
|
|
|
// Err handles an error.
|
|
func (s *server) err(err error) {
|
|
s.errCh <- err
|
|
}
|
|
|
|
func (s *server) sendAll(msg models.ChatEvent) {
|
|
l.RLock()
|
|
for _, c := range s.Clients {
|
|
c.write(msg)
|
|
}
|
|
l.RUnlock()
|
|
}
|
|
|
|
func (s *server) ping() {
|
|
ping := models.PingMessage{MessageType: models.PING}
|
|
|
|
l.RLock()
|
|
for _, c := range s.Clients {
|
|
c.pingch <- ping
|
|
}
|
|
l.RUnlock()
|
|
}
|
|
|
|
func (s *server) usernameChanged(msg models.NameChangeEvent) {
|
|
l.RLock()
|
|
for _, c := range s.Clients {
|
|
c.usernameChangeChannel <- msg
|
|
}
|
|
l.RUnlock()
|
|
|
|
go webhooks.SendChatEventUsernameChanged(msg)
|
|
}
|
|
|
|
func (s *server) userJoined(msg models.UserJoinedEvent) {
|
|
l.RLock()
|
|
if s.listener.IsStreamConnected() {
|
|
for _, c := range s.Clients {
|
|
c.userJoinedChannel <- msg
|
|
}
|
|
}
|
|
l.RUnlock()
|
|
|
|
go webhooks.SendChatEventUserJoined(msg)
|
|
}
|
|
|
|
func (s *server) onConnection(ws *websocket.Conn) {
|
|
client := NewClient(ws)
|
|
|
|
defer func() {
|
|
s.removeClient(client)
|
|
|
|
if err := ws.Close(); err != nil {
|
|
log.Debugln(err)
|
|
//s.errCh <- err
|
|
}
|
|
}()
|
|
|
|
s.add(client)
|
|
client.listen()
|
|
}
|
|
|
|
// Listen and serve.
|
|
// It serves client connection and broadcast request.
|
|
func (s *server) Listen() {
|
|
http.Handle(s.pattern, websocket.Handler(s.onConnection))
|
|
|
|
log.Tracef("Starting the websocket listener on: %s", s.pattern)
|
|
|
|
for {
|
|
select {
|
|
// add new a client
|
|
case c := <-s.addCh:
|
|
l.Lock()
|
|
s.Clients[c.socketID] = c
|
|
|
|
if !c.Ignore {
|
|
s.listener.ClientAdded(c.GetViewerClientFromChatClient())
|
|
s.sendWelcomeMessageToClient(c)
|
|
}
|
|
l.Unlock()
|
|
|
|
// remove a client
|
|
case c := <-s.delCh:
|
|
s.removeClient(c)
|
|
case msg := <-s.sendAllCh:
|
|
// message was received from a client and should be sanitized, validated
|
|
// and distributed to other clients.
|
|
//
|
|
// Will turn markdown into html, sanitize user-supplied raw html
|
|
// and standardize this message into something safe we can send everyone else.
|
|
msg.RenderAndSanitizeMessageBody()
|
|
|
|
if !msg.Empty() {
|
|
s.listener.MessageSent(msg)
|
|
s.sendAll(msg)
|
|
|
|
// Store in the message history
|
|
msg.SetDefaults()
|
|
if !msg.Ephemeral {
|
|
addMessage(msg)
|
|
}
|
|
|
|
// Send webhooks
|
|
go webhooks.SendChatEvent(msg)
|
|
}
|
|
case ping := <-s.pingCh:
|
|
fmt.Println("PING?", ping)
|
|
|
|
case err := <-s.errCh:
|
|
log.Trace("Error: ", err.Error())
|
|
|
|
case <-s.doneCh:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (s *server) removeClient(c *Client) {
|
|
l.Lock()
|
|
if _, ok := s.Clients[c.socketID]; ok {
|
|
delete(s.Clients, c.socketID)
|
|
|
|
s.listener.ClientRemoved(c.socketID)
|
|
log.Tracef("The client was connected for %s and sent %d messages (%s)", time.Since(c.ConnectedAt), c.MessageCount, c.ClientID)
|
|
}
|
|
l.Unlock()
|
|
}
|
|
|
|
func (s *server) sendWelcomeMessageToClient(c *Client) {
|
|
go func() {
|
|
// Add an artificial delay so people notice this message come in.
|
|
time.Sleep(7 * time.Second)
|
|
|
|
initialChatMessageText := fmt.Sprintf("Welcome to %s! %s", data.GetServerName(), data.GetServerSummary())
|
|
initialMessage := models.ChatEvent{ClientID: "owncast-server", Author: data.GetServerName(), Body: initialChatMessageText, ID: "initial-message-1", MessageType: "SYSTEM", Visible: true, Timestamp: time.Now()}
|
|
c.write(initialMessage)
|
|
}()
|
|
}
|