rework anime downloader

This commit is contained in:
LuftVerbot 2023-11-01 15:48:14 +01:00
parent f3ec9da2da
commit f0180e0d15
2 changed files with 229 additions and 215 deletions

View file

@ -17,7 +17,6 @@ import eu.kanade.tachiyomi.R
import eu.kanade.tachiyomi.animesource.UnmeteredSource
import eu.kanade.tachiyomi.animesource.model.Video
import eu.kanade.tachiyomi.animesource.online.AnimeHttpSource
import eu.kanade.tachiyomi.animesource.online.fetchUrlFromVideo
import eu.kanade.tachiyomi.data.cache.EpisodeCache
import eu.kanade.tachiyomi.data.download.anime.model.AnimeDownload
import eu.kanade.tachiyomi.data.library.anime.AnimeLibraryUpdateNotifier
@ -30,6 +29,8 @@ import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.asStateFlow
import kotlinx.coroutines.flow.collectLatest
@ -43,11 +44,8 @@ import kotlinx.coroutines.launch
import kotlinx.coroutines.supervisorScope
import logcat.LogPriority
import okhttp3.HttpUrl.Companion.toHttpUrl
import rx.Observable
import rx.android.schedulers.AndroidSchedulers
import rx.subjects.PublishSubject
import tachiyomi.core.util.lang.launchIO
import tachiyomi.core.util.lang.launchNow
import tachiyomi.core.util.lang.withUIContext
import tachiyomi.core.util.system.ImageUtil
import tachiyomi.core.util.system.logcat
@ -60,7 +58,6 @@ import uy.kohesive.injekt.api.get
import uy.kohesive.injekt.injectLazy
import java.io.File
import java.util.Locale
import java.util.concurrent.TimeUnit
import kotlin.coroutines.cancellation.CancellationException
/**
@ -124,7 +121,7 @@ class AnimeDownloader(
var isFFmpegRunning: Boolean = false
init {
launchNow {
scope.launch {
val episodes = async { store.restore() }
addAllToQueue(episodes.await())
}
@ -201,7 +198,7 @@ class AnimeDownloader(
}
/**
* Prepares the subscriptions to start downloading.
* Prepares the jobs to start downloading.
*/
private fun launchDownloaderJob() {
if (isRunning) return
@ -212,11 +209,12 @@ class AnimeDownloader(
val activeDownloads = queue.asSequence()
.filter { it.status.value <= AnimeDownload.State.DOWNLOADING.value } // Ignore completed downloads, leave them in the queue
.groupBy { it.source }
.toList().take(5) // Concurrently download from 5 different sources
.toList().take(3) // Concurrently download from 5 different sources
.map { (_, downloads) -> downloads.first() }
emit(activeDownloads)
if (activeDownloads.isEmpty()) break
// Suspend until a download enters the ERROR state
val activeDownloadsErroredFlow =
combine(activeDownloads.map(AnimeDownload::statusFlow)) { states ->
@ -254,6 +252,11 @@ class AnimeDownloader(
if (download.status == AnimeDownload.State.DOWNLOADED) {
removeFromQueue(download)
}
if (download.status == AnimeDownload.State.QUEUE) {
pause()
}
if (areAllAnimeDownloadsFinished()) {
stop()
}
@ -294,12 +297,15 @@ class AnimeDownloader(
val source = sourceManager.get(anime.source) as? AnimeHttpSource ?: return@launchIO
val wasEmpty = queueState.value.isEmpty()
val episodesWithoutDir = episodes
// Filter out those already downloaded.
.filter { provider.findEpisodeDir(it.name, it.scanlator, anime.title, source) == null }
// Add chapters to queue from the start.
.sortedByDescending { it.sourceOrder }
// Called in background thread, the operation can be slow with SAF.
val episodesWithoutDir =
episodes
// Filter out those already downloaded.
.filter { provider.findEpisodeDir(it.name, it.scanlator, anime.title, source) == null }
// Add episodes to queue from the start.
.sortedByDescending { it.sourceOrder }
// Runs in main thread (synchronization needed).
val episodesToQueue = episodesWithoutDir
// Filter out those already enqueued.
.filter { episode -> queueState.value.none { it.episode.id == episode.id } }
@ -311,7 +317,8 @@ class AnimeDownloader(
// Start downloader if needed
if (autoStart && wasEmpty) {
val queuedDownloads = queueState.value.count { it: AnimeDownload -> it.source !is UnmeteredSource }
val queuedDownloads =
queueState.value.count { it: AnimeDownload -> it.source !is UnmeteredSource }
val maxDownloadsFromSource = queueState.value
.groupBy { it.source }
.filterKeys { it !is UnmeteredSource }
@ -326,7 +333,10 @@ class AnimeDownloader(
notifier.onWarning(
context.getString(R.string.download_queue_size_warning),
WARNING_NOTIF_TIMEOUT_MS,
NotificationHandler.openUrl(context, AnimeLibraryUpdateNotifier.HELP_WARNING_URL),
NotificationHandler.openUrl(
context,
AnimeLibraryUpdateNotifier.HELP_WARNING_URL,
),
)
}
}
@ -340,85 +350,80 @@ class AnimeDownloader(
*
* @param download the episode to be downloaded.
*/
private fun downloadEpisode(download: AnimeDownload): Observable<AnimeDownload> = Observable.defer {
private suspend fun downloadEpisode(download: AnimeDownload) {
val animeDir = provider.getAnimeDir(download.anime.title, download.source)
val availSpace = DiskUtil.getAvailableStorageSpace(animeDir)
if (availSpace != -1L && availSpace < MIN_DISK_SPACE) {
download.status = AnimeDownload.State.ERROR
notifier.onError(context.getString(R.string.download_insufficient_space), download.episode.name, download.anime.title)
return@defer Observable.just(download)
notifier.onError(
context.getString(R.string.download_insufficient_space),
download.episode.name,
download.anime.title,
)
return
}
val episodeDirname = provider.getEpisodeDirName(download.episode.name, download.episode.scanlator)
val episodeDirname =
provider.getEpisodeDirName(download.episode.name, download.episode.scanlator)
val tmpDir = animeDir.createDirectory(episodeDirname + TMP_DIR_SUFFIX)
notifier.onProgressChange(download)
val videoObservable = if (download.video == null) {
val video = if (download.video == null) {
// Pull video from network and add them to download object
download.source.fetchVideoList(download.episode.toSEpisode()).map { it.first() }
.doOnNext { video ->
if (video == null) {
throw Exception(context.getString(R.string.video_list_empty_error))
}
download.video = video
}
try {
val fetchedVideo =
download.source.getVideoList(download.episode.toSEpisode()).first()
download.video = fetchedVideo
fetchedVideo
} catch (e: Exception) {
throw Exception(context.getString(R.string.video_list_empty_error))
}
} else {
// Or if the video already exists, start from the file
Observable.just(download.video!!)
// Or if the video already exists, return it
download.video!!
}
videoObservable
.doOnNext { _ ->
if (download.video?.bytesDownloaded == 0L) {
// Delete all temporary (unfinished) files
tmpDir.listFiles()
?.filter { it.name!!.endsWith(".tmp") }
?.forEach { it.delete() }
}
if (download.video!!.bytesDownloaded == 0L) {
// Delete all temporary (unfinished) files
tmpDir.listFiles()
?.filter { it.name!!.endsWith(".tmp") }
?.forEach { it.delete() }
}
download.downloadedImages = 0
download.status = AnimeDownload.State.DOWNLOADING
download.downloadedImages = 0
download.status = AnimeDownload.State.DOWNLOADING
val progressJob = scope.launch {
while (download.status == AnimeDownload.State.DOWNLOADING) {
delay(50)
val progress = download.video!!.progress
if (download.totalProgress != progress) {
download.totalProgress = progress
notifier.onProgressChange(download)
}
}
// Get all the URLs to the source images, fetch pages if necessary
.flatMap { download.source.fetchUrlFromVideo(it) }
.doOnNext {
Observable.interval(50, TimeUnit.MILLISECONDS)
// Get the sum of percentages for all the pages.
.flatMap {
Observable.just(download.video)
.flatMap { Observable.just(it!!.progress) }
}
.takeUntil { download.status != AnimeDownload.State.DOWNLOADING }
// Keep only the latest emission to avoid backpressure.
.onBackpressureLatest()
.observeOn(AndroidSchedulers.mainThread())
.subscribe { progress ->
// Update the view only if the progress has changed.
if (download.totalProgress != progress) {
download.totalProgress = progress
notifier.onProgressChange(download)
}
}
}
// Start downloading videos, consider we can have downloaded images already
// Concurrently do 5 videos at a time (a fossil of the manga downloads)
.flatMap({ video -> getOrAnimeDownloadVideo(video, download, tmpDir) }, 5)
.onBackpressureLatest()
// Do when video is downloaded.
.toList()
.map { download }
// Do after download completes
.doOnNext {
ensureSuccessfulAnimeDownload(download, animeDir, tmpDir, episodeDirname)
if (download.status == AnimeDownload.State.DOWNLOADED) notifier.dismissProgress()
}
// If the video list threw, it will resume here
.onErrorReturn { error ->
download.status = AnimeDownload.State.ERROR
notifier.onError(error.message, download.episode.name, download.anime.title)
download
}
try {
// Replace this with your actual download logic
getOrAnimeDownloadVideo(video, download, tmpDir)
} catch (e: Exception) {
download.status = AnimeDownload.State.ERROR
notifier.onError(e.message, download.episode.name, download.anime.title)
} finally {
progressJob.cancel()
}
try {
ensureSuccessfulAnimeDownload(download, animeDir, tmpDir, episodeDirname)
if (download.status == AnimeDownload.State.DOWNLOADED) {
notifier.dismissProgress()
}
} catch (e: Exception) {
download.status = AnimeDownload.State.ERROR
notifier.onError(e.message, download.episode.name, download.anime.title)
}
}
/**
@ -429,27 +434,18 @@ class AnimeDownloader(
* @param download the download of the video.
* @param tmpDir the temporary directory of the download.
*/
private fun getOrAnimeDownloadVideo(video: Video, download: AnimeDownload, tmpDir: UniFile): Observable<Video> {
private suspend fun getOrAnimeDownloadVideo(video: Video, download: AnimeDownload, tmpDir: UniFile): Video {
// If the video URL is empty, do nothing
if (video.videoUrl == null) {
return Observable.just(video)
return video
}
val filename = DiskUtil.buildValidFilename(download.episode.name)
if (video.bytesDownloaded == 0L) {
val tmpFile = tmpDir.findFile("$filename.tmp")
// Delete temp file if it exists
tmpFile?.delete()
}
// Try to find the video file
val videoFile = tmpDir.listFiles()?.firstOrNull { it.name!!.startsWith("$filename.") }
// If the video is already downloaded, do nothing. Otherwise download from network
val pageObservable = when {
videoFile != null -> Observable.just(videoFile)
val file = when {
videoFile != null -> videoFile
episodeCache.isImageInCache(video.videoUrl!!) -> copyVideoFromCache(episodeCache.getVideoFile(video.videoUrl!!), tmpDir, filename)
else -> {
if (preferences.useExternalDownloader().get() == download.changeDownloader) {
@ -461,22 +457,19 @@ class AnimeDownloader(
}
}
return pageObservable
// When the video is ready, set image path, progress (just in case) and status
.doOnNext { file ->
video.videoUrl = file.uri.path
video.progress = 100
download.downloadedImages++
video.status = Video.State.READY
}
.map { video }
// Mark this video as error and allow to download the remaining
.onErrorReturn {
video.progress = 0
video.status = Video.State.ERROR
notifier.onError(it.message, download.episode.name, download.anime.title)
video
}
// When the video is ready, set image path, progress (just in case) and status
try {
video.videoUrl = file.uri.path
video.progress = 100
download.downloadedImages++
video.status = Video.State.READY
} catch (e: Exception) {
video.progress = 0
video.status = Video.State.ERROR
notifier.onError(e.message, download.episode.name, download.anime.title)
}
return video
}
/**
@ -487,20 +480,24 @@ class AnimeDownloader(
* @param tmpDir the temporary directory of the download.
* @param filename the filename of the video.
*/
private fun downloadVideo(video: Video, download: AnimeDownload, tmpDir: UniFile, filename: String): Observable<UniFile> {
private suspend fun downloadVideo(video: Video, download: AnimeDownload, tmpDir: UniFile, filename: String): UniFile {
video.status = Video.State.DOWNLOAD_IMAGE
video.progress = 0
var tries = 0
return newObservable(video, download, tmpDir, filename)
// Retry 3 times, waiting 2, 4 and 8 seconds between attempts.
.onErrorResumeNext {
if (tries >= 2) {
return@onErrorResumeNext Observable.error(it)
}
// Define a suspend function to encapsulate the retry logic
suspend fun attemptDownload(): UniFile {
return try {
newObservable(video, download, tmpDir, filename)
} catch (e: Exception) {
if (tries >= 2) throw e
tries++
return@onErrorResumeNext Observable.timer((2 shl tries - 1) * 1000L, TimeUnit.MILLISECONDS)
.flatMap { newObservable(video, download, tmpDir, filename) }
delay((2 shl (tries - 1)) * 1000L)
attemptDownload()
}
}
return attemptDownload()
}
private fun isMpd(video: Video): Boolean {
@ -511,22 +508,19 @@ class AnimeDownloader(
return video.videoUrl?.toHttpUrl()?.encodedPath?.endsWith(".m3u8") ?: false
}
private fun ffmpegObservable(video: Video, download: AnimeDownload, tmpDir: UniFile, filename: String): Observable<UniFile> {
private suspend fun ffmpegObservable(video: Video, download: AnimeDownload, tmpDir: UniFile, filename: String): UniFile = coroutineScope {
isFFmpegRunning = true
val headers = video.headers ?: download.source.headers
val headerOptions = headers.joinToString("", "-headers '", "'") {
"${it.first}: ${it.second}\r\n"
}
val videoFile = tmpDir.findFile("$filename.tmp")
?: tmpDir.createFile("$filename.tmp")!!
val videoFile = tmpDir.findFile("$filename.tmp") ?: tmpDir.createFile("$filename.tmp")!!
val ffmpegFilename = { videoFile.uri.toFFmpegString(context) }
val ffmpegOptions = getFFmpegOptions(video, headerOptions, ffmpegFilename())
val ffprobeCommand = { file: String, ffprobeHeaders: String? ->
FFmpegKitConfig.parseArguments("${ffprobeHeaders?.plus(" ") ?: ""}-v error -show_entries format=duration -of default=noprint_wrappers=1:nokey=1 \"$file\"")
}
var duration = 0L
var nextLineIsDuration = false
val logCallback = LogCallback { log ->
@ -544,7 +538,6 @@ class AnimeDownloader(
}
}
val session = FFmpegSession.create(ffmpegOptions, {}, logCallback, {})
val inputDuration = getDuration(ffprobeCommand(video.videoUrl!!, headerOptions)) ?: 0F
duration = inputDuration.toLong()
FFmpegKitConfig.ffmpegExecute(session)
@ -557,12 +550,11 @@ class AnimeDownloader(
logcat(LogPriority.ERROR) { trace }
throw Exception("Error in ffmpeg!")
}
return Observable.just(session)
.map {
val file = tmpDir.findFile("$filename.tmp")
file?.renameTo("$filename.mkv")
file ?: throw Exception("Downloaded file not found")
}
val file = tmpDir.findFile("$filename.tmp")?.apply {
renameTo("$filename.mkv")
}
file ?: throw Exception("Downloaded file not found")
}
private fun parseTimeStringToSeconds(timeString: String): Long? {
@ -632,26 +624,24 @@ class AnimeDownloader(
return hours * 3600000L + minutes * 60000L + fullSeconds * 1000L + hundredths * 10L
}
private fun newObservable(video: Video, download: AnimeDownload, tmpDir: UniFile, filename: String): Observable<UniFile> {
private suspend fun newObservable(video: Video, download: AnimeDownload, tmpDir: UniFile, filename: String): UniFile {
return if (isHls(video) || isMpd(video)) {
ffmpegObservable(video, download, tmpDir, filename)
} else {
download.source.fetchVideo(video)
.map { response ->
val file = tmpDir.findFile("$filename.tmp") ?: tmpDir.createFile("$filename.tmp")
try {
response.body.source().saveTo(file.openOutputStream(true))
// val extension = getImageExtension(response, file)
// TODO: support other file formats!!
file.renameTo("$filename.mp4")
} catch (e: Exception) {
response.close()
if (!queueState.value.equals(download)) file.delete()
// file.delete()
throw e
}
file
}
val response = download.source.fetchVideo(video)
val file = tmpDir.findFile("$filename.tmp") ?: tmpDir.createFile("$filename.tmp")
try {
response.body.source().saveTo(file.openOutputStream(true))
// val extension = getImageExtension(response, file)
// TODO: support other file formats!!
file.renameTo("$filename.mp4")
} catch (e: Exception) {
response.close()
if (!queueState.value.equals(download)) file.delete()
// file.delete()
throw e
}
file
}
}
@ -663,70 +653,70 @@ class AnimeDownloader(
* @param tmpDir the temporary directory of the download.
* @param filename the filename of the video.
*/
private fun downloadVideoExternal(video: Video, source: AnimeHttpSource, tmpDir: UniFile, filename: String): Observable<UniFile> {
private fun downloadVideoExternal(video: Video, source: AnimeHttpSource, tmpDir: UniFile, filename: String): UniFile {
video.status = Video.State.DOWNLOAD_IMAGE
video.progress = 0
return Observable.just(tmpDir.createFile("$filename.mp4")).map {
try {
// TODO: support other file formats!!
// start download with intent
val pm = context.packageManager
val pkgName = preferences.externalDownloaderSelection().get()
val intent: Intent
if (!pkgName.isNullOrEmpty()) {
intent = pm.getLaunchIntentForPackage(pkgName)!!
when {
// 1DM
pkgName.startsWith("idm.internet.download.manager") -> {
intent.apply {
component = ComponentName(pkgName, "idm.internet.download.manager.Downloader")
action = Intent.ACTION_VIEW
data = Uri.parse(video.videoUrl)
putExtra("extra_filename", filename)
}
}
// ADM
pkgName.startsWith("com.dv.adm") -> {
val headers = (video.headers ?: source.headers).toList()
val bundle = Bundle()
headers.forEach { a -> bundle.putString(a.first, a.second.replace("http", "h_ttp")) }
intent.apply {
component = ComponentName(pkgName, "$pkgName.AEditor")
action = Intent.ACTION_VIEW
putExtra("com.dv.get.ACTION_LIST_ADD", "${Uri.parse(video.videoUrl)}<info>$filename.mp4")
putExtra("com.dv.get.ACTION_LIST_PATH", tmpDir.filePath!!.substringBeforeLast("_"))
putExtra("android.media.intent.extra.HTTP_HEADERS", bundle)
}
it.delete()
tmpDir.delete()
queueState.value.find { Anime -> Anime.video == video }?.let { download ->
download.status = AnimeDownload.State.DOWNLOADED
// Delete successful downloads from queue
if (download.status == AnimeDownload.State.DOWNLOADED) {
// Remove downloaded episode from queue
removeFromQueue(download)
}
if (areAllAnimeDownloadsFinished()) {
stop()
}
}
try {
val file = tmpDir.createFile("$filename.mp4")
// TODO: support other file formats!!
// start download with intent
val pm = context.packageManager
val pkgName = preferences.externalDownloaderSelection().get()
val intent: Intent
if (!pkgName.isNullOrEmpty()) {
intent = pm.getLaunchIntentForPackage(pkgName) ?: throw Exception("Launch intent not found")
when {
// 1DM
pkgName.startsWith("idm.internet.download.manager") -> {
intent.apply {
component = ComponentName(pkgName, "idm.internet.download.manager.Downloader")
action = Intent.ACTION_VIEW
data = Uri.parse(video.videoUrl)
putExtra("extra_filename", filename)
}
}
} else {
intent = Intent(Intent.ACTION_VIEW)
intent.apply {
addFlags(Intent.FLAG_ACTIVITY_NEW_TASK)
setDataAndType(Uri.parse(video.videoUrl), "video/*")
putExtra("extra_filename", filename)
// ADM
pkgName.startsWith("com.dv.adm") -> {
val headers = (video.headers ?: source.headers).toList()
val bundle = Bundle()
headers.forEach { a -> bundle.putString(a.first, a.second.replace("http", "h_ttp")) }
intent.apply {
component = ComponentName(pkgName, "$pkgName.AEditor")
action = Intent.ACTION_VIEW
putExtra("com.dv.get.ACTION_LIST_ADD", "${Uri.parse(video.videoUrl)}<info>$filename.mp4")
putExtra("com.dv.get.ACTION_LIST_PATH", tmpDir.filePath!!.substringBeforeLast("_"))
putExtra("android.media.intent.extra.HTTP_HEADERS", bundle)
}
file.delete()
tmpDir.delete()
queueState.value.find { Anime -> Anime.video == video }?.let { download ->
download.status = AnimeDownload.State.DOWNLOADED
// Delete successful downloads from queue
if (download.status == AnimeDownload.State.DOWNLOADED) {
// Remove downloaded episode from queue
removeFromQueue(download)
}
if (areAllAnimeDownloadsFinished()) {
stop()
}
}
}
}
context.startActivity(intent)
} catch (e: Exception) {
it.delete()
throw e
} else {
intent = Intent(Intent.ACTION_VIEW).apply {
addFlags(Intent.FLAG_ACTIVITY_NEW_TASK)
setDataAndType(Uri.parse(video.videoUrl), "video/*")
putExtra("extra_filename", filename)
}
}
it
context.startActivity(intent)
return file
} catch (e: Exception) {
tmpDir.findFile("$filename.mp4")?.delete()
throw e
}
}
@ -737,19 +727,19 @@ class AnimeDownloader(
* @param tmpDir the temporary directory of the download.
* @param filename the filename of the video.
*/
private fun copyVideoFromCache(cacheFile: File, tmpDir: UniFile, filename: String): Observable<UniFile> {
return Observable.just(cacheFile).map {
val tmpFile = tmpDir.createFile("$filename.tmp")
cacheFile.inputStream().use { input ->
tmpFile.openOutputStream().use { output ->
input.copyTo(output)
}
private fun copyVideoFromCache(cacheFile: File, tmpDir: UniFile, filename: String): UniFile {
val tmpFile = tmpDir.createFile("$filename.tmp")
cacheFile.inputStream().use { input ->
tmpFile.openOutputStream().use { output ->
input.copyTo(output)
}
val extension = ImageUtil.findImageType(cacheFile.inputStream()) ?: return@map tmpFile
tmpFile.renameTo("$filename.${extension.extension}")
cacheFile.delete()
tmpFile
}
val extension = ImageUtil.findImageType(cacheFile.inputStream())
if (extension != null) {
tmpFile.renameTo("$filename.${extension.extension}")
}
cacheFile.delete()
return tmpFile
}
/**
@ -833,7 +823,7 @@ class AnimeDownloader(
download.status = AnimeDownload.State.NOT_DOWNLOADED
}
}
queue - downloads
queue - downloads.toSet()
}
}

View file

@ -7,19 +7,26 @@ import eu.kanade.tachiyomi.animesource.model.SAnime
import eu.kanade.tachiyomi.animesource.model.SEpisode
import eu.kanade.tachiyomi.animesource.model.Video
import eu.kanade.tachiyomi.network.GET
import eu.kanade.tachiyomi.network.HttpException
import eu.kanade.tachiyomi.network.NetworkHelper
import eu.kanade.tachiyomi.network.asObservableSuccess
import eu.kanade.tachiyomi.network.newCachelessCallWithProgress
import okhttp3.Call
import okhttp3.Callback
import okhttp3.Headers
import okhttp3.OkHttpClient
import okhttp3.Request
import okhttp3.Response
import rx.Observable
import uy.kohesive.injekt.injectLazy
import java.io.IOException
import java.net.URI
import java.net.URISyntaxException
import java.security.MessageDigest
import java.util.concurrent.TimeUnit
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine
/**
* A simple implementation for sources from a website.
@ -307,12 +314,29 @@ abstract class AnimeHttpSource : AnimeCatalogueSource {
*
* @param video the page whose source image has to be downloaded.
*/
fun fetchVideo(video: Video): Observable<Response> {
suspend fun fetchVideo(video: Video): Response {
val animeDownloadClient = client.newBuilder()
.callTimeout(30, TimeUnit.MINUTES)
.build()
return animeDownloadClient.newCachelessCallWithProgress(videoRequest(video, video.totalBytesDownloaded), video)
.asObservableSuccess()
return suspendCoroutine { continuation ->
animeDownloadClient.newCachelessCallWithProgress(videoRequest(video, video.totalBytesDownloaded), video)
.enqueue(
object : Callback {
override fun onFailure(call: Call, e: IOException) {
continuation.resumeWithException(e)
}
override fun onResponse(call: Call, response: Response) {
if (response.isSuccessful) {
continuation.resume(response)
} else {
continuation.resumeWithException(HttpException(response.code))
}
}
},
)
}
}
/**