Handle read receipt saving (still need to find a way to improve insertion )

This commit is contained in:
ganfra 2018-10-24 18:11:01 +02:00
parent efceda1def
commit 7ecbe09661
13 changed files with 138 additions and 57 deletions

View file

@ -0,0 +1,7 @@
package im.vector.matrix.android.api.session.room.model
data class ReadReceipt(
val userId: String,
val eventId: String,
val originServerTs: Long
)

View file

@ -0,0 +1,11 @@
package im.vector.matrix.android.internal.database.model
import io.realm.RealmObject
import io.realm.annotations.PrimaryKey
open class ReadReceiptEntity(@PrimaryKey var primaryKey: String = "",
var userId: String = "",
var eventId: String = "",
var roomId: String = "",
var originServerTs: Double = 0.0
) : RealmObject()

View file

@ -13,10 +13,11 @@ open class RoomEntity(@PrimaryKey var roomId: String = "",
private var membershipStr: String = MyMembership.NONE.name
@delegate:Ignore var membership: MyMembership by Delegates.observable(MyMembership.valueOf(membershipStr)) { _, _, newValue ->
@delegate:Ignore
var membership: MyMembership by Delegates.observable(MyMembership.valueOf(membershipStr)) { _, _, newValue ->
membershipStr = newValue.name
}
companion object;
companion object
}

View file

@ -23,7 +23,7 @@ class DefaultRoomService(private val monarchy: Monarchy) : RoomService {
override fun getRoom(roomId: String): Room? {
var room: Room? = null
monarchy.doWithRealm { realm ->
room = RoomEntity.where(realm, roomId).findFirst()?.let { it.asDomain() }
room = RoomEntity.where(realm, roomId).findFirst()?.asDomain()
}
return room
}

View file

@ -18,7 +18,7 @@ class RoomModule : Module {
}
scope(DefaultSession.SCOPE) {
PaginationRequest(get(), get(), get())
PaginationRequest(get(), get(), get(), get())
}
}.invoke()
}

View file

@ -12,7 +12,7 @@ import im.vector.matrix.android.internal.database.model.RoomEntity
import im.vector.matrix.android.internal.database.model.RoomSummaryEntity
import im.vector.matrix.android.internal.database.query.last
import im.vector.matrix.android.internal.database.query.where
import io.realm.RealmResults
import io.realm.Realm
import timber.log.Timber
import java.util.concurrent.atomic.AtomicBoolean
@ -42,39 +42,38 @@ internal class RoomSummaryUpdater(private val monarchy: Monarchy,
if (changeSet == null) {
return
}
manageRoomResults(changeSet.realmResults, changeSet.orderedCollectionChangeSet.changes)
manageRoomResults(changeSet.realmResults, changeSet.orderedCollectionChangeSet.insertions)
val rooms = changeSet.realmResults.map { it.asDomain() }
val indexesToUpdate = changeSet.orderedCollectionChangeSet.changes + changeSet.orderedCollectionChangeSet.insertions
monarchy.writeAsync { realm ->
manageRoomList(realm, rooms, indexesToUpdate)
}
}
private fun manageRoomResults(rooms: RealmResults<RoomEntity>, indexes: IntArray) {
private fun manageRoomList(realm: Realm, rooms: List<Room>, indexes: IntArray) {
indexes.forEach {
val room = rooms[it]?.asDomain()
val room = rooms[it]
try {
manageRoom(room)
manageRoom(realm, room)
} catch (e: Exception) {
Timber.e(e, "An error occured when updating room summaries")
}
}
}
private fun manageRoom(room: Room?) {
private fun manageRoom(realm: Realm, room: Room?) {
if (room == null) {
return
}
val roomSummary = RoomSummaryEntity.where(realm, room.roomId).findFirst()
?: RoomSummaryEntity(room.roomId)
val lastMessageEvent = EventEntity.where(realm, room.roomId, EventType.MESSAGE).last()
val lastTopicEvent = EventEntity.where(realm, room.roomId, EventType.STATE_ROOM_TOPIC).last()?.asDomain()
monarchy.writeAsync { realm ->
val roomSummary = RoomSummaryEntity.where(realm, room.roomId).findFirst()
?: RoomSummaryEntity(room.roomId)
val lastMessageEvent = EventEntity.where(realm, room.roomId, EventType.MESSAGE).last()
val lastTopicEvent = EventEntity.where(realm, room.roomId, EventType.STATE_ROOM_TOPIC).last()?.asDomain()
roomSummary.displayName = roomDisplayNameResolver.resolve(context, room).toString()
roomSummary.topic = lastTopicEvent?.content<RoomTopicContent>()?.topic
roomSummary.lastMessage = lastMessageEvent
}
roomSummary.displayName = roomDisplayNameResolver.resolve(context, room).toString()
roomSummary.topic = lastTopicEvent?.content<RoomTopicContent>()?.topic
roomSummary.lastMessage = lastMessageEvent
}
}

View file

@ -28,7 +28,8 @@ import kotlinx.coroutines.withContext
class PaginationRequest(private val roomAPI: RoomAPI,
private val monarchy: Monarchy,
private val coroutineDispatchers: MatrixCoroutineDispatchers) {
private val coroutineDispatchers: MatrixCoroutineDispatchers,
private val stateEventsChunkHandler: StateEventsChunkHandler) {
fun execute(roomId: String,
from: String?,
@ -69,37 +70,32 @@ class PaginationRequest(private val roomAPI: RoomAPI,
}
}
private fun insertInDb(chunkEvent: TokenChunkEvent, roomId: String) {
private fun insertInDb(receivedChunk: TokenChunkEvent, roomId: String) {
monarchy.runTransactionSync { realm ->
val roomEntity = RoomEntity.where(realm, roomId).findFirst()
?: return@runTransactionSync
?: return@runTransactionSync
val nextChunk = ChunkEntity.findWithPrevToken(realm, roomId, chunkEvent.nextToken)
val prevChunk = ChunkEntity.findWithNextToken(realm, roomId, chunkEvent.prevToken)
val currentChunk = ChunkEntity.findWithPrevToken(realm, roomId, receivedChunk.nextToken)
?: ChunkEntity()
currentChunk.prevToken = receivedChunk.prevToken
val eventIds = chunkEvent.chunk.filter { it.eventId != null }.map { it.eventId!! }
val prevChunk = ChunkEntity.findWithNextToken(realm, roomId, receivedChunk.prevToken)
val eventIds = receivedChunk.chunk.filter { it.eventId != null }.map { it.eventId!! }
val chunksOverlapped = ChunkEntity.findAllIncludingEvents(realm, eventIds)
val hasOverlapped = chunksOverlapped.isNotEmpty()
val currentChunk = if (nextChunk != null) {
nextChunk
} else {
ChunkEntity()
}
currentChunk.prevToken = chunkEvent.prevToken
val stateEventsChunk = StateEventsChunkHandler().handle(realm, roomId, chunkEvent.stateEvents)
val stateEventsChunk = stateEventsChunkHandler.handle(realm, roomId, receivedChunk.stateEvents)
if (!roomEntity.chunks.contains(stateEventsChunk)) {
roomEntity.chunks.add(stateEventsChunk)
}
chunkEvent.chunk.forEach { event ->
receivedChunk.chunk.forEach { event ->
val eventEntity = event.asEntity().let {
realm.copyToRealmOrUpdate(it)
}
if (!currentChunk.events.contains(eventEntity)) {
currentChunk.events.add(0, eventEntity)
currentChunk.events.add(eventEntity)
}
}
@ -111,7 +107,7 @@ class PaginationRequest(private val roomAPI: RoomAPI,
chunksOverlapped.forEach { overlapped ->
overlapped.events.forEach { event ->
if (!currentChunk.events.contains(event)) {
currentChunk.events.add(0, event)
currentChunk.events.add(event)
}
}
currentChunk.prevToken = overlapped.prevToken

View file

@ -0,0 +1,37 @@
package im.vector.matrix.android.internal.session.sync
import im.vector.matrix.android.internal.database.model.ReadReceiptEntity
import io.realm.Realm
// the receipts dictionnaries
// key : $EventId
// value : dict key $UserId
// value dict key ts
// dict value ts value
typealias ReadReceiptContent = Map<String, Map<String, Map<String, Map<String, Double>>>>
class ReadReceiptHandler {
fun handle(realm: Realm, roomId: String, content: ReadReceiptContent?): List<ReadReceiptEntity> {
if (content == null) {
return emptyList()
}
return content
.flatMap { (eventId, receiptDict) ->
receiptDict
.filterKeys { it == "m.read" }
.flatMap { (_, userIdsDict) ->
userIdsDict.map { (userId, paramsDict) ->
val ts = paramsDict.filterKeys { it == "ts" }
.values
.firstOrNull() ?: 0.0
val primaryKey = roomId + userId
ReadReceiptEntity(primaryKey, userId, eventId, roomId, ts)
}
}
}
.apply { realm.insertOrUpdate(this) }
}
}

View file

@ -2,6 +2,7 @@ package im.vector.matrix.android.internal.session.sync
import com.zhuinden.monarchy.Monarchy
import im.vector.matrix.android.api.session.events.model.Event
import im.vector.matrix.android.api.session.events.model.EventType
import im.vector.matrix.android.api.session.room.model.MyMembership
import im.vector.matrix.android.internal.database.mapper.asEntity
import im.vector.matrix.android.internal.database.model.ChunkEntity
@ -12,11 +13,14 @@ import im.vector.matrix.android.internal.database.query.findLastLiveChunkFromRoo
import im.vector.matrix.android.internal.database.query.where
import im.vector.matrix.android.internal.session.sync.model.InvitedRoomSync
import im.vector.matrix.android.internal.session.sync.model.RoomSync
import im.vector.matrix.android.internal.session.sync.model.RoomSyncEphemeral
import im.vector.matrix.android.internal.session.sync.model.RoomSyncSummary
import io.realm.Realm
class RoomSyncHandler(private val monarchy: Monarchy) {
internal class RoomSyncHandler(private val monarchy: Monarchy,
private val stateEventsChunkHandler: StateEventsChunkHandler,
private val readReceiptHandler: ReadReceiptHandler) {
sealed class HandlingStrategy {
data class JOINED(val data: Map<String, RoomSync>) : HandlingStrategy()
@ -27,14 +31,21 @@ class RoomSyncHandler(private val monarchy: Monarchy) {
fun handleRoomSync(handlingStrategy: HandlingStrategy) {
monarchy.runTransactionSync { realm ->
val roomEntities = when (handlingStrategy) {
is HandlingStrategy.JOINED -> handlingStrategy.data.map { handleJoinedRoom(realm, it.key, it.value) }
is HandlingStrategy.JOINED -> handlingStrategy.data.map { handleJoinedRoom(realm, it.key, it.value) }
is HandlingStrategy.INVITED -> handlingStrategy.data.map { handleInvitedRoom(realm, it.key, it.value) }
is HandlingStrategy.LEFT -> handlingStrategy.data.map { handleLeftRoom(it.key, it.value) }
is HandlingStrategy.LEFT -> handlingStrategy.data.map { handleLeftRoom(it.key, it.value) }
}
realm.insertOrUpdate(roomEntities)
}
}
if (handlingStrategy is HandlingStrategy.JOINED) {
monarchy.runTransactionSync { realm ->
handlingStrategy.data.forEach { (roomId, roomSync) ->
handleEphemeral(realm, roomId, roomSync.ephemeral)
}
}
}
}
// PRIVATE METHODS *****************************************************************************
@ -55,7 +66,7 @@ class RoomSyncHandler(private val monarchy: Monarchy) {
handleRoomSummary(realm, roomId, roomSync.summary)
}
if (roomSync.state != null && roomSync.state.events.isNotEmpty()) {
val chunkEntity = StateEventsChunkHandler().handle(realm, roomId, roomSync.state.events)
val chunkEntity = stateEventsChunkHandler.handle(realm, roomId, roomSync.state.events)
if (!roomEntity.chunks.contains(chunkEntity)) {
roomEntity.chunks.add(chunkEntity)
}
@ -99,7 +110,7 @@ class RoomSyncHandler(private val monarchy: Monarchy) {
roomSummary: RoomSyncSummary) {
val roomSummaryEntity = RoomSummaryEntity.where(realm, roomId).findFirst()
?: RoomSummaryEntity(roomId)
?: RoomSummaryEntity(roomId)
if (roomSummary.heroes.isNotEmpty()) {
roomSummaryEntity.heroes.clear()
@ -141,4 +152,16 @@ class RoomSyncHandler(private val monarchy: Monarchy) {
return chunkEntity
}
private fun handleEphemeral(realm: Realm,
roomId: String,
ephemeral: RoomSyncEphemeral?) {
if (ephemeral == null || ephemeral.events.isNullOrEmpty()) {
return
}
ephemeral.events
.filter { it.type == EventType.RECEIPT }
.map { it.content<ReadReceiptContent>() }
.flatMap { readReceiptHandler.handle(realm, roomId, it) }
}
}

View file

@ -18,7 +18,15 @@ class SyncModule : Module {
}
scope(DefaultSession.SCOPE) {
RoomSyncHandler(get())
StateEventsChunkHandler()
}
scope(DefaultSession.SCOPE) {
ReadReceiptHandler()
}
scope(DefaultSession.SCOPE) {
RoomSyncHandler(get(), get(), get())
}
scope(DefaultSession.SCOPE) {
@ -37,6 +45,5 @@ class SyncModule : Module {
SyncThread(get(), get(), get(), get())
}
}.invoke()
}

View file

@ -16,9 +16,9 @@ import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
class SyncRequest(private val syncAPI: SyncAPI,
private val coroutineDispatchers: MatrixCoroutineDispatchers,
private val syncResponseHandler: SyncResponseHandler) {
internal class SyncRequest(private val syncAPI: SyncAPI,
private val coroutineDispatchers: MatrixCoroutineDispatchers,
private val syncResponseHandler: SyncResponseHandler) {
fun execute(token: String?, callback: MatrixCallback<SyncResponse>): Cancelable {

View file

@ -3,7 +3,7 @@ package im.vector.matrix.android.internal.session.sync
import im.vector.matrix.android.internal.session.sync.model.SyncResponse
import timber.log.Timber
class SyncResponseHandler(private val roomSyncHandler: RoomSyncHandler) {
internal class SyncResponseHandler(private val roomSyncHandler: RoomSyncHandler) {
fun handleResponse(syncResponse: SyncResponse?, fromToken: String?, isCatchingUp: Boolean) {
if (syncResponse == null) {

View file

@ -3,20 +3,20 @@ package im.vector.matrix.android.internal.session.sync.job
import im.vector.matrix.android.api.MatrixCallback
import im.vector.matrix.android.api.failure.Failure
import im.vector.matrix.android.api.util.Cancelable
import im.vector.matrix.android.internal.network.NetworkConnectivityChecker
import im.vector.matrix.android.internal.session.sync.SyncRequest
import im.vector.matrix.android.internal.session.sync.SyncTokenStore
import im.vector.matrix.android.internal.session.sync.model.SyncResponse
import im.vector.matrix.android.internal.network.NetworkConnectivityChecker
import im.vector.matrix.android.internal.util.BackgroundDetectionObserver
import timber.log.Timber
import java.util.concurrent.CountDownLatch
private const val RETRY_WAIT_TIME_MS = 10_000L
class SyncThread(private val syncRequest: SyncRequest,
private val networkConnectivityChecker: NetworkConnectivityChecker,
private val syncTokenStore: SyncTokenStore,
private val backgroundDetectionObserver: BackgroundDetectionObserver
internal class SyncThread(private val syncRequest: SyncRequest,
private val networkConnectivityChecker: NetworkConnectivityChecker,
private val syncTokenStore: SyncTokenStore,
private val backgroundDetectionObserver: BackgroundDetectionObserver
) : Thread(), NetworkConnectivityChecker.Listener, BackgroundDetectionObserver.Listener {
enum class State {