diff --git a/client.go b/client.go index 2025e5de2..e82548f70 100644 --- a/client.go +++ b/client.go @@ -65,7 +65,6 @@ func (c *Client) Listen() { // Listen write request via chanel func (c *Client) listenWrite() { - log.Println("Listening write to client") for { select { @@ -85,7 +84,6 @@ func (c *Client) listenWrite() { // Listen read request via chanel func (c *Client) listenRead() { - log.Println("Listening read from client") for { select { diff --git a/config.go b/config.go new file mode 100644 index 000000000..bb1f22f38 --- /dev/null +++ b/config.go @@ -0,0 +1,67 @@ +package main + +import ( + "fmt" + "io/ioutil" + "log" + + "gopkg.in/yaml.v2" +) + +// Config struct +type Config struct { + IPFS IPFS `yaml:"ipfs"` + PublicHLSPath string `yaml:"publicHLSPath"` + PrivateHLSPath string `yaml:"privateHLSPath"` + VideoSettings VideoSettings `yaml:"videoSettings"` + Files Files `yaml:"files"` + FFMpegPath string `yaml:"ffmpegPath"` + WebServerPort int `yaml:"webServerPort"` +} + +type VideoSettings struct { + ResolutionWidth int `yaml:"resolutionWidth"` + ChunkLengthInSeconds int `yaml:"chunkLengthInSeconds"` +} + +// MaxNumberOnDisk must be at least as large as MaxNumberInPlaylist +type Files struct { + MaxNumberInPlaylist int `yaml:"maxNumberInPlaylist"` + MaxNumberOnDisk int `yaml:"maxNumberOnDisk"` +} + +type IPFS struct { + Enabled bool `yaml:"enabled"` + Gateway string `yaml:"gateway"` +} + +func getConfig() Config { + filePath := "config/config.yaml" + + if !fileExists(filePath) { + log.Fatal("ERROR: valid config/config.yaml is required") + } + + yamlFile, err := ioutil.ReadFile(filePath) + + var config Config + err = yaml.Unmarshal(yamlFile, &config) + if err != nil { + panic(err) + } + return config +} + +func checkConfig(config Config) { + if !fileExists(config.PrivateHLSPath) { + panic(fmt.Sprintf("%s does not exist.", config.PrivateHLSPath)) + } + + if !fileExists(config.PublicHLSPath) { + panic(fmt.Sprintf("%s does not exist.", config.PublicHLSPath)) + } + + if !fileExists(config.FFMpegPath) { + panic(fmt.Sprintf("ffmpeg does not exist at %s.", config.FFMpegPath)) + } +} diff --git a/config/config.yaml b/config/config.yaml new file mode 100644 index 000000000..78aa3567c --- /dev/null +++ b/config/config.yaml @@ -0,0 +1,16 @@ +publicHLSPath: webroot/hls +privateHLSPath: hls +ffmpegPath: /usr/local/bin/ffmpeg +webServerPort: 8080 + +videoSettings: + resolutionWidth: 900 + chunkLengthInSeconds: 4 + +files: + maxNumberInPlaylist: 30 + maxNumberOnDisk: 60 + +ipfs: + enabled: true + gateway: https://ipfs.io \ No newline at end of file diff --git a/ffmpeg.go b/ffmpeg.go index 4cbf2ce0c..da73b7700 100644 --- a/ffmpeg.go +++ b/ffmpeg.go @@ -5,19 +5,26 @@ import ( "log" "os" "os/exec" + "path" + "strconv" "strings" ) -func startFfmpeg() { - outputDir := "webroot" - chunkLength := "4" +func startFfmpeg(configuration Config) { + var outputDir = configuration.PublicHLSPath + var hlsPlaylistName = path.Join(configuration.PublicHLSPath, "stream.m3u8") - log.Printf("Starting transcoder with segments saving to %s.", outputDir) + if configuration.IPFS.Enabled { + outputDir = configuration.PrivateHLSPath + hlsPlaylistName = path.Join(outputDir, "temp.m3u8") + } - // ffmpegCmd := "cat streampipe.flv | ffmpeg -hide_banner -i pipe: -vf scale=900:-2 -g 48 -keyint_min 48 -preset ultrafast -f hls -hls_list_size 30 -hls_time 10 -strftime 1 -use_localtime 1 -hls_segment_filename 'hls/stream-%Y%m%d-%s.ts' -hls_flags delete_segments -segment_wrap 100 hls/temp.m3u8" + log.Printf("Starting transcoder saving to /%s.", outputDir) - ffmpegCmd := "cat streampipe.flv | ffmpeg -hide_banner -i pipe: -vf scale=900:-2 -g 48 -keyint_min 48 -preset ultrafast -f hls -hls_list_size 30 -hls_time " + chunkLength + " -strftime 1 -use_localtime 1 -hls_segment_filename '" + outputDir + "/stream-%Y%m%d-%s.ts' -hls_flags delete_segments -segment_wrap 100 hls/temp.m3u8" - fmt.Println(ffmpegCmd) + ffmpegCmd := "cat streampipe.flv | " + configuration.FFMpegPath + + " -hide_banner -i pipe: -vf scale=" + strconv.Itoa(configuration.VideoSettings.ResolutionWidth) + ":-2 -g 48 -keyint_min 48 -preset ultrafast -f hls -hls_list_size 30 -hls_time " + + strconv.Itoa(configuration.VideoSettings.ChunkLengthInSeconds) + " -strftime 1 -use_localtime 1 -hls_segment_filename '" + + outputDir + "/stream-%Y%m%d-%s.ts' -hls_flags delete_segments -segment_wrap 100 " + hlsPlaylistName _, err := exec.Command("bash", "-c", ffmpegCmd).Output() verifyError(err) @@ -29,9 +36,9 @@ func verifyError(e error) { } } -func generateRemotePlaylist(playlist string, segments map[string]string) string { +func generateRemotePlaylist(playlist string, gateway string, segments map[string]string) string { for local, remote := range segments { - playlist = strings.ReplaceAll(playlist, local, "https://gateway.temporal.cloud"+remote) + playlist = strings.ReplaceAll(playlist, local, gateway+remote) } return playlist } diff --git a/go.sum b/go.sum index 57b50a4a1..68479b44c 100644 --- a/go.sum +++ b/go.sum @@ -1179,6 +1179,7 @@ gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWD gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.5 h1:ymVxjfMaHvXD8RqPRmzHHsB3VvucivSkIAvJFDI5O3c= gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= grpc.go4.org v0.0.0-20170609214715-11d0a25b4919/go.mod h1:77eQGdRu53HpSqPFJFmuJdjuHRquDANNeA4x7B8WQ9o= honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/handler.go b/handler.go index d666d1c53..d05e2ab16 100644 --- a/handler.go +++ b/handler.go @@ -64,7 +64,7 @@ func (h *Handler) OnPublish(timestamp uint32, cmd *rtmpmsg.NetStreamPublish) err } h.flvEnc = enc - go startFfmpeg() + go startFfmpeg(configuration) return nil } diff --git a/ipfs.go b/ipfs.go index 36233f6b9..c3f73ce48 100644 --- a/ipfs.go +++ b/ipfs.go @@ -4,11 +4,12 @@ import ( "context" "fmt" "io/ioutil" - "log" "os" "path/filepath" "sync" + log "github.com/sirupsen/logrus" + config "github.com/ipfs/go-ipfs-config" files "github.com/ipfs/go-ipfs-files" icore "github.com/ipfs/interface-go-ipfs-core" @@ -26,12 +27,10 @@ import ( "github.com/ipfs/go-ipfs/repo/fsrepo" ) -var directory = "hls" var directoryHash string var node *core.IpfsNode -//var ctx, _ = context.WithCancel(context.Background()) var ctx = context.Background() func createIPFSDirectory(ipfs *icore.CoreAPI, directoryName string) { @@ -79,9 +78,6 @@ func addFileToDirectory(ipfs *icore.CoreAPI, originalFileHashToModifyPath path.P newDirectoryHash, err := (*ipfs).Object().AddLink(ctx, directoryToAddToPath, filename, originalFileHashToModifyPath) verifyError(err) - - fmt.Printf("New hash: %s\n", newDirectoryHash.String()) - return newDirectoryHash.String() + "/" + filename } @@ -251,7 +247,7 @@ func createIPFSInstance() (*icore.CoreAPI, *core.IpfsNode, error) { } func startIPFSNode(ipfs icore.CoreAPI, node *core.IpfsNode) { //} icore.CoreAPI { - defer fmt.Println("---- IPFS node exited!") + defer log.Println("IPFS node exited") log.Println("IPFS node is running") @@ -265,6 +261,9 @@ func startIPFSNode(ipfs icore.CoreAPI, node *core.IpfsNode) { //} icore.CoreAPI // IPFS Cluster Pinning nodes "/ip4/138.201.67.219/tcp/4001/p2p/QmUd6zHcbkbcs7SMxwLs48qZVX3vpcM8errYS7xEczwRMA", + "/ip4/104.131.131.82/tcp/4001/p2p/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ", // mars.i.ipfs.io + "/ip4/104.131.131.82/udp/4001/quic/p2p/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ", // mars.i.ipfs.io + // You can add more nodes here, for example, another IPFS node you might have running locally, mine was: // "/ip4/127.0.0.1/tcp/4010/p2p/QmZp2fhDLxjYue2RiUvLwT9MWdnbDxam32qYFnGmxZDh5L", // "/ip4/127.0.0.1/udp/4010/quic/p2p/QmZp2fhDLxjYue2RiUvLwT9MWdnbDxam32qYFnGmxZDh5L", diff --git a/main.go b/main.go index c0c1f4312..6b1877682 100644 --- a/main.go +++ b/main.go @@ -4,6 +4,7 @@ import ( "io" "net" "net/http" + "strconv" icore "github.com/ipfs/interface-go-ipfs-core" "github.com/sirupsen/logrus" @@ -12,21 +13,35 @@ import ( ) var ipfs icore.CoreAPI +var configuration = getConfig() func main() { - resetDirectories() + checkConfig(configuration) + // resetDirectories() + + var hlsDirectoryPath = configuration.PublicHLSPath + + log.Println("Starting up. Please wait...") + + if configuration.IPFS.Enabled { + hlsDirectoryPath = configuration.PrivateHLSPath + enableIPFS() + go monitorVideoContent(hlsDirectoryPath, configuration, &ipfs) + } + + go startChatServer() + + startRTMPService() +} + +func enableIPFS() { + log.Println("Enabling IPFS support...") ipfsInstance, node, _ := createIPFSInstance() ipfs = *ipfsInstance createIPFSDirectory(ipfsInstance, "./hls") - // touch("hls/stream.m3u8") - go startIPFSNode(ipfs, node) - go monitorVideoContent("./hls/", ipfsInstance) - go startChatServer() - - startRTMPService() } func startChatServer() { @@ -39,11 +54,14 @@ func startChatServer() { // static files http.Handle("/", http.FileServer(http.Dir("webroot"))) - log.Fatal(http.ListenAndServe(":8080", nil)) + log.Fatal(http.ListenAndServe(":"+strconv.Itoa(configuration.WebServerPort), nil)) } func startRTMPService() { - tcpAddr, err := net.ResolveTCPAddr("tcp", ":1935") + port := 1935 + log.Printf("RTMP server is listening for incoming stream on port %d.\n", port) + + tcpAddr, err := net.ResolveTCPAddr("tcp", ":"+strconv.Itoa(port)) if err != nil { log.Panicf("Failed: %+v", err) } @@ -74,5 +92,4 @@ func startRTMPService() { if err := srv.Serve(listener); err != nil { log.Panicf("Failed: %+v", err) } - } diff --git a/playlistMonitor.go b/playlistMonitor.go index f709da811..37325e9ea 100644 --- a/playlistMonitor.go +++ b/playlistMonitor.go @@ -1,19 +1,22 @@ package main import ( - "fmt" "io/ioutil" - "log" + "path" "path/filepath" "time" + log "github.com/sirupsen/logrus" + icore "github.com/ipfs/interface-go-ipfs-core" "github.com/radovskyb/watcher" ) var filesToUpload = make(map[string]string) -func monitorVideoContent(path string, ipfs *icore.CoreAPI) { +func monitorVideoContent(pathToMonitor string, configuration Config, ipfs *icore.CoreAPI) { + log.Printf("Using %s for IPFS files...\n", pathToMonitor) + w := watcher.New() go func() { @@ -25,28 +28,30 @@ func monitorVideoContent(path string, ipfs *icore.CoreAPI) { } if filepath.Base(event.Path) == "temp.m3u8" { - for filePath, objectID := range filesToUpload { - if objectID != "" { - continue + if configuration.IPFS.Enabled { + for filePath, objectID := range filesToUpload { + if objectID != "" { + continue + } + + newObjectPath := save(path.Join(configuration.PrivateHLSPath, filePath), ipfs) + filesToUpload[filePath] = newObjectPath } - - newObjectPath := save("hls/"+filePath, ipfs) - fmt.Println(filePath, newObjectPath) - - filesToUpload[filePath] = newObjectPath } + playlistBytes, err := ioutil.ReadFile(event.Path) verifyError(err) playlistString := string(playlistBytes) - if false { - playlistString = generateRemotePlaylist(playlistString, filesToUpload) + if configuration.IPFS.Enabled { + playlistString = generateRemotePlaylist(playlistString, configuration.IPFS.Gateway, filesToUpload) } - writePlaylist(playlistString, "webroot/stream.m3u8") + writePlaylist(playlistString, path.Join(configuration.PublicHLSPath, "/stream.m3u8")) } else if filepath.Ext(event.Path) == ".ts" { - filesToUpload[filepath.Base(event.Path)] = "" - // copy(event.Path, "webroot/"+filepath.Base(event.Path)) + if configuration.IPFS.Enabled { + filesToUpload[filepath.Base(event.Path)] = "" + } } case err := <-w.Error: log.Fatalln(err) @@ -57,7 +62,7 @@ func monitorVideoContent(path string, ipfs *icore.CoreAPI) { }() // Watch this folder for changes. - if err := w.Add(path); err != nil { + if err := w.Add(pathToMonitor); err != nil { log.Fatalln(err) } diff --git a/server.go b/server.go index 2cdc0a6b3..398f7c0b6 100644 --- a/server.go +++ b/server.go @@ -76,9 +76,6 @@ func (s *Server) sendAll(msg *Message) { // Listen and serve. // It serves client connection and broadcast request. func (s *Server) Listen() { - - log.Println("Listening server...") - // websocket handler onConnected := func(ws *websocket.Conn) { defer func() { @@ -93,7 +90,6 @@ func (s *Server) Listen() { client.Listen() } http.Handle(s.pattern, websocket.Handler(onConnected)) - log.Println("Created handler") for { select { diff --git a/utils.go b/utils.go index ca6f53ae4..0de198e2c 100644 --- a/utils.go +++ b/utils.go @@ -44,3 +44,12 @@ func copy(src, dst string) { return } } + +func fileExists(name string) bool { + if _, err := os.Stat(name); err != nil { + if os.IsNotExist(err) { + return false + } + } + return true +} diff --git a/webroot/hls/.gitkeep b/webroot/hls/.gitkeep new file mode 100644 index 000000000..e69de29bb diff --git a/webroot/index.html b/webroot/index.html index 11cf2e558..935ef5595 100644 --- a/webroot/index.html +++ b/webroot/index.html @@ -20,7 +20,7 @@