mirror of
https://github.com/SchildiChat/SchildiChat-android.git
synced 2025-03-18 04:08:44 +03:00
Timeline: change a bit when postPagination is triggered
This commit is contained in:
parent
b53433e61b
commit
1e2e9e1070
6 changed files with 77 additions and 38 deletions
|
@ -90,7 +90,7 @@ class TimelineSimpleBackPaginationTest : InstrumentedTest {
|
|||
assertEquals(false, bobTimeline.hasMoreToLoad(Timeline.Direction.BACKWARDS))
|
||||
|
||||
val onlySentEvents = runBlocking {
|
||||
bobTimeline.awaitSnapshot()
|
||||
bobTimeline.getSnapshot()
|
||||
}
|
||||
.filter {
|
||||
it.root.isTextMessage()
|
||||
|
|
|
@ -73,7 +73,6 @@ interface Timeline {
|
|||
/**
|
||||
* This is the same than the regular paginate method but waits for the results instead
|
||||
* of relying on the timeline listener.
|
||||
* Note that it will still trigger onTimelineUpdated internally.
|
||||
*/
|
||||
suspend fun awaitPaginate(direction: Direction, count: Int): List<TimelineEvent>
|
||||
|
||||
|
@ -90,7 +89,7 @@ interface Timeline {
|
|||
/**
|
||||
* Returns a snapshot of the timeline in his current state.
|
||||
*/
|
||||
suspend fun awaitSnapshot(): List<TimelineEvent>
|
||||
fun getSnapshot(): List<TimelineEvent>
|
||||
|
||||
interface Listener {
|
||||
/**
|
||||
|
|
|
@ -24,6 +24,10 @@ import kotlinx.coroutines.SupervisorJob
|
|||
import kotlinx.coroutines.android.asCoroutineDispatcher
|
||||
import kotlinx.coroutines.cancelChildren
|
||||
import kotlinx.coroutines.delay
|
||||
import kotlinx.coroutines.flow.MutableSharedFlow
|
||||
import kotlinx.coroutines.flow.launchIn
|
||||
import kotlinx.coroutines.flow.onEach
|
||||
import kotlinx.coroutines.flow.sample
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlinx.coroutines.withContext
|
||||
import okhttp3.internal.closeQuietly
|
||||
|
@ -72,6 +76,7 @@ internal class DefaultTimeline internal constructor(private val roomId: String,
|
|||
private val timelineDispatcher = BACKGROUND_HANDLER.asCoroutineDispatcher()
|
||||
private val timelineScope = CoroutineScope(SupervisorJob() + timelineDispatcher)
|
||||
private val sequencer = SemaphoreCoroutineSequencer()
|
||||
private val postSnapshotSignalFlow = MutableSharedFlow<Unit>(0)
|
||||
|
||||
private val strategyDependencies = LoadTimelineStrategy.Dependencies(
|
||||
timelineSettings = settings,
|
||||
|
@ -83,7 +88,7 @@ internal class DefaultTimeline internal constructor(private val roomId: String,
|
|||
timelineInput = timelineInput,
|
||||
timelineEventMapper = timelineEventMapper,
|
||||
threadsAwarenessHandler = threadsAwarenessHandler,
|
||||
onEventsUpdated = this::postSnapshot,
|
||||
onEventsUpdated = this::sendSignalToPostSnapshot,
|
||||
onLimitedTimeline = this::onLimitedTimeline,
|
||||
onNewTimelineEvents = this::onNewTimelineEvents
|
||||
)
|
||||
|
@ -95,7 +100,12 @@ internal class DefaultTimeline internal constructor(private val roomId: String,
|
|||
|
||||
override fun addListener(listener: Timeline.Listener): Boolean {
|
||||
listeners.add(listener)
|
||||
postSnapshot()
|
||||
timelineScope.launch {
|
||||
val snapshot = strategy.buildSnapshot()
|
||||
withContext(Dispatchers.Main) {
|
||||
tryOrNull { listener.onTimelineUpdated(snapshot) }
|
||||
}
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
|
@ -117,7 +127,9 @@ internal class DefaultTimeline internal constructor(private val roomId: String,
|
|||
val realm = Realm.getInstance(realmConfiguration)
|
||||
ensureReadReceiptAreLoaded(realm)
|
||||
backgroundRealm.set(realm)
|
||||
listenToPostSnapshotSignals()
|
||||
openAround(initialEventId)
|
||||
postSnapshot()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -148,7 +160,10 @@ internal class DefaultTimeline internal constructor(private val roomId: String,
|
|||
|
||||
override fun paginate(direction: Timeline.Direction, count: Int) {
|
||||
timelineScope.launch {
|
||||
loadMore(count, direction, fetchOnServerIfNeeded = true)
|
||||
val postSnapshot = loadMore(count, direction, fetchOnServerIfNeeded = true)
|
||||
if (postSnapshot) {
|
||||
postSnapshot()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -156,11 +171,11 @@ internal class DefaultTimeline internal constructor(private val roomId: String,
|
|||
withContext(timelineDispatcher) {
|
||||
loadMore(count, direction, fetchOnServerIfNeeded = true)
|
||||
}
|
||||
return awaitSnapshot()
|
||||
return getSnapshot()
|
||||
}
|
||||
|
||||
override suspend fun awaitSnapshot(): List<TimelineEvent> = withContext(timelineDispatcher) {
|
||||
strategy.buildSnapshot()
|
||||
override fun getSnapshot(): List<TimelineEvent> {
|
||||
return strategy.buildSnapshot()
|
||||
}
|
||||
|
||||
override fun getIndexOfEvent(eventId: String?): Int? {
|
||||
|
@ -176,7 +191,7 @@ internal class DefaultTimeline internal constructor(private val roomId: String,
|
|||
}.get()
|
||||
}
|
||||
|
||||
private suspend fun loadMore(count: Int, direction: Timeline.Direction, fetchOnServerIfNeeded: Boolean) {
|
||||
private suspend fun loadMore(count: Int, direction: Timeline.Direction, fetchOnServerIfNeeded: Boolean): Boolean {
|
||||
val baseLogMessage = "loadMore(count: $count, direction: $direction, roomId: $roomId, fetchOnServer: $fetchOnServerIfNeeded)"
|
||||
Timber.v("$baseLogMessage started")
|
||||
if (!isStarted.get()) {
|
||||
|
@ -185,11 +200,11 @@ internal class DefaultTimeline internal constructor(private val roomId: String,
|
|||
val currentState = getPaginationState(direction)
|
||||
if (!currentState.hasMoreToLoad) {
|
||||
Timber.v("$baseLogMessage : nothing more to load")
|
||||
return
|
||||
return false
|
||||
}
|
||||
if (currentState.loading) {
|
||||
Timber.v("$baseLogMessage : already loading")
|
||||
return
|
||||
return false
|
||||
}
|
||||
updateState(direction) {
|
||||
it.copy(loading = true)
|
||||
|
@ -200,6 +215,7 @@ internal class DefaultTimeline internal constructor(private val roomId: String,
|
|||
updateState(direction) {
|
||||
it.copy(loading = false, hasMoreToLoad = hasMoreToLoad)
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
private suspend fun openAround(eventId: String?) = withContext(timelineDispatcher) {
|
||||
|
@ -221,9 +237,10 @@ internal class DefaultTimeline internal constructor(private val roomId: String,
|
|||
direction = Timeline.Direction.BACKWARDS,
|
||||
fetchOnServerIfNeeded = false
|
||||
)
|
||||
Timber.v("$baseLogMessage finished")
|
||||
}
|
||||
|
||||
private suspend fun initPaginationStates(eventId: String?) {
|
||||
private fun initPaginationStates(eventId: String?) {
|
||||
updateState(Timeline.Direction.FORWARDS) {
|
||||
it.copy(loading = false, hasMoreToLoad = eventId != null)
|
||||
}
|
||||
|
@ -232,21 +249,40 @@ internal class DefaultTimeline internal constructor(private val roomId: String,
|
|||
}
|
||||
}
|
||||
|
||||
private fun sendSignalToPostSnapshot(withThrottling: Boolean) {
|
||||
timelineScope.launch {
|
||||
if (withThrottling) {
|
||||
postSnapshotSignalFlow.emit(Unit)
|
||||
} else {
|
||||
postSnapshot()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Suppress("EXPERIMENTAL_API_USAGE")
|
||||
private fun listenToPostSnapshotSignals() {
|
||||
postSnapshotSignalFlow
|
||||
.sample(150)
|
||||
.onEach {
|
||||
postSnapshot()
|
||||
}
|
||||
.launchIn(timelineScope)
|
||||
}
|
||||
|
||||
private fun onLimitedTimeline() {
|
||||
timelineScope.launch {
|
||||
initPaginationStates(null)
|
||||
loadMore(settings.initialSize, Timeline.Direction.BACKWARDS, false)
|
||||
postSnapshot()
|
||||
}
|
||||
}
|
||||
|
||||
private fun postSnapshot() {
|
||||
timelineScope.launch {
|
||||
val snapshot = strategy.buildSnapshot()
|
||||
Timber.v("Post snapshot of ${snapshot.size} items")
|
||||
withContext(Dispatchers.Main) {
|
||||
listeners.forEach {
|
||||
tryOrNull { it.onTimelineUpdated(snapshot) }
|
||||
}
|
||||
private suspend fun postSnapshot() {
|
||||
val snapshot = strategy.buildSnapshot()
|
||||
Timber.v("Post snapshot of ${snapshot.size} events")
|
||||
withContext(Dispatchers.Main) {
|
||||
listeners.forEach {
|
||||
tryOrNull { it.onTimelineUpdated(snapshot) }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -259,7 +295,7 @@ internal class DefaultTimeline internal constructor(private val roomId: String,
|
|||
}
|
||||
}
|
||||
|
||||
private suspend fun updateState(direction: Timeline.Direction, update: (Timeline.PaginationState) -> Timeline.PaginationState) {
|
||||
private fun updateState(direction: Timeline.Direction, update: (Timeline.PaginationState) -> Timeline.PaginationState) {
|
||||
val stateReference = when (direction) {
|
||||
Timeline.Direction.FORWARDS -> forwardState
|
||||
Timeline.Direction.BACKWARDS -> backwardState
|
||||
|
@ -272,10 +308,12 @@ internal class DefaultTimeline internal constructor(private val roomId: String,
|
|||
}
|
||||
}
|
||||
|
||||
private suspend fun postPaginationState(direction: Timeline.Direction, state: Timeline.PaginationState) = withContext(Dispatchers.Main) {
|
||||
Timber.v("Post $direction pagination state: $state ")
|
||||
listeners.forEach {
|
||||
tryOrNull { it.onStateUpdated(direction, state) }
|
||||
private fun postPaginationState(direction: Timeline.Direction, state: Timeline.PaginationState) {
|
||||
timelineScope.launch(Dispatchers.Main) {
|
||||
Timber.v("Post $direction pagination state: $state ")
|
||||
listeners.forEach {
|
||||
tryOrNull { it.onStateUpdated(direction, state) }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -71,7 +71,7 @@ internal class LoadTimelineStrategy(
|
|||
val timelineInput: TimelineInput,
|
||||
val timelineEventMapper: TimelineEventMapper,
|
||||
val threadsAwarenessHandler: ThreadsAwarenessHandler,
|
||||
val onEventsUpdated: () -> Unit,
|
||||
val onEventsUpdated: (Boolean) -> Unit,
|
||||
val onLimitedTimeline: () -> Unit,
|
||||
val onNewTimelineEvents: (List<String>) -> Unit
|
||||
)
|
||||
|
@ -110,7 +110,7 @@ internal class LoadTimelineStrategy(
|
|||
}
|
||||
if (uiEchoManager.onLocalEchoCreated(timelineEvent)) {
|
||||
dependencies.onNewTimelineEvents(listOf(timelineEvent.eventId))
|
||||
dependencies.onEventsUpdated()
|
||||
dependencies.onEventsUpdated(false)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -119,7 +119,7 @@ internal class LoadTimelineStrategy(
|
|||
return
|
||||
}
|
||||
if (uiEchoManager.onSendStateUpdated(eventId, sendState)) {
|
||||
dependencies.onEventsUpdated()
|
||||
dependencies.onEventsUpdated(false)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -37,15 +37,17 @@ internal class RealmSendingEventsDataSource(
|
|||
private val realm: AtomicReference<Realm>,
|
||||
private val uiEchoManager: UIEchoManager,
|
||||
private val timelineEventMapper: TimelineEventMapper,
|
||||
private val onEventsUpdated: () -> Unit
|
||||
private val onEventsUpdated: (Boolean) -> Unit
|
||||
) : SendingEventsDataSource {
|
||||
|
||||
private var roomEntity: RoomEntity? = null
|
||||
private var sendingTimelineEvents: RealmList<TimelineEventEntity>? = null
|
||||
private var frozenSendingTimelineEvents: RealmList<TimelineEventEntity>? = null
|
||||
|
||||
private val sendingTimelineEventsListener = RealmChangeListener<RealmList<TimelineEventEntity>> { events ->
|
||||
uiEchoManager.onSentEventsInDatabase(events.map { it.eventId })
|
||||
onEventsUpdated()
|
||||
frozenSendingTimelineEvents = sendingTimelineEvents?.freeze()
|
||||
onEventsUpdated(false)
|
||||
}
|
||||
|
||||
override fun start() {
|
||||
|
@ -65,7 +67,7 @@ internal class RealmSendingEventsDataSource(
|
|||
val builtSendingEvents = mutableListOf<TimelineEvent>()
|
||||
uiEchoManager.getInMemorySendingEvents()
|
||||
.addWithUiEcho(builtSendingEvents)
|
||||
sendingTimelineEvents?.freeze()
|
||||
frozenSendingTimelineEvents
|
||||
?.filter { timelineEvent ->
|
||||
builtSendingEvents.none { it.eventId == timelineEvent.eventId }
|
||||
}
|
||||
|
|
|
@ -56,7 +56,7 @@ internal class TimelineChunk(private val chunkEntity: ChunkEntity,
|
|||
private val uiEchoManager: UIEchoManager? = null,
|
||||
private val threadsAwarenessHandler: ThreadsAwarenessHandler,
|
||||
private val initialEventId: String?,
|
||||
private val onBuiltEvents: () -> Unit) {
|
||||
private val onBuiltEvents: (Boolean) -> Unit) {
|
||||
|
||||
private val isLastForward = AtomicBoolean(chunkEntity.isLastForward)
|
||||
private val isLastBackward = AtomicBoolean(chunkEntity.isLastBackward)
|
||||
|
@ -86,6 +86,7 @@ internal class TimelineChunk(private val chunkEntity: ChunkEntity,
|
|||
}
|
||||
|
||||
private val timelineEventCollectionListener = OrderedRealmCollectionChangeListener { results: RealmResults<TimelineEventEntity>, changeSet: OrderedCollectionChangeSet ->
|
||||
Timber.v("on timeline events chunk update")
|
||||
val frozenResults = results.freeze()
|
||||
handleDatabaseChangeSet(frozenResults, changeSet)
|
||||
}
|
||||
|
@ -135,9 +136,9 @@ internal class TimelineChunk(private val chunkEntity: ChunkEntity,
|
|||
} else if (direction == Timeline.Direction.BACKWARDS && prevChunk != null) {
|
||||
return prevChunk?.loadMore(count, direction, fetchOnServerIfNeeded) ?: LoadMoreResult.FAILURE
|
||||
}
|
||||
val loadFromDbCount = loadFromStorage(count, direction)
|
||||
Timber.v("Has loaded $loadFromDbCount items from storage")
|
||||
val offsetCount = count - loadFromDbCount
|
||||
val loadFromStorageCount = loadFromStorage(count, direction)
|
||||
Timber.v("Has loaded $loadFromStorageCount items from storage in $direction")
|
||||
val offsetCount = count - loadFromStorageCount
|
||||
return if (direction == Timeline.Direction.FORWARDS && isLastForward.get()) {
|
||||
LoadMoreResult.REACHED_END
|
||||
} else if (direction == Timeline.Direction.BACKWARDS && isLastBackward.get()) {
|
||||
|
@ -289,7 +290,6 @@ internal class TimelineChunk(private val chunkEntity: ChunkEntity,
|
|||
builtEvents.add(timelineEvent)
|
||||
}
|
||||
}
|
||||
onBuiltEvents()
|
||||
return timelineEvents.size
|
||||
}
|
||||
|
||||
|
@ -412,7 +412,7 @@ internal class TimelineChunk(private val chunkEntity: ChunkEntity,
|
|||
}
|
||||
}
|
||||
if (insertions.isNotEmpty() || modifications.isNotEmpty()) {
|
||||
onBuiltEvents()
|
||||
onBuiltEvents(true)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue