diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/InitialSyncStrategy.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/InitialSyncStrategy.kt index 297cc213ed..7d93e30191 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/InitialSyncStrategy.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/InitialSyncStrategy.kt @@ -42,9 +42,9 @@ sealed class InitialSyncStrategy { val minSizeToSplit: Long = 1024 * 1024, /** * Limit per room to reach to decide to store a join room ephemeral Events into a file - * Empiric value: 6 kilobytes + * Empiric value: 1 kilobytes */ - val minSizeToStoreInFile: Long = 6 * 1024, + val minSizeToStoreInFile: Long = 1024, /** * Max number of rooms to insert at a time in database (to avoid too much RAM usage) */ diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/ReadReceiptHandler.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/ReadReceiptHandler.kt index a3c5891f68..7379962620 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/ReadReceiptHandler.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/ReadReceiptHandler.kt @@ -16,12 +16,13 @@ package org.matrix.android.sdk.internal.session.sync +import io.realm.Realm +import org.matrix.android.sdk.api.session.events.model.EventType import org.matrix.android.sdk.internal.database.model.ReadReceiptEntity import org.matrix.android.sdk.internal.database.model.ReadReceiptsSummaryEntity import org.matrix.android.sdk.internal.database.query.createUnmanaged import org.matrix.android.sdk.internal.database.query.getOrCreate import org.matrix.android.sdk.internal.database.query.where -import io.realm.Realm import timber.log.Timber import javax.inject.Inject @@ -35,7 +36,9 @@ typealias ReadReceiptContent = Map + * { + * "type": "m.receipt", + * "content": { + * "$ofZhdeinmEReG_X-agD3J2TIhosEPkuvl62HJ8pVMMs": { + * "m.read": { + * "@benoit.marty:matrix.org": { + * "ts": 1610468193999 + * } + * } + * }, + * "$ZMa_qwE_w_ZOj_vAxv7JuJeHCQfYzuQblmIZxkYmNMs": { + * "m.read": { + * "@benoitx:matrix.org": { + * "ts": 1610468049579 + * }, + * "@benoit.marty:matrix.org": { + * "ts": 1609156029466 + * } + * } + * } + * } + * } + * + */ private fun incrementalSyncStrategy(realm: Realm, roomId: String, content: ReadReceiptContent) { + // First check if we have data from init sync to handle + // TODO Rename contentFromInitSync + val initSyncContent = roomSyncEphemeralTemporaryStore.read(roomId) + ?.events + ?.firstOrNull { it.type == EventType.RECEIPT } + ?.let { + @Suppress("UNCHECKED_CAST") + it.content as? ReadReceiptContent + } + ?.also { + Timber.w("INIT_SYNC Copy RR for room $roomId") + } + + initSyncContent?.let { + Timber.w("BOOK Copy RR for room $roomId") + + // TODO Merge with data we just received + // TODO Store that when we enter the timeline + initialSyncStrategy(realm, roomId, it) + roomSyncEphemeralTemporaryStore.delete(roomId) + } + for ((eventId, receiptDict) in content) { val userIdsDict = receiptDict[READ_KEY] ?: continue val readReceiptsSummary = ReadReceiptsSummaryEntity.where(realm, eventId).findFirst() diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/RoomSyncEphemeralTemporaryStore.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/RoomSyncEphemeralTemporaryStore.kt new file mode 100644 index 0000000000..8d53c63bcc --- /dev/null +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/RoomSyncEphemeralTemporaryStore.kt @@ -0,0 +1,77 @@ +/* + * Copyright (c) 2021 The Matrix.org Foundation C.I.C. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.matrix.android.sdk.internal.session.sync + +import com.squareup.moshi.JsonReader +import okio.buffer +import okio.source +import org.matrix.android.sdk.internal.di.MoshiProvider +import org.matrix.android.sdk.internal.di.SessionFilesDirectory +import org.matrix.android.sdk.internal.session.sync.model.RoomSyncEphemeral +import org.matrix.android.sdk.internal.util.md5 +import timber.log.Timber +import java.io.File +import javax.inject.Inject + +internal interface RoomSyncEphemeralTemporaryStore { + fun write(roomId: String, roomSyncEphemeralJson: String) + fun read(roomId: String): RoomSyncEphemeral? + fun reset() + fun delete(roomId: String) +} + +internal class RoomSyncEphemeralTemporaryStoreFile @Inject constructor( + @SessionFilesDirectory private val fileDirectory: File +) : RoomSyncEphemeralTemporaryStore { + + private val workingDir = File(fileDirectory, "rr") + .also { it.mkdirs() } + + /** + * Write RoomSyncEphemeral to a file + */ + override fun write(roomId: String, roomSyncEphemeralJson: String) { + Timber.w("INIT_SYNC Store RR for room $roomId") + getFile(roomId).writeText(roomSyncEphemeralJson) + } + + /** + * Read RoomSyncEphemeral from a file, or null if there is no file to read + */ + override fun read(roomId: String): RoomSyncEphemeral? { + return getFile(roomId) + .takeIf { it.exists() } + ?.inputStream() + ?.use { pos -> + MoshiProvider.providesMoshi().adapter(RoomSyncEphemeral::class.java) + .fromJson(JsonReader.of(pos.source().buffer())) + } + } + + override fun delete(roomId: String) { + getFile(roomId).delete() + } + + override fun reset() { + workingDir.deleteRecursively() + workingDir.mkdirs() + } + + private fun getFile(roomId: String): File { + return File(workingDir, "${roomId.md5()}.json") + } +} diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/RoomSyncHandler.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/RoomSyncHandler.kt index b2db6320f1..99ffa80760 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/RoomSyncHandler.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/RoomSyncHandler.kt @@ -60,6 +60,7 @@ import org.matrix.android.sdk.internal.session.room.timeline.PaginationDirection import org.matrix.android.sdk.internal.session.room.timeline.TimelineInput import org.matrix.android.sdk.internal.session.room.typing.TypingEventContent import org.matrix.android.sdk.internal.session.sync.model.InvitedRoomSync +import org.matrix.android.sdk.internal.session.sync.model.LazyRoomSyncEphemeral import org.matrix.android.sdk.internal.session.sync.model.RoomSync import org.matrix.android.sdk.internal.session.sync.model.RoomSyncAccountData import org.matrix.android.sdk.internal.session.sync.model.RoomsSyncResponse @@ -94,19 +95,6 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle handleRoomSync(realm, HandlingStrategy.LEFT(roomsSyncResponse.leave), isInitialSync, reporter) } - fun handleInitSyncEphemeral(realm: Realm, - roomsSyncResponse: RoomsSyncResponse) { - roomsSyncResponse.join.forEach { roomSync -> - val ephemeralResult = roomSync.value.ephemeral - ?.roomSyncEphemeral - ?.events - ?.takeIf { it.isNotEmpty() } - ?.let { events -> handleEphemeral(realm, roomSync.key, events, true) } - - roomTypingUsersHandler.handle(realm, roomSync.key, ephemeralResult) - } - } - // PRIVATE METHODS ***************************************************************************** private fun handleRoomSync(realm: Realm, handlingStrategy: HandlingStrategy, isInitialSync: Boolean, reporter: ProgressReporter?) { @@ -124,7 +112,7 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle emptyList() } else { handlingStrategy.data.mapWithProgress(reporter, InitSyncStep.ImportingAccountJoinedRooms, 0.6f) { - handleJoinedRoom(realm, it.key, it.value, true, insertType, syncLocalTimeStampMillis) + handleJoinedRoom(realm, it.key, it.value, insertType, syncLocalTimeStampMillis) } } } @@ -165,7 +153,6 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle realm = realm, roomId = it, roomSync = handlingStrategy.data[it] ?: error("Should not happen"), - handleEphemeralEvents = false, insertType = EventInsertType.INITIAL_SYNC, syncLocalTimestampMillis = syncLocalTimeStampMillis ) @@ -177,7 +164,7 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle } else { // No need to split val rooms = handlingStrategy.data.mapWithProgress(reporter, InitSyncStep.ImportingAccountJoinedRooms, 0.6f) { - handleJoinedRoom(realm, it.key, it.value, false, EventInsertType.INITIAL_SYNC, syncLocalTimeStampMillis) + handleJoinedRoom(realm, it.key, it.value, EventInsertType.INITIAL_SYNC, syncLocalTimeStampMillis) } realm.insertOrUpdate(rooms) } @@ -186,17 +173,15 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle private fun handleJoinedRoom(realm: Realm, roomId: String, roomSync: RoomSync, - handleEphemeralEvents: Boolean, insertType: EventInsertType, syncLocalTimestampMillis: Long): RoomEntity { Timber.v("Handle join sync for room $roomId") - var ephemeralResult: EphemeralResult? = null - if (handleEphemeralEvents) { - ephemeralResult = roomSync.ephemeral?.roomSyncEphemeral?.events - ?.takeIf { it.isNotEmpty() } - ?.let { handleEphemeral(realm, roomId, it, insertType == EventInsertType.INITIAL_SYNC) } - } + val ephemeralResult = (roomSync.ephemeral as? LazyRoomSyncEphemeral.Parsed) + ?._roomSyncEphemeral + ?.events + ?.takeIf { it.isNotEmpty() } + ?.let { handleEphemeral(realm, roomId, it, insertType == EventInsertType.INITIAL_SYNC) } if (roomSync.accountData?.events?.isNotEmpty() == true) { handleRoomAccountDataEvents(realm, roomId, roomSync.accountData) diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/SyncModule.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/SyncModule.kt index 010c029c97..4b31dc4d9b 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/SyncModule.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/SyncModule.kt @@ -37,4 +37,7 @@ internal abstract class SyncModule { @Binds abstract fun bindSyncTask(task: DefaultSyncTask): SyncTask + + @Binds + abstract fun bindRoomSyncEphemeralTemporaryStore(store: RoomSyncEphemeralTemporaryStoreFile): RoomSyncEphemeralTemporaryStore } diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/SyncResponseHandler.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/SyncResponseHandler.kt index d17a672485..fab1369aff 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/SyncResponseHandler.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/SyncResponseHandler.kt @@ -128,15 +128,6 @@ internal class SyncResponseHandler @Inject constructor(@SessionDatabase private cryptoSyncHandler.onSyncCompleted(syncResponse) } - suspend fun handleInitSyncSecondTransaction(syncResponse: SyncResponse) { - // Start another transaction to handle the ephemeral events - monarchy.awaitTransaction { realm -> - if (syncResponse.rooms != null) { - roomSyncHandler.handleInitSyncEphemeral(realm, syncResponse.rooms) - } - } - } - /** * At the moment we don't get any group data through the sync, so we poll where every hour. * You can also force to refetch group data using [Group] API. diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/SyncTask.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/SyncTask.kt index 00060a33b1..d47ca8fa68 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/SyncTask.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/SyncTask.kt @@ -62,7 +62,8 @@ internal class DefaultSyncTask @Inject constructor( private val globalErrorReceiver: GlobalErrorReceiver, @SessionFilesDirectory private val fileDirectory: File, - private val syncResponseParser: InitialSyncResponseParser + private val syncResponseParser: InitialSyncResponseParser, + private val roomSyncEphemeralTemporaryStore: RoomSyncEphemeralTemporaryStore ) : SyncTask { private val workingDir = File(fileDirectory, "is") @@ -102,13 +103,16 @@ internal class DefaultSyncTask @Inject constructor( if (isInitialSync) { Timber.v("INIT_SYNC with filter: ${requestParams["filter"]}") val initSyncStrategy = initialSyncStrategy - var syncResp: SyncResponse? = null logDuration("INIT_SYNC strategy: $initSyncStrategy") { if (initSyncStrategy is InitialSyncStrategy.Optimized) { + roomSyncEphemeralTemporaryStore.reset() + workingDir.mkdirs() val file = downloadInitSyncResponse(requestParams) - syncResp = reportSubtask(initialSyncProgressService, InitSyncStep.ImportingAccount, 1, 0.7F) { + reportSubtask(initialSyncProgressService, InitSyncStep.ImportingAccount, 1, 0.7F) { handleSyncFile(file, initSyncStrategy) } + // Delete all files + workingDir.deleteRecursively() } else { val syncResponse = logDuration("INIT_SYNC Request") { executeRequest(globalErrorReceiver) { @@ -125,15 +129,6 @@ internal class DefaultSyncTask @Inject constructor( } } initialSyncProgressService.endAll() - - if (initSyncStrategy is InitialSyncStrategy.Optimized) { - logDuration("INIT_SYNC Handle ephemeral") { - syncResponseHandler.handleInitSyncSecondTransaction(syncResp!!) - } - initialSyncStatusRepository.setStep(InitialSyncStatus.STEP_SUCCESS) - // Delete all files - workingDir.deleteRecursively() - } } else { val syncResponse = executeRequest(globalErrorReceiver) { apiCall = syncAPI.sync( @@ -147,7 +142,6 @@ internal class DefaultSyncTask @Inject constructor( } private suspend fun downloadInitSyncResponse(requestParams: Map): File { - workingDir.mkdirs() val workingFile = File(workingDir, "initSync.json") val status = initialSyncStatusRepository.getStep() if (workingFile.exists() && status >= InitialSyncStatus.STEP_DOWNLOADED) { @@ -201,8 +195,8 @@ internal class DefaultSyncTask @Inject constructor( } } - private suspend fun handleSyncFile(workingFile: File, initSyncStrategy: InitialSyncStrategy.Optimized): SyncResponse { - return logDuration("INIT_SYNC handleSyncFile()") { + private suspend fun handleSyncFile(workingFile: File, initSyncStrategy: InitialSyncStrategy.Optimized) { + logDuration("INIT_SYNC handleSyncFile()") { val syncResponse = logDuration("INIT_SYNC Read file and parse") { syncResponseParser.parse(initSyncStrategy, workingFile) } @@ -215,7 +209,7 @@ internal class DefaultSyncTask @Inject constructor( logDuration("INIT_SYNC Database insertion") { syncResponseHandler.handleResponse(syncResponse, null, initialSyncProgressService) } - syncResponse + initialSyncStatusRepository.setStep(InitialSyncStatus.STEP_SUCCESS) } } diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/model/LazyRoomSyncEphemeral.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/model/LazyRoomSyncEphemeral.kt index 938168b5f4..83006c646b 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/model/LazyRoomSyncEphemeral.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/model/LazyRoomSyncEphemeral.kt @@ -16,28 +16,10 @@ package org.matrix.android.sdk.internal.session.sync.model -import com.squareup.moshi.JsonAdapter import com.squareup.moshi.JsonClass -import com.squareup.moshi.JsonReader -import okio.buffer -import okio.source -import java.io.File @JsonClass(generateAdapter = false) internal sealed class LazyRoomSyncEphemeral { data class Parsed(val _roomSyncEphemeral: RoomSyncEphemeral) : LazyRoomSyncEphemeral() - data class Stored(val roomSyncEphemeralAdapter: JsonAdapter, val file: File) : LazyRoomSyncEphemeral() - - val roomSyncEphemeral: RoomSyncEphemeral - get() { - return when (this) { - is Parsed -> _roomSyncEphemeral - is Stored -> { - // Parse the file now - file.inputStream().use { pos -> - roomSyncEphemeralAdapter.fromJson(JsonReader.of(pos.source().buffer()))!! - } - } - } - } + object Stored : LazyRoomSyncEphemeral() } diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/parsing/DefaultLazyRoomSyncEphemeralJsonAdapter.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/parsing/DefaultLazyRoomSyncEphemeralJsonAdapter.kt index ef56802a66..22ac4f911d 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/parsing/DefaultLazyRoomSyncEphemeralJsonAdapter.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/parsing/DefaultLazyRoomSyncEphemeralJsonAdapter.kt @@ -22,11 +22,10 @@ import com.squareup.moshi.JsonReader import com.squareup.moshi.JsonWriter import com.squareup.moshi.ToJson import org.matrix.android.sdk.internal.session.sync.InitialSyncStrategy +import org.matrix.android.sdk.internal.session.sync.RoomSyncEphemeralTemporaryStore import org.matrix.android.sdk.internal.session.sync.model.LazyRoomSyncEphemeral import org.matrix.android.sdk.internal.session.sync.model.RoomSyncEphemeral import timber.log.Timber -import java.io.File -import java.util.concurrent.atomic.AtomicInteger internal class DefaultLazyRoomSyncEphemeralJsonAdapter { @@ -44,20 +43,15 @@ internal class DefaultLazyRoomSyncEphemeralJsonAdapter { } } -internal class SplitLazyRoomSyncJsonAdapter( - private val workingDirectory: File, +internal class SplitLazyRoomSyncEphemeralJsonAdapter( + private val roomSyncEphemeralTemporaryStore: RoomSyncEphemeralTemporaryStore, private val syncStrategy: InitialSyncStrategy.Optimized ) { - private val atomicInteger = AtomicInteger(0) - - private fun createFile(): File { - val index = atomicInteger.getAndIncrement() - return File(workingDirectory, "room_$index.json") - } - @FromJson fun fromJson(reader: JsonReader, delegate: JsonAdapter): LazyRoomSyncEphemeral? { val path = reader.path + val roomId = path.substringAfter("\$.rooms.join.").substringBeforeLast(".ephemeral") + val json = reader.nextSource().inputStream().bufferedReader().use { it.readText() } @@ -65,9 +59,8 @@ internal class SplitLazyRoomSyncJsonAdapter( return if (json.length > limit) { Timber.v("INIT_SYNC $path content length: ${json.length} copy to a file") // Copy the source to a file - val file = createFile() - file.writeText(json) - LazyRoomSyncEphemeral.Stored(delegate, file) + roomSyncEphemeralTemporaryStore.write(roomId, json) + LazyRoomSyncEphemeral.Stored } else { Timber.v("INIT_SYNC $path content length: ${json.length} parse it now") val roomSync = delegate.fromJson(json) ?: return null diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/parsing/InitialSyncResponseParser.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/parsing/InitialSyncResponseParser.kt index ae7b2a4468..bfa9974b77 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/parsing/InitialSyncResponseParser.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/parsing/InitialSyncResponseParser.kt @@ -20,29 +20,33 @@ import com.squareup.moshi.Moshi import okio.buffer import okio.source import org.matrix.android.sdk.internal.session.sync.InitialSyncStrategy +import org.matrix.android.sdk.internal.session.sync.RoomSyncEphemeralTemporaryStore import org.matrix.android.sdk.internal.session.sync.model.SyncResponse import timber.log.Timber import java.io.File import javax.inject.Inject -internal class InitialSyncResponseParser @Inject constructor(private val moshi: Moshi) { +internal class InitialSyncResponseParser @Inject constructor( + private val moshi: Moshi, + private val roomSyncEphemeralTemporaryStore: RoomSyncEphemeralTemporaryStore +) { fun parse(syncStrategy: InitialSyncStrategy.Optimized, workingFile: File): SyncResponse { val syncResponseLength = workingFile.length().toInt() Timber.v("INIT_SYNC Sync file size is $syncResponseLength bytes") val shouldSplit = syncResponseLength >= syncStrategy.minSizeToSplit Timber.v("INIT_SYNC should split in several files: $shouldSplit") - return getMoshi(syncStrategy, workingFile.parentFile!!, shouldSplit) + return getMoshi(syncStrategy, shouldSplit) .adapter(SyncResponse::class.java) .fromJson(workingFile.source().buffer())!! } - private fun getMoshi(syncStrategy: InitialSyncStrategy.Optimized, workingDirectory: File, shouldSplit: Boolean): Moshi { + private fun getMoshi(syncStrategy: InitialSyncStrategy.Optimized, shouldSplit: Boolean): Moshi { // If we don't have to split we'll rely on the already default moshi if (!shouldSplit) return moshi // Otherwise, we create a new adapter for handling Map of Lazy sync return moshi.newBuilder() - .add(SplitLazyRoomSyncJsonAdapter(workingDirectory, syncStrategy)) + .add(SplitLazyRoomSyncEphemeralJsonAdapter(roomSyncEphemeralTemporaryStore, syncStrategy)) .build() } }