diff --git a/ffmpeg.go b/ffmpeg.go index eca514e36..4cbf2ce0c 100644 --- a/ffmpeg.go +++ b/ffmpeg.go @@ -2,17 +2,25 @@ package main import ( "fmt" + "log" "os" "os/exec" "strings" ) -func pipeTest() { - ffmpegCmd := "cat streampipe.flv | ffmpeg -hide_banner -i pipe: -preset ultrafast -f hls -hls_list_size 30 -hls_time 10 -strftime 1 -use_localtime 1 -hls_segment_filename 'hls/stream-%Y%m%d-%s.ts' -hls_flags delete_segments -segment_wrap 100 hls/temp.m3u8" +func startFfmpeg() { + outputDir := "webroot" + chunkLength := "4" - out, err := exec.Command("bash", "-c", ffmpegCmd).Output() + log.Printf("Starting transcoder with segments saving to %s.", outputDir) + + // ffmpegCmd := "cat streampipe.flv | ffmpeg -hide_banner -i pipe: -vf scale=900:-2 -g 48 -keyint_min 48 -preset ultrafast -f hls -hls_list_size 30 -hls_time 10 -strftime 1 -use_localtime 1 -hls_segment_filename 'hls/stream-%Y%m%d-%s.ts' -hls_flags delete_segments -segment_wrap 100 hls/temp.m3u8" + + ffmpegCmd := "cat streampipe.flv | ffmpeg -hide_banner -i pipe: -vf scale=900:-2 -g 48 -keyint_min 48 -preset ultrafast -f hls -hls_list_size 30 -hls_time " + chunkLength + " -strftime 1 -use_localtime 1 -hls_segment_filename '" + outputDir + "/stream-%Y%m%d-%s.ts' -hls_flags delete_segments -segment_wrap 100 hls/temp.m3u8" + fmt.Println(ffmpegCmd) + + _, err := exec.Command("bash", "-c", ffmpegCmd).Output() verifyError(err) - fmt.Println(string(out)) } func verifyError(e error) { diff --git a/handler.go b/handler.go index f53b7c0de..d666d1c53 100644 --- a/handler.go +++ b/handler.go @@ -29,17 +29,17 @@ func (h *Handler) OnServe(conn *rtmp.Conn) { } func (h *Handler) OnConnect(timestamp uint32, cmd *rtmpmsg.NetConnectionConnect) error { - log.Printf("OnConnect: %#v", cmd) + // log.Printf("OnConnect: %#v", cmd) return nil } func (h *Handler) OnCreateStream(timestamp uint32, cmd *rtmpmsg.NetConnectionCreateStream) error { - log.Printf("OnCreateStream: %#v", cmd) + // log.Printf("OnCreateStream: %#v", cmd) return nil } func (h *Handler) OnPublish(timestamp uint32, cmd *rtmpmsg.NetStreamPublish) error { - log.Printf("OnPublish: %#v", cmd) + // log.Printf("OnPublish: %#v", cmd) // (example) Reject a connection when PublishingName is empty if cmd.PublishingName == "" { @@ -50,7 +50,6 @@ func (h *Handler) OnPublish(timestamp uint32, cmd *rtmpmsg.NetStreamPublish) err p := filepath.Join( filepath.Clean(filepath.Join("./", fmt.Sprintf("%s.flv", "streampipe"))), ) - fmt.Println(p) syscall.Mkfifo(p, 0666) f, err := os.OpenFile(p, os.O_RDWR, os.ModeNamedPipe) if err != nil { @@ -65,13 +64,11 @@ func (h *Handler) OnPublish(timestamp uint32, cmd *rtmpmsg.NetStreamPublish) err } h.flvEnc = enc - go pipeTest() + go startFfmpeg() return nil } -var counter = 0 - func (h *Handler) OnSetDataFrame(timestamp uint32, data *rtmpmsg.NetStreamSetDataFrame) error { r := bytes.NewReader(data.Payload) @@ -81,7 +78,7 @@ func (h *Handler) OnSetDataFrame(timestamp uint32, data *rtmpmsg.NetStreamSetDat return nil // ignore } - log.Printf("SetDataFrame: Script = %#v", script) + // log.Printf("SetDataFrame: Script = %#v", script) if err := h.flvEnc.Encode(&flvtag.FlvTag{ TagType: flvtag.TagTypeScriptData, @@ -91,9 +88,6 @@ func (h *Handler) OnSetDataFrame(timestamp uint32, data *rtmpmsg.NetStreamSetDat log.Printf("Failed to write script data: Err = %+v", err) } - counter++ - fmt.Println("-------------> " + string(counter)) - return nil } diff --git a/ipfs.go b/ipfs.go index 03b5da4e3..36233f6b9 100644 --- a/ipfs.go +++ b/ipfs.go @@ -43,7 +43,6 @@ func createIPFSDirectory(ipfs *icore.CoreAPI, directoryName string) { verifyError(err) directoryHash = newlyCreatedDirectoryHash.String() - fmt.Println("Created directory hash " + directoryHash) } func save(filePath string, ipfs *icore.CoreAPI) string { @@ -107,8 +106,6 @@ func setupPlugins(externalPluginsPath string) error { // Creates an IPFS node and returns its coreAPI func createNode(ctx context.Context, repoPath string) (icore.CoreAPI, *core.IpfsNode, error) { - fmt.Println("CreateNode...") - // Open the repo repo, err := fsrepo.Open(repoPath) verifyError(err) @@ -142,10 +139,7 @@ func createNode(ctx context.Context, repoPath string) (icore.CoreAPI, *core.Ipfs } func createTempRepo(ctx context.Context) (string, error) { - fmt.Println("createTempRepo...") - repoPath, err := ioutil.TempDir("", "ipfs-shell") - fmt.Println(repoPath) if err != nil { return "", fmt.Errorf("failed to get temp dir: %s", err) } @@ -259,7 +253,7 @@ func createIPFSInstance() (*icore.CoreAPI, *core.IpfsNode, error) { func startIPFSNode(ipfs icore.CoreAPI, node *core.IpfsNode) { //} icore.CoreAPI { defer fmt.Println("---- IPFS node exited!") - fmt.Println("IPFS node is running") + log.Println("IPFS node is running") bootstrapNodes := []string{ // IPFS Bootstrapper nodes. diff --git a/main.go b/main.go index 41e58faae..c0c1f4312 100644 --- a/main.go +++ b/main.go @@ -6,6 +6,7 @@ import ( "net/http" icore "github.com/ipfs/interface-go-ipfs-core" + "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus" "github.com/yutopp/go-rtmp" ) @@ -55,7 +56,7 @@ func startRTMPService() { srv := rtmp.NewServer(&rtmp.ServerConfig{ OnConnect: func(conn net.Conn) (io.ReadWriteCloser, *rtmp.ConnConfig) { l := log.StandardLogger() - //l.SetLevel(logrus.DebugLevel) + l.SetLevel(logrus.WarnLevel) h := &Handler{} diff --git a/playlistMonitor.go b/playlistMonitor.go index a601ca490..f709da811 100644 --- a/playlistMonitor.go +++ b/playlistMonitor.go @@ -35,12 +35,15 @@ func monitorVideoContent(path string, ipfs *icore.CoreAPI) { filesToUpload[filePath] = newObjectPath } - playlistBytes, err := ioutil.ReadFile(event.Path) verifyError(err) playlistString := string(playlistBytes) - remotePlaylistString := generateRemotePlaylist(playlistString, filesToUpload) - writePlaylist(remotePlaylistString, "webroot/stream.m3u8") + + if false { + playlistString = generateRemotePlaylist(playlistString, filesToUpload) + } + writePlaylist(playlistString, "webroot/stream.m3u8") + } else if filepath.Ext(event.Path) == ".ts" { filesToUpload[filepath.Base(event.Path)] = "" // copy(event.Path, "webroot/"+filepath.Base(event.Path))