mirror of
https://github.com/SchildiChat/SchildiChat-android.git
synced 2024-11-27 03:49:04 +03:00
Timeline: try new strategy for handling chunks (no merging)
This commit is contained in:
parent
e1f7ab8669
commit
9238037067
14 changed files with 929 additions and 754 deletions
|
@ -95,6 +95,8 @@ interface Timeline {
|
|||
*/
|
||||
fun getTimelineEventWithId(eventId: String?): TimelineEvent?
|
||||
|
||||
fun getPaginationState(direction: Direction): PaginationState
|
||||
|
||||
interface Listener {
|
||||
/**
|
||||
* Call when the timeline has been updated through pagination or sync.
|
||||
|
@ -112,8 +114,21 @@ interface Timeline {
|
|||
* Called when new events come through the sync
|
||||
*/
|
||||
fun onNewTimelineEvents(eventIds: List<String>)
|
||||
|
||||
fun onStateUpdated() {
|
||||
//NOOP
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Pagination state
|
||||
*/
|
||||
data class PaginationState(
|
||||
val hasMoreToLoad: Boolean = true,
|
||||
val loading: Boolean = false,
|
||||
val inError: Boolean = false
|
||||
)
|
||||
|
||||
/**
|
||||
* This is used to paginate in one or another direction.
|
||||
*/
|
||||
|
|
|
@ -16,22 +16,13 @@
|
|||
|
||||
package org.matrix.android.sdk.internal.database
|
||||
|
||||
import io.realm.Realm
|
||||
import io.realm.RealmConfiguration
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.launch
|
||||
import org.matrix.android.sdk.api.session.Session
|
||||
import org.matrix.android.sdk.internal.database.helper.nextDisplayIndex
|
||||
import org.matrix.android.sdk.internal.database.model.ChunkEntity
|
||||
import org.matrix.android.sdk.internal.database.model.ChunkEntityFields
|
||||
import org.matrix.android.sdk.internal.database.model.EventEntity
|
||||
import org.matrix.android.sdk.internal.database.model.RoomEntity
|
||||
import org.matrix.android.sdk.internal.database.model.TimelineEventEntity
|
||||
import org.matrix.android.sdk.internal.database.model.TimelineEventEntityFields
|
||||
import org.matrix.android.sdk.internal.database.model.deleteOnCascade
|
||||
import org.matrix.android.sdk.internal.di.SessionDatabase
|
||||
import org.matrix.android.sdk.api.session.SessionLifecycleObserver
|
||||
import org.matrix.android.sdk.internal.session.room.timeline.PaginationDirection
|
||||
import org.matrix.android.sdk.internal.database.model.RoomEntity
|
||||
import org.matrix.android.sdk.internal.di.SessionDatabase
|
||||
import org.matrix.android.sdk.internal.task.TaskExecutor
|
||||
import timber.log.Timber
|
||||
import javax.inject.Inject
|
||||
|
@ -53,11 +44,12 @@ internal class DatabaseCleaner @Inject constructor(@SessionDatabase private val
|
|||
awaitTransaction(realmConfiguration) { realm ->
|
||||
val allRooms = realm.where(RoomEntity::class.java).findAll()
|
||||
Timber.v("There are ${allRooms.size} rooms in this session")
|
||||
cleanUp(realm, MAX_NUMBER_OF_EVENTS_IN_DB / 2L)
|
||||
//cleanUp(realm, MAX_NUMBER_OF_EVENTS_IN_DB / 2L)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
private fun cleanUp(realm: Realm, threshold: Long) {
|
||||
val numberOfEvents = realm.where(EventEntity::class.java).findAll().size
|
||||
val numberOfTimelineEvents = realm.where(TimelineEventEntity::class.java).findAll().size
|
||||
|
@ -75,7 +67,7 @@ internal class DatabaseCleaner @Inject constructor(@SessionDatabase private val
|
|||
val thresholdDisplayIndex = maxDisplayIndex - threshold
|
||||
val eventsToRemove = chunk.timelineEvents.where().lessThan(TimelineEventEntityFields.DISPLAY_INDEX, thresholdDisplayIndex).findAll()
|
||||
Timber.v("There are ${eventsToRemove.size} events to clean in chunk: ${chunk.identifier()} from room ${chunk.room?.first()?.roomId}")
|
||||
chunk.numberOfTimelineEvents = chunk.numberOfTimelineEvents - eventsToRemove.size
|
||||
//chunk.numberOfTimelineEvents = chunk.numberOfTimelineEvents - eventsToRemove.size
|
||||
eventsToRemove.forEach {
|
||||
val canDeleteRoot = it.root?.stateKey == null
|
||||
it.deleteOnCascade(canDeleteRoot)
|
||||
|
@ -86,4 +78,6 @@ internal class DatabaseCleaner @Inject constructor(@SessionDatabase private val
|
|||
cleanUp(realm, (threshold / 1.5).toLong())
|
||||
}
|
||||
}
|
||||
|
||||
*/
|
||||
}
|
||||
|
|
|
@ -72,6 +72,7 @@ internal class SessionRealmConfigurationFactory @Inject constructor(
|
|||
.modules(SessionRealmModule())
|
||||
.schemaVersion(RealmSessionStoreMigration.SESSION_STORE_SCHEMA_VERSION)
|
||||
.migration(RealmSessionStoreMigration)
|
||||
.deleteRealmIfMigrationNeeded()
|
||||
.build()
|
||||
|
||||
// Try creating a realm instance and if it succeeds we can clear the flag
|
||||
|
|
|
@ -110,7 +110,7 @@ internal fun ChunkEntity.addTimelineEvent(roomId: String,
|
|||
true
|
||||
}
|
||||
}
|
||||
numberOfTimelineEvents++
|
||||
//numberOfTimelineEvents++
|
||||
timelineEvents.add(timelineEventEntity)
|
||||
}
|
||||
|
||||
|
|
|
@ -27,9 +27,11 @@ import org.matrix.android.sdk.internal.extensions.clearWith
|
|||
internal open class ChunkEntity(@Index var prevToken: String? = null,
|
||||
// Because of gaps we can have several chunks with nextToken == null
|
||||
@Index var nextToken: String? = null,
|
||||
var prevChunk: ChunkEntity? = null,
|
||||
var nextChunk: ChunkEntity? = null,
|
||||
var stateEvents: RealmList<EventEntity> = RealmList(),
|
||||
var timelineEvents: RealmList<TimelineEventEntity> = RealmList(),
|
||||
var numberOfTimelineEvents: Long = 0,
|
||||
//var numberOfTimelineEvents: Long = 0,
|
||||
// Only one chunk will have isLastForward == true
|
||||
@Index var isLastForward: Boolean = false,
|
||||
@Index var isLastBackward: Boolean = false
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||
* Copyright (c) 2021 New Vector Ltd
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
@ -16,168 +16,257 @@
|
|||
|
||||
package org.matrix.android.sdk.internal.session.room.timeline
|
||||
|
||||
import io.realm.OrderedCollectionChangeSet
|
||||
import io.realm.OrderedRealmCollectionChangeListener
|
||||
import io.realm.Realm
|
||||
import io.realm.RealmConfiguration
|
||||
import io.realm.RealmQuery
|
||||
import io.realm.RealmResults
|
||||
import io.realm.Sort
|
||||
import org.matrix.android.sdk.api.MatrixCallback
|
||||
import org.matrix.android.sdk.api.extensions.orFalse
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.SupervisorJob
|
||||
import kotlinx.coroutines.android.asCoroutineDispatcher
|
||||
import kotlinx.coroutines.cancelChildren
|
||||
import kotlinx.coroutines.delay
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlinx.coroutines.withContext
|
||||
import okhttp3.internal.closeQuietly
|
||||
import org.matrix.android.sdk.api.extensions.tryOrNull
|
||||
import org.matrix.android.sdk.api.session.events.model.EventType
|
||||
import org.matrix.android.sdk.api.session.room.send.SendState
|
||||
import org.matrix.android.sdk.api.session.room.timeline.Timeline
|
||||
import org.matrix.android.sdk.api.session.room.timeline.TimelineEvent
|
||||
import org.matrix.android.sdk.api.session.room.timeline.TimelineSettings
|
||||
import org.matrix.android.sdk.api.util.CancelableBag
|
||||
import org.matrix.android.sdk.internal.database.RealmSessionProvider
|
||||
import org.matrix.android.sdk.internal.database.mapper.TimelineEventMapper
|
||||
import org.matrix.android.sdk.internal.database.model.ChunkEntity
|
||||
import org.matrix.android.sdk.internal.database.model.RoomEntity
|
||||
import org.matrix.android.sdk.internal.database.model.TimelineEventEntity
|
||||
import org.matrix.android.sdk.internal.database.model.TimelineEventEntityFields
|
||||
import org.matrix.android.sdk.internal.database.query.findAllInRoomWithSendStates
|
||||
import org.matrix.android.sdk.internal.database.query.where
|
||||
import org.matrix.android.sdk.internal.database.query.whereRoomId
|
||||
import org.matrix.android.sdk.internal.session.room.membership.LoadRoomMembersTask
|
||||
import org.matrix.android.sdk.internal.session.sync.ReadReceiptHandler
|
||||
import org.matrix.android.sdk.internal.task.TaskExecutor
|
||||
import org.matrix.android.sdk.internal.task.configureWith
|
||||
import org.matrix.android.sdk.internal.util.Debouncer
|
||||
import org.matrix.android.sdk.internal.task.SemaphoreCoroutineSequencer
|
||||
import org.matrix.android.sdk.internal.util.createBackgroundHandler
|
||||
import org.matrix.android.sdk.internal.util.createUIHandler
|
||||
import timber.log.Timber
|
||||
import java.util.Collections
|
||||
import java.util.UUID
|
||||
import java.util.concurrent.CopyOnWriteArrayList
|
||||
import java.util.concurrent.atomic.AtomicBoolean
|
||||
import java.util.concurrent.atomic.AtomicReference
|
||||
import kotlin.math.max
|
||||
|
||||
private const val MIN_FETCHING_COUNT = 30
|
||||
|
||||
internal class DefaultTimeline(
|
||||
private val roomId: String,
|
||||
private var initialEventId: String? = null,
|
||||
private val realmConfiguration: RealmConfiguration,
|
||||
private val taskExecutor: TaskExecutor,
|
||||
private val contextOfEventTask: GetContextOfEventTask,
|
||||
private val fetchTokenAndPaginateTask: FetchTokenAndPaginateTask,
|
||||
private val paginationTask: PaginationTask,
|
||||
private val timelineEventMapper: TimelineEventMapper,
|
||||
private val settings: TimelineSettings,
|
||||
private val timelineInput: TimelineInput,
|
||||
private val eventDecryptor: TimelineEventDecryptor,
|
||||
private val realmSessionProvider: RealmSessionProvider,
|
||||
private val loadRoomMembersTask: LoadRoomMembersTask,
|
||||
private val readReceiptHandler: ReadReceiptHandler
|
||||
) : Timeline,
|
||||
TimelineInput.Listener,
|
||||
UIEchoManager.Listener {
|
||||
class DefaultTimeline internal constructor(private val roomId: String,
|
||||
private val initialEventId: String?,
|
||||
private val realmConfiguration: RealmConfiguration,
|
||||
private val loadRoomMembersTask: LoadRoomMembersTask,
|
||||
private val readReceiptHandler: ReadReceiptHandler,
|
||||
paginationTask: PaginationTask,
|
||||
getEventTask: GetContextOfEventTask,
|
||||
fetchTokenAndPaginateTask: FetchTokenAndPaginateTask,
|
||||
timelineEventMapper: TimelineEventMapper,
|
||||
timelineInput: TimelineInput,
|
||||
eventDecryptor: TimelineEventDecryptor) : Timeline {
|
||||
|
||||
companion object {
|
||||
val BACKGROUND_HANDLER = createBackgroundHandler("TIMELINE_DB_THREAD")
|
||||
val BACKGROUND_HANDLER = createBackgroundHandler("SimpleTimeline_Thread")
|
||||
}
|
||||
|
||||
private val listeners = CopyOnWriteArrayList<Timeline.Listener>()
|
||||
private val isStarted = AtomicBoolean(false)
|
||||
private val isReady = AtomicBoolean(false)
|
||||
private val mainHandler = createUIHandler()
|
||||
private val backgroundRealm = AtomicReference<Realm>()
|
||||
private val cancelableBag = CancelableBag()
|
||||
private val debouncer = Debouncer(mainHandler)
|
||||
|
||||
private lateinit var timelineEvents: RealmResults<TimelineEventEntity>
|
||||
private lateinit var sendingEvents: RealmResults<TimelineEventEntity>
|
||||
|
||||
private var prevDisplayIndex: Int? = null
|
||||
private var nextDisplayIndex: Int? = null
|
||||
|
||||
private val uiEchoManager = UIEchoManager(settings, this)
|
||||
|
||||
private val builtEvents = Collections.synchronizedList<TimelineEvent>(ArrayList())
|
||||
private val builtEventsIdMap = Collections.synchronizedMap(HashMap<String, Int>())
|
||||
private val backwardsState = AtomicReference(TimelineState())
|
||||
private val forwardsState = AtomicReference(TimelineState())
|
||||
|
||||
override val timelineID = UUID.randomUUID().toString()
|
||||
|
||||
override val isLive
|
||||
get() = !hasMoreToLoad(Timeline.Direction.FORWARDS)
|
||||
private val listeners = CopyOnWriteArrayList<Timeline.Listener>()
|
||||
private val isStarted = AtomicBoolean(false)
|
||||
private val forwardState = AtomicReference(Timeline.PaginationState())
|
||||
private val backwardState = AtomicReference(Timeline.PaginationState())
|
||||
|
||||
private val eventsChangeListener = OrderedRealmCollectionChangeListener<RealmResults<TimelineEventEntity>> { results, changeSet ->
|
||||
if (!results.isLoaded || !results.isValid) {
|
||||
return@OrderedRealmCollectionChangeListener
|
||||
}
|
||||
Timber.v("## SendEvent: [${System.currentTimeMillis()}] DB update for room $roomId")
|
||||
handleUpdates(results, changeSet)
|
||||
private val backgroundRealm = AtomicReference<Realm>()
|
||||
private val timelineDispatcher = BACKGROUND_HANDLER.asCoroutineDispatcher()
|
||||
private val timelineScope = CoroutineScope(SupervisorJob() + timelineDispatcher)
|
||||
private val sequencer = SemaphoreCoroutineSequencer()
|
||||
|
||||
private val strategyDependencies = LoadTimelineStrategy.Dependencies(
|
||||
eventDecryptor = eventDecryptor,
|
||||
paginationTask = paginationTask,
|
||||
fetchTokenAndPaginateTask = fetchTokenAndPaginateTask,
|
||||
timelineInput = timelineInput,
|
||||
timelineEventMapper = timelineEventMapper,
|
||||
realm = backgroundRealm,
|
||||
getContextOfEventTask = getEventTask,
|
||||
onEventsUpdated = this::postSnapshot
|
||||
)
|
||||
private var strategy: LoadTimelineStrategy = buildStrategy(LoadTimelineStrategy.Mode.Default)
|
||||
|
||||
override val isLive: Boolean
|
||||
get() = !getPaginationState(Timeline.Direction.FORWARDS).hasMoreToLoad
|
||||
|
||||
override fun addListener(listener: Timeline.Listener): Boolean {
|
||||
listeners.add(listener)
|
||||
postSnapshot()
|
||||
return true
|
||||
}
|
||||
|
||||
// Public methods ******************************************************************************
|
||||
override fun removeListener(listener: Timeline.Listener): Boolean {
|
||||
return listeners.remove(listener)
|
||||
}
|
||||
|
||||
override fun removeAllListeners() {
|
||||
listeners.clear()
|
||||
}
|
||||
|
||||
override fun start() {
|
||||
timelineScope.launch {
|
||||
loadRoomMemberIfNeeded()
|
||||
}
|
||||
timelineScope.launch {
|
||||
sequencer.post {
|
||||
if (isStarted.compareAndSet(false, true)) {
|
||||
val realm = Realm.getInstance(realmConfiguration)
|
||||
ensureReadReceiptAreLoaded(realm)
|
||||
backgroundRealm.set(realm)
|
||||
openAround(initialEventId)
|
||||
strategy.onStart()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun dispose() {
|
||||
timelineScope.coroutineContext.cancelChildren()
|
||||
timelineScope.launch {
|
||||
sequencer.post {
|
||||
if (isStarted.compareAndSet(true, false)) {
|
||||
strategy.onStop()
|
||||
backgroundRealm.get().closeQuietly()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun restartWithEventId(eventId: String?) {
|
||||
timelineScope.launch {
|
||||
openAround(eventId)
|
||||
}
|
||||
}
|
||||
|
||||
override fun hasMoreToLoad(direction: Timeline.Direction): Boolean {
|
||||
return getPaginationState(direction).hasMoreToLoad
|
||||
}
|
||||
|
||||
override fun paginate(direction: Timeline.Direction, count: Int) {
|
||||
BACKGROUND_HANDLER.post {
|
||||
if (!canPaginate(direction)) {
|
||||
return@post
|
||||
}
|
||||
Timber.v("Paginate $direction of $count items")
|
||||
val startDisplayIndex = if (direction == Timeline.Direction.BACKWARDS) prevDisplayIndex else nextDisplayIndex
|
||||
val shouldPostSnapshot = paginateInternal(startDisplayIndex, direction, count)
|
||||
if (shouldPostSnapshot) {
|
||||
postSnapshot()
|
||||
}
|
||||
timelineScope.launch {
|
||||
loadMore(count.toLong(), direction)
|
||||
}
|
||||
}
|
||||
|
||||
override fun pendingEventCount(): Int {
|
||||
return realmSessionProvider.withRealm {
|
||||
RoomEntity.where(it, roomId).findFirst()?.sendingTimelineEvents?.count() ?: 0
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
override fun failedToDeliverEventCount(): Int {
|
||||
return realmSessionProvider.withRealm {
|
||||
TimelineEventEntity.findAllInRoomWithSendStates(it, roomId, SendState.HAS_FAILED_STATES).count()
|
||||
return 0
|
||||
}
|
||||
|
||||
override fun getIndexOfEvent(eventId: String?): Int? {
|
||||
if (eventId == null) return null
|
||||
return strategy.getBuiltEventIndex(eventId)
|
||||
}
|
||||
|
||||
override fun getTimelineEventAtIndex(index: Int): TimelineEvent? {
|
||||
return null
|
||||
}
|
||||
|
||||
override fun getTimelineEventWithId(eventId: String?): TimelineEvent? {
|
||||
if (eventId == null) return null
|
||||
return strategy.getBuiltEvent(eventId)
|
||||
}
|
||||
|
||||
override fun getPaginationState(direction: Timeline.Direction): Timeline.PaginationState {
|
||||
return if (direction == Timeline.Direction.BACKWARDS) {
|
||||
backwardState
|
||||
} else {
|
||||
forwardState
|
||||
}.get()
|
||||
}
|
||||
|
||||
private suspend fun loadMore(count: Long, direction: Timeline.Direction) = withContext(timelineDispatcher) {
|
||||
val baseLogMessage = "loadMore(count: $count, direction: $direction, roomId: $roomId)"
|
||||
Timber.v("$baseLogMessage started")
|
||||
if (!isStarted.get()) {
|
||||
throw IllegalStateException("You should call start before using timeline")
|
||||
}
|
||||
val currentState = getPaginationState(direction)
|
||||
if (!currentState.hasMoreToLoad) {
|
||||
Timber.v("$baseLogMessage : nothing more to load")
|
||||
return@withContext
|
||||
}
|
||||
if (currentState.loading) {
|
||||
Timber.v("$baseLogMessage : already loading")
|
||||
return@withContext
|
||||
}
|
||||
updateState(direction) {
|
||||
it.copy(loading = true)
|
||||
}
|
||||
val loadMoreResult = strategy.loadMore(count, direction)
|
||||
val hasMoreToLoad = loadMoreResult != LoadMoreResult.REACHED_END
|
||||
updateState(direction) {
|
||||
it.copy(loading = false, hasMoreToLoad = hasMoreToLoad)
|
||||
}
|
||||
postSnapshot()
|
||||
}
|
||||
|
||||
private suspend fun openAround(eventId: String?) = withContext(timelineDispatcher) {
|
||||
val baseLogMessage = "openAround(eventId: $eventId)"
|
||||
Timber.v("$baseLogMessage started")
|
||||
if (!isStarted.get()) {
|
||||
throw IllegalStateException("You should call start before using timeline")
|
||||
}
|
||||
strategy.onStop()
|
||||
strategy = if (eventId == null) {
|
||||
buildStrategy(LoadTimelineStrategy.Mode.Default)
|
||||
} else {
|
||||
buildStrategy(LoadTimelineStrategy.Mode.Permalink(eventId))
|
||||
}
|
||||
updateState(Timeline.Direction.FORWARDS) {
|
||||
it.copy(loading = false, hasMoreToLoad = eventId != null)
|
||||
}
|
||||
updateState(Timeline.Direction.BACKWARDS) {
|
||||
it.copy(loading = false, hasMoreToLoad = true)
|
||||
}
|
||||
strategy.onStart()
|
||||
postSnapshot()
|
||||
}
|
||||
|
||||
private fun postSnapshot() {
|
||||
timelineScope.launch {
|
||||
val snapshot = strategy.buildSnapshot()
|
||||
withContext(Dispatchers.Main) {
|
||||
listeners.forEach {
|
||||
tryOrNull { it.onTimelineUpdated(snapshot) }
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun start() {
|
||||
if (isStarted.compareAndSet(false, true)) {
|
||||
Timber.v("Start timeline for roomId: $roomId and eventId: $initialEventId")
|
||||
timelineInput.listeners.add(this)
|
||||
BACKGROUND_HANDLER.post {
|
||||
eventDecryptor.start()
|
||||
val realm = Realm.getInstance(realmConfiguration)
|
||||
backgroundRealm.set(realm)
|
||||
|
||||
val roomEntity = RoomEntity.where(realm, roomId = roomId).findFirst()
|
||||
?: throw IllegalStateException("Can't open a timeline without a room")
|
||||
|
||||
// We don't want to filter here because some sending events that are not displayed
|
||||
// are still used for ui echo (relation like reaction)
|
||||
sendingEvents = roomEntity.sendingTimelineEvents.where()/*.filterEventsWithSettings()*/.findAll()
|
||||
sendingEvents.addChangeListener { events ->
|
||||
uiEchoManager.onSentEventsInDatabase(events.map { it.eventId })
|
||||
postSnapshot()
|
||||
}
|
||||
|
||||
timelineEvents = buildEventQuery(realm).sort(TimelineEventEntityFields.DISPLAY_INDEX, Sort.DESCENDING).findAll()
|
||||
timelineEvents.addChangeListener(eventsChangeListener)
|
||||
handleInitialLoad()
|
||||
loadRoomMembersTask
|
||||
.configureWith(LoadRoomMembersTask.Params(roomId))
|
||||
.executeBy(taskExecutor)
|
||||
|
||||
// Ensure ReadReceipt from init sync are loaded
|
||||
ensureReadReceiptAreLoaded(realm)
|
||||
|
||||
isReady.set(true)
|
||||
private suspend fun updateState(direction: Timeline.Direction, update: (Timeline.PaginationState) -> Timeline.PaginationState) {
|
||||
val stateReference = when (direction) {
|
||||
Timeline.Direction.FORWARDS -> forwardState
|
||||
Timeline.Direction.BACKWARDS -> backwardState
|
||||
}
|
||||
val currentValue = stateReference.get()
|
||||
val newValue = update(currentValue)
|
||||
stateReference.set(newValue)
|
||||
withContext(Dispatchers.Main) {
|
||||
listeners.forEach {
|
||||
tryOrNull { it.onStateUpdated() }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun buildStrategy(mode: LoadTimelineStrategy.Mode): LoadTimelineStrategy {
|
||||
return LoadTimelineStrategy(
|
||||
roomId = roomId,
|
||||
timelineId = timelineID,
|
||||
mode = mode,
|
||||
dependencies = strategyDependencies
|
||||
)
|
||||
}
|
||||
|
||||
private suspend fun loadRoomMemberIfNeeded() {
|
||||
val loadRoomMembersParam = LoadRoomMembersTask.Params(roomId)
|
||||
try {
|
||||
loadRoomMembersTask.execute(loadRoomMembersParam)
|
||||
} catch (failure: Throwable) {
|
||||
Timber.v("Failed to load room members. Retry in 10s.")
|
||||
delay(10_000L)
|
||||
loadRoomMemberIfNeeded()
|
||||
}
|
||||
}
|
||||
|
||||
private fun ensureReadReceiptAreLoaded(realm: Realm) {
|
||||
readReceiptHandler.getContentFromInitSync(roomId)
|
||||
?.also {
|
||||
|
@ -190,542 +279,4 @@ internal class DefaultTimeline(
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun dispose() {
|
||||
if (isStarted.compareAndSet(true, false)) {
|
||||
isReady.set(false)
|
||||
timelineInput.listeners.remove(this)
|
||||
Timber.v("Dispose timeline for roomId: $roomId and eventId: $initialEventId")
|
||||
cancelableBag.cancel()
|
||||
BACKGROUND_HANDLER.removeCallbacksAndMessages(null)
|
||||
BACKGROUND_HANDLER.post {
|
||||
if (this::sendingEvents.isInitialized) {
|
||||
sendingEvents.removeAllChangeListeners()
|
||||
}
|
||||
if (this::timelineEvents.isInitialized) {
|
||||
timelineEvents.removeAllChangeListeners()
|
||||
}
|
||||
clearAllValues()
|
||||
backgroundRealm.getAndSet(null).also {
|
||||
it?.close()
|
||||
}
|
||||
eventDecryptor.destroy()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun restartWithEventId(eventId: String?) {
|
||||
dispose()
|
||||
initialEventId = eventId
|
||||
start()
|
||||
postSnapshot()
|
||||
}
|
||||
|
||||
override fun getTimelineEventAtIndex(index: Int): TimelineEvent? {
|
||||
return builtEvents.getOrNull(index)
|
||||
}
|
||||
|
||||
override fun getIndexOfEvent(eventId: String?): Int? {
|
||||
return builtEventsIdMap[eventId]
|
||||
}
|
||||
|
||||
override fun getTimelineEventWithId(eventId: String?): TimelineEvent? {
|
||||
return builtEventsIdMap[eventId]?.let {
|
||||
getTimelineEventAtIndex(it)
|
||||
}
|
||||
}
|
||||
|
||||
override fun hasMoreToLoad(direction: Timeline.Direction): Boolean {
|
||||
return hasMoreInCache(direction) || !hasReachedEnd(direction)
|
||||
}
|
||||
|
||||
override fun addListener(listener: Timeline.Listener): Boolean {
|
||||
if (listeners.contains(listener)) {
|
||||
return false
|
||||
}
|
||||
return listeners.add(listener).also {
|
||||
postSnapshot()
|
||||
}
|
||||
}
|
||||
|
||||
override fun removeListener(listener: Timeline.Listener): Boolean {
|
||||
return listeners.remove(listener)
|
||||
}
|
||||
|
||||
override fun removeAllListeners() {
|
||||
listeners.clear()
|
||||
}
|
||||
|
||||
override fun onNewTimelineEvents(roomId: String, eventIds: List<String>) {
|
||||
if (isLive && this.roomId == roomId) {
|
||||
listeners.forEach {
|
||||
it.onNewTimelineEvents(eventIds)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun onLocalEchoCreated(roomId: String, timelineEvent: TimelineEvent) {
|
||||
if (roomId != this.roomId || !isLive) return
|
||||
uiEchoManager.onLocalEchoCreated(timelineEvent)
|
||||
listeners.forEach {
|
||||
tryOrNull {
|
||||
it.onNewTimelineEvents(listOf(timelineEvent.eventId))
|
||||
}
|
||||
}
|
||||
postSnapshot()
|
||||
}
|
||||
|
||||
override fun onLocalEchoUpdated(roomId: String, eventId: String, sendState: SendState) {
|
||||
if (roomId != this.roomId || !isLive) return
|
||||
if (uiEchoManager.onSendStateUpdated(eventId, sendState)) {
|
||||
postSnapshot()
|
||||
}
|
||||
}
|
||||
|
||||
override fun rebuildEvent(eventId: String, builder: (TimelineEvent) -> TimelineEvent?): Boolean {
|
||||
return tryOrNull {
|
||||
builtEventsIdMap[eventId]?.let { builtIndex ->
|
||||
// Update the relation of existing event
|
||||
builtEvents[builtIndex]?.let { te ->
|
||||
val rebuiltEvent = builder(te)
|
||||
// If rebuilt event is filtered its returned as null and should be removed.
|
||||
if (rebuiltEvent == null) {
|
||||
builtEventsIdMap.remove(eventId)
|
||||
builtEventsIdMap.entries.filter { it.value > builtIndex }.forEach { it.setValue(it.value - 1) }
|
||||
builtEvents.removeAt(builtIndex)
|
||||
} else {
|
||||
builtEvents[builtIndex] = rebuiltEvent
|
||||
}
|
||||
true
|
||||
}
|
||||
}
|
||||
} ?: false
|
||||
}
|
||||
|
||||
// Private methods *****************************************************************************
|
||||
|
||||
private fun hasMoreInCache(direction: Timeline.Direction) = getState(direction).hasMoreInCache
|
||||
|
||||
private fun hasReachedEnd(direction: Timeline.Direction) = getState(direction).hasReachedEnd
|
||||
|
||||
private fun updateLoadingStates(results: RealmResults<TimelineEventEntity>) {
|
||||
val lastCacheEvent = results.lastOrNull()
|
||||
val firstCacheEvent = results.firstOrNull()
|
||||
val chunkEntity = getLiveChunk()
|
||||
|
||||
updateState(Timeline.Direction.FORWARDS) {
|
||||
it.copy(
|
||||
hasMoreInCache = !builtEventsIdMap.containsKey(firstCacheEvent?.eventId),
|
||||
hasReachedEnd = chunkEntity?.isLastForward ?: false
|
||||
)
|
||||
}
|
||||
updateState(Timeline.Direction.BACKWARDS) {
|
||||
it.copy(
|
||||
hasMoreInCache = !builtEventsIdMap.containsKey(lastCacheEvent?.eventId),
|
||||
hasReachedEnd = chunkEntity?.isLastBackward ?: false || lastCacheEvent?.root?.type == EventType.STATE_ROOM_CREATE
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This has to be called on TimelineThread as it accesses realm live results
|
||||
* @return true if createSnapshot should be posted
|
||||
*/
|
||||
private fun paginateInternal(startDisplayIndex: Int?,
|
||||
direction: Timeline.Direction,
|
||||
count: Int): Boolean {
|
||||
if (count == 0) {
|
||||
return false
|
||||
}
|
||||
updateState(direction) { it.copy(requestedPaginationCount = count, isPaginating = true) }
|
||||
val builtCount = buildTimelineEvents(startDisplayIndex, direction, count.toLong())
|
||||
val shouldFetchMore = builtCount < count && !hasReachedEnd(direction)
|
||||
if (shouldFetchMore) {
|
||||
val newRequestedCount = count - builtCount
|
||||
updateState(direction) { it.copy(requestedPaginationCount = newRequestedCount) }
|
||||
val fetchingCount = max(MIN_FETCHING_COUNT, newRequestedCount)
|
||||
executePaginationTask(direction, fetchingCount)
|
||||
} else {
|
||||
updateState(direction) { it.copy(isPaginating = false, requestedPaginationCount = 0) }
|
||||
}
|
||||
return !shouldFetchMore
|
||||
}
|
||||
|
||||
private fun createSnapshot(): List<TimelineEvent> {
|
||||
return buildSendingEvents() + builtEvents.toList()
|
||||
}
|
||||
|
||||
private fun buildSendingEvents(): List<TimelineEvent> {
|
||||
val builtSendingEvents = mutableListOf<TimelineEvent>()
|
||||
if (hasReachedEnd(Timeline.Direction.FORWARDS) && !hasMoreInCache(Timeline.Direction.FORWARDS)) {
|
||||
uiEchoManager.getInMemorySendingEvents()
|
||||
.updateWithUiEchoInto(builtSendingEvents)
|
||||
sendingEvents
|
||||
.filter { timelineEvent ->
|
||||
builtSendingEvents.none { it.eventId == timelineEvent.eventId }
|
||||
}
|
||||
.map { timelineEventMapper.map(it) }
|
||||
.updateWithUiEchoInto(builtSendingEvents)
|
||||
}
|
||||
return builtSendingEvents
|
||||
}
|
||||
|
||||
private fun List<TimelineEvent>.updateWithUiEchoInto(target: MutableList<TimelineEvent>) {
|
||||
target.addAll(
|
||||
// Get most up to date send state (in memory)
|
||||
map { uiEchoManager.updateSentStateWithUiEcho(it) }
|
||||
)
|
||||
}
|
||||
|
||||
private fun canPaginate(direction: Timeline.Direction): Boolean {
|
||||
return isReady.get() && !getState(direction).isPaginating && hasMoreToLoad(direction)
|
||||
}
|
||||
|
||||
private fun getState(direction: Timeline.Direction): TimelineState {
|
||||
return when (direction) {
|
||||
Timeline.Direction.FORWARDS -> forwardsState.get()
|
||||
Timeline.Direction.BACKWARDS -> backwardsState.get()
|
||||
}
|
||||
}
|
||||
|
||||
private fun updateState(direction: Timeline.Direction, update: (TimelineState) -> TimelineState) {
|
||||
val stateReference = when (direction) {
|
||||
Timeline.Direction.FORWARDS -> forwardsState
|
||||
Timeline.Direction.BACKWARDS -> backwardsState
|
||||
}
|
||||
val currentValue = stateReference.get()
|
||||
val newValue = update(currentValue)
|
||||
stateReference.set(newValue)
|
||||
}
|
||||
|
||||
/**
|
||||
* This has to be called on TimelineThread as it accesses realm live results
|
||||
*/
|
||||
private fun handleInitialLoad() {
|
||||
var shouldFetchInitialEvent = false
|
||||
val currentInitialEventId = initialEventId
|
||||
val initialDisplayIndex = if (currentInitialEventId == null) {
|
||||
timelineEvents.firstOrNull()?.displayIndex
|
||||
} else {
|
||||
val initialEvent = timelineEvents.where()
|
||||
.equalTo(TimelineEventEntityFields.EVENT_ID, initialEventId)
|
||||
.findFirst()
|
||||
|
||||
shouldFetchInitialEvent = initialEvent == null
|
||||
initialEvent?.displayIndex
|
||||
}
|
||||
prevDisplayIndex = initialDisplayIndex
|
||||
nextDisplayIndex = initialDisplayIndex
|
||||
if (currentInitialEventId != null && shouldFetchInitialEvent) {
|
||||
fetchEvent(currentInitialEventId)
|
||||
} else {
|
||||
val count = timelineEvents.size.coerceAtMost(settings.initialSize)
|
||||
if (initialEventId == null) {
|
||||
paginateInternal(initialDisplayIndex, Timeline.Direction.BACKWARDS, count)
|
||||
} else {
|
||||
paginateInternal(initialDisplayIndex, Timeline.Direction.FORWARDS, (count / 2).coerceAtLeast(1))
|
||||
paginateInternal(initialDisplayIndex?.minus(1), Timeline.Direction.BACKWARDS, (count / 2).coerceAtLeast(1))
|
||||
}
|
||||
}
|
||||
postSnapshot()
|
||||
}
|
||||
|
||||
/**
|
||||
* This has to be called on TimelineThread as it accesses realm live results
|
||||
*/
|
||||
private fun handleUpdates(results: RealmResults<TimelineEventEntity>, changeSet: OrderedCollectionChangeSet) {
|
||||
// If changeSet has deletion we are having a gap, so we clear everything
|
||||
if (changeSet.deletionRanges.isNotEmpty()) {
|
||||
clearAllValues()
|
||||
}
|
||||
var postSnapshot = false
|
||||
changeSet.insertionRanges.forEach { range ->
|
||||
val (startDisplayIndex, direction) = if (range.startIndex == 0) {
|
||||
Pair(results[range.length - 1]!!.displayIndex, Timeline.Direction.FORWARDS)
|
||||
} else {
|
||||
Pair(results[range.startIndex]!!.displayIndex, Timeline.Direction.BACKWARDS)
|
||||
}
|
||||
val state = getState(direction)
|
||||
if (state.isPaginating) {
|
||||
// We are getting new items from pagination
|
||||
postSnapshot = paginateInternal(startDisplayIndex, direction, state.requestedPaginationCount)
|
||||
} else {
|
||||
// We are getting new items from sync
|
||||
buildTimelineEvents(startDisplayIndex, direction, range.length.toLong())
|
||||
postSnapshot = true
|
||||
}
|
||||
}
|
||||
changeSet.changes.forEach { index ->
|
||||
val eventEntity = results[index]
|
||||
eventEntity?.eventId?.let { eventId ->
|
||||
postSnapshot = rebuildEvent(eventId) {
|
||||
buildTimelineEvent(eventEntity)
|
||||
} || postSnapshot
|
||||
}
|
||||
}
|
||||
if (postSnapshot) {
|
||||
postSnapshot()
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This has to be called on TimelineThread as it accesses realm live results
|
||||
*/
|
||||
private fun executePaginationTask(direction: Timeline.Direction, limit: Int) {
|
||||
val currentChunk = getLiveChunk()
|
||||
val token = if (direction == Timeline.Direction.BACKWARDS) currentChunk?.prevToken else currentChunk?.nextToken
|
||||
if (token == null) {
|
||||
if (direction == Timeline.Direction.BACKWARDS
|
||||
|| (direction == Timeline.Direction.FORWARDS && currentChunk?.hasBeenALastForwardChunk().orFalse())) {
|
||||
// We are in the case where event exists, but we do not know the token.
|
||||
// Fetch (again) the last event to get a token
|
||||
val lastKnownEventId = if (direction == Timeline.Direction.FORWARDS) {
|
||||
timelineEvents.firstOrNull()?.eventId
|
||||
} else {
|
||||
timelineEvents.lastOrNull()?.eventId
|
||||
}
|
||||
if (lastKnownEventId == null) {
|
||||
updateState(direction) { it.copy(isPaginating = false, requestedPaginationCount = 0) }
|
||||
} else {
|
||||
val params = FetchTokenAndPaginateTask.Params(
|
||||
roomId = roomId,
|
||||
limit = limit,
|
||||
direction = direction.toPaginationDirection(),
|
||||
lastKnownEventId = lastKnownEventId
|
||||
)
|
||||
cancelableBag += fetchTokenAndPaginateTask
|
||||
.configureWith(params) {
|
||||
this.callback = createPaginationCallback(limit, direction)
|
||||
}
|
||||
.executeBy(taskExecutor)
|
||||
}
|
||||
} else {
|
||||
updateState(direction) { it.copy(isPaginating = false, requestedPaginationCount = 0) }
|
||||
}
|
||||
} else {
|
||||
val params = PaginationTask.Params(
|
||||
roomId = roomId,
|
||||
from = token,
|
||||
direction = direction.toPaginationDirection(),
|
||||
limit = limit
|
||||
)
|
||||
Timber.v("Should fetch $limit items $direction")
|
||||
cancelableBag += paginationTask
|
||||
.configureWith(params) {
|
||||
this.callback = createPaginationCallback(limit, direction)
|
||||
}
|
||||
.executeBy(taskExecutor)
|
||||
}
|
||||
}
|
||||
|
||||
// For debug purpose only
|
||||
private fun dumpAndLogChunks() {
|
||||
val liveChunk = getLiveChunk()
|
||||
Timber.w("Live chunk: $liveChunk")
|
||||
|
||||
Realm.getInstance(realmConfiguration).use { realm ->
|
||||
ChunkEntity.where(realm, roomId).findAll()
|
||||
.also { Timber.w("Found ${it.size} chunks") }
|
||||
.forEach {
|
||||
Timber.w("")
|
||||
Timber.w("ChunkEntity: $it")
|
||||
Timber.w("prevToken: ${it.prevToken}")
|
||||
Timber.w("nextToken: ${it.nextToken}")
|
||||
Timber.w("isLastBackward: ${it.isLastBackward}")
|
||||
Timber.w("isLastForward: ${it.isLastForward}")
|
||||
it.timelineEvents.forEach { tle ->
|
||||
Timber.w(" TLE: ${tle.root?.content}")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This has to be called on TimelineThread as it accesses realm live results
|
||||
*/
|
||||
private fun getTokenLive(direction: Timeline.Direction): String? {
|
||||
val chunkEntity = getLiveChunk() ?: return null
|
||||
return if (direction == Timeline.Direction.BACKWARDS) chunkEntity.prevToken else chunkEntity.nextToken
|
||||
}
|
||||
|
||||
/**
|
||||
* This has to be called on TimelineThread as it accesses realm live results
|
||||
* Return the current Chunk
|
||||
*/
|
||||
private fun getLiveChunk(): ChunkEntity? {
|
||||
return timelineEvents.firstOrNull()?.chunk?.firstOrNull()
|
||||
}
|
||||
|
||||
/**
|
||||
* This has to be called on TimelineThread as it accesses realm live results
|
||||
* @return the number of items who have been added
|
||||
*/
|
||||
private fun buildTimelineEvents(startDisplayIndex: Int?,
|
||||
direction: Timeline.Direction,
|
||||
count: Long): Int {
|
||||
if (count < 1 || startDisplayIndex == null) {
|
||||
return 0
|
||||
}
|
||||
val start = System.currentTimeMillis()
|
||||
val offsetResults = getOffsetResults(startDisplayIndex, direction, count)
|
||||
if (offsetResults.isEmpty()) {
|
||||
return 0
|
||||
}
|
||||
val offsetIndex = offsetResults.last()!!.displayIndex
|
||||
if (direction == Timeline.Direction.BACKWARDS) {
|
||||
prevDisplayIndex = offsetIndex - 1
|
||||
} else {
|
||||
nextDisplayIndex = offsetIndex + 1
|
||||
}
|
||||
offsetResults.forEach { eventEntity ->
|
||||
|
||||
val timelineEvent = buildTimelineEvent(eventEntity)
|
||||
val transactionId = timelineEvent.root.unsignedData?.transactionId
|
||||
uiEchoManager.onSyncedEvent(transactionId)
|
||||
|
||||
if (timelineEvent.isEncrypted()
|
||||
&& timelineEvent.root.mxDecryptionResult == null) {
|
||||
timelineEvent.root.eventId?.also { eventDecryptor.requestDecryption(TimelineEventDecryptor.DecryptionRequest(timelineEvent.root, timelineID)) }
|
||||
}
|
||||
|
||||
val position = if (direction == Timeline.Direction.FORWARDS) 0 else builtEvents.size
|
||||
builtEvents.add(position, timelineEvent)
|
||||
// Need to shift :/
|
||||
builtEventsIdMap.entries.filter { it.value >= position }.forEach { it.setValue(it.value + 1) }
|
||||
builtEventsIdMap[eventEntity.eventId] = position
|
||||
}
|
||||
val time = System.currentTimeMillis() - start
|
||||
Timber.v("Built ${offsetResults.size} items from db in $time ms")
|
||||
// For the case where wo reach the lastForward chunk
|
||||
updateLoadingStates(timelineEvents)
|
||||
return offsetResults.size
|
||||
}
|
||||
|
||||
private fun buildTimelineEvent(eventEntity: TimelineEventEntity): TimelineEvent {
|
||||
return timelineEventMapper.map(
|
||||
timelineEventEntity = eventEntity,
|
||||
buildReadReceipts = settings.buildReadReceipts
|
||||
).let { timelineEvent ->
|
||||
// eventually enhance with ui echo?
|
||||
uiEchoManager.decorateEventWithReactionUiEcho(timelineEvent) ?: timelineEvent
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This has to be called on TimelineThread as it accesses realm live results
|
||||
*/
|
||||
private fun getOffsetResults(startDisplayIndex: Int,
|
||||
direction: Timeline.Direction,
|
||||
count: Long): RealmResults<TimelineEventEntity> {
|
||||
val offsetQuery = timelineEvents.where()
|
||||
if (direction == Timeline.Direction.BACKWARDS) {
|
||||
offsetQuery
|
||||
.sort(TimelineEventEntityFields.DISPLAY_INDEX, Sort.DESCENDING)
|
||||
.lessThanOrEqualTo(TimelineEventEntityFields.DISPLAY_INDEX, startDisplayIndex)
|
||||
} else {
|
||||
offsetQuery
|
||||
.sort(TimelineEventEntityFields.DISPLAY_INDEX, Sort.ASCENDING)
|
||||
.greaterThanOrEqualTo(TimelineEventEntityFields.DISPLAY_INDEX, startDisplayIndex)
|
||||
}
|
||||
return offsetQuery
|
||||
.limit(count)
|
||||
.findAll()
|
||||
}
|
||||
|
||||
private fun buildEventQuery(realm: Realm): RealmQuery<TimelineEventEntity> {
|
||||
return if (initialEventId == null) {
|
||||
TimelineEventEntity
|
||||
.whereRoomId(realm, roomId = roomId)
|
||||
.equalTo(TimelineEventEntityFields.CHUNK.IS_LAST_FORWARD, true)
|
||||
} else {
|
||||
TimelineEventEntity
|
||||
.whereRoomId(realm, roomId = roomId)
|
||||
.`in`("${TimelineEventEntityFields.CHUNK.TIMELINE_EVENTS}.${TimelineEventEntityFields.EVENT_ID}", arrayOf(initialEventId))
|
||||
}
|
||||
}
|
||||
|
||||
private fun fetchEvent(eventId: String) {
|
||||
val params = GetContextOfEventTask.Params(roomId, eventId)
|
||||
cancelableBag += contextOfEventTask.configureWith(params) {
|
||||
callback = object : MatrixCallback<TokenChunkEventPersistor.Result> {
|
||||
override fun onSuccess(data: TokenChunkEventPersistor.Result) {
|
||||
postSnapshot()
|
||||
}
|
||||
|
||||
override fun onFailure(failure: Throwable) {
|
||||
postFailure(failure)
|
||||
}
|
||||
}
|
||||
}
|
||||
.executeBy(taskExecutor)
|
||||
}
|
||||
|
||||
private fun postSnapshot() {
|
||||
BACKGROUND_HANDLER.post {
|
||||
if (isReady.get().not()) {
|
||||
return@post
|
||||
}
|
||||
updateLoadingStates(timelineEvents)
|
||||
val snapshot = createSnapshot()
|
||||
val runnable = Runnable {
|
||||
listeners.forEach {
|
||||
it.onTimelineUpdated(snapshot)
|
||||
}
|
||||
}
|
||||
debouncer.debounce("post_snapshot", runnable, 1)
|
||||
}
|
||||
}
|
||||
|
||||
private fun postFailure(throwable: Throwable) {
|
||||
if (isReady.get().not()) {
|
||||
return
|
||||
}
|
||||
val runnable = Runnable {
|
||||
listeners.forEach {
|
||||
it.onTimelineFailure(throwable)
|
||||
}
|
||||
}
|
||||
mainHandler.post(runnable)
|
||||
}
|
||||
|
||||
private fun clearAllValues() {
|
||||
prevDisplayIndex = null
|
||||
nextDisplayIndex = null
|
||||
builtEvents.clear()
|
||||
builtEventsIdMap.clear()
|
||||
backwardsState.set(TimelineState())
|
||||
forwardsState.set(TimelineState())
|
||||
}
|
||||
|
||||
private fun createPaginationCallback(limit: Int, direction: Timeline.Direction): MatrixCallback<TokenChunkEventPersistor.Result> {
|
||||
return object : MatrixCallback<TokenChunkEventPersistor.Result> {
|
||||
override fun onSuccess(data: TokenChunkEventPersistor.Result) {
|
||||
when (data) {
|
||||
TokenChunkEventPersistor.Result.SUCCESS -> {
|
||||
Timber.v("Success fetching $limit items $direction from pagination request")
|
||||
}
|
||||
TokenChunkEventPersistor.Result.REACHED_END -> {
|
||||
postSnapshot()
|
||||
}
|
||||
TokenChunkEventPersistor.Result.SHOULD_FETCH_MORE ->
|
||||
// Database won't be updated, so we force pagination request
|
||||
BACKGROUND_HANDLER.post {
|
||||
executePaginationTask(direction, limit)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun onFailure(failure: Throwable) {
|
||||
updateState(direction) { it.copy(isPaginating = false, requestedPaginationCount = 0) }
|
||||
postSnapshot()
|
||||
Timber.v("Failure fetching $limit items $direction from pagination request")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Extension methods ***************************************************************************
|
||||
|
||||
private fun Timeline.Direction.toPaginationDirection(): PaginationDirection {
|
||||
return if (this == Timeline.Direction.BACKWARDS) PaginationDirection.BACKWARDS else PaginationDirection.FORWARDS
|
||||
}
|
||||
}
|
||||
|
|
|
@ -65,17 +65,14 @@ internal class DefaultTimelineService @AssistedInject constructor(
|
|||
roomId = roomId,
|
||||
initialEventId = eventId,
|
||||
realmConfiguration = monarchy.realmConfiguration,
|
||||
taskExecutor = taskExecutor,
|
||||
contextOfEventTask = contextOfEventTask,
|
||||
paginationTask = paginationTask,
|
||||
timelineEventMapper = timelineEventMapper,
|
||||
settings = settings,
|
||||
timelineInput = timelineInput,
|
||||
eventDecryptor = eventDecryptor,
|
||||
fetchTokenAndPaginateTask = fetchTokenAndPaginateTask,
|
||||
realmSessionProvider = realmSessionProvider,
|
||||
loadRoomMembersTask = loadRoomMembersTask,
|
||||
readReceiptHandler = readReceiptHandler
|
||||
readReceiptHandler = readReceiptHandler,
|
||||
getEventTask = contextOfEventTask
|
||||
)
|
||||
}
|
||||
|
||||
|
|
|
@ -1,5 +1,5 @@
|
|||
/*
|
||||
* Copyright (c) 2021 The Matrix.org Foundation C.I.C.
|
||||
* Copyright (c) 2021 New Vector Ltd
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
|
@ -16,9 +16,8 @@
|
|||
|
||||
package org.matrix.android.sdk.internal.session.room.timeline
|
||||
|
||||
internal data class TimelineState(
|
||||
val hasReachedEnd: Boolean = false,
|
||||
val hasMoreInCache: Boolean = true,
|
||||
val isPaginating: Boolean = false,
|
||||
val requestedPaginationCount: Int = 0
|
||||
)
|
||||
internal enum class LoadMoreResult {
|
||||
REACHED_END,
|
||||
SUCCESS,
|
||||
FAILURE
|
||||
}
|
|
@ -0,0 +1,198 @@
|
|||
/*
|
||||
* Copyright (c) 2021 New Vector Ltd
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.matrix.android.sdk.internal.session.room.timeline
|
||||
|
||||
import io.realm.OrderedCollectionChangeSet
|
||||
import io.realm.OrderedRealmCollectionChangeListener
|
||||
import io.realm.Realm
|
||||
import io.realm.RealmResults
|
||||
import org.matrix.android.sdk.api.extensions.orFalse
|
||||
import org.matrix.android.sdk.api.session.room.send.SendState
|
||||
import org.matrix.android.sdk.api.session.room.timeline.Timeline
|
||||
import org.matrix.android.sdk.api.session.room.timeline.TimelineEvent
|
||||
import org.matrix.android.sdk.api.session.room.timeline.TimelineSettings
|
||||
import org.matrix.android.sdk.internal.database.mapper.TimelineEventMapper
|
||||
import org.matrix.android.sdk.internal.database.model.ChunkEntity
|
||||
import org.matrix.android.sdk.internal.database.model.ChunkEntityFields
|
||||
import org.matrix.android.sdk.internal.database.query.findAllIncludingEvents
|
||||
import org.matrix.android.sdk.internal.database.query.where
|
||||
import java.util.concurrent.atomic.AtomicReference
|
||||
|
||||
internal class LoadTimelineStrategy(
|
||||
private val roomId: String,
|
||||
private val timelineId: String,
|
||||
private val mode: Mode,
|
||||
private val dependencies: Dependencies) {
|
||||
|
||||
sealed class Mode {
|
||||
object Default : Mode()
|
||||
data class Permalink(val originEventId: String) : Mode()
|
||||
|
||||
fun originEventId(): String? {
|
||||
return if (this is Permalink) {
|
||||
originEventId
|
||||
} else {
|
||||
null
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
data class Dependencies(
|
||||
val realm: AtomicReference<Realm>,
|
||||
val eventDecryptor: TimelineEventDecryptor,
|
||||
val paginationTask: PaginationTask,
|
||||
val fetchTokenAndPaginateTask: FetchTokenAndPaginateTask,
|
||||
val getContextOfEventTask: GetContextOfEventTask,
|
||||
val timelineInput: TimelineInput,
|
||||
val timelineEventMapper: TimelineEventMapper,
|
||||
val onEventsUpdated: () -> Unit
|
||||
)
|
||||
|
||||
private var chunkEntity: RealmResults<ChunkEntity>? = null
|
||||
private var timelineChunk: TimelineChunk? = null
|
||||
|
||||
private val chunkEntityListener = OrderedRealmCollectionChangeListener { _: RealmResults<ChunkEntity>, changeSet: OrderedCollectionChangeSet ->
|
||||
val shouldRebuildChunk = changeSet.insertions.isNotEmpty()
|
||||
if (shouldRebuildChunk) {
|
||||
timelineChunk?.close(closeNext = true, closePrev = true)
|
||||
timelineChunk = chunkEntity?.createTimelineChunk()
|
||||
dependencies.onEventsUpdated()
|
||||
}
|
||||
}
|
||||
|
||||
private val uiEchoManagerListener = object : UIEchoManager.Listener {
|
||||
override fun rebuildEvent(eventId: String, builder: (TimelineEvent) -> TimelineEvent?): Boolean {
|
||||
return timelineChunk?.rebuildEvent(eventId, builder, searchInNext = true, searchInPrev = true).orFalse()
|
||||
}
|
||||
}
|
||||
|
||||
private val timelineInputListener = object : TimelineInput.Listener {
|
||||
override fun onLocalEchoCreated(roomId: String, timelineEvent: TimelineEvent) {
|
||||
if (roomId != this@LoadTimelineStrategy.roomId) {
|
||||
return
|
||||
}
|
||||
if (uiEchoManager.onLocalEchoCreated(timelineEvent)) {
|
||||
dependencies.onEventsUpdated()
|
||||
}
|
||||
}
|
||||
|
||||
override fun onLocalEchoUpdated(roomId: String, eventId: String, sendState: SendState) {
|
||||
if (roomId != this@LoadTimelineStrategy.roomId) {
|
||||
return
|
||||
}
|
||||
if (uiEchoManager.onSendStateUpdated(eventId, sendState)) {
|
||||
dependencies.onEventsUpdated()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private val uiEchoManager = UIEchoManager(TimelineSettings(10), uiEchoManagerListener)
|
||||
private val sendingEventsDataSource: SendingEventsDataSource = RealmSendingEventsDataSource(
|
||||
roomId = roomId,
|
||||
realm = dependencies.realm,
|
||||
uiEchoManager = uiEchoManager,
|
||||
timelineEventMapper = dependencies.timelineEventMapper,
|
||||
onEventsUpdated = dependencies.onEventsUpdated
|
||||
)
|
||||
|
||||
suspend fun onStart() {
|
||||
dependencies.eventDecryptor.start()
|
||||
dependencies.timelineInput.listeners.add(timelineInputListener)
|
||||
val realm = dependencies.realm.get()
|
||||
sendingEventsDataSource.start()
|
||||
chunkEntity = getChunkEntity(realm).also {
|
||||
it.addChangeListener(chunkEntityListener)
|
||||
timelineChunk = it.createTimelineChunk()
|
||||
}
|
||||
if(mode is Mode.Default){
|
||||
loadMore(10, Timeline.Direction.BACKWARDS)
|
||||
}
|
||||
}
|
||||
|
||||
fun onStop() {
|
||||
dependencies.eventDecryptor.destroy()
|
||||
dependencies.timelineInput.listeners.remove(timelineInputListener)
|
||||
chunkEntity?.removeChangeListener(chunkEntityListener)
|
||||
sendingEventsDataSource.stop()
|
||||
timelineChunk?.close(closeNext = true, closePrev = true)
|
||||
chunkEntity = null
|
||||
timelineChunk = null
|
||||
}
|
||||
|
||||
suspend fun loadMore(count: Long, direction: Timeline.Direction): LoadMoreResult {
|
||||
return if (mode is Mode.Permalink && timelineChunk == null) {
|
||||
val params = GetContextOfEventTask.Params(roomId, mode.originEventId)
|
||||
try {
|
||||
dependencies.getContextOfEventTask.execute(params)
|
||||
LoadMoreResult.SUCCESS
|
||||
} catch (failure: Throwable) {
|
||||
LoadMoreResult.FAILURE
|
||||
}
|
||||
} else {
|
||||
timelineChunk?.loadMore(count, direction) ?: LoadMoreResult.FAILURE
|
||||
}
|
||||
}
|
||||
|
||||
fun getBuiltEventIndex(eventId: String): Int? {
|
||||
return timelineChunk?.getBuiltEventIndex(eventId, searchInNext = true, searchInPrev = true)
|
||||
}
|
||||
|
||||
fun getBuiltEvent(eventId: String): TimelineEvent? {
|
||||
return timelineChunk?.getBuiltEvent(eventId, searchInNext = true, searchInPrev = true)
|
||||
}
|
||||
|
||||
fun buildSnapshot(): List<TimelineEvent> {
|
||||
return buildSendingEvents() + timelineChunk?.builtItems(includesNext = true, includesPrev = true).orEmpty()
|
||||
}
|
||||
|
||||
private fun buildSendingEvents(): List<TimelineEvent> {
|
||||
return if (timelineChunk?.hasReachedLastForward().orFalse()) {
|
||||
sendingEventsDataSource.buildSendingEvents()
|
||||
} else {
|
||||
emptyList()
|
||||
}
|
||||
}
|
||||
|
||||
private fun getChunkEntity(realm: Realm): RealmResults<ChunkEntity> {
|
||||
return if (mode is Mode.Permalink) {
|
||||
ChunkEntity.findAllIncludingEvents(realm, listOf(mode.originEventId))
|
||||
} else {
|
||||
ChunkEntity.where(realm, roomId)
|
||||
.equalTo(ChunkEntityFields.IS_LAST_FORWARD, true)
|
||||
.findAll()
|
||||
}
|
||||
}
|
||||
|
||||
private fun RealmResults<ChunkEntity>.createTimelineChunk(): TimelineChunk? {
|
||||
return firstOrNull()?.let {
|
||||
return TimelineChunk(
|
||||
chunkEntity = it,
|
||||
roomId = roomId,
|
||||
timelineId = timelineId,
|
||||
eventDecryptor = dependencies.eventDecryptor,
|
||||
paginationTask = dependencies.paginationTask,
|
||||
fetchTokenAndPaginateTask = dependencies.fetchTokenAndPaginateTask,
|
||||
timelineEventMapper = dependencies.timelineEventMapper,
|
||||
uiEchoManager = uiEchoManager,
|
||||
initialEventId = mode.originEventId(),
|
||||
onBuiltEvents = dependencies.onEventsUpdated
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -0,0 +1,84 @@
|
|||
/*
|
||||
* Copyright (c) 2021 New Vector Ltd
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.matrix.android.sdk.internal.session.room.timeline
|
||||
|
||||
import io.realm.Realm
|
||||
import io.realm.RealmChangeListener
|
||||
import io.realm.RealmList
|
||||
import org.matrix.android.sdk.api.session.room.timeline.TimelineEvent
|
||||
import org.matrix.android.sdk.internal.database.mapper.TimelineEventMapper
|
||||
import org.matrix.android.sdk.internal.database.model.RoomEntity
|
||||
import org.matrix.android.sdk.internal.database.model.TimelineEventEntity
|
||||
import org.matrix.android.sdk.internal.database.query.where
|
||||
import java.util.concurrent.atomic.AtomicReference
|
||||
|
||||
internal interface SendingEventsDataSource {
|
||||
fun start()
|
||||
fun stop()
|
||||
fun buildSendingEvents(): List<TimelineEvent>
|
||||
}
|
||||
|
||||
internal class RealmSendingEventsDataSource(
|
||||
private val roomId: String,
|
||||
private val realm: AtomicReference<Realm>,
|
||||
private val uiEchoManager: UIEchoManager,
|
||||
private val timelineEventMapper: TimelineEventMapper,
|
||||
private val onEventsUpdated: () -> Unit
|
||||
) : SendingEventsDataSource {
|
||||
|
||||
private var roomEntity: RoomEntity? = null
|
||||
private var sendingTimelineEvents: RealmList<TimelineEventEntity>? = null
|
||||
|
||||
private val sendingTimelineEventsListener = RealmChangeListener<RealmList<TimelineEventEntity>> { events ->
|
||||
uiEchoManager.onSentEventsInDatabase(events.map { it.eventId })
|
||||
onEventsUpdated()
|
||||
}
|
||||
|
||||
override fun start() {
|
||||
val safeRealm = realm.get()
|
||||
roomEntity = RoomEntity.where(safeRealm, roomId = roomId).findFirst()
|
||||
sendingTimelineEvents = roomEntity?.sendingTimelineEvents
|
||||
sendingTimelineEvents?.addChangeListener(sendingTimelineEventsListener)
|
||||
}
|
||||
|
||||
override fun stop() {
|
||||
sendingTimelineEvents?.removeChangeListener(sendingTimelineEventsListener)
|
||||
sendingTimelineEvents = null
|
||||
roomEntity = null
|
||||
}
|
||||
|
||||
override fun buildSendingEvents(): List<TimelineEvent> {
|
||||
val builtSendingEvents = mutableListOf<TimelineEvent>()
|
||||
uiEchoManager.getInMemorySendingEvents()
|
||||
.addWithUiEcho(builtSendingEvents)
|
||||
sendingTimelineEvents?.freeze()
|
||||
?.filter { timelineEvent ->
|
||||
builtSendingEvents.none { it.eventId == timelineEvent.eventId }
|
||||
}
|
||||
?.map {
|
||||
timelineEventMapper.map(it)
|
||||
}?.addWithUiEcho(builtSendingEvents)
|
||||
|
||||
return builtSendingEvents
|
||||
}
|
||||
|
||||
private fun List<TimelineEvent>.addWithUiEcho(target: MutableList<TimelineEvent>) {
|
||||
target.addAll(
|
||||
map { uiEchoManager.updateSentStateWithUiEcho(it) }
|
||||
)
|
||||
}
|
||||
}
|
|
@ -0,0 +1,362 @@
|
|||
/*
|
||||
* Copyright (c) 2021 New Vector Ltd
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.matrix.android.sdk.internal.session.room.timeline
|
||||
|
||||
import io.realm.OrderedCollectionChangeSet
|
||||
import io.realm.OrderedRealmCollectionChangeListener
|
||||
import io.realm.RealmObjectChangeListener
|
||||
import io.realm.RealmQuery
|
||||
import io.realm.RealmResults
|
||||
import io.realm.Sort
|
||||
import org.matrix.android.sdk.api.extensions.orFalse
|
||||
import org.matrix.android.sdk.api.extensions.tryOrNull
|
||||
import org.matrix.android.sdk.api.session.room.timeline.Timeline
|
||||
import org.matrix.android.sdk.api.session.room.timeline.TimelineEvent
|
||||
import org.matrix.android.sdk.internal.database.mapper.TimelineEventMapper
|
||||
import org.matrix.android.sdk.internal.database.model.ChunkEntity
|
||||
import org.matrix.android.sdk.internal.database.model.TimelineEventEntity
|
||||
import org.matrix.android.sdk.internal.database.model.TimelineEventEntityFields
|
||||
import timber.log.Timber
|
||||
import java.util.Collections
|
||||
|
||||
/**
|
||||
* This is the value used to fetch on server. It's better to make constant as otherwise we can have weird chunks with disparate and small chunk of data.
|
||||
*/
|
||||
private const val PAGINATION_COUNT = 50
|
||||
|
||||
internal class TimelineChunk constructor(private val chunkEntity: ChunkEntity,
|
||||
private val roomId: String,
|
||||
private val timelineId: String,
|
||||
private val eventDecryptor: TimelineEventDecryptor,
|
||||
private val paginationTask: PaginationTask,
|
||||
private val fetchTokenAndPaginateTask: FetchTokenAndPaginateTask,
|
||||
private val timelineEventMapper: TimelineEventMapper,
|
||||
private val uiEchoManager: UIEchoManager? = null,
|
||||
private val initialEventId: String?,
|
||||
private val onBuiltEvents: () -> Unit) {
|
||||
|
||||
private val chunkObjectListener = RealmObjectChangeListener<ChunkEntity> { _, changeSet ->
|
||||
Timber.v("on chunk (${chunkEntity.identifier()}) changed: ${changeSet?.changedFields?.joinToString(",")}")
|
||||
}
|
||||
|
||||
private val timelineEventCollectionListener = OrderedRealmCollectionChangeListener { results: RealmResults<TimelineEventEntity>, changeSet: OrderedCollectionChangeSet ->
|
||||
val frozenResults = results.freeze()
|
||||
Timber.v("on timeline event changed: $changeSet")
|
||||
handleChangeSet(frozenResults, changeSet)
|
||||
}
|
||||
|
||||
private var timelineEventEntities: RealmResults<TimelineEventEntity> = chunkEntity.sortedTimelineEvents()
|
||||
private val builtEvents: MutableList<TimelineEvent> = Collections.synchronizedList(ArrayList())
|
||||
private val builtEventsIndexes: MutableMap<String, Int> = Collections.synchronizedMap(HashMap<String, Int>())
|
||||
|
||||
private var nextChunk: TimelineChunk? = null
|
||||
private var prevChunk: TimelineChunk? = null
|
||||
|
||||
init {
|
||||
timelineEventEntities.addChangeListener(timelineEventCollectionListener)
|
||||
chunkEntity.addChangeListener(chunkObjectListener)
|
||||
}
|
||||
|
||||
fun hasReachedLastForward(): Boolean {
|
||||
return if (chunkEntity.isLastForward) {
|
||||
true
|
||||
} else {
|
||||
nextChunk?.hasReachedLastForward().orFalse()
|
||||
}
|
||||
}
|
||||
|
||||
fun builtItems(includesNext: Boolean, includesPrev: Boolean): List<TimelineEvent> {
|
||||
val deepBuiltItems = ArrayList<TimelineEvent>(builtEvents.size)
|
||||
if (includesNext) {
|
||||
val nextEvents = nextChunk?.builtItems(includesNext = true, includesPrev = false).orEmpty()
|
||||
deepBuiltItems.addAll(nextEvents)
|
||||
}
|
||||
deepBuiltItems.addAll(builtEvents)
|
||||
if (includesPrev) {
|
||||
val prevEvents = prevChunk?.builtItems(includesNext = false, includesPrev = true).orEmpty()
|
||||
deepBuiltItems.addAll(prevEvents)
|
||||
}
|
||||
return deepBuiltItems
|
||||
}
|
||||
|
||||
suspend fun loadMore(count: Long, direction: Timeline.Direction): LoadMoreResult {
|
||||
val loadFromDbCount = loadFromDb(count, direction)
|
||||
val offsetCount = count - loadFromDbCount
|
||||
// We have built the right amount of data
|
||||
if (offsetCount == 0L) {
|
||||
onBuiltEvents()
|
||||
return LoadMoreResult.SUCCESS
|
||||
}
|
||||
return if (direction == Timeline.Direction.FORWARDS) {
|
||||
val nextChunkEntity = chunkEntity.nextChunk
|
||||
if (nextChunkEntity == null) {
|
||||
val token = chunkEntity.nextToken ?: return LoadMoreResult.REACHED_END // TODO handle previous live chunk
|
||||
try {
|
||||
fetchFromServer(token, direction)
|
||||
} catch (failure: Throwable) {
|
||||
Timber.v("Failed to fetch from server: $failure")
|
||||
LoadMoreResult.FAILURE
|
||||
}
|
||||
} else {
|
||||
// otherwise we delegate to the next chunk
|
||||
if (nextChunk == null) {
|
||||
nextChunk = createTimelineChunk(nextChunkEntity)
|
||||
}
|
||||
nextChunk?.loadMore(offsetCount, direction) ?: LoadMoreResult.FAILURE
|
||||
}
|
||||
} else {
|
||||
val prevChunkEntity = chunkEntity.prevChunk
|
||||
if (prevChunkEntity == null) {
|
||||
val token = chunkEntity.prevToken ?: return LoadMoreResult.REACHED_END
|
||||
try {
|
||||
fetchFromServer(token, direction)
|
||||
} catch (failure: Throwable) {
|
||||
Timber.v("Failed to fetch from server: $failure")
|
||||
LoadMoreResult.FAILURE
|
||||
}
|
||||
} else {
|
||||
// otherwise we delegate to the prev chunk
|
||||
if (prevChunk == null) {
|
||||
prevChunk = createTimelineChunk(prevChunkEntity)
|
||||
}
|
||||
prevChunk?.loadMore(offsetCount, direction) ?: LoadMoreResult.FAILURE
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun loadFromDb(count: Long, direction: Timeline.Direction): Long {
|
||||
val displayIndex = getNextDisplayIndex(direction) ?: return 0
|
||||
val baseQuery = timelineEventEntities.where()
|
||||
val timelineEvents = baseQuery.offsets(direction, count, displayIndex).findAll().orEmpty()
|
||||
if (timelineEvents.isEmpty()) return 0
|
||||
if (direction == Timeline.Direction.FORWARDS) {
|
||||
builtEventsIndexes.entries.forEach { it.setValue(it.value + timelineEvents.size) }
|
||||
}
|
||||
timelineEvents
|
||||
.mapIndexed { index, timelineEventEntity ->
|
||||
val timelineEvent = timelineEventEntity.buildAndDecryptIfNeeded()
|
||||
if (direction == Timeline.Direction.FORWARDS) {
|
||||
builtEventsIndexes[timelineEvent.eventId] = index
|
||||
builtEvents.add(index, timelineEvent)
|
||||
} else {
|
||||
builtEventsIndexes[timelineEvent.eventId] = builtEvents.size
|
||||
builtEvents.add(timelineEvent)
|
||||
}
|
||||
}
|
||||
return timelineEvents.size.toLong()
|
||||
}
|
||||
|
||||
private fun TimelineEventEntity.buildAndDecryptIfNeeded(): TimelineEvent {
|
||||
val timelineEvent = buildTimelineEvent(this)
|
||||
val transactionId = timelineEvent.root.unsignedData?.transactionId
|
||||
uiEchoManager?.onSyncedEvent(transactionId)
|
||||
if (timelineEvent.isEncrypted()
|
||||
&& timelineEvent.root.mxDecryptionResult == null) {
|
||||
timelineEvent.root.eventId?.also { eventDecryptor.requestDecryption(TimelineEventDecryptor.DecryptionRequest(timelineEvent.root, timelineId)) }
|
||||
}
|
||||
return timelineEvent
|
||||
}
|
||||
|
||||
private fun buildTimelineEvent(eventEntity: TimelineEventEntity) = timelineEventMapper.map(
|
||||
timelineEventEntity = eventEntity
|
||||
).let {
|
||||
// eventually enhance with ui echo?
|
||||
(uiEchoManager?.decorateEventWithReactionUiEcho(it) ?: it)
|
||||
}
|
||||
|
||||
private fun createTimelineChunk(chunkEntity: ChunkEntity): TimelineChunk {
|
||||
return TimelineChunk(
|
||||
chunkEntity = chunkEntity,
|
||||
timelineId = timelineId,
|
||||
eventDecryptor = eventDecryptor,
|
||||
roomId = roomId,
|
||||
paginationTask = paginationTask,
|
||||
fetchTokenAndPaginateTask = fetchTokenAndPaginateTask,
|
||||
timelineEventMapper = timelineEventMapper,
|
||||
uiEchoManager = uiEchoManager,
|
||||
initialEventId = null,
|
||||
onBuiltEvents = onBuiltEvents
|
||||
)
|
||||
}
|
||||
|
||||
private suspend fun fetchFromServer(token: String, direction: Timeline.Direction): LoadMoreResult {
|
||||
val paginationParams = PaginationTask.Params(roomId, token, direction.toPaginationDirection(), PAGINATION_COUNT)
|
||||
val paginationResult = paginationTask.execute(paginationParams)
|
||||
return when (paginationResult) {
|
||||
TokenChunkEventPersistor.Result.REACHED_END -> LoadMoreResult.REACHED_END
|
||||
TokenChunkEventPersistor.Result.SHOULD_FETCH_MORE,
|
||||
TokenChunkEventPersistor.Result.SUCCESS -> LoadMoreResult.SUCCESS
|
||||
}
|
||||
}
|
||||
|
||||
fun getBuiltEventIndex(eventId: String, searchInNext: Boolean, searchInPrev: Boolean): Int? {
|
||||
val builtEventIndex = builtEventsIndexes[eventId]
|
||||
if (builtEventIndex != null) {
|
||||
return getOffsetIndex() + builtEventIndex
|
||||
}
|
||||
if (searchInNext) {
|
||||
val nextBuiltEventIndex = nextChunk?.getBuiltEventIndex(eventId, searchInNext = true, searchInPrev = false)
|
||||
if (nextBuiltEventIndex != null) {
|
||||
return nextBuiltEventIndex
|
||||
}
|
||||
}
|
||||
if (searchInPrev) {
|
||||
val prevBuiltEventIndex = prevChunk?.getBuiltEventIndex(eventId, searchInNext = false, searchInPrev = true)
|
||||
if (prevBuiltEventIndex != null) {
|
||||
return prevBuiltEventIndex
|
||||
}
|
||||
}
|
||||
return null
|
||||
}
|
||||
|
||||
fun getBuiltEvent(eventId: String, searchInNext: Boolean, searchInPrev: Boolean): TimelineEvent? {
|
||||
val builtEventIndex = builtEventsIndexes[eventId]
|
||||
if (builtEventIndex != null) {
|
||||
return builtEvents.getOrNull(builtEventIndex)
|
||||
}
|
||||
if (searchInNext) {
|
||||
val nextBuiltEvent = nextChunk?.getBuiltEvent(eventId, searchInNext = true, searchInPrev = false)
|
||||
if (nextBuiltEvent != null) {
|
||||
return nextBuiltEvent
|
||||
}
|
||||
}
|
||||
if (searchInPrev) {
|
||||
val prevBuiltEvent = prevChunk?.getBuiltEvent(eventId, searchInNext = false, searchInPrev = true)
|
||||
if (prevBuiltEvent != null) {
|
||||
return prevBuiltEvent
|
||||
}
|
||||
}
|
||||
return null
|
||||
}
|
||||
|
||||
private fun getOffsetIndex(): Int {
|
||||
var offset = 0
|
||||
var currentNextChunk = nextChunk
|
||||
while (currentNextChunk != null) {
|
||||
offset += currentNextChunk.builtEvents.size
|
||||
currentNextChunk = currentNextChunk.nextChunk
|
||||
}
|
||||
return offset
|
||||
}
|
||||
|
||||
private fun handleChangeSet(frozenResults: RealmResults<TimelineEventEntity>, changeSet: OrderedCollectionChangeSet) {
|
||||
val insertions = changeSet.insertionRanges
|
||||
for (range in insertions) {
|
||||
val newItems = frozenResults
|
||||
.subList(range.startIndex, range.startIndex + range.length)
|
||||
.map { it.buildAndDecryptIfNeeded() }
|
||||
builtEventsIndexes.entries.filter { it.value >= range.startIndex }.forEach { it.setValue(it.value + range.length) }
|
||||
newItems.mapIndexed { index, timelineEvent ->
|
||||
val correctedIndex = range.startIndex + index
|
||||
builtEvents.add(correctedIndex, timelineEvent)
|
||||
builtEventsIndexes[timelineEvent.eventId] = correctedIndex
|
||||
}
|
||||
}
|
||||
val modifications = changeSet.changeRanges
|
||||
for (range in modifications) {
|
||||
for (modificationIndex in (range.startIndex until range.startIndex + range.length)) {
|
||||
val updatedEntity = frozenResults[modificationIndex] ?: continue
|
||||
try {
|
||||
builtEvents[modificationIndex] = updatedEntity.buildAndDecryptIfNeeded()
|
||||
} catch (failure: Throwable) {
|
||||
Timber.v("Fail to update items at index: $modificationIndex")
|
||||
}
|
||||
}
|
||||
}
|
||||
if (insertions.isNotEmpty() || modifications.isNotEmpty()) {
|
||||
onBuiltEvents()
|
||||
}
|
||||
}
|
||||
|
||||
fun rebuildEvent(eventId: String, builder: (TimelineEvent) -> TimelineEvent?, searchInNext: Boolean, searchInPrev: Boolean): Boolean {
|
||||
return tryOrNull {
|
||||
val builtIndex = getBuiltEventIndex(eventId, searchInNext = false, searchInPrev = false)
|
||||
if (builtIndex == null) {
|
||||
val foundInPrev = searchInPrev && prevChunk?.rebuildEvent(eventId, builder, searchInNext = false, searchInPrev = true).orFalse()
|
||||
if (foundInPrev) {
|
||||
return true
|
||||
}
|
||||
if (searchInNext) {
|
||||
return prevChunk?.rebuildEvent(eventId, builder, searchInPrev = false, searchInNext = true).orFalse()
|
||||
}
|
||||
return false
|
||||
}
|
||||
// Update the relation of existing event
|
||||
builtEvents.getOrNull(builtIndex)?.let { te ->
|
||||
val rebuiltEvent = builder(te)
|
||||
builtEvents[builtIndex] = rebuiltEvent!!
|
||||
true
|
||||
}
|
||||
}
|
||||
?: false
|
||||
}
|
||||
|
||||
fun close(closeNext: Boolean, closePrev: Boolean) {
|
||||
if (closeNext) {
|
||||
nextChunk?.close(closeNext = true, closePrev = false)
|
||||
}
|
||||
if (closePrev) {
|
||||
prevChunk?.close(closeNext = false, closePrev = true)
|
||||
}
|
||||
nextChunk = null
|
||||
prevChunk = null
|
||||
chunkEntity.removeChangeListener(chunkObjectListener)
|
||||
timelineEventEntities.removeChangeListener(timelineEventCollectionListener)
|
||||
}
|
||||
|
||||
private fun getNextDisplayIndex(direction: Timeline.Direction): Int? {
|
||||
val frozenTimelineEvents = timelineEventEntities.freeze()
|
||||
if (frozenTimelineEvents.isEmpty()) {
|
||||
return null
|
||||
}
|
||||
return if (builtEvents.isEmpty()) {
|
||||
if (initialEventId != null) {
|
||||
frozenTimelineEvents.where().equalTo(TimelineEventEntityFields.EVENT_ID, initialEventId).findFirst()?.displayIndex
|
||||
} else if (direction == Timeline.Direction.BACKWARDS) {
|
||||
frozenTimelineEvents.first()?.displayIndex
|
||||
} else {
|
||||
frozenTimelineEvents.last()?.displayIndex
|
||||
}
|
||||
} else if (direction == Timeline.Direction.FORWARDS) {
|
||||
builtEvents.first().displayIndex + 1
|
||||
} else {
|
||||
builtEvents.last().displayIndex - 1
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun RealmQuery<TimelineEventEntity>.offsets(
|
||||
direction: Timeline.Direction,
|
||||
count: Long,
|
||||
startDisplayIndex: Int
|
||||
): RealmQuery<TimelineEventEntity> {
|
||||
sort(TimelineEventEntityFields.DISPLAY_INDEX, Sort.DESCENDING)
|
||||
if (direction == Timeline.Direction.BACKWARDS) {
|
||||
lessThanOrEqualTo(TimelineEventEntityFields.DISPLAY_INDEX, startDisplayIndex)
|
||||
} else {
|
||||
greaterThanOrEqualTo(TimelineEventEntityFields.DISPLAY_INDEX, startDisplayIndex)
|
||||
}
|
||||
return limit(count)
|
||||
}
|
||||
|
||||
private fun Timeline.Direction.toPaginationDirection(): PaginationDirection {
|
||||
return if (this == Timeline.Direction.BACKWARDS) PaginationDirection.BACKWARDS else PaginationDirection.FORWARDS
|
||||
}
|
||||
|
||||
private fun ChunkEntity.sortedTimelineEvents(): RealmResults<TimelineEventEntity> {
|
||||
return timelineEvents.sort(TimelineEventEntityFields.DISPLAY_INDEX, Sort.DESCENDING)
|
||||
}
|
|
@ -25,22 +25,16 @@ import org.matrix.android.sdk.api.session.room.send.SendState
|
|||
import org.matrix.android.sdk.internal.database.helper.addIfNecessary
|
||||
import org.matrix.android.sdk.internal.database.helper.addStateEvent
|
||||
import org.matrix.android.sdk.internal.database.helper.addTimelineEvent
|
||||
import org.matrix.android.sdk.internal.database.helper.merge
|
||||
import org.matrix.android.sdk.internal.database.mapper.toEntity
|
||||
import org.matrix.android.sdk.internal.database.model.ChunkEntity
|
||||
import org.matrix.android.sdk.internal.database.model.EventInsertType
|
||||
import org.matrix.android.sdk.internal.database.model.RoomEntity
|
||||
import org.matrix.android.sdk.internal.database.model.RoomSummaryEntity
|
||||
import org.matrix.android.sdk.internal.database.model.deleteOnCascade
|
||||
import org.matrix.android.sdk.internal.database.model.TimelineEventEntity
|
||||
import org.matrix.android.sdk.internal.database.query.copyToRealmOrIgnore
|
||||
import org.matrix.android.sdk.internal.database.query.create
|
||||
import org.matrix.android.sdk.internal.database.query.find
|
||||
import org.matrix.android.sdk.internal.database.query.findAllIncludingEvents
|
||||
import org.matrix.android.sdk.internal.database.query.findLastForwardChunkOfRoom
|
||||
import org.matrix.android.sdk.internal.database.query.getOrCreate
|
||||
import org.matrix.android.sdk.internal.database.query.where
|
||||
import org.matrix.android.sdk.internal.di.SessionDatabase
|
||||
import org.matrix.android.sdk.internal.session.room.summary.RoomSummaryEventsHelper
|
||||
import org.matrix.android.sdk.internal.util.awaitTransaction
|
||||
import timber.log.Timber
|
||||
import javax.inject.Inject
|
||||
|
@ -136,21 +130,21 @@ internal class TokenChunkEventPersistor @Inject constructor(@SessionDatabase pri
|
|||
prevToken = receivedChunk.end
|
||||
}
|
||||
|
||||
val existingChunk = ChunkEntity.find(realm, roomId, prevToken = prevToken, nextToken = nextToken)
|
||||
if (existingChunk != null) {
|
||||
Timber.v("This chunk is already in the db, returns")
|
||||
return@awaitTransaction
|
||||
}
|
||||
val prevChunk = ChunkEntity.find(realm, roomId, nextToken = prevToken)
|
||||
val nextChunk = ChunkEntity.find(realm, roomId, prevToken = nextToken)
|
||||
|
||||
// The current chunk is the one we will keep all along the merge processChanges.
|
||||
// We try to look for a chunk next to the token,
|
||||
// otherwise we create a whole new one which is unlinked (not live)
|
||||
val currentChunk = if (direction == PaginationDirection.FORWARDS) {
|
||||
prevChunk?.apply { this.nextToken = nextToken }
|
||||
} else {
|
||||
nextChunk?.apply { this.prevToken = prevToken }
|
||||
val currentChunk = ChunkEntity.create(realm, prevToken = prevToken, nextToken = nextToken).apply {
|
||||
this.nextChunk = nextChunk
|
||||
this.prevChunk = prevChunk
|
||||
}
|
||||
?: ChunkEntity.create(realm, prevToken, nextToken)
|
||||
|
||||
if (receivedChunk.events.isNullOrEmpty() && !receivedChunk.hasMore()) {
|
||||
handleReachEnd(realm, roomId, direction, currentChunk)
|
||||
nextChunk?.prevChunk = currentChunk
|
||||
prevChunk?.nextChunk = currentChunk
|
||||
if (receivedChunk.events.isEmpty() && !receivedChunk.hasMore()) {
|
||||
handleReachEnd(roomId, direction, currentChunk)
|
||||
} else {
|
||||
handlePagination(realm, roomId, direction, receivedChunk, currentChunk)
|
||||
}
|
||||
|
@ -166,17 +160,10 @@ internal class TokenChunkEventPersistor @Inject constructor(@SessionDatabase pri
|
|||
}
|
||||
}
|
||||
|
||||
private fun handleReachEnd(realm: Realm, roomId: String, direction: PaginationDirection, currentChunk: ChunkEntity) {
|
||||
private fun handleReachEnd(roomId: String, direction: PaginationDirection, currentChunk: ChunkEntity) {
|
||||
Timber.v("Reach end of $roomId")
|
||||
if (direction == PaginationDirection.FORWARDS) {
|
||||
val currentLastForwardChunk = ChunkEntity.findLastForwardChunkOfRoom(realm, roomId)
|
||||
if (currentChunk != currentLastForwardChunk) {
|
||||
currentChunk.isLastForward = true
|
||||
currentLastForwardChunk?.deleteOnCascade(deleteStateEvents = false, canDeleteRoot = false)
|
||||
RoomSummaryEntity.where(realm, roomId).findFirst()?.apply {
|
||||
latestPreviewableEvent = RoomSummaryEventsHelper.getLatestPreviewableEvent(realm, roomId)
|
||||
}
|
||||
}
|
||||
Timber.v("We should keep the lastForward chunk unique, the one from sync")
|
||||
} else {
|
||||
currentChunk.isLastBackward = true
|
||||
}
|
||||
|
@ -209,6 +196,21 @@ internal class TokenChunkEventPersistor @Inject constructor(@SessionDatabase pri
|
|||
if (event.eventId == null || event.senderId == null) {
|
||||
return@forEach
|
||||
}
|
||||
//We check for the timeline event with this id
|
||||
val eventId = event.eventId
|
||||
val existingTimelineEvent = TimelineEventEntity.where(realm, roomId, eventId).findFirst()
|
||||
// If it exists, we want to skip here
|
||||
val existingChunk = existingTimelineEvent?.chunk?.firstOrNull()
|
||||
if (existingChunk != null) {
|
||||
if (direction == PaginationDirection.BACKWARDS) {
|
||||
currentChunk.prevChunk = existingChunk
|
||||
existingChunk.nextChunk = currentChunk
|
||||
} else if (direction == PaginationDirection.FORWARDS) {
|
||||
currentChunk.nextChunk = existingChunk
|
||||
existingChunk.prevChunk = currentChunk
|
||||
}
|
||||
return@forEach
|
||||
}
|
||||
val ageLocalTs = event.unsignedData?.age?.let { now - it }
|
||||
eventIds.add(event.eventId)
|
||||
val eventEntity = event.toEntity(roomId, SendState.SYNCED, ageLocalTs).copyToRealmOrIgnore(realm, EventInsertType.PAGINATION)
|
||||
|
@ -220,29 +222,8 @@ internal class TokenChunkEventPersistor @Inject constructor(@SessionDatabase pri
|
|||
}
|
||||
roomMemberContentsByUser[event.stateKey] = contentToUse.toModel<RoomMemberContent>()
|
||||
}
|
||||
|
||||
currentChunk.addTimelineEvent(roomId, eventEntity, direction, roomMemberContentsByUser)
|
||||
}
|
||||
// Find all the chunks which contain at least one event from the list of eventIds
|
||||
val chunks = ChunkEntity.findAllIncludingEvents(realm, eventIds)
|
||||
Timber.d("Found ${chunks.size} chunks containing at least one of the eventIds")
|
||||
val chunksToDelete = ArrayList<ChunkEntity>()
|
||||
chunks.forEach {
|
||||
if (it != currentChunk) {
|
||||
Timber.d("Merge $it")
|
||||
currentChunk.merge(roomId, it, direction)
|
||||
chunksToDelete.add(it)
|
||||
}
|
||||
}
|
||||
chunksToDelete.forEach {
|
||||
it.deleteOnCascade(deleteStateEvents = false, canDeleteRoot = false)
|
||||
}
|
||||
val roomSummaryEntity = RoomSummaryEntity.getOrCreate(realm, roomId)
|
||||
val shouldUpdateSummary = roomSummaryEntity.latestPreviewableEvent == null
|
||||
|| (chunksToDelete.isNotEmpty() && currentChunk.isLastForward && direction == PaginationDirection.FORWARDS)
|
||||
if (shouldUpdateSummary) {
|
||||
roomSummaryEntity.latestPreviewableEvent = RoomSummaryEventsHelper.getLatestPreviewableEvent(realm, roomId)
|
||||
}
|
||||
if (currentChunk.isValid) {
|
||||
RoomEntity.where(realm, roomId).findFirst()?.addIfNecessary(currentChunk)
|
||||
}
|
||||
|
|
|
@ -70,13 +70,12 @@ internal class UIEchoManager(
|
|||
return existingState != sendState
|
||||
}
|
||||
|
||||
fun onLocalEchoCreated(timelineEvent: TimelineEvent) {
|
||||
// Manage some ui echos (do it before filter because actual event could be filtered out)
|
||||
fun onLocalEchoCreated(timelineEvent: TimelineEvent): Boolean {
|
||||
when (timelineEvent.root.getClearType()) {
|
||||
EventType.REDACTION -> {
|
||||
}
|
||||
EventType.REACTION -> {
|
||||
val content = timelineEvent.root.content?.toModel<ReactionContent>()
|
||||
val content: ReactionContent? = timelineEvent.root.content?.toModel<ReactionContent>()
|
||||
if (RelationType.ANNOTATION == content?.relatesTo?.type) {
|
||||
val reaction = content.relatesTo.key
|
||||
val relatedEventID = content.relatesTo.eventId
|
||||
|
@ -96,11 +95,12 @@ internal class UIEchoManager(
|
|||
}
|
||||
Timber.v("On local echo created: ${timelineEvent.eventId}")
|
||||
inMemorySendingEvents.add(0, timelineEvent)
|
||||
return true
|
||||
}
|
||||
|
||||
fun decorateEventWithReactionUiEcho(timelineEvent: TimelineEvent): TimelineEvent? {
|
||||
fun decorateEventWithReactionUiEcho(timelineEvent: TimelineEvent): TimelineEvent {
|
||||
val relatedEventID = timelineEvent.eventId
|
||||
val contents = inMemoryReactions[relatedEventID] ?: return null
|
||||
val contents = inMemoryReactions[relatedEventID] ?: return timelineEvent
|
||||
|
||||
var existingAnnotationSummary = timelineEvent.annotations ?: EventAnnotationsSummary(
|
||||
relatedEventID
|
||||
|
|
|
@ -39,13 +39,13 @@ import im.vector.app.features.home.room.detail.timeline.factory.MergedHeaderItem
|
|||
import im.vector.app.features.home.room.detail.timeline.factory.ReadReceiptsItemFactory
|
||||
import im.vector.app.features.home.room.detail.timeline.factory.TimelineItemFactory
|
||||
import im.vector.app.features.home.room.detail.timeline.factory.TimelineItemFactoryParams
|
||||
import im.vector.app.features.home.room.detail.timeline.helper.TimelineEventsGroups
|
||||
import im.vector.app.features.home.room.detail.timeline.helper.ContentDownloadStateTrackerBinder
|
||||
import im.vector.app.features.home.room.detail.timeline.helper.ContentUploadStateTrackerBinder
|
||||
import im.vector.app.features.home.room.detail.timeline.helper.TimelineControllerInterceptorHelper
|
||||
import im.vector.app.features.home.room.detail.timeline.helper.TimelineEventDiffUtilCallback
|
||||
import im.vector.app.features.home.room.detail.timeline.helper.TimelineEventVisibilityHelper
|
||||
import im.vector.app.features.home.room.detail.timeline.helper.TimelineEventVisibilityStateChangedListener
|
||||
import im.vector.app.features.home.room.detail.timeline.helper.TimelineEventsGroups
|
||||
import im.vector.app.features.home.room.detail.timeline.helper.TimelineMediaSizeProvider
|
||||
import im.vector.app.features.home.room.detail.timeline.item.BasedMergedItem
|
||||
import im.vector.app.features.home.room.detail.timeline.item.DaySeparatorItem
|
||||
|
@ -244,17 +244,8 @@ class TimelineEventController @Inject constructor(private val dateFormatter: Vec
|
|||
interceptorHelper.intercept(models, partialState.unreadState, timeline, callback)
|
||||
}
|
||||
|
||||
fun update(viewState: RoomDetailViewState) = synchronized(modelCache) {
|
||||
fun update(viewState: RoomDetailViewState) {
|
||||
val newPartialState = PartialState(viewState)
|
||||
if (partialState.highlightedEventId != newPartialState.highlightedEventId) {
|
||||
// Clear cache to force a refresh
|
||||
for (i in 0 until modelCache.size) {
|
||||
if (modelCache[i]?.eventId == viewState.highlightedEventId
|
||||
|| modelCache[i]?.eventId == partialState.highlightedEventId) {
|
||||
modelCache[i] = null
|
||||
}
|
||||
}
|
||||
}
|
||||
if (newPartialState != partialState) {
|
||||
partialState = newPartialState
|
||||
requestModelBuild()
|
||||
|
@ -394,7 +385,7 @@ class TimelineEventController @Inject constructor(private val dateFormatter: Vec
|
|||
it.id(event.localId)
|
||||
it.setOnVisibilityStateChanged(TimelineEventVisibilityStateChangedListener(callback, event))
|
||||
}
|
||||
val isCacheable = eventModel is ItemWithEvents && eventModel.isCacheable()
|
||||
val isCacheable = eventModel is ItemWithEvents && eventModel.isCacheable() && !params.isHighlighted
|
||||
return CacheItemData(
|
||||
localId = event.localId,
|
||||
eventId = event.root.eventId,
|
||||
|
|
Loading…
Reference in a new issue