mirror of
https://github.com/element-hq/element-android
synced 2024-11-27 11:59:12 +03:00
Read init sync to a file and split into smaller files to handle it
This commit is contained in:
parent
9d2407ccb0
commit
c938795576
16 changed files with 583 additions and 25 deletions
|
@ -90,6 +90,7 @@ Improvements 🙌:
|
|||
- SSO support for cross signing (#1062)
|
||||
- Deactivate account when logged in with SSO (#1264)
|
||||
- SSO UIA doesn't work (#2754)
|
||||
- Improve initial sync performance (#983)
|
||||
|
||||
Bugfix 🐛:
|
||||
- Fix clear cache issue: sometimes, after a clear cache, there is still a token, so the init sync service is not started.
|
||||
|
|
|
@ -14,9 +14,9 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.matrix.android.sdk.internal.database.model;
|
||||
package org.matrix.android.sdk.internal.database.model
|
||||
|
||||
public enum EventInsertType {
|
||||
internal enum class EventInsertType {
|
||||
INITIAL_SYNC,
|
||||
INCREMENTAL_SYNC,
|
||||
PAGINATION,
|
|
@ -36,6 +36,7 @@ import org.matrix.android.sdk.internal.network.parsing.ForceToBooleanJsonAdapter
|
|||
import org.matrix.android.sdk.internal.network.parsing.RuntimeJsonAdapterFactory
|
||||
import org.matrix.android.sdk.internal.network.parsing.TlsVersionMoshiAdapter
|
||||
import org.matrix.android.sdk.internal.network.parsing.UriMoshiAdapter
|
||||
import org.matrix.android.sdk.internal.session.sync.model.LazyRoomSyncJsonAdapter
|
||||
|
||||
object MoshiProvider {
|
||||
|
||||
|
@ -44,6 +45,7 @@ object MoshiProvider {
|
|||
.add(ForceToBooleanJsonAdapter())
|
||||
.add(CipherSuiteMoshiAdapter())
|
||||
.add(TlsVersionMoshiAdapter())
|
||||
.add(LazyRoomSyncJsonAdapter())
|
||||
.add(RuntimeJsonAdapterFactory.of(MessageContent::class.java, "msgtype", MessageDefaultContent::class.java)
|
||||
.registerSubtype(MessageTextContent::class.java, MessageType.MSGTYPE_TEXT)
|
||||
.registerSubtype(MessageNoticeContent::class.java, MessageType.MSGTYPE_NOTICE)
|
||||
|
|
|
@ -50,7 +50,7 @@ internal class DefaultProcessEventForPushTask @Inject constructor(
|
|||
}
|
||||
val newJoinEvents = params.syncResponse.join
|
||||
.mapNotNull { (key, value) ->
|
||||
value.timeline?.events?.map { it.copy(roomId = key) }
|
||||
value.roomSync.timeline?.events?.map { it.copy(roomId = key) }
|
||||
}
|
||||
.flatten()
|
||||
val inviteEvents = params.syncResponse.invite
|
||||
|
@ -80,7 +80,7 @@ internal class DefaultProcessEventForPushTask @Inject constructor(
|
|||
|
||||
val allRedactedEvents = params.syncResponse.join
|
||||
.asSequence()
|
||||
.mapNotNull { (_, value) -> value.timeline?.events }
|
||||
.mapNotNull { (_, value) -> value.roomSync.timeline?.events }
|
||||
.flatten()
|
||||
.filter { it.type == EventType.REDACTION }
|
||||
.mapNotNull { it.redacts }
|
||||
|
|
|
@ -0,0 +1,111 @@
|
|||
/*
|
||||
* Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.matrix.android.sdk.internal.session.sync
|
||||
|
||||
import com.squareup.moshi.JsonClass
|
||||
import okio.buffer
|
||||
import okio.source
|
||||
import org.matrix.android.sdk.internal.di.MoshiProvider
|
||||
import timber.log.Timber
|
||||
import java.io.File
|
||||
|
||||
@JsonClass(generateAdapter = true)
|
||||
internal data class InitialSyncStatus(
|
||||
val step: Int = STEP_INIT,
|
||||
val downloadedDate: Long = 0
|
||||
) {
|
||||
companion object {
|
||||
const val STEP_INIT = 0
|
||||
const val STEP_DOWNLOADING = 1
|
||||
const val STEP_DOWNLOADED = 2
|
||||
const val STEP_PARSED = 3
|
||||
const val STEP_SUCCESS = 4
|
||||
}
|
||||
}
|
||||
|
||||
internal interface InitialSyncStatusRepository {
|
||||
fun getStep(): Int
|
||||
|
||||
fun setStep(step: Int)
|
||||
}
|
||||
|
||||
/**
|
||||
* This class handle the current status of an initial sync and persist it on the disk, to be robust against crash
|
||||
*/
|
||||
internal class FileInitialSyncStatusRepository(directory: File) : InitialSyncStatusRepository {
|
||||
|
||||
companion object {
|
||||
// After 2 hours, we consider that the downloaded file is outdated:
|
||||
// - if a problem occurs, it's for big accounts, and big accounts have lots of new events in 2 hours
|
||||
// - For small accounts, there should be no problem, so 2 hours delay will never be used.
|
||||
private const val INIT_SYNC_FILE_LIFETIME = 2 * 60 * 60 * 1_000L
|
||||
}
|
||||
|
||||
private val file = File(directory, "status.json")
|
||||
private val jsonAdapter = MoshiProvider.providesMoshi().adapter(InitialSyncStatus::class.java)
|
||||
|
||||
private var cache: InitialSyncStatus? = null
|
||||
|
||||
override fun getStep(): Int {
|
||||
ensureCache()
|
||||
val state = cache?.step ?: InitialSyncStatus.STEP_INIT
|
||||
return if (state >= InitialSyncStatus.STEP_DOWNLOADED
|
||||
&& System.currentTimeMillis() > (cache?.downloadedDate ?: 0) + INIT_SYNC_FILE_LIFETIME) {
|
||||
Timber.v("INIT_SYNC downloaded file is outdated, download it again")
|
||||
// The downloaded file is outdated
|
||||
setStep(InitialSyncStatus.STEP_INIT)
|
||||
InitialSyncStatus.STEP_INIT
|
||||
} else {
|
||||
state
|
||||
}
|
||||
}
|
||||
|
||||
override fun setStep(step: Int) {
|
||||
var newStatus = cache?.copy(step = step) ?: InitialSyncStatus(step = step)
|
||||
if (step == InitialSyncStatus.STEP_DOWNLOADED) {
|
||||
// Also store the downloaded date
|
||||
newStatus = newStatus.copy(
|
||||
downloadedDate = System.currentTimeMillis()
|
||||
)
|
||||
}
|
||||
cache = newStatus
|
||||
writeFile()
|
||||
}
|
||||
|
||||
private fun ensureCache() {
|
||||
if (cache == null) readFile()
|
||||
}
|
||||
|
||||
/**
|
||||
* File -> Cache
|
||||
*/
|
||||
private fun readFile() {
|
||||
cache = file
|
||||
.takeIf { it.exists() }
|
||||
?.let { jsonAdapter.fromJson(it.source().buffer()) }
|
||||
}
|
||||
|
||||
/**
|
||||
* Cache -> File
|
||||
*/
|
||||
private fun writeFile() {
|
||||
file.delete()
|
||||
cache
|
||||
?.let { jsonAdapter.toJson(it) }
|
||||
?.let { file.writeText(it) }
|
||||
}
|
||||
}
|
|
@ -0,0 +1,53 @@
|
|||
/*
|
||||
* Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.matrix.android.sdk.internal.session.sync
|
||||
|
||||
var initialSyncStrategy: InitialSyncStrategy = InitialSyncStrategy.Optimized()
|
||||
|
||||
sealed class InitialSyncStrategy {
|
||||
/**
|
||||
* Parse the result in its entirety
|
||||
* Pros:
|
||||
* - Faster to handle parsed data
|
||||
* Cons:
|
||||
* - Slower to download and parse data
|
||||
* - big RAM usage
|
||||
* - not robust to crash
|
||||
*/
|
||||
object Legacy : InitialSyncStrategy()
|
||||
|
||||
/**
|
||||
* Optimized.
|
||||
* First store the request result in a file, to avoid doing it again in case of crash
|
||||
*/
|
||||
data class Optimized(
|
||||
/**
|
||||
* Limit to reach to decide to split the init sync response into smaller files
|
||||
* Empiric value: 1 megabytes
|
||||
*/
|
||||
val minSizeToSplit: Long = 1024 * 1024,
|
||||
/**
|
||||
* Limit per room to reach to decide to store a join room into a file
|
||||
* Empiric value: 10 kilobytes
|
||||
*/
|
||||
val minSizeToStoreInFile: Long = 10 * 1024,
|
||||
/**
|
||||
* Max number of rooms to insert at a time in database (to avoid too much RAM usage)
|
||||
*/
|
||||
val maxRoomsToInsert: Int = 100
|
||||
) : InitialSyncStrategy()
|
||||
}
|
|
@ -51,6 +51,7 @@ import org.matrix.android.sdk.internal.di.UserId
|
|||
import org.matrix.android.sdk.internal.extensions.clearWith
|
||||
import org.matrix.android.sdk.internal.session.DefaultInitialSyncProgressService
|
||||
import org.matrix.android.sdk.internal.session.mapWithProgress
|
||||
import org.matrix.android.sdk.internal.session.reportSubtask
|
||||
import org.matrix.android.sdk.internal.session.room.membership.RoomChangeMembershipStateDataSource
|
||||
import org.matrix.android.sdk.internal.session.room.membership.RoomMemberEventHandler
|
||||
import org.matrix.android.sdk.internal.session.room.read.FullyReadContent
|
||||
|
@ -59,12 +60,14 @@ import org.matrix.android.sdk.internal.session.room.timeline.PaginationDirection
|
|||
import org.matrix.android.sdk.internal.session.room.timeline.TimelineInput
|
||||
import org.matrix.android.sdk.internal.session.room.typing.TypingEventContent
|
||||
import org.matrix.android.sdk.internal.session.sync.model.InvitedRoomSync
|
||||
import org.matrix.android.sdk.internal.session.sync.model.LazyRoomSync
|
||||
import org.matrix.android.sdk.internal.session.sync.model.RoomSync
|
||||
import org.matrix.android.sdk.internal.session.sync.model.RoomSyncAccountData
|
||||
import org.matrix.android.sdk.internal.session.sync.model.RoomSyncEphemeral
|
||||
import org.matrix.android.sdk.internal.session.sync.model.RoomsSyncResponse
|
||||
import timber.log.Timber
|
||||
import javax.inject.Inject
|
||||
import kotlin.math.ceil
|
||||
|
||||
internal class RoomSyncHandler @Inject constructor(private val readReceiptHandler: ReadReceiptHandler,
|
||||
private val roomSummaryUpdater: RoomSummaryUpdater,
|
||||
|
@ -78,7 +81,7 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
|
|||
private val timelineInput: TimelineInput) {
|
||||
|
||||
sealed class HandlingStrategy {
|
||||
data class JOINED(val data: Map<String, RoomSync>) : HandlingStrategy()
|
||||
data class JOINED(val data: Map<String, LazyRoomSync>) : HandlingStrategy()
|
||||
data class INVITED(val data: Map<String, InvitedRoomSync>) : HandlingStrategy()
|
||||
data class LEFT(val data: Map<String, RoomSync>) : HandlingStrategy()
|
||||
}
|
||||
|
@ -105,10 +108,17 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
|
|||
}
|
||||
val syncLocalTimeStampMillis = System.currentTimeMillis()
|
||||
val rooms = when (handlingStrategy) {
|
||||
is HandlingStrategy.JOINED ->
|
||||
handlingStrategy.data.mapWithProgress(reporter, R.string.initial_sync_start_importing_account_joined_rooms, 0.6f) {
|
||||
handleJoinedRoom(realm, it.key, it.value, isInitialSync, insertType, syncLocalTimeStampMillis)
|
||||
is HandlingStrategy.JOINED -> {
|
||||
if (isInitialSync && initialSyncStrategy is InitialSyncStrategy.Optimized) {
|
||||
insertJoinRooms(realm, handlingStrategy, insertType, syncLocalTimeStampMillis, reporter)
|
||||
// Rooms are already inserted, return an empty list
|
||||
emptyList()
|
||||
} else {
|
||||
handlingStrategy.data.mapWithProgress(reporter, R.string.initial_sync_start_importing_account_joined_rooms, 0.6f) {
|
||||
handleJoinedRoom(realm, it.key, it.value.roomSync, insertType, syncLocalTimeStampMillis)
|
||||
}
|
||||
}
|
||||
}
|
||||
is HandlingStrategy.INVITED ->
|
||||
handlingStrategy.data.mapWithProgress(reporter, R.string.initial_sync_start_importing_account_invited_rooms, 0.1f) {
|
||||
handleInvitedRoom(realm, it.key, it.value, insertType, syncLocalTimeStampMillis)
|
||||
|
@ -123,17 +133,57 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
|
|||
realm.insertOrUpdate(rooms)
|
||||
}
|
||||
|
||||
private fun insertJoinRooms(realm: Realm,
|
||||
handlingStrategy: HandlingStrategy.JOINED,
|
||||
insertType: EventInsertType,
|
||||
syncLocalTimeStampMillis: Long,
|
||||
reporter: DefaultInitialSyncProgressService?) {
|
||||
val maxSize = (initialSyncStrategy as? InitialSyncStrategy.Optimized)?.maxRoomsToInsert ?: Int.MAX_VALUE
|
||||
val listSize = handlingStrategy.data.keys.size
|
||||
val numberOfChunks = ceil(listSize / maxSize.toDouble()).toInt()
|
||||
|
||||
if (numberOfChunks > 1) {
|
||||
reportSubtask(reporter, R.string.initial_sync_start_importing_account_joined_rooms, numberOfChunks, 0.6f) {
|
||||
val chunkSize = listSize / numberOfChunks
|
||||
Timber.v("INIT_SYNC $listSize rooms to insert, split into $numberOfChunks sublists of $chunkSize items")
|
||||
// I cannot find a better way to chunk a map, so chunk the keys and then create new maps
|
||||
handlingStrategy.data.keys
|
||||
.chunked(chunkSize)
|
||||
.forEachIndexed { index, roomIds ->
|
||||
val roomEntities = roomIds
|
||||
.also { Timber.v("INIT_SYNC insert ${roomIds.size} rooms") }
|
||||
.map {
|
||||
handleJoinedRoom(
|
||||
realm,
|
||||
it,
|
||||
(handlingStrategy.data[it] ?: error("Should not happen")).roomSync,
|
||||
insertType,
|
||||
syncLocalTimeStampMillis
|
||||
)
|
||||
}
|
||||
realm.insertOrUpdate(roomEntities)
|
||||
reporter?.reportProgress(index + 1)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// No need to split
|
||||
val rooms = handlingStrategy.data.mapWithProgress(reporter, R.string.initial_sync_start_importing_account_joined_rooms, 0.6f) {
|
||||
handleJoinedRoom(realm, it.key, it.value.roomSync, insertType, syncLocalTimeStampMillis)
|
||||
}
|
||||
realm.insertOrUpdate(rooms)
|
||||
}
|
||||
}
|
||||
|
||||
private fun handleJoinedRoom(realm: Realm,
|
||||
roomId: String,
|
||||
roomSync: RoomSync,
|
||||
isInitialSync: Boolean,
|
||||
insertType: EventInsertType,
|
||||
syncLocalTimestampMillis: Long): RoomEntity {
|
||||
Timber.v("Handle join sync for room $roomId")
|
||||
|
||||
var ephemeralResult: EphemeralResult? = null
|
||||
if (roomSync.ephemeral?.events?.isNotEmpty() == true) {
|
||||
ephemeralResult = handleEphemeral(realm, roomId, roomSync.ephemeral, isInitialSync)
|
||||
ephemeralResult = handleEphemeral(realm, roomId, roomSync.ephemeral, insertType == EventInsertType.INITIAL_SYNC)
|
||||
}
|
||||
|
||||
if (roomSync.accountData?.events?.isNotEmpty() == true) {
|
||||
|
@ -173,8 +223,7 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
|
|||
roomSync.timeline.prevToken,
|
||||
roomSync.timeline.limited,
|
||||
insertType,
|
||||
syncLocalTimestampMillis,
|
||||
isInitialSync
|
||||
syncLocalTimestampMillis
|
||||
)
|
||||
roomEntity.addIfNecessary(chunkEntity)
|
||||
}
|
||||
|
@ -278,8 +327,7 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
|
|||
prevToken: String? = null,
|
||||
isLimited: Boolean = true,
|
||||
insertType: EventInsertType,
|
||||
syncLocalTimestampMillis: Long,
|
||||
isInitialSync: Boolean): ChunkEntity {
|
||||
syncLocalTimestampMillis: Long): ChunkEntity {
|
||||
val lastChunk = ChunkEntity.findLastForwardChunkOfRoom(realm, roomEntity.roomId)
|
||||
val chunkEntity = if (!isLimited && lastChunk != null) {
|
||||
lastChunk
|
||||
|
@ -299,7 +347,7 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
|
|||
}
|
||||
eventIds.add(event.eventId)
|
||||
|
||||
if (event.isEncrypted() && !isInitialSync) {
|
||||
if (event.isEncrypted() && insertType != EventInsertType.INITIAL_SYNC) {
|
||||
decryptIfNeeded(event, roomId)
|
||||
}
|
||||
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
|
||||
package org.matrix.android.sdk.internal.session.sync
|
||||
|
||||
import okhttp3.ResponseBody
|
||||
import org.matrix.android.sdk.internal.network.NetworkConstants
|
||||
import org.matrix.android.sdk.internal.network.TimeOutInterceptor
|
||||
import org.matrix.android.sdk.internal.session.sync.model.SyncResponse
|
||||
|
@ -23,6 +24,7 @@ import retrofit2.Call
|
|||
import retrofit2.http.GET
|
||||
import retrofit2.http.Header
|
||||
import retrofit2.http.QueryMap
|
||||
import retrofit2.http.Streaming
|
||||
|
||||
internal interface SyncAPI {
|
||||
/**
|
||||
|
@ -34,4 +36,15 @@ internal interface SyncAPI {
|
|||
@Header(TimeOutInterceptor.READ_TIMEOUT) readTimeOut: Long = TimeOutInterceptor.DEFAULT_LONG_TIMEOUT,
|
||||
@Header(TimeOutInterceptor.WRITE_TIMEOUT) writeTimeOut: Long = TimeOutInterceptor.DEFAULT_LONG_TIMEOUT
|
||||
): Call<SyncResponse>
|
||||
|
||||
/**
|
||||
* Set all the timeouts to 1 minute by default
|
||||
*/
|
||||
@Streaming
|
||||
@GET(NetworkConstants.URI_API_PREFIX_PATH_R0 + "sync")
|
||||
fun syncStream(@QueryMap params: Map<String, String>,
|
||||
@Header(TimeOutInterceptor.CONNECT_TIMEOUT) connectTimeOut: Long = TimeOutInterceptor.DEFAULT_LONG_TIMEOUT,
|
||||
@Header(TimeOutInterceptor.READ_TIMEOUT) readTimeOut: Long = TimeOutInterceptor.DEFAULT_LONG_TIMEOUT,
|
||||
@Header(TimeOutInterceptor.WRITE_TIMEOUT) writeTimeOut: Long = TimeOutInterceptor.DEFAULT_LONG_TIMEOUT
|
||||
): Call<ResponseBody>
|
||||
}
|
||||
|
|
|
@ -16,18 +16,32 @@
|
|||
|
||||
package org.matrix.android.sdk.internal.session.sync
|
||||
|
||||
import okhttp3.ResponseBody
|
||||
import okio.buffer
|
||||
import okio.source
|
||||
import org.matrix.android.sdk.R
|
||||
import org.matrix.android.sdk.internal.di.MoshiProvider
|
||||
import org.matrix.android.sdk.internal.di.SessionFilesDirectory
|
||||
import org.matrix.android.sdk.internal.di.UserId
|
||||
import org.matrix.android.sdk.internal.network.GlobalErrorReceiver
|
||||
import org.matrix.android.sdk.internal.network.TimeOutInterceptor
|
||||
import org.matrix.android.sdk.internal.network.executeRequest
|
||||
import org.matrix.android.sdk.internal.network.toFailure
|
||||
import org.matrix.android.sdk.internal.session.DefaultInitialSyncProgressService
|
||||
import org.matrix.android.sdk.internal.session.filter.FilterRepository
|
||||
import org.matrix.android.sdk.internal.session.homeserver.GetHomeServerCapabilitiesTask
|
||||
import org.matrix.android.sdk.internal.session.reportSubtask
|
||||
import org.matrix.android.sdk.internal.session.sync.model.LazyRoomSync
|
||||
import org.matrix.android.sdk.internal.session.sync.model.LazyRoomSyncJsonAdapter
|
||||
import org.matrix.android.sdk.internal.session.sync.model.SyncResponse
|
||||
import org.matrix.android.sdk.internal.session.user.UserStore
|
||||
import org.matrix.android.sdk.internal.task.Task
|
||||
import org.matrix.android.sdk.internal.util.logDuration
|
||||
import retrofit2.Response
|
||||
import retrofit2.awaitResponse
|
||||
import timber.log.Timber
|
||||
import java.io.File
|
||||
import java.net.SocketTimeoutException
|
||||
import javax.inject.Inject
|
||||
|
||||
internal interface SyncTask : Task<SyncTask.Params, Unit> {
|
||||
|
@ -48,9 +62,14 @@ internal class DefaultSyncTask @Inject constructor(
|
|||
private val getHomeServerCapabilitiesTask: GetHomeServerCapabilitiesTask,
|
||||
private val userStore: UserStore,
|
||||
private val syncTaskSequencer: SyncTaskSequencer,
|
||||
private val globalErrorReceiver: GlobalErrorReceiver
|
||||
private val globalErrorReceiver: GlobalErrorReceiver,
|
||||
@SessionFilesDirectory
|
||||
private val fileDirectory: File
|
||||
) : SyncTask {
|
||||
|
||||
private val workingDir = File(fileDirectory, "is")
|
||||
private val initialSyncStatusRepository: InitialSyncStatusRepository = FileInitialSyncStatusRepository(workingDir)
|
||||
|
||||
override suspend fun execute(params: SyncTask.Params) = syncTaskSequencer.post {
|
||||
doSync(params)
|
||||
}
|
||||
|
@ -81,20 +100,137 @@ internal class DefaultSyncTask @Inject constructor(
|
|||
|
||||
val readTimeOut = (params.timeout + TIMEOUT_MARGIN).coerceAtLeast(TimeOutInterceptor.DEFAULT_LONG_TIMEOUT)
|
||||
|
||||
val syncResponse = executeRequest<SyncResponse>(globalErrorReceiver) {
|
||||
apiCall = syncAPI.sync(
|
||||
params = requestParams,
|
||||
readTimeOut = readTimeOut
|
||||
)
|
||||
}
|
||||
syncResponseHandler.handleResponse(syncResponse, token)
|
||||
if (isInitialSync) {
|
||||
logDuration("INIT_SYNC strategy: $initialSyncStrategy") {
|
||||
if (initialSyncStrategy is InitialSyncStrategy.Optimized) {
|
||||
safeInitialSync(requestParams)
|
||||
} else {
|
||||
val syncResponse = logDuration("INIT_SYNC Request") {
|
||||
executeRequest<SyncResponse>(globalErrorReceiver) {
|
||||
apiCall = syncAPI.sync(
|
||||
params = requestParams,
|
||||
readTimeOut = readTimeOut
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
logDuration("INIT_SYNC Database insertion") {
|
||||
syncResponseHandler.handleResponse(syncResponse, token)
|
||||
}
|
||||
}
|
||||
}
|
||||
initialSyncProgressService.endAll()
|
||||
} else {
|
||||
val syncResponse = executeRequest<SyncResponse>(globalErrorReceiver) {
|
||||
apiCall = syncAPI.sync(
|
||||
params = requestParams,
|
||||
readTimeOut = readTimeOut
|
||||
)
|
||||
}
|
||||
syncResponseHandler.handleResponse(syncResponse, token)
|
||||
}
|
||||
Timber.v("Sync task finished on Thread: ${Thread.currentThread().name}")
|
||||
}
|
||||
|
||||
private suspend fun safeInitialSync(requestParams: Map<String, String>) {
|
||||
workingDir.mkdirs()
|
||||
val workingFile = File(workingDir, "initSync.json")
|
||||
val status = initialSyncStatusRepository.getStep()
|
||||
if (workingFile.exists() && status >= InitialSyncStatus.STEP_DOWNLOADED) {
|
||||
// Go directly to the parse step
|
||||
Timber.v("INIT_SYNC file is already here")
|
||||
} else {
|
||||
initialSyncStatusRepository.setStep(InitialSyncStatus.STEP_DOWNLOADING)
|
||||
val syncResponse = logDuration("INIT_SYNC Perform server request") {
|
||||
reportSubtask(initialSyncProgressService, R.string.initial_sync_start_server_computing, 0, 0.5f) {
|
||||
getSyncResponse(requestParams, MAX_NUMBER_OF_RETRY_AFTER_TIMEOUT)
|
||||
}
|
||||
}
|
||||
|
||||
if (syncResponse.isSuccessful) {
|
||||
logDuration("INIT_SYNC Download and save to file") {
|
||||
reportSubtask(initialSyncProgressService, R.string.initial_sync_start_downloading, 0, 0.5f) {
|
||||
syncResponse.body()?.byteStream()?.use { inputStream ->
|
||||
workingFile.outputStream().use { outputStream ->
|
||||
inputStream.copyTo(outputStream)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
throw syncResponse.toFailure(globalErrorReceiver)
|
||||
.also { Timber.w("INIT_SYNC request failure: $this") }
|
||||
}
|
||||
initialSyncStatusRepository.setStep(InitialSyncStatus.STEP_DOWNLOADED)
|
||||
}
|
||||
handleSyncFile(workingFile)
|
||||
|
||||
// Delete all files
|
||||
workingDir.deleteRecursively()
|
||||
}
|
||||
|
||||
private suspend fun getSyncResponse(requestParams: Map<String, String>, maxNumberOfRetries: Int): Response<ResponseBody> {
|
||||
var retry = maxNumberOfRetries
|
||||
while (true) {
|
||||
retry--
|
||||
try {
|
||||
return syncAPI.syncStream(
|
||||
params = requestParams
|
||||
).awaitResponse()
|
||||
} catch (throwable: Throwable) {
|
||||
if (throwable is SocketTimeoutException && retry > 0) {
|
||||
Timber.w("INIT_SYNC timeout retry left: $retry")
|
||||
} else {
|
||||
Timber.e(throwable, "INIT_SYNC timeout, no retry left, or other error")
|
||||
throw throwable
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private suspend fun handleSyncFile(workingFile: File) {
|
||||
val syncResponseLength = workingFile.length().toInt()
|
||||
|
||||
logDuration("INIT_SYNC handleSyncFile() file size $syncResponseLength bytes") {
|
||||
if (syncResponseLength < (initialSyncStrategy as? InitialSyncStrategy.Optimized)?.minSizeToSplit ?: Long.MAX_VALUE) {
|
||||
// OK, no need to split just handle as a regular sync response
|
||||
Timber.v("INIT_SYNC no need to split")
|
||||
handleInitialSyncFile(workingFile)
|
||||
} else {
|
||||
Timber.v("INIT_SYNC Split into several smaller files")
|
||||
// Set file mode
|
||||
// TODO This is really ugly, I should improve that
|
||||
LazyRoomSyncJsonAdapter.initWith(workingFile)
|
||||
|
||||
handleInitialSyncFile(workingFile)
|
||||
|
||||
// Reset file mode
|
||||
LazyRoomSyncJsonAdapter.reset()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private suspend fun handleInitialSyncFile(workingFile: File) {
|
||||
val syncResponse = logDuration("INIT_SYNC Read file and parse") {
|
||||
MoshiProvider.providesMoshi().adapter(SyncResponse::class.java)
|
||||
.fromJson(workingFile.source().buffer())!!
|
||||
}
|
||||
initialSyncStatusRepository.setStep(InitialSyncStatus.STEP_PARSED)
|
||||
|
||||
// Log some stats
|
||||
val nbOfJoinedRooms = syncResponse.rooms?.join?.size ?: 0
|
||||
val nbOfJoinedRoomsInFile = syncResponse.rooms?.join?.values?.count { it is LazyRoomSync.Stored }
|
||||
Timber.v("INIT_SYNC $nbOfJoinedRooms rooms, $nbOfJoinedRoomsInFile stored into files")
|
||||
|
||||
logDuration("INIT_SYNC Database insertion") {
|
||||
syncResponseHandler.handleResponse(syncResponse, null)
|
||||
}
|
||||
initialSyncStatusRepository.setStep(InitialSyncStatus.STEP_SUCCESS)
|
||||
}
|
||||
|
||||
companion object {
|
||||
private const val MAX_NUMBER_OF_RETRY_AFTER_TIMEOUT = 50
|
||||
|
||||
private const val TIMEOUT_MARGIN: Long = 10_000
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,44 @@
|
|||
/*
|
||||
* Copyright (c) 2021 The Matrix.org Foundation C.I.C.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.matrix.android.sdk.internal.session.sync.model
|
||||
|
||||
import com.squareup.moshi.JsonClass
|
||||
import com.squareup.moshi.JsonReader
|
||||
import okio.buffer
|
||||
import okio.source
|
||||
import org.matrix.android.sdk.internal.di.MoshiProvider
|
||||
import java.io.File
|
||||
|
||||
@JsonClass(generateAdapter = false)
|
||||
internal sealed class LazyRoomSync {
|
||||
data class Parsed(val _roomSync: RoomSync) : LazyRoomSync()
|
||||
data class Stored(val file: File) : LazyRoomSync()
|
||||
|
||||
val roomSync: RoomSync
|
||||
get() {
|
||||
return when (this) {
|
||||
is Parsed -> _roomSync
|
||||
is Stored -> {
|
||||
// Parse the file now
|
||||
file.inputStream().use { pos ->
|
||||
MoshiProvider.providesMoshi().adapter(RoomSync::class.java)
|
||||
.fromJson(JsonReader.of(pos.source().buffer()))!!
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,90 @@
|
|||
/*
|
||||
* Copyright (c) 2021 The Matrix.org Foundation C.I.C.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.matrix.android.sdk.internal.session.sync.model
|
||||
|
||||
import com.squareup.moshi.FromJson
|
||||
import com.squareup.moshi.JsonAdapter
|
||||
import com.squareup.moshi.JsonReader
|
||||
import com.squareup.moshi.JsonWriter
|
||||
import com.squareup.moshi.ToJson
|
||||
import org.matrix.android.sdk.internal.di.MoshiProvider
|
||||
import org.matrix.android.sdk.internal.session.sync.InitialSyncStrategy
|
||||
import org.matrix.android.sdk.internal.session.sync.initialSyncStrategy
|
||||
import timber.log.Timber
|
||||
import java.io.File
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
|
||||
internal class LazyRoomSyncJsonAdapter : JsonAdapter<LazyRoomSync>() {
|
||||
|
||||
@FromJson
|
||||
override fun fromJson(reader: JsonReader): LazyRoomSync {
|
||||
return if (workingDirectory != null) {
|
||||
val path = reader.path
|
||||
// val roomId = reader.path.substringAfter("\$.rooms.join.")
|
||||
|
||||
// inputStream.available() return 0... So read it to a String then decide to store in a file or to parse it now
|
||||
val json = reader.nextSource().inputStream().bufferedReader().use {
|
||||
it.readText()
|
||||
}
|
||||
|
||||
val limit = (initialSyncStrategy as? InitialSyncStrategy.Optimized)?.minSizeToStoreInFile ?: Long.MAX_VALUE
|
||||
if (json.length > limit) {
|
||||
Timber.v("INIT_SYNC $path content length: ${json.length} copy to a file")
|
||||
// Copy the source to a file
|
||||
val file = createFile()
|
||||
file.writeText(json)
|
||||
LazyRoomSync.Stored(file)
|
||||
} else {
|
||||
Timber.v("INIT_SYNC $path content length: ${json.length} parse it now")
|
||||
// Parse it now
|
||||
val roomSync = MoshiProvider.providesMoshi().adapter(RoomSync::class.java).fromJson(json)!!
|
||||
LazyRoomSync.Parsed(roomSync)
|
||||
}
|
||||
} else {
|
||||
// Parse it now
|
||||
val roomSync = MoshiProvider.providesMoshi().adapter(RoomSync::class.java).fromJson(reader)!!
|
||||
LazyRoomSync.Parsed(roomSync)
|
||||
}
|
||||
}
|
||||
|
||||
@ToJson
|
||||
override fun toJson(writer: JsonWriter, value: LazyRoomSync?) {
|
||||
// This Adapter is not supposed to serialize object
|
||||
throw UnsupportedOperationException()
|
||||
}
|
||||
|
||||
companion object {
|
||||
fun initWith(file: File) {
|
||||
workingDirectory = file.parentFile
|
||||
atomicInteger.set(0)
|
||||
}
|
||||
|
||||
fun reset() {
|
||||
workingDirectory = null
|
||||
}
|
||||
|
||||
private fun createFile(): File {
|
||||
val parent = workingDirectory ?: error("workingDirectory is not initialized")
|
||||
val index = atomicInteger.getAndIncrement()
|
||||
|
||||
return File(parent, "room_$index.json")
|
||||
}
|
||||
|
||||
private var workingDirectory: File? = null
|
||||
private val atomicInteger = AtomicInteger(0)
|
||||
}
|
||||
}
|
|
@ -24,7 +24,7 @@ internal data class RoomsSyncResponse(
|
|||
/**
|
||||
* Joined rooms: keys are rooms ids.
|
||||
*/
|
||||
@Json(name = "join") val join: Map<String, RoomSync> = emptyMap(),
|
||||
@Json(name = "join") val join: Map<String, LazyRoomSync> = emptyMap(),
|
||||
|
||||
/**
|
||||
* Invitations. The rooms that the user has been invited to: keys are rooms ids.
|
||||
|
|
|
@ -0,0 +1,29 @@
|
|||
/*
|
||||
* Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.matrix.android.sdk.internal.util
|
||||
|
||||
import timber.log.Timber
|
||||
|
||||
internal suspend fun <T> logDuration(message: String,
|
||||
block: suspend () -> T): T {
|
||||
Timber.v("$message -- BEGIN")
|
||||
val start = System.currentTimeMillis()
|
||||
val result = block()
|
||||
val duration = System.currentTimeMillis() - start
|
||||
Timber.v("$message -- END duration: $duration ms")
|
||||
return result
|
||||
}
|
|
@ -199,6 +199,8 @@
|
|||
<string name="room_displayname_empty_room">Empty room</string>
|
||||
<string name="room_displayname_empty_room_was">Empty room (was %s)</string>
|
||||
|
||||
<string name="initial_sync_start_server_computing">Initial Sync:\nWaiting for server response…</string>
|
||||
<string name="initial_sync_start_downloading">Initial Sync:\nDownloading data…</string>
|
||||
<string name="initial_sync_start_importing_account">Initial Sync:\nImporting account…</string>
|
||||
<string name="initial_sync_start_importing_account_crypto">Initial Sync:\nImporting crypto</string>
|
||||
<string name="initial_sync_start_importing_account_rooms">Initial Sync:\nImporting Rooms</string>
|
||||
|
|
|
@ -40,6 +40,8 @@ import im.vector.app.core.platform.ToolbarConfigurable
|
|||
import im.vector.app.core.platform.VectorBaseActivity
|
||||
import im.vector.app.core.pushers.PushersManager
|
||||
import im.vector.app.databinding.ActivityHomeBinding
|
||||
import im.vector.app.features.MainActivity
|
||||
import im.vector.app.features.MainActivityArgs
|
||||
import im.vector.app.features.disclaimer.showDisclaimerDialog
|
||||
import im.vector.app.features.matrixto.MatrixToBottomSheet
|
||||
import im.vector.app.features.notifications.NotificationDrawerManager
|
||||
|
@ -60,6 +62,8 @@ import kotlinx.parcelize.Parcelize
|
|||
import org.matrix.android.sdk.api.session.InitialSyncProgressService
|
||||
import org.matrix.android.sdk.api.session.permalinks.PermalinkService
|
||||
import org.matrix.android.sdk.api.util.MatrixItem
|
||||
import org.matrix.android.sdk.internal.session.sync.InitialSyncStrategy
|
||||
import org.matrix.android.sdk.internal.session.sync.initialSyncStrategy
|
||||
import timber.log.Timber
|
||||
import javax.inject.Inject
|
||||
|
||||
|
@ -368,6 +372,20 @@ class HomeActivity :
|
|||
bugReporter.openBugReportScreen(this, false)
|
||||
return true
|
||||
}
|
||||
R.id.menu_home_init_sync_legacy -> {
|
||||
// Configure the SDK
|
||||
initialSyncStrategy = InitialSyncStrategy.Legacy
|
||||
// And clear cache
|
||||
MainActivity.restartApp(this, MainActivityArgs(clearCache = true))
|
||||
return true
|
||||
}
|
||||
R.id.menu_home_init_sync_optimized -> {
|
||||
// Configure the SDK
|
||||
initialSyncStrategy = InitialSyncStrategy.Optimized()
|
||||
// And clear cache
|
||||
MainActivity.restartApp(this, MainActivityArgs(clearCache = true))
|
||||
return true
|
||||
}
|
||||
R.id.menu_home_filter -> {
|
||||
navigator.openRoomsFiltering(this)
|
||||
return true
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
<?xml version="1.0" encoding="utf-8"?>
|
||||
<menu xmlns:android="http://schemas.android.com/apk/res/android"
|
||||
xmlns:app="http://schemas.android.com/apk/res-auto">
|
||||
xmlns:app="http://schemas.android.com/apk/res-auto"
|
||||
xmlns:tools="http://schemas.android.com/tools">
|
||||
|
||||
<item
|
||||
android:id="@+id/menu_home_setting"
|
||||
|
@ -18,6 +19,16 @@
|
|||
android:icon="@drawable/ic_material_bug_report"
|
||||
android:title="@string/send_bug_report" />
|
||||
|
||||
<item
|
||||
android:id="@+id/menu_home_init_sync_legacy"
|
||||
android:title="Init sync legacy"
|
||||
tools:ignore="HardcodedText" />
|
||||
|
||||
<item
|
||||
android:id="@+id/menu_home_init_sync_optimized"
|
||||
android:title="Init sync optimized"
|
||||
tools:ignore="HardcodedText" />
|
||||
|
||||
<item
|
||||
android:id="@+id/menu_home_filter"
|
||||
android:icon="@drawable/ic_search"
|
||||
|
|
Loading…
Reference in a new issue