Use Timeline interface to paginate

This commit is contained in:
Maxime NATUREL 2023-01-20 17:38:55 +01:00
parent 4cfd6d29fc
commit 492b8a012d
9 changed files with 143 additions and 76 deletions

View file

@ -26,6 +26,12 @@ interface PollHistoryService {
val loadingPeriodInDays: Int
/**
* This must be called when you don't need the service anymore.
* It ensures the underlying database get closed.
*/
fun dispose()
/**
* Ask to load more polls starting from last loaded polls for a period defined by
* [loadingPeriodInDays].

View file

@ -36,24 +36,24 @@ internal open class PollHistoryStatusEntity(
var currentTimestampTargetBackwardMs: Long? = null,
/**
* Timestamp of the oldest event synced in milliseconds.
* Timestamp of the oldest event synced once target has been reached in milliseconds.
*/
var oldestTimestampReachedMs: Long? = null,
var oldestTimestampTargetReachedMs: Long? = null,
/**
* Id of the oldest event synced.
*/
var oldestEventIdReached: String? = null,
/**
* Id of the most recent event synced.
*/
var mostRecentEventIdReached: String? = null,
/**
* Indicate whether all polls in a room have been synced in backward direction.
*/
var isEndOfPollsBackward: Boolean = false,
/**
* Token of the end of the last synced chunk in backward direction.
*/
var tokenEndBackward: String? = null,
/**
* Token of the start of the last synced chunk in forward direction.
*/
var tokenStartForward: String? = null,
) : RealmObject() {
companion object
@ -65,10 +65,10 @@ internal open class PollHistoryStatusEntity(
return PollHistoryStatusEntity(
roomId = roomId,
currentTimestampTargetBackwardMs = currentTimestampTargetBackwardMs,
oldestTimestampReachedMs = oldestTimestampReachedMs,
oldestTimestampTargetReachedMs = oldestTimestampTargetReachedMs,
oldestEventIdReached = oldestEventIdReached,
mostRecentEventIdReached = mostRecentEventIdReached,
isEndOfPollsBackward = isEndOfPollsBackward,
tokenEndBackward = tokenEndBackward,
tokenStartForward = tokenStartForward,
)
}
@ -76,7 +76,7 @@ internal open class PollHistoryStatusEntity(
* Indicate whether at least one poll sync has been fully completed backward for the given room.
*/
val hasCompletedASyncBackward: Boolean
get() = oldestTimestampReachedMs != null
get() = oldestTimestampTargetReachedMs != null
/**
* Indicate whether all polls in a room have been synced for the current timestamp target in backward direction.
@ -86,7 +86,7 @@ internal open class PollHistoryStatusEntity(
private fun checkIfCurrentTimestampTargetBackwardIsReached(): Boolean {
val currentTarget = currentTimestampTargetBackwardMs
val lastTarget = oldestTimestampReachedMs
val lastTarget = oldestTimestampTargetReachedMs
// last timestamp target should be older or equal to the current target
return currentTarget != null && lastTarget != null && lastTarget <= currentTarget
}
@ -95,7 +95,7 @@ internal open class PollHistoryStatusEntity(
* Compute the number of days of history currently synced.
*/
fun getNbSyncedDays(currentMs: Long): Int {
val oldestTimestamp = oldestTimestampReachedMs
val oldestTimestamp = oldestTimestampTargetReachedMs
return if (oldestTimestamp == null) {
0
} else {

View file

@ -77,11 +77,12 @@ internal class DefaultRoomFactory @Inject constructor(
) : RoomFactory {
override fun create(roomId: String): Room {
val timelineService = timelineServiceFactory.create(roomId)
return DefaultRoom(
roomId = roomId,
roomSummaryDataSource = roomSummaryDataSource,
roomCryptoService = roomCryptoServiceFactory.create(roomId),
timelineService = timelineServiceFactory.create(roomId),
timelineService = timelineService,
threadsService = threadsServiceFactory.create(roomId),
threadsLocalService = threadsLocalServiceFactory.create(roomId),
sendService = sendServiceFactory.create(roomId),
@ -101,7 +102,7 @@ internal class DefaultRoomFactory @Inject constructor(
roomVersionService = roomVersionServiceFactory.create(roomId),
viaParameterFinder = viaParameterFinder,
locationSharingService = locationSharingServiceFactory.create(roomId),
pollHistoryService = pollHistoryServiceFactory.create(roomId),
pollHistoryService = pollHistoryServiceFactory.create(roomId, timelineService),
coroutineDispatchers = coroutineDispatchers
)
}

View file

@ -28,6 +28,8 @@ import org.matrix.android.sdk.api.session.events.model.EventType
import org.matrix.android.sdk.api.session.room.poll.LoadedPollsStatus
import org.matrix.android.sdk.api.session.room.poll.PollHistoryService
import org.matrix.android.sdk.api.session.room.timeline.TimelineEvent
import org.matrix.android.sdk.api.session.room.timeline.TimelineService
import org.matrix.android.sdk.api.session.room.timeline.TimelineSettings
import org.matrix.android.sdk.internal.database.mapper.TimelineEventMapper
import org.matrix.android.sdk.internal.database.model.PollHistoryStatusEntity
import org.matrix.android.sdk.internal.database.model.PollHistoryStatusEntityFields
@ -43,6 +45,7 @@ private const val EVENTS_PAGE_SIZE = 250
// TODO add unit tests
internal class DefaultPollHistoryService @AssistedInject constructor(
@Assisted private val roomId: String,
@Assisted private val timelineService: TimelineService,
@SessionDatabase private val monarchy: Monarchy,
private val clock: Clock,
private val loadMorePollsTask: LoadMorePollsTask,
@ -52,14 +55,30 @@ internal class DefaultPollHistoryService @AssistedInject constructor(
@AssistedFactory
interface Factory {
fun create(roomId: String): DefaultPollHistoryService
fun create(roomId: String, timelineService: TimelineService): DefaultPollHistoryService
}
override val loadingPeriodInDays: Int
get() = LOADING_PERIOD_IN_DAYS
private val timeline by lazy {
// TODO check if we need to add a way to avoid using the current filter in rooms
val settings = TimelineSettings(
initialSize = EVENTS_PAGE_SIZE,
buildReadReceipts = false,
rootThreadEventId = null,
useLiveSenderInfo = false,
)
timelineService.createTimeline(eventId = null, settings = settings).also { it.start() }
}
override fun dispose() {
timeline.dispose()
}
override suspend fun loadMore(): LoadedPollsStatus {
val params = LoadMorePollsTask.Params(
timeline = timeline,
roomId = roomId,
currentTimestampMs = clock.epochMillis(),
loadingPeriodInDays = loadingPeriodInDays,
@ -78,6 +97,8 @@ internal class DefaultPollHistoryService @AssistedInject constructor(
override suspend fun syncPolls() {
// TODO unmock
// TODO when sync forward, jump to most recent event Id + paginate forward + jump to oldest eventId after
// TODO avoid possibility to call sync and loadMore at the same time from the service API, how?
delay(1000)
}
@ -85,7 +106,7 @@ internal class DefaultPollHistoryService @AssistedInject constructor(
val pollHistoryStatusLiveData = getPollHistoryStatus()
return Transformations.switchMap(pollHistoryStatusLiveData) { results ->
val oldestTimestamp = results.firstOrNull()?.oldestTimestampReachedMs ?: clock.epochMillis()
val oldestTimestamp = results.firstOrNull()?.oldestTimestampTargetReachedMs ?: clock.epochMillis()
Timber.d("oldestTimestamp=$oldestTimestamp")
getPollStartEventsAfter(oldestTimestamp)
}

View file

@ -17,25 +17,20 @@
package org.matrix.android.sdk.internal.session.room.poll
import com.zhuinden.monarchy.Monarchy
import org.matrix.android.sdk.api.session.events.model.isPoll
import org.matrix.android.sdk.api.session.events.model.isPollResponse
import org.matrix.android.sdk.api.session.room.poll.LoadedPollsStatus
import org.matrix.android.sdk.api.session.room.timeline.Timeline
import org.matrix.android.sdk.api.session.room.timeline.TimelineEvent
import org.matrix.android.sdk.internal.database.model.PollHistoryStatusEntity
import org.matrix.android.sdk.internal.database.query.getOrCreate
import org.matrix.android.sdk.internal.di.SessionDatabase
import org.matrix.android.sdk.internal.network.GlobalErrorReceiver
import org.matrix.android.sdk.internal.network.executeRequest
import org.matrix.android.sdk.internal.session.room.RoomAPI
import org.matrix.android.sdk.internal.session.room.event.FilterAndStoreEventsTask
import org.matrix.android.sdk.internal.session.room.poll.PollConstants.MILLISECONDS_PER_DAY
import org.matrix.android.sdk.internal.session.room.timeline.PaginationDirection
import org.matrix.android.sdk.internal.session.room.timeline.PaginationResponse
import org.matrix.android.sdk.internal.task.Task
import org.matrix.android.sdk.internal.util.awaitTransaction
import javax.inject.Inject
internal interface LoadMorePollsTask : Task<LoadMorePollsTask.Params, LoadedPollsStatus> {
data class Params(
val timeline: Timeline,
val roomId: String,
val currentTimestampMs: Long,
val loadingPeriodInDays: Int,
@ -45,16 +40,15 @@ internal interface LoadMorePollsTask : Task<LoadMorePollsTask.Params, LoadedPoll
internal class DefaultLoadMorePollsTask @Inject constructor(
@SessionDatabase private val monarchy: Monarchy,
private val roomAPI: RoomAPI,
private val globalErrorReceiver: GlobalErrorReceiver,
private val filterAndStoreEventsTask: FilterAndStoreEventsTask,
) : LoadMorePollsTask {
override suspend fun execute(params: LoadMorePollsTask.Params): LoadedPollsStatus {
var currentPollHistoryStatus = updatePollHistoryStatus(params)
params.timeline.restartWithEventId(eventId = currentPollHistoryStatus.oldestEventIdReached)
while (shouldFetchMoreEventsBackward(currentPollHistoryStatus)) {
currentPollHistoryStatus = fetchMorePollEventsBackward(params, currentPollHistoryStatus)
currentPollHistoryStatus = fetchMorePollEventsBackward(params)
}
// TODO
// check how it behaves when cancelling the process: it should resume where it was stopped
@ -77,7 +71,7 @@ internal class DefaultLoadMorePollsTask @Inject constructor(
return monarchy.awaitTransaction { realm ->
val status = PollHistoryStatusEntity.getOrCreate(realm, params.roomId)
val currentTargetTimestampMs = status.currentTimestampTargetBackwardMs
val lastTargetTimestampMs = status.oldestTimestampReachedMs
val lastTargetTimestampMs = status.oldestTimestampTargetReachedMs
val loadingPeriodMs: Long = MILLISECONDS_PER_DAY * params.loadingPeriodInDays.toLong()
if (currentTargetTimestampMs == null) {
// first load, compute the target timestamp
@ -91,62 +85,61 @@ internal class DefaultLoadMorePollsTask @Inject constructor(
}
}
private suspend fun fetchMorePollEventsBackward(
params: LoadMorePollsTask.Params,
status: PollHistoryStatusEntity
): PollHistoryStatusEntity {
val response = executeRequest(globalErrorReceiver) {
roomAPI.getRoomMessagesFrom(
private suspend fun fetchMorePollEventsBackward(params: LoadMorePollsTask.Params): PollHistoryStatusEntity {
val events = params.timeline.awaitPaginate(
direction = Timeline.Direction.BACKWARDS,
count = params.eventsPageSize,
)
val paginationState = params.timeline.getPaginationState(direction = Timeline.Direction.BACKWARDS)
return updatePollHistoryStatus(
roomId = params.roomId,
from = status.tokenEndBackward,
dir = PaginationDirection.BACKWARDS.value,
limit = params.eventsPageSize,
filter = null
events = events,
paginationState = paginationState,
)
}
filterAndStorePollEvents(roomId = params.roomId, paginationResponse = response)
return updatePollHistoryStatus(roomId = params.roomId, paginationResponse = response)
}
private suspend fun filterAndStorePollEvents(roomId: String, paginationResponse: PaginationResponse) {
val filterTaskParams = FilterAndStoreEventsTask.Params(
roomId = roomId,
events = paginationResponse.events,
filterPredicate = { it.isPoll() || it.isPollResponse() }
)
filterAndStoreEventsTask.execute(filterTaskParams)
}
private suspend fun updatePollHistoryStatus(roomId: String, paginationResponse: PaginationResponse): PollHistoryStatusEntity {
private suspend fun updatePollHistoryStatus(
roomId: String,
events: List<TimelineEvent>,
paginationState: Timeline.PaginationState,
): PollHistoryStatusEntity {
return monarchy.awaitTransaction { realm ->
val status = PollHistoryStatusEntity.getOrCreate(realm, roomId)
val tokenStartForward = status.tokenStartForward
val mostRecentEventIdReached = status.mostRecentEventIdReached
if (tokenStartForward == null) {
// save the start token for next forward call
status.tokenEndBackward = paginationResponse.start
if (mostRecentEventIdReached == null) {
// save it for next forward pagination
val mostRecentEvent = events
.maxByOrNull { it.root.originServerTs ?: Long.MIN_VALUE }
?.root
status.mostRecentEventIdReached = mostRecentEvent?.eventId
}
val oldestEventTimestamp = paginationResponse.events
.minByOrNull { it.originServerTs ?: Long.MAX_VALUE }
?.originServerTs
val oldestEvent = events
.minByOrNull { it.root.originServerTs ?: Long.MAX_VALUE }
?.root
val oldestEventTimestamp = oldestEvent?.originServerTs
val oldestEventId = oldestEvent?.eventId
val currentTargetTimestamp = status.currentTimestampTargetBackwardMs
if (paginationResponse.end == null) {
if (paginationState.hasMoreToLoad.not()) {
// start of the timeline is reached, there are no more events
status.isEndOfPollsBackward = true
if (oldestEventTimestamp != null && oldestEventTimestamp > 0) {
status.oldestTimestampReachedMs = oldestEventTimestamp
status.oldestTimestampTargetReachedMs = oldestEventTimestamp
}
} else if (oldestEventTimestamp != null && currentTargetTimestamp != null && oldestEventTimestamp <= currentTargetTimestamp) {
// target has been reached
status.oldestTimestampReachedMs = oldestEventTimestamp
status.tokenEndBackward = paginationResponse.end
} else {
status.tokenEndBackward = paginationResponse.end
status.oldestTimestampTargetReachedMs = oldestEventTimestamp
}
if(oldestEventId != null) {
// save it for next backward pagination
status.oldestEventIdReached = oldestEventId
}
// return a copy of the Realm object

View file

@ -23,13 +23,13 @@ import dagger.assisted.AssistedInject
import im.vector.app.core.di.MavericksAssistedViewModelFactory
import im.vector.app.core.di.hiltMavericksViewModelFactory
import im.vector.app.core.platform.VectorViewModel
import im.vector.app.features.roomprofile.polls.list.domain.DisposePollHistoryUseCase
import im.vector.app.features.roomprofile.polls.list.domain.GetPollsUseCase
import im.vector.app.features.roomprofile.polls.list.domain.LoadMorePollsUseCase
import im.vector.app.features.roomprofile.polls.list.domain.SyncPollsUseCase
import im.vector.app.features.roomprofile.polls.list.ui.PollSummaryMapper
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.mapNotNull
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch
@ -38,6 +38,7 @@ class RoomPollsViewModel @AssistedInject constructor(
private val getPollsUseCase: GetPollsUseCase,
private val loadMorePollsUseCase: LoadMorePollsUseCase,
private val syncPollsUseCase: SyncPollsUseCase,
private val disposePollHistoryUseCase: DisposePollHistoryUseCase,
private val pollSummaryMapper: PollSummaryMapper,
) : VectorViewModel<RoomPollsViewState, RoomPollsAction, RoomPollsViewEvent>(initialState) {
@ -54,6 +55,11 @@ class RoomPollsViewModel @AssistedInject constructor(
observePolls(roomId)
}
override fun onCleared() {
withState { disposePollHistoryUseCase.execute(it.roomId) }
super.onCleared()
}
private fun syncPolls(roomId: String) {
viewModelScope.launch {
setState { copy(isSyncing = true) }

View file

@ -38,6 +38,10 @@ class RoomPollDataSource @Inject constructor(
?: throw PollHistoryError.UnknownRoomError
}
fun dispose(roomId: String) {
getPollHistoryService(roomId).dispose()
}
fun getPolls(roomId: String): Flow<List<TimelineEvent>> {
return getPollHistoryService(roomId).getPollEvents().asFlow()
}

View file

@ -25,6 +25,10 @@ class RoomPollRepository @Inject constructor(
private val roomPollDataSource: RoomPollDataSource,
) {
fun dispose(roomId: String) {
roomPollDataSource.dispose(roomId)
}
fun getPolls(roomId: String): Flow<List<TimelineEvent>> {
return roomPollDataSource.getPolls(roomId)
}

View file

@ -0,0 +1,32 @@
/*
* Copyright (c) 2023 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package im.vector.app.features.roomprofile.polls.list.domain
import im.vector.app.features.roomprofile.polls.list.data.RoomPollRepository
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.map
import org.matrix.android.sdk.api.session.room.timeline.TimelineEvent
import javax.inject.Inject
class DisposePollHistoryUseCase @Inject constructor(
private val roomPollRepository: RoomPollRepository,
) {
fun execute(roomId: String) {
roomPollRepository.dispose(roomId)
}
}