Ganfra's review: delete files after the Realm transaction

This commit is contained in:
Benoit Marty 2021-03-16 13:28:36 +01:00
parent e8bb3d81ed
commit cb17fa60dc
8 changed files with 133 additions and 35 deletions

View file

@ -117,7 +117,7 @@ internal class DefaultSetReadMarkersTask @Inject constructor(
} }
if (readReceiptId != null) { if (readReceiptId != null) {
val readReceiptContent = ReadReceiptHandler.createContent(userId, readReceiptId) val readReceiptContent = ReadReceiptHandler.createContent(userId, readReceiptId)
readReceiptHandler.handle(realm, roomId, readReceiptContent, false) readReceiptHandler.handle(realm, roomId, readReceiptContent, false, null)
} }
if (shouldUpdateRoomSummary) { if (shouldUpdateRoomSummary) {
val roomSummary = RoomSummaryEntity.where(realm, roomId).findFirst() val roomSummary = RoomSummaryEntity.where(realm, roomId).findFirst()

View file

@ -199,7 +199,8 @@ internal class DefaultTimeline(
} }
?.let { readReceiptContent -> ?.let { readReceiptContent ->
realm.executeTransactionAsync { realm.executeTransactionAsync {
readReceiptHandler.handle(it, roomId, readReceiptContent, false) readReceiptHandler.handle(it, roomId, readReceiptContent, false, null)
readReceiptHandler.onContentFromInitSyncHandled(roomId)
} }
} }
} }

View file

@ -55,21 +55,29 @@ internal class ReadReceiptHandler @Inject constructor(
} }
} }
fun handle(realm: Realm, roomId: String, content: ReadReceiptContent?, isInitialSync: Boolean) { fun handle(realm: Realm,
roomId: String,
content: ReadReceiptContent?,
isInitialSync: Boolean,
aggregator: SyncResponsePostTreatmentAggregator?) {
content ?: return content ?: return
try { try {
handleReadReceiptContent(realm, roomId, content, isInitialSync) handleReadReceiptContent(realm, roomId, content, isInitialSync, aggregator)
} catch (exception: Exception) { } catch (exception: Exception) {
Timber.e("Fail to handle read receipt for room $roomId") Timber.e("Fail to handle read receipt for room $roomId")
} }
} }
private fun handleReadReceiptContent(realm: Realm, roomId: String, content: ReadReceiptContent, isInitialSync: Boolean) { private fun handleReadReceiptContent(realm: Realm,
roomId: String,
content: ReadReceiptContent,
isInitialSync: Boolean,
aggregator: SyncResponsePostTreatmentAggregator?) {
if (isInitialSync) { if (isInitialSync) {
initialSyncStrategy(realm, roomId, content) initialSyncStrategy(realm, roomId, content)
} else { } else {
incrementalSyncStrategy(realm, roomId, content) incrementalSyncStrategy(realm, roomId, content, aggregator)
} }
} }
@ -89,11 +97,15 @@ internal class ReadReceiptHandler @Inject constructor(
realm.insertOrUpdate(readReceiptSummaries) realm.insertOrUpdate(readReceiptSummaries)
} }
private fun incrementalSyncStrategy(realm: Realm, roomId: String, content: ReadReceiptContent) { private fun incrementalSyncStrategy(realm: Realm,
roomId: String,
content: ReadReceiptContent,
aggregator: SyncResponsePostTreatmentAggregator?) {
// First check if we have data from init sync to handle // First check if we have data from init sync to handle
getContentFromInitSync(roomId)?.let { getContentFromInitSync(roomId)?.let {
Timber.w("INIT_SYNC Insert during incremental sync RR for room $roomId") Timber.w("INIT_SYNC Insert during incremental sync RR for room $roomId")
doIncrementalSyncStrategy(realm, roomId, it) doIncrementalSyncStrategy(realm, roomId, it)
aggregator?.ephemeralFilesToDelete?.add(roomId)
} }
doIncrementalSyncStrategy(realm, roomId, content) doIncrementalSyncStrategy(realm, roomId, content)
@ -124,11 +136,25 @@ internal class ReadReceiptHandler @Inject constructor(
} }
fun getContentFromInitSync(roomId: String): ReadReceiptContent? { fun getContentFromInitSync(roomId: String): ReadReceiptContent? {
val dataFromFile = roomSyncEphemeralTemporaryStore.read(roomId)
dataFromFile ?: return null
@Suppress("UNCHECKED_CAST") @Suppress("UNCHECKED_CAST")
return roomSyncEphemeralTemporaryStore.read(roomId) val content = dataFromFile
?.also { roomSyncEphemeralTemporaryStore.delete(roomId) } .events
?.events .firstOrNull { it.type == EventType.RECEIPT }
?.firstOrNull { it.type == EventType.RECEIPT }
?.content as? ReadReceiptContent ?.content as? ReadReceiptContent
if (content == null) {
// We can delete the file now
roomSyncEphemeralTemporaryStore.delete(roomId)
}
return content
}
fun onContentFromInitSyncHandled(roomId: String) {
roomSyncEphemeralTemporaryStore.delete(roomId)
} }
} }

View file

@ -88,16 +88,21 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
fun handle(realm: Realm, fun handle(realm: Realm,
roomsSyncResponse: RoomsSyncResponse, roomsSyncResponse: RoomsSyncResponse,
isInitialSync: Boolean, isInitialSync: Boolean,
aggregator: SyncResponsePostTreatmentAggregator,
reporter: ProgressReporter? = null) { reporter: ProgressReporter? = null) {
Timber.v("Execute transaction from $this") Timber.v("Execute transaction from $this")
handleRoomSync(realm, HandlingStrategy.JOINED(roomsSyncResponse.join), isInitialSync, reporter) handleRoomSync(realm, HandlingStrategy.JOINED(roomsSyncResponse.join), isInitialSync, aggregator, reporter)
handleRoomSync(realm, HandlingStrategy.INVITED(roomsSyncResponse.invite), isInitialSync, reporter) handleRoomSync(realm, HandlingStrategy.INVITED(roomsSyncResponse.invite), isInitialSync, aggregator, reporter)
handleRoomSync(realm, HandlingStrategy.LEFT(roomsSyncResponse.leave), isInitialSync, reporter) handleRoomSync(realm, HandlingStrategy.LEFT(roomsSyncResponse.leave), isInitialSync, aggregator,reporter)
} }
// PRIVATE METHODS ***************************************************************************** // PRIVATE METHODS *****************************************************************************
private fun handleRoomSync(realm: Realm, handlingStrategy: HandlingStrategy, isInitialSync: Boolean, reporter: ProgressReporter?) { private fun handleRoomSync(realm: Realm,
handlingStrategy: HandlingStrategy,
isInitialSync: Boolean,
aggregator: SyncResponsePostTreatmentAggregator,
reporter: ProgressReporter?) {
val insertType = if (isInitialSync) { val insertType = if (isInitialSync) {
EventInsertType.INITIAL_SYNC EventInsertType.INITIAL_SYNC
} else { } else {
@ -107,12 +112,12 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
val rooms = when (handlingStrategy) { val rooms = when (handlingStrategy) {
is HandlingStrategy.JOINED -> { is HandlingStrategy.JOINED -> {
if (isInitialSync && initialSyncStrategy is InitialSyncStrategy.Optimized) { if (isInitialSync && initialSyncStrategy is InitialSyncStrategy.Optimized) {
insertJoinRoomsFromInitSync(realm, handlingStrategy, syncLocalTimeStampMillis, reporter) insertJoinRoomsFromInitSync(realm, handlingStrategy, syncLocalTimeStampMillis, aggregator, reporter)
// Rooms are already inserted, return an empty list // Rooms are already inserted, return an empty list
emptyList() emptyList()
} else { } else {
handlingStrategy.data.mapWithProgress(reporter, InitSyncStep.ImportingAccountJoinedRooms, 0.6f) { handlingStrategy.data.mapWithProgress(reporter, InitSyncStep.ImportingAccountJoinedRooms, 0.6f) {
handleJoinedRoom(realm, it.key, it.value, insertType, syncLocalTimeStampMillis) handleJoinedRoom(realm, it.key, it.value, insertType, syncLocalTimeStampMillis, aggregator)
} }
} }
} }
@ -133,6 +138,7 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
private fun insertJoinRoomsFromInitSync(realm: Realm, private fun insertJoinRoomsFromInitSync(realm: Realm,
handlingStrategy: HandlingStrategy.JOINED, handlingStrategy: HandlingStrategy.JOINED,
syncLocalTimeStampMillis: Long, syncLocalTimeStampMillis: Long,
aggregator: SyncResponsePostTreatmentAggregator,
reporter: ProgressReporter?) { reporter: ProgressReporter?) {
val maxSize = (initialSyncStrategy as? InitialSyncStrategy.Optimized)?.maxRoomsToInsert ?: Int.MAX_VALUE val maxSize = (initialSyncStrategy as? InitialSyncStrategy.Optimized)?.maxRoomsToInsert ?: Int.MAX_VALUE
val listSize = handlingStrategy.data.keys.size val listSize = handlingStrategy.data.keys.size
@ -154,7 +160,8 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
roomId = it, roomId = it,
roomSync = handlingStrategy.data[it] ?: error("Should not happen"), roomSync = handlingStrategy.data[it] ?: error("Should not happen"),
insertType = EventInsertType.INITIAL_SYNC, insertType = EventInsertType.INITIAL_SYNC,
syncLocalTimestampMillis = syncLocalTimeStampMillis syncLocalTimestampMillis = syncLocalTimeStampMillis,
aggregator
) )
} }
realm.insertOrUpdate(roomEntities) realm.insertOrUpdate(roomEntities)
@ -164,7 +171,7 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
} else { } else {
// No need to split // No need to split
val rooms = handlingStrategy.data.mapWithProgress(reporter, InitSyncStep.ImportingAccountJoinedRooms, 0.6f) { val rooms = handlingStrategy.data.mapWithProgress(reporter, InitSyncStep.ImportingAccountJoinedRooms, 0.6f) {
handleJoinedRoom(realm, it.key, it.value, EventInsertType.INITIAL_SYNC, syncLocalTimeStampMillis) handleJoinedRoom(realm, it.key, it.value, EventInsertType.INITIAL_SYNC, syncLocalTimeStampMillis, aggregator)
} }
realm.insertOrUpdate(rooms) realm.insertOrUpdate(rooms)
} }
@ -174,14 +181,15 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
roomId: String, roomId: String,
roomSync: RoomSync, roomSync: RoomSync,
insertType: EventInsertType, insertType: EventInsertType,
syncLocalTimestampMillis: Long): RoomEntity { syncLocalTimestampMillis: Long,
aggregator: SyncResponsePostTreatmentAggregator): RoomEntity {
Timber.v("Handle join sync for room $roomId") Timber.v("Handle join sync for room $roomId")
val ephemeralResult = (roomSync.ephemeral as? LazyRoomSyncEphemeral.Parsed) val ephemeralResult = (roomSync.ephemeral as? LazyRoomSyncEphemeral.Parsed)
?._roomSyncEphemeral ?._roomSyncEphemeral
?.events ?.events
?.takeIf { it.isNotEmpty() } ?.takeIf { it.isNotEmpty() }
?.let { handleEphemeral(realm, roomId, it, insertType == EventInsertType.INITIAL_SYNC) } ?.let { handleEphemeral(realm, roomId, it, insertType == EventInsertType.INITIAL_SYNC, aggregator) }
if (roomSync.accountData?.events?.isNotEmpty() == true) { if (roomSync.accountData?.events?.isNotEmpty() == true) {
handleRoomAccountDataEvents(realm, roomId, roomSync.accountData) handleRoomAccountDataEvents(realm, roomId, roomSync.accountData)
@ -421,14 +429,15 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
private fun handleEphemeral(realm: Realm, private fun handleEphemeral(realm: Realm,
roomId: String, roomId: String,
ephemeralEvents: List<Event>, ephemeralEvents: List<Event>,
isInitialSync: Boolean): EphemeralResult { isInitialSync: Boolean,
aggregator: SyncResponsePostTreatmentAggregator): EphemeralResult {
var result = EphemeralResult() var result = EphemeralResult()
for (event in ephemeralEvents) { for (event in ephemeralEvents) {
when (event.type) { when (event.type) {
EventType.RECEIPT -> { EventType.RECEIPT -> {
@Suppress("UNCHECKED_CAST") @Suppress("UNCHECKED_CAST")
(event.content as? ReadReceiptContent)?.let { readReceiptContent -> (event.content as? ReadReceiptContent)?.let { readReceiptContent ->
readReceiptHandler.handle(realm, roomId, readReceiptContent, isInitialSync) readReceiptHandler.handle(realm, roomId, readReceiptContent, isInitialSync, aggregator)
} }
} }
EventType.TYPING -> { EventType.TYPING -> {

View file

@ -26,6 +26,7 @@ import javax.inject.Inject
internal class RoomTypingUsersHandler @Inject constructor(@UserId private val userId: String, internal class RoomTypingUsersHandler @Inject constructor(@UserId private val userId: String,
private val typingUsersTracker: DefaultTypingUsersTracker) { private val typingUsersTracker: DefaultTypingUsersTracker) {
// TODO This could be handled outside of the Realm transaction. Use the new aggregator?
fun handle(realm: Realm, roomId: String, ephemeralResult: RoomSyncHandler.EphemeralResult?) { fun handle(realm: Realm, roomId: String, ephemeralResult: RoomSyncHandler.EphemeralResult?) {
val roomMemberHelper = RoomMemberHelper(realm, roomId) val roomMemberHelper = RoomMemberHelper(realm, roomId)
val typingIds = ephemeralResult?.typingUserIds?.filter { it != userId }.orEmpty() val typingIds = ephemeralResult?.typingUserIds?.filter { it != userId }.orEmpty()

View file

@ -41,17 +41,19 @@ import kotlin.system.measureTimeMillis
private const val GET_GROUP_DATA_WORKER = "GET_GROUP_DATA_WORKER" private const val GET_GROUP_DATA_WORKER = "GET_GROUP_DATA_WORKER"
internal class SyncResponseHandler @Inject constructor(@SessionDatabase private val monarchy: Monarchy, internal class SyncResponseHandler @Inject constructor(
@SessionId private val sessionId: String, @SessionDatabase private val monarchy: Monarchy,
private val workManagerProvider: WorkManagerProvider, @SessionId private val sessionId: String,
private val roomSyncHandler: RoomSyncHandler, private val workManagerProvider: WorkManagerProvider,
private val userAccountDataSyncHandler: UserAccountDataSyncHandler, private val roomSyncHandler: RoomSyncHandler,
private val groupSyncHandler: GroupSyncHandler, private val userAccountDataSyncHandler: UserAccountDataSyncHandler,
private val cryptoSyncHandler: CryptoSyncHandler, private val groupSyncHandler: GroupSyncHandler,
private val cryptoService: DefaultCryptoService, private val cryptoSyncHandler: CryptoSyncHandler,
private val tokenStore: SyncTokenStore, private val aggregatorHandler: SyncResponsePostTreatmentAggregatorHandler,
private val processEventForPushTask: ProcessEventForPushTask, private val cryptoService: DefaultCryptoService,
private val pushRuleService: PushRuleService) { private val tokenStore: SyncTokenStore,
private val processEventForPushTask: ProcessEventForPushTask,
private val pushRuleService: PushRuleService) {
suspend fun handleResponse(syncResponse: SyncResponse, suspend fun handleResponse(syncResponse: SyncResponse,
fromToken: String?, fromToken: String?,
@ -81,13 +83,14 @@ internal class SyncResponseHandler @Inject constructor(@SessionDatabase private
}.also { }.also {
Timber.v("Finish handling toDevice in $it ms") Timber.v("Finish handling toDevice in $it ms")
} }
val aggregator = SyncResponsePostTreatmentAggregator()
// Start one big transaction // Start one big transaction
monarchy.awaitTransaction { realm -> monarchy.awaitTransaction { realm ->
measureTimeMillis { measureTimeMillis {
Timber.v("Handle rooms") Timber.v("Handle rooms")
reportSubtask(reporter, InitSyncStep.ImportingAccountRoom, 1, 0.7f) { reportSubtask(reporter, InitSyncStep.ImportingAccountRoom, 1, 0.7f) {
if (syncResponse.rooms != null) { if (syncResponse.rooms != null) {
roomSyncHandler.handle(realm, syncResponse.rooms, isInitialSync, reporter) roomSyncHandler.handle(realm, syncResponse.rooms, isInitialSync, aggregator,reporter)
} }
} }
}.also { }.also {
@ -115,7 +118,10 @@ internal class SyncResponseHandler @Inject constructor(@SessionDatabase private
} }
tokenStore.saveToken(realm, syncResponse.nextBatch) tokenStore.saveToken(realm, syncResponse.nextBatch)
} }
// Everything else we need to do outside the transaction // Everything else we need to do outside the transaction
aggregatorHandler.handle(aggregator)
syncResponse.rooms?.let { syncResponse.rooms?.let {
checkPushRules(it, isInitialSync) checkPushRules(it, isInitialSync)
userAccountDataSyncHandler.synchronizeWithServerIfNeeded(it.invite) userAccountDataSyncHandler.synchronizeWithServerIfNeeded(it.invite)

View file

@ -0,0 +1,22 @@
/*
* Copyright (c) 2021 The Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.matrix.android.sdk.internal.session.sync
internal class SyncResponsePostTreatmentAggregator {
// List of RoomId
val ephemeralFilesToDelete = mutableListOf<String>()
}

View file

@ -0,0 +1,33 @@
/*
* Copyright (c) 2021 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.matrix.android.sdk.internal.session.sync
import javax.inject.Inject
internal class SyncResponsePostTreatmentAggregatorHandler @Inject constructor(
private val ephemeralTemporaryStore: RoomSyncEphemeralTemporaryStore
) {
fun handle(synResHaResponsePostTreatmentAggregator: SyncResponsePostTreatmentAggregator) {
cleanupEphemeralFiles(synResHaResponsePostTreatmentAggregator.ephemeralFilesToDelete)
}
private fun cleanupEphemeralFiles(ephemeralFilesToDelete: List<String>) {
ephemeralFilesToDelete.forEach {
ephemeralTemporaryStore.delete(it)
}
}
}