Merge pull request #2674 from vector-im/feature/bma/init_sync_poc

Init sync optimization
This commit is contained in:
Benoit Marty 2021-02-23 11:23:52 +01:00 committed by GitHub
commit 9db644afa2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
32 changed files with 898 additions and 204 deletions

View file

@ -6,6 +6,7 @@ Features ✨:
Improvements 🙌:
- Fetch homeserver type and version and display in a new setting screen and add info in rageshakes (#2831)
- Improve initial sync performance (#983)
Bugfix 🐛:
-

View file

@ -35,6 +35,7 @@ import org.matrix.android.sdk.api.session.file.FileService
import org.matrix.android.sdk.api.session.group.GroupService
import org.matrix.android.sdk.api.session.homeserver.HomeServerCapabilitiesService
import org.matrix.android.sdk.api.session.identity.IdentityService
import org.matrix.android.sdk.api.session.initsync.InitialSyncProgressService
import org.matrix.android.sdk.api.session.integrationmanager.IntegrationManagerService
import org.matrix.android.sdk.api.session.media.MediaService
import org.matrix.android.sdk.api.session.permalinks.PermalinkService

View file

@ -5,7 +5,7 @@
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.matrix.android.sdk.api.session
package org.matrix.android.sdk.api.session.initsync
import androidx.annotation.StringRes
import androidx.lifecycle.LiveData

View file

@ -14,9 +14,9 @@
* limitations under the License.
*/
package org.matrix.android.sdk.internal.database.model;
package org.matrix.android.sdk.internal.database.model
public enum EventInsertType {
internal enum class EventInsertType {
INITIAL_SYNC,
INCREMENTAL_SYNC,
PAGINATION,

View file

@ -36,6 +36,7 @@ import org.matrix.android.sdk.internal.network.parsing.ForceToBooleanJsonAdapter
import org.matrix.android.sdk.internal.network.parsing.RuntimeJsonAdapterFactory
import org.matrix.android.sdk.internal.network.parsing.TlsVersionMoshiAdapter
import org.matrix.android.sdk.internal.network.parsing.UriMoshiAdapter
import org.matrix.android.sdk.internal.session.sync.parsing.DefaultLazyRoomSyncJsonAdapter
object MoshiProvider {
@ -44,6 +45,8 @@ object MoshiProvider {
.add(ForceToBooleanJsonAdapter())
.add(CipherSuiteMoshiAdapter())
.add(TlsVersionMoshiAdapter())
// Use addLast here so we can inject a SplitLazyRoomSyncJsonAdapter later to override the default parsing.
.addLast(DefaultLazyRoomSyncJsonAdapter())
.add(RuntimeJsonAdapterFactory.of(MessageContent::class.java, "msgtype", MessageDefaultContent::class.java)
.registerSubtype(MessageTextContent::class.java, MessageType.MSGTYPE_TEXT)
.registerSubtype(MessageNoticeContent::class.java, MessageType.MSGTYPE_NOTICE)

View file

@ -1,136 +0,0 @@
/*
* Copyright 2020 The Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.matrix.android.sdk.internal.session
import androidx.annotation.StringRes
import androidx.lifecycle.LiveData
import androidx.lifecycle.MutableLiveData
import org.matrix.android.sdk.api.session.InitialSyncProgressService
import timber.log.Timber
import javax.inject.Inject
@SessionScope
class DefaultInitialSyncProgressService @Inject constructor() : InitialSyncProgressService {
private val status = MutableLiveData<InitialSyncProgressService.Status>()
private var rootTask: TaskInfo? = null
override fun getInitialSyncProgressStatus(): LiveData<InitialSyncProgressService.Status> {
return status
}
fun startTask(@StringRes nameRes: Int, totalProgress: Int, parentWeight: Float = 1f) {
// Create a rootTask, or add a child to the leaf
if (rootTask == null) {
rootTask = TaskInfo(nameRes, totalProgress)
} else {
val currentLeaf = rootTask!!.leaf()
val newTask = TaskInfo(nameRes,
totalProgress,
currentLeaf,
parentWeight)
currentLeaf.child = newTask
}
reportProgress(0)
}
fun reportProgress(progress: Int) {
rootTask?.leaf()?.setProgress(progress)
}
fun endTask(nameRes: Int) {
val endedTask = rootTask?.leaf()
if (endedTask?.nameRes == nameRes) {
// close it
val parent = endedTask.parent
parent?.child = null
parent?.setProgress(endedTask.offset + (endedTask.totalProgress * endedTask.parentWeight).toInt())
}
if (endedTask?.parent == null) {
status.postValue(InitialSyncProgressService.Status.Idle)
}
}
fun endAll() {
rootTask = null
status.postValue(InitialSyncProgressService.Status.Idle)
}
private inner class TaskInfo(@StringRes var nameRes: Int,
var totalProgress: Int,
var parent: TaskInfo? = null,
var parentWeight: Float = 1f,
var offset: Int = parent?.currentProgress ?: 0) {
var child: TaskInfo? = null
var currentProgress: Int = 0
/**
* Get the further child
*/
fun leaf(): TaskInfo {
var last = this
while (last.child != null) {
last = last.child!!
}
return last
}
/**
* Set progress of the parent if any (which will post value), or post the value
*/
fun setProgress(progress: Int) {
currentProgress = progress
// val newProgress = Math.min(currentProgress + progress, totalProgress)
parent?.let {
val parentProgress = (currentProgress * parentWeight).toInt()
it.setProgress(offset + parentProgress)
} ?: run {
Timber.v("--- ${leaf().nameRes}: $currentProgress")
status.postValue(InitialSyncProgressService.Status.Progressing(leaf().nameRes, currentProgress))
}
}
}
}
inline fun <T> reportSubtask(reporter: DefaultInitialSyncProgressService?,
@StringRes nameRes: Int,
totalProgress: Int,
parentWeight: Float = 1f,
block: () -> T): T {
reporter?.startTask(nameRes, totalProgress, parentWeight)
return block().also {
reporter?.endTask(nameRes)
}
}
inline fun <K, V, R> Map<out K, V>.mapWithProgress(reporter: DefaultInitialSyncProgressService?,
taskId: Int,
weight: Float,
transform: (Map.Entry<K, V>) -> R): List<R> {
val total = count().toFloat()
var current = 0
reporter?.startTask(taskId, 100, weight)
return map {
reporter?.reportProgress((current / total * 100).toInt())
current++
transform.invoke(it)
}.also {
reporter?.endTask(taskId)
}
}

View file

@ -24,7 +24,7 @@ import org.matrix.android.sdk.api.auth.data.SessionParams
import org.matrix.android.sdk.api.failure.GlobalError
import org.matrix.android.sdk.api.federation.FederationService
import org.matrix.android.sdk.api.pushrules.PushRuleService
import org.matrix.android.sdk.api.session.InitialSyncProgressService
import org.matrix.android.sdk.api.session.initsync.InitialSyncProgressService
import org.matrix.android.sdk.api.session.Session
import org.matrix.android.sdk.api.session.account.AccountService
import org.matrix.android.sdk.api.session.accountdata.AccountDataService

View file

@ -32,7 +32,7 @@ import org.matrix.android.sdk.api.auth.data.HomeServerConnectionConfig
import org.matrix.android.sdk.api.auth.data.SessionParams
import org.matrix.android.sdk.api.auth.data.sessionId
import org.matrix.android.sdk.api.crypto.MXCryptoConfig
import org.matrix.android.sdk.api.session.InitialSyncProgressService
import org.matrix.android.sdk.api.session.initsync.InitialSyncProgressService
import org.matrix.android.sdk.api.session.Session
import org.matrix.android.sdk.api.session.accountdata.AccountDataService
import org.matrix.android.sdk.api.session.homeserver.HomeServerCapabilitiesService
@ -77,6 +77,7 @@ import org.matrix.android.sdk.internal.session.call.CallEventProcessor
import org.matrix.android.sdk.internal.session.download.DownloadProgressInterceptor
import org.matrix.android.sdk.internal.session.homeserver.DefaultHomeServerCapabilitiesService
import org.matrix.android.sdk.internal.session.identity.DefaultIdentityService
import org.matrix.android.sdk.internal.session.initsync.DefaultInitialSyncProgressService
import org.matrix.android.sdk.internal.session.integrationmanager.IntegrationManager
import org.matrix.android.sdk.internal.session.permalinks.DefaultPermalinkService
import org.matrix.android.sdk.internal.session.room.EventRelationsAggregationProcessor

View file

@ -0,0 +1,94 @@
/*
* Copyright 2020 The Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.matrix.android.sdk.internal.session.initsync
import androidx.annotation.StringRes
import androidx.lifecycle.LiveData
import androidx.lifecycle.MutableLiveData
import org.matrix.android.sdk.api.session.initsync.InitialSyncProgressService
import org.matrix.android.sdk.internal.session.SessionScope
import javax.inject.Inject
@SessionScope
internal class DefaultInitialSyncProgressService @Inject constructor()
: InitialSyncProgressService,
ProgressReporter {
private val status = MutableLiveData<InitialSyncProgressService.Status>()
private var rootTask: TaskInfo? = null
override fun getInitialSyncProgressStatus(): LiveData<InitialSyncProgressService.Status> {
return status
}
/**
* Create a rootTask
*/
fun startRoot(@StringRes nameRes: Int,
totalProgress: Int) {
endAll()
rootTask = TaskInfo(nameRes, totalProgress, null, 1F)
reportProgress(0F)
}
/**
* Add a child to the leaf
*/
override fun startTask(@StringRes nameRes: Int,
totalProgress: Int,
parentWeight: Float) {
val currentLeaf = rootTask?.leaf() ?: return
currentLeaf.child = TaskInfo(
nameRes = nameRes,
totalProgress = totalProgress,
parent = currentLeaf,
parentWeight = parentWeight
)
reportProgress(0F)
}
override fun reportProgress(progress: Float) {
rootTask?.let { root ->
root.leaf().let { leaf ->
// Update the progress of the leaf and all its parents
leaf.setProgress(progress)
// Then update the live data using leaf wording and root progress
status.postValue(InitialSyncProgressService.Status.Progressing(leaf.nameRes, root.currentProgress.toInt()))
}
}
}
override fun endTask() {
rootTask?.leaf()?.let { endedTask ->
// Ensure the task progress is complete
reportProgress(endedTask.totalProgress.toFloat())
endedTask.parent?.child = null
if (endedTask.parent != null) {
// And close it
endedTask.parent.child = null
} else {
status.postValue(InitialSyncProgressService.Status.Idle)
}
}
}
fun endAll() {
rootTask = null
status.postValue(InitialSyncProgressService.Status.Idle)
}
}

View file

@ -0,0 +1,45 @@
/*
* Copyright 2021 The Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.matrix.android.sdk.internal.session.initsync
import androidx.annotation.StringRes
internal inline fun <T> reportSubtask(reporter: ProgressReporter?,
@StringRes nameRes: Int,
totalProgress: Int,
parentWeight: Float,
block: () -> T): T {
reporter?.startTask(nameRes, totalProgress, parentWeight)
return block().also {
reporter?.endTask()
}
}
internal inline fun <K, V, R> Map<out K, V>.mapWithProgress(reporter: ProgressReporter?,
@StringRes nameRes: Int,
parentWeight: Float,
transform: (Map.Entry<K, V>) -> R): List<R> {
var current = 0F
reporter?.startTask(nameRes, count() + 1, parentWeight)
return map {
reporter?.reportProgress(current)
current++
transform.invoke(it)
}.also {
reporter?.endTask()
}
}

View file

@ -0,0 +1,29 @@
/*
* Copyright 2021 The Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.matrix.android.sdk.internal.session.initsync
import androidx.annotation.StringRes
internal interface ProgressReporter {
fun startTask(@StringRes nameRes: Int,
totalProgress: Int,
parentWeight: Float)
fun reportProgress(progress: Float)
fun endTask()
}

View file

@ -0,0 +1,54 @@
/*
* Copyright 2021 The Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.matrix.android.sdk.internal.session.initsync
import androidx.annotation.StringRes
import timber.log.Timber
internal class TaskInfo(@StringRes val nameRes: Int,
val totalProgress: Int,
val parent: TaskInfo?,
val parentWeight: Float) {
var child: TaskInfo? = null
var currentProgress = 0F
private set
private val offset = parent?.currentProgress ?: 0F
/**
* Get the further child
*/
fun leaf(): TaskInfo {
var last = this
while (last.child != null) {
last = last.child!!
}
return last
}
/**
* Set progress of this task and update the parent progress iteratively
*/
fun setProgress(progress: Float) {
Timber.v("setProgress: $progress / $totalProgress")
currentProgress = progress
parent?.let {
val parentProgress = (currentProgress / totalProgress) * (parentWeight * it.totalProgress)
it.setProgress(offset + parentProgress)
}
}
}

View file

@ -50,7 +50,7 @@ internal class DefaultProcessEventForPushTask @Inject constructor(
}
val newJoinEvents = params.syncResponse.join
.mapNotNull { (key, value) ->
value.timeline?.events?.map { it.copy(roomId = key) }
value.roomSync.timeline?.events?.map { it.copy(roomId = key) }
}
.flatten()
val inviteEvents = params.syncResponse.invite
@ -80,7 +80,7 @@ internal class DefaultProcessEventForPushTask @Inject constructor(
val allRedactedEvents = params.syncResponse.join
.asSequence()
.mapNotNull { (_, value) -> value.timeline?.events }
.mapNotNull { (_, value) -> value.roomSync.timeline?.events }
.flatten()
.filter { it.type == EventType.REDACTION }
.mapNotNull { it.redacts }

View file

@ -26,7 +26,7 @@ import org.matrix.android.sdk.internal.crypto.MXEventDecryptionResult
import org.matrix.android.sdk.internal.crypto.algorithms.olm.OlmDecryptionResult
import org.matrix.android.sdk.internal.crypto.model.event.OlmEventContent
import org.matrix.android.sdk.internal.crypto.verification.DefaultVerificationService
import org.matrix.android.sdk.internal.session.DefaultInitialSyncProgressService
import org.matrix.android.sdk.internal.session.initsync.ProgressReporter
import org.matrix.android.sdk.internal.session.sync.model.SyncResponse
import org.matrix.android.sdk.internal.session.sync.model.ToDeviceSyncResponse
import timber.log.Timber
@ -35,10 +35,10 @@ import javax.inject.Inject
internal class CryptoSyncHandler @Inject constructor(private val cryptoService: DefaultCryptoService,
private val verificationService: DefaultVerificationService) {
fun handleToDevice(toDevice: ToDeviceSyncResponse, initialSyncProgressService: DefaultInitialSyncProgressService? = null) {
fun handleToDevice(toDevice: ToDeviceSyncResponse, progressReporter: ProgressReporter? = null) {
val total = toDevice.events?.size ?: 0
toDevice.events?.forEachIndexed { index, event ->
initialSyncProgressService?.reportProgress(((index / total.toFloat()) * 100).toInt())
progressReporter?.reportProgress(index * 100F / total)
// Decrypt event if necessary
Timber.i("## CRYPTO | To device event from ${event.senderId} of type:${event.type}")
decryptToDeviceEvent(event, null)
@ -75,7 +75,7 @@ internal class CryptoSyncHandler @Inject constructor(private val cryptoService:
// try to find device id to ease log reading
val deviceId = cryptoService.getCryptoDeviceInfo(event.senderId!!).firstOrNull {
it.identityKey() == senderKey
}?.deviceId ?: senderKey
}?.deviceId ?: senderKey
Timber.e("## CRYPTO | Failed to decrypt to device event from ${event.senderId}|$deviceId reason:<${event.mCryptoError ?: exception}>")
}

View file

@ -16,17 +16,17 @@
package org.matrix.android.sdk.internal.session.sync
import io.realm.Realm
import org.matrix.android.sdk.R
import org.matrix.android.sdk.api.session.room.model.Membership
import org.matrix.android.sdk.internal.database.model.GroupEntity
import org.matrix.android.sdk.internal.database.model.GroupSummaryEntity
import org.matrix.android.sdk.internal.database.query.getOrCreate
import org.matrix.android.sdk.internal.database.query.where
import org.matrix.android.sdk.internal.session.DefaultInitialSyncProgressService
import org.matrix.android.sdk.internal.session.mapWithProgress
import org.matrix.android.sdk.internal.session.initsync.ProgressReporter
import org.matrix.android.sdk.internal.session.initsync.mapWithProgress
import org.matrix.android.sdk.internal.session.sync.model.GroupsSyncResponse
import org.matrix.android.sdk.internal.session.sync.model.InvitedGroupSync
import io.realm.Realm
import javax.inject.Inject
internal class GroupSyncHandler @Inject constructor() {
@ -37,11 +37,9 @@ internal class GroupSyncHandler @Inject constructor() {
data class LEFT(val data: Map<String, Any>) : HandlingStrategy()
}
fun handle(
realm: Realm,
roomsSyncResponse: GroupsSyncResponse,
reporter: DefaultInitialSyncProgressService? = null
) {
fun handle(realm: Realm,
roomsSyncResponse: GroupsSyncResponse,
reporter: ProgressReporter? = null) {
handleGroupSync(realm, HandlingStrategy.JOINED(roomsSyncResponse.join), reporter)
handleGroupSync(realm, HandlingStrategy.INVITED(roomsSyncResponse.invite), reporter)
handleGroupSync(realm, HandlingStrategy.LEFT(roomsSyncResponse.leave), reporter)
@ -49,7 +47,7 @@ internal class GroupSyncHandler @Inject constructor() {
// PRIVATE METHODS *****************************************************************************
private fun handleGroupSync(realm: Realm, handlingStrategy: HandlingStrategy, reporter: DefaultInitialSyncProgressService?) {
private fun handleGroupSync(realm: Realm, handlingStrategy: HandlingStrategy, reporter: ProgressReporter?) {
val groups = when (handlingStrategy) {
is HandlingStrategy.JOINED ->
handlingStrategy.data.mapWithProgress(reporter, R.string.initial_sync_start_importing_account_groups, 0.6f) {

View file

@ -0,0 +1,111 @@
/*
* Copyright 2020 The Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.matrix.android.sdk.internal.session.sync
import com.squareup.moshi.JsonClass
import okio.buffer
import okio.source
import org.matrix.android.sdk.internal.di.MoshiProvider
import timber.log.Timber
import java.io.File
@JsonClass(generateAdapter = true)
internal data class InitialSyncStatus(
val step: Int = STEP_INIT,
val downloadedDate: Long = 0
) {
companion object {
const val STEP_INIT = 0
const val STEP_DOWNLOADING = 1
const val STEP_DOWNLOADED = 2
const val STEP_PARSED = 3
const val STEP_SUCCESS = 4
}
}
internal interface InitialSyncStatusRepository {
fun getStep(): Int
fun setStep(step: Int)
}
/**
* This class handle the current status of an initial sync and persist it on the disk, to be robust against crash
*/
internal class FileInitialSyncStatusRepository(directory: File) : InitialSyncStatusRepository {
companion object {
// After 2 hours, we consider that the downloaded file is outdated:
// - if a problem occurs, it's for big accounts, and big accounts have lots of new events in 2 hours
// - For small accounts, there should be no problem, so 2 hours delay will never be used.
private const val INIT_SYNC_FILE_LIFETIME = 2 * 60 * 60 * 1_000L
}
private val file = File(directory, "status.json")
private val jsonAdapter = MoshiProvider.providesMoshi().adapter(InitialSyncStatus::class.java)
private var cache: InitialSyncStatus? = null
override fun getStep(): Int {
ensureCache()
val state = cache?.step ?: InitialSyncStatus.STEP_INIT
return if (state >= InitialSyncStatus.STEP_DOWNLOADED
&& System.currentTimeMillis() > (cache?.downloadedDate ?: 0) + INIT_SYNC_FILE_LIFETIME) {
Timber.v("INIT_SYNC downloaded file is outdated, download it again")
// The downloaded file is outdated
setStep(InitialSyncStatus.STEP_INIT)
InitialSyncStatus.STEP_INIT
} else {
state
}
}
override fun setStep(step: Int) {
var newStatus = cache?.copy(step = step) ?: InitialSyncStatus(step = step)
if (step == InitialSyncStatus.STEP_DOWNLOADED) {
// Also store the downloaded date
newStatus = newStatus.copy(
downloadedDate = System.currentTimeMillis()
)
}
cache = newStatus
writeFile()
}
private fun ensureCache() {
if (cache == null) readFile()
}
/**
* File -> Cache
*/
private fun readFile() {
cache = file
.takeIf { it.exists() }
?.let { jsonAdapter.fromJson(it.source().buffer()) }
}
/**
* Cache -> File
*/
private fun writeFile() {
file.delete()
cache
?.let { jsonAdapter.toJson(it) }
?.let { file.writeText(it) }
}
}

View file

@ -0,0 +1,53 @@
/*
* Copyright 2020 The Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.matrix.android.sdk.internal.session.sync
var initialSyncStrategy: InitialSyncStrategy = InitialSyncStrategy.Optimized()
sealed class InitialSyncStrategy {
/**
* Parse the result in its entirety
* Pros:
* - Faster to handle parsed data
* Cons:
* - Slower to download and parse data
* - big RAM usage
* - not robust to crash
*/
object Legacy : InitialSyncStrategy()
/**
* Optimized.
* First store the request result in a file, to avoid doing it again in case of crash
*/
data class Optimized(
/**
* Limit to reach to decide to split the init sync response into smaller files
* Empiric value: 1 megabytes
*/
val minSizeToSplit: Long = 1024 * 1024,
/**
* Limit per room to reach to decide to store a join room into a file
* Empiric value: 10 kilobytes
*/
val minSizeToStoreInFile: Long = 10 * 1024,
/**
* Max number of rooms to insert at a time in database (to avoid too much RAM usage)
*/
val maxRoomsToInsert: Int = 100
) : InitialSyncStrategy()
}

View file

@ -49,8 +49,9 @@ import org.matrix.android.sdk.internal.database.query.where
import org.matrix.android.sdk.internal.di.MoshiProvider
import org.matrix.android.sdk.internal.di.UserId
import org.matrix.android.sdk.internal.extensions.clearWith
import org.matrix.android.sdk.internal.session.DefaultInitialSyncProgressService
import org.matrix.android.sdk.internal.session.mapWithProgress
import org.matrix.android.sdk.internal.session.initsync.ProgressReporter
import org.matrix.android.sdk.internal.session.initsync.mapWithProgress
import org.matrix.android.sdk.internal.session.initsync.reportSubtask
import org.matrix.android.sdk.internal.session.room.membership.RoomChangeMembershipStateDataSource
import org.matrix.android.sdk.internal.session.room.membership.RoomMemberEventHandler
import org.matrix.android.sdk.internal.session.room.read.FullyReadContent
@ -59,12 +60,14 @@ import org.matrix.android.sdk.internal.session.room.timeline.PaginationDirection
import org.matrix.android.sdk.internal.session.room.timeline.TimelineInput
import org.matrix.android.sdk.internal.session.room.typing.TypingEventContent
import org.matrix.android.sdk.internal.session.sync.model.InvitedRoomSync
import org.matrix.android.sdk.internal.session.sync.model.LazyRoomSync
import org.matrix.android.sdk.internal.session.sync.model.RoomSync
import org.matrix.android.sdk.internal.session.sync.model.RoomSyncAccountData
import org.matrix.android.sdk.internal.session.sync.model.RoomSyncEphemeral
import org.matrix.android.sdk.internal.session.sync.model.RoomsSyncResponse
import timber.log.Timber
import javax.inject.Inject
import kotlin.math.ceil
internal class RoomSyncHandler @Inject constructor(private val readReceiptHandler: ReadReceiptHandler,
private val roomSummaryUpdater: RoomSummaryUpdater,
@ -78,17 +81,15 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
private val timelineInput: TimelineInput) {
sealed class HandlingStrategy {
data class JOINED(val data: Map<String, RoomSync>) : HandlingStrategy()
data class JOINED(val data: Map<String, LazyRoomSync>) : HandlingStrategy()
data class INVITED(val data: Map<String, InvitedRoomSync>) : HandlingStrategy()
data class LEFT(val data: Map<String, RoomSync>) : HandlingStrategy()
}
fun handle(
realm: Realm,
roomsSyncResponse: RoomsSyncResponse,
isInitialSync: Boolean,
reporter: DefaultInitialSyncProgressService? = null
) {
fun handle(realm: Realm,
roomsSyncResponse: RoomsSyncResponse,
isInitialSync: Boolean,
reporter: ProgressReporter? = null) {
Timber.v("Execute transaction from $this")
handleRoomSync(realm, HandlingStrategy.JOINED(roomsSyncResponse.join), isInitialSync, reporter)
handleRoomSync(realm, HandlingStrategy.INVITED(roomsSyncResponse.invite), isInitialSync, reporter)
@ -97,7 +98,7 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
// PRIVATE METHODS *****************************************************************************
private fun handleRoomSync(realm: Realm, handlingStrategy: HandlingStrategy, isInitialSync: Boolean, reporter: DefaultInitialSyncProgressService?) {
private fun handleRoomSync(realm: Realm, handlingStrategy: HandlingStrategy, isInitialSync: Boolean, reporter: ProgressReporter?) {
val insertType = if (isInitialSync) {
EventInsertType.INITIAL_SYNC
} else {
@ -105,10 +106,17 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
}
val syncLocalTimeStampMillis = System.currentTimeMillis()
val rooms = when (handlingStrategy) {
is HandlingStrategy.JOINED ->
handlingStrategy.data.mapWithProgress(reporter, R.string.initial_sync_start_importing_account_joined_rooms, 0.6f) {
handleJoinedRoom(realm, it.key, it.value, isInitialSync, insertType, syncLocalTimeStampMillis)
is HandlingStrategy.JOINED -> {
if (isInitialSync && initialSyncStrategy is InitialSyncStrategy.Optimized) {
insertJoinRooms(realm, handlingStrategy, insertType, syncLocalTimeStampMillis, reporter)
// Rooms are already inserted, return an empty list
emptyList()
} else {
handlingStrategy.data.mapWithProgress(reporter, R.string.initial_sync_start_importing_account_joined_rooms, 0.6f) {
handleJoinedRoom(realm, it.key, it.value.roomSync, insertType, syncLocalTimeStampMillis)
}
}
}
is HandlingStrategy.INVITED ->
handlingStrategy.data.mapWithProgress(reporter, R.string.initial_sync_start_importing_account_invited_rooms, 0.1f) {
handleInvitedRoom(realm, it.key, it.value, insertType, syncLocalTimeStampMillis)
@ -123,17 +131,57 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
realm.insertOrUpdate(rooms)
}
private fun insertJoinRooms(realm: Realm,
handlingStrategy: HandlingStrategy.JOINED,
insertType: EventInsertType,
syncLocalTimeStampMillis: Long,
reporter: ProgressReporter?) {
val maxSize = (initialSyncStrategy as? InitialSyncStrategy.Optimized)?.maxRoomsToInsert ?: Int.MAX_VALUE
val listSize = handlingStrategy.data.keys.size
val numberOfChunks = ceil(listSize / maxSize.toDouble()).toInt()
if (numberOfChunks > 1) {
reportSubtask(reporter, R.string.initial_sync_start_importing_account_joined_rooms, numberOfChunks, 0.6f) {
val chunkSize = listSize / numberOfChunks
Timber.v("INIT_SYNC $listSize rooms to insert, split into $numberOfChunks sublists of $chunkSize items")
// I cannot find a better way to chunk a map, so chunk the keys and then create new maps
handlingStrategy.data.keys
.chunked(chunkSize)
.forEachIndexed { index, roomIds ->
val roomEntities = roomIds
.also { Timber.v("INIT_SYNC insert ${roomIds.size} rooms") }
.map {
handleJoinedRoom(
realm,
it,
(handlingStrategy.data[it] ?: error("Should not happen")).roomSync,
insertType,
syncLocalTimeStampMillis
)
}
realm.insertOrUpdate(roomEntities)
reporter?.reportProgress(index + 1F)
}
}
} else {
// No need to split
val rooms = handlingStrategy.data.mapWithProgress(reporter, R.string.initial_sync_start_importing_account_joined_rooms, 0.6f) {
handleJoinedRoom(realm, it.key, it.value.roomSync, insertType, syncLocalTimeStampMillis)
}
realm.insertOrUpdate(rooms)
}
}
private fun handleJoinedRoom(realm: Realm,
roomId: String,
roomSync: RoomSync,
isInitialSync: Boolean,
insertType: EventInsertType,
syncLocalTimestampMillis: Long): RoomEntity {
Timber.v("Handle join sync for room $roomId")
var ephemeralResult: EphemeralResult? = null
if (roomSync.ephemeral?.events?.isNotEmpty() == true) {
ephemeralResult = handleEphemeral(realm, roomId, roomSync.ephemeral, isInitialSync)
ephemeralResult = handleEphemeral(realm, roomId, roomSync.ephemeral, insertType == EventInsertType.INITIAL_SYNC)
}
if (roomSync.accountData?.events?.isNotEmpty() == true) {
@ -173,8 +221,7 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
roomSync.timeline.prevToken,
roomSync.timeline.limited,
insertType,
syncLocalTimestampMillis,
isInitialSync
syncLocalTimestampMillis
)
roomEntity.addIfNecessary(chunkEntity)
}
@ -278,8 +325,7 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
prevToken: String? = null,
isLimited: Boolean = true,
insertType: EventInsertType,
syncLocalTimestampMillis: Long,
isInitialSync: Boolean): ChunkEntity {
syncLocalTimestampMillis: Long): ChunkEntity {
val lastChunk = ChunkEntity.findLastForwardChunkOfRoom(realm, roomEntity.roomId)
val chunkEntity = if (!isLimited && lastChunk != null) {
lastChunk
@ -299,7 +345,7 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
}
eventIds.add(event.eventId)
if (event.isEncrypted() && !isInitialSync) {
if (event.isEncrypted() && insertType != EventInsertType.INITIAL_SYNC) {
decryptIfNeeded(event, roomId)
}

View file

@ -16,6 +16,7 @@
package org.matrix.android.sdk.internal.session.sync
import okhttp3.ResponseBody
import org.matrix.android.sdk.internal.network.NetworkConstants
import org.matrix.android.sdk.internal.network.TimeOutInterceptor
import org.matrix.android.sdk.internal.session.sync.model.SyncResponse
@ -23,6 +24,7 @@ import retrofit2.Call
import retrofit2.http.GET
import retrofit2.http.Header
import retrofit2.http.QueryMap
import retrofit2.http.Streaming
internal interface SyncAPI {
/**
@ -34,4 +36,15 @@ internal interface SyncAPI {
@Header(TimeOutInterceptor.READ_TIMEOUT) readTimeOut: Long = TimeOutInterceptor.DEFAULT_LONG_TIMEOUT,
@Header(TimeOutInterceptor.WRITE_TIMEOUT) writeTimeOut: Long = TimeOutInterceptor.DEFAULT_LONG_TIMEOUT
): Call<SyncResponse>
/**
* Set all the timeouts to 1 minute by default
*/
@Streaming
@GET(NetworkConstants.URI_API_PREFIX_PATH_R0 + "sync")
fun syncStream(@QueryMap params: Map<String, String>,
@Header(TimeOutInterceptor.CONNECT_TIMEOUT) connectTimeOut: Long = TimeOutInterceptor.DEFAULT_LONG_TIMEOUT,
@Header(TimeOutInterceptor.READ_TIMEOUT) readTimeOut: Long = TimeOutInterceptor.DEFAULT_LONG_TIMEOUT,
@Header(TimeOutInterceptor.WRITE_TIMEOUT) writeTimeOut: Long = TimeOutInterceptor.DEFAULT_LONG_TIMEOUT
): Call<ResponseBody>
}

View file

@ -25,10 +25,10 @@ import org.matrix.android.sdk.internal.crypto.DefaultCryptoService
import org.matrix.android.sdk.internal.di.SessionDatabase
import org.matrix.android.sdk.internal.di.SessionId
import org.matrix.android.sdk.internal.di.WorkManagerProvider
import org.matrix.android.sdk.internal.session.DefaultInitialSyncProgressService
import org.matrix.android.sdk.internal.session.initsync.ProgressReporter
import org.matrix.android.sdk.internal.session.group.GetGroupDataWorker
import org.matrix.android.sdk.internal.session.notification.ProcessEventForPushTask
import org.matrix.android.sdk.internal.session.reportSubtask
import org.matrix.android.sdk.internal.session.initsync.reportSubtask
import org.matrix.android.sdk.internal.session.sync.model.GroupsSyncResponse
import org.matrix.android.sdk.internal.session.sync.model.RoomsSyncResponse
import org.matrix.android.sdk.internal.session.sync.model.SyncResponse
@ -51,13 +51,13 @@ internal class SyncResponseHandler @Inject constructor(@SessionDatabase private
private val cryptoService: DefaultCryptoService,
private val tokenStore: SyncTokenStore,
private val processEventForPushTask: ProcessEventForPushTask,
private val pushRuleService: PushRuleService,
private val initialSyncProgressService: DefaultInitialSyncProgressService) {
private val pushRuleService: PushRuleService) {
suspend fun handleResponse(syncResponse: SyncResponse, fromToken: String?) {
suspend fun handleResponse(syncResponse: SyncResponse,
fromToken: String?,
reporter: ProgressReporter?) {
val isInitialSync = fromToken == null
Timber.v("Start handling sync, is InitialSync: $isInitialSync")
val reporter = initialSyncProgressService.takeIf { isInitialSync }
measureTimeMillis {
if (!cryptoService.isStarted()) {
@ -85,7 +85,7 @@ internal class SyncResponseHandler @Inject constructor(@SessionDatabase private
monarchy.awaitTransaction { realm ->
measureTimeMillis {
Timber.v("Handle rooms")
reportSubtask(reporter, R.string.initial_sync_start_importing_account_rooms, 100, 0.7f) {
reportSubtask(reporter, R.string.initial_sync_start_importing_account_rooms, 1, 0.7f) {
if (syncResponse.rooms != null) {
roomSyncHandler.handle(realm, syncResponse.rooms, isInitialSync, reporter)
}
@ -95,7 +95,7 @@ internal class SyncResponseHandler @Inject constructor(@SessionDatabase private
}
measureTimeMillis {
reportSubtask(reporter, R.string.initial_sync_start_importing_account_groups, 100, 0.1f) {
reportSubtask(reporter, R.string.initial_sync_start_importing_account_groups, 1, 0.1f) {
Timber.v("Handle groups")
if (syncResponse.groups != null) {
groupSyncHandler.handle(realm, syncResponse.groups, reporter)
@ -106,7 +106,7 @@ internal class SyncResponseHandler @Inject constructor(@SessionDatabase private
}
measureTimeMillis {
reportSubtask(reporter, R.string.initial_sync_start_importing_account_data, 100, 0.1f) {
reportSubtask(reporter, R.string.initial_sync_start_importing_account_data, 1, 0.1f) {
Timber.v("Handle accountData")
userAccountDataSyncHandler.handle(realm, syncResponse.accountData)
}

View file

@ -16,18 +16,29 @@
package org.matrix.android.sdk.internal.session.sync
import okhttp3.ResponseBody
import org.matrix.android.sdk.R
import org.matrix.android.sdk.internal.di.SessionFilesDirectory
import org.matrix.android.sdk.internal.di.UserId
import org.matrix.android.sdk.internal.network.GlobalErrorReceiver
import org.matrix.android.sdk.internal.network.TimeOutInterceptor
import org.matrix.android.sdk.internal.network.executeRequest
import org.matrix.android.sdk.internal.session.DefaultInitialSyncProgressService
import org.matrix.android.sdk.internal.network.toFailure
import org.matrix.android.sdk.internal.session.filter.FilterRepository
import org.matrix.android.sdk.internal.session.homeserver.GetHomeServerCapabilitiesTask
import org.matrix.android.sdk.internal.session.initsync.DefaultInitialSyncProgressService
import org.matrix.android.sdk.internal.session.initsync.reportSubtask
import org.matrix.android.sdk.internal.session.sync.model.LazyRoomSync
import org.matrix.android.sdk.internal.session.sync.model.SyncResponse
import org.matrix.android.sdk.internal.session.sync.parsing.InitialSyncResponseParser
import org.matrix.android.sdk.internal.session.user.UserStore
import org.matrix.android.sdk.internal.task.Task
import org.matrix.android.sdk.internal.util.logDuration
import retrofit2.Response
import retrofit2.awaitResponse
import timber.log.Timber
import java.io.File
import java.net.SocketTimeoutException
import javax.inject.Inject
internal interface SyncTask : Task<SyncTask.Params, Unit> {
@ -48,9 +59,15 @@ internal class DefaultSyncTask @Inject constructor(
private val getHomeServerCapabilitiesTask: GetHomeServerCapabilitiesTask,
private val userStore: UserStore,
private val syncTaskSequencer: SyncTaskSequencer,
private val globalErrorReceiver: GlobalErrorReceiver
private val globalErrorReceiver: GlobalErrorReceiver,
@SessionFilesDirectory
private val fileDirectory: File,
private val syncResponseParser: InitialSyncResponseParser
) : SyncTask {
private val workingDir = File(fileDirectory, "is")
private val initialSyncStatusRepository: InitialSyncStatusRepository = FileInitialSyncStatusRepository(workingDir)
override suspend fun execute(params: SyncTask.Params) = syncTaskSequencer.post {
doSync(params)
}
@ -73,28 +90,128 @@ internal class DefaultSyncTask @Inject constructor(
if (isInitialSync) {
// We might want to get the user information in parallel too
userStore.createOrUpdate(userId)
initialSyncProgressService.endAll()
initialSyncProgressService.startTask(R.string.initial_sync_start_importing_account, 100)
initialSyncProgressService.startRoot(R.string.initial_sync_start_importing_account, 100)
}
// Maybe refresh the home server capabilities data we know
getHomeServerCapabilitiesTask.execute(GetHomeServerCapabilitiesTask.Params(forceRefresh = false))
val readTimeOut = (params.timeout + TIMEOUT_MARGIN).coerceAtLeast(TimeOutInterceptor.DEFAULT_LONG_TIMEOUT)
val syncResponse = executeRequest<SyncResponse>(globalErrorReceiver) {
apiCall = syncAPI.sync(
params = requestParams,
readTimeOut = readTimeOut
)
}
syncResponseHandler.handleResponse(syncResponse, token)
if (isInitialSync) {
val initSyncStrategy = initialSyncStrategy
logDuration("INIT_SYNC strategy: $initSyncStrategy") {
if (initSyncStrategy is InitialSyncStrategy.Optimized) {
safeInitialSync(requestParams, initSyncStrategy)
} else {
val syncResponse = logDuration("INIT_SYNC Request") {
executeRequest<SyncResponse>(globalErrorReceiver) {
apiCall = syncAPI.sync(
params = requestParams,
readTimeOut = readTimeOut
)
}
}
logDuration("INIT_SYNC Database insertion") {
syncResponseHandler.handleResponse(syncResponse, token, initialSyncProgressService)
}
}
}
initialSyncProgressService.endAll()
} else {
val syncResponse = executeRequest<SyncResponse>(globalErrorReceiver) {
apiCall = syncAPI.sync(
params = requestParams,
readTimeOut = readTimeOut
)
}
syncResponseHandler.handleResponse(syncResponse, token, null)
}
Timber.v("Sync task finished on Thread: ${Thread.currentThread().name}")
}
private suspend fun safeInitialSync(requestParams: Map<String, String>, initSyncStrategy: InitialSyncStrategy.Optimized) {
workingDir.mkdirs()
val workingFile = File(workingDir, "initSync.json")
val status = initialSyncStatusRepository.getStep()
if (workingFile.exists() && status >= InitialSyncStatus.STEP_DOWNLOADED) {
// Go directly to the parse step
Timber.v("INIT_SYNC file is already here")
reportSubtask(initialSyncProgressService, R.string.initial_sync_start_downloading, 1, 0.3f) {
// Empty task
}
} else {
initialSyncStatusRepository.setStep(InitialSyncStatus.STEP_DOWNLOADING)
val syncResponse = logDuration("INIT_SYNC Perform server request") {
reportSubtask(initialSyncProgressService, R.string.initial_sync_start_server_computing, 1, 0.2f) {
getSyncResponse(requestParams, MAX_NUMBER_OF_RETRY_AFTER_TIMEOUT)
}
}
if (syncResponse.isSuccessful) {
logDuration("INIT_SYNC Download and save to file") {
reportSubtask(initialSyncProgressService, R.string.initial_sync_start_downloading, 1, 0.1f) {
syncResponse.body()?.byteStream()?.use { inputStream ->
workingFile.outputStream().use { outputStream ->
inputStream.copyTo(outputStream)
}
}
}
}
} else {
throw syncResponse.toFailure(globalErrorReceiver)
.also { Timber.w("INIT_SYNC request failure: $this") }
}
initialSyncStatusRepository.setStep(InitialSyncStatus.STEP_DOWNLOADED)
}
reportSubtask(initialSyncProgressService, R.string.initial_sync_start_importing_account, 1, 0.7F) {
handleSyncFile(workingFile, initSyncStrategy)
}
// Delete all files
workingDir.deleteRecursively()
}
private suspend fun getSyncResponse(requestParams: Map<String, String>, maxNumberOfRetries: Int): Response<ResponseBody> {
var retry = maxNumberOfRetries
while (true) {
retry--
try {
return syncAPI.syncStream(
params = requestParams
).awaitResponse()
} catch (throwable: Throwable) {
if (throwable is SocketTimeoutException && retry > 0) {
Timber.w("INIT_SYNC timeout retry left: $retry")
} else {
Timber.e(throwable, "INIT_SYNC timeout, no retry left, or other error")
throw throwable
}
}
}
}
private suspend fun handleSyncFile(workingFile: File, initSyncStrategy: InitialSyncStrategy.Optimized) {
logDuration("INIT_SYNC handleSyncFile()") {
val syncResponse = logDuration("INIT_SYNC Read file and parse") {
syncResponseParser.parse(initSyncStrategy, workingFile)
}
initialSyncStatusRepository.setStep(InitialSyncStatus.STEP_PARSED)
// Log some stats
val nbOfJoinedRooms = syncResponse.rooms?.join?.size ?: 0
val nbOfJoinedRoomsInFile = syncResponse.rooms?.join?.values?.count { it is LazyRoomSync.Stored }
Timber.v("INIT_SYNC $nbOfJoinedRooms rooms, $nbOfJoinedRoomsInFile stored into files")
logDuration("INIT_SYNC Database insertion") {
syncResponseHandler.handleResponse(syncResponse, null, initialSyncProgressService)
}
initialSyncStatusRepository.setStep(InitialSyncStatus.STEP_SUCCESS)
}
}
companion object {
private const val MAX_NUMBER_OF_RETRY_AFTER_TIMEOUT = 50
private const val TIMEOUT_MARGIN: Long = 10_000
}
}

View file

@ -0,0 +1,43 @@
/*
* Copyright (c) 2021 The Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.matrix.android.sdk.internal.session.sync.model
import com.squareup.moshi.JsonAdapter
import com.squareup.moshi.JsonClass
import com.squareup.moshi.JsonReader
import okio.buffer
import okio.source
import java.io.File
@JsonClass(generateAdapter = false)
internal sealed class LazyRoomSync {
data class Parsed(val _roomSync: RoomSync) : LazyRoomSync()
data class Stored(val roomSyncAdapter: JsonAdapter<RoomSync>, val file: File) : LazyRoomSync()
val roomSync: RoomSync
get() {
return when (this) {
is Parsed -> _roomSync
is Stored -> {
// Parse the file now
file.inputStream().use { pos ->
roomSyncAdapter.fromJson(JsonReader.of(pos.source().buffer()))!!
}
}
}
}
}

View file

@ -24,7 +24,7 @@ internal data class RoomsSyncResponse(
/**
* Joined rooms: keys are rooms ids.
*/
@Json(name = "join") val join: Map<String, RoomSync> = emptyMap(),
@Json(name = "join") val join: Map<String, LazyRoomSync> = emptyMap(),
/**
* Invitations. The rooms that the user has been invited to: keys are rooms ids.

View file

@ -0,0 +1,48 @@
/*
* Copyright (c) 2021 The Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.matrix.android.sdk.internal.session.sync.parsing
import com.squareup.moshi.Moshi
import okio.buffer
import okio.source
import org.matrix.android.sdk.internal.session.sync.InitialSyncStrategy
import org.matrix.android.sdk.internal.session.sync.model.SyncResponse
import timber.log.Timber
import java.io.File
import javax.inject.Inject
internal class InitialSyncResponseParser @Inject constructor(private val moshi: Moshi) {
fun parse(syncStrategy: InitialSyncStrategy.Optimized, workingFile: File): SyncResponse {
val syncResponseLength = workingFile.length().toInt()
Timber.v("INIT_SYNC Sync file size is $syncResponseLength bytes")
val shouldSplit = syncResponseLength >= syncStrategy.minSizeToSplit
Timber.v("INIT_SYNC should split in several files: $shouldSplit")
return getMoshi(syncStrategy, workingFile.parentFile!!, shouldSplit)
.adapter(SyncResponse::class.java)
.fromJson(workingFile.source().buffer())!!
}
private fun getMoshi(syncStrategy: InitialSyncStrategy.Optimized, workingDirectory: File, shouldSplit: Boolean): Moshi {
// If we don't have to split we'll rely on the already default moshi
if (!shouldSplit) return moshi
// Otherwise, we create a new adapter for handling Map of Lazy sync
return moshi.newBuilder()
.add(SplitLazyRoomSyncJsonAdapter(workingDirectory, syncStrategy))
.build()
}
}

View file

@ -0,0 +1,84 @@
/*
* Copyright (c) 2021 The Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.matrix.android.sdk.internal.session.sync.parsing
import com.squareup.moshi.FromJson
import com.squareup.moshi.JsonAdapter
import com.squareup.moshi.JsonReader
import com.squareup.moshi.JsonWriter
import com.squareup.moshi.ToJson
import org.matrix.android.sdk.internal.session.sync.InitialSyncStrategy
import org.matrix.android.sdk.internal.session.sync.model.LazyRoomSync
import org.matrix.android.sdk.internal.session.sync.model.RoomSync
import timber.log.Timber
import java.io.File
import java.util.concurrent.atomic.AtomicInteger
internal class DefaultLazyRoomSyncJsonAdapter {
@FromJson
fun fromJson(reader: JsonReader, delegate: JsonAdapter<RoomSync>): LazyRoomSync? {
val roomSync = delegate.fromJson(reader) ?: return null
return LazyRoomSync.Parsed(roomSync)
}
@ToJson
fun toJson(writer: JsonWriter, value: LazyRoomSync?) {
// This Adapter is not supposed to serialize object
Timber.v("To json $value with $writer")
throw UnsupportedOperationException()
}
}
internal class SplitLazyRoomSyncJsonAdapter(
private val workingDirectory: File,
private val syncStrategy: InitialSyncStrategy.Optimized
) {
private val atomicInteger = AtomicInteger(0)
private fun createFile(): File {
val index = atomicInteger.getAndIncrement()
return File(workingDirectory, "room_$index.json")
}
@FromJson
fun fromJson(reader: JsonReader, delegate: JsonAdapter<RoomSync>): LazyRoomSync? {
val path = reader.path
val json = reader.nextSource().inputStream().bufferedReader().use {
it.readText()
}
val limit = syncStrategy.minSizeToStoreInFile
return if (json.length > limit) {
Timber.v("INIT_SYNC $path content length: ${json.length} copy to a file")
// Copy the source to a file
val file = createFile()
file.writeText(json)
LazyRoomSync.Stored(delegate, file)
} else {
Timber.v("INIT_SYNC $path content length: ${json.length} parse it now")
val roomSync = delegate.fromJson(json) ?: return null
LazyRoomSync.Parsed(roomSync)
}
}
@ToJson
fun toJson(writer: JsonWriter, value: LazyRoomSync?) {
// This Adapter is not supposed to serialize object
Timber.v("To json $value with $writer")
throw UnsupportedOperationException()
}
}

View file

@ -0,0 +1,51 @@
/*
* Copyright 2020 The Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.matrix.android.sdk.internal.util
import org.matrix.android.sdk.BuildConfig
import timber.log.Timber
internal suspend fun <T> logDuration(message: String,
block: suspend () -> T): T {
Timber.v("$message -- BEGIN")
val start = System.currentTimeMillis()
val result = logRamUsage(message) {
block()
}
val duration = System.currentTimeMillis() - start
Timber.v("$message -- END duration: $duration ms")
return result
}
internal suspend fun <T> logRamUsage(message: String, block: suspend () -> T): T {
return if (BuildConfig.DEBUG) {
val runtime = Runtime.getRuntime()
runtime.gc()
val freeMemoryInMb = runtime.freeMemory() / 1048576L
val usedMemInMBStart = runtime.totalMemory() / 1048576L - freeMemoryInMb
Timber.v("$message -- BEGIN (free memory: $freeMemoryInMb MB)")
val result = block()
runtime.gc()
val usedMemInMBEnd = (runtime.totalMemory() - runtime.freeMemory()) / 1048576L
val usedMemInMBDiff = usedMemInMBEnd - usedMemInMBStart
Timber.v("$message -- END RAM usage: $usedMemInMBDiff MB")
result
} else {
block()
}
}

View file

@ -199,6 +199,8 @@
<string name="room_displayname_empty_room">Empty room</string>
<string name="room_displayname_empty_room_was">Empty room (was %s)</string>
<string name="initial_sync_start_server_computing">Initial Sync:\nWaiting for server response…</string>
<string name="initial_sync_start_downloading">Initial Sync:\nDownloading data…</string>
<string name="initial_sync_start_importing_account">Initial Sync:\nImporting account…</string>
<string name="initial_sync_start_importing_account_crypto">Initial Sync:\nImporting crypto</string>
<string name="initial_sync_start_importing_account_rooms">Initial Sync:\nImporting Rooms</string>

View file

@ -161,7 +161,7 @@ Formatter\.formatShortFileSize===1
# android\.text\.TextUtils
### This is not a rule, but a warning: the number of "enum class" has changed. For Json classes, it is mandatory that they have `@JsonClass(generateAdapter = false)`. If the enum is not used as a Json class, change the value in file forbidden_strings_in_code.txt
enum class===88
enum class===89
### Do not import temporary legacy classes
import org.matrix.android.sdk.internal.legacy.riot===3

View file

@ -21,6 +21,7 @@ import android.content.Intent
import android.net.Uri
import android.os.Bundle
import android.os.Parcelable
import android.view.Menu
import android.view.MenuItem
import androidx.appcompat.app.AlertDialog
import androidx.appcompat.widget.Toolbar
@ -40,6 +41,8 @@ import im.vector.app.core.platform.ToolbarConfigurable
import im.vector.app.core.platform.VectorBaseActivity
import im.vector.app.core.pushers.PushersManager
import im.vector.app.databinding.ActivityHomeBinding
import im.vector.app.features.MainActivity
import im.vector.app.features.MainActivityArgs
import im.vector.app.features.disclaimer.showDisclaimerDialog
import im.vector.app.features.matrixto.MatrixToBottomSheet
import im.vector.app.features.notifications.NotificationDrawerManager
@ -57,9 +60,11 @@ import im.vector.app.features.workers.signout.ServerBackupStatusViewState
import im.vector.app.push.fcm.FcmHelper
import io.reactivex.android.schedulers.AndroidSchedulers
import kotlinx.parcelize.Parcelize
import org.matrix.android.sdk.api.session.InitialSyncProgressService
import org.matrix.android.sdk.api.session.initsync.InitialSyncProgressService
import org.matrix.android.sdk.api.session.permalinks.PermalinkService
import org.matrix.android.sdk.api.util.MatrixItem
import org.matrix.android.sdk.internal.session.sync.InitialSyncStrategy
import org.matrix.android.sdk.internal.session.sync.initialSyncStrategy
import timber.log.Timber
import javax.inject.Inject
@ -358,6 +363,12 @@ class HomeActivity :
override fun getMenuRes() = R.menu.home
override fun onPrepareOptionsMenu(menu: Menu): Boolean {
menu.findItem(R.id.menu_home_init_sync_legacy)?.isVisible = vectorPreferences.developerMode()
menu.findItem(R.id.menu_home_init_sync_optimized)?.isVisible = vectorPreferences.developerMode()
return super.onPrepareOptionsMenu(menu)
}
override fun onOptionsItemSelected(item: MenuItem): Boolean {
when (item.itemId) {
R.id.menu_home_suggestion -> {
@ -368,6 +379,20 @@ class HomeActivity :
bugReporter.openBugReportScreen(this, false)
return true
}
R.id.menu_home_init_sync_legacy -> {
// Configure the SDK
initialSyncStrategy = InitialSyncStrategy.Legacy
// And clear cache
MainActivity.restartApp(this, MainActivityArgs(clearCache = true))
return true
}
R.id.menu_home_init_sync_optimized -> {
// Configure the SDK
initialSyncStrategy = InitialSyncStrategy.Optimized()
// And clear cache
MainActivity.restartApp(this, MainActivityArgs(clearCache = true))
return true
}
R.id.menu_home_filter -> {
navigator.openRoomsFiltering(this)
return true

View file

@ -40,7 +40,7 @@ import org.matrix.android.sdk.api.auth.registration.RegistrationFlowResponse
import org.matrix.android.sdk.api.auth.registration.nextUncompletedStage
import org.matrix.android.sdk.api.extensions.tryOrNull
import org.matrix.android.sdk.api.pushrules.RuleIds
import org.matrix.android.sdk.api.session.InitialSyncProgressService
import org.matrix.android.sdk.api.session.initsync.InitialSyncProgressService
import org.matrix.android.sdk.api.session.room.model.Membership
import org.matrix.android.sdk.api.session.room.roomSummaryQueryParams
import org.matrix.android.sdk.api.util.toMatrixItem
@ -130,7 +130,7 @@ class HomeActivityViewModel @AssistedInject constructor(
// Schedule a check of the bootstrap when the init sync will be finished
checkBootstrap = true
}
is InitialSyncProgressService.Status.Idle -> {
is InitialSyncProgressService.Status.Idle -> {
if (checkBootstrap) {
checkBootstrap = false
maybeBootstrapCrossSigningAfterInitialSync()

View file

@ -17,7 +17,7 @@
package im.vector.app.features.home
import com.airbnb.mvrx.MvRxState
import org.matrix.android.sdk.api.session.InitialSyncProgressService
import org.matrix.android.sdk.api.session.initsync.InitialSyncProgressService
data class HomeActivityViewState(
val initialSyncProgressServiceStatus: InitialSyncProgressService.Status = InitialSyncProgressService.Status.Idle

View file

@ -1,6 +1,7 @@
<?xml version="1.0" encoding="utf-8"?>
<menu xmlns:android="http://schemas.android.com/apk/res/android"
xmlns:app="http://schemas.android.com/apk/res-auto">
xmlns:app="http://schemas.android.com/apk/res-auto"
xmlns:tools="http://schemas.android.com/tools">
<item
android:id="@+id/menu_home_setting"
@ -18,6 +19,16 @@
android:icon="@drawable/ic_material_bug_report"
android:title="@string/send_bug_report" />
<item
android:id="@+id/menu_home_init_sync_legacy"
android:title="Do a legacy init sync"
tools:ignore="HardcodedText" />
<item
android:id="@+id/menu_home_init_sync_optimized"
android:title="Do an optimized init sync"
tools:ignore="HardcodedText" />
<item
android:id="@+id/menu_home_filter"
android:icon="@drawable/ic_search"