From 16047d884ddc29f12d6c4567581c02fd348ad683 Mon Sep 17 00:00:00 2001 From: Gabe Kangas Date: Tue, 2 Jun 2020 23:05:15 -0700 Subject: [PATCH] Refactor ipfs storage behind a standard interface --- chunkStorage.go | 7 ++ ffmpeg.go | 8 -- ipfs.go | 100 ++++++++-------- ipfsStorage.go | 289 +++++++++++++++++++++++++++++++++++++++++++++ main.go | 19 +-- playlistMonitor.go | 7 +- 6 files changed, 356 insertions(+), 74 deletions(-) create mode 100644 chunkStorage.go create mode 100644 ipfsStorage.go diff --git a/chunkStorage.go b/chunkStorage.go new file mode 100644 index 000000000..705705a1d --- /dev/null +++ b/chunkStorage.go @@ -0,0 +1,7 @@ +package main + +type ChunkStorage interface { + Setup(config Config) + Save(filePath string) string + GenerateRemotePlaylist(playlist string, segments map[string]string) string +} diff --git a/ffmpeg.go b/ffmpeg.go index f117d8888..ab55ef132 100644 --- a/ffmpeg.go +++ b/ffmpeg.go @@ -7,7 +7,6 @@ import ( "os/exec" "path" "strconv" - "strings" ) func startFfmpeg(configuration Config) { @@ -37,13 +36,6 @@ func verifyError(e error) { } } -func generateRemotePlaylist(playlist string, gateway string, segments map[string]string) string { - for local, remote := range segments { - playlist = strings.ReplaceAll(playlist, local, gateway+remote) - } - return playlist -} - func writePlaylist(data string, filePath string) { f, err := os.Create(filePath) defer f.Close() diff --git a/ipfs.go b/ipfs.go index c3f73ce48..97cf02edc 100644 --- a/ipfs.go +++ b/ipfs.go @@ -6,6 +6,7 @@ import ( "io/ioutil" "os" "path/filepath" + "strings" "sync" log "github.com/sirupsen/logrus" @@ -22,31 +23,34 @@ import ( "github.com/ipfs/go-ipfs/core" "github.com/ipfs/go-ipfs/core/coreapi" "github.com/ipfs/go-ipfs/core/corehttp" - "github.com/ipfs/go-ipfs/core/node/libp2p" // This package is needed so that all the preloaded plugins are loaded automatically + "github.com/ipfs/go-ipfs/core/node/libp2p" "github.com/ipfs/go-ipfs/plugin/loader" "github.com/ipfs/go-ipfs/repo/fsrepo" ) -var directoryHash string +type IPFSStorage struct { + ipfs *icore.CoreAPI + node *core.IpfsNode -var node *core.IpfsNode - -var ctx = context.Background() - -func createIPFSDirectory(ipfs *icore.CoreAPI, directoryName string) { - directory, err := getUnixfsNode(directoryName) - verifyError(err) - defer directory.Close() - - newlyCreatedDirectoryHash, err := (*ipfs).Unixfs().Add(ctx, directory) - verifyError(err) - - directoryHash = newlyCreatedDirectoryHash.String() + ctx context.Context + directoryHash string + gateway string } -func save(filePath string, ipfs *icore.CoreAPI) string { - someFile, err := getUnixfsNode(filePath) +func (s *IPFSStorage) Setup(config Config) { + s.gateway = config.IPFS.Gateway + s.ctx = context.Background() + + ipfsInstance, node, _ := s.createIPFSInstance() + s.ipfs = ipfsInstance + s.node = node + + s.createIPFSDirectory("./hls") +} + +func (s *IPFSStorage) Save(filePath string) string { + someFile, err := getUnixfsNode(filePath) defer someFile.Close() if err != nil { @@ -60,7 +64,7 @@ func save(filePath string, ipfs *icore.CoreAPI) string { // options.Unixfs.Nocopy(false), } - cidFile, err := (*ipfs).Unixfs().Add(ctx, someFile, opts...) + cidFile, err := (*s.ipfs).Unixfs().Add(s.ctx, someFile, opts...) if err != nil { panic(fmt.Errorf("Could not add File: %s", err)) @@ -68,17 +72,16 @@ func save(filePath string, ipfs *icore.CoreAPI) string { // fmt.Printf("Added file to IPFS with CID %s\n", cidFile.String()) - newHash := addFileToDirectory(ipfs, cidFile, directoryHash, filepath.Base(filePath)) + newHash := s.addFileToDirectory(cidFile, filepath.Base(filePath)) return newHash } -func addFileToDirectory(ipfs *icore.CoreAPI, originalFileHashToModifyPath path.Path, directoryToAddTo string, filename string) string { - directoryToAddToPath := path.New(directoryToAddTo) - newDirectoryHash, err := (*ipfs).Object().AddLink(ctx, directoryToAddToPath, filename, originalFileHashToModifyPath) - - verifyError(err) - return newDirectoryHash.String() + "/" + filename +func (s *IPFSStorage) GenerateRemotePlaylist(playlist string, segments map[string]string) string { + for local, remote := range segments { + playlist = strings.ReplaceAll(playlist, local, s.gateway+remote) + } + return playlist } func setupPlugins(externalPluginsPath string) error { @@ -173,23 +176,6 @@ func spawnEphemeral(ctx context.Context) (icore.CoreAPI, *core.IpfsNode, error) return coreAPI, node, err } -// Spawns a node on the default repo location, if the repo exists -func spawnDefault(ctx context.Context) (icore.CoreAPI, *core.IpfsNode, error) { - defaultPath, err := config.PathRoot() - fmt.Println(defaultPath) - if err != nil { - // shouldn't be possible - return nil, nil, err - } - - if err := setupPlugins(defaultPath); err != nil { - return nil, nil, err - } - - coreAPI, node, err := createNode(ctx, defaultPath) - return coreAPI, node, err -} - func connectToPeers(ctx context.Context, ipfs icore.CoreAPI, peers []string) error { var wg sync.WaitGroup peerInfos := make(map[peer.ID]*peerstore.PeerInfo, len(peers)) @@ -239,14 +225,23 @@ func getUnixfsNode(path string) (files.Node, error) { return f, nil } -func createIPFSInstance() (*icore.CoreAPI, *core.IpfsNode, error) { +func (s *IPFSStorage) addFileToDirectory(originalFileHashToModifyPath path.Path, filename string) string { + // fmt.Println("directoryToAddTo: "+s.directoryHash, "filename: "+filename, "originalFileHashToModifyPath: "+originalFileHashToModifyPath.String()) + directoryToAddToPath := path.New(s.directoryHash) + newDirectoryHash, err := (*s.ipfs).Object().AddLink(s.ctx, directoryToAddToPath, filename, originalFileHashToModifyPath) + + verifyError(err) + return newDirectoryHash.String() + "/" + filename +} + +func (s *IPFSStorage) createIPFSInstance() (*icore.CoreAPI, *core.IpfsNode, error) { // Spawn a node using a temporary path, creating a temporary repo for the run - api, node, error := spawnEphemeral(ctx) + api, node, error := spawnEphemeral(s.ctx) // api, node, error := spawnDefault(ctx) return &api, node, error } -func startIPFSNode(ipfs icore.CoreAPI, node *core.IpfsNode) { //} icore.CoreAPI { +func (s *IPFSStorage) startIPFSNode() { //} icore.CoreAPI { defer log.Println("IPFS node exited") log.Println("IPFS node is running") @@ -269,15 +264,24 @@ func startIPFSNode(ipfs icore.CoreAPI, node *core.IpfsNode) { //} icore.CoreAPI // "/ip4/127.0.0.1/udp/4010/quic/p2p/QmZp2fhDLxjYue2RiUvLwT9MWdnbDxam32qYFnGmxZDh5L", } - go connectToPeers(ctx, ipfs, bootstrapNodes) + go connectToPeers(s.ctx, *s.ipfs, bootstrapNodes) addr := "/ip4/127.0.0.1/tcp/5001" var opts = []corehttp.ServeOption{ corehttp.GatewayOption(true, "/ipfs", "/ipns"), } - if err := corehttp.ListenAndServe(node, addr, opts...); err != nil { + if err := corehttp.ListenAndServe(s.node, addr, opts...); err != nil { return } - +} + +func (s *IPFSStorage) createIPFSDirectory(directoryName string) { + directory, err := getUnixfsNode(directoryName) + verifyError(err) + defer directory.Close() + + newlyCreatedDirectoryHash, err := (*s.ipfs).Unixfs().Add(s.ctx, directory) + verifyError(err) + s.directoryHash = newlyCreatedDirectoryHash.String() } diff --git a/ipfsStorage.go b/ipfsStorage.go new file mode 100644 index 000000000..5199f54dc --- /dev/null +++ b/ipfsStorage.go @@ -0,0 +1,289 @@ +package main + +import ( + "context" + "fmt" + "io/ioutil" + "os" + "path/filepath" + "strings" + "sync" + + log "github.com/sirupsen/logrus" + + config "github.com/ipfs/go-ipfs-config" + files "github.com/ipfs/go-ipfs-files" + icore "github.com/ipfs/interface-go-ipfs-core" + "github.com/ipfs/interface-go-ipfs-core/options" + "github.com/ipfs/interface-go-ipfs-core/path" + peer "github.com/libp2p/go-libp2p-peer" + peerstore "github.com/libp2p/go-libp2p-peerstore" + ma "github.com/multiformats/go-multiaddr" + + "github.com/ipfs/go-ipfs/core" + "github.com/ipfs/go-ipfs/core/coreapi" + "github.com/ipfs/go-ipfs/core/corehttp" + "github.com/ipfs/go-ipfs/core/node/libp2p" + "github.com/ipfs/go-ipfs/plugin/loader" + "github.com/ipfs/go-ipfs/repo/fsrepo" +) + +type IPFSStorage struct { + ipfs *icore.CoreAPI + node *core.IpfsNode + + ctx context.Context + directoryHash string + gateway string +} + +func (s *IPFSStorage) Setup(config Config) { + log.Println("Setting up IPFS for external storage of video...") + + s.gateway = config.IPFS.Gateway + + s.ctx = context.Background() + + ipfsInstance, node, _ := s.createIPFSInstance() + s.ipfs = ipfsInstance + s.node = node + + s.createIPFSDirectory("./hls") +} + +func (s *IPFSStorage) Save(filePath string) string { + someFile, err := getUnixfsNode(filePath) + defer someFile.Close() + + if err != nil { + panic(fmt.Errorf("Could not get File: %s", err)) + } + + opts := []options.UnixfsAddOption{ + options.Unixfs.Pin(false), + // options.Unixfs.CidVersion(1), + // options.Unixfs.RawLeaves(false), + // options.Unixfs.Nocopy(false), + } + + cidFile, err := (*s.ipfs).Unixfs().Add(s.ctx, someFile, opts...) + + if err != nil { + panic(fmt.Errorf("Could not add File: %s", err)) + } + + // fmt.Printf("Added file to IPFS with CID %s\n", cidFile.String()) + + newHash := s.addFileToDirectory(cidFile, filepath.Base(filePath)) + + return newHash +} + +func (s *IPFSStorage) GenerateRemotePlaylist(playlist string, segments map[string]string) string { + for local, remote := range segments { + playlist = strings.ReplaceAll(playlist, local, s.gateway+remote) + } + return playlist +} + +func setupPlugins(externalPluginsPath string) error { + // Load any external plugins if available on externalPluginsPath + plugins, err := loader.NewPluginLoader(filepath.Join(externalPluginsPath, "plugins")) + if err != nil { + return fmt.Errorf("error loading plugins: %s", err) + } + + // Load preloaded and external plugins + if err := plugins.Initialize(); err != nil { + return fmt.Errorf("error initializing plugins: %s", err) + } + + if err := plugins.Inject(); err != nil { + return fmt.Errorf("error initializing plugins: %s", err) + } + + return nil +} + +// Creates an IPFS node and returns its coreAPI +func createNode(ctx context.Context, repoPath string) (icore.CoreAPI, *core.IpfsNode, error) { + // Open the repo + repo, err := fsrepo.Open(repoPath) + verifyError(err) + + if err != nil { + return nil, nil, err + } + + // Construct the node + + nodeOptions := &core.BuildCfg{ + Online: true, + Routing: libp2p.DHTOption, // This option sets the node to be a full DHT node (both fetching and storing DHT Records) + // Routing: libp2p.DHTClientOption, // This option sets the node to be a client DHT node (only fetching records) + Repo: repo, + } + + node, err := core.NewNode(ctx, nodeOptions) + node.IsDaemon = true + + if err != nil { + return nil, nil, err + } + + // Attach the Core API to the constructed node + coreAPI, err := coreapi.NewCoreAPI(node) + if err != nil { + return nil, nil, err + } + return coreAPI, node, nil +} + +func createTempRepo(ctx context.Context) (string, error) { + repoPath, err := ioutil.TempDir("", "ipfs-shell") + if err != nil { + return "", fmt.Errorf("failed to get temp dir: %s", err) + } + + // Create a config with default options and a 2048 bit key + cfg, err := config.Init(ioutil.Discard, 2048) + + if err != nil { + return "", err + } + + // Create the repo with the config + err = fsrepo.Init(repoPath, cfg) + if err != nil { + return "", fmt.Errorf("failed to init ephemeral node: %s", err) + } + + return repoPath, nil +} + +// Spawns a node to be used just for this run (i.e. creates a tmp repo) +func spawnEphemeral(ctx context.Context) (icore.CoreAPI, *core.IpfsNode, error) { + if err := setupPlugins(""); err != nil { + return nil, nil, err + } + + // Create a Temporary Repo + repoPath, err := createTempRepo(ctx) + if err != nil { + return nil, nil, fmt.Errorf("failed to create temp repo: %s", err) + } + + // Spawning an ephemeral IPFS node + coreAPI, node, err := createNode(ctx, repoPath) + return coreAPI, node, err +} + +func connectToPeers(ctx context.Context, ipfs icore.CoreAPI, peers []string) error { + var wg sync.WaitGroup + peerInfos := make(map[peer.ID]*peerstore.PeerInfo, len(peers)) + for _, addrStr := range peers { + addr, err := ma.NewMultiaddr(addrStr) + if err != nil { + return err + } + pii, err := peerstore.InfoFromP2pAddr(addr) + if err != nil { + return err + } + pi, ok := peerInfos[pii.ID] + if !ok { + pi = &peerstore.PeerInfo{ID: pii.ID} + peerInfos[pi.ID] = pi + } + pi.Addrs = append(pi.Addrs, pii.Addrs...) + } + + wg.Add(len(peerInfos)) + for _, peerInfo := range peerInfos { + go func(peerInfo *peerstore.PeerInfo) { + defer wg.Done() + err := ipfs.Swarm().Connect(ctx, *peerInfo) + if err != nil { + log.Printf("failed to connect to %s: %s", peerInfo.ID, err) + } + }(peerInfo) + } + wg.Wait() + return nil +} + +func getUnixfsNode(path string) (files.Node, error) { + st, err := os.Stat(path) + if err != nil { + return nil, err + } + + f, err := files.NewSerialFile(path, false, st) + + if err != nil { + return nil, err + } + + return f, nil +} + +func (s *IPFSStorage) addFileToDirectory(originalFileHashToModifyPath path.Path, filename string) string { + // fmt.Println("directoryToAddTo: "+s.directoryHash, "filename: "+filename, "originalFileHashToModifyPath: "+originalFileHashToModifyPath.String()) + directoryToAddToPath := path.New(s.directoryHash) + newDirectoryHash, err := (*s.ipfs).Object().AddLink(s.ctx, directoryToAddToPath, filename, originalFileHashToModifyPath) + + verifyError(err) + return newDirectoryHash.String() + "/" + filename +} + +func (s *IPFSStorage) createIPFSInstance() (*icore.CoreAPI, *core.IpfsNode, error) { + // Spawn a node using a temporary path, creating a temporary repo for the run + api, node, error := spawnEphemeral(s.ctx) + // api, node, error := spawnDefault(ctx) + return &api, node, error +} + +func (s *IPFSStorage) startIPFSNode() { //} icore.CoreAPI { + defer log.Println("IPFS node exited") + + log.Println("IPFS node is running") + + bootstrapNodes := []string{ + // IPFS Bootstrapper nodes. + "/dnsaddr/bootstrap.libp2p.io/p2p/QmNnooDu7bfjPFoTZYxMNLWUQJyrVwtbZg5gBMjTezGAJN", + "/dnsaddr/bootstrap.libp2p.io/p2p/QmQCU2EcMqAqQPR2i9bChDtGNJchTbq5TbXJJ16u19uLTa", + "/dnsaddr/bootstrap.libp2p.io/p2p/QmbLHAnMoJPWSCR5Zhtx6BHJX9KiKNN6tpvbUcqanj75Nb", + "/dnsaddr/bootstrap.libp2p.io/p2p/QmcZf59bWwK5XFi76CZX8cbJ4BhTzzA3gU1ZjYZcYW3dwt", + + // IPFS Cluster Pinning nodes + "/ip4/138.201.67.219/tcp/4001/p2p/QmUd6zHcbkbcs7SMxwLs48qZVX3vpcM8errYS7xEczwRMA", + + "/ip4/104.131.131.82/tcp/4001/p2p/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ", // mars.i.ipfs.io + "/ip4/104.131.131.82/udp/4001/quic/p2p/QmaCpDMGvV2BGHeYERUEnRQAwe3N8SzbUtfsmvsqQLuvuJ", // mars.i.ipfs.io + + // You can add more nodes here, for example, another IPFS node you might have running locally, mine was: + // "/ip4/127.0.0.1/tcp/4010/p2p/QmZp2fhDLxjYue2RiUvLwT9MWdnbDxam32qYFnGmxZDh5L", + // "/ip4/127.0.0.1/udp/4010/quic/p2p/QmZp2fhDLxjYue2RiUvLwT9MWdnbDxam32qYFnGmxZDh5L", + } + + go connectToPeers(s.ctx, *s.ipfs, bootstrapNodes) + + addr := "/ip4/127.0.0.1/tcp/5001" + var opts = []corehttp.ServeOption{ + corehttp.GatewayOption(true, "/ipfs", "/ipns"), + } + + if err := corehttp.ListenAndServe(s.node, addr, opts...); err != nil { + return + } +} + +func (s *IPFSStorage) createIPFSDirectory(directoryName string) { + directory, err := getUnixfsNode(directoryName) + verifyError(err) + defer directory.Close() + + newlyCreatedDirectoryHash, err := (*s.ipfs).Unixfs().Add(s.ctx, directory) + verifyError(err) + s.directoryHash = newlyCreatedDirectoryHash.String() +} diff --git a/main.go b/main.go index 5fc3e8567..3afa084d6 100644 --- a/main.go +++ b/main.go @@ -5,11 +5,10 @@ import ( "net/http" "strconv" - icore "github.com/ipfs/interface-go-ipfs-core" log "github.com/sirupsen/logrus" ) -var ipfs icore.CoreAPI +var storage ChunkStorage var configuration = getConfig() var server *Server @@ -24,9 +23,11 @@ func main() { log.Println("Starting up. Please wait...") if configuration.IPFS.Enabled { + storage = &IPFSStorage{} + (storage).Setup(configuration) + hlsDirectoryPath = configuration.PrivateHLSPath - enableIPFS() - go monitorVideoContent(hlsDirectoryPath, configuration, &ipfs) + go monitorVideoContent(hlsDirectoryPath, configuration, storage) } go startChatServer() @@ -34,16 +35,6 @@ func main() { startRTMPService() } -func enableIPFS() { - log.Println("Enabling IPFS support...") - - ipfsInstance, node, _ := createIPFSInstance() - ipfs = *ipfsInstance - - createIPFSDirectory(ipfsInstance, "./hls") - go startIPFSNode(ipfs, node) -} - func startChatServer() { // log.SetFlags(log.Lshortfile) diff --git a/playlistMonitor.go b/playlistMonitor.go index 37325e9ea..3061c778f 100644 --- a/playlistMonitor.go +++ b/playlistMonitor.go @@ -8,13 +8,12 @@ import ( log "github.com/sirupsen/logrus" - icore "github.com/ipfs/interface-go-ipfs-core" "github.com/radovskyb/watcher" ) var filesToUpload = make(map[string]string) -func monitorVideoContent(pathToMonitor string, configuration Config, ipfs *icore.CoreAPI) { +func monitorVideoContent(pathToMonitor string, configuration Config, storage ChunkStorage) { log.Printf("Using %s for IPFS files...\n", pathToMonitor) w := watcher.New() @@ -34,7 +33,7 @@ func monitorVideoContent(pathToMonitor string, configuration Config, ipfs *icore continue } - newObjectPath := save(path.Join(configuration.PrivateHLSPath, filePath), ipfs) + newObjectPath := storage.Save(path.Join(configuration.PrivateHLSPath, filePath)) filesToUpload[filePath] = newObjectPath } } @@ -44,7 +43,7 @@ func monitorVideoContent(pathToMonitor string, configuration Config, ipfs *icore playlistString := string(playlistBytes) if configuration.IPFS.Enabled { - playlistString = generateRemotePlaylist(playlistString, configuration.IPFS.Gateway, filesToUpload) + playlistString = storage.GenerateRemotePlaylist(playlistString, filesToUpload) } writePlaylist(playlistString, path.Join(configuration.PublicHLSPath, "/stream.m3u8"))