owncast/core/chat/client.go

249 lines
5.8 KiB
Go

package chat
import (
"encoding/json"
"fmt"
"io"
"time"
log "github.com/sirupsen/logrus"
"golang.org/x/net/websocket"
"github.com/owncast/owncast/geoip"
"github.com/owncast/owncast/models"
"github.com/owncast/owncast/utils"
"github.com/teris-io/shortid"
"golang.org/x/time/rate"
)
const channelBufSize = 100
//Client represents a chat client.
type Client struct {
ConnectedAt time.Time
MessageCount int
UserAgent string
IPAddress string
Username *string
ClientID string // How we identify unique viewers when counting viewer counts.
Geo *geoip.GeoDetails `json:"geo"`
Ignore bool // If set to true this will not be treated as a viewer
socketID string // How we identify a single websocket client.
ws *websocket.Conn
ch chan models.ChatEvent
pingch chan models.PingMessage
usernameChangeChannel chan models.NameChangeEvent
userJoinedChannel chan models.UserJoinedEvent
doneCh chan bool
rateLimiter *rate.Limiter
}
// NewClient creates a new chat client.
func NewClient(ws *websocket.Conn) *Client {
if ws == nil {
log.Panicln("ws cannot be nil")
}
var ignoreClient = false
for _, extraData := range ws.Config().Protocol {
if extraData == "IGNORE_CLIENT" {
ignoreClient = true
}
}
ch := make(chan models.ChatEvent, channelBufSize)
doneCh := make(chan bool)
pingch := make(chan models.PingMessage)
usernameChangeChannel := make(chan models.NameChangeEvent)
userJoinedChannel := make(chan models.UserJoinedEvent)
ipAddress := utils.GetIPAddressFromRequest(ws.Request())
userAgent := ws.Request().UserAgent()
socketID, _ := shortid.Generate()
clientID := socketID
rateLimiter := rate.NewLimiter(0.6, 5)
return &Client{time.Now(), 0, userAgent, ipAddress, nil, clientID, nil, ignoreClient, socketID, ws, ch, pingch, usernameChangeChannel, userJoinedChannel, doneCh, rateLimiter}
}
func (c *Client) write(msg models.ChatEvent) {
select {
case c.ch <- msg:
default:
_server.removeClient(c)
_server.err(fmt.Errorf("client %s is disconnected", c.ClientID))
}
}
// Listen Write and Read request via channel.
func (c *Client) listen() {
go c.listenWrite()
c.listenRead()
}
// Listen write request via channel.
func (c *Client) listenWrite() {
for {
select {
// Send a PING keepalive
case msg := <-c.pingch:
err := websocket.JSON.Send(c.ws, msg)
if err != nil {
c.handleClientSocketError(err)
}
// send message to the client
case msg := <-c.ch:
err := websocket.JSON.Send(c.ws, msg)
if err != nil {
c.handleClientSocketError(err)
}
case msg := <-c.usernameChangeChannel:
err := websocket.JSON.Send(c.ws, msg)
if err != nil {
c.handleClientSocketError(err)
}
case msg := <-c.userJoinedChannel:
err := websocket.JSON.Send(c.ws, msg)
if err != nil {
c.handleClientSocketError(err)
}
// receive done request
case <-c.doneCh:
_server.removeClient(c)
c.doneCh <- true // for listenRead method
return
}
}
}
func (c *Client) handleClientSocketError(err error) {
_server.removeClient(c)
}
func (c *Client) passesRateLimit() bool {
if !c.rateLimiter.Allow() {
log.Debugln("Client", c.ClientID, "has exceeded the messaging rate limiting thresholds.")
return false
}
return true
}
// Listen read request via channel.
func (c *Client) listenRead() {
for {
select {
// receive done request
case <-c.doneCh:
_server.remove(c)
c.doneCh <- true // for listenWrite method
return
// read data from websocket connection
default:
var data []byte
err := websocket.Message.Receive(c.ws, &data)
if err != nil {
if err == io.EOF {
c.doneCh <- true
return
}
c.handleClientSocketError(err)
}
if !c.passesRateLimit() {
continue
}
var messageTypeCheck map[string]interface{}
err = json.Unmarshal(data, &messageTypeCheck)
// Bad messages should be thrown away
if err != nil {
log.Debugln("Badly formatted message received from", c.Username, c.ws.Request().RemoteAddr)
continue
}
// If we can't tell the type of message, also throw it away.
if messageTypeCheck == nil {
log.Debugln("Untyped message received from", c.Username, c.ws.Request().RemoteAddr)
continue
}
messageType := messageTypeCheck["type"].(string)
if messageType == models.MessageSent {
c.chatMessageReceived(data)
} else if messageType == models.UserNameChanged {
c.userChangedName(data)
} else if messageType == models.UserJoined {
c.userJoined(data)
}
}
}
}
func (c *Client) userJoined(data []byte) {
var msg models.UserJoinedEvent
if err := json.Unmarshal(data, &msg); err != nil {
log.Errorln(err)
return
}
msg.ID = shortid.MustGenerate()
msg.Type = models.UserJoined
msg.Timestamp = time.Now()
c.Username = &msg.Username
_server.userJoined(msg)
}
func (c *Client) userChangedName(data []byte) {
var msg models.NameChangeEvent
err := json.Unmarshal(data, &msg)
if err != nil {
log.Errorln(err)
}
msg.Type = models.UserNameChanged
msg.ID = shortid.MustGenerate()
_server.usernameChanged(msg)
c.Username = &msg.NewName
}
func (c *Client) chatMessageReceived(data []byte) {
var msg models.ChatEvent
err := json.Unmarshal(data, &msg)
if err != nil {
log.Errorln(err)
}
msg.SetDefaults()
c.MessageCount++
c.Username = &msg.Author
msg.ClientID = c.ClientID
msg.RenderAndSanitizeMessageBody()
_server.SendToAll(msg)
}
// GetViewerClientFromChatClient returns a general models.Client from a chat websocket client.
func (c *Client) GetViewerClientFromChatClient() models.Client {
return models.Client{
ConnectedAt: c.ConnectedAt,
MessageCount: c.MessageCount,
UserAgent: c.UserAgent,
IPAddress: c.IPAddress,
Username: c.Username,
ClientID: c.ClientID,
Geo: geoip.GetGeoFromIP(c.IPAddress),
}
}