Will now run as a self-contained IPFS node

This commit is contained in:
Gabe Kangas 2020-06-01 12:15:07 -07:00
parent af698063bd
commit bf5d792ac0
6 changed files with 870 additions and 66 deletions

View file

@ -8,7 +8,7 @@ import (
) )
func pipeTest() { func pipeTest() {
ffmpegCmd := "cat streampipe.flv | ffmpeg -hide_banner -i pipe: -preset ultrafast -f hls -hls_list_size 10 -hls_time 10 -strftime 1 -hls_segment_filename 'hls/stream-%Y%m%d-%s.ts' -hls_flags delete_segments -segment_wrap 100 hls/temp.m3u8" ffmpegCmd := "cat streampipe.flv | ffmpeg -hide_banner -i pipe: -preset ultrafast -f hls -hls_list_size 30 -hls_time 10 -strftime 1 -use_localtime 1 -hls_segment_filename 'hls/stream-%Y%m%d-%s.ts' -hls_flags delete_segments -segment_wrap 100 hls/temp.m3u8"
out, err := exec.Command("bash", "-c", ffmpegCmd).Output() out, err := exec.Command("bash", "-c", ffmpegCmd).Output()
verifyError(err) verifyError(err)
@ -30,6 +30,8 @@ func generateRemotePlaylist(playlist string, segments map[string]string) string
func writePlaylist(data string, filePath string) { func writePlaylist(data string, filePath string) {
f, err := os.Create(filePath) f, err := os.Create(filePath)
defer f.Close()
if err != nil { if err != nil {
fmt.Println(err) fmt.Println(err)
return return
@ -37,7 +39,6 @@ func writePlaylist(data string, filePath string) {
_, err = f.WriteString(data) _, err = f.WriteString(data)
if err != nil { if err != nil {
fmt.Println(err) fmt.Println(err)
f.Close()
return return
} }
} }

1
go.mod
View file

@ -5,6 +5,7 @@ go 1.14
require ( require (
github.com/AlekSi/pointer v1.1.0 // indirect github.com/AlekSi/pointer v1.1.0 // indirect
github.com/grafov/m3u8 v0.11.1 // indirect github.com/grafov/m3u8 v0.11.1 // indirect
github.com/ipfs/go-ipfs v0.5.1 // indirect
github.com/ipfs/go-ipfs-api v0.0.3 // indirect github.com/ipfs/go-ipfs-api v0.0.3 // indirect
github.com/ipfs/go-ipfs-http-client v0.0.5 // indirect github.com/ipfs/go-ipfs-http-client v0.0.5 // indirect
github.com/pkg/errors v0.9.1 // indirect github.com/pkg/errors v0.9.1 // indirect

574
go.sum

File diff suppressed because it is too large Load diff

299
ipfs.go
View file

@ -1,73 +1,290 @@
package main package main
import ( import (
"bufio" "context"
"fmt" "fmt"
"io/ioutil"
"log"
"os" "os"
"path/filepath" "path/filepath"
"strings" "sync"
shell "github.com/ipfs/go-ipfs-api" config "github.com/ipfs/go-ipfs-config"
files "github.com/ipfs/go-ipfs-files"
icore "github.com/ipfs/interface-go-ipfs-core"
"github.com/ipfs/interface-go-ipfs-core/options"
"github.com/ipfs/interface-go-ipfs-core/path"
peer "github.com/libp2p/go-libp2p-peer"
peerstore "github.com/libp2p/go-libp2p-peerstore"
ma "github.com/multiformats/go-multiaddr"
"github.com/ipfs/go-ipfs/core"
"github.com/ipfs/go-ipfs/core/coreapi"
"github.com/ipfs/go-ipfs/core/corehttp"
"github.com/ipfs/go-ipfs/core/node/libp2p" // This package is needed so that all the preloaded plugins are loaded automatically
"github.com/ipfs/go-ipfs/plugin/loader"
"github.com/ipfs/go-ipfs/repo/fsrepo"
) )
var directory = "hls" var directory = "hls"
var directoryHash string var directoryHash string
func createIPFSDirectory() { var node *core.IpfsNode
sh := shell.NewShell("localhost:5001")
newlyCreatedDirectoryHash, error := sh.AddDir(directory) //var ctx, _ = context.WithCancel(context.Background())
verifyError((error)) var ctx = context.Background()
directoryHash = newlyCreatedDirectoryHash
func createIPFSDirectory(ipfs *icore.CoreAPI, directoryName string) {
directory, err := getUnixfsNode(directoryName)
verifyError(err)
defer directory.Close()
newlyCreatedDirectoryHash, err := (*ipfs).Unixfs().Add(ctx, directory)
verifyError(err)
directoryHash = newlyCreatedDirectoryHash.String()
fmt.Println("Created directory hash " + directoryHash)
} }
func save(filePath string) string { func save(filePath string, ipfs *icore.CoreAPI) string {
file, err := os.Open(filePath) someFile, err := getUnixfsNode(filePath)
payload := bufio.NewReader(file)
// Where your local node is running on localhost:5001 defer someFile.Close()
sh := shell.NewShell("localhost:5001")
cid, err := sh.Add(payload, shell.Pin(true))
if err != nil { if err != nil {
fmt.Fprintf(os.Stderr, "error: %s", err) panic(fmt.Errorf("Could not get File: %s", err))
os.Exit(1)
} }
filename := filepath.Base(filePath) opts := []options.UnixfsAddOption{
newDirectoryHash := addFileToDirectory(directoryHash, cid, filename) options.Unixfs.Pin(false),
newFilePath := fmt.Sprintf("/ipfs/%s/%s", newDirectoryHash, filename) // options.Unixfs.CidVersion(1),
// fmt.Printf("added %s -> %s\n", filePath, newFilePath) // options.Unixfs.RawLeaves(false),
// options.Unixfs.Nocopy(false),
return newFilePath
} }
func saveData(stringData string, filename string) string { cidFile, err := (*ipfs).Unixfs().Add(ctx, someFile, opts...)
payload := strings.NewReader(stringData)
// Where your local node is running on localhost:5001
sh := shell.NewShell("localhost:5001")
cid, err := sh.Add(payload)
if err != nil { if err != nil {
fmt.Fprintf(os.Stderr, "error: %s", err) panic(fmt.Errorf("Could not add File: %s", err))
os.Exit(1)
} }
newDirectoryHash := addFileToDirectory(directoryHash, cid, filename) // fmt.Printf("Added file to IPFS with CID %s\n", cidFile.String())
newFilePath := fmt.Sprintf("/ipfs/%s/%s", newDirectoryHash, filename)
// fmt.Printf("added %s -> %s\n", filename, newFilePath)
return newFilePath newHash := addFileToDirectory(ipfs, cidFile, directoryHash, filepath.Base(filePath))
return newHash
} }
func addFileToDirectory(directoryHash string, fileHash string, filename string) string { func addFileToDirectory(ipfs *icore.CoreAPI, originalFileHashToModifyPath path.Path, directoryToAddTo string, filename string) string {
// Where your local node is running on localhost:5001 directoryToAddToPath := path.New(directoryToAddTo)
sh := shell.NewShell("localhost:5001") newDirectoryHash, err := (*ipfs).Object().AddLink(ctx, directoryToAddToPath, filename, originalFileHashToModifyPath)
newDirectoryHash, err := sh.Patch("/ipfs/"+directoryHash, "add-link", filename, fileHash) verifyError(err)
fmt.Printf("New hash: %s\n", newDirectoryHash.String())
return newDirectoryHash.String() + "/" + filename
}
func setupPlugins(externalPluginsPath string) error {
// Load any external plugins if available on externalPluginsPath
plugins, err := loader.NewPluginLoader(filepath.Join(externalPluginsPath, "plugins"))
if err != nil { if err != nil {
fmt.Fprintf(os.Stderr, "error: %s", err) return fmt.Errorf("error loading plugins: %s", err)
os.Exit(1) }
// Load preloaded and external plugins
if err := plugins.Initialize(); err != nil {
return fmt.Errorf("error initializing plugins: %s", err)
}
if err := plugins.Inject(); err != nil {
return fmt.Errorf("error initializing plugins: %s", err)
}
return nil
}
// Creates an IPFS node and returns its coreAPI
func createNode(ctx context.Context, repoPath string) (icore.CoreAPI, *core.IpfsNode, error) {
fmt.Println("CreateNode...")
// Open the repo
repo, err := fsrepo.Open(repoPath)
verifyError(err)
if err != nil {
return nil, nil, err
}
// Construct the node
nodeOptions := &core.BuildCfg{
Online: true,
Routing: libp2p.DHTOption, // This option sets the node to be a full DHT node (both fetching and storing DHT Records)
// Routing: libp2p.DHTClientOption, // This option sets the node to be a client DHT node (only fetching records)
Repo: repo,
}
node, err := core.NewNode(ctx, nodeOptions)
node.IsDaemon = true
if err != nil {
return nil, nil, err
}
// Attach the Core API to the constructed node
coreAPI, err := coreapi.NewCoreAPI(node)
if err != nil {
return nil, nil, err
}
return coreAPI, node, nil
}
func createTempRepo(ctx context.Context) (string, error) {
fmt.Println("createTempRepo...")
repoPath, err := ioutil.TempDir("", "ipfs-shell")
fmt.Println(repoPath)
if err != nil {
return "", fmt.Errorf("failed to get temp dir: %s", err)
}
// Create a config with default options and a 2048 bit key
cfg, err := config.Init(ioutil.Discard, 2048)
if err != nil {
return "", err
}
// Create the repo with the config
err = fsrepo.Init(repoPath, cfg)
if err != nil {
return "", fmt.Errorf("failed to init ephemeral node: %s", err)
}
return repoPath, nil
}
// Spawns a node to be used just for this run (i.e. creates a tmp repo)
func spawnEphemeral(ctx context.Context) (icore.CoreAPI, *core.IpfsNode, error) {
if err := setupPlugins(""); err != nil {
return nil, nil, err
}
// Create a Temporary Repo
repoPath, err := createTempRepo(ctx)
if err != nil {
return nil, nil, fmt.Errorf("failed to create temp repo: %s", err)
}
// Spawning an ephemeral IPFS node
coreAPI, node, err := createNode(ctx, repoPath)
return coreAPI, node, err
}
// Spawns a node on the default repo location, if the repo exists
func spawnDefault(ctx context.Context) (icore.CoreAPI, *core.IpfsNode, error) {
defaultPath, err := config.PathRoot()
fmt.Println(defaultPath)
if err != nil {
// shouldn't be possible
return nil, nil, err
}
if err := setupPlugins(defaultPath); err != nil {
return nil, nil, err
}
coreAPI, node, err := createNode(ctx, defaultPath)
return coreAPI, node, err
}
func connectToPeers(ctx context.Context, ipfs icore.CoreAPI, peers []string) error {
var wg sync.WaitGroup
peerInfos := make(map[peer.ID]*peerstore.PeerInfo, len(peers))
for _, addrStr := range peers {
addr, err := ma.NewMultiaddr(addrStr)
if err != nil {
return err
}
pii, err := peerstore.InfoFromP2pAddr(addr)
if err != nil {
return err
}
pi, ok := peerInfos[pii.ID]
if !ok {
pi = &peerstore.PeerInfo{ID: pii.ID}
peerInfos[pi.ID] = pi
}
pi.Addrs = append(pi.Addrs, pii.Addrs...)
}
wg.Add(len(peerInfos))
for _, peerInfo := range peerInfos {
go func(peerInfo *peerstore.PeerInfo) {
defer wg.Done()
err := ipfs.Swarm().Connect(ctx, *peerInfo)
if err != nil {
log.Printf("failed to connect to %s: %s", peerInfo.ID, err)
}
}(peerInfo)
}
wg.Wait()
return nil
}
func getUnixfsNode(path string) (files.Node, error) {
st, err := os.Stat(path)
if err != nil {
return nil, err
}
f, err := files.NewSerialFile(path, false, st)
if err != nil {
return nil, err
}
return f, nil
}
func createIPFSInstance() (*icore.CoreAPI, *core.IpfsNode, error) {
// Spawn a node using a temporary path, creating a temporary repo for the run
api, node, error := spawnEphemeral(ctx)
// api, node, error := spawnDefault(ctx)
return &api, node, error
}
func startIPFSNode(ipfs icore.CoreAPI, node *core.IpfsNode) { //} icore.CoreAPI {
defer fmt.Println("---- IPFS node exited!")
fmt.Println("IPFS node is running")
// bootstrapNodes := []string{
// // IPFS Bootstrapper nodes.
// "/dnsaddr/bootstrap.libp2p.io/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN",
// "/dnsaddr/bootstrap.libp2p.io/p2p/QmQCU2EcMqAqQPR2i9bChDtGNJchTbq5TbXJJ16u19uLTa",
// "/dnsaddr/bootstrap.libp2p.io/p2p/QmbLHAnMoJPWSCR5Zhtx6BHJX9KiKNN6tpvbUcqanj75Nb",
// "/dnsaddr/bootstrap.libp2p.io/p2p/QmcZf59bWwK5XFi76CZX8cbJ4BhTzzA3gU1ZjYZcYW3dwt",
// // IPFS Cluster Pinning nodes
// "/ip4/138.201.67.219/tcp/4001/p2p/QmUd6zHcbkbcs7SMxwLs48qZVX3vpcM8errYS7xEczwRMA",
// // You can add more nodes here, for example, another IPFS node you might have running locally, mine was:
// // "/ip4/127.0.0.1/tcp/4010/p2p/QmZp2fhDLxjYue2RiUvLwT9MWdnbDxam32qYFnGmxZDh5L",
// // "/ip4/127.0.0.1/udp/4010/quic/p2p/QmZp2fhDLxjYue2RiUvLwT9MWdnbDxam32qYFnGmxZDh5L",
// }
// connectToPeers(ctx, ipfs, bootstrapNodes)
addr := "/ip4/127.0.0.1/tcp/5001"
var opts = []corehttp.ServeOption{
corehttp.GatewayOption(true, "/ipfs", "/ipns"),
}
if err := corehttp.ListenAndServe(node, addr, opts...); err != nil {
return
} }
return newDirectoryHash
} }

45
main.go
View file

@ -5,19 +5,43 @@ import (
"net" "net"
"net/http" "net/http"
icore "github.com/ipfs/interface-go-ipfs-core"
log "github.com/sirupsen/logrus" log "github.com/sirupsen/logrus"
"github.com/yutopp/go-rtmp" "github.com/yutopp/go-rtmp"
) )
var ipfs icore.CoreAPI
func main() { func main() {
createIPFSDirectory()
resetDirectories() resetDirectories()
touch("hls/stream.m3u8")
go monitorVideoContent("./hls/") ipfsInstance, node, _ := createIPFSInstance()
ipfs = *ipfsInstance
createIPFSDirectory(ipfsInstance, "./hls")
// touch("hls/stream.m3u8")
go startIPFSNode(ipfs, node)
go monitorVideoContent("./hls/", ipfsInstance)
go startChatServer() go startChatServer()
startRTMPService()
}
func startChatServer() {
// log.SetFlags(log.Lshortfile)
// websocket server
server := NewServer("/entry")
go server.Listen()
// static files
http.Handle("/", http.FileServer(http.Dir("webroot")))
log.Fatal(http.ListenAndServe(":8080", nil))
}
func startRTMPService() {
tcpAddr, err := net.ResolveTCPAddr("tcp", ":1935") tcpAddr, err := net.ResolveTCPAddr("tcp", ":1935")
if err != nil { if err != nil {
log.Panicf("Failed: %+v", err) log.Panicf("Failed: %+v", err)
@ -51,16 +75,3 @@ func main() {
} }
} }
func startChatServer() {
// log.SetFlags(log.Lshortfile)
// websocket server
server := NewServer("/entry")
go server.Listen()
// static files
http.Handle("/", http.FileServer(http.Dir("webroot")))
log.Fatal(http.ListenAndServe(":8080", nil))
}

View file

@ -7,12 +7,13 @@ import (
"path/filepath" "path/filepath"
"time" "time"
icore "github.com/ipfs/interface-go-ipfs-core"
"github.com/radovskyb/watcher" "github.com/radovskyb/watcher"
) )
var filesToUpload = make(map[string]string) var filesToUpload = make(map[string]string)
func monitorVideoContent(path string) { func monitorVideoContent(path string, ipfs *icore.CoreAPI) {
w := watcher.New() w := watcher.New()
go func() { go func() {
@ -23,14 +24,13 @@ func monitorVideoContent(path string) {
continue continue
} }
if filepath.Base(event.Path) == "temp.m3u8" { if filepath.Base(event.Path) == "temp.m3u8" {
// fmt.Printf("Upload playlist + %d files\n", len(filesToUpload))
for filePath, objectID := range filesToUpload { for filePath, objectID := range filesToUpload {
if objectID != "" { if objectID != "" {
continue continue
} }
newObjectPath := save("hls/" + filePath)
newObjectPath := save("hls/"+filePath, ipfs)
fmt.Println(filePath, newObjectPath) fmt.Println(filePath, newObjectPath)
filesToUpload[filePath] = newObjectPath filesToUpload[filePath] = newObjectPath
@ -58,7 +58,7 @@ func monitorVideoContent(path string) {
log.Fatalln(err) log.Fatalln(err)
} }
if err := w.Start(time.Millisecond * 500); err != nil { if err := w.Start(time.Millisecond * 100); err != nil {
log.Fatalln(err) log.Fatalln(err)
} }
} }