From 7cc2ce44d37248be3b4c0a7c423176a305748dae Mon Sep 17 00:00:00 2001 From: Gabe Kangas Date: Wed, 14 Jun 2023 17:38:15 -0700 Subject: [PATCH] feat: move rtmp under video --- core/core.go | 2 +- core/streamState.go | 2 +- video/rtmp/broadcaster.go | 34 +++++++ video/rtmp/rtmp.go | 164 +++++++++++++++++++++++++++++++ video/rtmp/utils.go | 96 ++++++++++++++++++ video/rtmp/utils_test.go | 35 +++++++ webserver/handlers/disconnect.go | 2 +- 7 files changed, 332 insertions(+), 3 deletions(-) create mode 100644 video/rtmp/broadcaster.go create mode 100644 video/rtmp/rtmp.go create mode 100644 video/rtmp/utils.go create mode 100644 video/rtmp/utils_test.go diff --git a/core/core.go b/core/core.go index ca02cd8ca..ac7a7586b 100644 --- a/core/core.go +++ b/core/core.go @@ -11,11 +11,11 @@ import ( "github.com/owncast/owncast/config" "github.com/owncast/owncast/core/chat" "github.com/owncast/owncast/core/data" - "github.com/owncast/owncast/core/rtmp" "github.com/owncast/owncast/core/webhooks" "github.com/owncast/owncast/models" "github.com/owncast/owncast/services/notifications" "github.com/owncast/owncast/utils" + "github.com/owncast/owncast/video/rtmp" "github.com/owncast/owncast/video/transcoder" "github.com/owncast/owncast/yp" ) diff --git a/core/streamState.go b/core/streamState.go index 46eca1b3c..1c0768a7e 100644 --- a/core/streamState.go +++ b/core/streamState.go @@ -11,11 +11,11 @@ import ( "github.com/owncast/owncast/config" "github.com/owncast/owncast/core/chat" "github.com/owncast/owncast/core/data" - "github.com/owncast/owncast/core/rtmp" "github.com/owncast/owncast/core/webhooks" "github.com/owncast/owncast/models" "github.com/owncast/owncast/services/notifications" "github.com/owncast/owncast/utils" + "github.com/owncast/owncast/video/rtmp" "github.com/owncast/owncast/video/transcoder" ) diff --git a/video/rtmp/broadcaster.go b/video/rtmp/broadcaster.go new file mode 100644 index 000000000..a2d212926 --- /dev/null +++ b/video/rtmp/broadcaster.go @@ -0,0 +1,34 @@ +package rtmp + +import ( + "time" + + "github.com/nareix/joy5/format/flv/flvio" + "github.com/owncast/owncast/models" + log "github.com/sirupsen/logrus" +) + +func setCurrentBroadcasterInfo(t flvio.Tag, remoteAddr string) { + data, err := getInboundDetailsFromMetadata(t.DebugFields()) + if err != nil { + log.Traceln("Unable to parse inbound broadcaster details:", err) + } + + broadcaster := models.Broadcaster{ + RemoteAddr: remoteAddr, + Time: time.Now(), + StreamDetails: models.InboundStreamDetails{ + Width: data.Width, + Height: data.Height, + VideoBitrate: int(data.VideoBitrate), + VideoCodec: getVideoCodec(data.VideoCodec), + VideoFramerate: data.VideoFramerate, + AudioBitrate: int(data.AudioBitrate), + AudioCodec: getAudioCodec(data.AudioCodec), + Encoder: data.Encoder, + VideoOnly: data.AudioCodec == nil, + }, + } + + _setBroadcaster(broadcaster) +} diff --git a/video/rtmp/rtmp.go b/video/rtmp/rtmp.go new file mode 100644 index 000000000..b9a4f96dd --- /dev/null +++ b/video/rtmp/rtmp.go @@ -0,0 +1,164 @@ +package rtmp + +import ( + "fmt" + "io" + "net" + "time" + + "github.com/nareix/joy5/format/flv" + "github.com/nareix/joy5/format/flv/flvio" + log "github.com/sirupsen/logrus" + + "github.com/nareix/joy5/format/rtmp" + "github.com/owncast/owncast/config" + "github.com/owncast/owncast/core/data" + "github.com/owncast/owncast/models" +) + +var _hasInboundRTMPConnection = false + +var ( + _pipe *io.PipeWriter + _rtmpConnection net.Conn +) + +var ( + _setStreamAsConnected func(*io.PipeReader) + _setBroadcaster func(models.Broadcaster) +) + +// Start starts the rtmp service, listening on specified RTMP port. +func Start(setStreamAsConnected func(*io.PipeReader), setBroadcaster func(models.Broadcaster)) { + _setStreamAsConnected = setStreamAsConnected + _setBroadcaster = setBroadcaster + + port := data.GetRTMPPortNumber() + s := rtmp.NewServer() + var lis net.Listener + var err error + if lis, err = net.Listen("tcp", fmt.Sprintf(":%d", port)); err != nil { + log.Fatal(err) + } + + s.LogEvent = func(c *rtmp.Conn, nc net.Conn, e int) { + es := rtmp.EventString[e] + log.Traceln("RTMP", nc.LocalAddr(), nc.RemoteAddr(), es) + } + + s.HandleConn = HandleConn + + if err != nil { + log.Panicln(err) + } + log.Tracef("RTMP server is listening for incoming stream on port: %d", port) + + for { + nc, err := lis.Accept() + if err != nil { + time.Sleep(time.Second) + continue + } + go s.HandleNetConn(nc) + } +} + +// HandleConn is fired when an inbound RTMP connection takes place. +func HandleConn(c *rtmp.Conn, nc net.Conn) { + c.LogTagEvent = func(isRead bool, t flvio.Tag) { + if t.Type == flvio.TAG_AMF0 { + log.Tracef("%+v\n", t.DebugFields()) + setCurrentBroadcasterInfo(t, nc.RemoteAddr().String()) + } + } + + if _hasInboundRTMPConnection { + log.Errorln("stream already running; can not overtake an existing stream") + _ = nc.Close() + return + } + + accessGranted := false + validStreamingKeys := data.GetStreamKeys() + + // If a stream key override was specified then use that instead. + if config.TemporaryStreamKey != "" { + validStreamingKeys = []models.StreamKey{{Key: config.TemporaryStreamKey}} + } + + for _, key := range validStreamingKeys { + if secretMatch(key.Key, c.URL.Path) { + accessGranted = true + break + } + } + + if !accessGranted { + log.Errorln("invalid streaming key; rejecting incoming stream") + _ = nc.Close() + return + } + + rtmpOut, rtmpIn := io.Pipe() + _pipe = rtmpIn + log.Infoln("Inbound stream connected.") + _setStreamAsConnected(rtmpOut) + + _hasInboundRTMPConnection = true + _rtmpConnection = nc + + w := flv.NewMuxer(rtmpIn) + + for { + if !_hasInboundRTMPConnection { + break + } + + // If we don't get a readable packet in 10 seconds give up and disconnect + if err := _rtmpConnection.SetReadDeadline(time.Now().Add(10 * time.Second)); err != nil { + log.Debugln(err) + } + + pkt, err := c.ReadPacket() + + // Broadcaster disconnected + if err == io.EOF { + handleDisconnect(nc) + return + } + + // Read timeout. Disconnect. + if neterr, ok := err.(net.Error); ok && neterr.Timeout() { + log.Debugln("Timeout reading the inbound stream from the broadcaster. Assuming that they disconnected and ending the stream.") + handleDisconnect(nc) + return + } + + if err := w.WritePacket(pkt); err != nil { + log.Errorln("unable to write rtmp packet", err) + handleDisconnect(nc) + return + } + } +} + +func handleDisconnect(conn net.Conn) { + if !_hasInboundRTMPConnection { + return + } + + log.Infoln("Inbound stream disconnected.") + _ = conn.Close() + _ = _pipe.Close() + _hasInboundRTMPConnection = false +} + +// Disconnect will force disconnect the current inbound RTMP connection. +func Disconnect() { + if _rtmpConnection == nil { + return + } + + log.Traceln("Inbound stream disconnect requested.") + handleDisconnect(_rtmpConnection) +} diff --git a/video/rtmp/utils.go b/video/rtmp/utils.go new file mode 100644 index 000000000..3c0614601 --- /dev/null +++ b/video/rtmp/utils.go @@ -0,0 +1,96 @@ +package rtmp + +import ( + "crypto/subtle" + "encoding/json" + "errors" + "fmt" + "regexp" + "strings" + + "github.com/nareix/joy5/format/flv/flvio" + "github.com/owncast/owncast/models" + log "github.com/sirupsen/logrus" +) + +const unknownString = "Unknown" + +var _getInboundDetailsFromMetadataRE = regexp.MustCompile(`\{(.*?)\}`) + +func getInboundDetailsFromMetadata(metadata []interface{}) (models.RTMPStreamMetadata, error) { + metadataComponentsString := fmt.Sprintf("%+v", metadata) + if !strings.Contains(metadataComponentsString, "onMetaData") { + return models.RTMPStreamMetadata{}, errors.New("Not a onMetaData message") + } + + submatchall := _getInboundDetailsFromMetadataRE.FindAllString(metadataComponentsString, 1) + + if len(submatchall) == 0 { + return models.RTMPStreamMetadata{}, errors.New("unable to parse inbound metadata") + } + + metadataJSONString := submatchall[0] + var details models.RTMPStreamMetadata + err := json.Unmarshal([]byte(metadataJSONString), &details) + return details, err +} + +func getAudioCodec(codec interface{}) string { + if codec == nil { + return "No audio" + } + + var codecID float64 + if assertedCodecID, ok := codec.(float64); ok { + codecID = assertedCodecID + } else { + return codec.(string) + } + + switch codecID { + case flvio.SOUND_MP3: + return "MP3" + case flvio.SOUND_AAC: + return "AAC" + case flvio.SOUND_SPEEX: + return "Speex" + } + + return unknownString +} + +func getVideoCodec(codec interface{}) string { + if codec == nil { + return unknownString + } + + var codecID float64 + if assertedCodecID, ok := codec.(float64); ok { + codecID = assertedCodecID + } else { + return codec.(string) + } + + switch codecID { + case flvio.VIDEO_H264: + return "H.264" + case flvio.VIDEO_H265: + return "H.265" + } + + return unknownString +} + +func secretMatch(configStreamKey string, path string) bool { + prefix := "/live/" + + if !strings.HasPrefix(path, prefix) { + log.Debug("RTMP path does not start with " + prefix) + return false // We need the path to begin with $prefix + } + + streamingKey := path[len(prefix):] // Remove $prefix + + matches := subtle.ConstantTimeCompare([]byte(streamingKey), []byte(configStreamKey)) == 1 + return matches +} diff --git a/video/rtmp/utils_test.go b/video/rtmp/utils_test.go new file mode 100644 index 000000000..233c09b48 --- /dev/null +++ b/video/rtmp/utils_test.go @@ -0,0 +1,35 @@ +package rtmp + +import "testing" + +func Test_secretMatch(t *testing.T) { + tests := []struct { + name string + streamKey string + path string + want bool + }{ + {"positive", "abc", "/live/abc", true}, + {"negative", "abc", "/live/def", false}, + {"positive with numbers", "abc123", "/live/abc123", true}, + {"negative with numbers", "abc123", "/live/def456", false}, + {"positive with url chars", "one/two/three", "/live/one/two/three", true}, + {"negative with url chars", "one/two/three", "/live/four/five/six", false}, + {"check the entire secret", "three", "/live/one/two/three", false}, + {"with /live/ in secret", "one/live/three", "/live/one/live/three", true}, + {"bad path", "anything", "nonsense", false}, + {"missing secret", "abc", "/live/", false}, + {"missing secret and missing last slash", "abc", "/live", false}, + {"streamkey before /live/", "streamkey", "/streamkey/live", false}, + {"missing /live/", "anything", "/something/else", false}, + {"stuff before and after /live/", "after", "/before/live/after", false}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := secretMatch(tt.streamKey, tt.path); got != tt.want { + t.Errorf("secretMatch() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/webserver/handlers/disconnect.go b/webserver/handlers/disconnect.go index c9cf6547c..898c7fcd7 100644 --- a/webserver/handlers/disconnect.go +++ b/webserver/handlers/disconnect.go @@ -6,7 +6,7 @@ import ( "github.com/owncast/owncast/core" "github.com/owncast/owncast/webserver/responses" - "github.com/owncast/owncast/core/rtmp" + "github.com/owncast/owncast/video/rtmp" ) // DisconnectInboundConnection will force-disconnect an inbound stream.