Add config file. Turn on/off ipfs

This commit is contained in:
Gabe Kangas 2020-06-01 16:53:31 -07:00
parent 9bad847be4
commit ca622c85c7
13 changed files with 166 additions and 51 deletions

View file

@ -65,7 +65,6 @@ func (c *Client) Listen() {
// Listen write request via chanel
func (c *Client) listenWrite() {
log.Println("Listening write to client")
for {
select {
@ -85,7 +84,6 @@ func (c *Client) listenWrite() {
// Listen read request via chanel
func (c *Client) listenRead() {
log.Println("Listening read from client")
for {
select {

67
config.go Normal file
View file

@ -0,0 +1,67 @@
package main
import (
"fmt"
"io/ioutil"
"log"
"gopkg.in/yaml.v2"
)
// Config struct
type Config struct {
IPFS IPFS `yaml:"ipfs"`
PublicHLSPath string `yaml:"publicHLSPath"`
PrivateHLSPath string `yaml:"privateHLSPath"`
VideoSettings VideoSettings `yaml:"videoSettings"`
Files Files `yaml:"files"`
FFMpegPath string `yaml:"ffmpegPath"`
WebServerPort int `yaml:"webServerPort"`
}
type VideoSettings struct {
ResolutionWidth int `yaml:"resolutionWidth"`
ChunkLengthInSeconds int `yaml:"chunkLengthInSeconds"`
}
// MaxNumberOnDisk must be at least as large as MaxNumberInPlaylist
type Files struct {
MaxNumberInPlaylist int `yaml:"maxNumberInPlaylist"`
MaxNumberOnDisk int `yaml:"maxNumberOnDisk"`
}
type IPFS struct {
Enabled bool `yaml:"enabled"`
Gateway string `yaml:"gateway"`
}
func getConfig() Config {
filePath := "config/config.yaml"
if !fileExists(filePath) {
log.Fatal("ERROR: valid config/config.yaml is required")
}
yamlFile, err := ioutil.ReadFile(filePath)
var config Config
err = yaml.Unmarshal(yamlFile, &config)
if err != nil {
panic(err)
}
return config
}
func checkConfig(config Config) {
if !fileExists(config.PrivateHLSPath) {
panic(fmt.Sprintf("%s does not exist.", config.PrivateHLSPath))
}
if !fileExists(config.PublicHLSPath) {
panic(fmt.Sprintf("%s does not exist.", config.PublicHLSPath))
}
if !fileExists(config.FFMpegPath) {
panic(fmt.Sprintf("ffmpeg does not exist at %s.", config.FFMpegPath))
}
}

16
config/config.yaml Normal file
View file

@ -0,0 +1,16 @@
publicHLSPath: webroot/hls
privateHLSPath: hls
ffmpegPath: /usr/local/bin/ffmpeg
webServerPort: 8080
videoSettings:
resolutionWidth: 900
chunkLengthInSeconds: 4
files:
maxNumberInPlaylist: 30
maxNumberOnDisk: 60
ipfs:
enabled: true
gateway: https://ipfs.io

View file

@ -5,19 +5,26 @@ import (
"log"
"os"
"os/exec"
"path"
"strconv"
"strings"
)
func startFfmpeg() {
outputDir := "webroot"
chunkLength := "4"
func startFfmpeg(configuration Config) {
var outputDir = configuration.PublicHLSPath
var hlsPlaylistName = path.Join(configuration.PublicHLSPath, "stream.m3u8")
log.Printf("Starting transcoder with segments saving to %s.", outputDir)
if configuration.IPFS.Enabled {
outputDir = configuration.PrivateHLSPath
hlsPlaylistName = path.Join(outputDir, "temp.m3u8")
}
// ffmpegCmd := "cat streampipe.flv | ffmpeg -hide_banner -i pipe: -vf scale=900:-2 -g 48 -keyint_min 48 -preset ultrafast -f hls -hls_list_size 30 -hls_time 10 -strftime 1 -use_localtime 1 -hls_segment_filename 'hls/stream-%Y%m%d-%s.ts' -hls_flags delete_segments -segment_wrap 100 hls/temp.m3u8"
log.Printf("Starting transcoder saving to /%s.", outputDir)
ffmpegCmd := "cat streampipe.flv | ffmpeg -hide_banner -i pipe: -vf scale=900:-2 -g 48 -keyint_min 48 -preset ultrafast -f hls -hls_list_size 30 -hls_time " + chunkLength + " -strftime 1 -use_localtime 1 -hls_segment_filename '" + outputDir + "/stream-%Y%m%d-%s.ts' -hls_flags delete_segments -segment_wrap 100 hls/temp.m3u8"
fmt.Println(ffmpegCmd)
ffmpegCmd := "cat streampipe.flv | " + configuration.FFMpegPath +
" -hide_banner -i pipe: -vf scale=" + strconv.Itoa(configuration.VideoSettings.ResolutionWidth) + ":-2 -g 48 -keyint_min 48 -preset ultrafast -f hls -hls_list_size 30 -hls_time " +
strconv.Itoa(configuration.VideoSettings.ChunkLengthInSeconds) + " -strftime 1 -use_localtime 1 -hls_segment_filename '" +
outputDir + "/stream-%Y%m%d-%s.ts' -hls_flags delete_segments -segment_wrap 100 " + hlsPlaylistName
_, err := exec.Command("bash", "-c", ffmpegCmd).Output()
verifyError(err)
@ -29,9 +36,9 @@ func verifyError(e error) {
}
}
func generateRemotePlaylist(playlist string, segments map[string]string) string {
func generateRemotePlaylist(playlist string, gateway string, segments map[string]string) string {
for local, remote := range segments {
playlist = strings.ReplaceAll(playlist, local, "https://gateway.temporal.cloud"+remote)
playlist = strings.ReplaceAll(playlist, local, gateway+remote)
}
return playlist
}

1
go.sum
View file

@ -1179,6 +1179,7 @@ gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWD
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.5 h1:ymVxjfMaHvXD8RqPRmzHHsB3VvucivSkIAvJFDI5O3c=
gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
grpc.go4.org v0.0.0-20170609214715-11d0a25b4919/go.mod h1:77eQGdRu53HpSqPFJFmuJdjuHRquDANNeA4x7B8WQ9o=
honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=

View file

@ -64,7 +64,7 @@ func (h *Handler) OnPublish(timestamp uint32, cmd *rtmpmsg.NetStreamPublish) err
}
h.flvEnc = enc
go startFfmpeg()
go startFfmpeg(configuration)
return nil
}

13
ipfs.go
View file

@ -4,11 +4,12 @@ import (
"context"
"fmt"
"io/ioutil"
"log"
"os"
"path/filepath"
"sync"
log "github.com/sirupsen/logrus"
config "github.com/ipfs/go-ipfs-config"
files "github.com/ipfs/go-ipfs-files"
icore "github.com/ipfs/interface-go-ipfs-core"
@ -26,12 +27,10 @@ import (
"github.com/ipfs/go-ipfs/repo/fsrepo"
)
var directory = "hls"
var directoryHash string
var node *core.IpfsNode
//var ctx, _ = context.WithCancel(context.Background())
var ctx = context.Background()
func createIPFSDirectory(ipfs *icore.CoreAPI, directoryName string) {
@ -79,9 +78,6 @@ func addFileToDirectory(ipfs *icore.CoreAPI, originalFileHashToModifyPath path.P
newDirectoryHash, err := (*ipfs).Object().AddLink(ctx, directoryToAddToPath, filename, originalFileHashToModifyPath)
verifyError(err)
fmt.Printf("New hash: %s\n", newDirectoryHash.String())
return newDirectoryHash.String() + "/" + filename
}
@ -251,7 +247,7 @@ func createIPFSInstance() (*icore.CoreAPI, *core.IpfsNode, error) {
}
func startIPFSNode(ipfs icore.CoreAPI, node *core.IpfsNode) { //} icore.CoreAPI {
defer fmt.Println("---- IPFS node exited!")
defer log.Println("IPFS node exited")
log.Println("IPFS node is running")
@ -265,6 +261,9 @@ func startIPFSNode(ipfs icore.CoreAPI, node *core.IpfsNode) { //} icore.CoreAPI
// IPFS Cluster Pinning nodes
"/ip4/138.201.67.219/tcp/4001/p2p/QmUd6zHcbkbcs7SMxwLs48qZVX3vpcM8errYS7xEczwRMA",
"/ip4/104.131.131.82/tcp/4001/p2p/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ", // mars.i.ipfs.io
"/ip4/104.131.131.82/udp/4001/quic/p2p/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ", // mars.i.ipfs.io
// You can add more nodes here, for example, another IPFS node you might have running locally, mine was:
// "/ip4/127.0.0.1/tcp/4010/p2p/QmZp2fhDLxjYue2RiUvLwT9MWdnbDxam32qYFnGmxZDh5L",
// "/ip4/127.0.0.1/udp/4010/quic/p2p/QmZp2fhDLxjYue2RiUvLwT9MWdnbDxam32qYFnGmxZDh5L",

37
main.go
View file

@ -4,6 +4,7 @@ import (
"io"
"net"
"net/http"
"strconv"
icore "github.com/ipfs/interface-go-ipfs-core"
"github.com/sirupsen/logrus"
@ -12,21 +13,35 @@ import (
)
var ipfs icore.CoreAPI
var configuration = getConfig()
func main() {
resetDirectories()
checkConfig(configuration)
// resetDirectories()
var hlsDirectoryPath = configuration.PublicHLSPath
log.Println("Starting up. Please wait...")
if configuration.IPFS.Enabled {
hlsDirectoryPath = configuration.PrivateHLSPath
enableIPFS()
go monitorVideoContent(hlsDirectoryPath, configuration, &ipfs)
}
go startChatServer()
startRTMPService()
}
func enableIPFS() {
log.Println("Enabling IPFS support...")
ipfsInstance, node, _ := createIPFSInstance()
ipfs = *ipfsInstance
createIPFSDirectory(ipfsInstance, "./hls")
// touch("hls/stream.m3u8")
go startIPFSNode(ipfs, node)
go monitorVideoContent("./hls/", ipfsInstance)
go startChatServer()
startRTMPService()
}
func startChatServer() {
@ -39,11 +54,14 @@ func startChatServer() {
// static files
http.Handle("/", http.FileServer(http.Dir("webroot")))
log.Fatal(http.ListenAndServe(":8080", nil))
log.Fatal(http.ListenAndServe(":"+strconv.Itoa(configuration.WebServerPort), nil))
}
func startRTMPService() {
tcpAddr, err := net.ResolveTCPAddr("tcp", ":1935")
port := 1935
log.Printf("RTMP server is listening for incoming stream on port %d.\n", port)
tcpAddr, err := net.ResolveTCPAddr("tcp", ":"+strconv.Itoa(port))
if err != nil {
log.Panicf("Failed: %+v", err)
}
@ -74,5 +92,4 @@ func startRTMPService() {
if err := srv.Serve(listener); err != nil {
log.Panicf("Failed: %+v", err)
}
}

View file

@ -1,19 +1,22 @@
package main
import (
"fmt"
"io/ioutil"
"log"
"path"
"path/filepath"
"time"
log "github.com/sirupsen/logrus"
icore "github.com/ipfs/interface-go-ipfs-core"
"github.com/radovskyb/watcher"
)
var filesToUpload = make(map[string]string)
func monitorVideoContent(path string, ipfs *icore.CoreAPI) {
func monitorVideoContent(pathToMonitor string, configuration Config, ipfs *icore.CoreAPI) {
log.Printf("Using %s for IPFS files...\n", pathToMonitor)
w := watcher.New()
go func() {
@ -25,28 +28,30 @@ func monitorVideoContent(path string, ipfs *icore.CoreAPI) {
}
if filepath.Base(event.Path) == "temp.m3u8" {
if configuration.IPFS.Enabled {
for filePath, objectID := range filesToUpload {
if objectID != "" {
continue
}
newObjectPath := save("hls/"+filePath, ipfs)
fmt.Println(filePath, newObjectPath)
newObjectPath := save(path.Join(configuration.PrivateHLSPath, filePath), ipfs)
filesToUpload[filePath] = newObjectPath
}
}
playlistBytes, err := ioutil.ReadFile(event.Path)
verifyError(err)
playlistString := string(playlistBytes)
if false {
playlistString = generateRemotePlaylist(playlistString, filesToUpload)
if configuration.IPFS.Enabled {
playlistString = generateRemotePlaylist(playlistString, configuration.IPFS.Gateway, filesToUpload)
}
writePlaylist(playlistString, "webroot/stream.m3u8")
writePlaylist(playlistString, path.Join(configuration.PublicHLSPath, "/stream.m3u8"))
} else if filepath.Ext(event.Path) == ".ts" {
if configuration.IPFS.Enabled {
filesToUpload[filepath.Base(event.Path)] = ""
// copy(event.Path, "webroot/"+filepath.Base(event.Path))
}
}
case err := <-w.Error:
log.Fatalln(err)
@ -57,7 +62,7 @@ func monitorVideoContent(path string, ipfs *icore.CoreAPI) {
}()
// Watch this folder for changes.
if err := w.Add(path); err != nil {
if err := w.Add(pathToMonitor); err != nil {
log.Fatalln(err)
}

View file

@ -76,9 +76,6 @@ func (s *Server) sendAll(msg *Message) {
// Listen and serve.
// It serves client connection and broadcast request.
func (s *Server) Listen() {
log.Println("Listening server...")
// websocket handler
onConnected := func(ws *websocket.Conn) {
defer func() {
@ -93,7 +90,6 @@ func (s *Server) Listen() {
client.Listen()
}
http.Handle(s.pattern, websocket.Handler(onConnected))
log.Println("Created handler")
for {
select {

View file

@ -44,3 +44,12 @@ func copy(src, dst string) {
return
}
}
func fileExists(name string) bool {
if _, err := os.Stat(name); err != nil {
if os.IsNotExist(err) {
return false
}
}
return true
}

0
webroot/hls/.gitkeep Normal file
View file

View file

@ -20,7 +20,7 @@
<video id="video" controls></video>
<script>
var video = document.getElementById('video');
var videoSrc = 'stream.m3u8';
var videoSrc = 'hls/stream.m3u8';
if (Hls.isSupported()) {
var hls = new Hls();
hls.loadSource(videoSrc);