From e4bc8990fbe2aa4bc31977f1061bac0c70d7a58f Mon Sep 17 00:00:00 2001 From: Two-Ai <81279822+Two-Ai@users.noreply.github.com> Date: Sat, 21 Jan 2023 16:46:16 -0500 Subject: [PATCH] Replace RxJava in HttpPageLoader downloader (#8955) * Convert downloader Observable to flow Uses `runInterruptible` to turn the blocking call to `queue.take()` into a cancellable call. Flow collection is ended by cancelling the scope in `recycle`. This means the `HttpPageLoader` can't be reused after calling `recycle`, but this was true with the `Observable` as well.) * Convert load Observables to suspending function Inlining the Observables allows for some simplification of the error handling. Behavior should be otherwise identical. * Convert cleanup Completable to coroutine Uses global `launchIO`, not ideal but similar to previous behavior. Can't be scheduled on the local `scope` as this runs after `scope` is cancelled. --- .../ui/reader/loader/HttpPageLoader.kt | 135 +++++++----------- 1 file changed, 53 insertions(+), 82 deletions(-) diff --git a/app/src/main/java/eu/kanade/tachiyomi/ui/reader/loader/HttpPageLoader.kt b/app/src/main/java/eu/kanade/tachiyomi/ui/reader/loader/HttpPageLoader.kt index 2dfb45b16..c6db6f12b 100644 --- a/app/src/main/java/eu/kanade/tachiyomi/ui/reader/loader/HttpPageLoader.kt +++ b/app/src/main/java/eu/kanade/tachiyomi/ui/reader/loader/HttpPageLoader.kt @@ -6,16 +6,20 @@ import eu.kanade.tachiyomi.source.model.Page import eu.kanade.tachiyomi.source.online.HttpSource import eu.kanade.tachiyomi.ui.reader.model.ReaderChapter import eu.kanade.tachiyomi.ui.reader.model.ReaderPage -import eu.kanade.tachiyomi.util.lang.plusAssign -import eu.kanade.tachiyomi.util.system.logcat +import eu.kanade.tachiyomi.util.lang.awaitSingle +import eu.kanade.tachiyomi.util.lang.launchIO import kotlinx.coroutines.CancellationException -import logcat.LogPriority -import rx.Completable +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.cancel +import kotlinx.coroutines.flow.filter +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.runInterruptible import rx.Observable import rx.schedulers.Schedulers import rx.subjects.PublishSubject import rx.subjects.SerializedSubject -import rx.subscriptions.CompositeSubscription import uy.kohesive.injekt.Injekt import uy.kohesive.injekt.api.get import java.util.concurrent.PriorityBlockingQueue @@ -31,33 +35,27 @@ class HttpPageLoader( private val chapterCache: ChapterCache = Injekt.get(), ) : PageLoader() { + private val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO) + /** * A queue used to manage requests one by one while allowing priorities. */ private val queue = PriorityBlockingQueue() - /** - * Current active subscriptions. - */ - private val subscriptions = CompositeSubscription() - private val preloadSize = 4 init { - subscriptions += Observable.defer { Observable.just(queue.take().page) } - .filter { it.status == Page.State.QUEUE } - .concatMap { source.fetchImageFromCacheThenNet(it) } - .repeat() - .subscribeOn(Schedulers.io()) - .subscribe( - { - }, - { error -> - if (error !is InterruptedException) { - logcat(LogPriority.ERROR, error) - } - }, - ) + scope.launchIO { + flow { + while (true) { + emit(runInterruptible { queue.take() }.page) + } + } + .filter { it.status == Page.State.QUEUE } + .collect { + loadPage(it) + } + } } /** @@ -65,21 +63,23 @@ class HttpPageLoader( */ override fun recycle() { super.recycle() - subscriptions.unsubscribe() + scope.cancel() queue.clear() // Cache current page list progress for online chapters to allow a faster reopen val pages = chapter.pages if (pages != null) { - Completable - .fromAction { + launchIO { + try { // Convert to pages without reader information val pagesToSave = pages.map { Page(it.index, it.url, it.imageUrl) } chapterCache.putPageListToCache(chapter.chapter.toDomainChapter()!!, pagesToSave) + } catch (e: Throwable) { + if (e is CancellationException) { + throw e + } } - .onErrorComplete() - .subscribeOn(Schedulers.io()) - .subscribe() + } } } @@ -192,61 +192,32 @@ class HttpPageLoader( } /** - * Returns an observable of the page with the downloaded image. + * Loads the page, retrieving the image URL and downloading the image if necessary. + * Downloaded images are stored in the chapter cache. * * @param page the page whose source image has to be downloaded. */ - private fun HttpSource.fetchImageFromCacheThenNet(page: ReaderPage): Observable { - return if (page.imageUrl.isNullOrEmpty()) { - getImageUrl(page).flatMap { getCachedImage(it) } - } else { - getCachedImage(page) + private suspend fun loadPage(page: ReaderPage) { + try { + if (page.imageUrl.isNullOrEmpty()) { + page.status = Page.State.LOAD_PAGE + page.imageUrl = source.fetchImageUrl(page).awaitSingle() + } + val imageUrl = page.imageUrl!! + + if (!chapterCache.isImageInCache(imageUrl)) { + page.status = Page.State.DOWNLOAD_IMAGE + val imageResponse = source.fetchImage(page).awaitSingle() + chapterCache.putImageToCache(imageUrl, imageResponse) + } + + page.stream = { chapterCache.getImageFile(imageUrl).inputStream() } + page.status = Page.State.READY + } catch (e: Throwable) { + page.status = Page.State.ERROR + if (e is CancellationException) { + throw e + } } } - - private fun HttpSource.getImageUrl(page: ReaderPage): Observable { - page.status = Page.State.LOAD_PAGE - return fetchImageUrl(page) - .doOnError { page.status = Page.State.ERROR } - .onErrorReturn { null } - .doOnNext { page.imageUrl = it } - .map { page } - } - - /** - * Returns an observable of the page that gets the image from the chapter or fallbacks to - * network and copies it to the cache calling [cacheImage]. - * - * @param page the page. - */ - private fun HttpSource.getCachedImage(page: ReaderPage): Observable { - val imageUrl = page.imageUrl ?: return Observable.just(page) - - return Observable.just(page) - .flatMap { - if (!chapterCache.isImageInCache(imageUrl)) { - cacheImage(page) - } else { - Observable.just(page) - } - } - .doOnNext { - page.stream = { chapterCache.getImageFile(imageUrl).inputStream() } - page.status = Page.State.READY - } - .doOnError { page.status = Page.State.ERROR } - .onErrorReturn { page } - } - - /** - * Returns an observable of the page that downloads the image to [ChapterCache]. - * - * @param page the page. - */ - private fun HttpSource.cacheImage(page: ReaderPage): Observable { - page.status = Page.State.DOWNLOAD_IMAGE - return fetchImage(page) - .doOnNext { chapterCache.putImageToCache(page.imageUrl!!, it) } - .map { page } - } }