From f0180e0d15e6e3d648f858f49c5932bbc22f5e00 Mon Sep 17 00:00:00 2001 From: LuftVerbot <97435834+LuftVerbot@users.noreply.github.com> Date: Wed, 1 Nov 2023 15:48:14 +0100 Subject: [PATCH] rework anime downloader --- .../data/download/anime/AnimeDownloader.kt | 414 +++++++++--------- .../animesource/online/AnimeHttpSource.kt | 30 +- 2 files changed, 229 insertions(+), 215 deletions(-) diff --git a/app/src/main/java/eu/kanade/tachiyomi/data/download/anime/AnimeDownloader.kt b/app/src/main/java/eu/kanade/tachiyomi/data/download/anime/AnimeDownloader.kt index 01eef646c..8ac6f4c5a 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/data/download/anime/AnimeDownloader.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/data/download/anime/AnimeDownloader.kt @@ -17,7 +17,6 @@ import eu.kanade.tachiyomi.R import eu.kanade.tachiyomi.animesource.UnmeteredSource import eu.kanade.tachiyomi.animesource.model.Video import eu.kanade.tachiyomi.animesource.online.AnimeHttpSource -import eu.kanade.tachiyomi.animesource.online.fetchUrlFromVideo import eu.kanade.tachiyomi.data.cache.EpisodeCache import eu.kanade.tachiyomi.data.download.anime.model.AnimeDownload import eu.kanade.tachiyomi.data.library.anime.AnimeLibraryUpdateNotifier @@ -30,6 +29,8 @@ import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Job import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.async +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.delay import kotlinx.coroutines.flow.MutableStateFlow import kotlinx.coroutines.flow.asStateFlow import kotlinx.coroutines.flow.collectLatest @@ -43,11 +44,8 @@ import kotlinx.coroutines.launch import kotlinx.coroutines.supervisorScope import logcat.LogPriority import okhttp3.HttpUrl.Companion.toHttpUrl -import rx.Observable -import rx.android.schedulers.AndroidSchedulers import rx.subjects.PublishSubject import tachiyomi.core.util.lang.launchIO -import tachiyomi.core.util.lang.launchNow import tachiyomi.core.util.lang.withUIContext import tachiyomi.core.util.system.ImageUtil import tachiyomi.core.util.system.logcat @@ -60,7 +58,6 @@ import uy.kohesive.injekt.api.get import uy.kohesive.injekt.injectLazy import java.io.File import java.util.Locale -import java.util.concurrent.TimeUnit import kotlin.coroutines.cancellation.CancellationException /** @@ -124,7 +121,7 @@ class AnimeDownloader( var isFFmpegRunning: Boolean = false init { - launchNow { + scope.launch { val episodes = async { store.restore() } addAllToQueue(episodes.await()) } @@ -201,7 +198,7 @@ class AnimeDownloader( } /** - * Prepares the subscriptions to start downloading. + * Prepares the jobs to start downloading. */ private fun launchDownloaderJob() { if (isRunning) return @@ -212,11 +209,12 @@ class AnimeDownloader( val activeDownloads = queue.asSequence() .filter { it.status.value <= AnimeDownload.State.DOWNLOADING.value } // Ignore completed downloads, leave them in the queue .groupBy { it.source } - .toList().take(5) // Concurrently download from 5 different sources + .toList().take(3) // Concurrently download from 5 different sources .map { (_, downloads) -> downloads.first() } emit(activeDownloads) if (activeDownloads.isEmpty()) break + // Suspend until a download enters the ERROR state val activeDownloadsErroredFlow = combine(activeDownloads.map(AnimeDownload::statusFlow)) { states -> @@ -254,6 +252,11 @@ class AnimeDownloader( if (download.status == AnimeDownload.State.DOWNLOADED) { removeFromQueue(download) } + + if (download.status == AnimeDownload.State.QUEUE) { + pause() + } + if (areAllAnimeDownloadsFinished()) { stop() } @@ -294,12 +297,15 @@ class AnimeDownloader( val source = sourceManager.get(anime.source) as? AnimeHttpSource ?: return@launchIO val wasEmpty = queueState.value.isEmpty() - val episodesWithoutDir = episodes - // Filter out those already downloaded. - .filter { provider.findEpisodeDir(it.name, it.scanlator, anime.title, source) == null } - // Add chapters to queue from the start. - .sortedByDescending { it.sourceOrder } + // Called in background thread, the operation can be slow with SAF. + val episodesWithoutDir = + episodes + // Filter out those already downloaded. + .filter { provider.findEpisodeDir(it.name, it.scanlator, anime.title, source) == null } + // Add episodes to queue from the start. + .sortedByDescending { it.sourceOrder } + // Runs in main thread (synchronization needed). val episodesToQueue = episodesWithoutDir // Filter out those already enqueued. .filter { episode -> queueState.value.none { it.episode.id == episode.id } } @@ -311,7 +317,8 @@ class AnimeDownloader( // Start downloader if needed if (autoStart && wasEmpty) { - val queuedDownloads = queueState.value.count { it: AnimeDownload -> it.source !is UnmeteredSource } + val queuedDownloads = + queueState.value.count { it: AnimeDownload -> it.source !is UnmeteredSource } val maxDownloadsFromSource = queueState.value .groupBy { it.source } .filterKeys { it !is UnmeteredSource } @@ -326,7 +333,10 @@ class AnimeDownloader( notifier.onWarning( context.getString(R.string.download_queue_size_warning), WARNING_NOTIF_TIMEOUT_MS, - NotificationHandler.openUrl(context, AnimeLibraryUpdateNotifier.HELP_WARNING_URL), + NotificationHandler.openUrl( + context, + AnimeLibraryUpdateNotifier.HELP_WARNING_URL, + ), ) } } @@ -340,85 +350,80 @@ class AnimeDownloader( * * @param download the episode to be downloaded. */ - private fun downloadEpisode(download: AnimeDownload): Observable = Observable.defer { + private suspend fun downloadEpisode(download: AnimeDownload) { val animeDir = provider.getAnimeDir(download.anime.title, download.source) val availSpace = DiskUtil.getAvailableStorageSpace(animeDir) if (availSpace != -1L && availSpace < MIN_DISK_SPACE) { download.status = AnimeDownload.State.ERROR - notifier.onError(context.getString(R.string.download_insufficient_space), download.episode.name, download.anime.title) - return@defer Observable.just(download) + notifier.onError( + context.getString(R.string.download_insufficient_space), + download.episode.name, + download.anime.title, + ) + return } - val episodeDirname = provider.getEpisodeDirName(download.episode.name, download.episode.scanlator) + val episodeDirname = + provider.getEpisodeDirName(download.episode.name, download.episode.scanlator) val tmpDir = animeDir.createDirectory(episodeDirname + TMP_DIR_SUFFIX) notifier.onProgressChange(download) - val videoObservable = if (download.video == null) { + val video = if (download.video == null) { // Pull video from network and add them to download object - download.source.fetchVideoList(download.episode.toSEpisode()).map { it.first() } - .doOnNext { video -> - if (video == null) { - throw Exception(context.getString(R.string.video_list_empty_error)) - } - download.video = video - } + try { + val fetchedVideo = + download.source.getVideoList(download.episode.toSEpisode()).first() + download.video = fetchedVideo + fetchedVideo + } catch (e: Exception) { + throw Exception(context.getString(R.string.video_list_empty_error)) + } } else { - // Or if the video already exists, start from the file - Observable.just(download.video!!) + // Or if the video already exists, return it + download.video!! } - videoObservable - .doOnNext { _ -> - if (download.video?.bytesDownloaded == 0L) { - // Delete all temporary (unfinished) files - tmpDir.listFiles() - ?.filter { it.name!!.endsWith(".tmp") } - ?.forEach { it.delete() } - } + if (download.video!!.bytesDownloaded == 0L) { + // Delete all temporary (unfinished) files + tmpDir.listFiles() + ?.filter { it.name!!.endsWith(".tmp") } + ?.forEach { it.delete() } + } - download.downloadedImages = 0 - download.status = AnimeDownload.State.DOWNLOADING + download.downloadedImages = 0 + download.status = AnimeDownload.State.DOWNLOADING + + val progressJob = scope.launch { + while (download.status == AnimeDownload.State.DOWNLOADING) { + delay(50) + val progress = download.video!!.progress + if (download.totalProgress != progress) { + download.totalProgress = progress + notifier.onProgressChange(download) + } } - // Get all the URLs to the source images, fetch pages if necessary - .flatMap { download.source.fetchUrlFromVideo(it) } - .doOnNext { - Observable.interval(50, TimeUnit.MILLISECONDS) - // Get the sum of percentages for all the pages. - .flatMap { - Observable.just(download.video) - .flatMap { Observable.just(it!!.progress) } - } - .takeUntil { download.status != AnimeDownload.State.DOWNLOADING } - // Keep only the latest emission to avoid backpressure. - .onBackpressureLatest() - .observeOn(AndroidSchedulers.mainThread()) - .subscribe { progress -> - // Update the view only if the progress has changed. - if (download.totalProgress != progress) { - download.totalProgress = progress - notifier.onProgressChange(download) - } - } - } - // Start downloading videos, consider we can have downloaded images already - // Concurrently do 5 videos at a time (a fossil of the manga downloads) - .flatMap({ video -> getOrAnimeDownloadVideo(video, download, tmpDir) }, 5) - .onBackpressureLatest() - // Do when video is downloaded. - .toList() - .map { download } - // Do after download completes - .doOnNext { - ensureSuccessfulAnimeDownload(download, animeDir, tmpDir, episodeDirname) - if (download.status == AnimeDownload.State.DOWNLOADED) notifier.dismissProgress() - } - // If the video list threw, it will resume here - .onErrorReturn { error -> - download.status = AnimeDownload.State.ERROR - notifier.onError(error.message, download.episode.name, download.anime.title) - download + } + + try { + // Replace this with your actual download logic + getOrAnimeDownloadVideo(video, download, tmpDir) + } catch (e: Exception) { + download.status = AnimeDownload.State.ERROR + notifier.onError(e.message, download.episode.name, download.anime.title) + } finally { + progressJob.cancel() + } + + try { + ensureSuccessfulAnimeDownload(download, animeDir, tmpDir, episodeDirname) + if (download.status == AnimeDownload.State.DOWNLOADED) { + notifier.dismissProgress() } + } catch (e: Exception) { + download.status = AnimeDownload.State.ERROR + notifier.onError(e.message, download.episode.name, download.anime.title) + } } /** @@ -429,27 +434,18 @@ class AnimeDownloader( * @param download the download of the video. * @param tmpDir the temporary directory of the download. */ - private fun getOrAnimeDownloadVideo(video: Video, download: AnimeDownload, tmpDir: UniFile): Observable