Sync: make only one big transaction to avoid having bad states

This commit is contained in:
ganfra 2019-12-13 18:21:44 +01:00
parent fe2be90002
commit 5dd46e82d7
12 changed files with 196 additions and 160 deletions

View file

@ -26,7 +26,6 @@ import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.android.asCoroutineDispatcher import kotlinx.coroutines.android.asCoroutineDispatcher
import kotlinx.coroutines.asCoroutineDispatcher import kotlinx.coroutines.asCoroutineDispatcher
import org.matrix.olm.OlmManager import org.matrix.olm.OlmManager
import java.util.concurrent.Executors
@Module @Module
internal object MatrixModule { internal object MatrixModule {

View file

@ -16,7 +16,6 @@
package im.vector.matrix.android.internal.session.sync package im.vector.matrix.android.internal.session.sync
import com.zhuinden.monarchy.Monarchy
import im.vector.matrix.android.R import im.vector.matrix.android.R
import im.vector.matrix.android.api.session.room.model.Membership import im.vector.matrix.android.api.session.room.model.Membership
import im.vector.matrix.android.internal.database.model.GroupEntity import im.vector.matrix.android.internal.database.model.GroupEntity
@ -25,11 +24,10 @@ import im.vector.matrix.android.internal.session.DefaultInitialSyncProgressServi
import im.vector.matrix.android.internal.session.mapWithProgress import im.vector.matrix.android.internal.session.mapWithProgress
import im.vector.matrix.android.internal.session.sync.model.GroupsSyncResponse import im.vector.matrix.android.internal.session.sync.model.GroupsSyncResponse
import im.vector.matrix.android.internal.session.sync.model.InvitedGroupSync import im.vector.matrix.android.internal.session.sync.model.InvitedGroupSync
import im.vector.matrix.android.internal.util.awaitTransaction
import io.realm.Realm import io.realm.Realm
import javax.inject.Inject import javax.inject.Inject
internal class GroupSyncHandler @Inject constructor(private val monarchy: Monarchy) { internal class GroupSyncHandler @Inject constructor() {
sealed class HandlingStrategy { sealed class HandlingStrategy {
data class JOINED(val data: Map<String, Any>) : HandlingStrategy() data class JOINED(val data: Map<String, Any>) : HandlingStrategy()
@ -37,12 +35,14 @@ internal class GroupSyncHandler @Inject constructor(private val monarchy: Monarc
data class LEFT(val data: Map<String, Any>) : HandlingStrategy() data class LEFT(val data: Map<String, Any>) : HandlingStrategy()
} }
suspend fun handle(roomsSyncResponse: GroupsSyncResponse, reporter: DefaultInitialSyncProgressService? = null) { fun handle(
monarchy.awaitTransaction { realm -> realm: Realm,
handleGroupSync(realm, HandlingStrategy.JOINED(roomsSyncResponse.join), reporter) roomsSyncResponse: GroupsSyncResponse,
handleGroupSync(realm, HandlingStrategy.INVITED(roomsSyncResponse.invite), reporter) reporter: DefaultInitialSyncProgressService? = null
handleGroupSync(realm, HandlingStrategy.LEFT(roomsSyncResponse.leave), reporter) ) {
} handleGroupSync(realm, HandlingStrategy.JOINED(roomsSyncResponse.join), reporter)
handleGroupSync(realm, HandlingStrategy.INVITED(roomsSyncResponse.invite), reporter)
handleGroupSync(realm, HandlingStrategy.LEFT(roomsSyncResponse.leave), reporter)
} }
// PRIVATE METHODS ***************************************************************************** // PRIVATE METHODS *****************************************************************************

View file

@ -16,9 +16,7 @@
package im.vector.matrix.android.internal.session.sync package im.vector.matrix.android.internal.session.sync
import com.zhuinden.monarchy.Monarchy
import im.vector.matrix.android.R import im.vector.matrix.android.R
import im.vector.matrix.android.api.pushrules.RuleScope
import im.vector.matrix.android.api.session.events.model.Event import im.vector.matrix.android.api.session.events.model.Event
import im.vector.matrix.android.api.session.events.model.EventType import im.vector.matrix.android.api.session.events.model.EventType
import im.vector.matrix.android.api.session.events.model.toModel import im.vector.matrix.android.api.session.events.model.toModel
@ -34,31 +32,21 @@ import im.vector.matrix.android.internal.database.query.findLastLiveChunkFromRoo
import im.vector.matrix.android.internal.database.query.where import im.vector.matrix.android.internal.database.query.where
import im.vector.matrix.android.internal.session.DefaultInitialSyncProgressService import im.vector.matrix.android.internal.session.DefaultInitialSyncProgressService
import im.vector.matrix.android.internal.session.mapWithProgress import im.vector.matrix.android.internal.session.mapWithProgress
import im.vector.matrix.android.internal.session.notification.DefaultPushRuleService
import im.vector.matrix.android.internal.session.notification.ProcessEventForPushTask
import im.vector.matrix.android.internal.session.room.RoomSummaryUpdater import im.vector.matrix.android.internal.session.room.RoomSummaryUpdater
import im.vector.matrix.android.internal.session.room.read.FullyReadContent import im.vector.matrix.android.internal.session.room.read.FullyReadContent
import im.vector.matrix.android.internal.session.room.timeline.PaginationDirection import im.vector.matrix.android.internal.session.room.timeline.PaginationDirection
import im.vector.matrix.android.internal.session.sync.model.* import im.vector.matrix.android.internal.session.sync.model.*
import im.vector.matrix.android.internal.session.user.UserEntityFactory import im.vector.matrix.android.internal.session.user.UserEntityFactory
import im.vector.matrix.android.internal.task.TaskExecutor
import im.vector.matrix.android.internal.task.configureWith
import im.vector.matrix.android.internal.util.awaitTransaction
import io.realm.Realm import io.realm.Realm
import io.realm.kotlin.createObject import io.realm.kotlin.createObject
import timber.log.Timber import timber.log.Timber
import javax.inject.Inject import javax.inject.Inject
internal class RoomSyncHandler @Inject constructor(private val monarchy: Monarchy, internal class RoomSyncHandler @Inject constructor(private val readReceiptHandler: ReadReceiptHandler,
private val readReceiptHandler: ReadReceiptHandler,
private val roomSummaryUpdater: RoomSummaryUpdater, private val roomSummaryUpdater: RoomSummaryUpdater,
private val roomTagHandler: RoomTagHandler, private val roomTagHandler: RoomTagHandler,
private val roomFullyReadHandler: RoomFullyReadHandler, private val roomFullyReadHandler: RoomFullyReadHandler,
private val cryptoService: DefaultCryptoService, private val cryptoService: DefaultCryptoService) {
private val tokenStore: SyncTokenStore,
private val pushRuleService: DefaultPushRuleService,
private val processForPushTask: ProcessEventForPushTask,
private val taskExecutor: TaskExecutor) {
sealed class HandlingStrategy { sealed class HandlingStrategy {
data class JOINED(val data: Map<String, RoomSync>) : HandlingStrategy() data class JOINED(val data: Map<String, RoomSync>) : HandlingStrategy()
@ -66,28 +54,16 @@ internal class RoomSyncHandler @Inject constructor(private val monarchy: Monarch
data class LEFT(val data: Map<String, RoomSync>) : HandlingStrategy() data class LEFT(val data: Map<String, RoomSync>) : HandlingStrategy()
} }
suspend fun handle(roomsSyncResponse: RoomsSyncResponse, isInitialSync: Boolean, reporter: DefaultInitialSyncProgressService? = null) { fun handle(
realm: Realm,
roomsSyncResponse: RoomsSyncResponse,
isInitialSync: Boolean,
reporter: DefaultInitialSyncProgressService? = null
) {
Timber.v("Execute transaction from $this") Timber.v("Execute transaction from $this")
monarchy.awaitTransaction { realm -> handleRoomSync(realm, HandlingStrategy.JOINED(roomsSyncResponse.join), isInitialSync, reporter)
handleRoomSync(realm, HandlingStrategy.JOINED(roomsSyncResponse.join), isInitialSync, reporter) handleRoomSync(realm, HandlingStrategy.INVITED(roomsSyncResponse.invite), isInitialSync, reporter)
handleRoomSync(realm, HandlingStrategy.INVITED(roomsSyncResponse.invite), isInitialSync, reporter) handleRoomSync(realm, HandlingStrategy.LEFT(roomsSyncResponse.leave), isInitialSync, reporter)
handleRoomSync(realm, HandlingStrategy.LEFT(roomsSyncResponse.leave), isInitialSync, reporter)
}
// handle event for bing rule checks
checkPushRules(roomsSyncResponse)
}
private fun checkPushRules(roomsSyncResponse: RoomsSyncResponse) {
Timber.v("[PushRules] --> checkPushRules")
if (tokenStore.getLastToken() == null) {
Timber.v("[PushRules] <-- No push rule check on initial sync")
return
} // nothing on initial sync
val rules = pushRuleService.getPushRules(RuleScope.GLOBAL)
processForPushTask.configureWith(ProcessEventForPushTask.Params(roomsSyncResponse, rules))
.executeBy(taskExecutor)
Timber.v("[PushRules] <-- Push task scheduled")
} }
// PRIVATE METHODS ***************************************************************************** // PRIVATE METHODS *****************************************************************************
@ -137,7 +113,7 @@ internal class RoomSyncHandler @Inject constructor(private val monarchy: Monarch
if (roomSync.state != null && roomSync.state.events.isNotEmpty()) { if (roomSync.state != null && roomSync.state.events.isNotEmpty()) {
val minStateIndex = roomEntity.untimelinedStateEvents.where().min(EventEntityFields.STATE_INDEX)?.toInt() val minStateIndex = roomEntity.untimelinedStateEvents.where().min(EventEntityFields.STATE_INDEX)?.toInt()
?: Int.MIN_VALUE ?: Int.MIN_VALUE
val untimelinedStateIndex = minStateIndex + 1 val untimelinedStateIndex = minStateIndex + 1
roomSync.state.events.forEach { event -> roomSync.state.events.forEach { event ->
roomEntity.addStateEvent(event, filterDuplicates = true, stateIndex = untimelinedStateIndex) roomEntity.addStateEvent(event, filterDuplicates = true, stateIndex = untimelinedStateIndex)

View file

@ -16,20 +16,30 @@
package im.vector.matrix.android.internal.session.sync package im.vector.matrix.android.internal.session.sync
import com.zhuinden.monarchy.Monarchy
import im.vector.matrix.android.R import im.vector.matrix.android.R
import im.vector.matrix.android.api.pushrules.PushRuleService
import im.vector.matrix.android.api.pushrules.RuleScope
import im.vector.matrix.android.internal.crypto.DefaultCryptoService import im.vector.matrix.android.internal.crypto.DefaultCryptoService
import im.vector.matrix.android.internal.session.DefaultInitialSyncProgressService import im.vector.matrix.android.internal.session.DefaultInitialSyncProgressService
import im.vector.matrix.android.internal.session.notification.ProcessEventForPushTask
import im.vector.matrix.android.internal.session.reportSubtask import im.vector.matrix.android.internal.session.reportSubtask
import im.vector.matrix.android.internal.session.sync.model.RoomsSyncResponse
import im.vector.matrix.android.internal.session.sync.model.SyncResponse import im.vector.matrix.android.internal.session.sync.model.SyncResponse
import im.vector.matrix.android.internal.util.awaitTransaction
import timber.log.Timber import timber.log.Timber
import javax.inject.Inject import javax.inject.Inject
import kotlin.system.measureTimeMillis import kotlin.system.measureTimeMillis
internal class SyncResponseHandler @Inject constructor(private val roomSyncHandler: RoomSyncHandler, internal class SyncResponseHandler @Inject constructor(private val monarchy: Monarchy,
private val roomSyncHandler: RoomSyncHandler,
private val userAccountDataSyncHandler: UserAccountDataSyncHandler, private val userAccountDataSyncHandler: UserAccountDataSyncHandler,
private val groupSyncHandler: GroupSyncHandler, private val groupSyncHandler: GroupSyncHandler,
private val cryptoSyncHandler: CryptoSyncHandler, private val cryptoSyncHandler: CryptoSyncHandler,
private val cryptoService: DefaultCryptoService, private val cryptoService: DefaultCryptoService,
private val tokenStore: SyncTokenStore,
private val processEventForPushTask: ProcessEventForPushTask,
private val pushRuleService: PushRuleService,
private val initialSyncProgressService: DefaultInitialSyncProgressService) { private val initialSyncProgressService: DefaultInitialSyncProgressService) {
suspend fun handleResponse(syncResponse: SyncResponse, fromToken: String?) { suspend fun handleResponse(syncResponse: SyncResponse, fromToken: String?) {
@ -45,26 +55,27 @@ internal class SyncResponseHandler @Inject constructor(private val roomSyncHandl
}.also { }.also {
Timber.v("Finish handling start cryptoService in $it ms") Timber.v("Finish handling start cryptoService in $it ms")
} }
val measure = measureTimeMillis {
// Handle the to device events before the room ones
// to ensure to decrypt them properly
measureTimeMillis {
Timber.v("Handle toDevice")
reportSubtask(reporter, R.string.initial_sync_start_importing_account_crypto, 100, 0.1f) {
if (syncResponse.toDevice != null) {
cryptoSyncHandler.handleToDevice(syncResponse.toDevice, reporter)
}
}
}.also {
Timber.v("Finish handling toDevice in $it ms")
}
// Handle the to device events before the room ones
// to ensure to decrypt them properly
measureTimeMillis {
Timber.v("Handle toDevice")
reportSubtask(reporter, R.string.initial_sync_start_importing_account_crypto, 100, 0.1f) {
if (syncResponse.toDevice != null) {
cryptoSyncHandler.handleToDevice(syncResponse.toDevice, reporter)
}
}
}.also {
Timber.v("Finish handling toDevice in $it ms")
}
// Start one big transaction
monarchy.awaitTransaction { realm ->
measureTimeMillis { measureTimeMillis {
Timber.v("Handle rooms") Timber.v("Handle rooms")
reportSubtask(reporter, R.string.initial_sync_start_importing_account_rooms, 100, 0.7f) { reportSubtask(reporter, R.string.initial_sync_start_importing_account_rooms, 100, 0.7f) {
if (syncResponse.rooms != null) { if (syncResponse.rooms != null) {
roomSyncHandler.handle(syncResponse.rooms, isInitialSync, reporter) roomSyncHandler.handle(realm, syncResponse.rooms, isInitialSync, reporter)
} }
} }
}.also { }.also {
@ -75,7 +86,7 @@ internal class SyncResponseHandler @Inject constructor(private val roomSyncHandl
reportSubtask(reporter, R.string.initial_sync_start_importing_account_groups, 100, 0.1f) { reportSubtask(reporter, R.string.initial_sync_start_importing_account_groups, 100, 0.1f) {
Timber.v("Handle groups") Timber.v("Handle groups")
if (syncResponse.groups != null) { if (syncResponse.groups != null) {
groupSyncHandler.handle(syncResponse.groups, reporter) groupSyncHandler.handle(realm, syncResponse.groups, reporter)
} }
} }
}.also { }.also {
@ -85,15 +96,32 @@ internal class SyncResponseHandler @Inject constructor(private val roomSyncHandl
measureTimeMillis { measureTimeMillis {
reportSubtask(reporter, R.string.initial_sync_start_importing_account_data, 100, 0.1f) { reportSubtask(reporter, R.string.initial_sync_start_importing_account_data, 100, 0.1f) {
Timber.v("Handle accountData") Timber.v("Handle accountData")
userAccountDataSyncHandler.handle(syncResponse.accountData, syncResponse.rooms?.invite) userAccountDataSyncHandler.handle(realm, syncResponse.accountData)
} }
}.also { }.also {
Timber.v("Finish handling accountData in $it ms") Timber.v("Finish handling accountData in $it ms")
} }
tokenStore.saveToken(realm, syncResponse.nextBatch)
Timber.v("On sync completed")
cryptoSyncHandler.onSyncCompleted(syncResponse)
} }
Timber.v("Finish handling sync in $measure ms")
// Everything else we need to do outside the transaction
syncResponse.rooms?.also {
checkPushRules(it, isInitialSync)
userAccountDataSyncHandler.synchronizeWithServerIfNeeded(it.invite)
}
Timber.v("On sync completed")
cryptoSyncHandler.onSyncCompleted(syncResponse)
}
private suspend fun checkPushRules(roomsSyncResponse: RoomsSyncResponse, isInitialSync: Boolean) {
Timber.v("[PushRules] --> checkPushRules")
if (isInitialSync) {
Timber.v("[PushRules] <-- No push rule check on initial sync")
return
} // nothing on initial sync
val rules = pushRuleService.getPushRules(RuleScope.GLOBAL)
processEventForPushTask.execute(ProcessEventForPushTask.Params(roomsSyncResponse, rules))
Timber.v("[PushRules] <-- Push task scheduled")
} }
} }

View file

@ -21,7 +21,6 @@ import im.vector.matrix.android.api.failure.Failure
import im.vector.matrix.android.api.failure.MatrixError import im.vector.matrix.android.api.failure.MatrixError
import im.vector.matrix.android.internal.auth.SessionParamsStore import im.vector.matrix.android.internal.auth.SessionParamsStore
import im.vector.matrix.android.internal.di.UserId import im.vector.matrix.android.internal.di.UserId
import im.vector.matrix.android.internal.network.NetworkConnectivityChecker
import im.vector.matrix.android.internal.network.executeRequest import im.vector.matrix.android.internal.network.executeRequest
import im.vector.matrix.android.internal.session.DefaultInitialSyncProgressService import im.vector.matrix.android.internal.session.DefaultInitialSyncProgressService
import im.vector.matrix.android.internal.session.filter.FilterRepository import im.vector.matrix.android.internal.session.filter.FilterRepository
@ -88,7 +87,6 @@ internal class DefaultSyncTask @Inject constructor(private val syncAPI: SyncAPI,
throw throwable throw throwable
} }
syncResponseHandler.handleResponse(syncResponse, token) syncResponseHandler.handleResponse(syncResponse, token)
syncTokenStore.saveToken(syncResponse.nextBatch)
if (isInitialSync) { if (isInitialSync) {
initialSyncProgressService.endAll() initialSyncProgressService.endAll()
} }

View file

@ -18,7 +18,6 @@ package im.vector.matrix.android.internal.session.sync
import com.zhuinden.monarchy.Monarchy import com.zhuinden.monarchy.Monarchy
import im.vector.matrix.android.internal.database.model.SyncEntity import im.vector.matrix.android.internal.database.model.SyncEntity
import im.vector.matrix.android.internal.util.awaitTransaction
import io.realm.Realm import io.realm.Realm
import javax.inject.Inject import javax.inject.Inject
@ -30,10 +29,8 @@ internal class SyncTokenStore @Inject constructor(private val monarchy: Monarchy
} }
} }
suspend fun saveToken(token: String?) { fun saveToken(realm: Realm, token: String?) {
monarchy.awaitTransaction { val sync = SyncEntity(token)
val sync = SyncEntity(token) realm.insertOrUpdate(sync)
it.insertOrUpdate(sync)
}
} }
} }

View file

@ -17,44 +17,39 @@
package im.vector.matrix.android.internal.session.sync package im.vector.matrix.android.internal.session.sync
import com.zhuinden.monarchy.Monarchy import com.zhuinden.monarchy.Monarchy
import im.vector.matrix.android.api.pushrules.RuleScope
import im.vector.matrix.android.api.pushrules.RuleSetKey
import im.vector.matrix.android.api.session.events.model.toModel import im.vector.matrix.android.api.session.events.model.toModel
import im.vector.matrix.android.api.session.room.model.RoomMember import im.vector.matrix.android.api.session.room.model.RoomMember
import im.vector.matrix.android.internal.database.mapper.PushRulesMapper
import im.vector.matrix.android.internal.database.mapper.asDomain import im.vector.matrix.android.internal.database.mapper.asDomain
import im.vector.matrix.android.internal.database.model.RoomSummaryEntity import im.vector.matrix.android.internal.database.model.*
import im.vector.matrix.android.internal.database.query.getDirectRooms import im.vector.matrix.android.internal.database.query.getDirectRooms
import im.vector.matrix.android.internal.database.query.getOrCreate
import im.vector.matrix.android.internal.database.query.where import im.vector.matrix.android.internal.database.query.where
import im.vector.matrix.android.internal.di.UserId import im.vector.matrix.android.internal.di.UserId
import im.vector.matrix.android.internal.session.pushers.SavePushRulesTask
import im.vector.matrix.android.internal.session.room.membership.RoomMembers import im.vector.matrix.android.internal.session.room.membership.RoomMembers
import im.vector.matrix.android.internal.session.sync.model.InvitedRoomSync import im.vector.matrix.android.internal.session.sync.model.InvitedRoomSync
import im.vector.matrix.android.internal.session.sync.model.accountdata.* import im.vector.matrix.android.internal.session.sync.model.accountdata.*
import im.vector.matrix.android.internal.session.user.accountdata.DirectChatsHelper import im.vector.matrix.android.internal.session.user.accountdata.DirectChatsHelper
import im.vector.matrix.android.internal.session.user.accountdata.SaveBreadcrumbsTask
import im.vector.matrix.android.internal.session.user.accountdata.SaveIgnoredUsersTask
import im.vector.matrix.android.internal.session.user.accountdata.UpdateUserAccountDataTask import im.vector.matrix.android.internal.session.user.accountdata.UpdateUserAccountDataTask
import im.vector.matrix.android.internal.task.TaskExecutor
import im.vector.matrix.android.internal.task.configureWith
import im.vector.matrix.android.internal.util.awaitTransaction
import io.realm.Realm import io.realm.Realm
import io.realm.RealmList
import timber.log.Timber import timber.log.Timber
import javax.inject.Inject import javax.inject.Inject
internal class UserAccountDataSyncHandler @Inject constructor(private val monarchy: Monarchy, internal class UserAccountDataSyncHandler @Inject constructor(private val monarchy: Monarchy,
@UserId private val userId: String, @UserId private val userId: String,
private val directChatsHelper: DirectChatsHelper, private val directChatsHelper: DirectChatsHelper,
private val updateUserAccountDataTask: UpdateUserAccountDataTask, private val updateUserAccountDataTask: UpdateUserAccountDataTask) {
private val savePushRulesTask: SavePushRulesTask,
private val saveIgnoredUsersTask: SaveIgnoredUsersTask,
private val saveBreadcrumbsTask: SaveBreadcrumbsTask,
private val taskExecutor: TaskExecutor) {
suspend fun handle(accountData: UserAccountDataSync?, invites: Map<String, InvitedRoomSync>?) { fun handle(realm: Realm, accountData: UserAccountDataSync?) {
accountData?.list?.forEach { accountData?.list?.forEach {
when (it) { when (it) {
is UserAccountDataDirectMessages -> handleDirectChatRooms(it) is UserAccountDataDirectMessages -> handleDirectChatRooms(realm, it)
is UserAccountDataPushRules -> handlePushRules(it) is UserAccountDataPushRules -> handlePushRules(realm, it)
is UserAccountDataIgnoredUsers -> handleIgnoredUsers(it) is UserAccountDataIgnoredUsers -> handleIgnoredUsers(realm, it)
is UserAccountDataBreadcrumbs -> handleBreadcrumbs(it) is UserAccountDataBreadcrumbs -> handleBreadcrumbs(realm, it)
is UserAccountDataFallback -> Timber.d("Receive account data of unhandled type ${it.type}") is UserAccountDataFallback -> Timber.d("Receive account data of unhandled type ${it.type}")
else -> error("Missing code here!") else -> error("Missing code here!")
} }
@ -65,78 +60,134 @@ internal class UserAccountDataSyncHandler @Inject constructor(private val monarc
// it.toString() // it.toString()
// MoshiProvider.providesMoshi() // MoshiProvider.providesMoshi()
// } // }
monarchy.doWithRealm { realm ->
synchronizeWithServerIfNeeded(realm, invites)
}
}
private suspend fun handlePushRules(userAccountDataPushRules: UserAccountDataPushRules) {
savePushRulesTask.execute(SavePushRulesTask.Params(userAccountDataPushRules.content))
}
private suspend fun handleDirectChatRooms(directMessages: UserAccountDataDirectMessages) {
monarchy.awaitTransaction { realm ->
val oldDirectRooms = RoomSummaryEntity.getDirectRooms(realm)
oldDirectRooms.forEach {
it.isDirect = false
it.directUserId = null
}
directMessages.content.forEach {
val userId = it.key
it.value.forEach { roomId ->
val roomSummaryEntity = RoomSummaryEntity.where(realm, roomId).findFirst()
if (roomSummaryEntity != null) {
roomSummaryEntity.isDirect = true
roomSummaryEntity.directUserId = userId
realm.insertOrUpdate(roomSummaryEntity)
}
}
}
}
} }
// If we get some direct chat invites, we synchronize the user account data including those. // If we get some direct chat invites, we synchronize the user account data including those.
private fun synchronizeWithServerIfNeeded(realm: Realm, invites: Map<String, InvitedRoomSync>?) { suspend fun synchronizeWithServerIfNeeded(invites: Map<String, InvitedRoomSync>) {
if (invites.isNullOrEmpty()) return if (invites.isNullOrEmpty()) return
val directChats = directChatsHelper.getLocalUserAccount() val directChats = directChatsHelper.getLocalUserAccount()
var hasUpdate = false var hasUpdate = false
invites.forEach { (roomId, _) -> monarchy.doWithRealm { realm ->
val myUserStateEvent = RoomMembers(realm, roomId).getStateEvent(userId) invites.forEach { (roomId, _) ->
val inviterId = myUserStateEvent?.sender val myUserStateEvent = RoomMembers(realm, roomId).getStateEvent(userId)
val myUserRoomMember: RoomMember? = myUserStateEvent?.let { it.asDomain().content?.toModel() } val inviterId = myUserStateEvent?.sender
val isDirect = myUserRoomMember?.isDirect val myUserRoomMember: RoomMember? = myUserStateEvent?.let { it.asDomain().content?.toModel() }
if (inviterId != null && inviterId != userId && isDirect == true) { val isDirect = myUserRoomMember?.isDirect
directChats if (inviterId != null && inviterId != userId && isDirect == true) {
.getOrPut(inviterId, { arrayListOf() }) directChats
.apply { .getOrPut(inviterId, { arrayListOf() })
if (contains(roomId)) { .apply {
Timber.v("Direct chats already include room $roomId with user $inviterId") if (contains(roomId)) {
} else { Timber.v("Direct chats already include room $roomId with user $inviterId")
add(roomId) } else {
hasUpdate = true add(roomId)
hasUpdate = true
}
} }
} }
} }
} }
if (hasUpdate) { if (hasUpdate) {
val updateUserAccountParams = UpdateUserAccountDataTask.DirectChatParams( val updateUserAccountParams = UpdateUserAccountDataTask.DirectChatParams(
directMessages = directChats directMessages = directChats
) )
updateUserAccountDataTask.configureWith(updateUserAccountParams).executeBy(taskExecutor) updateUserAccountDataTask.execute(updateUserAccountParams)
} }
} }
private fun handleIgnoredUsers(userAccountDataIgnoredUsers: UserAccountDataIgnoredUsers) {
saveIgnoredUsersTask private fun handlePushRules(realm: Realm, userAccountDataPushRules: UserAccountDataPushRules) {
.configureWith(SaveIgnoredUsersTask.Params(userAccountDataIgnoredUsers.content.ignoredUsers.keys.toList())) val pushRules = userAccountDataPushRules.content
.executeBy(taskExecutor) realm.where(PushRulesEntity::class.java)
.findAll()
.deleteAllFromRealm()
// Save only global rules for the moment
val globalRules = pushRules.global
val content = PushRulesEntity(RuleScope.GLOBAL).apply { kind = RuleSetKey.CONTENT }
globalRules.content?.forEach { rule ->
content.pushRules.add(PushRulesMapper.map(rule))
}
realm.insertOrUpdate(content)
val override = PushRulesEntity(RuleScope.GLOBAL).apply { kind = RuleSetKey.OVERRIDE }
globalRules.override?.forEach { rule ->
PushRulesMapper.map(rule).also {
override.pushRules.add(it)
}
}
realm.insertOrUpdate(override)
val rooms = PushRulesEntity(RuleScope.GLOBAL).apply { kind = RuleSetKey.ROOM }
globalRules.room?.forEach { rule ->
rooms.pushRules.add(PushRulesMapper.map(rule))
}
realm.insertOrUpdate(rooms)
val senders = PushRulesEntity(RuleScope.GLOBAL).apply { kind = RuleSetKey.SENDER }
globalRules.sender?.forEach { rule ->
senders.pushRules.add(PushRulesMapper.map(rule))
}
realm.insertOrUpdate(senders)
val underrides = PushRulesEntity(RuleScope.GLOBAL).apply { kind = RuleSetKey.UNDERRIDE }
globalRules.underride?.forEach { rule ->
underrides.pushRules.add(PushRulesMapper.map(rule))
}
realm.insertOrUpdate(underrides)
}
private fun handleDirectChatRooms(realm: Realm, directMessages: UserAccountDataDirectMessages) {
val oldDirectRooms = RoomSummaryEntity.getDirectRooms(realm)
oldDirectRooms.forEach {
it.isDirect = false
it.directUserId = null
}
directMessages.content.forEach {
val userId = it.key
it.value.forEach { roomId ->
val roomSummaryEntity = RoomSummaryEntity.where(realm, roomId).findFirst()
if (roomSummaryEntity != null) {
roomSummaryEntity.isDirect = true
roomSummaryEntity.directUserId = userId
realm.insertOrUpdate(roomSummaryEntity)
}
}
}
}
private fun handleIgnoredUsers(realm: Realm, userAccountDataIgnoredUsers: UserAccountDataIgnoredUsers) {
val userIds = userAccountDataIgnoredUsers.content.ignoredUsers.keys
realm.where(IgnoredUserEntity::class.java)
.findAll()
.deleteAllFromRealm()
// And save the new received list
userIds.forEach { realm.createObject(IgnoredUserEntity::class.java).apply { userId = it } }
// TODO If not initial sync, we should execute a init sync // TODO If not initial sync, we should execute a init sync
} }
private fun handleBreadcrumbs(userAccountDataBreadcrumbs: UserAccountDataBreadcrumbs) { private fun handleBreadcrumbs(realm: Realm, userAccountDataBreadcrumbs: UserAccountDataBreadcrumbs) {
saveBreadcrumbsTask val recentRoomIds = userAccountDataBreadcrumbs.content.recentRoomIds
.configureWith(SaveBreadcrumbsTask.Params(userAccountDataBreadcrumbs.content.recentRoomIds)) val entity = BreadcrumbsEntity.getOrCreate(realm)
.executeBy(taskExecutor)
// And save the new received list
entity.recentRoomIds = RealmList<String>().apply { addAll(recentRoomIds) }
// Update the room summaries
// Reset all the indexes...
RoomSummaryEntity.where(realm)
.greaterThan(RoomSummaryEntityFields.BREADCRUMBS_INDEX, RoomSummaryEntity.NOT_IN_BREADCRUMBS)
.findAll()
.forEach {
it.breadcrumbsIndex = RoomSummaryEntity.NOT_IN_BREADCRUMBS
}
// ...and apply new indexes
recentRoomIds.forEachIndexed { index, roomId ->
RoomSummaryEntity.where(realm, roomId)
.findFirst()
?.breadcrumbsIndex = index
}
} }
} }

View file

@ -19,21 +19,15 @@ package im.vector.matrix.android.internal.session.sync.job
import androidx.lifecycle.LiveData import androidx.lifecycle.LiveData
import androidx.lifecycle.MutableLiveData import androidx.lifecycle.MutableLiveData
import com.squareup.moshi.JsonEncodingException import com.squareup.moshi.JsonEncodingException
import im.vector.matrix.android.api.MatrixCallback
import im.vector.matrix.android.api.failure.Failure import im.vector.matrix.android.api.failure.Failure
import im.vector.matrix.android.api.failure.MatrixError import im.vector.matrix.android.api.failure.MatrixError
import im.vector.matrix.android.api.session.sync.SyncState import im.vector.matrix.android.api.session.sync.SyncState
import im.vector.matrix.android.api.util.Cancelable
import im.vector.matrix.android.internal.network.NetworkConnectivityChecker import im.vector.matrix.android.internal.network.NetworkConnectivityChecker
import im.vector.matrix.android.internal.session.sync.SyncTask import im.vector.matrix.android.internal.session.sync.SyncTask
import im.vector.matrix.android.internal.task.TaskExecutor
import im.vector.matrix.android.internal.task.TaskThread
import im.vector.matrix.android.internal.task.configureWith
import im.vector.matrix.android.internal.util.BackgroundDetectionObserver import im.vector.matrix.android.internal.util.BackgroundDetectionObserver
import kotlinx.coroutines.* import kotlinx.coroutines.*
import timber.log.Timber import timber.log.Timber
import java.net.SocketTimeoutException import java.net.SocketTimeoutException
import java.util.concurrent.CountDownLatch
import javax.inject.Inject import javax.inject.Inject
private const val RETRY_WAIT_TIME_MS = 10_000L private const val RETRY_WAIT_TIME_MS = 10_000L

View file

@ -84,6 +84,4 @@ internal open class ChannelCoroutineSequencer<T> : CoroutineSequencer<T> {
throw cancellation throw cancellation
} }
} }
} }

View file

@ -99,7 +99,6 @@ class CoroutineSequencersTest {
sequencer.post { suspendingMethod("#1") }.also { sequencer.post { suspendingMethod("#1") }.also {
results.add(it) results.add(it)
} }
}, },
GlobalScope.launch(dispatcher) { GlobalScope.launch(dispatcher) {
val result = sequencer.post { suspendingMethod("#2") }.also { val result = sequencer.post { suspendingMethod("#2") }.also {
@ -127,5 +126,4 @@ class CoroutineSequencersTest {
println("BLOCKING METHOD $name ENDS on ${Thread.currentThread().name}") println("BLOCKING METHOD $name ENDS on ${Thread.currentThread().name}")
return name return name
} }
} }

View file

@ -17,13 +17,11 @@
package im.vector.riotx.core.extensions package im.vector.riotx.core.extensions
import android.content.Context import android.content.Context
import android.content.Intent
import androidx.core.content.ContextCompat import androidx.core.content.ContextCompat
import androidx.lifecycle.Lifecycle import androidx.lifecycle.Lifecycle
import androidx.lifecycle.ProcessLifecycleOwner import androidx.lifecycle.ProcessLifecycleOwner
import im.vector.matrix.android.api.session.Session import im.vector.matrix.android.api.session.Session
import im.vector.matrix.android.api.session.sync.FilterService import im.vector.matrix.android.api.session.sync.FilterService
import im.vector.matrix.android.internal.session.sync.job.SyncService
import im.vector.riotx.core.services.VectorSyncService import im.vector.riotx.core.services.VectorSyncService
import im.vector.riotx.features.notifications.PushRuleTriggerListener import im.vector.riotx.features.notifications.PushRuleTriggerListener
import im.vector.riotx.features.session.SessionListener import im.vector.riotx.features.session.SessionListener

View file

@ -26,7 +26,6 @@ import im.vector.riotx.R
import im.vector.riotx.core.di.ActiveSessionHolder import im.vector.riotx.core.di.ActiveSessionHolder
import im.vector.riotx.core.di.ScreenComponent import im.vector.riotx.core.di.ScreenComponent
import im.vector.riotx.core.error.ErrorFormatter import im.vector.riotx.core.error.ErrorFormatter
import im.vector.riotx.core.extensions.configureAndStart
import im.vector.riotx.core.extensions.startSyncing import im.vector.riotx.core.extensions.startSyncing
import im.vector.riotx.core.platform.VectorBaseActivity import im.vector.riotx.core.platform.VectorBaseActivity
import im.vector.riotx.core.utils.deleteAllFiles import im.vector.riotx.core.utils.deleteAllFiles