From 3777b00ad7448876ee508f260040d9820885ef6f Mon Sep 17 00:00:00 2001 From: Benoit Marty Date: Thu, 11 Mar 2021 21:54:33 +0100 Subject: [PATCH 1/8] Store Ephemeral in files to handle them later (no second transaction) --- .../session/sync/InitialSyncStrategy.kt | 4 +- .../session/sync/ReadReceiptHandler.kt | 57 +++++++++++++- .../sync/RoomSyncEphemeralTemporaryStore.kt | 77 +++++++++++++++++++ .../internal/session/sync/RoomSyncHandler.kt | 31 ++------ .../sdk/internal/session/sync/SyncModule.kt | 3 + .../session/sync/SyncResponseHandler.kt | 9 --- .../sdk/internal/session/sync/SyncTask.kt | 26 +++---- .../sync/model/LazyRoomSyncEphemeral.kt | 20 +---- ...DefaultLazyRoomSyncEphemeralJsonAdapter.kt | 21 ++--- .../sync/parsing/InitialSyncResponseParser.kt | 12 ++- 10 files changed, 171 insertions(+), 89 deletions(-) create mode 100644 matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/RoomSyncEphemeralTemporaryStore.kt diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/InitialSyncStrategy.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/InitialSyncStrategy.kt index 297cc213ed..7d93e30191 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/InitialSyncStrategy.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/InitialSyncStrategy.kt @@ -42,9 +42,9 @@ sealed class InitialSyncStrategy { val minSizeToSplit: Long = 1024 * 1024, /** * Limit per room to reach to decide to store a join room ephemeral Events into a file - * Empiric value: 6 kilobytes + * Empiric value: 1 kilobytes */ - val minSizeToStoreInFile: Long = 6 * 1024, + val minSizeToStoreInFile: Long = 1024, /** * Max number of rooms to insert at a time in database (to avoid too much RAM usage) */ diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/ReadReceiptHandler.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/ReadReceiptHandler.kt index a3c5891f68..7379962620 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/ReadReceiptHandler.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/ReadReceiptHandler.kt @@ -16,12 +16,13 @@ package org.matrix.android.sdk.internal.session.sync +import io.realm.Realm +import org.matrix.android.sdk.api.session.events.model.EventType import org.matrix.android.sdk.internal.database.model.ReadReceiptEntity import org.matrix.android.sdk.internal.database.model.ReadReceiptsSummaryEntity import org.matrix.android.sdk.internal.database.query.createUnmanaged import org.matrix.android.sdk.internal.database.query.getOrCreate import org.matrix.android.sdk.internal.database.query.where -import io.realm.Realm import timber.log.Timber import javax.inject.Inject @@ -35,7 +36,9 @@ typealias ReadReceiptContent = Map + * { + * "type": "m.receipt", + * "content": { + * "$ofZhdeinmEReG_X-agD3J2TIhosEPkuvl62HJ8pVMMs": { + * "m.read": { + * "@benoit.marty:matrix.org": { + * "ts": 1610468193999 + * } + * } + * }, + * "$ZMa_qwE_w_ZOj_vAxv7JuJeHCQfYzuQblmIZxkYmNMs": { + * "m.read": { + * "@benoitx:matrix.org": { + * "ts": 1610468049579 + * }, + * "@benoit.marty:matrix.org": { + * "ts": 1609156029466 + * } + * } + * } + * } + * } + * + */ private fun incrementalSyncStrategy(realm: Realm, roomId: String, content: ReadReceiptContent) { + // First check if we have data from init sync to handle + // TODO Rename contentFromInitSync + val initSyncContent = roomSyncEphemeralTemporaryStore.read(roomId) + ?.events + ?.firstOrNull { it.type == EventType.RECEIPT } + ?.let { + @Suppress("UNCHECKED_CAST") + it.content as? ReadReceiptContent + } + ?.also { + Timber.w("INIT_SYNC Copy RR for room $roomId") + } + + initSyncContent?.let { + Timber.w("BOOK Copy RR for room $roomId") + + // TODO Merge with data we just received + // TODO Store that when we enter the timeline + initialSyncStrategy(realm, roomId, it) + roomSyncEphemeralTemporaryStore.delete(roomId) + } + for ((eventId, receiptDict) in content) { val userIdsDict = receiptDict[READ_KEY] ?: continue val readReceiptsSummary = ReadReceiptsSummaryEntity.where(realm, eventId).findFirst() diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/RoomSyncEphemeralTemporaryStore.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/RoomSyncEphemeralTemporaryStore.kt new file mode 100644 index 0000000000..8d53c63bcc --- /dev/null +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/RoomSyncEphemeralTemporaryStore.kt @@ -0,0 +1,77 @@ +/* + * Copyright (c) 2021 The Matrix.org Foundation C.I.C. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.matrix.android.sdk.internal.session.sync + +import com.squareup.moshi.JsonReader +import okio.buffer +import okio.source +import org.matrix.android.sdk.internal.di.MoshiProvider +import org.matrix.android.sdk.internal.di.SessionFilesDirectory +import org.matrix.android.sdk.internal.session.sync.model.RoomSyncEphemeral +import org.matrix.android.sdk.internal.util.md5 +import timber.log.Timber +import java.io.File +import javax.inject.Inject + +internal interface RoomSyncEphemeralTemporaryStore { + fun write(roomId: String, roomSyncEphemeralJson: String) + fun read(roomId: String): RoomSyncEphemeral? + fun reset() + fun delete(roomId: String) +} + +internal class RoomSyncEphemeralTemporaryStoreFile @Inject constructor( + @SessionFilesDirectory private val fileDirectory: File +) : RoomSyncEphemeralTemporaryStore { + + private val workingDir = File(fileDirectory, "rr") + .also { it.mkdirs() } + + /** + * Write RoomSyncEphemeral to a file + */ + override fun write(roomId: String, roomSyncEphemeralJson: String) { + Timber.w("INIT_SYNC Store RR for room $roomId") + getFile(roomId).writeText(roomSyncEphemeralJson) + } + + /** + * Read RoomSyncEphemeral from a file, or null if there is no file to read + */ + override fun read(roomId: String): RoomSyncEphemeral? { + return getFile(roomId) + .takeIf { it.exists() } + ?.inputStream() + ?.use { pos -> + MoshiProvider.providesMoshi().adapter(RoomSyncEphemeral::class.java) + .fromJson(JsonReader.of(pos.source().buffer())) + } + } + + override fun delete(roomId: String) { + getFile(roomId).delete() + } + + override fun reset() { + workingDir.deleteRecursively() + workingDir.mkdirs() + } + + private fun getFile(roomId: String): File { + return File(workingDir, "${roomId.md5()}.json") + } +} diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/RoomSyncHandler.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/RoomSyncHandler.kt index b2db6320f1..99ffa80760 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/RoomSyncHandler.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/RoomSyncHandler.kt @@ -60,6 +60,7 @@ import org.matrix.android.sdk.internal.session.room.timeline.PaginationDirection import org.matrix.android.sdk.internal.session.room.timeline.TimelineInput import org.matrix.android.sdk.internal.session.room.typing.TypingEventContent import org.matrix.android.sdk.internal.session.sync.model.InvitedRoomSync +import org.matrix.android.sdk.internal.session.sync.model.LazyRoomSyncEphemeral import org.matrix.android.sdk.internal.session.sync.model.RoomSync import org.matrix.android.sdk.internal.session.sync.model.RoomSyncAccountData import org.matrix.android.sdk.internal.session.sync.model.RoomsSyncResponse @@ -94,19 +95,6 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle handleRoomSync(realm, HandlingStrategy.LEFT(roomsSyncResponse.leave), isInitialSync, reporter) } - fun handleInitSyncEphemeral(realm: Realm, - roomsSyncResponse: RoomsSyncResponse) { - roomsSyncResponse.join.forEach { roomSync -> - val ephemeralResult = roomSync.value.ephemeral - ?.roomSyncEphemeral - ?.events - ?.takeIf { it.isNotEmpty() } - ?.let { events -> handleEphemeral(realm, roomSync.key, events, true) } - - roomTypingUsersHandler.handle(realm, roomSync.key, ephemeralResult) - } - } - // PRIVATE METHODS ***************************************************************************** private fun handleRoomSync(realm: Realm, handlingStrategy: HandlingStrategy, isInitialSync: Boolean, reporter: ProgressReporter?) { @@ -124,7 +112,7 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle emptyList() } else { handlingStrategy.data.mapWithProgress(reporter, InitSyncStep.ImportingAccountJoinedRooms, 0.6f) { - handleJoinedRoom(realm, it.key, it.value, true, insertType, syncLocalTimeStampMillis) + handleJoinedRoom(realm, it.key, it.value, insertType, syncLocalTimeStampMillis) } } } @@ -165,7 +153,6 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle realm = realm, roomId = it, roomSync = handlingStrategy.data[it] ?: error("Should not happen"), - handleEphemeralEvents = false, insertType = EventInsertType.INITIAL_SYNC, syncLocalTimestampMillis = syncLocalTimeStampMillis ) @@ -177,7 +164,7 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle } else { // No need to split val rooms = handlingStrategy.data.mapWithProgress(reporter, InitSyncStep.ImportingAccountJoinedRooms, 0.6f) { - handleJoinedRoom(realm, it.key, it.value, false, EventInsertType.INITIAL_SYNC, syncLocalTimeStampMillis) + handleJoinedRoom(realm, it.key, it.value, EventInsertType.INITIAL_SYNC, syncLocalTimeStampMillis) } realm.insertOrUpdate(rooms) } @@ -186,17 +173,15 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle private fun handleJoinedRoom(realm: Realm, roomId: String, roomSync: RoomSync, - handleEphemeralEvents: Boolean, insertType: EventInsertType, syncLocalTimestampMillis: Long): RoomEntity { Timber.v("Handle join sync for room $roomId") - var ephemeralResult: EphemeralResult? = null - if (handleEphemeralEvents) { - ephemeralResult = roomSync.ephemeral?.roomSyncEphemeral?.events - ?.takeIf { it.isNotEmpty() } - ?.let { handleEphemeral(realm, roomId, it, insertType == EventInsertType.INITIAL_SYNC) } - } + val ephemeralResult = (roomSync.ephemeral as? LazyRoomSyncEphemeral.Parsed) + ?._roomSyncEphemeral + ?.events + ?.takeIf { it.isNotEmpty() } + ?.let { handleEphemeral(realm, roomId, it, insertType == EventInsertType.INITIAL_SYNC) } if (roomSync.accountData?.events?.isNotEmpty() == true) { handleRoomAccountDataEvents(realm, roomId, roomSync.accountData) diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/SyncModule.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/SyncModule.kt index 010c029c97..4b31dc4d9b 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/SyncModule.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/SyncModule.kt @@ -37,4 +37,7 @@ internal abstract class SyncModule { @Binds abstract fun bindSyncTask(task: DefaultSyncTask): SyncTask + + @Binds + abstract fun bindRoomSyncEphemeralTemporaryStore(store: RoomSyncEphemeralTemporaryStoreFile): RoomSyncEphemeralTemporaryStore } diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/SyncResponseHandler.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/SyncResponseHandler.kt index d17a672485..fab1369aff 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/SyncResponseHandler.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/SyncResponseHandler.kt @@ -128,15 +128,6 @@ internal class SyncResponseHandler @Inject constructor(@SessionDatabase private cryptoSyncHandler.onSyncCompleted(syncResponse) } - suspend fun handleInitSyncSecondTransaction(syncResponse: SyncResponse) { - // Start another transaction to handle the ephemeral events - monarchy.awaitTransaction { realm -> - if (syncResponse.rooms != null) { - roomSyncHandler.handleInitSyncEphemeral(realm, syncResponse.rooms) - } - } - } - /** * At the moment we don't get any group data through the sync, so we poll where every hour. * You can also force to refetch group data using [Group] API. diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/SyncTask.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/SyncTask.kt index 00060a33b1..d47ca8fa68 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/SyncTask.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/SyncTask.kt @@ -62,7 +62,8 @@ internal class DefaultSyncTask @Inject constructor( private val globalErrorReceiver: GlobalErrorReceiver, @SessionFilesDirectory private val fileDirectory: File, - private val syncResponseParser: InitialSyncResponseParser + private val syncResponseParser: InitialSyncResponseParser, + private val roomSyncEphemeralTemporaryStore: RoomSyncEphemeralTemporaryStore ) : SyncTask { private val workingDir = File(fileDirectory, "is") @@ -102,13 +103,16 @@ internal class DefaultSyncTask @Inject constructor( if (isInitialSync) { Timber.v("INIT_SYNC with filter: ${requestParams["filter"]}") val initSyncStrategy = initialSyncStrategy - var syncResp: SyncResponse? = null logDuration("INIT_SYNC strategy: $initSyncStrategy") { if (initSyncStrategy is InitialSyncStrategy.Optimized) { + roomSyncEphemeralTemporaryStore.reset() + workingDir.mkdirs() val file = downloadInitSyncResponse(requestParams) - syncResp = reportSubtask(initialSyncProgressService, InitSyncStep.ImportingAccount, 1, 0.7F) { + reportSubtask(initialSyncProgressService, InitSyncStep.ImportingAccount, 1, 0.7F) { handleSyncFile(file, initSyncStrategy) } + // Delete all files + workingDir.deleteRecursively() } else { val syncResponse = logDuration("INIT_SYNC Request") { executeRequest(globalErrorReceiver) { @@ -125,15 +129,6 @@ internal class DefaultSyncTask @Inject constructor( } } initialSyncProgressService.endAll() - - if (initSyncStrategy is InitialSyncStrategy.Optimized) { - logDuration("INIT_SYNC Handle ephemeral") { - syncResponseHandler.handleInitSyncSecondTransaction(syncResp!!) - } - initialSyncStatusRepository.setStep(InitialSyncStatus.STEP_SUCCESS) - // Delete all files - workingDir.deleteRecursively() - } } else { val syncResponse = executeRequest(globalErrorReceiver) { apiCall = syncAPI.sync( @@ -147,7 +142,6 @@ internal class DefaultSyncTask @Inject constructor( } private suspend fun downloadInitSyncResponse(requestParams: Map): File { - workingDir.mkdirs() val workingFile = File(workingDir, "initSync.json") val status = initialSyncStatusRepository.getStep() if (workingFile.exists() && status >= InitialSyncStatus.STEP_DOWNLOADED) { @@ -201,8 +195,8 @@ internal class DefaultSyncTask @Inject constructor( } } - private suspend fun handleSyncFile(workingFile: File, initSyncStrategy: InitialSyncStrategy.Optimized): SyncResponse { - return logDuration("INIT_SYNC handleSyncFile()") { + private suspend fun handleSyncFile(workingFile: File, initSyncStrategy: InitialSyncStrategy.Optimized) { + logDuration("INIT_SYNC handleSyncFile()") { val syncResponse = logDuration("INIT_SYNC Read file and parse") { syncResponseParser.parse(initSyncStrategy, workingFile) } @@ -215,7 +209,7 @@ internal class DefaultSyncTask @Inject constructor( logDuration("INIT_SYNC Database insertion") { syncResponseHandler.handleResponse(syncResponse, null, initialSyncProgressService) } - syncResponse + initialSyncStatusRepository.setStep(InitialSyncStatus.STEP_SUCCESS) } } diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/model/LazyRoomSyncEphemeral.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/model/LazyRoomSyncEphemeral.kt index 938168b5f4..83006c646b 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/model/LazyRoomSyncEphemeral.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/model/LazyRoomSyncEphemeral.kt @@ -16,28 +16,10 @@ package org.matrix.android.sdk.internal.session.sync.model -import com.squareup.moshi.JsonAdapter import com.squareup.moshi.JsonClass -import com.squareup.moshi.JsonReader -import okio.buffer -import okio.source -import java.io.File @JsonClass(generateAdapter = false) internal sealed class LazyRoomSyncEphemeral { data class Parsed(val _roomSyncEphemeral: RoomSyncEphemeral) : LazyRoomSyncEphemeral() - data class Stored(val roomSyncEphemeralAdapter: JsonAdapter, val file: File) : LazyRoomSyncEphemeral() - - val roomSyncEphemeral: RoomSyncEphemeral - get() { - return when (this) { - is Parsed -> _roomSyncEphemeral - is Stored -> { - // Parse the file now - file.inputStream().use { pos -> - roomSyncEphemeralAdapter.fromJson(JsonReader.of(pos.source().buffer()))!! - } - } - } - } + object Stored : LazyRoomSyncEphemeral() } diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/parsing/DefaultLazyRoomSyncEphemeralJsonAdapter.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/parsing/DefaultLazyRoomSyncEphemeralJsonAdapter.kt index ef56802a66..22ac4f911d 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/parsing/DefaultLazyRoomSyncEphemeralJsonAdapter.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/parsing/DefaultLazyRoomSyncEphemeralJsonAdapter.kt @@ -22,11 +22,10 @@ import com.squareup.moshi.JsonReader import com.squareup.moshi.JsonWriter import com.squareup.moshi.ToJson import org.matrix.android.sdk.internal.session.sync.InitialSyncStrategy +import org.matrix.android.sdk.internal.session.sync.RoomSyncEphemeralTemporaryStore import org.matrix.android.sdk.internal.session.sync.model.LazyRoomSyncEphemeral import org.matrix.android.sdk.internal.session.sync.model.RoomSyncEphemeral import timber.log.Timber -import java.io.File -import java.util.concurrent.atomic.AtomicInteger internal class DefaultLazyRoomSyncEphemeralJsonAdapter { @@ -44,20 +43,15 @@ internal class DefaultLazyRoomSyncEphemeralJsonAdapter { } } -internal class SplitLazyRoomSyncJsonAdapter( - private val workingDirectory: File, +internal class SplitLazyRoomSyncEphemeralJsonAdapter( + private val roomSyncEphemeralTemporaryStore: RoomSyncEphemeralTemporaryStore, private val syncStrategy: InitialSyncStrategy.Optimized ) { - private val atomicInteger = AtomicInteger(0) - - private fun createFile(): File { - val index = atomicInteger.getAndIncrement() - return File(workingDirectory, "room_$index.json") - } - @FromJson fun fromJson(reader: JsonReader, delegate: JsonAdapter): LazyRoomSyncEphemeral? { val path = reader.path + val roomId = path.substringAfter("\$.rooms.join.").substringBeforeLast(".ephemeral") + val json = reader.nextSource().inputStream().bufferedReader().use { it.readText() } @@ -65,9 +59,8 @@ internal class SplitLazyRoomSyncJsonAdapter( return if (json.length > limit) { Timber.v("INIT_SYNC $path content length: ${json.length} copy to a file") // Copy the source to a file - val file = createFile() - file.writeText(json) - LazyRoomSyncEphemeral.Stored(delegate, file) + roomSyncEphemeralTemporaryStore.write(roomId, json) + LazyRoomSyncEphemeral.Stored } else { Timber.v("INIT_SYNC $path content length: ${json.length} parse it now") val roomSync = delegate.fromJson(json) ?: return null diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/parsing/InitialSyncResponseParser.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/parsing/InitialSyncResponseParser.kt index ae7b2a4468..bfa9974b77 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/parsing/InitialSyncResponseParser.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/parsing/InitialSyncResponseParser.kt @@ -20,29 +20,33 @@ import com.squareup.moshi.Moshi import okio.buffer import okio.source import org.matrix.android.sdk.internal.session.sync.InitialSyncStrategy +import org.matrix.android.sdk.internal.session.sync.RoomSyncEphemeralTemporaryStore import org.matrix.android.sdk.internal.session.sync.model.SyncResponse import timber.log.Timber import java.io.File import javax.inject.Inject -internal class InitialSyncResponseParser @Inject constructor(private val moshi: Moshi) { +internal class InitialSyncResponseParser @Inject constructor( + private val moshi: Moshi, + private val roomSyncEphemeralTemporaryStore: RoomSyncEphemeralTemporaryStore +) { fun parse(syncStrategy: InitialSyncStrategy.Optimized, workingFile: File): SyncResponse { val syncResponseLength = workingFile.length().toInt() Timber.v("INIT_SYNC Sync file size is $syncResponseLength bytes") val shouldSplit = syncResponseLength >= syncStrategy.minSizeToSplit Timber.v("INIT_SYNC should split in several files: $shouldSplit") - return getMoshi(syncStrategy, workingFile.parentFile!!, shouldSplit) + return getMoshi(syncStrategy, shouldSplit) .adapter(SyncResponse::class.java) .fromJson(workingFile.source().buffer())!! } - private fun getMoshi(syncStrategy: InitialSyncStrategy.Optimized, workingDirectory: File, shouldSplit: Boolean): Moshi { + private fun getMoshi(syncStrategy: InitialSyncStrategy.Optimized, shouldSplit: Boolean): Moshi { // If we don't have to split we'll rely on the already default moshi if (!shouldSplit) return moshi // Otherwise, we create a new adapter for handling Map of Lazy sync return moshi.newBuilder() - .add(SplitLazyRoomSyncJsonAdapter(workingDirectory, syncStrategy)) + .add(SplitLazyRoomSyncEphemeralJsonAdapter(roomSyncEphemeralTemporaryStore, syncStrategy)) .build() } } From fe39c92e25a54ffc5437e58e828cc64bcf03728f Mon Sep 17 00:00:00 2001 From: Benoit Marty Date: Thu, 11 Mar 2021 22:24:23 +0100 Subject: [PATCH 2/8] Load RR when opening a timeline --- .../session/room/timeline/DefaultTimeline.kt | 19 +++++- .../room/timeline/DefaultTimelineService.kt | 36 ++++++----- .../session/sync/ReadReceiptHandler.kt | 61 +++++-------------- .../sync/RoomSyncEphemeralTemporaryStore.kt | 2 +- 4 files changed, 54 insertions(+), 64 deletions(-) diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/timeline/DefaultTimeline.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/timeline/DefaultTimeline.kt index d0946abe28..92877950eb 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/timeline/DefaultTimeline.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/timeline/DefaultTimeline.kt @@ -44,6 +44,7 @@ import org.matrix.android.sdk.internal.database.query.findAllInRoomWithSendState import org.matrix.android.sdk.internal.database.query.where import org.matrix.android.sdk.internal.database.query.whereRoomId import org.matrix.android.sdk.internal.session.room.membership.LoadRoomMembersTask +import org.matrix.android.sdk.internal.session.sync.ReadReceiptHandler import org.matrix.android.sdk.internal.task.TaskExecutor import org.matrix.android.sdk.internal.task.configureWith import org.matrix.android.sdk.internal.util.Debouncer @@ -73,7 +74,8 @@ internal class DefaultTimeline( private val timelineInput: TimelineInput, private val eventDecryptor: TimelineEventDecryptor, private val realmSessionProvider: RealmSessionProvider, - private val loadRoomMembersTask: LoadRoomMembersTask + private val loadRoomMembersTask: LoadRoomMembersTask, + private val readReceiptHandler: ReadReceiptHandler ) : Timeline, TimelineHiddenReadReceipts.Delegate, TimelineInput.Listener, @@ -182,11 +184,26 @@ internal class DefaultTimeline( } .executeBy(taskExecutor) + // Ensure ReadReceipt from init sync are loaded + ensureReadReceiptAreLoaded(realm) + isReady.set(true) } } } + private fun ensureReadReceiptAreLoaded(realm: Realm) { + readReceiptHandler.getContentFromInitSync(roomId) + ?.also { + Timber.w("INIT_SYNC Insert when opening timeline RR for room $roomId") + } + ?.let { readReceiptContent -> + realm.executeTransactionAsync { + readReceiptHandler.handle(it, roomId, readReceiptContent, true) + } + } + } + private fun TimelineSettings.shouldHandleHiddenReadReceipts(): Boolean { return buildReadReceipts && (filters.filterEdits || filters.filterTypes) } diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/timeline/DefaultTimelineService.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/timeline/DefaultTimelineService.kt index bb774022d4..c3714a1303 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/timeline/DefaultTimelineService.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/timeline/DefaultTimelineService.kt @@ -17,10 +17,10 @@ package org.matrix.android.sdk.internal.session.room.timeline import androidx.lifecycle.LiveData -import dagger.assisted.Assisted -import dagger.assisted.AssistedInject -import dagger.assisted.AssistedFactory import com.zhuinden.monarchy.Monarchy +import dagger.assisted.Assisted +import dagger.assisted.AssistedFactory +import dagger.assisted.AssistedInject import io.realm.Sort import io.realm.kotlin.where import org.matrix.android.sdk.api.session.events.model.isImageMessage @@ -38,20 +38,23 @@ import org.matrix.android.sdk.internal.database.model.TimelineEventEntityFields import org.matrix.android.sdk.internal.database.query.where import org.matrix.android.sdk.internal.di.SessionDatabase import org.matrix.android.sdk.internal.session.room.membership.LoadRoomMembersTask +import org.matrix.android.sdk.internal.session.sync.ReadReceiptHandler import org.matrix.android.sdk.internal.task.TaskExecutor -internal class DefaultTimelineService @AssistedInject constructor(@Assisted private val roomId: String, - @SessionDatabase private val monarchy: Monarchy, - private val realmSessionProvider: RealmSessionProvider, - private val timelineInput: TimelineInput, - private val taskExecutor: TaskExecutor, - private val contextOfEventTask: GetContextOfEventTask, - private val eventDecryptor: TimelineEventDecryptor, - private val paginationTask: PaginationTask, - private val fetchTokenAndPaginateTask: FetchTokenAndPaginateTask, - private val timelineEventMapper: TimelineEventMapper, - private val readReceiptsSummaryMapper: ReadReceiptsSummaryMapper, - private val loadRoomMembersTask: LoadRoomMembersTask +internal class DefaultTimelineService @AssistedInject constructor( + @Assisted private val roomId: String, + @SessionDatabase private val monarchy: Monarchy, + private val realmSessionProvider: RealmSessionProvider, + private val timelineInput: TimelineInput, + private val taskExecutor: TaskExecutor, + private val contextOfEventTask: GetContextOfEventTask, + private val eventDecryptor: TimelineEventDecryptor, + private val paginationTask: PaginationTask, + private val fetchTokenAndPaginateTask: FetchTokenAndPaginateTask, + private val timelineEventMapper: TimelineEventMapper, + private val readReceiptsSummaryMapper: ReadReceiptsSummaryMapper, + private val loadRoomMembersTask: LoadRoomMembersTask, + private val readReceiptHandler: ReadReceiptHandler ) : TimelineService { @AssistedFactory @@ -74,7 +77,8 @@ internal class DefaultTimelineService @AssistedInject constructor(@Assisted priv eventDecryptor = eventDecryptor, fetchTokenAndPaginateTask = fetchTokenAndPaginateTask, realmSessionProvider = realmSessionProvider, - loadRoomMembersTask = loadRoomMembersTask + loadRoomMembersTask = loadRoomMembersTask, + readReceiptHandler = readReceiptHandler ) } diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/ReadReceiptHandler.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/ReadReceiptHandler.kt index 7379962620..c24196ca8f 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/ReadReceiptHandler.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/ReadReceiptHandler.kt @@ -90,55 +90,11 @@ internal class ReadReceiptHandler @Inject constructor( realm.insertOrUpdate(readReceiptSummaries) } - /** - * Example of content: - * - *
-     * {
-     *     "type": "m.receipt",
-     *     "content": {
-     *         "$ofZhdeinmEReG_X-agD3J2TIhosEPkuvl62HJ8pVMMs": {
-     *             "m.read": {
-     *                 "@benoit.marty:matrix.org": {
-     *                     "ts": 1610468193999
-     *                 }
-     *             }
-     *         },
-     *         "$ZMa_qwE_w_ZOj_vAxv7JuJeHCQfYzuQblmIZxkYmNMs": {
-     *             "m.read": {
-     *                 "@benoitx:matrix.org": {
-     *                     "ts": 1610468049579
-     *                 },
-     *                 "@benoit.marty:matrix.org": {
-     *                     "ts": 1609156029466
-     *                 }
-     *             }
-     *         }
-     *     }
-     * }
-     * 
- */ private fun incrementalSyncStrategy(realm: Realm, roomId: String, content: ReadReceiptContent) { // First check if we have data from init sync to handle - // TODO Rename contentFromInitSync - val initSyncContent = roomSyncEphemeralTemporaryStore.read(roomId) - ?.events - ?.firstOrNull { it.type == EventType.RECEIPT } - ?.let { - @Suppress("UNCHECKED_CAST") - it.content as? ReadReceiptContent - } - ?.also { - Timber.w("INIT_SYNC Copy RR for room $roomId") - } - - initSyncContent?.let { - Timber.w("BOOK Copy RR for room $roomId") - - // TODO Merge with data we just received - // TODO Store that when we enter the timeline + getContentFromInitSync(roomId)?.let { + Timber.w("INIT_SYNC Insert during incremental sync RR for room $roomId") initialSyncStrategy(realm, roomId, it) - roomSyncEphemeralTemporaryStore.delete(roomId) } for ((eventId, receiptDict) in content) { @@ -163,4 +119,17 @@ internal class ReadReceiptHandler @Inject constructor( } } } + + fun getContentFromInitSync(roomId: String): ReadReceiptContent? { + return roomSyncEphemeralTemporaryStore.read(roomId) + ?.events + ?.firstOrNull { it.type == EventType.RECEIPT } + ?.let { + @Suppress("UNCHECKED_CAST") + it.content as? ReadReceiptContent + } + ?.also { + roomSyncEphemeralTemporaryStore.delete(roomId) + } + } } diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/RoomSyncEphemeralTemporaryStore.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/RoomSyncEphemeralTemporaryStore.kt index 8d53c63bcc..5241c6f725 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/RoomSyncEphemeralTemporaryStore.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/RoomSyncEphemeralTemporaryStore.kt @@ -45,7 +45,7 @@ internal class RoomSyncEphemeralTemporaryStoreFile @Inject constructor( * Write RoomSyncEphemeral to a file */ override fun write(roomId: String, roomSyncEphemeralJson: String) { - Timber.w("INIT_SYNC Store RR for room $roomId") + Timber.w("INIT_SYNC Store ephemeral events for room $roomId") getFile(roomId).writeText(roomSyncEphemeralJson) } From 0b0634b53138b941941e312b803d14cb8885b5fb Mon Sep 17 00:00:00 2001 From: Benoit Marty Date: Thu, 11 Mar 2021 22:37:56 +0100 Subject: [PATCH 3/8] Ensure tmp file is always deleted when it is read --- .../android/sdk/internal/session/sync/ReadReceiptHandler.kt | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/ReadReceiptHandler.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/ReadReceiptHandler.kt index c24196ca8f..68beddfc5a 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/ReadReceiptHandler.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/ReadReceiptHandler.kt @@ -122,14 +122,12 @@ internal class ReadReceiptHandler @Inject constructor( fun getContentFromInitSync(roomId: String): ReadReceiptContent? { return roomSyncEphemeralTemporaryStore.read(roomId) + ?.also { roomSyncEphemeralTemporaryStore.delete(roomId) } ?.events ?.firstOrNull { it.type == EventType.RECEIPT } ?.let { @Suppress("UNCHECKED_CAST") it.content as? ReadReceiptContent } - ?.also { - roomSyncEphemeralTemporaryStore.delete(roomId) - } } } From 857bfcb9710784ab073a164d7d67eeb151a52a80 Mon Sep 17 00:00:00 2001 From: Benoit Marty Date: Fri, 12 Mar 2021 12:16:25 +0100 Subject: [PATCH 4/8] Cleanup --- CHANGES.md | 2 +- .../android/sdk/internal/session/sync/ReadReceiptHandler.kt | 6 ++---- .../session/sync/RoomSyncEphemeralTemporaryStore.kt | 2 +- 3 files changed, 4 insertions(+), 6 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index e4ff049550..33bc122e57 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -5,7 +5,7 @@ Features ✨: - Improvements 🙌: - - + - Lazy storage of ReadReceipts Bugfix 🐛: - diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/ReadReceiptHandler.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/ReadReceiptHandler.kt index 68beddfc5a..efa578f2b8 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/ReadReceiptHandler.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/ReadReceiptHandler.kt @@ -121,13 +121,11 @@ internal class ReadReceiptHandler @Inject constructor( } fun getContentFromInitSync(roomId: String): ReadReceiptContent? { + @Suppress("UNCHECKED_CAST") return roomSyncEphemeralTemporaryStore.read(roomId) ?.also { roomSyncEphemeralTemporaryStore.delete(roomId) } ?.events ?.firstOrNull { it.type == EventType.RECEIPT } - ?.let { - @Suppress("UNCHECKED_CAST") - it.content as? ReadReceiptContent - } + ?.content as? ReadReceiptContent } } diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/RoomSyncEphemeralTemporaryStore.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/RoomSyncEphemeralTemporaryStore.kt index 5241c6f725..b9fd4e3d57 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/RoomSyncEphemeralTemporaryStore.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/RoomSyncEphemeralTemporaryStore.kt @@ -35,7 +35,7 @@ internal interface RoomSyncEphemeralTemporaryStore { } internal class RoomSyncEphemeralTemporaryStoreFile @Inject constructor( - @SessionFilesDirectory private val fileDirectory: File + @SessionFilesDirectory fileDirectory: File ) : RoomSyncEphemeralTemporaryStore { private val workingDir = File(fileDirectory, "rr") From ed662d3adde15b5b8a3e06e3b5629748f3ea1e4e Mon Sep 17 00:00:00 2001 From: Benoit Marty Date: Mon, 15 Mar 2021 14:19:23 +0100 Subject: [PATCH 5/8] Fix double RR issue when optimizing init sync --- .../internal/session/room/timeline/DefaultTimeline.kt | 2 +- .../sdk/internal/session/sync/ReadReceiptHandler.kt | 11 +++++++---- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/timeline/DefaultTimeline.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/timeline/DefaultTimeline.kt index 92877950eb..70d5f31042 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/timeline/DefaultTimeline.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/timeline/DefaultTimeline.kt @@ -199,7 +199,7 @@ internal class DefaultTimeline( } ?.let { readReceiptContent -> realm.executeTransactionAsync { - readReceiptHandler.handle(it, roomId, readReceiptContent, true) + readReceiptHandler.handle(it, roomId, readReceiptContent, false) } } } diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/ReadReceiptHandler.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/ReadReceiptHandler.kt index efa578f2b8..a4a48752bc 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/ReadReceiptHandler.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/ReadReceiptHandler.kt @@ -56,9 +56,8 @@ internal class ReadReceiptHandler @Inject constructor( } fun handle(realm: Realm, roomId: String, content: ReadReceiptContent?, isInitialSync: Boolean) { - if (content == null) { - return - } + content ?: return + try { handleReadReceiptContent(realm, roomId, content, isInitialSync) } catch (exception: Exception) { @@ -94,9 +93,13 @@ internal class ReadReceiptHandler @Inject constructor( // First check if we have data from init sync to handle getContentFromInitSync(roomId)?.let { Timber.w("INIT_SYNC Insert during incremental sync RR for room $roomId") - initialSyncStrategy(realm, roomId, it) + doIncrementalSyncStrategy(realm, roomId, it) } + doIncrementalSyncStrategy(realm, roomId, content) + } + + private fun doIncrementalSyncStrategy(realm: Realm, roomId: String, content: ReadReceiptContent) { for ((eventId, receiptDict) in content) { val userIdsDict = receiptDict[READ_KEY] ?: continue val readReceiptsSummary = ReadReceiptsSummaryEntity.where(realm, eventId).findFirst() From e8bb3d81ed8f8b8f4b2bf73bf6c05c166b1303d3 Mon Sep 17 00:00:00 2001 From: Benoit Marty Date: Tue, 16 Mar 2021 12:15:28 +0100 Subject: [PATCH 6/8] Ganfra's review: inject Moshi Moshi --- .../session/sync/RoomSyncEphemeralTemporaryStore.kt | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/RoomSyncEphemeralTemporaryStore.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/RoomSyncEphemeralTemporaryStore.kt index b9fd4e3d57..c6ff71cfcf 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/RoomSyncEphemeralTemporaryStore.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/RoomSyncEphemeralTemporaryStore.kt @@ -17,9 +17,9 @@ package org.matrix.android.sdk.internal.session.sync import com.squareup.moshi.JsonReader +import com.squareup.moshi.Moshi import okio.buffer import okio.source -import org.matrix.android.sdk.internal.di.MoshiProvider import org.matrix.android.sdk.internal.di.SessionFilesDirectory import org.matrix.android.sdk.internal.session.sync.model.RoomSyncEphemeral import org.matrix.android.sdk.internal.util.md5 @@ -35,12 +35,15 @@ internal interface RoomSyncEphemeralTemporaryStore { } internal class RoomSyncEphemeralTemporaryStoreFile @Inject constructor( - @SessionFilesDirectory fileDirectory: File + @SessionFilesDirectory fileDirectory: File, + moshi: Moshi ) : RoomSyncEphemeralTemporaryStore { private val workingDir = File(fileDirectory, "rr") .also { it.mkdirs() } + private val roomSyncEphemeralAdapter = moshi.adapter(RoomSyncEphemeral::class.java) + /** * Write RoomSyncEphemeral to a file */ @@ -57,8 +60,7 @@ internal class RoomSyncEphemeralTemporaryStoreFile @Inject constructor( .takeIf { it.exists() } ?.inputStream() ?.use { pos -> - MoshiProvider.providesMoshi().adapter(RoomSyncEphemeral::class.java) - .fromJson(JsonReader.of(pos.source().buffer())) + roomSyncEphemeralAdapter.fromJson(JsonReader.of(pos.source().buffer())) } } From cb17fa60dcf916c460b2292af446e5c3c64d3426 Mon Sep 17 00:00:00 2001 From: Benoit Marty Date: Tue, 16 Mar 2021 13:28:36 +0100 Subject: [PATCH 7/8] Ganfra's review: delete files after the Realm transaction --- .../session/room/read/SetReadMarkersTask.kt | 2 +- .../session/room/timeline/DefaultTimeline.kt | 3 +- .../session/sync/ReadReceiptHandler.kt | 44 +++++++++++++++---- .../internal/session/sync/RoomSyncHandler.kt | 33 +++++++++----- .../session/sync/RoomTypingUsersHandler.kt | 1 + .../session/sync/SyncResponseHandler.kt | 30 ++++++++----- .../SyncResponsePostTreatmentAggregator.kt | 22 ++++++++++ ...cResponsePostTreatmentAggregatorHandler.kt | 33 ++++++++++++++ 8 files changed, 133 insertions(+), 35 deletions(-) create mode 100644 matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/SyncResponsePostTreatmentAggregator.kt create mode 100644 matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/SyncResponsePostTreatmentAggregatorHandler.kt diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/read/SetReadMarkersTask.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/read/SetReadMarkersTask.kt index c7f962a699..54d2307dd4 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/read/SetReadMarkersTask.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/read/SetReadMarkersTask.kt @@ -117,7 +117,7 @@ internal class DefaultSetReadMarkersTask @Inject constructor( } if (readReceiptId != null) { val readReceiptContent = ReadReceiptHandler.createContent(userId, readReceiptId) - readReceiptHandler.handle(realm, roomId, readReceiptContent, false) + readReceiptHandler.handle(realm, roomId, readReceiptContent, false, null) } if (shouldUpdateRoomSummary) { val roomSummary = RoomSummaryEntity.where(realm, roomId).findFirst() diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/timeline/DefaultTimeline.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/timeline/DefaultTimeline.kt index 70d5f31042..61f770b956 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/timeline/DefaultTimeline.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/timeline/DefaultTimeline.kt @@ -199,7 +199,8 @@ internal class DefaultTimeline( } ?.let { readReceiptContent -> realm.executeTransactionAsync { - readReceiptHandler.handle(it, roomId, readReceiptContent, false) + readReceiptHandler.handle(it, roomId, readReceiptContent, false, null) + readReceiptHandler.onContentFromInitSyncHandled(roomId) } } } diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/ReadReceiptHandler.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/ReadReceiptHandler.kt index a4a48752bc..e5d9217db7 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/ReadReceiptHandler.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/ReadReceiptHandler.kt @@ -55,21 +55,29 @@ internal class ReadReceiptHandler @Inject constructor( } } - fun handle(realm: Realm, roomId: String, content: ReadReceiptContent?, isInitialSync: Boolean) { + fun handle(realm: Realm, + roomId: String, + content: ReadReceiptContent?, + isInitialSync: Boolean, + aggregator: SyncResponsePostTreatmentAggregator?) { content ?: return try { - handleReadReceiptContent(realm, roomId, content, isInitialSync) + handleReadReceiptContent(realm, roomId, content, isInitialSync, aggregator) } catch (exception: Exception) { Timber.e("Fail to handle read receipt for room $roomId") } } - private fun handleReadReceiptContent(realm: Realm, roomId: String, content: ReadReceiptContent, isInitialSync: Boolean) { + private fun handleReadReceiptContent(realm: Realm, + roomId: String, + content: ReadReceiptContent, + isInitialSync: Boolean, + aggregator: SyncResponsePostTreatmentAggregator?) { if (isInitialSync) { initialSyncStrategy(realm, roomId, content) } else { - incrementalSyncStrategy(realm, roomId, content) + incrementalSyncStrategy(realm, roomId, content, aggregator) } } @@ -89,11 +97,15 @@ internal class ReadReceiptHandler @Inject constructor( realm.insertOrUpdate(readReceiptSummaries) } - private fun incrementalSyncStrategy(realm: Realm, roomId: String, content: ReadReceiptContent) { + private fun incrementalSyncStrategy(realm: Realm, + roomId: String, + content: ReadReceiptContent, + aggregator: SyncResponsePostTreatmentAggregator?) { // First check if we have data from init sync to handle getContentFromInitSync(roomId)?.let { Timber.w("INIT_SYNC Insert during incremental sync RR for room $roomId") doIncrementalSyncStrategy(realm, roomId, it) + aggregator?.ephemeralFilesToDelete?.add(roomId) } doIncrementalSyncStrategy(realm, roomId, content) @@ -124,11 +136,25 @@ internal class ReadReceiptHandler @Inject constructor( } fun getContentFromInitSync(roomId: String): ReadReceiptContent? { + val dataFromFile = roomSyncEphemeralTemporaryStore.read(roomId) + + dataFromFile ?: return null + @Suppress("UNCHECKED_CAST") - return roomSyncEphemeralTemporaryStore.read(roomId) - ?.also { roomSyncEphemeralTemporaryStore.delete(roomId) } - ?.events - ?.firstOrNull { it.type == EventType.RECEIPT } + val content = dataFromFile + .events + .firstOrNull { it.type == EventType.RECEIPT } ?.content as? ReadReceiptContent + + if (content == null) { + // We can delete the file now + roomSyncEphemeralTemporaryStore.delete(roomId) + } + + return content + } + + fun onContentFromInitSyncHandled(roomId: String) { + roomSyncEphemeralTemporaryStore.delete(roomId) } } diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/RoomSyncHandler.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/RoomSyncHandler.kt index 99ffa80760..6c07d4d241 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/RoomSyncHandler.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/RoomSyncHandler.kt @@ -88,16 +88,21 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle fun handle(realm: Realm, roomsSyncResponse: RoomsSyncResponse, isInitialSync: Boolean, + aggregator: SyncResponsePostTreatmentAggregator, reporter: ProgressReporter? = null) { Timber.v("Execute transaction from $this") - handleRoomSync(realm, HandlingStrategy.JOINED(roomsSyncResponse.join), isInitialSync, reporter) - handleRoomSync(realm, HandlingStrategy.INVITED(roomsSyncResponse.invite), isInitialSync, reporter) - handleRoomSync(realm, HandlingStrategy.LEFT(roomsSyncResponse.leave), isInitialSync, reporter) + handleRoomSync(realm, HandlingStrategy.JOINED(roomsSyncResponse.join), isInitialSync, aggregator, reporter) + handleRoomSync(realm, HandlingStrategy.INVITED(roomsSyncResponse.invite), isInitialSync, aggregator, reporter) + handleRoomSync(realm, HandlingStrategy.LEFT(roomsSyncResponse.leave), isInitialSync, aggregator,reporter) } // PRIVATE METHODS ***************************************************************************** - private fun handleRoomSync(realm: Realm, handlingStrategy: HandlingStrategy, isInitialSync: Boolean, reporter: ProgressReporter?) { + private fun handleRoomSync(realm: Realm, + handlingStrategy: HandlingStrategy, + isInitialSync: Boolean, + aggregator: SyncResponsePostTreatmentAggregator, + reporter: ProgressReporter?) { val insertType = if (isInitialSync) { EventInsertType.INITIAL_SYNC } else { @@ -107,12 +112,12 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle val rooms = when (handlingStrategy) { is HandlingStrategy.JOINED -> { if (isInitialSync && initialSyncStrategy is InitialSyncStrategy.Optimized) { - insertJoinRoomsFromInitSync(realm, handlingStrategy, syncLocalTimeStampMillis, reporter) + insertJoinRoomsFromInitSync(realm, handlingStrategy, syncLocalTimeStampMillis, aggregator, reporter) // Rooms are already inserted, return an empty list emptyList() } else { handlingStrategy.data.mapWithProgress(reporter, InitSyncStep.ImportingAccountJoinedRooms, 0.6f) { - handleJoinedRoom(realm, it.key, it.value, insertType, syncLocalTimeStampMillis) + handleJoinedRoom(realm, it.key, it.value, insertType, syncLocalTimeStampMillis, aggregator) } } } @@ -133,6 +138,7 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle private fun insertJoinRoomsFromInitSync(realm: Realm, handlingStrategy: HandlingStrategy.JOINED, syncLocalTimeStampMillis: Long, + aggregator: SyncResponsePostTreatmentAggregator, reporter: ProgressReporter?) { val maxSize = (initialSyncStrategy as? InitialSyncStrategy.Optimized)?.maxRoomsToInsert ?: Int.MAX_VALUE val listSize = handlingStrategy.data.keys.size @@ -154,7 +160,8 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle roomId = it, roomSync = handlingStrategy.data[it] ?: error("Should not happen"), insertType = EventInsertType.INITIAL_SYNC, - syncLocalTimestampMillis = syncLocalTimeStampMillis + syncLocalTimestampMillis = syncLocalTimeStampMillis, + aggregator ) } realm.insertOrUpdate(roomEntities) @@ -164,7 +171,7 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle } else { // No need to split val rooms = handlingStrategy.data.mapWithProgress(reporter, InitSyncStep.ImportingAccountJoinedRooms, 0.6f) { - handleJoinedRoom(realm, it.key, it.value, EventInsertType.INITIAL_SYNC, syncLocalTimeStampMillis) + handleJoinedRoom(realm, it.key, it.value, EventInsertType.INITIAL_SYNC, syncLocalTimeStampMillis, aggregator) } realm.insertOrUpdate(rooms) } @@ -174,14 +181,15 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle roomId: String, roomSync: RoomSync, insertType: EventInsertType, - syncLocalTimestampMillis: Long): RoomEntity { + syncLocalTimestampMillis: Long, + aggregator: SyncResponsePostTreatmentAggregator): RoomEntity { Timber.v("Handle join sync for room $roomId") val ephemeralResult = (roomSync.ephemeral as? LazyRoomSyncEphemeral.Parsed) ?._roomSyncEphemeral ?.events ?.takeIf { it.isNotEmpty() } - ?.let { handleEphemeral(realm, roomId, it, insertType == EventInsertType.INITIAL_SYNC) } + ?.let { handleEphemeral(realm, roomId, it, insertType == EventInsertType.INITIAL_SYNC, aggregator) } if (roomSync.accountData?.events?.isNotEmpty() == true) { handleRoomAccountDataEvents(realm, roomId, roomSync.accountData) @@ -421,14 +429,15 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle private fun handleEphemeral(realm: Realm, roomId: String, ephemeralEvents: List, - isInitialSync: Boolean): EphemeralResult { + isInitialSync: Boolean, + aggregator: SyncResponsePostTreatmentAggregator): EphemeralResult { var result = EphemeralResult() for (event in ephemeralEvents) { when (event.type) { EventType.RECEIPT -> { @Suppress("UNCHECKED_CAST") (event.content as? ReadReceiptContent)?.let { readReceiptContent -> - readReceiptHandler.handle(realm, roomId, readReceiptContent, isInitialSync) + readReceiptHandler.handle(realm, roomId, readReceiptContent, isInitialSync, aggregator) } } EventType.TYPING -> { diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/RoomTypingUsersHandler.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/RoomTypingUsersHandler.kt index f4f3e6ce43..b7851031ad 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/RoomTypingUsersHandler.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/RoomTypingUsersHandler.kt @@ -26,6 +26,7 @@ import javax.inject.Inject internal class RoomTypingUsersHandler @Inject constructor(@UserId private val userId: String, private val typingUsersTracker: DefaultTypingUsersTracker) { + // TODO This could be handled outside of the Realm transaction. Use the new aggregator? fun handle(realm: Realm, roomId: String, ephemeralResult: RoomSyncHandler.EphemeralResult?) { val roomMemberHelper = RoomMemberHelper(realm, roomId) val typingIds = ephemeralResult?.typingUserIds?.filter { it != userId }.orEmpty() diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/SyncResponseHandler.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/SyncResponseHandler.kt index fab1369aff..98ef5290a6 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/SyncResponseHandler.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/SyncResponseHandler.kt @@ -41,17 +41,19 @@ import kotlin.system.measureTimeMillis private const val GET_GROUP_DATA_WORKER = "GET_GROUP_DATA_WORKER" -internal class SyncResponseHandler @Inject constructor(@SessionDatabase private val monarchy: Monarchy, - @SessionId private val sessionId: String, - private val workManagerProvider: WorkManagerProvider, - private val roomSyncHandler: RoomSyncHandler, - private val userAccountDataSyncHandler: UserAccountDataSyncHandler, - private val groupSyncHandler: GroupSyncHandler, - private val cryptoSyncHandler: CryptoSyncHandler, - private val cryptoService: DefaultCryptoService, - private val tokenStore: SyncTokenStore, - private val processEventForPushTask: ProcessEventForPushTask, - private val pushRuleService: PushRuleService) { +internal class SyncResponseHandler @Inject constructor( + @SessionDatabase private val monarchy: Monarchy, + @SessionId private val sessionId: String, + private val workManagerProvider: WorkManagerProvider, + private val roomSyncHandler: RoomSyncHandler, + private val userAccountDataSyncHandler: UserAccountDataSyncHandler, + private val groupSyncHandler: GroupSyncHandler, + private val cryptoSyncHandler: CryptoSyncHandler, + private val aggregatorHandler: SyncResponsePostTreatmentAggregatorHandler, + private val cryptoService: DefaultCryptoService, + private val tokenStore: SyncTokenStore, + private val processEventForPushTask: ProcessEventForPushTask, + private val pushRuleService: PushRuleService) { suspend fun handleResponse(syncResponse: SyncResponse, fromToken: String?, @@ -81,13 +83,14 @@ internal class SyncResponseHandler @Inject constructor(@SessionDatabase private }.also { Timber.v("Finish handling toDevice in $it ms") } + val aggregator = SyncResponsePostTreatmentAggregator() // Start one big transaction monarchy.awaitTransaction { realm -> measureTimeMillis { Timber.v("Handle rooms") reportSubtask(reporter, InitSyncStep.ImportingAccountRoom, 1, 0.7f) { if (syncResponse.rooms != null) { - roomSyncHandler.handle(realm, syncResponse.rooms, isInitialSync, reporter) + roomSyncHandler.handle(realm, syncResponse.rooms, isInitialSync, aggregator,reporter) } } }.also { @@ -115,7 +118,10 @@ internal class SyncResponseHandler @Inject constructor(@SessionDatabase private } tokenStore.saveToken(realm, syncResponse.nextBatch) } + // Everything else we need to do outside the transaction + aggregatorHandler.handle(aggregator) + syncResponse.rooms?.let { checkPushRules(it, isInitialSync) userAccountDataSyncHandler.synchronizeWithServerIfNeeded(it.invite) diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/SyncResponsePostTreatmentAggregator.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/SyncResponsePostTreatmentAggregator.kt new file mode 100644 index 0000000000..ea10a32f3e --- /dev/null +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/SyncResponsePostTreatmentAggregator.kt @@ -0,0 +1,22 @@ +/* + * Copyright (c) 2021 The Matrix.org Foundation C.I.C. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.matrix.android.sdk.internal.session.sync + +internal class SyncResponsePostTreatmentAggregator { + // List of RoomId + val ephemeralFilesToDelete = mutableListOf() +} diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/SyncResponsePostTreatmentAggregatorHandler.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/SyncResponsePostTreatmentAggregatorHandler.kt new file mode 100644 index 0000000000..8a3f115af5 --- /dev/null +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/SyncResponsePostTreatmentAggregatorHandler.kt @@ -0,0 +1,33 @@ +/* + * Copyright (c) 2021 New Vector Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.matrix.android.sdk.internal.session.sync + +import javax.inject.Inject + +internal class SyncResponsePostTreatmentAggregatorHandler @Inject constructor( + private val ephemeralTemporaryStore: RoomSyncEphemeralTemporaryStore +) { + fun handle(synResHaResponsePostTreatmentAggregator: SyncResponsePostTreatmentAggregator) { + cleanupEphemeralFiles(synResHaResponsePostTreatmentAggregator.ephemeralFilesToDelete) + } + + private fun cleanupEphemeralFiles(ephemeralFilesToDelete: List) { + ephemeralFilesToDelete.forEach { + ephemeralTemporaryStore.delete(it) + } + } +} From a30660ed4389623bd2eb72f2342da21d2afad7ea Mon Sep 17 00:00:00 2001 From: Benoit Marty Date: Tue, 16 Mar 2021 14:44:40 +0100 Subject: [PATCH 8/8] Cleanup --- .../matrix/android/sdk/internal/session/sync/RoomSyncHandler.kt | 2 +- .../android/sdk/internal/session/sync/SyncResponseHandler.kt | 2 +- .../session/sync/SyncResponsePostTreatmentAggregatorHandler.kt | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/RoomSyncHandler.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/RoomSyncHandler.kt index 6c07d4d241..a96d55d028 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/RoomSyncHandler.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/RoomSyncHandler.kt @@ -93,7 +93,7 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle Timber.v("Execute transaction from $this") handleRoomSync(realm, HandlingStrategy.JOINED(roomsSyncResponse.join), isInitialSync, aggregator, reporter) handleRoomSync(realm, HandlingStrategy.INVITED(roomsSyncResponse.invite), isInitialSync, aggregator, reporter) - handleRoomSync(realm, HandlingStrategy.LEFT(roomsSyncResponse.leave), isInitialSync, aggregator,reporter) + handleRoomSync(realm, HandlingStrategy.LEFT(roomsSyncResponse.leave), isInitialSync, aggregator, reporter) } // PRIVATE METHODS ***************************************************************************** diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/SyncResponseHandler.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/SyncResponseHandler.kt index 98ef5290a6..8e243c3443 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/SyncResponseHandler.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/SyncResponseHandler.kt @@ -90,7 +90,7 @@ internal class SyncResponseHandler @Inject constructor( Timber.v("Handle rooms") reportSubtask(reporter, InitSyncStep.ImportingAccountRoom, 1, 0.7f) { if (syncResponse.rooms != null) { - roomSyncHandler.handle(realm, syncResponse.rooms, isInitialSync, aggregator,reporter) + roomSyncHandler.handle(realm, syncResponse.rooms, isInitialSync, aggregator, reporter) } } }.also { diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/SyncResponsePostTreatmentAggregatorHandler.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/SyncResponsePostTreatmentAggregatorHandler.kt index 8a3f115af5..12b77c706b 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/SyncResponsePostTreatmentAggregatorHandler.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/SyncResponsePostTreatmentAggregatorHandler.kt @@ -1,5 +1,5 @@ /* - * Copyright (c) 2021 New Vector Ltd + * Copyright (c) 2021 The Matrix.org Foundation C.I.C. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License.