Timeline rework : initial commit - to amend.

This commit is contained in:
ganfra 2019-03-13 22:30:05 +01:00
parent 388eae6a1c
commit 820709d433
16 changed files with 307 additions and 48 deletions

View file

@ -0,0 +1,55 @@
/*
*
* * Copyright 2019 New Vector Ltd
* *
* * Licensed under the Apache License, Version 2.0 (the "License");
* * you may not use this file except in compliance with the License.
* * You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
*
*/
package im.vector.matrix.android.api.session.room.timeline
interface Timeline {
fun paginate(direction: Direction, count: Int)
fun addListener(listener: Listener)
fun removeListener(listener: Listener)
fun removeAllListeners()
interface Listener {
}
enum class Direction(val value: String) {
/**
* Forwards when the event is added to the end of the timeline.
* These events come from the /sync stream or from forwards pagination.
*/
FORWARDS("f"),
/**
* Backwards when the event is added to the start of the timeline.
* These events come from a back pagination.
*/
BACKWARDS("b");
fun reversed(): Direction {
return when (this) {
FORWARDS -> BACKWARDS
BACKWARDS -> FORWARDS
}
}
}
}

View file

@ -32,4 +32,6 @@ interface TimelineService {
*/
fun timeline(eventId: String? = null): LiveData<TimelineData>
fun createTimeline(eventId: String?): Timeline
}

View file

@ -42,6 +42,7 @@ internal abstract class RealmLiveEntityObserver<T : RealmObject>(protected val m
queryResults.addChangeListener { t, changeSet ->
onChanged(t, changeSet)
}
processInitialResults(queryResults)
results = AtomicReference(queryResults)
}
}
@ -55,18 +56,22 @@ internal abstract class RealmLiveEntityObserver<T : RealmObject>(protected val m
}
}
// PRIVATE
private fun onChanged(realmResults: RealmResults<T>, changeSet: OrderedCollectionChangeSet) {
protected open fun onChanged(realmResults: RealmResults<T>, changeSet: OrderedCollectionChangeSet) {
val insertionIndexes = changeSet.insertions
val updateIndexes = changeSet.changes
val deletionIndexes = changeSet.deletions
val inserted = realmResults.filterIndexed { index, _ -> insertionIndexes.contains(index) }
val updated = realmResults.filterIndexed { index, _ -> updateIndexes.contains(index) }
val deleted = realmResults.filterIndexed { index, _ -> deletionIndexes.contains(index) }
process(inserted, updated, deleted)
processChanges(inserted, updated, deleted)
}
abstract fun process(inserted: List<T>, updated: List<T>, deleted: List<T>)
protected open fun processInitialResults(results: RealmResults<T>) {
// no-op
}
protected open fun processChanges(inserted: List<T>, updated: List<T>, deleted: List<T>) {
//no-op
}
}

View file

@ -20,7 +20,6 @@ import im.vector.matrix.android.api.session.events.model.Event
import im.vector.matrix.android.api.session.events.model.EventType
import im.vector.matrix.android.internal.database.mapper.asDomain
import im.vector.matrix.android.internal.database.mapper.toEntity
import im.vector.matrix.android.internal.database.mapper.updateWith
import im.vector.matrix.android.internal.database.model.ChunkEntity
import im.vector.matrix.android.internal.database.model.EventEntity
import im.vector.matrix.android.internal.database.model.EventEntityFields
@ -76,10 +75,6 @@ internal fun ChunkEntity.addAll(roomId: String,
}
}
internal fun ChunkEntity.updateDisplayIndexes() {
events.forEachIndexed { index, eventEntity -> eventEntity.displayIndex = index }
}
internal fun ChunkEntity.add(roomId: String,
event: Event,
direction: PaginationDirection,
@ -90,6 +85,12 @@ internal fun ChunkEntity.add(roomId: String,
if (event.eventId.isNullOrEmpty() || events.fastContains(event.eventId)) {
return
}
var currentDisplayIndex = lastDisplayIndex(direction, 0)
if (direction == PaginationDirection.FORWARDS) {
currentDisplayIndex += 1
} else {
currentDisplayIndex -= 1
}
var currentStateIndex = lastStateIndex(direction, defaultValue = stateIndexOffset)
if (direction == PaginationDirection.FORWARDS && EventType.isStateEvent(event.type)) {
currentStateIndex += 1
@ -99,10 +100,13 @@ internal fun ChunkEntity.add(roomId: String,
currentStateIndex -= 1
}
}
val eventEntity = event.toEntity(roomId)
eventEntity.updateWith(currentStateIndex, isUnlinked)
val position = if (direction == PaginationDirection.FORWARDS) 0 else this.events.size
events.add(position, eventEntity)
val eventEntity = event.toEntity(roomId).apply {
this.stateIndex = currentStateIndex
this.isUnlinked = isUnlinked
this.displayIndex = currentDisplayIndex
}
// We are not using the order of the list, but will be sorting with displayIndex field
events.add(eventEntity)
}
private fun ChunkEntity.assertIsManaged() {
@ -111,6 +115,13 @@ private fun ChunkEntity.assertIsManaged() {
}
}
internal fun ChunkEntity.lastDisplayIndex(direction: PaginationDirection, defaultValue: Int = 0): Int {
return when (direction) {
PaginationDirection.FORWARDS -> events.where().sort(EventEntityFields.DISPLAY_INDEX, Sort.DESCENDING).findFirst()?.displayIndex
PaginationDirection.BACKWARDS -> events.where().sort(EventEntityFields.DISPLAY_INDEX, Sort.ASCENDING).findFirst()?.displayIndex
} ?: defaultValue
}
internal fun ChunkEntity.lastStateIndex(direction: PaginationDirection, defaultValue: Int = 0): Int {
return when (direction) {
PaginationDirection.FORWARDS -> events.where().sort(EventEntityFields.STATE_INDEX, Sort.DESCENDING).findFirst()?.stateIndex

View file

@ -18,7 +18,6 @@ package im.vector.matrix.android.internal.database.helper
import im.vector.matrix.android.api.session.events.model.Event
import im.vector.matrix.android.internal.database.mapper.toEntity
import im.vector.matrix.android.internal.database.mapper.updateWith
import im.vector.matrix.android.internal.database.model.ChunkEntity
import im.vector.matrix.android.internal.database.model.RoomEntity
import im.vector.matrix.android.internal.database.query.fastContains
@ -30,7 +29,6 @@ internal fun RoomEntity.deleteOnCascade(chunkEntity: ChunkEntity) {
}
internal fun RoomEntity.addOrUpdate(chunkEntity: ChunkEntity) {
chunkEntity.updateDisplayIndexes()
if (!chunks.contains(chunkEntity)) {
chunks.add(chunkEntity)
}
@ -47,8 +45,10 @@ internal fun RoomEntity.addStateEvents(stateEvents: List<Event>,
if (event.eventId == null || (filterDuplicates && fastContains(event.eventId))) {
return@forEach
}
val eventEntity = event.toEntity(roomId)
eventEntity.updateWith(stateIndex, isUnlinked)
val eventEntity = event.toEntity(roomId).apply {
this.stateIndex = stateIndex
this.isUnlinked = isUnlinked
}
untimelinedStateEvents.add(eventEntity)
}
}

View file

@ -57,11 +57,6 @@ internal object EventMapper {
}
internal fun EventEntity.updateWith(stateIndex: Int, isUnlinked: Boolean) {
this.stateIndex = stateIndex
this.isUnlinked = isUnlinked
}
internal fun EventEntity.asDomain(): Event {
return EventMapper.map(this)
}

View file

@ -59,7 +59,7 @@ internal fun EventEntity.Companion.latestEvent(realm: Realm,
query?.not()?.`in`(EventEntityFields.TYPE, excludedTypes.toTypedArray())
}
return query
?.sort(EventEntityFields.DISPLAY_INDEX)
?.sort(EventEntityFields.DISPLAY_INDEX, Sort.DESCENDING)
?.findFirst()
}

View file

@ -23,6 +23,7 @@ import androidx.work.OneTimeWorkRequestBuilder
import androidx.work.WorkManager
import com.zhuinden.monarchy.Monarchy
import im.vector.matrix.android.internal.database.RealmLiveEntityObserver
import im.vector.matrix.android.internal.database.model.EventEntity
import im.vector.matrix.android.internal.database.model.GroupEntity
import im.vector.matrix.android.internal.database.query.where
import im.vector.matrix.android.internal.util.WorkerParamsFactory
@ -38,7 +39,7 @@ internal class GroupSummaryUpdater(monarchy: Monarchy
.setRequiredNetworkType(NetworkType.CONNECTED)
.build()
override fun process(inserted: List<GroupEntity>, updated: List<GroupEntity>, deleted: List<GroupEntity>) {
override fun processChanges(inserted: List<GroupEntity>, updated: List<GroupEntity>, deleted: List<GroupEntity>) {
val newGroupIds = inserted.map { it.groupId }
val getGroupDataWorkerParams = GetGroupDataWorker.Params(newGroupIds)
val workData = WorkerParamsFactory.toData(getGroupDataWorkerParams)

View file

@ -44,7 +44,7 @@ internal class RoomSummaryUpdater(monarchy: Monarchy,
override val query = Monarchy.Query<RoomEntity> { RoomEntity.where(it) }
override fun process(inserted: List<RoomEntity>, updated: List<RoomEntity>, deleted: List<RoomEntity>) {
override fun processChanges(inserted: List<RoomEntity>, updated: List<RoomEntity>, deleted: List<RoomEntity>) {
val rooms = (inserted + updated).map { it.roomId }
monarchy.writeAsync { realm ->
rooms.forEach { updateRoom(realm, it) }

View file

@ -34,7 +34,7 @@ internal class EventsPruner(monarchy: Monarchy) :
override val query = Monarchy.Query<EventEntity> { EventEntity.where(it, type = EventType.REDACTION) }
override fun process(inserted: List<EventEntity>, updated: List<EventEntity>, deleted: List<EventEntity>) {
override fun processChanges(inserted: List<EventEntity>, updated: List<EventEntity>, deleted: List<EventEntity>) {
val redactionEvents = inserted.map { it.asDomain() }
val pruneEventWorkerParams = PruneEventWorker.Params(redactionEvents)
val workData = WorkerParamsFactory.toData(pruneEventWorkerParams)

View file

@ -91,9 +91,10 @@ internal class DefaultSetReadMarkersTask(private val roomAPI: RoomAPI,
val liveChunk = ChunkEntity.findLastLiveChunkFromRoom(it, roomId)
?: return@doWithRealm
val readReceiptIndex = liveChunk.events.find(readReceipt.eventId)?.displayIndex
?: Int.MIN_VALUE
val eventToCheckIndex = liveChunk.events.find(eventId)?.displayIndex
?: Int.MAX_VALUE
val eventToCheckIndex = liveChunk.events.find(eventId)?.displayIndex ?: -1
isEventRead = eventToCheckIndex >= readReceiptIndex
isEventRead = eventToCheckIndex <= readReceiptIndex
}
return isEventRead
}

View file

@ -23,7 +23,6 @@ import im.vector.matrix.android.api.session.events.model.Event
import im.vector.matrix.android.api.session.room.send.SendService
import im.vector.matrix.android.api.util.Cancelable
import im.vector.matrix.android.internal.database.helper.add
import im.vector.matrix.android.internal.database.helper.updateDisplayIndexes
import im.vector.matrix.android.internal.database.model.ChunkEntity
import im.vector.matrix.android.internal.database.query.findLastLiveChunkFromRoom
import im.vector.matrix.android.internal.session.room.timeline.PaginationDirection
@ -49,7 +48,6 @@ internal class DefaultSendService(private val roomId: String,
val chunkEntity = ChunkEntity.findLastLiveChunkFromRoom(realm, roomId)
?: return@tryTransactionAsync
chunkEntity.add(roomId, event, PaginationDirection.FORWARDS)
chunkEntity.updateDisplayIndexes()
}
val sendContentWorkerParams = SendEventWorker.Params(roomId, event)

View file

@ -0,0 +1,189 @@
/*
*
* * Copyright 2019 New Vector Ltd
* *
* * Licensed under the Apache License, Version 2.0 (the "License");
* * you may not use this file except in compliance with the License.
* * You may obtain a copy of the License at
* *
* * http://www.apache.org/licenses/LICENSE-2.0
* *
* * Unless required by applicable law or agreed to in writing, software
* * distributed under the License is distributed on an "AS IS" BASIS,
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* * See the License for the specific language governing permissions and
* * limitations under the License.
*
*/
package im.vector.matrix.android.internal.session.room.timeline
import com.zhuinden.monarchy.Monarchy
import im.vector.matrix.android.api.session.room.timeline.Timeline
import im.vector.matrix.android.api.session.room.timeline.TimelineEvent
import im.vector.matrix.android.internal.database.RealmLiveEntityObserver
import im.vector.matrix.android.internal.database.mapper.asDomain
import im.vector.matrix.android.internal.database.model.ChunkEntityFields
import im.vector.matrix.android.internal.database.model.EventEntity
import im.vector.matrix.android.internal.database.model.EventEntityFields
import im.vector.matrix.android.internal.database.query.where
import im.vector.matrix.android.internal.session.room.members.RoomMemberExtractor
import im.vector.matrix.android.internal.task.TaskExecutor
import io.realm.OrderedCollectionChangeSet
import io.realm.RealmResults
import io.realm.Sort
private const val INITIAL_LOAD_SIZE = 30
internal class DefaultTimeline(
private val roomId: String,
private val initialEventId: String? = null,
private val monarchy: Monarchy,
private val taskExecutor: TaskExecutor,
private val boundaryCallback: TimelineBoundaryCallback,
private val contextOfEventTask: GetContextOfEventTask,
private val roomMemberExtractor: RoomMemberExtractor
) : Timeline {
private var prevDisplayIndex: Int = 0
private var nextDisplayIndex: Int = 0
private val isLive = initialEventId == null
private val listeners = mutableListOf<Timeline.Listener>()
private val builtEvents = mutableListOf<TimelineEvent>()
private lateinit var liveResults: RealmResults<EventEntity>
private val entityObserver = object : RealmLiveEntityObserver<EventEntity>(monarchy) {
override val query: Monarchy.Query<EventEntity>
get() = buildQuery(initialEventId)
override fun onChanged(realmResults: RealmResults<EventEntity>, changeSet: OrderedCollectionChangeSet) {
changeSet.insertionRanges.forEach {
val (startIndex, direction) = if (it.startIndex == 0) {
Pair(realmResults[it.length]!!.displayIndex, Timeline.Direction.FORWARDS)
} else {
Pair(realmResults[it.startIndex]!!.displayIndex, Timeline.Direction.FORWARDS)
}
addFromLiveResults(startIndex, direction, it.length.toLong())
}
}
override fun processInitialResults(results: RealmResults<EventEntity>) {
// Results are ordered DESCENDING, so first items is the most recent
liveResults = results
val initialDisplayIndex = if (isLive) {
results.first()?.displayIndex
} else {
results.where().equalTo(EventEntityFields.EVENT_ID, initialEventId).findFirst()?.displayIndex
} ?: 0
prevDisplayIndex = initialDisplayIndex
nextDisplayIndex = initialDisplayIndex
val count = Math.min(INITIAL_LOAD_SIZE, results.size).toLong()
if (isLive) {
addFromLiveResults(initialDisplayIndex, Timeline.Direction.BACKWARDS, count)
} else {
val forwardCount = count / 2L
val backwardCount = count - forwardCount
addFromLiveResults(initialDisplayIndex, Timeline.Direction.BACKWARDS, backwardCount)
addFromLiveResults(initialDisplayIndex, Timeline.Direction.BACKWARDS, forwardCount)
}
}
}
override fun paginate(direction: Timeline.Direction, count: Int) {
monarchy.postToMonarchyThread {
val startDisplayIndex = if (direction == Timeline.Direction.BACKWARDS) prevDisplayIndex else nextDisplayIndex
val shouldHitNetwork = addFromLiveResults(startDisplayIndex, direction, count.toLong()).not()
if (shouldHitNetwork) {
if (direction == Timeline.Direction.BACKWARDS) {
val itemAtEnd = builtEvents.last()
boundaryCallback.onItemAtEndLoaded(itemAtEnd)
} else {
val itemAtFront = builtEvents.first()
boundaryCallback.onItemAtFrontLoaded(itemAtFront)
}
}
}
}
override fun addListener(listener: Timeline.Listener) {
if (listeners.isEmpty()) {
entityObserver.start()
}
listeners.add(listener)
}
override fun removeListener(listener: Timeline.Listener) {
listeners.remove(listener)
if (listeners.isEmpty()) {
entityObserver.dispose()
}
}
override fun removeAllListeners() {
listeners.clear()
if (listeners.isEmpty()) {
entityObserver.dispose()
}
}
/**
* @return true if count items has been added
*/
private fun addFromLiveResults(startDisplayIndex: Int,
direction: Timeline.Direction,
count: Long): Boolean {
val offsetResults = getOffsetResults(startDisplayIndex, direction, count)
if (offsetResults.isEmpty()) {
return false
}
val offsetIndex = offsetResults.last()!!.displayIndex
if (direction == Timeline.Direction.BACKWARDS) {
prevDisplayIndex = offsetIndex - 1
} else {
nextDisplayIndex = offsetIndex + 1
}
offsetResults.forEach { eventEntity ->
val roomMember = roomMemberExtractor.extractFrom(eventEntity)
val timelineEvent = TimelineEvent(eventEntity.asDomain(), eventEntity.localId, roomMember)
val position = if (direction == Timeline.Direction.FORWARDS) 0 else builtEvents.size
builtEvents.add(position, timelineEvent)
}
return offsetResults.size.toLong() == count
}
private fun getOffsetResults(startDisplayIndex: Int,
direction: Timeline.Direction,
count: Long): RealmResults<EventEntity> {
val offsetQuery = liveResults.where()
if (direction == Timeline.Direction.BACKWARDS) {
offsetQuery
.sort(EventEntityFields.DISPLAY_INDEX, Sort.DESCENDING)
.lessThanOrEqualTo(EventEntityFields.DISPLAY_INDEX, startDisplayIndex)
} else {
offsetQuery
.sort(EventEntityFields.DISPLAY_INDEX, Sort.ASCENDING)
.greaterThanOrEqualTo(EventEntityFields.DISPLAY_INDEX, startDisplayIndex)
}
return offsetQuery.limit(count).findAll()
}
private fun buildQuery(eventId: String?): Monarchy.Query<EventEntity> {
return Monarchy.Query<EventEntity> { realm ->
val query = if (eventId == null) {
EventEntity
.where(realm, roomId = roomId, linkFilterMode = EventEntity.LinkFilterMode.LINKED_ONLY)
.equalTo("${EventEntityFields.CHUNK}.${ChunkEntityFields.IS_LAST}", true)
} else {
EventEntity
.where(realm, roomId = roomId, linkFilterMode = EventEntity.LinkFilterMode.BOTH)
.`in`("${EventEntityFields.CHUNK}.${ChunkEntityFields.EVENTS.EVENT_ID}", arrayOf(eventId))
}
query.sort(EventEntityFields.DISPLAY_INDEX, Sort.DESCENDING)
}
}
}

View file

@ -20,10 +20,7 @@ import androidx.lifecycle.LiveData
import androidx.paging.LivePagedListBuilder
import androidx.paging.PagedList
import com.zhuinden.monarchy.Monarchy
import im.vector.matrix.android.api.session.room.timeline.TimelineEventInterceptor
import im.vector.matrix.android.api.session.room.timeline.TimelineEvent
import im.vector.matrix.android.api.session.room.timeline.TimelineData
import im.vector.matrix.android.api.session.room.timeline.TimelineService
import im.vector.matrix.android.api.session.room.timeline.*
import im.vector.matrix.android.internal.database.mapper.asDomain
import im.vector.matrix.android.internal.database.model.ChunkEntityFields
import im.vector.matrix.android.internal.database.model.EventEntity
@ -37,6 +34,7 @@ import im.vector.matrix.android.internal.util.PagingRequestHelper
import im.vector.matrix.android.internal.util.tryTransactionAsync
import io.realm.Realm
import io.realm.RealmQuery
import io.realm.Sort
private const val PAGE_SIZE = 100
private const val PREFETCH_DISTANCE = 30
@ -79,6 +77,10 @@ internal class DefaultTimelineService(private val roomId: String,
}
}
override fun createTimeline(eventId: String?): Timeline {
return DefaultTimeline(roomId, eventId, monarchy, taskExecutor, boundaryCallback, contextOfEventTask, roomMemberExtractor)
}
// PRIVATE FUNCTIONS ***************************************************************************
private fun getInitialLoadKey(eventId: String?): Int {
@ -137,7 +139,7 @@ internal class DefaultTimelineService(private val roomId: String,
.where(realm, roomId = roomId, linkFilterMode = EventEntity.LinkFilterMode.BOTH)
.`in`("${EventEntityFields.CHUNK}.${ChunkEntityFields.EVENTS.EVENT_ID}", arrayOf(eventId))
}
return query.sort(EventEntityFields.DISPLAY_INDEX)
return query.sort(EventEntityFields.DISPLAY_INDEX, Sort.DESCENDING)
}

View file

@ -54,7 +54,7 @@ internal class TokenChunkEventPersistor(private val monarchy: Monarchy) {
val prevChunk = ChunkEntity.find(realm, roomId, nextToken = prevToken)
val nextChunk = ChunkEntity.find(realm, roomId, prevToken = nextToken)
// The current chunk is the one we will keep all along the merge process.
// The current chunk is the one we will keep all along the merge processChanges.
// We try to look for a chunk next to the token,
// otherwise we create a whole new one

View file

@ -42,7 +42,7 @@ internal class UserEntityUpdater(monarchy: Monarchy,
}
override fun process(inserted: List<EventEntity>, updated: List<EventEntity>, deleted: List<EventEntity>) {
override fun processChanges(inserted: List<EventEntity>, updated: List<EventEntity>, deleted: List<EventEntity>) {
val roomMembersEvents = inserted.map { it.eventId }
val taskParams = UpdateUserTask.Params(roomMembersEvents)
updateUserTask