Merge pull request #4405 from vector-im/feature/fga/timeline_chunks_rework

Feature/fga/timeline chunks rework
This commit is contained in:
Benoit Marty 2022-01-03 20:20:42 +01:00 committed by GitHub
commit d670d3e872
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
35 changed files with 1507 additions and 1397 deletions

1
changelog.d/4405.feature Normal file
View file

@ -0,0 +1 @@
Change internal timeline management.

1
changelog.d/4405.removal Normal file
View file

@ -0,0 +1 @@
Introduce method onStateUpdated on Timeline.Callback

View file

@ -145,36 +145,9 @@ class CommonTestHelper(context: Context) {
* @param nbOfMessages the number of time the message will be sent * @param nbOfMessages the number of time the message will be sent
*/ */
fun sendTextMessage(room: Room, message: String, nbOfMessages: Int, timeout: Long = TestConstants.timeOutMillis): List<TimelineEvent> { fun sendTextMessage(room: Room, message: String, nbOfMessages: Int, timeout: Long = TestConstants.timeOutMillis): List<TimelineEvent> {
val sentEvents = ArrayList<TimelineEvent>(nbOfMessages)
val timeline = room.createTimeline(null, TimelineSettings(10)) val timeline = room.createTimeline(null, TimelineSettings(10))
timeline.start() timeline.start()
waitWithLatch(timeout + 1_000L * nbOfMessages) { latch -> val sentEvents = sendTextMessagesBatched(timeline, room, message, nbOfMessages, timeout)
val timelineListener = object : Timeline.Listener {
override fun onTimelineFailure(throwable: Throwable) {
}
override fun onNewTimelineEvents(eventIds: List<String>) {
// noop
}
override fun onTimelineUpdated(snapshot: List<TimelineEvent>) {
val newMessages = snapshot
.filter { it.root.sendState == SendState.SYNCED }
.filter { it.root.getClearType() == EventType.MESSAGE }
.filter { it.root.getClearContent().toModel<MessageContent>()?.body?.startsWith(message) == true }
Timber.v("New synced message size: ${newMessages.size}")
if (newMessages.size == nbOfMessages) {
sentEvents.addAll(newMessages)
// Remove listener now, if not at the next update sendEvents could change
timeline.removeListener(this)
latch.countDown()
}
}
}
timeline.addListener(timelineListener)
sendTextMessagesBatched(room, message, nbOfMessages)
}
timeline.dispose() timeline.dispose()
// Check that all events has been created // Check that all events has been created
assertEquals("Message number do not match $sentEvents", nbOfMessages.toLong(), sentEvents.size.toLong()) assertEquals("Message number do not match $sentEvents", nbOfMessages.toLong(), sentEvents.size.toLong())
@ -182,9 +155,10 @@ class CommonTestHelper(context: Context) {
} }
/** /**
* Will send nb of messages provided by count parameter but waits a bit every 10 messages to avoid gap in sync * Will send nb of messages provided by count parameter but waits every 10 messages to avoid gap in sync
*/ */
private fun sendTextMessagesBatched(room: Room, message: String, count: Int) { private fun sendTextMessagesBatched(timeline: Timeline, room: Room, message: String, count: Int, timeout: Long): List<TimelineEvent> {
val sentEvents = ArrayList<TimelineEvent>(count)
(1 until count + 1) (1 until count + 1)
.map { "$message #$it" } .map { "$message #$it" }
.chunked(10) .chunked(10)
@ -192,8 +166,34 @@ class CommonTestHelper(context: Context) {
batchedMessages.forEach { formattedMessage -> batchedMessages.forEach { formattedMessage ->
room.sendTextMessage(formattedMessage) room.sendTextMessage(formattedMessage)
} }
Thread.sleep(1_000L) waitWithLatch(timeout) { latch ->
val timelineListener = object : Timeline.Listener {
override fun onTimelineUpdated(snapshot: List<TimelineEvent>) {
val allSentMessages = snapshot
.filter { it.root.sendState == SendState.SYNCED }
.filter { it.root.getClearType() == EventType.MESSAGE }
.filter { it.root.getClearContent().toModel<MessageContent>()?.body?.startsWith(message) == true }
val hasSyncedAllBatchedMessages = allSentMessages
.map {
it.root.getClearContent().toModel<MessageContent>()?.body
}
.containsAll(batchedMessages)
if (allSentMessages.size == count) {
sentEvents.addAll(allSentMessages)
}
if (hasSyncedAllBatchedMessages) {
timeline.removeListener(this)
latch.countDown()
}
}
}
timeline.addListener(timelineListener)
}
} }
return sentEvents
} }
// PRIVATE METHODS ***************************************************************************** // PRIVATE METHODS *****************************************************************************
@ -332,13 +332,6 @@ class CommonTestHelper(context: Context) {
fun createEventListener(latch: CountDownLatch, predicate: (List<TimelineEvent>) -> Boolean): Timeline.Listener { fun createEventListener(latch: CountDownLatch, predicate: (List<TimelineEvent>) -> Boolean): Timeline.Listener {
return object : Timeline.Listener { return object : Timeline.Listener {
override fun onTimelineFailure(throwable: Throwable) {
// noop
}
override fun onNewTimelineEvents(eventIds: List<String>) {
// noop
}
override fun onTimelineUpdated(snapshot: List<TimelineEvent>) { override fun onTimelineUpdated(snapshot: List<TimelineEvent>) {
if (predicate(snapshot)) { if (predicate(snapshot)) {

View file

@ -1,183 +0,0 @@
/*
* Copyright 2020 The Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.matrix.android.sdk.session.room.timeline
import org.amshove.kluent.shouldBeFalse
import org.amshove.kluent.shouldBeTrue
import org.junit.Assert.assertTrue
import org.junit.FixMethodOrder
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.JUnit4
import org.junit.runners.MethodSorters
import org.matrix.android.sdk.InstrumentedTest
import org.matrix.android.sdk.api.extensions.orFalse
import org.matrix.android.sdk.api.session.events.model.EventType
import org.matrix.android.sdk.api.session.events.model.toModel
import org.matrix.android.sdk.api.session.room.model.message.MessageContent
import org.matrix.android.sdk.api.session.room.timeline.Timeline
import org.matrix.android.sdk.api.session.room.timeline.TimelineSettings
import org.matrix.android.sdk.common.CommonTestHelper
import org.matrix.android.sdk.common.CryptoTestHelper
import org.matrix.android.sdk.common.checkSendOrder
import timber.log.Timber
import java.util.concurrent.CountDownLatch
@RunWith(JUnit4::class)
@FixMethodOrder(MethodSorters.JVM)
class TimelineBackToPreviousLastForwardTest : InstrumentedTest {
private val commonTestHelper = CommonTestHelper(context())
private val cryptoTestHelper = CryptoTestHelper(commonTestHelper)
/**
* This test ensure that if we have a chunk in the timeline which is due to a sync, and we click to permalink of an
* even contained in a previous lastForward chunk, we will be able to go back to the live
*/
@Test
fun backToPreviousLastForwardTest() {
val cryptoTestData = cryptoTestHelper.doE2ETestWithAliceAndBobInARoom(false)
val aliceSession = cryptoTestData.firstSession
val bobSession = cryptoTestData.secondSession!!
val aliceRoomId = cryptoTestData.roomId
aliceSession.cryptoService().setWarnOnUnknownDevices(false)
bobSession.cryptoService().setWarnOnUnknownDevices(false)
val roomFromAlicePOV = aliceSession.getRoom(aliceRoomId)!!
val roomFromBobPOV = bobSession.getRoom(aliceRoomId)!!
val bobTimeline = roomFromBobPOV.createTimeline(null, TimelineSettings(30))
bobTimeline.start()
var roomCreationEventId: String? = null
run {
val lock = CountDownLatch(1)
val eventsListener = commonTestHelper.createEventListener(lock) { snapshot ->
Timber.e("Bob timeline updated: with ${snapshot.size} events:")
snapshot.forEach {
Timber.w(" event ${it.root}")
}
roomCreationEventId = snapshot.lastOrNull()?.root?.eventId
// Ok, we have the 8 first messages of the initial sync (room creation and bob join event)
snapshot.size == 8
}
bobTimeline.addListener(eventsListener)
commonTestHelper.await(lock)
bobTimeline.removeAllListeners()
bobTimeline.hasMoreToLoad(Timeline.Direction.BACKWARDS).shouldBeFalse()
bobTimeline.hasMoreToLoad(Timeline.Direction.FORWARDS).shouldBeFalse()
}
// Bob stop to sync
bobSession.stopSync()
val messageRoot = "First messages from Alice"
// Alice sends 30 messages
commonTestHelper.sendTextMessage(
roomFromAlicePOV,
messageRoot,
30)
// Bob start to sync
bobSession.startSync(true)
run {
val lock = CountDownLatch(1)
val eventsListener = commonTestHelper.createEventListener(lock) { snapshot ->
Timber.e("Bob timeline updated: with ${snapshot.size} events:")
snapshot.forEach {
Timber.w(" event ${it.root}")
}
// Ok, we have the 10 last messages from Alice.
snapshot.size == 10 &&
snapshot.all { it.root.content.toModel<MessageContent>()?.body?.startsWith(messageRoot).orFalse() }
}
bobTimeline.addListener(eventsListener)
commonTestHelper.await(lock)
bobTimeline.removeAllListeners()
bobTimeline.hasMoreToLoad(Timeline.Direction.BACKWARDS).shouldBeTrue()
bobTimeline.hasMoreToLoad(Timeline.Direction.FORWARDS).shouldBeFalse()
}
// Bob navigate to the first event (room creation event), so inside the previous last forward chunk
run {
val lock = CountDownLatch(1)
val eventsListener = commonTestHelper.createEventListener(lock) { snapshot ->
Timber.e("Bob timeline updated: with ${snapshot.size} events:")
snapshot.forEach {
Timber.w(" event ${it.root}")
}
// The event is in db, so it is fetch and auto pagination occurs, half of the number of events we have for this chunk (?)
snapshot.size == 4
}
bobTimeline.addListener(eventsListener)
// Restart the timeline to the first sent event, which is already in the database, so pagination should start automatically
assertTrue(roomFromBobPOV.getTimeLineEvent(roomCreationEventId!!) != null)
bobTimeline.restartWithEventId(roomCreationEventId)
commonTestHelper.await(lock)
bobTimeline.removeAllListeners()
bobTimeline.hasMoreToLoad(Timeline.Direction.FORWARDS).shouldBeTrue()
bobTimeline.hasMoreToLoad(Timeline.Direction.BACKWARDS).shouldBeFalse()
}
// Bob scroll to the future
run {
val lock = CountDownLatch(1)
val eventsListener = commonTestHelper.createEventListener(lock) { snapshot ->
Timber.e("Bob timeline updated: with ${snapshot.size} events:")
snapshot.forEach {
Timber.w(" event ${it.root}")
}
// Bob can see the first event of the room (so Back pagination has worked)
snapshot.lastOrNull()?.root?.getClearType() == EventType.STATE_ROOM_CREATE &&
// 8 for room creation item, and 30 for the forward pagination
snapshot.size == 38 &&
snapshot.checkSendOrder(messageRoot, 30, 0)
}
bobTimeline.addListener(eventsListener)
bobTimeline.paginate(Timeline.Direction.FORWARDS, 50)
commonTestHelper.await(lock)
bobTimeline.removeAllListeners()
bobTimeline.hasMoreToLoad(Timeline.Direction.FORWARDS).shouldBeFalse()
bobTimeline.hasMoreToLoad(Timeline.Direction.BACKWARDS).shouldBeFalse()
}
bobTimeline.dispose()
cryptoTestData.cleanUp(commonTestHelper)
}
}

View file

@ -16,6 +16,8 @@
package org.matrix.android.sdk.session.room.timeline package org.matrix.android.sdk.session.room.timeline
import kotlinx.coroutines.runBlocking
import org.amshove.kluent.internal.assertEquals
import org.amshove.kluent.shouldBeFalse import org.amshove.kluent.shouldBeFalse
import org.amshove.kluent.shouldBeTrue import org.amshove.kluent.shouldBeTrue
import org.junit.FixMethodOrder import org.junit.FixMethodOrder
@ -123,54 +125,29 @@ class TimelineForwardPaginationTest : InstrumentedTest {
// Alice paginates BACKWARD and FORWARD of 50 events each // Alice paginates BACKWARD and FORWARD of 50 events each
// Then she can only navigate FORWARD // Then she can only navigate FORWARD
run { run {
val lock = CountDownLatch(1) val snapshot = runBlocking {
val aliceEventsListener = commonTestHelper.createEventListener(lock) { snapshot -> aliceTimeline.awaitPaginate(Timeline.Direction.BACKWARDS, 50)
Timber.e("Alice timeline updated: with ${snapshot.size} events:") aliceTimeline.awaitPaginate(Timeline.Direction.FORWARDS, 50)
snapshot.forEach {
Timber.w(" event ${it.root.content}")
}
// Alice can see the first event of the room (so Back pagination has worked)
snapshot.lastOrNull()?.root?.getClearType() == EventType.STATE_ROOM_CREATE &&
// 6 for room creation item (backward pagination), 1 for the context, and 50 for the forward pagination
snapshot.size == 57 // 6 + 1 + 50
} }
aliceTimeline.addListener(aliceEventsListener)
// Restart the timeline to the first sent event
// We ask to load event backward and forward
aliceTimeline.paginate(Timeline.Direction.BACKWARDS, 50)
aliceTimeline.paginate(Timeline.Direction.FORWARDS, 50)
commonTestHelper.await(lock)
aliceTimeline.removeAllListeners()
aliceTimeline.hasMoreToLoad(Timeline.Direction.FORWARDS).shouldBeTrue() aliceTimeline.hasMoreToLoad(Timeline.Direction.FORWARDS).shouldBeTrue()
aliceTimeline.hasMoreToLoad(Timeline.Direction.BACKWARDS).shouldBeFalse() aliceTimeline.hasMoreToLoad(Timeline.Direction.BACKWARDS).shouldBeFalse()
assertEquals(EventType.STATE_ROOM_CREATE, snapshot.lastOrNull()?.root?.getClearType())
// 6 for room creation item (backward pagination), 1 for the context, and 50 for the forward pagination
// 6 + 1 + 50
assertEquals(57, snapshot.size)
} }
// Alice paginates once again FORWARD for 50 events // Alice paginates once again FORWARD for 50 events
// All the timeline is retrieved, she cannot paginate anymore in both direction // All the timeline is retrieved, she cannot paginate anymore in both direction
run { run {
val lock = CountDownLatch(1)
val aliceEventsListener = commonTestHelper.createEventListener(lock) { snapshot ->
Timber.e("Alice timeline updated: with ${snapshot.size} events:")
snapshot.forEach {
Timber.w(" event ${it.root.content}")
}
// 6 for room creation item (backward pagination),and numberOfMessagesToSend (all the message of the room)
snapshot.size == 6 + numberOfMessagesToSend &&
snapshot.checkSendOrder(message, numberOfMessagesToSend, 0)
}
aliceTimeline.addListener(aliceEventsListener)
// Ask for a forward pagination // Ask for a forward pagination
aliceTimeline.paginate(Timeline.Direction.FORWARDS, 50) val snapshot = runBlocking {
aliceTimeline.awaitPaginate(Timeline.Direction.FORWARDS, 50)
commonTestHelper.await(lock) }
aliceTimeline.removeAllListeners() // 6 for room creation item (backward pagination),and numberOfMessagesToSend (all the message of the room)
snapshot.size == 6 + numberOfMessagesToSend &&
snapshot.checkSendOrder(message, numberOfMessagesToSend, 0)
// The timeline is fully loaded // The timeline is fully loaded
aliceTimeline.hasMoreToLoad(Timeline.Direction.FORWARDS).shouldBeFalse() aliceTimeline.hasMoreToLoad(Timeline.Direction.FORWARDS).shouldBeFalse()

View file

@ -168,10 +168,8 @@ class TimelinePreviousLastForwardTest : InstrumentedTest {
bobTimeline.addListener(eventsListener) bobTimeline.addListener(eventsListener)
// Restart the timeline to the first sent event, and paginate in both direction // Restart the timeline to the first sent event
bobTimeline.restartWithEventId(firstMessageFromAliceId) bobTimeline.restartWithEventId(firstMessageFromAliceId)
bobTimeline.paginate(Timeline.Direction.BACKWARDS, 50)
bobTimeline.paginate(Timeline.Direction.FORWARDS, 50)
commonTestHelper.await(lock) commonTestHelper.await(lock)
bobTimeline.removeAllListeners() bobTimeline.removeAllListeners()

View file

@ -0,0 +1,104 @@
/*
* Copyright 2020 The Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.matrix.android.sdk.session.room.timeline
import kotlinx.coroutines.runBlocking
import org.amshove.kluent.internal.assertEquals
import org.junit.FixMethodOrder
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.JUnit4
import org.junit.runners.MethodSorters
import org.matrix.android.sdk.InstrumentedTest
import org.matrix.android.sdk.api.extensions.orFalse
import org.matrix.android.sdk.api.session.events.model.isTextMessage
import org.matrix.android.sdk.api.session.events.model.toModel
import org.matrix.android.sdk.api.session.room.model.message.MessageTextContent
import org.matrix.android.sdk.api.session.room.timeline.Timeline
import org.matrix.android.sdk.api.session.room.timeline.TimelineSettings
import org.matrix.android.sdk.common.CommonTestHelper
import org.matrix.android.sdk.common.CryptoTestHelper
import org.matrix.android.sdk.common.TestConstants
@RunWith(JUnit4::class)
@FixMethodOrder(MethodSorters.JVM)
class TimelineSimpleBackPaginationTest : InstrumentedTest {
private val commonTestHelper = CommonTestHelper(context())
private val cryptoTestHelper = CryptoTestHelper(commonTestHelper)
@Test
fun timeline_backPaginate_shouldReachEndOfTimeline() {
val numberOfMessagesToSent = 200
val cryptoTestData = cryptoTestHelper.doE2ETestWithAliceAndBobInARoom(false)
val aliceSession = cryptoTestData.firstSession
val bobSession = cryptoTestData.secondSession!!
val roomId = cryptoTestData.roomId
aliceSession.cryptoService().setWarnOnUnknownDevices(false)
bobSession.cryptoService().setWarnOnUnknownDevices(false)
val roomFromAlicePOV = aliceSession.getRoom(roomId)!!
val roomFromBobPOV = bobSession.getRoom(roomId)!!
// Alice sends X messages
val message = "Message from Alice"
commonTestHelper.sendTextMessage(
roomFromAlicePOV,
message,
numberOfMessagesToSent)
val bobTimeline = roomFromBobPOV.createTimeline(null, TimelineSettings(30))
bobTimeline.start()
commonTestHelper.waitWithLatch(timeout = TestConstants.timeOutMillis * 10) {
val listener = object : Timeline.Listener {
override fun onStateUpdated(direction: Timeline.Direction, state: Timeline.PaginationState) {
if (direction == Timeline.Direction.FORWARDS) {
return
}
if (state.hasMoreToLoad && !state.loading) {
bobTimeline.paginate(Timeline.Direction.BACKWARDS, 30)
} else if (!state.hasMoreToLoad) {
bobTimeline.removeListener(this)
it.countDown()
}
}
}
bobTimeline.addListener(listener)
bobTimeline.paginate(Timeline.Direction.BACKWARDS, 30)
}
assertEquals(false, bobTimeline.hasMoreToLoad(Timeline.Direction.FORWARDS))
assertEquals(false, bobTimeline.hasMoreToLoad(Timeline.Direction.BACKWARDS))
val onlySentEvents = runBlocking {
bobTimeline.getSnapshot()
}
.filter {
it.root.isTextMessage()
}.filter {
(it.root.content.toModel<MessageTextContent>())?.body?.startsWith(message).orFalse()
}
assertEquals(numberOfMessagesToSent, onlySentEvents.size)
bobTimeline.dispose()
cryptoTestData.cleanUp(commonTestHelper)
}
}

View file

@ -1,84 +0,0 @@
/*
* Copyright 2020 The Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.matrix.android.sdk.session.room.timeline
import com.zhuinden.monarchy.Monarchy
import org.matrix.android.sdk.InstrumentedTest
internal class TimelineTest : InstrumentedTest {
companion object {
private const val ROOM_ID = "roomId"
}
private lateinit var monarchy: Monarchy
// @Before
// fun setup() {
// Timber.plant(Timber.DebugTree())
// Realm.init(context())
// val testConfiguration = RealmConfiguration.Builder().name("test-realm")
// .modules(SessionRealmModule()).build()
//
// Realm.deleteRealm(testConfiguration)
// monarchy = Monarchy.Builder().setRealmConfiguration(testConfiguration).build()
// RoomDataHelper.fakeInitialSync(monarchy, ROOM_ID)
// }
//
// private fun createTimeline(initialEventId: String? = null): Timeline {
// val taskExecutor = TaskExecutor(testCoroutineDispatchers)
// val tokenChunkEventPersistor = TokenChunkEventPersistor(monarchy)
// val paginationTask = FakePaginationTask @Inject constructor(tokenChunkEventPersistor)
// val getContextOfEventTask = FakeGetContextOfEventTask @Inject constructor(tokenChunkEventPersistor)
// val roomMemberExtractor = SenderRoomMemberExtractor(ROOM_ID)
// val timelineEventFactory = TimelineEventFactory(roomMemberExtractor, EventRelationExtractor())
// return DefaultTimeline(
// ROOM_ID,
// initialEventId,
// monarchy.realmConfiguration,
// taskExecutor,
// getContextOfEventTask,
// timelineEventFactory,
// paginationTask,
// null)
// }
//
// @Test
// fun backPaginate_shouldLoadMoreEvents_whenPaginateIsCalled() {
// val timeline = createTimeline()
// timeline.start()
// val paginationCount = 30
// var initialLoad = 0
// val latch = CountDownLatch(2)
// var timelineEvents: List<TimelineEvent> = emptyList()
// timeline.listener = object : Timeline.Listener {
// override fun onTimelineUpdated(snapshot: List<TimelineEvent>) {
// if (snapshot.isNotEmpty()) {
// if (initialLoad == 0) {
// initialLoad = snapshot.size
// }
// timelineEvents = snapshot
// latch.countDown()
// timeline.paginate(Timeline.Direction.BACKWARDS, paginationCount)
// }
// }
// }
// latch.await()
// timelineEvents.size shouldBeEqualTo initialLoad + paginationCount
// timeline.dispose()
// }
}

View file

@ -71,14 +71,10 @@ interface Timeline {
fun paginate(direction: Direction, count: Int) fun paginate(direction: Direction, count: Int)
/** /**
* Returns the number of sending events * This is the same than the regular paginate method but waits for the results instead
* of relying on the timeline listener.
*/ */
fun pendingEventCount(): Int suspend fun awaitPaginate(direction: Direction, count: Int): List<TimelineEvent>
/**
* Returns the number of failed sending events.
*/
fun failedToDeliverEventCount(): Int
/** /**
* Returns the index of a built event or null. * Returns the index of a built event or null.
@ -86,14 +82,14 @@ interface Timeline {
fun getIndexOfEvent(eventId: String?): Int? fun getIndexOfEvent(eventId: String?): Int?
/** /**
* Returns the built [TimelineEvent] at index or null * Returns the current pagination state for the direction.
*/ */
fun getTimelineEventAtIndex(index: Int): TimelineEvent? fun getPaginationState(direction: Direction): PaginationState
/** /**
* Returns the built [TimelineEvent] with eventId or null * Returns a snapshot of the timeline in his current state.
*/ */
fun getTimelineEventWithId(eventId: String?): TimelineEvent? fun getSnapshot(): List<TimelineEvent>
interface Listener { interface Listener {
/** /**
@ -101,19 +97,33 @@ interface Timeline {
* The latest event is the first in the list * The latest event is the first in the list
* @param snapshot the most up to date snapshot * @param snapshot the most up to date snapshot
*/ */
fun onTimelineUpdated(snapshot: List<TimelineEvent>) fun onTimelineUpdated(snapshot: List<TimelineEvent>) = Unit
/** /**
* Called whenever an error we can't recover from occurred * Called whenever an error we can't recover from occurred
*/ */
fun onTimelineFailure(throwable: Throwable) fun onTimelineFailure(throwable: Throwable) = Unit
/** /**
* Called when new events come through the sync * Called when new events come through the sync
*/ */
fun onNewTimelineEvents(eventIds: List<String>) fun onNewTimelineEvents(eventIds: List<String>) = Unit
/**
* Called when the pagination state has changed in one direction
*/
fun onStateUpdated(direction: Direction, state: PaginationState) = Unit
} }
/**
* Pagination state
*/
data class PaginationState(
val hasMoreToLoad: Boolean = true,
val loading: Boolean = false,
val inError: Boolean = false
)
/** /**
* This is used to paginate in one or another direction. * This is used to paginate in one or another direction.
*/ */

View file

@ -47,6 +47,10 @@ data class TimelineEvent(
*/ */
val localId: Long, val localId: Long,
val eventId: String, val eventId: String,
/**
* This display index is the position in the current chunk.
* It's not unique on the timeline as it's reset on each chunk.
*/
val displayIndex: Int, val displayIndex: Int,
val senderInfo: SenderInfo, val senderInfo: SenderInfo,
val annotations: EventAnnotationsSummary? = null, val annotations: EventAnnotationsSummary? = null,

View file

@ -1,89 +0,0 @@
/*
* Copyright 2020 The Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.matrix.android.sdk.internal.database
import io.realm.Realm
import io.realm.RealmConfiguration
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import org.matrix.android.sdk.api.session.Session
import org.matrix.android.sdk.api.session.SessionLifecycleObserver
import org.matrix.android.sdk.internal.database.helper.nextDisplayIndex
import org.matrix.android.sdk.internal.database.model.ChunkEntity
import org.matrix.android.sdk.internal.database.model.ChunkEntityFields
import org.matrix.android.sdk.internal.database.model.EventEntity
import org.matrix.android.sdk.internal.database.model.RoomEntity
import org.matrix.android.sdk.internal.database.model.TimelineEventEntity
import org.matrix.android.sdk.internal.database.model.TimelineEventEntityFields
import org.matrix.android.sdk.internal.database.model.deleteOnCascade
import org.matrix.android.sdk.internal.di.SessionDatabase
import org.matrix.android.sdk.internal.session.room.timeline.PaginationDirection
import org.matrix.android.sdk.internal.task.TaskExecutor
import timber.log.Timber
import javax.inject.Inject
private const val MAX_NUMBER_OF_EVENTS_IN_DB = 35_000L
private const val MIN_NUMBER_OF_EVENTS_BY_CHUNK = 300
/**
* This class makes sure to stay under a maximum number of events as it makes Realm to be unusable when listening to events
* when the database is getting too big. This will try incrementally to remove the biggest chunks until we get below the threshold.
* We make sure to still have a minimum number of events so it's not becoming unusable.
* So this won't work for users with a big number of very active rooms.
*/
internal class DatabaseCleaner @Inject constructor(@SessionDatabase private val realmConfiguration: RealmConfiguration,
private val taskExecutor: TaskExecutor) : SessionLifecycleObserver {
override fun onSessionStarted(session: Session) {
taskExecutor.executorScope.launch(Dispatchers.Default) {
awaitTransaction(realmConfiguration) { realm ->
val allRooms = realm.where(RoomEntity::class.java).findAll()
Timber.v("There are ${allRooms.size} rooms in this session")
cleanUp(realm, MAX_NUMBER_OF_EVENTS_IN_DB / 2L)
}
}
}
private fun cleanUp(realm: Realm, threshold: Long) {
val numberOfEvents = realm.where(EventEntity::class.java).findAll().size
val numberOfTimelineEvents = realm.where(TimelineEventEntity::class.java).findAll().size
Timber.v("Number of events in db: $numberOfEvents | Number of timeline events in db: $numberOfTimelineEvents")
if (threshold <= MIN_NUMBER_OF_EVENTS_BY_CHUNK || numberOfTimelineEvents < MAX_NUMBER_OF_EVENTS_IN_DB) {
Timber.v("Db is low enough")
} else {
val thresholdChunks = realm.where(ChunkEntity::class.java)
.greaterThan(ChunkEntityFields.NUMBER_OF_TIMELINE_EVENTS, threshold)
.findAll()
Timber.v("There are ${thresholdChunks.size} chunks to clean with more than $threshold events")
for (chunk in thresholdChunks) {
val maxDisplayIndex = chunk.nextDisplayIndex(PaginationDirection.FORWARDS)
val thresholdDisplayIndex = maxDisplayIndex - threshold
val eventsToRemove = chunk.timelineEvents.where().lessThan(TimelineEventEntityFields.DISPLAY_INDEX, thresholdDisplayIndex).findAll()
Timber.v("There are ${eventsToRemove.size} events to clean in chunk: ${chunk.identifier()} from room ${chunk.room?.first()?.roomId}")
chunk.numberOfTimelineEvents = chunk.numberOfTimelineEvents - eventsToRemove.size
eventsToRemove.forEach {
val canDeleteRoot = it.root?.stateKey == null
it.deleteOnCascade(canDeleteRoot)
}
// We reset the prevToken so we will need to fetch again.
chunk.prevToken = null
}
cleanUp(realm, (threshold / 1.5).toLong())
}
}
}

View file

@ -25,6 +25,7 @@ import org.matrix.android.sdk.api.session.room.model.RoomJoinRulesContent
import org.matrix.android.sdk.api.session.room.model.VersioningState import org.matrix.android.sdk.api.session.room.model.VersioningState
import org.matrix.android.sdk.api.session.room.model.create.RoomCreateContent import org.matrix.android.sdk.api.session.room.model.create.RoomCreateContent
import org.matrix.android.sdk.api.session.room.model.tag.RoomTag import org.matrix.android.sdk.api.session.room.model.tag.RoomTag
import org.matrix.android.sdk.internal.database.model.ChunkEntityFields
import org.matrix.android.sdk.internal.database.model.CurrentStateEventEntityFields import org.matrix.android.sdk.internal.database.model.CurrentStateEventEntityFields
import org.matrix.android.sdk.internal.database.model.EditAggregatedSummaryEntityFields import org.matrix.android.sdk.internal.database.model.EditAggregatedSummaryEntityFields
import org.matrix.android.sdk.internal.database.model.EditionOfEventFields import org.matrix.android.sdk.internal.database.model.EditionOfEventFields
@ -54,7 +55,7 @@ internal class RealmSessionStoreMigration @Inject constructor(
) : RealmMigration { ) : RealmMigration {
companion object { companion object {
const val SESSION_STORE_SCHEMA_VERSION = 19L const val SESSION_STORE_SCHEMA_VERSION = 20L
} }
/** /**
@ -86,6 +87,7 @@ internal class RealmSessionStoreMigration @Inject constructor(
if (oldVersion <= 16) migrateTo17(realm) if (oldVersion <= 16) migrateTo17(realm)
if (oldVersion <= 17) migrateTo18(realm) if (oldVersion <= 17) migrateTo18(realm)
if (oldVersion <= 18) migrateTo19(realm) if (oldVersion <= 18) migrateTo19(realm)
if (oldVersion <= 19) migrateTo20(realm)
} }
private fun migrateTo1(realm: DynamicRealm) { private fun migrateTo1(realm: DynamicRealm) {
@ -390,4 +392,26 @@ internal class RealmSessionStoreMigration @Inject constructor(
} }
} }
} }
private fun migrateTo20(realm: DynamicRealm) {
Timber.d("Step 19 -> 20")
realm.schema.get("ChunkEntity")?.apply {
if (hasField("numberOfTimelineEvents")) {
removeField("numberOfTimelineEvents")
}
var cleanOldChunks = false
if (!hasField(ChunkEntityFields.NEXT_CHUNK.`$`)) {
cleanOldChunks = true
addRealmObjectField(ChunkEntityFields.NEXT_CHUNK.`$`, this)
}
if (!hasField(ChunkEntityFields.PREV_CHUNK.`$`)) {
cleanOldChunks = true
addRealmObjectField(ChunkEntityFields.PREV_CHUNK.`$`, this)
}
if (cleanOldChunks) {
val chunkEntities = realm.where("ChunkEntity").equalTo(ChunkEntityFields.IS_LAST_FORWARD, false).findAll()
chunkEntities.deleteAllFromRealm()
}
}
}
} }

View file

@ -110,7 +110,7 @@ internal fun ChunkEntity.addTimelineEvent(roomId: String,
true true
} }
} }
numberOfTimelineEvents++ // numberOfTimelineEvents++
timelineEvents.add(timelineEventEntity) timelineEvents.add(timelineEventEntity)
} }
@ -191,3 +191,29 @@ internal fun ChunkEntity.nextDisplayIndex(direction: PaginationDirection): Int {
} }
} }
} }
internal fun ChunkEntity.doesNextChunksVerifyCondition(linkCondition: (ChunkEntity) -> Boolean): Boolean {
var nextChunkToCheck = this.nextChunk
while (nextChunkToCheck != null) {
if (linkCondition(nextChunkToCheck)) {
return true
}
nextChunkToCheck = nextChunkToCheck.nextChunk
}
return false
}
internal fun ChunkEntity.isMoreRecentThan(chunkToCheck: ChunkEntity): Boolean {
if (this.isLastForward) return true
if (chunkToCheck.isLastForward) return false
// Check if the chunk to check is linked to this one
if (chunkToCheck.doesNextChunksVerifyCondition { it == this }) {
return true
}
// Otherwise check if this chunk is linked to last forward
if (this.doesNextChunksVerifyCondition { it.isLastForward }) {
return true
}
// We don't know, so we assume it's false
return false
}

View file

@ -28,3 +28,13 @@ internal fun TimelineEventEntity.Companion.nextId(realm: Realm): Long {
currentIdNum.toLong() + 1 currentIdNum.toLong() + 1
} }
} }
internal fun TimelineEventEntity.isMoreRecentThan(eventToCheck: TimelineEventEntity): Boolean {
val currentChunk = this.chunk?.first() ?: return false
val chunkToCheck = eventToCheck.chunk?.firstOrNull() ?: return false
return if (currentChunk == chunkToCheck) {
this.displayIndex >= eventToCheck.displayIndex
} else {
currentChunk.isMoreRecentThan(chunkToCheck)
}
}

View file

@ -27,9 +27,10 @@ import org.matrix.android.sdk.internal.extensions.clearWith
internal open class ChunkEntity(@Index var prevToken: String? = null, internal open class ChunkEntity(@Index var prevToken: String? = null,
// Because of gaps we can have several chunks with nextToken == null // Because of gaps we can have several chunks with nextToken == null
@Index var nextToken: String? = null, @Index var nextToken: String? = null,
var prevChunk: ChunkEntity? = null,
var nextChunk: ChunkEntity? = null,
var stateEvents: RealmList<EventEntity> = RealmList(), var stateEvents: RealmList<EventEntity> = RealmList(),
var timelineEvents: RealmList<TimelineEventEntity> = RealmList(), var timelineEvents: RealmList<TimelineEventEntity> = RealmList(),
var numberOfTimelineEvents: Long = 0,
// Only one chunk will have isLastForward == true // Only one chunk will have isLastForward == true
@Index var isLastForward: Boolean = false, @Index var isLastForward: Boolean = false,
@Index var isLastBackward: Boolean = false @Index var isLastBackward: Boolean = false

View file

@ -40,8 +40,6 @@ internal open class EventEntity(@Index var eventId: String = "",
var unsignedData: String? = null, var unsignedData: String? = null,
var redacts: String? = null, var redacts: String? = null,
var decryptionResultJson: String? = null, var decryptionResultJson: String? = null,
var decryptionErrorCode: String? = null,
var decryptionErrorReason: String? = null,
var ageLocalTs: Long? = null var ageLocalTs: Long? = null
) : RealmObject() { ) : RealmObject() {
@ -55,6 +53,16 @@ internal open class EventEntity(@Index var eventId: String = "",
sendStateStr = value.name sendStateStr = value.name
} }
var decryptionErrorCode: String? = null
set(value) {
if (value != field) field = value
}
var decryptionErrorReason: String? = null
set(value) {
if (value != field) field = value
}
companion object companion object
fun setDecryptionResult(result: MXEventDecryptionResult, clearEvent: JsonDict? = null) { fun setDecryptionResult(result: MXEventDecryptionResult, clearEvent: JsonDict? = null) {

View file

@ -46,7 +46,5 @@ internal fun TimelineEventEntity.deleteOnCascade(canDeleteRoot: Boolean) {
if (canDeleteRoot) { if (canDeleteRoot) {
root?.deleteFromRealm() root?.deleteFromRealm()
} }
annotations?.deleteOnCascade()
readReceipts?.deleteOnCascade()
deleteFromRealm() deleteFromRealm()
} }

View file

@ -18,6 +18,7 @@ package org.matrix.android.sdk.internal.database.query
import io.realm.Realm import io.realm.Realm
import io.realm.RealmConfiguration import io.realm.RealmConfiguration
import org.matrix.android.sdk.api.session.events.model.LocalEcho import org.matrix.android.sdk.api.session.events.model.LocalEcho
import org.matrix.android.sdk.internal.database.helper.isMoreRecentThan
import org.matrix.android.sdk.internal.database.model.ChunkEntity import org.matrix.android.sdk.internal.database.model.ChunkEntity
import org.matrix.android.sdk.internal.database.model.ReadMarkerEntity import org.matrix.android.sdk.internal.database.model.ReadMarkerEntity
import org.matrix.android.sdk.internal.database.model.ReadReceiptEntity import org.matrix.android.sdk.internal.database.model.ReadReceiptEntity
@ -33,28 +34,26 @@ internal fun isEventRead(realmConfiguration: RealmConfiguration,
if (LocalEcho.isLocalEchoId(eventId)) { if (LocalEcho.isLocalEchoId(eventId)) {
return true return true
} }
// If we don't know if the event has been read, we assume it's not
var isEventRead = false var isEventRead = false
Realm.getInstance(realmConfiguration).use { realm -> Realm.getInstance(realmConfiguration).use { realm ->
val liveChunk = ChunkEntity.findLastForwardChunkOfRoom(realm, roomId) ?: return@use val latestEvent = TimelineEventEntity.latestEvent(realm, roomId, true)
val eventToCheck = liveChunk.timelineEvents.find(eventId) // If latest event is from you we are sure the event is read
if (latestEvent?.root?.sender == userId) {
return true
}
val eventToCheck = TimelineEventEntity.where(realm, roomId, eventId).findFirst()
isEventRead = when { isEventRead = when {
eventToCheck == null -> hasReadMissingEvent( eventToCheck == null -> false
realm = realm,
latestChunkEntity = liveChunk,
roomId = roomId,
userId = userId,
eventId = eventId
)
eventToCheck.root?.sender == userId -> true eventToCheck.root?.sender == userId -> true
else -> { else -> {
val readReceipt = ReadReceiptEntity.where(realm, roomId, userId).findFirst() ?: return@use val readReceipt = ReadReceiptEntity.where(realm, roomId, userId).findFirst() ?: return@use
val readReceiptIndex = liveChunk.timelineEvents.find(readReceipt.eventId)?.displayIndex ?: Int.MIN_VALUE val readReceiptEvent = TimelineEventEntity.where(realm, roomId, readReceipt.eventId).findFirst() ?: return@use
eventToCheck.displayIndex <= readReceiptIndex readReceiptEvent.isMoreRecentThan(eventToCheck)
} }
} }
} }
return isEventRead return isEventRead
} }

View file

@ -47,7 +47,6 @@ import org.matrix.android.sdk.internal.crypto.secrets.DefaultSharedSecretStorage
import org.matrix.android.sdk.internal.crypto.tasks.DefaultRedactEventTask import org.matrix.android.sdk.internal.crypto.tasks.DefaultRedactEventTask
import org.matrix.android.sdk.internal.crypto.tasks.RedactEventTask import org.matrix.android.sdk.internal.crypto.tasks.RedactEventTask
import org.matrix.android.sdk.internal.crypto.verification.VerificationMessageProcessor import org.matrix.android.sdk.internal.crypto.verification.VerificationMessageProcessor
import org.matrix.android.sdk.internal.database.DatabaseCleaner
import org.matrix.android.sdk.internal.database.EventInsertLiveObserver import org.matrix.android.sdk.internal.database.EventInsertLiveObserver
import org.matrix.android.sdk.internal.database.RealmSessionProvider import org.matrix.android.sdk.internal.database.RealmSessionProvider
import org.matrix.android.sdk.internal.database.SessionRealmConfigurationFactory import org.matrix.android.sdk.internal.database.SessionRealmConfigurationFactory
@ -339,10 +338,6 @@ internal abstract class SessionModule {
@IntoSet @IntoSet
abstract fun bindIdentityService(service: DefaultIdentityService): SessionLifecycleObserver abstract fun bindIdentityService(service: DefaultIdentityService): SessionLifecycleObserver
@Binds
@IntoSet
abstract fun bindDatabaseCleaner(cleaner: DatabaseCleaner): SessionLifecycleObserver
@Binds @Binds
@IntoSet @IntoSet
abstract fun bindRealmSessionProvider(provider: RealmSessionProvider): SessionLifecycleObserver abstract fun bindRealmSessionProvider(provider: RealmSessionProvider): SessionLifecycleObserver

View file

@ -34,12 +34,10 @@ import org.matrix.android.sdk.internal.database.query.isEventRead
import org.matrix.android.sdk.internal.database.query.where import org.matrix.android.sdk.internal.database.query.where
import org.matrix.android.sdk.internal.di.SessionDatabase import org.matrix.android.sdk.internal.di.SessionDatabase
import org.matrix.android.sdk.internal.di.UserId import org.matrix.android.sdk.internal.di.UserId
import org.matrix.android.sdk.internal.task.TaskExecutor
internal class DefaultReadService @AssistedInject constructor( internal class DefaultReadService @AssistedInject constructor(
@Assisted private val roomId: String, @Assisted private val roomId: String,
@SessionDatabase private val monarchy: Monarchy, @SessionDatabase private val monarchy: Monarchy,
private val taskExecutor: TaskExecutor,
private val setReadMarkersTask: SetReadMarkersTask, private val setReadMarkersTask: SetReadMarkersTask,
private val readReceiptsSummaryMapper: ReadReceiptsSummaryMapper, private val readReceiptsSummaryMapper: ReadReceiptsSummaryMapper,
@UserId private val userId: String @UserId private val userId: String

View file

@ -23,6 +23,7 @@ import dagger.assisted.AssistedFactory
import dagger.assisted.AssistedInject import dagger.assisted.AssistedInject
import io.realm.Sort import io.realm.Sort
import io.realm.kotlin.where import io.realm.kotlin.where
import org.matrix.android.sdk.api.MatrixCoroutineDispatchers
import org.matrix.android.sdk.api.session.events.model.isImageMessage import org.matrix.android.sdk.api.session.events.model.isImageMessage
import org.matrix.android.sdk.api.session.events.model.isVideoMessage import org.matrix.android.sdk.api.session.events.model.isVideoMessage
import org.matrix.android.sdk.api.session.room.timeline.Timeline import org.matrix.android.sdk.api.session.room.timeline.Timeline
@ -54,7 +55,8 @@ internal class DefaultTimelineService @AssistedInject constructor(
private val timelineEventMapper: TimelineEventMapper, private val timelineEventMapper: TimelineEventMapper,
private val loadRoomMembersTask: LoadRoomMembersTask, private val loadRoomMembersTask: LoadRoomMembersTask,
private val threadsAwarenessHandler: ThreadsAwarenessHandler, private val threadsAwarenessHandler: ThreadsAwarenessHandler,
private val readReceiptHandler: ReadReceiptHandler private val readReceiptHandler: ReadReceiptHandler,
private val coroutineDispatchers: MatrixCoroutineDispatchers
) : TimelineService { ) : TimelineService {
@AssistedFactory @AssistedFactory
@ -66,19 +68,18 @@ internal class DefaultTimelineService @AssistedInject constructor(
return DefaultTimeline( return DefaultTimeline(
roomId = roomId, roomId = roomId,
initialEventId = eventId, initialEventId = eventId,
settings = settings,
realmConfiguration = monarchy.realmConfiguration, realmConfiguration = monarchy.realmConfiguration,
taskExecutor = taskExecutor, coroutineDispatchers = coroutineDispatchers,
contextOfEventTask = contextOfEventTask,
paginationTask = paginationTask, paginationTask = paginationTask,
timelineEventMapper = timelineEventMapper, timelineEventMapper = timelineEventMapper,
settings = settings,
timelineInput = timelineInput, timelineInput = timelineInput,
eventDecryptor = eventDecryptor, eventDecryptor = eventDecryptor,
fetchTokenAndPaginateTask = fetchTokenAndPaginateTask, fetchTokenAndPaginateTask = fetchTokenAndPaginateTask,
realmSessionProvider = realmSessionProvider,
loadRoomMembersTask = loadRoomMembersTask, loadRoomMembersTask = loadRoomMembersTask,
threadsAwarenessHandler = threadsAwarenessHandler, readReceiptHandler = readReceiptHandler,
readReceiptHandler = readReceiptHandler getEventTask = contextOfEventTask,
threadsAwarenessHandler = threadsAwarenessHandler
) )
} }

View file

@ -16,9 +16,8 @@
package org.matrix.android.sdk.internal.session.room.timeline package org.matrix.android.sdk.internal.session.room.timeline
internal data class TimelineState( internal enum class LoadMoreResult {
val hasReachedEnd: Boolean = false, REACHED_END,
val hasMoreInCache: Boolean = true, SUCCESS,
val isPaginating: Boolean = false, FAILURE
val requestedPaginationCount: Int = 0 }
)

View file

@ -0,0 +1,232 @@
/*
* Copyright (c) 2021 The Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.matrix.android.sdk.internal.session.room.timeline
import io.realm.OrderedCollectionChangeSet
import io.realm.OrderedRealmCollectionChangeListener
import io.realm.Realm
import io.realm.RealmResults
import kotlinx.coroutines.CompletableDeferred
import org.matrix.android.sdk.api.extensions.orFalse
import org.matrix.android.sdk.api.session.room.send.SendState
import org.matrix.android.sdk.api.session.room.timeline.Timeline
import org.matrix.android.sdk.api.session.room.timeline.TimelineEvent
import org.matrix.android.sdk.api.session.room.timeline.TimelineSettings
import org.matrix.android.sdk.internal.database.mapper.TimelineEventMapper
import org.matrix.android.sdk.internal.database.model.ChunkEntity
import org.matrix.android.sdk.internal.database.model.ChunkEntityFields
import org.matrix.android.sdk.internal.database.query.findAllIncludingEvents
import org.matrix.android.sdk.internal.database.query.where
import org.matrix.android.sdk.internal.session.sync.handler.room.ThreadsAwarenessHandler
import java.util.concurrent.atomic.AtomicReference
/**
* This class is responsible for keeping an instance of chunkEntity and timelineChunk according to the strategy.
* There is 2 different mode: Live and Permalink.
* In Live, we will query for the live chunk (isLastForward = true).
* In Permalink, we will query for the chunk including the eventId we are looking for.
* Once we got a ChunkEntity we wrap it with TimelineChunk class so we dispatch any methods for loading data.
*/
internal class LoadTimelineStrategy(
private val roomId: String,
private val timelineId: String,
private val mode: Mode,
private val dependencies: Dependencies) {
sealed interface Mode {
object Live : Mode
data class Permalink(val originEventId: String) : Mode
fun originEventId(): String? {
return if (this is Permalink) {
originEventId
} else {
null
}
}
}
data class Dependencies(
val timelineSettings: TimelineSettings,
val realm: AtomicReference<Realm>,
val eventDecryptor: TimelineEventDecryptor,
val paginationTask: PaginationTask,
val fetchTokenAndPaginateTask: FetchTokenAndPaginateTask,
val getContextOfEventTask: GetContextOfEventTask,
val timelineInput: TimelineInput,
val timelineEventMapper: TimelineEventMapper,
val threadsAwarenessHandler: ThreadsAwarenessHandler,
val onEventsUpdated: (Boolean) -> Unit,
val onLimitedTimeline: () -> Unit,
val onNewTimelineEvents: (List<String>) -> Unit
)
private var getContextLatch: CompletableDeferred<Unit>? = null
private var chunkEntity: RealmResults<ChunkEntity>? = null
private var timelineChunk: TimelineChunk? = null
private val chunkEntityListener = OrderedRealmCollectionChangeListener { _: RealmResults<ChunkEntity>, changeSet: OrderedCollectionChangeSet ->
// Can be call either when you open a permalink on an unknown event
// or when there is a gap in the timeline.
val shouldRebuildChunk = changeSet.insertions.isNotEmpty()
if (shouldRebuildChunk) {
timelineChunk?.close(closeNext = true, closePrev = true)
timelineChunk = chunkEntity?.createTimelineChunk()
// If we are waiting for a result of get context, post completion
getContextLatch?.complete(Unit)
// If we have a gap, just tell the timeline about it.
if (timelineChunk?.hasReachedLastForward().orFalse()) {
dependencies.onLimitedTimeline()
}
}
}
private val uiEchoManagerListener = object : UIEchoManager.Listener {
override fun rebuildEvent(eventId: String, builder: (TimelineEvent) -> TimelineEvent?): Boolean {
return timelineChunk?.rebuildEvent(eventId, builder, searchInNext = true, searchInPrev = true).orFalse()
}
}
private val timelineInputListener = object : TimelineInput.Listener {
override fun onLocalEchoCreated(roomId: String, timelineEvent: TimelineEvent) {
if (roomId != this@LoadTimelineStrategy.roomId) {
return
}
if (uiEchoManager.onLocalEchoCreated(timelineEvent)) {
dependencies.onNewTimelineEvents(listOf(timelineEvent.eventId))
dependencies.onEventsUpdated(false)
}
}
override fun onLocalEchoUpdated(roomId: String, eventId: String, sendState: SendState) {
if (roomId != this@LoadTimelineStrategy.roomId) {
return
}
if (uiEchoManager.onSendStateUpdated(eventId, sendState)) {
dependencies.onEventsUpdated(false)
}
}
override fun onNewTimelineEvents(roomId: String, eventIds: List<String>) {
if (roomId == this@LoadTimelineStrategy.roomId && hasReachedLastForward()) {
dependencies.onNewTimelineEvents(eventIds)
}
}
}
private val uiEchoManager = UIEchoManager(uiEchoManagerListener)
private val sendingEventsDataSource: SendingEventsDataSource = RealmSendingEventsDataSource(
roomId = roomId,
realm = dependencies.realm,
uiEchoManager = uiEchoManager,
timelineEventMapper = dependencies.timelineEventMapper,
onEventsUpdated = dependencies.onEventsUpdated
)
fun onStart() {
dependencies.eventDecryptor.start()
dependencies.timelineInput.listeners.add(timelineInputListener)
val realm = dependencies.realm.get()
sendingEventsDataSource.start()
chunkEntity = getChunkEntity(realm).also {
it.addChangeListener(chunkEntityListener)
timelineChunk = it.createTimelineChunk()
}
}
fun onStop() {
dependencies.eventDecryptor.destroy()
dependencies.timelineInput.listeners.remove(timelineInputListener)
chunkEntity?.removeChangeListener(chunkEntityListener)
sendingEventsDataSource.stop()
timelineChunk?.close(closeNext = true, closePrev = true)
getContextLatch?.cancel()
chunkEntity = null
timelineChunk = null
}
suspend fun loadMore(count: Int, direction: Timeline.Direction, fetchOnServerIfNeeded: Boolean = true): LoadMoreResult {
if (mode is Mode.Permalink && timelineChunk == null) {
val params = GetContextOfEventTask.Params(roomId, mode.originEventId)
try {
getContextLatch = CompletableDeferred()
dependencies.getContextOfEventTask.execute(params)
// waits for the query to be fulfilled
getContextLatch?.await()
getContextLatch = null
} catch (failure: Throwable) {
return LoadMoreResult.FAILURE
}
}
return timelineChunk?.loadMore(count, direction, fetchOnServerIfNeeded) ?: LoadMoreResult.FAILURE
}
fun getBuiltEventIndex(eventId: String): Int? {
return timelineChunk?.getBuiltEventIndex(eventId, searchInNext = true, searchInPrev = true)
}
fun getBuiltEvent(eventId: String): TimelineEvent? {
return timelineChunk?.getBuiltEvent(eventId, searchInNext = true, searchInPrev = true)
}
fun buildSnapshot(): List<TimelineEvent> {
return buildSendingEvents() + timelineChunk?.builtItems(includesNext = true, includesPrev = true).orEmpty()
}
private fun buildSendingEvents(): List<TimelineEvent> {
return if (hasReachedLastForward()) {
sendingEventsDataSource.buildSendingEvents()
} else {
emptyList()
}
}
private fun getChunkEntity(realm: Realm): RealmResults<ChunkEntity> {
return if (mode is Mode.Permalink) {
ChunkEntity.findAllIncludingEvents(realm, listOf(mode.originEventId))
} else {
ChunkEntity.where(realm, roomId)
.equalTo(ChunkEntityFields.IS_LAST_FORWARD, true)
.findAll()
}
}
private fun hasReachedLastForward(): Boolean {
return timelineChunk?.hasReachedLastForward().orFalse()
}
private fun RealmResults<ChunkEntity>.createTimelineChunk(): TimelineChunk? {
return firstOrNull()?.let {
return TimelineChunk(
chunkEntity = it,
timelineSettings = dependencies.timelineSettings,
roomId = roomId,
timelineId = timelineId,
eventDecryptor = dependencies.eventDecryptor,
paginationTask = dependencies.paginationTask,
fetchTokenAndPaginateTask = dependencies.fetchTokenAndPaginateTask,
timelineEventMapper = dependencies.timelineEventMapper,
uiEchoManager = uiEchoManager,
threadsAwarenessHandler = dependencies.threadsAwarenessHandler,
initialEventId = mode.originEventId(),
onBuiltEvents = dependencies.onEventsUpdated
)
}
}
}

View file

@ -0,0 +1,86 @@
/*
* Copyright (c) 2021 The Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.matrix.android.sdk.internal.session.room.timeline
import io.realm.Realm
import io.realm.RealmChangeListener
import io.realm.RealmList
import org.matrix.android.sdk.api.session.room.timeline.TimelineEvent
import org.matrix.android.sdk.internal.database.mapper.TimelineEventMapper
import org.matrix.android.sdk.internal.database.model.RoomEntity
import org.matrix.android.sdk.internal.database.model.TimelineEventEntity
import org.matrix.android.sdk.internal.database.query.where
import java.util.concurrent.atomic.AtomicReference
internal interface SendingEventsDataSource {
fun start()
fun stop()
fun buildSendingEvents(): List<TimelineEvent>
}
internal class RealmSendingEventsDataSource(
private val roomId: String,
private val realm: AtomicReference<Realm>,
private val uiEchoManager: UIEchoManager,
private val timelineEventMapper: TimelineEventMapper,
private val onEventsUpdated: (Boolean) -> Unit
) : SendingEventsDataSource {
private var roomEntity: RoomEntity? = null
private var sendingTimelineEvents: RealmList<TimelineEventEntity>? = null
private var frozenSendingTimelineEvents: RealmList<TimelineEventEntity>? = null
private val sendingTimelineEventsListener = RealmChangeListener<RealmList<TimelineEventEntity>> { events ->
uiEchoManager.onSentEventsInDatabase(events.map { it.eventId })
frozenSendingTimelineEvents = sendingTimelineEvents?.freeze()
onEventsUpdated(false)
}
override fun start() {
val safeRealm = realm.get()
roomEntity = RoomEntity.where(safeRealm, roomId = roomId).findFirst()
sendingTimelineEvents = roomEntity?.sendingTimelineEvents
sendingTimelineEvents?.addChangeListener(sendingTimelineEventsListener)
}
override fun stop() {
sendingTimelineEvents?.removeChangeListener(sendingTimelineEventsListener)
sendingTimelineEvents = null
roomEntity = null
}
override fun buildSendingEvents(): List<TimelineEvent> {
val builtSendingEvents = mutableListOf<TimelineEvent>()
uiEchoManager.getInMemorySendingEvents()
.addWithUiEcho(builtSendingEvents)
frozenSendingTimelineEvents
?.filter { timelineEvent ->
builtSendingEvents.none { it.eventId == timelineEvent.eventId }
}
?.map {
timelineEventMapper.map(it)
}?.addWithUiEcho(builtSendingEvents)
return builtSendingEvents
}
private fun List<TimelineEvent>.addWithUiEcho(target: MutableList<TimelineEvent>) {
target.addAll(
map { uiEchoManager.updateSentStateWithUiEcho(it) }
)
}
}

View file

@ -0,0 +1,479 @@
/*
* Copyright (c) 2021 The Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.matrix.android.sdk.internal.session.room.timeline
import io.realm.OrderedCollectionChangeSet
import io.realm.OrderedRealmCollectionChangeListener
import io.realm.RealmObjectChangeListener
import io.realm.RealmQuery
import io.realm.RealmResults
import io.realm.Sort
import kotlinx.coroutines.CompletableDeferred
import org.matrix.android.sdk.api.extensions.orFalse
import org.matrix.android.sdk.api.extensions.tryOrNull
import org.matrix.android.sdk.api.session.events.model.EventType
import org.matrix.android.sdk.api.session.room.timeline.Timeline
import org.matrix.android.sdk.api.session.room.timeline.TimelineEvent
import org.matrix.android.sdk.api.session.room.timeline.TimelineSettings
import org.matrix.android.sdk.internal.database.mapper.EventMapper
import org.matrix.android.sdk.internal.database.mapper.TimelineEventMapper
import org.matrix.android.sdk.internal.database.model.ChunkEntity
import org.matrix.android.sdk.internal.database.model.ChunkEntityFields
import org.matrix.android.sdk.internal.database.model.TimelineEventEntity
import org.matrix.android.sdk.internal.database.model.TimelineEventEntityFields
import org.matrix.android.sdk.internal.session.sync.handler.room.ThreadsAwarenessHandler
import timber.log.Timber
import java.util.Collections
import java.util.concurrent.atomic.AtomicBoolean
/**
* This is a wrapper around a ChunkEntity in the database.
* It does mainly listen to the db timeline events.
* It also triggers pagination to the server when needed, or dispatch to the prev or next chunk if any.
*/
internal class TimelineChunk(private val chunkEntity: ChunkEntity,
private val timelineSettings: TimelineSettings,
private val roomId: String,
private val timelineId: String,
private val eventDecryptor: TimelineEventDecryptor,
private val paginationTask: PaginationTask,
private val fetchTokenAndPaginateTask: FetchTokenAndPaginateTask,
private val timelineEventMapper: TimelineEventMapper,
private val uiEchoManager: UIEchoManager? = null,
private val threadsAwarenessHandler: ThreadsAwarenessHandler,
private val initialEventId: String?,
private val onBuiltEvents: (Boolean) -> Unit) {
private val isLastForward = AtomicBoolean(chunkEntity.isLastForward)
private val isLastBackward = AtomicBoolean(chunkEntity.isLastBackward)
private var prevChunkLatch: CompletableDeferred<Unit>? = null
private var nextChunkLatch: CompletableDeferred<Unit>? = null
private val chunkObjectListener = RealmObjectChangeListener<ChunkEntity> { _, changeSet ->
if (changeSet == null) return@RealmObjectChangeListener
if (changeSet.isDeleted.orFalse()) {
return@RealmObjectChangeListener
}
Timber.v("on chunk (${chunkEntity.identifier()}) changed: ${changeSet.changedFields?.joinToString(",")}")
if (changeSet.isFieldChanged(ChunkEntityFields.IS_LAST_FORWARD)) {
isLastForward.set(chunkEntity.isLastForward)
}
if (changeSet.isFieldChanged(ChunkEntityFields.IS_LAST_BACKWARD)) {
isLastBackward.set(chunkEntity.isLastBackward)
}
if (changeSet.isFieldChanged(ChunkEntityFields.NEXT_CHUNK.`$`)) {
nextChunk = createTimelineChunk(chunkEntity.nextChunk)
nextChunkLatch?.complete(Unit)
}
if (changeSet.isFieldChanged(ChunkEntityFields.PREV_CHUNK.`$`)) {
prevChunk = createTimelineChunk(chunkEntity.prevChunk)
prevChunkLatch?.complete(Unit)
}
}
private val timelineEventsChangeListener =
OrderedRealmCollectionChangeListener { results: RealmResults<TimelineEventEntity>, changeSet: OrderedCollectionChangeSet ->
Timber.v("on timeline events chunk update")
val frozenResults = results.freeze()
handleDatabaseChangeSet(frozenResults, changeSet)
}
private var timelineEventEntities: RealmResults<TimelineEventEntity> = chunkEntity.sortedTimelineEvents()
private val builtEvents: MutableList<TimelineEvent> = Collections.synchronizedList(ArrayList())
private val builtEventsIndexes: MutableMap<String, Int> = Collections.synchronizedMap(HashMap<String, Int>())
private var nextChunk: TimelineChunk? = null
private var prevChunk: TimelineChunk? = null
init {
timelineEventEntities.addChangeListener(timelineEventsChangeListener)
chunkEntity.addChangeListener(chunkObjectListener)
}
fun hasReachedLastForward(): Boolean {
return if (isLastForward.get()) {
true
} else {
nextChunk?.hasReachedLastForward().orFalse()
}
}
fun builtItems(includesNext: Boolean, includesPrev: Boolean): List<TimelineEvent> {
val deepBuiltItems = ArrayList<TimelineEvent>(builtEvents.size)
if (includesNext) {
val nextEvents = nextChunk?.builtItems(includesNext = true, includesPrev = false).orEmpty()
deepBuiltItems.addAll(nextEvents)
}
deepBuiltItems.addAll(builtEvents)
if (includesPrev) {
val prevEvents = prevChunk?.builtItems(includesNext = false, includesPrev = true).orEmpty()
deepBuiltItems.addAll(prevEvents)
}
return deepBuiltItems
}
/**
* This will take care of loading and building events of this chunk for the given direction and count.
* If @param fetchFromServerIfNeeded is true, it will try to fetch more events on server to get the right amount of data.
* This method will also post a snapshot as soon the data is built from db to avoid waiting for server response.
*/
suspend fun loadMore(count: Int, direction: Timeline.Direction, fetchOnServerIfNeeded: Boolean = true): LoadMoreResult {
if (direction == Timeline.Direction.FORWARDS && nextChunk != null) {
return nextChunk?.loadMore(count, direction, fetchOnServerIfNeeded) ?: LoadMoreResult.FAILURE
} else if (direction == Timeline.Direction.BACKWARDS && prevChunk != null) {
return prevChunk?.loadMore(count, direction, fetchOnServerIfNeeded) ?: LoadMoreResult.FAILURE
}
val loadFromStorageCount = loadFromStorage(count, direction)
Timber.v("Has loaded $loadFromStorageCount items from storage in $direction")
val offsetCount = count - loadFromStorageCount
return if (direction == Timeline.Direction.FORWARDS && isLastForward.get()) {
LoadMoreResult.REACHED_END
} else if (direction == Timeline.Direction.BACKWARDS && isLastBackward.get()) {
LoadMoreResult.REACHED_END
} else if (offsetCount == 0) {
LoadMoreResult.SUCCESS
} else {
delegateLoadMore(fetchOnServerIfNeeded, offsetCount, direction)
}
}
private suspend fun delegateLoadMore(fetchFromServerIfNeeded: Boolean, offsetCount: Int, direction: Timeline.Direction): LoadMoreResult {
return if (direction == Timeline.Direction.FORWARDS) {
val nextChunkEntity = chunkEntity.nextChunk
when {
nextChunkEntity != null -> {
if (nextChunk == null) {
nextChunk = createTimelineChunk(nextChunkEntity)
}
nextChunk?.loadMore(offsetCount, direction, fetchFromServerIfNeeded) ?: LoadMoreResult.FAILURE
}
fetchFromServerIfNeeded -> {
fetchFromServer(offsetCount, chunkEntity.nextToken, direction)
}
else -> {
LoadMoreResult.SUCCESS
}
}
} else {
val prevChunkEntity = chunkEntity.prevChunk
when {
prevChunkEntity != null -> {
if (prevChunk == null) {
prevChunk = createTimelineChunk(prevChunkEntity)
}
prevChunk?.loadMore(offsetCount, direction, fetchFromServerIfNeeded) ?: LoadMoreResult.FAILURE
}
fetchFromServerIfNeeded -> {
fetchFromServer(offsetCount, chunkEntity.prevToken, direction)
}
else -> {
LoadMoreResult.SUCCESS
}
}
}
}
fun getBuiltEventIndex(eventId: String, searchInNext: Boolean, searchInPrev: Boolean): Int? {
val builtEventIndex = builtEventsIndexes[eventId]
if (builtEventIndex != null) {
return getOffsetIndex() + builtEventIndex
}
if (searchInNext) {
val nextBuiltEventIndex = nextChunk?.getBuiltEventIndex(eventId, searchInNext = true, searchInPrev = false)
if (nextBuiltEventIndex != null) {
return nextBuiltEventIndex
}
}
if (searchInPrev) {
val prevBuiltEventIndex = prevChunk?.getBuiltEventIndex(eventId, searchInNext = false, searchInPrev = true)
if (prevBuiltEventIndex != null) {
return prevBuiltEventIndex
}
}
return null
}
fun getBuiltEvent(eventId: String, searchInNext: Boolean, searchInPrev: Boolean): TimelineEvent? {
val builtEventIndex = builtEventsIndexes[eventId]
if (builtEventIndex != null) {
return builtEvents.getOrNull(builtEventIndex)
}
if (searchInNext) {
val nextBuiltEvent = nextChunk?.getBuiltEvent(eventId, searchInNext = true, searchInPrev = false)
if (nextBuiltEvent != null) {
return nextBuiltEvent
}
}
if (searchInPrev) {
val prevBuiltEvent = prevChunk?.getBuiltEvent(eventId, searchInNext = false, searchInPrev = true)
if (prevBuiltEvent != null) {
return prevBuiltEvent
}
}
return null
}
fun rebuildEvent(eventId: String, builder: (TimelineEvent) -> TimelineEvent?, searchInNext: Boolean, searchInPrev: Boolean): Boolean {
return tryOrNull {
val builtIndex = getBuiltEventIndex(eventId, searchInNext = false, searchInPrev = false)
if (builtIndex == null) {
val foundInPrev = searchInPrev && prevChunk?.rebuildEvent(eventId, builder, searchInNext = false, searchInPrev = true).orFalse()
if (foundInPrev) {
return true
}
if (searchInNext) {
return prevChunk?.rebuildEvent(eventId, builder, searchInPrev = false, searchInNext = true).orFalse()
}
return false
}
// Update the relation of existing event
builtEvents.getOrNull(builtIndex)?.let { te ->
val rebuiltEvent = builder(te)
builtEvents[builtIndex] = rebuiltEvent!!
true
}
}
?: false
}
fun close(closeNext: Boolean, closePrev: Boolean) {
if (closeNext) {
nextChunk?.close(closeNext = true, closePrev = false)
}
if (closePrev) {
prevChunk?.close(closeNext = false, closePrev = true)
}
nextChunk = null
nextChunkLatch?.cancel()
prevChunk = null
prevChunkLatch?.cancel()
chunkEntity.removeChangeListener(chunkObjectListener)
timelineEventEntities.removeChangeListener(timelineEventsChangeListener)
}
/**
* This method tries to read events from the current chunk.
*/
private suspend fun loadFromStorage(count: Int, direction: Timeline.Direction): Int {
val displayIndex = getNextDisplayIndex(direction) ?: return 0
val baseQuery = timelineEventEntities.where()
val timelineEvents = baseQuery.offsets(direction, count, displayIndex).findAll().orEmpty()
if (timelineEvents.isEmpty()) return 0
fetchRootThreadEventsIfNeeded(timelineEvents)
if (direction == Timeline.Direction.FORWARDS) {
builtEventsIndexes.entries.forEach { it.setValue(it.value + timelineEvents.size) }
}
timelineEvents
.mapIndexed { index, timelineEventEntity ->
val timelineEvent = timelineEventEntity.buildAndDecryptIfNeeded()
if (timelineEvent.root.type == EventType.STATE_ROOM_CREATE) {
isLastBackward.set(true)
}
if (direction == Timeline.Direction.FORWARDS) {
builtEventsIndexes[timelineEvent.eventId] = index
builtEvents.add(index, timelineEvent)
} else {
builtEventsIndexes[timelineEvent.eventId] = builtEvents.size
builtEvents.add(timelineEvent)
}
}
return timelineEvents.size
}
/**
* This function is responsible to fetch and store the root event of a thread event
* in order to be able to display the event to the user appropriately
*/
private suspend fun fetchRootThreadEventsIfNeeded(offsetResults: List<TimelineEventEntity>) {
val eventEntityList = offsetResults
.mapNotNull {
it.root
}.map {
EventMapper.map(it)
}
threadsAwarenessHandler.fetchRootThreadEventsIfNeeded(eventEntityList)
}
private fun TimelineEventEntity.buildAndDecryptIfNeeded(): TimelineEvent {
val timelineEvent = buildTimelineEvent(this)
val transactionId = timelineEvent.root.unsignedData?.transactionId
uiEchoManager?.onSyncedEvent(transactionId)
if (timelineEvent.isEncrypted() &&
timelineEvent.root.mxDecryptionResult == null) {
timelineEvent.root.eventId?.also { eventDecryptor.requestDecryption(TimelineEventDecryptor.DecryptionRequest(timelineEvent.root, timelineId)) }
}
return timelineEvent
}
private fun buildTimelineEvent(eventEntity: TimelineEventEntity) = timelineEventMapper.map(
timelineEventEntity = eventEntity,
buildReadReceipts = timelineSettings.buildReadReceipts
).let {
// eventually enhance with ui echo?
(uiEchoManager?.decorateEventWithReactionUiEcho(it) ?: it)
}
/**
* Will try to fetch a new chunk on the home server.
* It will take care to update the database by inserting new events and linking new chunk
* with this one.
*/
private suspend fun fetchFromServer(count: Int, token: String?, direction: Timeline.Direction): LoadMoreResult {
val latch = if (direction == Timeline.Direction.FORWARDS) {
nextChunkLatch = CompletableDeferred()
nextChunkLatch
} else {
prevChunkLatch = CompletableDeferred()
prevChunkLatch
}
val loadMoreResult = try {
if (token == null) {
if (direction == Timeline.Direction.BACKWARDS || !chunkEntity.hasBeenALastForwardChunk()) return LoadMoreResult.REACHED_END
val lastKnownEventId = chunkEntity.sortedTimelineEvents().firstOrNull()?.eventId ?: return LoadMoreResult.FAILURE
val taskParams = FetchTokenAndPaginateTask.Params(roomId, lastKnownEventId, direction.toPaginationDirection(), count)
fetchTokenAndPaginateTask.execute(taskParams).toLoadMoreResult()
} else {
Timber.v("Fetch $count more events on server")
val taskParams = PaginationTask.Params(roomId, token, direction.toPaginationDirection(), count)
paginationTask.execute(taskParams).toLoadMoreResult()
}
} catch (failure: Throwable) {
Timber.e("Failed to fetch from server: $failure", failure)
LoadMoreResult.FAILURE
}
return if (loadMoreResult == LoadMoreResult.SUCCESS) {
latch?.await()
loadMore(count, direction, fetchOnServerIfNeeded = false)
} else {
loadMoreResult
}
}
private fun TokenChunkEventPersistor.Result.toLoadMoreResult(): LoadMoreResult {
return when (this) {
TokenChunkEventPersistor.Result.REACHED_END -> LoadMoreResult.REACHED_END
TokenChunkEventPersistor.Result.SHOULD_FETCH_MORE,
TokenChunkEventPersistor.Result.SUCCESS -> LoadMoreResult.SUCCESS
}
}
private fun getOffsetIndex(): Int {
var offset = 0
var currentNextChunk = nextChunk
while (currentNextChunk != null) {
offset += currentNextChunk.builtEvents.size
currentNextChunk = currentNextChunk.nextChunk
}
return offset
}
/**
* This method is responsible for managing insertions and updates of events on this chunk.
*
*/
private fun handleDatabaseChangeSet(frozenResults: RealmResults<TimelineEventEntity>, changeSet: OrderedCollectionChangeSet) {
val insertions = changeSet.insertionRanges
for (range in insertions) {
val newItems = frozenResults
.subList(range.startIndex, range.startIndex + range.length)
.map { it.buildAndDecryptIfNeeded() }
builtEventsIndexes.entries.filter { it.value >= range.startIndex }.forEach { it.setValue(it.value + range.length) }
newItems.mapIndexed { index, timelineEvent ->
if (timelineEvent.root.type == EventType.STATE_ROOM_CREATE) {
isLastBackward.set(true)
}
val correctedIndex = range.startIndex + index
builtEvents.add(correctedIndex, timelineEvent)
builtEventsIndexes[timelineEvent.eventId] = correctedIndex
}
}
val modifications = changeSet.changeRanges
for (range in modifications) {
for (modificationIndex in (range.startIndex until range.startIndex + range.length)) {
val updatedEntity = frozenResults[modificationIndex] ?: continue
try {
builtEvents[modificationIndex] = updatedEntity.buildAndDecryptIfNeeded()
} catch (failure: Throwable) {
Timber.v("Fail to update items at index: $modificationIndex")
}
}
}
if (insertions.isNotEmpty() || modifications.isNotEmpty()) {
onBuiltEvents(true)
}
}
private fun getNextDisplayIndex(direction: Timeline.Direction): Int? {
val frozenTimelineEvents = timelineEventEntities.freeze()
if (frozenTimelineEvents.isEmpty()) {
return null
}
return if (builtEvents.isEmpty()) {
if (initialEventId != null) {
frozenTimelineEvents.where().equalTo(TimelineEventEntityFields.EVENT_ID, initialEventId).findFirst()?.displayIndex
} else if (direction == Timeline.Direction.BACKWARDS) {
frozenTimelineEvents.first()?.displayIndex
} else {
frozenTimelineEvents.last()?.displayIndex
}
} else if (direction == Timeline.Direction.FORWARDS) {
builtEvents.first().displayIndex + 1
} else {
builtEvents.last().displayIndex - 1
}
}
private fun createTimelineChunk(chunkEntity: ChunkEntity?): TimelineChunk? {
if (chunkEntity == null) return null
return TimelineChunk(
chunkEntity = chunkEntity,
timelineSettings = timelineSettings,
roomId = roomId,
timelineId = timelineId,
eventDecryptor = eventDecryptor,
paginationTask = paginationTask,
fetchTokenAndPaginateTask = fetchTokenAndPaginateTask,
timelineEventMapper = timelineEventMapper,
uiEchoManager = uiEchoManager,
threadsAwarenessHandler = threadsAwarenessHandler,
initialEventId = null,
onBuiltEvents = this.onBuiltEvents
)
}
}
private fun RealmQuery<TimelineEventEntity>.offsets(
direction: Timeline.Direction,
count: Int,
startDisplayIndex: Int
): RealmQuery<TimelineEventEntity> {
sort(TimelineEventEntityFields.DISPLAY_INDEX, Sort.DESCENDING)
if (direction == Timeline.Direction.BACKWARDS) {
lessThanOrEqualTo(TimelineEventEntityFields.DISPLAY_INDEX, startDisplayIndex)
} else {
greaterThanOrEqualTo(TimelineEventEntityFields.DISPLAY_INDEX, startDisplayIndex)
}
return limit(count.toLong())
}
private fun Timeline.Direction.toPaginationDirection(): PaginationDirection {
return if (this == Timeline.Direction.BACKWARDS) PaginationDirection.BACKWARDS else PaginationDirection.FORWARDS
}
private fun ChunkEntity.sortedTimelineEvents(): RealmResults<TimelineEventEntity> {
return timelineEvents.sort(TimelineEventEntityFields.DISPLAY_INDEX, Sort.DESCENDING)
}

View file

@ -23,6 +23,9 @@ import javax.inject.Inject
@SessionScope @SessionScope
internal class TimelineInput @Inject constructor() { internal class TimelineInput @Inject constructor() {
val listeners = mutableSetOf<Listener>()
fun onLocalEchoCreated(roomId: String, timelineEvent: TimelineEvent) { fun onLocalEchoCreated(roomId: String, timelineEvent: TimelineEvent) {
listeners.toSet().forEach { it.onLocalEchoCreated(roomId, timelineEvent) } listeners.toSet().forEach { it.onLocalEchoCreated(roomId, timelineEvent) }
} }
@ -35,8 +38,6 @@ internal class TimelineInput @Inject constructor() {
listeners.toSet().forEach { it.onNewTimelineEvents(roomId, eventIds) } listeners.toSet().forEach { it.onNewTimelineEvents(roomId, eventIds) }
} }
val listeners = mutableSetOf<Listener>()
internal interface Listener { internal interface Listener {
fun onLocalEchoCreated(roomId: String, timelineEvent: TimelineEvent) = Unit fun onLocalEchoCreated(roomId: String, timelineEvent: TimelineEvent) = Unit
fun onLocalEchoUpdated(roomId: String, eventId: String, sendState: SendState) = Unit fun onLocalEchoUpdated(roomId: String, eventId: String, sendState: SendState) = Unit

View file

@ -25,94 +25,25 @@ import org.matrix.android.sdk.api.session.room.send.SendState
import org.matrix.android.sdk.internal.database.helper.addIfNecessary import org.matrix.android.sdk.internal.database.helper.addIfNecessary
import org.matrix.android.sdk.internal.database.helper.addStateEvent import org.matrix.android.sdk.internal.database.helper.addStateEvent
import org.matrix.android.sdk.internal.database.helper.addTimelineEvent import org.matrix.android.sdk.internal.database.helper.addTimelineEvent
import org.matrix.android.sdk.internal.database.helper.merge
import org.matrix.android.sdk.internal.database.mapper.toEntity import org.matrix.android.sdk.internal.database.mapper.toEntity
import org.matrix.android.sdk.internal.database.model.ChunkEntity import org.matrix.android.sdk.internal.database.model.ChunkEntity
import org.matrix.android.sdk.internal.database.model.EventInsertType import org.matrix.android.sdk.internal.database.model.EventInsertType
import org.matrix.android.sdk.internal.database.model.RoomEntity import org.matrix.android.sdk.internal.database.model.RoomEntity
import org.matrix.android.sdk.internal.database.model.RoomSummaryEntity import org.matrix.android.sdk.internal.database.model.TimelineEventEntity
import org.matrix.android.sdk.internal.database.model.deleteOnCascade
import org.matrix.android.sdk.internal.database.query.copyToRealmOrIgnore import org.matrix.android.sdk.internal.database.query.copyToRealmOrIgnore
import org.matrix.android.sdk.internal.database.query.create import org.matrix.android.sdk.internal.database.query.create
import org.matrix.android.sdk.internal.database.query.find import org.matrix.android.sdk.internal.database.query.find
import org.matrix.android.sdk.internal.database.query.findAllIncludingEvents
import org.matrix.android.sdk.internal.database.query.findLastForwardChunkOfRoom
import org.matrix.android.sdk.internal.database.query.getOrCreate
import org.matrix.android.sdk.internal.database.query.where import org.matrix.android.sdk.internal.database.query.where
import org.matrix.android.sdk.internal.di.SessionDatabase import org.matrix.android.sdk.internal.di.SessionDatabase
import org.matrix.android.sdk.internal.session.room.summary.RoomSummaryEventsHelper
import org.matrix.android.sdk.internal.util.awaitTransaction import org.matrix.android.sdk.internal.util.awaitTransaction
import timber.log.Timber import timber.log.Timber
import javax.inject.Inject import javax.inject.Inject
/** /**
* Insert Chunk in DB, and eventually merge with existing chunk event * Insert Chunk in DB, and eventually link next and previous chunk in db.
*/ */
internal class TokenChunkEventPersistor @Inject constructor(@SessionDatabase private val monarchy: Monarchy) { internal class TokenChunkEventPersistor @Inject constructor(@SessionDatabase private val monarchy: Monarchy) {
/**
* <pre>
* ========================================================================================================
* | Backward case |
* ========================================================================================================
*
* *--------------------------* *--------------------------*
* | startToken1 | | startToken1 |
* *--------------------------* *--------------------------*
* | | | |
* | | | |
* | receivedChunk backward | | |
* | Events | | |
* | | | |
* | | | |
* | | | |
* *--------------------------* *--------------------------* | |
* | startToken0 | | endToken1 | => | Merged chunk |
* *--------------------------* *--------------------------* | Events |
* | | | |
* | | | |
* | Current Chunk | | |
* | Events | | |
* | | | |
* | | | |
* | | | |
* *--------------------------* *--------------------------*
* | endToken0 | | endToken0 |
* *--------------------------* *--------------------------*
*
*
* ========================================================================================================
* | Forward case |
* ========================================================================================================
*
* *--------------------------* *--------------------------*
* | startToken0 | | startToken0 |
* *--------------------------* *--------------------------*
* | | | |
* | | | |
* | Current Chunk | | |
* | Events | | |
* | | | |
* | | | |
* | | | |
* *--------------------------* *--------------------------* | |
* | endToken0 | | startToken1 | => | Merged chunk |
* *--------------------------* *--------------------------* | Events |
* | | | |
* | | | |
* | receivedChunk forward | | |
* | Events | | |
* | | | |
* | | | |
* | | | |
* *--------------------------* *--------------------------*
* | endToken1 | | endToken1 |
* *--------------------------* *--------------------------*
*
* ========================================================================================================
* </pre>
*/
enum class Result { enum class Result {
SHOULD_FETCH_MORE, SHOULD_FETCH_MORE,
REACHED_END, REACHED_END,
@ -136,21 +67,21 @@ internal class TokenChunkEventPersistor @Inject constructor(@SessionDatabase pri
prevToken = receivedChunk.end prevToken = receivedChunk.end
} }
val existingChunk = ChunkEntity.find(realm, roomId, prevToken = prevToken, nextToken = nextToken)
if (existingChunk != null) {
Timber.v("This chunk is already in the db, returns")
return@awaitTransaction
}
val prevChunk = ChunkEntity.find(realm, roomId, nextToken = prevToken) val prevChunk = ChunkEntity.find(realm, roomId, nextToken = prevToken)
val nextChunk = ChunkEntity.find(realm, roomId, prevToken = nextToken) val nextChunk = ChunkEntity.find(realm, roomId, prevToken = nextToken)
val currentChunk = ChunkEntity.create(realm, prevToken = prevToken, nextToken = nextToken).apply {
// The current chunk is the one we will keep all along the merge processChanges. this.nextChunk = nextChunk
// We try to look for a chunk next to the token, this.prevChunk = prevChunk
// otherwise we create a whole new one which is unlinked (not live)
val currentChunk = if (direction == PaginationDirection.FORWARDS) {
prevChunk?.apply { this.nextToken = nextToken }
} else {
nextChunk?.apply { this.prevToken = prevToken }
} }
?: ChunkEntity.create(realm, prevToken, nextToken) nextChunk?.prevChunk = currentChunk
prevChunk?.nextChunk = currentChunk
if (receivedChunk.events.isNullOrEmpty() && !receivedChunk.hasMore()) { if (receivedChunk.events.isEmpty() && !receivedChunk.hasMore()) {
handleReachEnd(realm, roomId, direction, currentChunk) handleReachEnd(roomId, direction, currentChunk)
} else { } else {
handlePagination(realm, roomId, direction, receivedChunk, currentChunk) handlePagination(realm, roomId, direction, receivedChunk, currentChunk)
} }
@ -166,17 +97,10 @@ internal class TokenChunkEventPersistor @Inject constructor(@SessionDatabase pri
} }
} }
private fun handleReachEnd(realm: Realm, roomId: String, direction: PaginationDirection, currentChunk: ChunkEntity) { private fun handleReachEnd(roomId: String, direction: PaginationDirection, currentChunk: ChunkEntity) {
Timber.v("Reach end of $roomId") Timber.v("Reach end of $roomId")
if (direction == PaginationDirection.FORWARDS) { if (direction == PaginationDirection.FORWARDS) {
val currentLastForwardChunk = ChunkEntity.findLastForwardChunkOfRoom(realm, roomId) Timber.v("We should keep the lastForward chunk unique, the one from sync")
if (currentChunk != currentLastForwardChunk) {
currentChunk.isLastForward = true
currentLastForwardChunk?.deleteOnCascade(deleteStateEvents = false, canDeleteRoot = false)
RoomSummaryEntity.where(realm, roomId).findFirst()?.apply {
latestPreviewableEvent = RoomSummaryEventsHelper.getLatestPreviewableEvent(realm, roomId)
}
}
} else { } else {
currentChunk.isLastBackward = true currentChunk.isLastBackward = true
} }
@ -204,44 +128,50 @@ internal class TokenChunkEventPersistor @Inject constructor(@SessionDatabase pri
roomMemberContentsByUser[stateEvent.stateKey] = stateEvent.content.toModel<RoomMemberContent>() roomMemberContentsByUser[stateEvent.stateKey] = stateEvent.content.toModel<RoomMemberContent>()
} }
} }
val eventIds = ArrayList<String>(eventList.size) run processTimelineEvents@{
eventList.forEach { event -> eventList.forEach { event ->
if (event.eventId == null || event.senderId == null) { if (event.eventId == null || event.senderId == null) {
return@forEach return@forEach
}
val ageLocalTs = event.unsignedData?.age?.let { now - it }
eventIds.add(event.eventId)
val eventEntity = event.toEntity(roomId, SendState.SYNCED, ageLocalTs).copyToRealmOrIgnore(realm, EventInsertType.PAGINATION)
if (event.type == EventType.STATE_ROOM_MEMBER && event.stateKey != null) {
val contentToUse = if (direction == PaginationDirection.BACKWARDS) {
event.prevContent
} else {
event.content
} }
roomMemberContentsByUser[event.stateKey] = contentToUse.toModel<RoomMemberContent>() // We check for the timeline event with this id
val eventId = event.eventId
val existingTimelineEvent = TimelineEventEntity.where(realm, roomId, eventId).findFirst()
// If it exists, we want to stop here, just link the prevChunk
val existingChunk = existingTimelineEvent?.chunk?.firstOrNull()
if (existingChunk != null) {
when (direction) {
PaginationDirection.BACKWARDS -> {
if (currentChunk.nextChunk == existingChunk) {
Timber.w("Avoid double link, shouldn't happen in an ideal world")
} else {
currentChunk.prevChunk = existingChunk
existingChunk.nextChunk = currentChunk
}
}
PaginationDirection.FORWARDS -> {
if (currentChunk.prevChunk == existingChunk) {
Timber.w("Avoid double link, shouldn't happen in an ideal world")
} else {
currentChunk.nextChunk = existingChunk
existingChunk.prevChunk = currentChunk
}
}
}
// Stop processing here
return@processTimelineEvents
}
val ageLocalTs = event.unsignedData?.age?.let { now - it }
val eventEntity = event.toEntity(roomId, SendState.SYNCED, ageLocalTs).copyToRealmOrIgnore(realm, EventInsertType.PAGINATION)
if (event.type == EventType.STATE_ROOM_MEMBER && event.stateKey != null) {
val contentToUse = if (direction == PaginationDirection.BACKWARDS) {
event.prevContent
} else {
event.content
}
roomMemberContentsByUser[event.stateKey] = contentToUse.toModel<RoomMemberContent>()
}
currentChunk.addTimelineEvent(roomId, eventEntity, direction, roomMemberContentsByUser)
} }
currentChunk.addTimelineEvent(roomId, eventEntity, direction, roomMemberContentsByUser)
}
// Find all the chunks which contain at least one event from the list of eventIds
val chunks = ChunkEntity.findAllIncludingEvents(realm, eventIds)
Timber.d("Found ${chunks.size} chunks containing at least one of the eventIds")
val chunksToDelete = ArrayList<ChunkEntity>()
chunks.forEach {
if (it != currentChunk) {
Timber.d("Merge $it")
currentChunk.merge(roomId, it, direction)
chunksToDelete.add(it)
}
}
chunksToDelete.forEach {
it.deleteOnCascade(deleteStateEvents = false, canDeleteRoot = false)
}
val roomSummaryEntity = RoomSummaryEntity.getOrCreate(realm, roomId)
val shouldUpdateSummary = roomSummaryEntity.latestPreviewableEvent == null ||
(chunksToDelete.isNotEmpty() && currentChunk.isLastForward && direction == PaginationDirection.FORWARDS)
if (shouldUpdateSummary) {
roomSummaryEntity.latestPreviewableEvent = RoomSummaryEventsHelper.getLatestPreviewableEvent(realm, roomId)
} }
if (currentChunk.isValid) { if (currentChunk.isValid) {
RoomEntity.where(realm, roomId).findFirst()?.addIfNecessary(currentChunk) RoomEntity.where(realm, roomId).findFirst()?.addIfNecessary(currentChunk)

View file

@ -24,14 +24,10 @@ import org.matrix.android.sdk.api.session.room.model.ReactionAggregatedSummary
import org.matrix.android.sdk.api.session.room.model.relation.ReactionContent import org.matrix.android.sdk.api.session.room.model.relation.ReactionContent
import org.matrix.android.sdk.api.session.room.send.SendState import org.matrix.android.sdk.api.session.room.send.SendState
import org.matrix.android.sdk.api.session.room.timeline.TimelineEvent import org.matrix.android.sdk.api.session.room.timeline.TimelineEvent
import org.matrix.android.sdk.api.session.room.timeline.TimelineSettings
import timber.log.Timber import timber.log.Timber
import java.util.Collections import java.util.Collections
internal class UIEchoManager( internal class UIEchoManager(private val listener: Listener) {
private val settings: TimelineSettings,
private val listener: Listener
) {
interface Listener { interface Listener {
fun rebuildEvent(eventId: String, builder: (TimelineEvent) -> TimelineEvent?): Boolean fun rebuildEvent(eventId: String, builder: (TimelineEvent) -> TimelineEvent?): Boolean
@ -70,13 +66,12 @@ internal class UIEchoManager(
return existingState != sendState return existingState != sendState
} }
fun onLocalEchoCreated(timelineEvent: TimelineEvent) { fun onLocalEchoCreated(timelineEvent: TimelineEvent): Boolean {
// Manage some ui echos (do it before filter because actual event could be filtered out)
when (timelineEvent.root.getClearType()) { when (timelineEvent.root.getClearType()) {
EventType.REDACTION -> { EventType.REDACTION -> {
} }
EventType.REACTION -> { EventType.REACTION -> {
val content = timelineEvent.root.content?.toModel<ReactionContent>() val content: ReactionContent? = timelineEvent.root.content?.toModel<ReactionContent>()
if (RelationType.ANNOTATION == content?.relatesTo?.type) { if (RelationType.ANNOTATION == content?.relatesTo?.type) {
val reaction = content.relatesTo.key val reaction = content.relatesTo.key
val relatedEventID = content.relatesTo.eventId val relatedEventID = content.relatesTo.eventId
@ -96,11 +91,12 @@ internal class UIEchoManager(
} }
Timber.v("On local echo created: ${timelineEvent.eventId}") Timber.v("On local echo created: ${timelineEvent.eventId}")
inMemorySendingEvents.add(0, timelineEvent) inMemorySendingEvents.add(0, timelineEvent)
return true
} }
fun decorateEventWithReactionUiEcho(timelineEvent: TimelineEvent): TimelineEvent? { fun decorateEventWithReactionUiEcho(timelineEvent: TimelineEvent): TimelineEvent {
val relatedEventID = timelineEvent.eventId val relatedEventID = timelineEvent.eventId
val contents = inMemoryReactions[relatedEventID] ?: return null val contents = inMemoryReactions[relatedEventID] ?: return timelineEvent
var existingAnnotationSummary = timelineEvent.annotations ?: EventAnnotationsSummary( var existingAnnotationSummary = timelineEvent.annotations ?: EventAnnotationsSummary(
relatedEventID relatedEventID

View file

@ -345,15 +345,17 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
syncLocalTimestampMillis: Long, syncLocalTimestampMillis: Long,
aggregator: SyncResponsePostTreatmentAggregator): ChunkEntity { aggregator: SyncResponsePostTreatmentAggregator): ChunkEntity {
val lastChunk = ChunkEntity.findLastForwardChunkOfRoom(realm, roomEntity.roomId) val lastChunk = ChunkEntity.findLastForwardChunkOfRoom(realm, roomEntity.roomId)
if (isLimited && lastChunk != null) {
lastChunk.deleteOnCascade(deleteStateEvents = true, canDeleteRoot = true)
}
val chunkEntity = if (!isLimited && lastChunk != null) { val chunkEntity = if (!isLimited && lastChunk != null) {
lastChunk lastChunk
} else { } else {
realm.createObject<ChunkEntity>().apply { this.prevToken = prevToken } realm.createObject<ChunkEntity>().apply {
this.prevToken = prevToken
this.isLastForward = true
}
} }
// Only one chunk has isLastForward set to true
lastChunk?.isLastForward = false
chunkEntity.isLastForward = true
val eventIds = ArrayList<String>(eventList.size) val eventIds = ArrayList<String>(eventList.size)
val roomMemberContentsByUser = HashMap<String, RoomMemberContent?>() val roomMemberContentsByUser = HashMap<String, RoomMemberContent?>()

View file

@ -17,9 +17,6 @@
package im.vector.app.core.epoxy package im.vector.app.core.epoxy
import androidx.annotation.CallSuper import androidx.annotation.CallSuper
import androidx.lifecycle.Lifecycle
import androidx.lifecycle.LifecycleOwner
import androidx.lifecycle.LifecycleRegistry
import com.airbnb.epoxy.EpoxyModelWithHolder import com.airbnb.epoxy.EpoxyModelWithHolder
import com.airbnb.epoxy.VisibilityState import com.airbnb.epoxy.VisibilityState
import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.CoroutineScope
@ -30,24 +27,19 @@ import kotlinx.coroutines.cancelChildren
/** /**
* EpoxyModelWithHolder which can listen to visibility state change * EpoxyModelWithHolder which can listen to visibility state change
*/ */
abstract class VectorEpoxyModel<H : VectorEpoxyHolder> : EpoxyModelWithHolder<H>(), LifecycleOwner { abstract class VectorEpoxyModel<H : VectorEpoxyHolder> : EpoxyModelWithHolder<H>() {
protected val coroutineScope = CoroutineScope(SupervisorJob() + Dispatchers.Main) protected val coroutineScope = CoroutineScope(SupervisorJob() + Dispatchers.Main)
private val lifecycleRegistry: LifecycleRegistry = LifecycleRegistry(this)
override fun getLifecycle() = lifecycleRegistry
private var onModelVisibilityStateChangedListener: OnVisibilityStateChangedListener? = null private var onModelVisibilityStateChangedListener: OnVisibilityStateChangedListener? = null
@CallSuper @CallSuper
override fun bind(holder: H) { override fun bind(holder: H) {
super.bind(holder) super.bind(holder)
lifecycleRegistry.currentState = Lifecycle.State.STARTED
} }
@CallSuper @CallSuper
override fun unbind(holder: H) { override fun unbind(holder: H) {
lifecycleRegistry.currentState = Lifecycle.State.DESTROYED
coroutineScope.coroutineContext.cancelChildren() coroutineScope.coroutineContext.cancelChildren()
super.unbind(holder) super.unbind(holder)
} }

View file

@ -184,11 +184,13 @@ import im.vector.app.features.widgets.WidgetActivity
import im.vector.app.features.widgets.WidgetArgs import im.vector.app.features.widgets.WidgetArgs
import im.vector.app.features.widgets.WidgetKind import im.vector.app.features.widgets.WidgetKind
import im.vector.app.features.widgets.permissions.RoomWidgetPermissionBottomSheet import im.vector.app.features.widgets.permissions.RoomWidgetPermissionBottomSheet
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.debounce import kotlinx.coroutines.flow.debounce
import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import kotlinx.parcelize.Parcelize import kotlinx.parcelize.Parcelize
import nl.dionsegijn.konfetti.models.Shape import nl.dionsegijn.konfetti.models.Shape
import nl.dionsegijn.konfetti.models.Size import nl.dionsegijn.konfetti.models.Size
@ -1307,27 +1309,28 @@ class RoomDetailFragment @Inject constructor(
private fun updateJumpToReadMarkerViewVisibility() { private fun updateJumpToReadMarkerViewVisibility() {
viewLifecycleOwner.lifecycleScope.launchWhenResumed { viewLifecycleOwner.lifecycleScope.launchWhenResumed {
withState(roomDetailViewModel) { val state = roomDetailViewModel.awaitState()
val showJumpToUnreadBanner = when (it.unreadState) { val showJumpToUnreadBanner = when (state.unreadState) {
UnreadState.Unknown, UnreadState.Unknown,
UnreadState.HasNoUnread -> false UnreadState.HasNoUnread -> false
is UnreadState.ReadMarkerNotLoaded -> true is UnreadState.ReadMarkerNotLoaded -> true
is UnreadState.HasUnread -> { is UnreadState.HasUnread -> {
if (it.canShowJumpToReadMarker) { if (state.canShowJumpToReadMarker) {
val lastVisibleItem = layoutManager.findLastCompletelyVisibleItemPosition() val lastVisibleItem = layoutManager.findLastCompletelyVisibleItemPosition()
val positionOfReadMarker = timelineEventController.getPositionOfReadMarker() val positionOfReadMarker = withContext(Dispatchers.Default) {
if (positionOfReadMarker == null) { timelineEventController.getPositionOfReadMarker()
false
} else {
positionOfReadMarker > lastVisibleItem
}
} else {
false
} }
if (positionOfReadMarker == null) {
false
} else {
positionOfReadMarker > lastVisibleItem
}
} else {
false
} }
} }
views.jumpToReadMarkerView.isVisible = showJumpToUnreadBanner
} }
views.jumpToReadMarkerView.isVisible = showJumpToUnreadBanner
} }
} }

View file

@ -160,6 +160,7 @@ class RoomDetailViewModel @AssistedInject constructor(
observeMyRoomMember() observeMyRoomMember()
observeActiveRoomWidgets() observeActiveRoomWidgets()
observePowerLevel() observePowerLevel()
setupPreviewUrlObservers()
room.getRoomSummaryLive() room.getRoomSummaryLive()
viewModelScope.launch(Dispatchers.IO) { viewModelScope.launch(Dispatchers.IO) {
tryOrNull { room.markAsRead(ReadService.MarkAsReadParams.READ_RECEIPT) } tryOrNull { room.markAsRead(ReadService.MarkAsReadParams.READ_RECEIPT) }
@ -263,6 +264,30 @@ class RoomDetailViewModel @AssistedInject constructor(
} }
} }
private fun setupPreviewUrlObservers() {
if (!vectorPreferences.showUrlPreviews()) {
return
}
combine(
timelineEvents,
room.flow().liveRoomSummary()
.unwrap()
.map { it.isEncrypted }
.distinctUntilChanged()
) { snapshot, isRoomEncrypted ->
if (isRoomEncrypted) {
return@combine
}
withContext(Dispatchers.Default) {
Timber.v("On new timeline events for urlpreview on ${Thread.currentThread()}")
snapshot.forEach {
previewUrlRetriever.getPreviewUrl(it)
}
}
}
.launchIn(viewModelScope)
}
fun getOtherUserIds() = room.roomSummary()?.otherMemberIds fun getOtherUserIds() = room.roomSummary()?.otherMemberIds
override fun handle(action: RoomDetailAction) { override fun handle(action: RoomDetailAction) {
@ -720,7 +745,6 @@ class RoomDetailViewModel @AssistedInject constructor(
} }
private fun handleNavigateToEvent(action: RoomDetailAction.NavigateToEvent) { private fun handleNavigateToEvent(action: RoomDetailAction.NavigateToEvent) {
stopTrackingUnreadMessages()
val targetEventId: String = action.eventId val targetEventId: String = action.eventId
val indexOfEvent = timeline.getIndexOfEvent(targetEventId) val indexOfEvent = timeline.getIndexOfEvent(targetEventId)
if (indexOfEvent == null) { if (indexOfEvent == null) {
@ -796,12 +820,12 @@ class RoomDetailViewModel @AssistedInject constructor(
.chunk(1000) .chunk(1000)
.filter { it.isNotEmpty() } .filter { it.isNotEmpty() }
.onEach { actions -> .onEach { actions ->
val bufferedMostRecentDisplayedEvent = actions.maxByOrNull { it.event.displayIndex }?.event ?: return@onEach val bufferedMostRecentDisplayedEvent = actions.minByOrNull { it.event.indexOfEvent() }?.event ?: return@onEach
val globalMostRecentDisplayedEvent = mostRecentDisplayedEvent val globalMostRecentDisplayedEvent = mostRecentDisplayedEvent
if (trackUnreadMessages.get()) { if (trackUnreadMessages.get()) {
if (globalMostRecentDisplayedEvent == null) { if (globalMostRecentDisplayedEvent == null) {
mostRecentDisplayedEvent = bufferedMostRecentDisplayedEvent mostRecentDisplayedEvent = bufferedMostRecentDisplayedEvent
} else if (bufferedMostRecentDisplayedEvent.displayIndex > globalMostRecentDisplayedEvent.displayIndex) { } else if (bufferedMostRecentDisplayedEvent.indexOfEvent() < globalMostRecentDisplayedEvent.indexOfEvent()) {
mostRecentDisplayedEvent = bufferedMostRecentDisplayedEvent mostRecentDisplayedEvent = bufferedMostRecentDisplayedEvent
} }
} }
@ -815,6 +839,12 @@ class RoomDetailViewModel @AssistedInject constructor(
.launchIn(viewModelScope) .launchIn(viewModelScope)
} }
/**
* Returns the index of event in the timeline.
* Returns Int.MAX_VALUE if not found
*/
private fun TimelineEvent.indexOfEvent(): Int = timeline.getIndexOfEvent(eventId) ?: Int.MAX_VALUE
private fun handleMarkAllAsRead() { private fun handleMarkAllAsRead() {
setState { copy(unreadState = UnreadState.HasNoUnread) } setState { copy(unreadState = UnreadState.HasNoUnread) }
viewModelScope.launch { viewModelScope.launch {
@ -1037,16 +1067,6 @@ class RoomDetailViewModel @AssistedInject constructor(
// tryEmit doesn't work with SharedFlow without cache // tryEmit doesn't work with SharedFlow without cache
timelineEvents.emit(snapshot) timelineEvents.emit(snapshot)
} }
// PreviewUrl
if (vectorPreferences.showUrlPreviews()) {
withState { state ->
snapshot
.takeIf { state.asyncRoomSummary.invoke()?.isEncrypted == false }
?.forEach {
previewUrlRetriever.getPreviewUrl(it)
}
}
}
} }
override fun onTimelineFailure(throwable: Throwable) { override fun onTimelineFailure(throwable: Throwable) {

View file

@ -70,7 +70,9 @@ import org.matrix.android.sdk.api.session.room.model.message.MessageImageInfoCon
import org.matrix.android.sdk.api.session.room.model.message.MessageVideoContent import org.matrix.android.sdk.api.session.room.model.message.MessageVideoContent
import org.matrix.android.sdk.api.session.room.timeline.Timeline import org.matrix.android.sdk.api.session.room.timeline.Timeline
import org.matrix.android.sdk.api.session.room.timeline.TimelineEvent import org.matrix.android.sdk.api.session.room.timeline.TimelineEvent
import timber.log.Timber
import javax.inject.Inject import javax.inject.Inject
import kotlin.system.measureTimeMillis
class TimelineEventController @Inject constructor(private val dateFormatter: VectorDateFormatter, class TimelineEventController @Inject constructor(private val dateFormatter: VectorDateFormatter,
private val vectorPreferences: VectorPreferences, private val vectorPreferences: VectorPreferences,
@ -244,22 +246,11 @@ class TimelineEventController @Inject constructor(private val dateFormatter: Vec
interceptorHelper.intercept(models, partialState.unreadState, timeline, callback) interceptorHelper.intercept(models, partialState.unreadState, timeline, callback)
} }
fun update(viewState: RoomDetailViewState) = backgroundHandler.post { fun update(viewState: RoomDetailViewState) {
synchronized(modelCache) { val newPartialState = PartialState(viewState)
val newPartialState = PartialState(viewState) if (newPartialState != partialState) {
if (partialState.highlightedEventId != newPartialState.highlightedEventId) { partialState = newPartialState
// Clear cache to force a refresh requestModelBuild()
for (i in 0 until modelCache.size) {
if (modelCache[i]?.eventId == viewState.highlightedEventId ||
modelCache[i]?.eventId == partialState.highlightedEventId) {
modelCache[i] = null
}
}
}
if (newPartialState != partialState) {
partialState = newPartialState
requestModelBuild()
}
} }
} }
@ -310,14 +301,6 @@ class TimelineEventController @Inject constructor(private val dateFormatter: Vec
submitSnapshot(snapshot) submitSnapshot(snapshot)
} }
override fun onTimelineFailure(throwable: Throwable) {
// no-op, already handled
}
override fun onNewTimelineEvents(eventIds: List<String>) {
// no-op, already handled
}
private fun submitSnapshot(newSnapshot: List<TimelineEvent>) { private fun submitSnapshot(newSnapshot: List<TimelineEvent>) {
backgroundHandler.post { backgroundHandler.post {
inSubmitList = true inSubmitList = true
@ -325,7 +308,7 @@ class TimelineEventController @Inject constructor(private val dateFormatter: Vec
currentSnapshot = newSnapshot currentSnapshot = newSnapshot
val diffResult = DiffUtil.calculateDiff(diffCallback) val diffResult = DiffUtil.calculateDiff(diffCallback)
diffResult.dispatchUpdatesTo(listUpdateCallback) diffResult.dispatchUpdatesTo(listUpdateCallback)
requestModelBuild() requestDelayedModelBuild(0)
inSubmitList = false inSubmitList = false
} }
} }
@ -335,7 +318,10 @@ class TimelineEventController @Inject constructor(private val dateFormatter: Vec
} }
private fun getModels(): List<EpoxyModel<*>> { private fun getModels(): List<EpoxyModel<*>> {
buildCacheItemsIfNeeded() val timeForBuilding = measureTimeMillis {
buildCacheItemsIfNeeded()
}
Timber.v("Time for building cache items: $timeForBuilding ms")
return modelCache return modelCache
.map { cacheItemData -> .map { cacheItemData ->
val eventModel = if (cacheItemData == null || mergedHeaderItemFactory.isCollapsed(cacheItemData.localId)) { val eventModel = if (cacheItemData == null || mergedHeaderItemFactory.isCollapsed(cacheItemData.localId)) {
@ -360,7 +346,11 @@ class TimelineEventController @Inject constructor(private val dateFormatter: Vec
if (modelCache.isEmpty()) { if (modelCache.isEmpty()) {
return return
} }
preprocessReverseEvents() val preprocessEventsTiming = measureTimeMillis {
preprocessReverseEvents()
}
Timber.v("Preprocess events took $preprocessEventsTiming ms")
var numberOfEventsToBuild = 0
val lastSentEventWithoutReadReceipts = searchLastSentEventWithoutReadReceipts(receiptsByEvent) val lastSentEventWithoutReadReceipts = searchLastSentEventWithoutReadReceipts(receiptsByEvent)
(0 until modelCache.size).forEach { position -> (0 until modelCache.size).forEach { position ->
val event = currentSnapshot[position] val event = currentSnapshot[position]
@ -370,7 +360,7 @@ class TimelineEventController @Inject constructor(private val dateFormatter: Vec
timelineEventVisibilityHelper.shouldShowEvent(it, partialState.highlightedEventId) timelineEventVisibilityHelper.shouldShowEvent(it, partialState.highlightedEventId)
} }
// Should be build if not cached or if model should be refreshed // Should be build if not cached or if model should be refreshed
if (modelCache[position] == null || modelCache[position]?.isCacheable == false) { if (modelCache[position] == null || modelCache[position]?.isCacheable(partialState) == false) {
val timelineEventsGroup = timelineEventsGroups.getOrNull(event) val timelineEventsGroup = timelineEventsGroups.getOrNull(event)
val params = TimelineItemFactoryParams( val params = TimelineItemFactoryParams(
event = event, event = event,
@ -383,11 +373,13 @@ class TimelineEventController @Inject constructor(private val dateFormatter: Vec
eventsGroup = timelineEventsGroup eventsGroup = timelineEventsGroup
) )
modelCache[position] = buildCacheItem(params) modelCache[position] = buildCacheItem(params)
numberOfEventsToBuild++
} }
val itemCachedData = modelCache[position] ?: return@forEach val itemCachedData = modelCache[position] ?: return@forEach
// Then update with additional models if needed // Then update with additional models if needed
modelCache[position] = itemCachedData.enrichWithModels(event, nextEvent, position, receiptsByEvent) modelCache[position] = itemCachedData.enrichWithModels(event, nextEvent, position, receiptsByEvent)
} }
Timber.v("Number of events to rebuild: $numberOfEventsToBuild on ${modelCache.size} total events")
} }
private fun buildCacheItem(params: TimelineItemFactoryParams): CacheItemData { private fun buildCacheItem(params: TimelineItemFactoryParams): CacheItemData {
@ -400,7 +392,7 @@ class TimelineEventController @Inject constructor(private val dateFormatter: Vec
it.id(event.localId) it.id(event.localId)
it.setOnVisibilityStateChanged(TimelineEventVisibilityStateChangedListener(callback, event)) it.setOnVisibilityStateChanged(TimelineEventVisibilityStateChangedListener(callback, event))
} }
val isCacheable = eventModel is ItemWithEvents && eventModel.isCacheable() val isCacheable = (eventModel !is ItemWithEvents || eventModel.isCacheable()) && !params.isHighlighted
return CacheItemData( return CacheItemData(
localId = event.localId, localId = event.localId,
eventId = event.root.eventId, eventId = event.root.eventId,
@ -552,6 +544,10 @@ class TimelineEventController @Inject constructor(private val dateFormatter: Vec
val eventModel: EpoxyModel<*>? = null, val eventModel: EpoxyModel<*>? = null,
val mergedHeaderModel: BasedMergedItem<*>? = null, val mergedHeaderModel: BasedMergedItem<*>? = null,
val formattedDayModel: DaySeparatorItem? = null, val formattedDayModel: DaySeparatorItem? = null,
val isCacheable: Boolean = true private val isCacheable: Boolean = true
) ) {
fun isCacheable(partialState: PartialState): Boolean {
return isCacheable && partialState.highlightedEventId != eventId
}
}
} }

View file

@ -133,28 +133,10 @@ class MergedTimelines(
secondaryTimeline.paginate(direction, count) secondaryTimeline.paginate(direction, count)
} }
override fun pendingEventCount(): Int {
return mainTimeline.pendingEventCount() + secondaryTimeline.pendingEventCount()
}
override fun failedToDeliverEventCount(): Int {
return mainTimeline.pendingEventCount() + secondaryTimeline.pendingEventCount()
}
override fun getTimelineEventAtIndex(index: Int): TimelineEvent? {
return mergedEvents.getOrNull(index)
}
override fun getIndexOfEvent(eventId: String?): Int? { override fun getIndexOfEvent(eventId: String?): Int? {
return positionsMapping[eventId] return positionsMapping[eventId]
} }
override fun getTimelineEventWithId(eventId: String?): TimelineEvent? {
return positionsMapping[eventId]?.let {
getTimelineEventAtIndex(it)
}
}
private fun processTimelineUpdates(isInit: KMutableProperty0<Boolean>, eventsRef: MutableList<TimelineEvent>, newData: List<TimelineEvent>) { private fun processTimelineUpdates(isInit: KMutableProperty0<Boolean>, eventsRef: MutableList<TimelineEvent>, newData: List<TimelineEvent>) {
coroutineScope.launch(Dispatchers.Default) { coroutineScope.launch(Dispatchers.Default) {
processingSemaphore.withPermit { processingSemaphore.withPermit {