mirror of
https://github.com/owncast/owncast.git
synced 2024-11-21 20:28:15 +03:00
Refactor ipfs storage behind a standard interface
This commit is contained in:
parent
a1e9271d3b
commit
16047d884d
6 changed files with 356 additions and 74 deletions
7
chunkStorage.go
Normal file
7
chunkStorage.go
Normal file
|
@ -0,0 +1,7 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
type ChunkStorage interface {
|
||||||
|
Setup(config Config)
|
||||||
|
Save(filePath string) string
|
||||||
|
GenerateRemotePlaylist(playlist string, segments map[string]string) string
|
||||||
|
}
|
|
@ -7,7 +7,6 @@ import (
|
||||||
"os/exec"
|
"os/exec"
|
||||||
"path"
|
"path"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func startFfmpeg(configuration Config) {
|
func startFfmpeg(configuration Config) {
|
||||||
|
@ -37,13 +36,6 @@ func verifyError(e error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func generateRemotePlaylist(playlist string, gateway string, segments map[string]string) string {
|
|
||||||
for local, remote := range segments {
|
|
||||||
playlist = strings.ReplaceAll(playlist, local, gateway+remote)
|
|
||||||
}
|
|
||||||
return playlist
|
|
||||||
}
|
|
||||||
|
|
||||||
func writePlaylist(data string, filePath string) {
|
func writePlaylist(data string, filePath string) {
|
||||||
f, err := os.Create(filePath)
|
f, err := os.Create(filePath)
|
||||||
defer f.Close()
|
defer f.Close()
|
||||||
|
|
100
ipfs.go
100
ipfs.go
|
@ -6,6 +6,7 @@ import (
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
|
@ -22,31 +23,34 @@ import (
|
||||||
"github.com/ipfs/go-ipfs/core"
|
"github.com/ipfs/go-ipfs/core"
|
||||||
"github.com/ipfs/go-ipfs/core/coreapi"
|
"github.com/ipfs/go-ipfs/core/coreapi"
|
||||||
"github.com/ipfs/go-ipfs/core/corehttp"
|
"github.com/ipfs/go-ipfs/core/corehttp"
|
||||||
"github.com/ipfs/go-ipfs/core/node/libp2p" // This package is needed so that all the preloaded plugins are loaded automatically
|
"github.com/ipfs/go-ipfs/core/node/libp2p"
|
||||||
"github.com/ipfs/go-ipfs/plugin/loader"
|
"github.com/ipfs/go-ipfs/plugin/loader"
|
||||||
"github.com/ipfs/go-ipfs/repo/fsrepo"
|
"github.com/ipfs/go-ipfs/repo/fsrepo"
|
||||||
)
|
)
|
||||||
|
|
||||||
var directoryHash string
|
type IPFSStorage struct {
|
||||||
|
ipfs *icore.CoreAPI
|
||||||
|
node *core.IpfsNode
|
||||||
|
|
||||||
var node *core.IpfsNode
|
ctx context.Context
|
||||||
|
directoryHash string
|
||||||
var ctx = context.Background()
|
gateway string
|
||||||
|
|
||||||
func createIPFSDirectory(ipfs *icore.CoreAPI, directoryName string) {
|
|
||||||
directory, err := getUnixfsNode(directoryName)
|
|
||||||
verifyError(err)
|
|
||||||
defer directory.Close()
|
|
||||||
|
|
||||||
newlyCreatedDirectoryHash, err := (*ipfs).Unixfs().Add(ctx, directory)
|
|
||||||
verifyError(err)
|
|
||||||
|
|
||||||
directoryHash = newlyCreatedDirectoryHash.String()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func save(filePath string, ipfs *icore.CoreAPI) string {
|
func (s *IPFSStorage) Setup(config Config) {
|
||||||
someFile, err := getUnixfsNode(filePath)
|
s.gateway = config.IPFS.Gateway
|
||||||
|
|
||||||
|
s.ctx = context.Background()
|
||||||
|
|
||||||
|
ipfsInstance, node, _ := s.createIPFSInstance()
|
||||||
|
s.ipfs = ipfsInstance
|
||||||
|
s.node = node
|
||||||
|
|
||||||
|
s.createIPFSDirectory("./hls")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *IPFSStorage) Save(filePath string) string {
|
||||||
|
someFile, err := getUnixfsNode(filePath)
|
||||||
defer someFile.Close()
|
defer someFile.Close()
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -60,7 +64,7 @@ func save(filePath string, ipfs *icore.CoreAPI) string {
|
||||||
// options.Unixfs.Nocopy(false),
|
// options.Unixfs.Nocopy(false),
|
||||||
}
|
}
|
||||||
|
|
||||||
cidFile, err := (*ipfs).Unixfs().Add(ctx, someFile, opts...)
|
cidFile, err := (*s.ipfs).Unixfs().Add(s.ctx, someFile, opts...)
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
panic(fmt.Errorf("Could not add File: %s", err))
|
panic(fmt.Errorf("Could not add File: %s", err))
|
||||||
|
@ -68,17 +72,16 @@ func save(filePath string, ipfs *icore.CoreAPI) string {
|
||||||
|
|
||||||
// fmt.Printf("Added file to IPFS with CID %s\n", cidFile.String())
|
// fmt.Printf("Added file to IPFS with CID %s\n", cidFile.String())
|
||||||
|
|
||||||
newHash := addFileToDirectory(ipfs, cidFile, directoryHash, filepath.Base(filePath))
|
newHash := s.addFileToDirectory(cidFile, filepath.Base(filePath))
|
||||||
|
|
||||||
return newHash
|
return newHash
|
||||||
}
|
}
|
||||||
|
|
||||||
func addFileToDirectory(ipfs *icore.CoreAPI, originalFileHashToModifyPath path.Path, directoryToAddTo string, filename string) string {
|
func (s *IPFSStorage) GenerateRemotePlaylist(playlist string, segments map[string]string) string {
|
||||||
directoryToAddToPath := path.New(directoryToAddTo)
|
for local, remote := range segments {
|
||||||
newDirectoryHash, err := (*ipfs).Object().AddLink(ctx, directoryToAddToPath, filename, originalFileHashToModifyPath)
|
playlist = strings.ReplaceAll(playlist, local, s.gateway+remote)
|
||||||
|
}
|
||||||
verifyError(err)
|
return playlist
|
||||||
return newDirectoryHash.String() + "/" + filename
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func setupPlugins(externalPluginsPath string) error {
|
func setupPlugins(externalPluginsPath string) error {
|
||||||
|
@ -173,23 +176,6 @@ func spawnEphemeral(ctx context.Context) (icore.CoreAPI, *core.IpfsNode, error)
|
||||||
return coreAPI, node, err
|
return coreAPI, node, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Spawns a node on the default repo location, if the repo exists
|
|
||||||
func spawnDefault(ctx context.Context) (icore.CoreAPI, *core.IpfsNode, error) {
|
|
||||||
defaultPath, err := config.PathRoot()
|
|
||||||
fmt.Println(defaultPath)
|
|
||||||
if err != nil {
|
|
||||||
// shouldn't be possible
|
|
||||||
return nil, nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := setupPlugins(defaultPath); err != nil {
|
|
||||||
return nil, nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
coreAPI, node, err := createNode(ctx, defaultPath)
|
|
||||||
return coreAPI, node, err
|
|
||||||
}
|
|
||||||
|
|
||||||
func connectToPeers(ctx context.Context, ipfs icore.CoreAPI, peers []string) error {
|
func connectToPeers(ctx context.Context, ipfs icore.CoreAPI, peers []string) error {
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
peerInfos := make(map[peer.ID]*peerstore.PeerInfo, len(peers))
|
peerInfos := make(map[peer.ID]*peerstore.PeerInfo, len(peers))
|
||||||
|
@ -239,14 +225,23 @@ func getUnixfsNode(path string) (files.Node, error) {
|
||||||
return f, nil
|
return f, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func createIPFSInstance() (*icore.CoreAPI, *core.IpfsNode, error) {
|
func (s *IPFSStorage) addFileToDirectory(originalFileHashToModifyPath path.Path, filename string) string {
|
||||||
|
// fmt.Println("directoryToAddTo: "+s.directoryHash, "filename: "+filename, "originalFileHashToModifyPath: "+originalFileHashToModifyPath.String())
|
||||||
|
directoryToAddToPath := path.New(s.directoryHash)
|
||||||
|
newDirectoryHash, err := (*s.ipfs).Object().AddLink(s.ctx, directoryToAddToPath, filename, originalFileHashToModifyPath)
|
||||||
|
|
||||||
|
verifyError(err)
|
||||||
|
return newDirectoryHash.String() + "/" + filename
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *IPFSStorage) createIPFSInstance() (*icore.CoreAPI, *core.IpfsNode, error) {
|
||||||
// Spawn a node using a temporary path, creating a temporary repo for the run
|
// Spawn a node using a temporary path, creating a temporary repo for the run
|
||||||
api, node, error := spawnEphemeral(ctx)
|
api, node, error := spawnEphemeral(s.ctx)
|
||||||
// api, node, error := spawnDefault(ctx)
|
// api, node, error := spawnDefault(ctx)
|
||||||
return &api, node, error
|
return &api, node, error
|
||||||
}
|
}
|
||||||
|
|
||||||
func startIPFSNode(ipfs icore.CoreAPI, node *core.IpfsNode) { //} icore.CoreAPI {
|
func (s *IPFSStorage) startIPFSNode() { //} icore.CoreAPI {
|
||||||
defer log.Println("IPFS node exited")
|
defer log.Println("IPFS node exited")
|
||||||
|
|
||||||
log.Println("IPFS node is running")
|
log.Println("IPFS node is running")
|
||||||
|
@ -269,15 +264,24 @@ func startIPFSNode(ipfs icore.CoreAPI, node *core.IpfsNode) { //} icore.CoreAPI
|
||||||
// "/ip4/127.0.0.1/udp/4010/quic/p2p/QmZp2fhDLxjYue2RiUvLwT9MWdnbDxam32qYFnGmxZDh5L",
|
// "/ip4/127.0.0.1/udp/4010/quic/p2p/QmZp2fhDLxjYue2RiUvLwT9MWdnbDxam32qYFnGmxZDh5L",
|
||||||
}
|
}
|
||||||
|
|
||||||
go connectToPeers(ctx, ipfs, bootstrapNodes)
|
go connectToPeers(s.ctx, *s.ipfs, bootstrapNodes)
|
||||||
|
|
||||||
addr := "/ip4/127.0.0.1/tcp/5001"
|
addr := "/ip4/127.0.0.1/tcp/5001"
|
||||||
var opts = []corehttp.ServeOption{
|
var opts = []corehttp.ServeOption{
|
||||||
corehttp.GatewayOption(true, "/ipfs", "/ipns"),
|
corehttp.GatewayOption(true, "/ipfs", "/ipns"),
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := corehttp.ListenAndServe(node, addr, opts...); err != nil {
|
if err := corehttp.ListenAndServe(s.node, addr, opts...); err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *IPFSStorage) createIPFSDirectory(directoryName string) {
|
||||||
|
directory, err := getUnixfsNode(directoryName)
|
||||||
|
verifyError(err)
|
||||||
|
defer directory.Close()
|
||||||
|
|
||||||
|
newlyCreatedDirectoryHash, err := (*s.ipfs).Unixfs().Add(s.ctx, directory)
|
||||||
|
verifyError(err)
|
||||||
|
s.directoryHash = newlyCreatedDirectoryHash.String()
|
||||||
}
|
}
|
||||||
|
|
289
ipfsStorage.go
Normal file
289
ipfsStorage.go
Normal file
|
@ -0,0 +1,289 @@
|
||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"io/ioutil"
|
||||||
|
"os"
|
||||||
|
"path/filepath"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
log "github.com/sirupsen/logrus"
|
||||||
|
|
||||||
|
config "github.com/ipfs/go-ipfs-config"
|
||||||
|
files "github.com/ipfs/go-ipfs-files"
|
||||||
|
icore "github.com/ipfs/interface-go-ipfs-core"
|
||||||
|
"github.com/ipfs/interface-go-ipfs-core/options"
|
||||||
|
"github.com/ipfs/interface-go-ipfs-core/path"
|
||||||
|
peer "github.com/libp2p/go-libp2p-peer"
|
||||||
|
peerstore "github.com/libp2p/go-libp2p-peerstore"
|
||||||
|
ma "github.com/multiformats/go-multiaddr"
|
||||||
|
|
||||||
|
"github.com/ipfs/go-ipfs/core"
|
||||||
|
"github.com/ipfs/go-ipfs/core/coreapi"
|
||||||
|
"github.com/ipfs/go-ipfs/core/corehttp"
|
||||||
|
"github.com/ipfs/go-ipfs/core/node/libp2p"
|
||||||
|
"github.com/ipfs/go-ipfs/plugin/loader"
|
||||||
|
"github.com/ipfs/go-ipfs/repo/fsrepo"
|
||||||
|
)
|
||||||
|
|
||||||
|
type IPFSStorage struct {
|
||||||
|
ipfs *icore.CoreAPI
|
||||||
|
node *core.IpfsNode
|
||||||
|
|
||||||
|
ctx context.Context
|
||||||
|
directoryHash string
|
||||||
|
gateway string
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *IPFSStorage) Setup(config Config) {
|
||||||
|
log.Println("Setting up IPFS for external storage of video...")
|
||||||
|
|
||||||
|
s.gateway = config.IPFS.Gateway
|
||||||
|
|
||||||
|
s.ctx = context.Background()
|
||||||
|
|
||||||
|
ipfsInstance, node, _ := s.createIPFSInstance()
|
||||||
|
s.ipfs = ipfsInstance
|
||||||
|
s.node = node
|
||||||
|
|
||||||
|
s.createIPFSDirectory("./hls")
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *IPFSStorage) Save(filePath string) string {
|
||||||
|
someFile, err := getUnixfsNode(filePath)
|
||||||
|
defer someFile.Close()
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
panic(fmt.Errorf("Could not get File: %s", err))
|
||||||
|
}
|
||||||
|
|
||||||
|
opts := []options.UnixfsAddOption{
|
||||||
|
options.Unixfs.Pin(false),
|
||||||
|
// options.Unixfs.CidVersion(1),
|
||||||
|
// options.Unixfs.RawLeaves(false),
|
||||||
|
// options.Unixfs.Nocopy(false),
|
||||||
|
}
|
||||||
|
|
||||||
|
cidFile, err := (*s.ipfs).Unixfs().Add(s.ctx, someFile, opts...)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
panic(fmt.Errorf("Could not add File: %s", err))
|
||||||
|
}
|
||||||
|
|
||||||
|
// fmt.Printf("Added file to IPFS with CID %s\n", cidFile.String())
|
||||||
|
|
||||||
|
newHash := s.addFileToDirectory(cidFile, filepath.Base(filePath))
|
||||||
|
|
||||||
|
return newHash
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *IPFSStorage) GenerateRemotePlaylist(playlist string, segments map[string]string) string {
|
||||||
|
for local, remote := range segments {
|
||||||
|
playlist = strings.ReplaceAll(playlist, local, s.gateway+remote)
|
||||||
|
}
|
||||||
|
return playlist
|
||||||
|
}
|
||||||
|
|
||||||
|
func setupPlugins(externalPluginsPath string) error {
|
||||||
|
// Load any external plugins if available on externalPluginsPath
|
||||||
|
plugins, err := loader.NewPluginLoader(filepath.Join(externalPluginsPath, "plugins"))
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("error loading plugins: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Load preloaded and external plugins
|
||||||
|
if err := plugins.Initialize(); err != nil {
|
||||||
|
return fmt.Errorf("error initializing plugins: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := plugins.Inject(); err != nil {
|
||||||
|
return fmt.Errorf("error initializing plugins: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Creates an IPFS node and returns its coreAPI
|
||||||
|
func createNode(ctx context.Context, repoPath string) (icore.CoreAPI, *core.IpfsNode, error) {
|
||||||
|
// Open the repo
|
||||||
|
repo, err := fsrepo.Open(repoPath)
|
||||||
|
verifyError(err)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Construct the node
|
||||||
|
|
||||||
|
nodeOptions := &core.BuildCfg{
|
||||||
|
Online: true,
|
||||||
|
Routing: libp2p.DHTOption, // This option sets the node to be a full DHT node (both fetching and storing DHT Records)
|
||||||
|
// Routing: libp2p.DHTClientOption, // This option sets the node to be a client DHT node (only fetching records)
|
||||||
|
Repo: repo,
|
||||||
|
}
|
||||||
|
|
||||||
|
node, err := core.NewNode(ctx, nodeOptions)
|
||||||
|
node.IsDaemon = true
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Attach the Core API to the constructed node
|
||||||
|
coreAPI, err := coreapi.NewCoreAPI(node)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
return coreAPI, node, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func createTempRepo(ctx context.Context) (string, error) {
|
||||||
|
repoPath, err := ioutil.TempDir("", "ipfs-shell")
|
||||||
|
if err != nil {
|
||||||
|
return "", fmt.Errorf("failed to get temp dir: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create a config with default options and a 2048 bit key
|
||||||
|
cfg, err := config.Init(ioutil.Discard, 2048)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return "", err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create the repo with the config
|
||||||
|
err = fsrepo.Init(repoPath, cfg)
|
||||||
|
if err != nil {
|
||||||
|
return "", fmt.Errorf("failed to init ephemeral node: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return repoPath, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Spawns a node to be used just for this run (i.e. creates a tmp repo)
|
||||||
|
func spawnEphemeral(ctx context.Context) (icore.CoreAPI, *core.IpfsNode, error) {
|
||||||
|
if err := setupPlugins(""); err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create a Temporary Repo
|
||||||
|
repoPath, err := createTempRepo(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return nil, nil, fmt.Errorf("failed to create temp repo: %s", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Spawning an ephemeral IPFS node
|
||||||
|
coreAPI, node, err := createNode(ctx, repoPath)
|
||||||
|
return coreAPI, node, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func connectToPeers(ctx context.Context, ipfs icore.CoreAPI, peers []string) error {
|
||||||
|
var wg sync.WaitGroup
|
||||||
|
peerInfos := make(map[peer.ID]*peerstore.PeerInfo, len(peers))
|
||||||
|
for _, addrStr := range peers {
|
||||||
|
addr, err := ma.NewMultiaddr(addrStr)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
pii, err := peerstore.InfoFromP2pAddr(addr)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
pi, ok := peerInfos[pii.ID]
|
||||||
|
if !ok {
|
||||||
|
pi = &peerstore.PeerInfo{ID: pii.ID}
|
||||||
|
peerInfos[pi.ID] = pi
|
||||||
|
}
|
||||||
|
pi.Addrs = append(pi.Addrs, pii.Addrs...)
|
||||||
|
}
|
||||||
|
|
||||||
|
wg.Add(len(peerInfos))
|
||||||
|
for _, peerInfo := range peerInfos {
|
||||||
|
go func(peerInfo *peerstore.PeerInfo) {
|
||||||
|
defer wg.Done()
|
||||||
|
err := ipfs.Swarm().Connect(ctx, *peerInfo)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("failed to connect to %s: %s", peerInfo.ID, err)
|
||||||
|
}
|
||||||
|
}(peerInfo)
|
||||||
|
}
|
||||||
|
wg.Wait()
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func getUnixfsNode(path string) (files.Node, error) {
|
||||||
|
st, err := os.Stat(path)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
f, err := files.NewSerialFile(path, false, st)
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return f, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *IPFSStorage) addFileToDirectory(originalFileHashToModifyPath path.Path, filename string) string {
|
||||||
|
// fmt.Println("directoryToAddTo: "+s.directoryHash, "filename: "+filename, "originalFileHashToModifyPath: "+originalFileHashToModifyPath.String())
|
||||||
|
directoryToAddToPath := path.New(s.directoryHash)
|
||||||
|
newDirectoryHash, err := (*s.ipfs).Object().AddLink(s.ctx, directoryToAddToPath, filename, originalFileHashToModifyPath)
|
||||||
|
|
||||||
|
verifyError(err)
|
||||||
|
return newDirectoryHash.String() + "/" + filename
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *IPFSStorage) createIPFSInstance() (*icore.CoreAPI, *core.IpfsNode, error) {
|
||||||
|
// Spawn a node using a temporary path, creating a temporary repo for the run
|
||||||
|
api, node, error := spawnEphemeral(s.ctx)
|
||||||
|
// api, node, error := spawnDefault(ctx)
|
||||||
|
return &api, node, error
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *IPFSStorage) startIPFSNode() { //} icore.CoreAPI {
|
||||||
|
defer log.Println("IPFS node exited")
|
||||||
|
|
||||||
|
log.Println("IPFS node is running")
|
||||||
|
|
||||||
|
bootstrapNodes := []string{
|
||||||
|
// IPFS Bootstrapper nodes.
|
||||||
|
"/dnsaddr/bootstrap.libp2p.io/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN",
|
||||||
|
"/dnsaddr/bootstrap.libp2p.io/p2p/QmQCU2EcMqAqQPR2i9bChDtGNJchTbq5TbXJJ16u19uLTa",
|
||||||
|
"/dnsaddr/bootstrap.libp2p.io/p2p/QmbLHAnMoJPWSCR5Zhtx6BHJX9KiKNN6tpvbUcqanj75Nb",
|
||||||
|
"/dnsaddr/bootstrap.libp2p.io/p2p/QmcZf59bWwK5XFi76CZX8cbJ4BhTzzA3gU1ZjYZcYW3dwt",
|
||||||
|
|
||||||
|
// IPFS Cluster Pinning nodes
|
||||||
|
"/ip4/138.201.67.219/tcp/4001/p2p/QmUd6zHcbkbcs7SMxwLs48qZVX3vpcM8errYS7xEczwRMA",
|
||||||
|
|
||||||
|
"/ip4/104.131.131.82/tcp/4001/p2p/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ", // mars.i.ipfs.io
|
||||||
|
"/ip4/104.131.131.82/udp/4001/quic/p2p/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ", // mars.i.ipfs.io
|
||||||
|
|
||||||
|
// You can add more nodes here, for example, another IPFS node you might have running locally, mine was:
|
||||||
|
// "/ip4/127.0.0.1/tcp/4010/p2p/QmZp2fhDLxjYue2RiUvLwT9MWdnbDxam32qYFnGmxZDh5L",
|
||||||
|
// "/ip4/127.0.0.1/udp/4010/quic/p2p/QmZp2fhDLxjYue2RiUvLwT9MWdnbDxam32qYFnGmxZDh5L",
|
||||||
|
}
|
||||||
|
|
||||||
|
go connectToPeers(s.ctx, *s.ipfs, bootstrapNodes)
|
||||||
|
|
||||||
|
addr := "/ip4/127.0.0.1/tcp/5001"
|
||||||
|
var opts = []corehttp.ServeOption{
|
||||||
|
corehttp.GatewayOption(true, "/ipfs", "/ipns"),
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := corehttp.ListenAndServe(s.node, addr, opts...); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *IPFSStorage) createIPFSDirectory(directoryName string) {
|
||||||
|
directory, err := getUnixfsNode(directoryName)
|
||||||
|
verifyError(err)
|
||||||
|
defer directory.Close()
|
||||||
|
|
||||||
|
newlyCreatedDirectoryHash, err := (*s.ipfs).Unixfs().Add(s.ctx, directory)
|
||||||
|
verifyError(err)
|
||||||
|
s.directoryHash = newlyCreatedDirectoryHash.String()
|
||||||
|
}
|
19
main.go
19
main.go
|
@ -5,11 +5,10 @@ import (
|
||||||
"net/http"
|
"net/http"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
|
||||||
icore "github.com/ipfs/interface-go-ipfs-core"
|
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
)
|
)
|
||||||
|
|
||||||
var ipfs icore.CoreAPI
|
var storage ChunkStorage
|
||||||
var configuration = getConfig()
|
var configuration = getConfig()
|
||||||
var server *Server
|
var server *Server
|
||||||
|
|
||||||
|
@ -24,9 +23,11 @@ func main() {
|
||||||
log.Println("Starting up. Please wait...")
|
log.Println("Starting up. Please wait...")
|
||||||
|
|
||||||
if configuration.IPFS.Enabled {
|
if configuration.IPFS.Enabled {
|
||||||
|
storage = &IPFSStorage{}
|
||||||
|
(storage).Setup(configuration)
|
||||||
|
|
||||||
hlsDirectoryPath = configuration.PrivateHLSPath
|
hlsDirectoryPath = configuration.PrivateHLSPath
|
||||||
enableIPFS()
|
go monitorVideoContent(hlsDirectoryPath, configuration, storage)
|
||||||
go monitorVideoContent(hlsDirectoryPath, configuration, &ipfs)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
go startChatServer()
|
go startChatServer()
|
||||||
|
@ -34,16 +35,6 @@ func main() {
|
||||||
startRTMPService()
|
startRTMPService()
|
||||||
}
|
}
|
||||||
|
|
||||||
func enableIPFS() {
|
|
||||||
log.Println("Enabling IPFS support...")
|
|
||||||
|
|
||||||
ipfsInstance, node, _ := createIPFSInstance()
|
|
||||||
ipfs = *ipfsInstance
|
|
||||||
|
|
||||||
createIPFSDirectory(ipfsInstance, "./hls")
|
|
||||||
go startIPFSNode(ipfs, node)
|
|
||||||
}
|
|
||||||
|
|
||||||
func startChatServer() {
|
func startChatServer() {
|
||||||
// log.SetFlags(log.Lshortfile)
|
// log.SetFlags(log.Lshortfile)
|
||||||
|
|
||||||
|
|
|
@ -8,13 +8,12 @@ import (
|
||||||
|
|
||||||
log "github.com/sirupsen/logrus"
|
log "github.com/sirupsen/logrus"
|
||||||
|
|
||||||
icore "github.com/ipfs/interface-go-ipfs-core"
|
|
||||||
"github.com/radovskyb/watcher"
|
"github.com/radovskyb/watcher"
|
||||||
)
|
)
|
||||||
|
|
||||||
var filesToUpload = make(map[string]string)
|
var filesToUpload = make(map[string]string)
|
||||||
|
|
||||||
func monitorVideoContent(pathToMonitor string, configuration Config, ipfs *icore.CoreAPI) {
|
func monitorVideoContent(pathToMonitor string, configuration Config, storage ChunkStorage) {
|
||||||
log.Printf("Using %s for IPFS files...\n", pathToMonitor)
|
log.Printf("Using %s for IPFS files...\n", pathToMonitor)
|
||||||
|
|
||||||
w := watcher.New()
|
w := watcher.New()
|
||||||
|
@ -34,7 +33,7 @@ func monitorVideoContent(pathToMonitor string, configuration Config, ipfs *icore
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
newObjectPath := save(path.Join(configuration.PrivateHLSPath, filePath), ipfs)
|
newObjectPath := storage.Save(path.Join(configuration.PrivateHLSPath, filePath))
|
||||||
filesToUpload[filePath] = newObjectPath
|
filesToUpload[filePath] = newObjectPath
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -44,7 +43,7 @@ func monitorVideoContent(pathToMonitor string, configuration Config, ipfs *icore
|
||||||
playlistString := string(playlistBytes)
|
playlistString := string(playlistBytes)
|
||||||
|
|
||||||
if configuration.IPFS.Enabled {
|
if configuration.IPFS.Enabled {
|
||||||
playlistString = generateRemotePlaylist(playlistString, configuration.IPFS.Gateway, filesToUpload)
|
playlistString = storage.GenerateRemotePlaylist(playlistString, filesToUpload)
|
||||||
}
|
}
|
||||||
writePlaylist(playlistString, path.Join(configuration.PublicHLSPath, "/stream.m3u8"))
|
writePlaylist(playlistString, path.Join(configuration.PublicHLSPath, "/stream.m3u8"))
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue