Rx: fix startWith on mainThread

This commit is contained in:
ganfra 2020-01-23 10:18:22 +01:00
parent fee2ec6b66
commit c65f25d7ae
3 changed files with 37 additions and 10 deletions

View file

@ -20,6 +20,7 @@ import androidx.lifecycle.LiveData
import androidx.lifecycle.Observer
import io.reactivex.Observable
import io.reactivex.android.MainThreadDisposable
import io.reactivex.android.schedulers.AndroidSchedulers
import io.reactivex.schedulers.Schedulers
private class LiveDataObservable<T>(
@ -57,6 +58,14 @@ private class LiveDataObservable<T>(
}
}
fun <T> LiveData<T>.asObservable(): Observable<T> {
internal fun <T> LiveData<T>.asObservable(): Observable<T> {
return LiveDataObservable(this).observeOn(Schedulers.computation())
}
internal fun <T> Observable<T>.startWithCallable(supplier: () -> T): Observable<T> {
val startObservable = Observable
.fromCallable(supplier)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
return startWith(startObservable)
}

View file

@ -35,27 +35,37 @@ class RxRoom(private val room: Room) {
fun liveRoomSummary(): Observable<Optional<RoomSummary>> {
return room.getRoomSummaryLive().asObservable()
.startWith(room.roomSummary().toOptional())
.startWithCallable {
room.roomSummary().toOptional()
}
}
fun liveRoomMembers(queryParams: RoomMemberQueryParams): Observable<List<RoomMemberSummary>> {
return room.getRoomMembersLive(queryParams).asObservable()
.startWith(room.getRoomMembers(queryParams))
.startWithCallable {
room.getRoomMembers(queryParams)
}
}
fun liveAnnotationSummary(eventId: String): Observable<Optional<EventAnnotationsSummary>> {
return room.getEventAnnotationsSummaryLive(eventId).asObservable()
.startWith(room.getEventAnnotationsSummary(eventId).toOptional())
.startWithCallable {
room.getEventAnnotationsSummary(eventId).toOptional()
}
}
fun liveTimelineEvent(eventId: String): Observable<Optional<TimelineEvent>> {
return room.getTimeLineEventLive(eventId).asObservable()
.startWith(room.getTimeLineEvent(eventId).toOptional())
.startWithCallable {
room.getTimeLineEvent(eventId).toOptional()
}
}
fun liveStateEvent(eventType: String): Observable<Optional<Event>> {
return room.getStateEventLive(eventType).asObservable()
.startWith(room.getStateEvent(eventType).toOptional())
.startWithCallable {
room.getStateEvent(eventType).toOptional()
}
}
fun liveReadMarker(): Observable<Optional<String>> {

View file

@ -36,17 +36,23 @@ class RxSession(private val session: Session) {
fun liveRoomSummaries(queryParams: RoomSummaryQueryParams): Observable<List<RoomSummary>> {
return session.getRoomSummariesLive(queryParams).asObservable()
.startWith(session.getRoomSummaries(queryParams))
.startWithCallable {
session.getRoomSummaries(queryParams)
}
}
fun liveGroupSummaries(queryParams: GroupSummaryQueryParams): Observable<List<GroupSummary>> {
return session.getGroupSummariesLive(queryParams).asObservable()
.startWith(session.getGroupSummaries(queryParams))
.startWithCallable {
session.getGroupSummaries(queryParams)
}
}
fun liveBreadcrumbs(): Observable<List<RoomSummary>> {
return session.getBreadcrumbsLive().asObservable()
.startWith(session.getBreadcrumbs())
.startWithCallable {
session.getBreadcrumbs()
}
}
fun liveSyncState(): Observable<SyncState> {
@ -59,7 +65,9 @@ class RxSession(private val session: Session) {
fun liveUser(userId: String): Observable<Optional<User>> {
return session.getUserLive(userId).asObservable()
.startWith(session.getUser(userId).toOptional())
.startWithCallable {
session.getUser(userId).toOptional()
}
}
fun liveUsers(): Observable<List<User>> {