Store Ephemeral in files to handle them later (no second transaction)

This commit is contained in:
Benoit Marty 2021-03-11 21:54:33 +01:00 committed by Benoit Marty
parent e8d4fab305
commit 3777b00ad7
10 changed files with 171 additions and 89 deletions

View file

@ -42,9 +42,9 @@ sealed class InitialSyncStrategy {
val minSizeToSplit: Long = 1024 * 1024,
/**
* Limit per room to reach to decide to store a join room ephemeral Events into a file
* Empiric value: 6 kilobytes
* Empiric value: 1 kilobytes
*/
val minSizeToStoreInFile: Long = 6 * 1024,
val minSizeToStoreInFile: Long = 1024,
/**
* Max number of rooms to insert at a time in database (to avoid too much RAM usage)
*/

View file

@ -16,12 +16,13 @@
package org.matrix.android.sdk.internal.session.sync
import io.realm.Realm
import org.matrix.android.sdk.api.session.events.model.EventType
import org.matrix.android.sdk.internal.database.model.ReadReceiptEntity
import org.matrix.android.sdk.internal.database.model.ReadReceiptsSummaryEntity
import org.matrix.android.sdk.internal.database.query.createUnmanaged
import org.matrix.android.sdk.internal.database.query.getOrCreate
import org.matrix.android.sdk.internal.database.query.where
import io.realm.Realm
import timber.log.Timber
import javax.inject.Inject
@ -35,7 +36,9 @@ typealias ReadReceiptContent = Map<String, Map<String, Map<String, Map<String, D
private const val READ_KEY = "m.read"
private const val TIMESTAMP_KEY = "ts"
internal class ReadReceiptHandler @Inject constructor() {
internal class ReadReceiptHandler @Inject constructor(
private val roomSyncEphemeralTemporaryStore: RoomSyncEphemeralTemporaryStore
) {
companion object {
@ -87,7 +90,57 @@ internal class ReadReceiptHandler @Inject constructor() {
realm.insertOrUpdate(readReceiptSummaries)
}
/**
* Example of content:
*
* <pre>
* {
* "type": "m.receipt",
* "content": {
* "$ofZhdeinmEReG_X-agD3J2TIhosEPkuvl62HJ8pVMMs": {
* "m.read": {
* "@benoit.marty:matrix.org": {
* "ts": 1610468193999
* }
* }
* },
* "$ZMa_qwE_w_ZOj_vAxv7JuJeHCQfYzuQblmIZxkYmNMs": {
* "m.read": {
* "@benoitx:matrix.org": {
* "ts": 1610468049579
* },
* "@benoit.marty:matrix.org": {
* "ts": 1609156029466
* }
* }
* }
* }
* }
* </pre>
*/
private fun incrementalSyncStrategy(realm: Realm, roomId: String, content: ReadReceiptContent) {
// First check if we have data from init sync to handle
// TODO Rename contentFromInitSync
val initSyncContent = roomSyncEphemeralTemporaryStore.read(roomId)
?.events
?.firstOrNull { it.type == EventType.RECEIPT }
?.let {
@Suppress("UNCHECKED_CAST")
it.content as? ReadReceiptContent
}
?.also {
Timber.w("INIT_SYNC Copy RR for room $roomId")
}
initSyncContent?.let {
Timber.w("BOOK Copy RR for room $roomId")
// TODO Merge with data we just received
// TODO Store that when we enter the timeline
initialSyncStrategy(realm, roomId, it)
roomSyncEphemeralTemporaryStore.delete(roomId)
}
for ((eventId, receiptDict) in content) {
val userIdsDict = receiptDict[READ_KEY] ?: continue
val readReceiptsSummary = ReadReceiptsSummaryEntity.where(realm, eventId).findFirst()

View file

@ -0,0 +1,77 @@
/*
* Copyright (c) 2021 The Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.matrix.android.sdk.internal.session.sync
import com.squareup.moshi.JsonReader
import okio.buffer
import okio.source
import org.matrix.android.sdk.internal.di.MoshiProvider
import org.matrix.android.sdk.internal.di.SessionFilesDirectory
import org.matrix.android.sdk.internal.session.sync.model.RoomSyncEphemeral
import org.matrix.android.sdk.internal.util.md5
import timber.log.Timber
import java.io.File
import javax.inject.Inject
internal interface RoomSyncEphemeralTemporaryStore {
fun write(roomId: String, roomSyncEphemeralJson: String)
fun read(roomId: String): RoomSyncEphemeral?
fun reset()
fun delete(roomId: String)
}
internal class RoomSyncEphemeralTemporaryStoreFile @Inject constructor(
@SessionFilesDirectory private val fileDirectory: File
) : RoomSyncEphemeralTemporaryStore {
private val workingDir = File(fileDirectory, "rr")
.also { it.mkdirs() }
/**
* Write RoomSyncEphemeral to a file
*/
override fun write(roomId: String, roomSyncEphemeralJson: String) {
Timber.w("INIT_SYNC Store RR for room $roomId")
getFile(roomId).writeText(roomSyncEphemeralJson)
}
/**
* Read RoomSyncEphemeral from a file, or null if there is no file to read
*/
override fun read(roomId: String): RoomSyncEphemeral? {
return getFile(roomId)
.takeIf { it.exists() }
?.inputStream()
?.use { pos ->
MoshiProvider.providesMoshi().adapter(RoomSyncEphemeral::class.java)
.fromJson(JsonReader.of(pos.source().buffer()))
}
}
override fun delete(roomId: String) {
getFile(roomId).delete()
}
override fun reset() {
workingDir.deleteRecursively()
workingDir.mkdirs()
}
private fun getFile(roomId: String): File {
return File(workingDir, "${roomId.md5()}.json")
}
}

View file

@ -60,6 +60,7 @@ import org.matrix.android.sdk.internal.session.room.timeline.PaginationDirection
import org.matrix.android.sdk.internal.session.room.timeline.TimelineInput
import org.matrix.android.sdk.internal.session.room.typing.TypingEventContent
import org.matrix.android.sdk.internal.session.sync.model.InvitedRoomSync
import org.matrix.android.sdk.internal.session.sync.model.LazyRoomSyncEphemeral
import org.matrix.android.sdk.internal.session.sync.model.RoomSync
import org.matrix.android.sdk.internal.session.sync.model.RoomSyncAccountData
import org.matrix.android.sdk.internal.session.sync.model.RoomsSyncResponse
@ -94,19 +95,6 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
handleRoomSync(realm, HandlingStrategy.LEFT(roomsSyncResponse.leave), isInitialSync, reporter)
}
fun handleInitSyncEphemeral(realm: Realm,
roomsSyncResponse: RoomsSyncResponse) {
roomsSyncResponse.join.forEach { roomSync ->
val ephemeralResult = roomSync.value.ephemeral
?.roomSyncEphemeral
?.events
?.takeIf { it.isNotEmpty() }
?.let { events -> handleEphemeral(realm, roomSync.key, events, true) }
roomTypingUsersHandler.handle(realm, roomSync.key, ephemeralResult)
}
}
// PRIVATE METHODS *****************************************************************************
private fun handleRoomSync(realm: Realm, handlingStrategy: HandlingStrategy, isInitialSync: Boolean, reporter: ProgressReporter?) {
@ -124,7 +112,7 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
emptyList()
} else {
handlingStrategy.data.mapWithProgress(reporter, InitSyncStep.ImportingAccountJoinedRooms, 0.6f) {
handleJoinedRoom(realm, it.key, it.value, true, insertType, syncLocalTimeStampMillis)
handleJoinedRoom(realm, it.key, it.value, insertType, syncLocalTimeStampMillis)
}
}
}
@ -165,7 +153,6 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
realm = realm,
roomId = it,
roomSync = handlingStrategy.data[it] ?: error("Should not happen"),
handleEphemeralEvents = false,
insertType = EventInsertType.INITIAL_SYNC,
syncLocalTimestampMillis = syncLocalTimeStampMillis
)
@ -177,7 +164,7 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
} else {
// No need to split
val rooms = handlingStrategy.data.mapWithProgress(reporter, InitSyncStep.ImportingAccountJoinedRooms, 0.6f) {
handleJoinedRoom(realm, it.key, it.value, false, EventInsertType.INITIAL_SYNC, syncLocalTimeStampMillis)
handleJoinedRoom(realm, it.key, it.value, EventInsertType.INITIAL_SYNC, syncLocalTimeStampMillis)
}
realm.insertOrUpdate(rooms)
}
@ -186,17 +173,15 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
private fun handleJoinedRoom(realm: Realm,
roomId: String,
roomSync: RoomSync,
handleEphemeralEvents: Boolean,
insertType: EventInsertType,
syncLocalTimestampMillis: Long): RoomEntity {
Timber.v("Handle join sync for room $roomId")
var ephemeralResult: EphemeralResult? = null
if (handleEphemeralEvents) {
ephemeralResult = roomSync.ephemeral?.roomSyncEphemeral?.events
val ephemeralResult = (roomSync.ephemeral as? LazyRoomSyncEphemeral.Parsed)
?._roomSyncEphemeral
?.events
?.takeIf { it.isNotEmpty() }
?.let { handleEphemeral(realm, roomId, it, insertType == EventInsertType.INITIAL_SYNC) }
}
if (roomSync.accountData?.events?.isNotEmpty() == true) {
handleRoomAccountDataEvents(realm, roomId, roomSync.accountData)

View file

@ -37,4 +37,7 @@ internal abstract class SyncModule {
@Binds
abstract fun bindSyncTask(task: DefaultSyncTask): SyncTask
@Binds
abstract fun bindRoomSyncEphemeralTemporaryStore(store: RoomSyncEphemeralTemporaryStoreFile): RoomSyncEphemeralTemporaryStore
}

View file

@ -128,15 +128,6 @@ internal class SyncResponseHandler @Inject constructor(@SessionDatabase private
cryptoSyncHandler.onSyncCompleted(syncResponse)
}
suspend fun handleInitSyncSecondTransaction(syncResponse: SyncResponse) {
// Start another transaction to handle the ephemeral events
monarchy.awaitTransaction { realm ->
if (syncResponse.rooms != null) {
roomSyncHandler.handleInitSyncEphemeral(realm, syncResponse.rooms)
}
}
}
/**
* At the moment we don't get any group data through the sync, so we poll where every hour.
* You can also force to refetch group data using [Group] API.

View file

@ -62,7 +62,8 @@ internal class DefaultSyncTask @Inject constructor(
private val globalErrorReceiver: GlobalErrorReceiver,
@SessionFilesDirectory
private val fileDirectory: File,
private val syncResponseParser: InitialSyncResponseParser
private val syncResponseParser: InitialSyncResponseParser,
private val roomSyncEphemeralTemporaryStore: RoomSyncEphemeralTemporaryStore
) : SyncTask {
private val workingDir = File(fileDirectory, "is")
@ -102,13 +103,16 @@ internal class DefaultSyncTask @Inject constructor(
if (isInitialSync) {
Timber.v("INIT_SYNC with filter: ${requestParams["filter"]}")
val initSyncStrategy = initialSyncStrategy
var syncResp: SyncResponse? = null
logDuration("INIT_SYNC strategy: $initSyncStrategy") {
if (initSyncStrategy is InitialSyncStrategy.Optimized) {
roomSyncEphemeralTemporaryStore.reset()
workingDir.mkdirs()
val file = downloadInitSyncResponse(requestParams)
syncResp = reportSubtask(initialSyncProgressService, InitSyncStep.ImportingAccount, 1, 0.7F) {
reportSubtask(initialSyncProgressService, InitSyncStep.ImportingAccount, 1, 0.7F) {
handleSyncFile(file, initSyncStrategy)
}
// Delete all files
workingDir.deleteRecursively()
} else {
val syncResponse = logDuration("INIT_SYNC Request") {
executeRequest<SyncResponse>(globalErrorReceiver) {
@ -125,15 +129,6 @@ internal class DefaultSyncTask @Inject constructor(
}
}
initialSyncProgressService.endAll()
if (initSyncStrategy is InitialSyncStrategy.Optimized) {
logDuration("INIT_SYNC Handle ephemeral") {
syncResponseHandler.handleInitSyncSecondTransaction(syncResp!!)
}
initialSyncStatusRepository.setStep(InitialSyncStatus.STEP_SUCCESS)
// Delete all files
workingDir.deleteRecursively()
}
} else {
val syncResponse = executeRequest<SyncResponse>(globalErrorReceiver) {
apiCall = syncAPI.sync(
@ -147,7 +142,6 @@ internal class DefaultSyncTask @Inject constructor(
}
private suspend fun downloadInitSyncResponse(requestParams: Map<String, String>): File {
workingDir.mkdirs()
val workingFile = File(workingDir, "initSync.json")
val status = initialSyncStatusRepository.getStep()
if (workingFile.exists() && status >= InitialSyncStatus.STEP_DOWNLOADED) {
@ -201,8 +195,8 @@ internal class DefaultSyncTask @Inject constructor(
}
}
private suspend fun handleSyncFile(workingFile: File, initSyncStrategy: InitialSyncStrategy.Optimized): SyncResponse {
return logDuration("INIT_SYNC handleSyncFile()") {
private suspend fun handleSyncFile(workingFile: File, initSyncStrategy: InitialSyncStrategy.Optimized) {
logDuration("INIT_SYNC handleSyncFile()") {
val syncResponse = logDuration("INIT_SYNC Read file and parse") {
syncResponseParser.parse(initSyncStrategy, workingFile)
}
@ -215,7 +209,7 @@ internal class DefaultSyncTask @Inject constructor(
logDuration("INIT_SYNC Database insertion") {
syncResponseHandler.handleResponse(syncResponse, null, initialSyncProgressService)
}
syncResponse
initialSyncStatusRepository.setStep(InitialSyncStatus.STEP_SUCCESS)
}
}

View file

@ -16,28 +16,10 @@
package org.matrix.android.sdk.internal.session.sync.model
import com.squareup.moshi.JsonAdapter
import com.squareup.moshi.JsonClass
import com.squareup.moshi.JsonReader
import okio.buffer
import okio.source
import java.io.File
@JsonClass(generateAdapter = false)
internal sealed class LazyRoomSyncEphemeral {
data class Parsed(val _roomSyncEphemeral: RoomSyncEphemeral) : LazyRoomSyncEphemeral()
data class Stored(val roomSyncEphemeralAdapter: JsonAdapter<RoomSyncEphemeral>, val file: File) : LazyRoomSyncEphemeral()
val roomSyncEphemeral: RoomSyncEphemeral
get() {
return when (this) {
is Parsed -> _roomSyncEphemeral
is Stored -> {
// Parse the file now
file.inputStream().use { pos ->
roomSyncEphemeralAdapter.fromJson(JsonReader.of(pos.source().buffer()))!!
}
}
}
}
object Stored : LazyRoomSyncEphemeral()
}

View file

@ -22,11 +22,10 @@ import com.squareup.moshi.JsonReader
import com.squareup.moshi.JsonWriter
import com.squareup.moshi.ToJson
import org.matrix.android.sdk.internal.session.sync.InitialSyncStrategy
import org.matrix.android.sdk.internal.session.sync.RoomSyncEphemeralTemporaryStore
import org.matrix.android.sdk.internal.session.sync.model.LazyRoomSyncEphemeral
import org.matrix.android.sdk.internal.session.sync.model.RoomSyncEphemeral
import timber.log.Timber
import java.io.File
import java.util.concurrent.atomic.AtomicInteger
internal class DefaultLazyRoomSyncEphemeralJsonAdapter {
@ -44,20 +43,15 @@ internal class DefaultLazyRoomSyncEphemeralJsonAdapter {
}
}
internal class SplitLazyRoomSyncJsonAdapter(
private val workingDirectory: File,
internal class SplitLazyRoomSyncEphemeralJsonAdapter(
private val roomSyncEphemeralTemporaryStore: RoomSyncEphemeralTemporaryStore,
private val syncStrategy: InitialSyncStrategy.Optimized
) {
private val atomicInteger = AtomicInteger(0)
private fun createFile(): File {
val index = atomicInteger.getAndIncrement()
return File(workingDirectory, "room_$index.json")
}
@FromJson
fun fromJson(reader: JsonReader, delegate: JsonAdapter<RoomSyncEphemeral>): LazyRoomSyncEphemeral? {
val path = reader.path
val roomId = path.substringAfter("\$.rooms.join.").substringBeforeLast(".ephemeral")
val json = reader.nextSource().inputStream().bufferedReader().use {
it.readText()
}
@ -65,9 +59,8 @@ internal class SplitLazyRoomSyncJsonAdapter(
return if (json.length > limit) {
Timber.v("INIT_SYNC $path content length: ${json.length} copy to a file")
// Copy the source to a file
val file = createFile()
file.writeText(json)
LazyRoomSyncEphemeral.Stored(delegate, file)
roomSyncEphemeralTemporaryStore.write(roomId, json)
LazyRoomSyncEphemeral.Stored
} else {
Timber.v("INIT_SYNC $path content length: ${json.length} parse it now")
val roomSync = delegate.fromJson(json) ?: return null

View file

@ -20,29 +20,33 @@ import com.squareup.moshi.Moshi
import okio.buffer
import okio.source
import org.matrix.android.sdk.internal.session.sync.InitialSyncStrategy
import org.matrix.android.sdk.internal.session.sync.RoomSyncEphemeralTemporaryStore
import org.matrix.android.sdk.internal.session.sync.model.SyncResponse
import timber.log.Timber
import java.io.File
import javax.inject.Inject
internal class InitialSyncResponseParser @Inject constructor(private val moshi: Moshi) {
internal class InitialSyncResponseParser @Inject constructor(
private val moshi: Moshi,
private val roomSyncEphemeralTemporaryStore: RoomSyncEphemeralTemporaryStore
) {
fun parse(syncStrategy: InitialSyncStrategy.Optimized, workingFile: File): SyncResponse {
val syncResponseLength = workingFile.length().toInt()
Timber.v("INIT_SYNC Sync file size is $syncResponseLength bytes")
val shouldSplit = syncResponseLength >= syncStrategy.minSizeToSplit
Timber.v("INIT_SYNC should split in several files: $shouldSplit")
return getMoshi(syncStrategy, workingFile.parentFile!!, shouldSplit)
return getMoshi(syncStrategy, shouldSplit)
.adapter(SyncResponse::class.java)
.fromJson(workingFile.source().buffer())!!
}
private fun getMoshi(syncStrategy: InitialSyncStrategy.Optimized, workingDirectory: File, shouldSplit: Boolean): Moshi {
private fun getMoshi(syncStrategy: InitialSyncStrategy.Optimized, shouldSplit: Boolean): Moshi {
// If we don't have to split we'll rely on the already default moshi
if (!shouldSplit) return moshi
// Otherwise, we create a new adapter for handling Map of Lazy sync
return moshi.newBuilder()
.add(SplitLazyRoomSyncJsonAdapter(workingDirectory, syncStrategy))
.add(SplitLazyRoomSyncEphemeralJsonAdapter(roomSyncEphemeralTemporaryStore, syncStrategy))
.build()
}
}