Remove use of RxJava from TrackPresenter

This commit is contained in:
arkon 2021-01-04 14:47:23 -05:00
parent 8a792e6d76
commit 7eb0868791

View file

@ -11,12 +11,10 @@ import eu.kanade.tachiyomi.ui.base.presenter.BasePresenter
import eu.kanade.tachiyomi.util.lang.await
import eu.kanade.tachiyomi.util.lang.launchIO
import eu.kanade.tachiyomi.util.lang.launchUI
import eu.kanade.tachiyomi.util.lang.runAsObservable
import eu.kanade.tachiyomi.util.system.toast
import rx.Observable
import rx.Subscription
import rx.android.schedulers.AndroidSchedulers
import rx.schedulers.Schedulers
import kotlinx.coroutines.Job
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import uy.kohesive.injekt.Injekt
import uy.kohesive.injekt.api.get
@ -33,59 +31,57 @@ class TrackPresenter(
private val loggedServices by lazy { trackManager.services.filter { it.isLogged } }
private var trackSubscription: Subscription? = null
private var searchSubscription: Subscription? = null
private var refreshSubscription: Subscription? = null
private var trackJob: Job? = null
private var searchJob: Job? = null
private var refreshJob: Job? = null
override fun onCreate(savedState: Bundle?) {
super.onCreate(savedState)
fetchTrackings()
}
fun fetchTrackings() {
trackSubscription?.let { remove(it) }
trackSubscription = db.getTracks(manga)
.asRxObservable()
.map { tracks ->
loggedServices.map { service ->
TrackItem(tracks.find { it.sync_id == service.id }, service)
}
private fun fetchTrackings() {
trackJob?.cancel()
trackJob = launchIO {
val tracks = db.getTracks(manga).await()
trackList = loggedServices.map { service ->
TrackItem(tracks.find { it.sync_id == service.id }, service)
}
.observeOn(AndroidSchedulers.mainThread())
.doOnNext { trackList = it }
.subscribeLatestCache(TrackController::onNextTrackings)
view?.onNextTrackings(trackList)
}
}
fun refresh() {
refreshSubscription?.let { remove(it) }
refreshSubscription = Observable.from(trackList)
.filter { it.track != null }
.flatMap { item ->
runAsObservable({ item.service.refresh(item.track!!) })
.flatMap { db.insertTrack(it).asRxObservable() }
.map { item }
.onErrorReturn { item }
refreshJob?.cancel()
refreshJob = launchIO {
try {
trackList
.filter { it.track != null }
.map {
async {
val track = it.service.refresh(it.track!!)
db.insertTrack(track).await()
}
}
.awaitAll()
view?.onRefreshDone()
} catch (e: Throwable) {
view?.onRefreshError(e)
}
.toList()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribeFirst(
{ view, _ -> view.onRefreshDone() },
TrackController::onRefreshError
)
}
}
fun search(query: String, service: TrackService) {
searchSubscription?.let { remove(it) }
searchSubscription = runAsObservable({ service.search(query) })
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribeLatestCache(
TrackController::onSearchResults,
TrackController::onSearchResultsError
)
searchJob?.cancel()
searchJob = launchIO {
try {
val results = service.search(query)
launchUI { view?.onSearchResults(results) }
} catch (e: Throwable) {
launchUI { view?.onSearchResultsError(e) }
}
}
}
fun registerTracking(item: Track?, service: TrackService) {
@ -115,12 +111,10 @@ class TrackPresenter(
db.insertTrack(track).await()
view?.onRefreshDone()
} catch (e: Throwable) {
launchUI {
view?.onRefreshError(e)
launchUI { view?.onRefreshError(e) }
// Restart on error to set old values
fetchTrackings()
}
// Restart on error to set old values
fetchTrackings()
}
}
}