mirror of
synced 2025-03-18 20:29:10 +03:00
Merge branch 'develop' into feature/bca/workaround_stuck_message
This commit is contained in:
29 changed files with 542 additions and 341 deletions
@ -5,10 +5,12 @@ Features ✨:
Improvements 🙌:
- Lazy storage of ReadReceipts
- Do not load room members in e2e after init sync
Bugfix 🐛:
- Add option to cancel stuck messages at bottom of timeline see #516
- Ensure message are decrypted in the room list after a clear cache
Translations 🗣:
@ -47,6 +49,7 @@ Bugfix 🐛:
- Be robust if Event.type is missing (#2946)
- Snappier message send status
- Fix MainActivity display (#2927)
- Cross signing now works with servers with an explicit port in the servername
Translations 🗣:
- All string resources and translations have been moved to the application module. Weblate project for the SDK will be removed.
@ -38,7 +38,7 @@ internal class CryptoSessionInfoProvider @Inject constructor(
val encryptionEvent = monarchy.fetchCopied { realm ->
EventEntity.whereType(realm, roomId = roomId, type = EventType.STATE_ROOM_ENCRYPTION)
.contains(EventEntityFields.CONTENT, "\"algorithm\":\"$MXCRYPTO_ALGORITHM_MEGOLM\"")
.isNotNull(EventEntityFields.STATE_KEY) // should be an empty key
return encryptionEvent != null
@ -856,15 +856,8 @@ internal class DefaultCryptoService @Inject constructor(
cryptoCoroutineScope.launch(coroutineDispatchers.crypto) {
val params = LoadRoomMembersTask.Params(roomId)
try {
} catch (throwable: Throwable) {
Timber.e(throwable, "## CRYPTO | onRoomEncryptionEvent ERROR FAILED TO SETUP CRYPTO ")
} finally {
val userIds = getRoomUserIds(roomId)
setEncryptionInRoom(roomId, event.content?.get("algorithm")?.toString(), true, userIds)
val userIds = getRoomUserIds(roomId)
setEncryptionInRoom(roomId, event.content?.get("algorithm")?.toString(), true, userIds)
@ -16,6 +16,7 @@
package org.matrix.android.sdk.internal.crypto
import kotlinx.coroutines.launch
import org.matrix.android.sdk.api.MatrixPatterns
import org.matrix.android.sdk.api.auth.data.Credentials
import org.matrix.android.sdk.internal.crypto.crosssigning.DeviceTrustLevel
@ -28,7 +29,7 @@ import org.matrix.android.sdk.internal.session.SessionScope
import org.matrix.android.sdk.internal.session.sync.SyncTokenStore
import org.matrix.android.sdk.internal.task.TaskExecutor
import org.matrix.android.sdk.internal.util.MatrixCoroutineDispatchers
import kotlinx.coroutines.launch
import org.matrix.android.sdk.internal.util.logLimit
import timber.log.Timber
import javax.inject.Inject
@ -39,8 +40,9 @@ internal class DeviceListManager @Inject constructor(private val cryptoStore: IM
private val syncTokenStore: SyncTokenStore,
private val credentials: Credentials,
private val downloadKeysForUsersTask: DownloadKeysForUsersTask,
private val cryptoSessionInfoProvider: CryptoSessionInfoProvider,
coroutineDispatchers: MatrixCoroutineDispatchers,
taskExecutor: TaskExecutor) {
private val taskExecutor: TaskExecutor) {
interface UserDevicesUpdateListener {
fun onUsersDeviceUpdate(userIds: List<String>)
@ -75,8 +77,10 @@ internal class DeviceListManager @Inject constructor(private val cryptoStore: IM
// HS not ready for retry
private val notReadyToRetryHS = mutableSetOf<String>()
private val cryptoCoroutineContext = coroutineDispatchers.crypto
init {
taskExecutor.executorScope.launch(coroutineDispatchers.crypto) {
taskExecutor.executorScope.launch(cryptoCoroutineContext) {
var isUpdated = false
val deviceTrackingStatuses = cryptoStore.getDeviceTrackingStatuses().toMutableMap()
for ((userId, status) in deviceTrackingStatuses) {
@ -104,7 +108,7 @@ internal class DeviceListManager @Inject constructor(private val cryptoStore: IM
if (':' in userId) {
try {
synchronized(notReadyToRetryHS) {
res = !notReadyToRetryHS.contains(userId.substringAfterLast(':'))
res = !notReadyToRetryHS.contains(userId.substringAfter(':'))
} catch (e: Exception) {
Timber.e(e, "## CRYPTO | canRetryKeysDownload() failed")
@ -123,28 +127,37 @@ internal class DeviceListManager @Inject constructor(private val cryptoStore: IM
fun onRoomMembersLoadedFor(roomId: String) {
taskExecutor.executorScope.launch(cryptoCoroutineContext) {
if (cryptoSessionInfoProvider.isRoomEncrypted(roomId)) {
// It's OK to track also device for invited users
val userIds = cryptoSessionInfoProvider.getRoomUserIds(roomId, true)
* Mark the cached device list for the given user outdated
* flag the given user for device-list tracking, if they are not already.
* @param userIds the user ids list
fun startTrackingDeviceList(userIds: List<String>?) {
if (null != userIds) {
var isUpdated = false
val deviceTrackingStatuses = cryptoStore.getDeviceTrackingStatuses().toMutableMap()
fun startTrackingDeviceList(userIds: List<String>) {
var isUpdated = false
val deviceTrackingStatuses = cryptoStore.getDeviceTrackingStatuses().toMutableMap()
for (userId in userIds) {
if (!deviceTrackingStatuses.containsKey(userId) || TRACKING_STATUS_NOT_TRACKED == deviceTrackingStatuses[userId]) {
Timber.v("## CRYPTO | startTrackingDeviceList() : Now tracking device list for $userId")
deviceTrackingStatuses[userId] = TRACKING_STATUS_PENDING_DOWNLOAD
isUpdated = true
for (userId in userIds) {
if (!deviceTrackingStatuses.containsKey(userId) || TRACKING_STATUS_NOT_TRACKED == deviceTrackingStatuses[userId]) {
Timber.v("## CRYPTO | startTrackingDeviceList() : Now tracking device list for $userId")
deviceTrackingStatuses[userId] = TRACKING_STATUS_PENDING_DOWNLOAD
isUpdated = true
if (isUpdated) {
if (isUpdated) {
@ -307,7 +320,7 @@ internal class DeviceListManager @Inject constructor(private val cryptoStore: IM
* @param downloadUsers the user ids list
private suspend fun doKeyDownloadForUsers(downloadUsers: List<String>): MXUsersDevicesMap<CryptoDeviceInfo> {
Timber.v("## CRYPTO | doKeyDownloadForUsers() : doKeyDownloadForUsers $downloadUsers")
Timber.v("## CRYPTO | doKeyDownloadForUsers() : doKeyDownloadForUsers ${downloadUsers.logLimit()}")
// get the user ids which did not already trigger a keys download
val filteredUsers = downloadUsers.filter { MatrixPatterns.isUserId(it) }
if (filteredUsers.isEmpty()) {
@ -312,7 +312,7 @@ internal class MXOlmDevice @Inject constructor(
* @param theirDeviceIdentityKey the Curve25519 identity key for the remote device.
* @return a list of known session ids for the device.
fun getSessionIds(theirDeviceIdentityKey: String): Set<String>? {
fun getSessionIds(theirDeviceIdentityKey: String): List<String>? {
return store.getDeviceSessionIds(theirDeviceIdentityKey)
@ -154,7 +154,7 @@ internal class MXOlmDecryption(
* @return payload, if decrypted successfully.
private fun decryptMessage(message: JsonDict, theirDeviceIdentityKey: String): String? {
val sessionIds = olmDevice.getSessionIds(theirDeviceIdentityKey) ?: emptySet()
val sessionIds = olmDevice.getSessionIds(theirDeviceIdentityKey).orEmpty()
val messageBody = message["body"] as? String ?: return null
val messageType = when (val typeAsVoid = message["type"]) {
@ -33,15 +33,18 @@ import org.matrix.android.sdk.internal.crypto.store.db.model.CryptoMapper
import org.matrix.android.sdk.internal.crypto.store.db.model.TrustLevelEntity
import org.matrix.android.sdk.internal.crypto.store.db.model.UserEntity
import org.matrix.android.sdk.internal.crypto.store.db.model.UserEntityFields
import org.matrix.android.sdk.internal.database.awaitTransaction
import org.matrix.android.sdk.internal.database.model.RoomMemberSummaryEntity
import org.matrix.android.sdk.internal.database.model.RoomMemberSummaryEntityFields
import org.matrix.android.sdk.internal.database.model.RoomSummaryEntity
import org.matrix.android.sdk.internal.database.model.RoomSummaryEntityFields
import org.matrix.android.sdk.internal.database.query.where
import org.matrix.android.sdk.internal.di.CryptoDatabase
import org.matrix.android.sdk.internal.di.SessionDatabase
import org.matrix.android.sdk.internal.di.UserId
import org.matrix.android.sdk.internal.session.SessionComponent
import org.matrix.android.sdk.internal.session.room.membership.RoomMemberHelper
import org.matrix.android.sdk.internal.util.logLimit
import org.matrix.android.sdk.internal.worker.SessionSafeCoroutineWorker
import org.matrix.android.sdk.internal.worker.SessionWorkerParams
import timber.log.Timber
@ -65,11 +68,16 @@ internal class UpdateTrustWorker(context: Context,
@Inject lateinit var crossSigningService: DefaultCrossSigningService
// It breaks the crypto store contract, but we need to batch things :/
@CryptoDatabase @Inject lateinit var realmConfiguration: RealmConfiguration
@UserId @Inject lateinit var myUserId: String
@Inject lateinit var cryptoRealmConfiguration: RealmConfiguration
@Inject lateinit var sessionRealmConfiguration: RealmConfiguration
@Inject lateinit var myUserId: String
@Inject lateinit var crossSigningKeysMapper: CrossSigningKeysMapper
@Inject lateinit var updateTrustWorkerDataRepository: UpdateTrustWorkerDataRepository
@SessionDatabase @Inject lateinit var sessionRealmConfiguration: RealmConfiguration
// @Inject lateinit var roomSummaryUpdater: RoomSummaryUpdater
@Inject lateinit var cryptoStore: IMXCryptoStore
@ -79,118 +87,114 @@ internal class UpdateTrustWorker(context: Context,
override suspend fun doSafeWork(params: Params): Result {
var userList = params.filename
val userList = params.filename
?.let { updateTrustWorkerDataRepository.getParam(it) }
?: params.updatedUserIds.orEmpty()
if (userList.isEmpty()) {
// This should not happen, but let's avoid go further in case of empty list
return Result.success()
// List should not be empty, but let's avoid go further in case of empty list
if (userList.isNotEmpty()) {
// Unfortunately we don't have much info on what did exactly changed (is it the cross signing keys of that user,
// or a new device?) So we check all again :/
Timber.d("## CrossSigning - Updating trust for users: ${userList.logLimit()}")
Realm.getInstance(cryptoRealmConfiguration).use { cryptoRealm ->
Realm.getInstance(sessionRealmConfiguration).use {
updateTrust(userList, cryptoRealm)
// Unfortunately we don't have much info on what did exactly changed (is it the cross signing keys of that user,
// or a new device?) So we check all again :/
Timber.d("## CrossSigning - Updating trust for $userList")
return Result.success()
private suspend fun updateTrust(userListParam: List<String>,
cRealm: Realm) {
var userList = userListParam
var myCrossSigningInfo: MXCrossSigningInfo? = null
// First we check that the users MSK are trusted by mine
// After that we check the trust chain for each devices of each users
Realm.getInstance(realmConfiguration).use { realm ->
realm.executeTransaction {
// By mapping here to model, this object is not live
// I should update it if needed
var myCrossSigningInfo = realm.where(CrossSigningInfoEntity::class.java)
.equalTo(CrossSigningInfoEntityFields.USER_ID, myUserId)
.findFirst()?.let { mapCrossSigningInfoEntity(it) }
awaitTransaction(cryptoRealmConfiguration) { cryptoRealm ->
// By mapping here to model, this object is not live
// I should update it if needed
myCrossSigningInfo = getCrossSigningInfo(cryptoRealm, myUserId)
var myTrustResult: UserTrustResult? = null
var myTrustResult: UserTrustResult? = null
if (userList.contains(myUserId)) {
Timber.d("## CrossSigning - Clear all trust as a change on my user was detected")
// i am in the list.. but i don't know exactly the delta of change :/
// If it's my cross signing keys we should refresh all trust
// do it anyway ?
userList = realm.where(CrossSigningInfoEntity::class.java)
.findAll().mapNotNull { it.userId }
Timber.d("## CrossSigning - Updating trust for all $userList")
if (userList.contains(myUserId)) {
Timber.d("## CrossSigning - Clear all trust as a change on my user was detected")
// i am in the list.. but i don't know exactly the delta of change :/
// If it's my cross signing keys we should refresh all trust
// do it anyway ?
userList = cryptoRealm.where(CrossSigningInfoEntity::class.java)
.mapNotNull { it.userId }
// check right now my keys and mark it as trusted as other trust depends on it
val myDevices = realm.where<UserEntity>()
.equalTo(UserEntityFields.USER_ID, myUserId)
?.map { deviceInfo ->
myTrustResult = crossSigningService.checkSelfTrust(myCrossSigningInfo, myDevices).also {
updateCrossSigningKeysTrust(realm, myUserId, it.isVerified())
// update model reference
myCrossSigningInfo = realm.where(CrossSigningInfoEntity::class.java)
.equalTo(CrossSigningInfoEntityFields.USER_ID, myUserId)
.findFirst()?.let { mapCrossSigningInfoEntity(it) }
// check right now my keys and mark it as trusted as other trust depends on it
val myDevices = cryptoRealm.where<UserEntity>()
.equalTo(UserEntityFields.USER_ID, myUserId)
?.map { CryptoMapper.mapToModel(it) }
val otherInfos = userList.map {
it to realm.where(CrossSigningInfoEntity::class.java)
.equalTo(CrossSigningInfoEntityFields.USER_ID, it)
.findFirst()?.let { mapCrossSigningInfoEntity(it) }
myTrustResult = crossSigningService.checkSelfTrust(myCrossSigningInfo, myDevices)
updateCrossSigningKeysTrust(cryptoRealm, myUserId, myTrustResult.isVerified())
// update model reference
myCrossSigningInfo = getCrossSigningInfo(cryptoRealm, myUserId)
val trusts = otherInfos.map { infoEntry ->
infoEntry.key to when (infoEntry.key) {
myUserId -> myTrustResult
else -> {
crossSigningService.checkOtherMSKTrusted(myCrossSigningInfo, infoEntry.value).also {
Timber.d("## CrossSigning - user:${infoEntry.key} result:$it")
val otherInfos = userList.associateWith { userId ->
getCrossSigningInfo(cryptoRealm, userId)
val trusts = otherInfos.mapValues { entry ->
when (entry.key) {
myUserId -> myTrustResult
else -> {
crossSigningService.checkOtherMSKTrusted(myCrossSigningInfo, entry.value).also {
Timber.d("## CrossSigning - user:${entry.key} result:$it")
// TODO! if it's me and my keys has changed... I have to reset trust for everyone!
// i have all the new trusts, update DB
trusts.forEach {
val verified = it.value?.isVerified() == true
updateCrossSigningKeysTrust(realm, it.key, verified)
// TODO! if it's me and my keys has changed... I have to reset trust for everyone!
// i have all the new trusts, update DB
trusts.forEach {
val verified = it.value?.isVerified() == true
updateCrossSigningKeysTrust(cryptoRealm, it.key, verified)
// Ok so now we have to check device trust for all these users..
Timber.v("## CrossSigning - Updating devices cross trust users: ${trusts.keys.logLimit()}")
trusts.keys.forEach { userId ->
val devicesEntities = cryptoRealm.where<UserEntity>()
.equalTo(UserEntityFields.USER_ID, userId)
val trustMap = devicesEntities?.associateWith { device ->
// get up to date from DB has could have been updated
val otherInfo = getCrossSigningInfo(cryptoRealm, userId)
crossSigningService.checkDeviceTrust(myCrossSigningInfo, otherInfo, CryptoMapper.mapToModel(device))
// Ok so now we have to check device trust for all these users..
Timber.v("## CrossSigning - Updating devices cross trust users ${trusts.keys}")
trusts.keys.forEach {
val devicesEntities = realm.where<UserEntity>()
.equalTo(UserEntityFields.USER_ID, it)
val trustMap = devicesEntities?.map { device ->
// get up to date from DB has could have been updated
val otherInfo = realm.where(CrossSigningInfoEntity::class.java)
.equalTo(CrossSigningInfoEntityFields.USER_ID, it)
.findFirst()?.let { mapCrossSigningInfoEntity(it) }
device to crossSigningService.checkDeviceTrust(myCrossSigningInfo, otherInfo, CryptoMapper.mapToModel(device))
// Update trust if needed
devicesEntities?.forEach { device ->
val crossSignedVerified = trustMap?.get(device)?.isCrossSignedVerified()
Timber.d("## CrossSigning - Trust for ${device.userId}|${device.deviceId} : cross verified: ${trustMap?.get(device)}")
if (device.trustLevelEntity?.crossSignedVerified != crossSignedVerified) {
Timber.d("## CrossSigning - Trust change detected for ${device.userId}|${device.deviceId} : cross verified: $crossSignedVerified")
// need to save
val trustEntity = device.trustLevelEntity
if (trustEntity == null) {
realm.createObject(TrustLevelEntity::class.java).let {
it.locallyVerified = false
it.crossSignedVerified = crossSignedVerified
device.trustLevelEntity = it
} else {
trustEntity.crossSignedVerified = crossSignedVerified
// Update trust if needed
devicesEntities?.forEach { device ->
val crossSignedVerified = trustMap?.get(device)?.isCrossSignedVerified()
Timber.d("## CrossSigning - Trust for ${device.userId}|${device.deviceId} : cross verified: ${trustMap?.get(device)}")
if (device.trustLevelEntity?.crossSignedVerified != crossSignedVerified) {
Timber.d("## CrossSigning - Trust change detected for ${device.userId}|${device.deviceId} : cross verified: $crossSignedVerified")
// need to save
val trustEntity = device.trustLevelEntity
if (trustEntity == null) {
device.trustLevelEntity = cryptoRealm.createObject(TrustLevelEntity::class.java).also {
it.locallyVerified = false
it.crossSignedVerified = crossSignedVerified
} else {
trustEntity.crossSignedVerified = crossSignedVerified
@ -201,35 +205,44 @@ internal class UpdateTrustWorker(context: Context,
// We can now update room shields? in the session DB?
Timber.d("## CrossSigning - Updating shields for impacted rooms...")
Realm.getInstance(sessionRealmConfiguration).use { it ->
it.executeTransaction { realm ->
val distinctRoomIds = realm.where(RoomMemberSummaryEntity::class.java)
.`in`(RoomMemberSummaryEntityFields.USER_ID, userList.toTypedArray())
.map { it.roomId }
Timber.d("## CrossSigning - ... impacted rooms $distinctRoomIds")
distinctRoomIds.forEach { roomId ->
val roomSummary = RoomSummaryEntity.where(realm, roomId).findFirst()
if (roomSummary?.isEncrypted == true) {
Timber.d("## CrossSigning - Check shield state for room $roomId")
val allActiveRoomMembers = RoomMemberHelper(realm, roomId).getActiveRoomMemberIds()
try {
val updatedTrust = computeRoomShield(allActiveRoomMembers, roomSummary)
if (roomSummary.roomEncryptionTrustLevel != updatedTrust) {
Timber.d("## CrossSigning - Shield change detected for $roomId -> $updatedTrust")
roomSummary.roomEncryptionTrustLevel = updatedTrust
} catch (failure: Throwable) {
awaitTransaction(sessionRealmConfiguration) { sessionRealm ->
.`in`(RoomMemberSummaryEntityFields.USER_ID, userList.toTypedArray())
.map { it.roomId }
.also { Timber.d("## CrossSigning - ... impacted rooms ${it.logLimit()}") }
.forEach { roomId ->
RoomSummaryEntity.where(sessionRealm, roomId)
.equalTo(RoomSummaryEntityFields.IS_ENCRYPTED, true)
?.let { roomSummary ->
Timber.d("## CrossSigning - Check shield state for room $roomId")
val allActiveRoomMembers = RoomMemberHelper(sessionRealm, roomId).getActiveRoomMemberIds()
try {
val updatedTrust = computeRoomShield(
if (roomSummary.roomEncryptionTrustLevel != updatedTrust) {
Timber.d("## CrossSigning - Shield change detected for $roomId -> $updatedTrust")
roomSummary.roomEncryptionTrustLevel = updatedTrust
} catch (failure: Throwable) {
return Result.success()
private fun getCrossSigningInfo(cryptoRealm: Realm, userId: String): MXCrossSigningInfo? {
return cryptoRealm.where(CrossSigningInfoEntity::class.java)
.equalTo(CrossSigningInfoEntityFields.USER_ID, userId)
?.let { mapCrossSigningInfoEntity(it) }
private fun cleanup(params: Params) {
@ -237,30 +250,34 @@ internal class UpdateTrustWorker(context: Context,
?.let { updateTrustWorkerDataRepository.delete(it) }
private fun updateCrossSigningKeysTrust(realm: Realm, userId: String, verified: Boolean) {
val xInfoEntity = realm.where(CrossSigningInfoEntity::class.java)
private fun updateCrossSigningKeysTrust(cryptoRealm: Realm, userId: String, verified: Boolean) {
.equalTo(CrossSigningInfoEntityFields.USER_ID, userId)
xInfoEntity?.crossSigningKeys?.forEach { info ->
// optimization to avoid trigger updates when there is no change..
if (info.trustLevelEntity?.isVerified() != verified) {
Timber.d("## CrossSigning - Trust change for $userId : $verified")
val level = info.trustLevelEntity
if (level == null) {
val newLevel = realm.createObject(TrustLevelEntity::class.java)
newLevel.locallyVerified = verified
newLevel.crossSignedVerified = verified
info.trustLevelEntity = newLevel
} else {
level.locallyVerified = verified
level.crossSignedVerified = verified
?.forEach { info ->
// optimization to avoid trigger updates when there is no change..
if (info.trustLevelEntity?.isVerified() != verified) {
Timber.d("## CrossSigning - Trust change for $userId : $verified")
val level = info.trustLevelEntity
if (level == null) {
info.trustLevelEntity = cryptoRealm.createObject(TrustLevelEntity::class.java).also {
it.locallyVerified = verified
it.crossSignedVerified = verified
} else {
level.locallyVerified = verified
level.crossSignedVerified = verified
private fun computeRoomShield(activeMemberUserIds: List<String>, roomSummaryEntity: RoomSummaryEntity): RoomEncryptionTrustLevel {
Timber.d("## CrossSigning - computeRoomShield ${roomSummaryEntity.roomId} -> $activeMemberUserIds")
private fun computeRoomShield(myCrossSigningInfo: MXCrossSigningInfo?,
cryptoRealm: Realm,
activeMemberUserIds: List<String>,
roomSummaryEntity: RoomSummaryEntity): RoomEncryptionTrustLevel {
Timber.d("## CrossSigning - computeRoomShield ${roomSummaryEntity.roomId} -> ${activeMemberUserIds.logLimit()}")
// The set of “all users” depends on the type of room:
// For regular / topic rooms which have more than 2 members (including yourself) are considered when decorating a room
// For 1:1 and group DM rooms, all other users (i.e. excluding yourself) are considered when decorating a room
@ -272,17 +289,8 @@ internal class UpdateTrustWorker(context: Context,
val allTrustedUserIds = listToCheck
.filter { userId ->
Realm.getInstance(realmConfiguration).use {
.equalTo(CrossSigningInfoEntityFields.USER_ID, userId)
.findFirst()?.let { mapCrossSigningInfoEntity(it) }?.isTrusted() == true
getCrossSigningInfo(cryptoRealm, userId)?.isTrusted() == true
val myCrossKeys = Realm.getInstance(realmConfiguration).use {
.equalTo(CrossSigningInfoEntityFields.USER_ID, myUserId)
.findFirst()?.let { mapCrossSigningInfoEntity(it) }
return if (allTrustedUserIds.isEmpty()) {
@ -291,21 +299,17 @@ internal class UpdateTrustWorker(context: Context,
// If all devices of all verified users are trusted -> green
// else -> black
.mapNotNull { uid ->
Realm.getInstance(realmConfiguration).use {
.equalTo(UserEntityFields.USER_ID, uid)
?.map {
.mapNotNull { userId ->
.equalTo(UserEntityFields.USER_ID, userId)
?.map { CryptoMapper.mapToModel(it) }
.let { allDevices ->
Timber.v("## CrossSigning - computeRoomShield ${roomSummaryEntity.roomId} devices ${allDevices.map { it.deviceId }}")
if (myCrossKeys != null) {
Timber.v("## CrossSigning - computeRoomShield ${roomSummaryEntity.roomId} devices ${allDevices.map { it.deviceId }.logLimit()}")
if (myCrossSigningInfo != null) {
allDevices.any { !it.trustLevel?.crossSigningVerified.orFalse() }
} else {
// Legacy method
@ -259,7 +259,7 @@ internal interface IMXCryptoStore {
* @param deviceKey the public key of the other device.
* @return A set of sessionId, or null if device is not known
fun getDeviceSessionIds(deviceKey: String): Set<String>?
fun getDeviceSessionIds(deviceKey: String): List<String>?
* Retrieve an end-to-end session between the logged-in user and another
@ -692,7 +692,7 @@ internal class RealmCryptoStore @Inject constructor(
override fun getDeviceSessionIds(deviceKey: String): MutableSet<String> {
override fun getDeviceSessionIds(deviceKey: String): List<String> {
return doWithRealm(realmConfiguration) {
.equalTo(OlmSessionEntityFields.DEVICE_KEY, deviceKey)
@ -701,7 +701,6 @@ internal class RealmCryptoStore @Inject constructor(
override fun storeInboundGroupSessions(sessions: List<OlmInboundGroupSessionWrapper2>) {
@ -801,7 +800,7 @@ internal class RealmCryptoStore @Inject constructor(
* Note: the result will be only use to export all the keys and not to use the OlmInboundGroupSessionWrapper2,
* so there is no need to use or update `inboundGroupSessionToRelease` for native memory management
override fun getInboundGroupSessions(): MutableList<OlmInboundGroupSessionWrapper2> {
override fun getInboundGroupSessions(): List<OlmInboundGroupSessionWrapper2> {
return doWithRealm(realmConfiguration) {
@ -809,7 +808,6 @@ internal class RealmCryptoStore @Inject constructor(
override fun removeInboundGroupSession(sessionId: String, senderKey: String) {
@ -964,7 +962,7 @@ internal class RealmCryptoStore @Inject constructor(
override fun getRoomsListBlacklistUnverifiedDevices(): MutableList<String> {
override fun getRoomsListBlacklistUnverifiedDevices(): List<String> {
return doWithRealm(realmConfiguration) {
.equalTo(CryptoRoomEntityFields.BLACKLIST_UNVERIFIED_DEVICES, true)
@ -973,10 +971,9 @@ internal class RealmCryptoStore @Inject constructor(
override fun getDeviceTrackingStatuses(): MutableMap<String, Int> {
override fun getDeviceTrackingStatuses(): Map<String, Int> {
return doWithRealm(realmConfiguration) {
@ -987,7 +984,6 @@ internal class RealmCryptoStore @Inject constructor(
override fun saveDeviceTrackingStatuses(deviceTrackingStatuses: Map<String, Int>) {
@ -22,6 +22,8 @@ import io.realm.kotlin.createObject
import kotlinx.coroutines.TimeoutCancellationException
import org.matrix.android.sdk.api.session.room.model.Membership
import org.matrix.android.sdk.api.session.room.send.SendState
import org.matrix.android.sdk.internal.crypto.CryptoSessionInfoProvider
import org.matrix.android.sdk.internal.crypto.DeviceListManager
import org.matrix.android.sdk.internal.database.awaitNotEmptyResult
import org.matrix.android.sdk.internal.database.mapper.toEntity
import org.matrix.android.sdk.internal.database.model.CurrentStateEventEntity
@ -57,6 +59,8 @@ internal class DefaultLoadRoomMembersTask @Inject constructor(
private val syncTokenStore: SyncTokenStore,
private val roomSummaryUpdater: RoomSummaryUpdater,
private val roomMemberEventHandler: RoomMemberEventHandler,
private val cryptoSessionInfoProvider: CryptoSessionInfoProvider,
private val deviceListManager: DeviceListManager,
private val globalErrorReceiver: GlobalErrorReceiver
) : LoadRoomMembersTask {
@ -124,6 +128,10 @@ internal class DefaultLoadRoomMembersTask @Inject constructor(
roomEntity.membersLoadStatus = RoomMembersLoadStatusType.LOADED
roomSummaryUpdater.update(realm, roomId, updateMembers = true)
if (cryptoSessionInfoProvider.isRoomEncrypted(roomId)) {
private fun getRoomMembersLoadStatus(roomId: String): RoomMembersLoadStatusType {
@ -117,7 +117,7 @@ internal class DefaultSetReadMarkersTask @Inject constructor(
if (readReceiptId != null) {
val readReceiptContent = ReadReceiptHandler.createContent(userId, readReceiptId)
readReceiptHandler.handle(realm, roomId, readReceiptContent, false)
readReceiptHandler.handle(realm, roomId, readReceiptContent, false, null)
if (shouldUpdateRoomSummary) {
val roomSummary = RoomSummaryEntity.where(realm, roomId).findFirst()
@ -131,8 +131,8 @@ internal class RoomSummaryUpdater @Inject constructor(
// mmm i want to decrypt now or is it ok to do it async?
tryOrNull {
eventDecryptor.decryptEvent(root.asDomain(), "")
// eventDecryptor.decryptEventAsync(root.asDomain(), "", NoOpMatrixCallback())
?.let { root.setDecryptionResult(it) }
if (updateMembers) {
@ -144,7 +144,7 @@ internal class RoomSummaryUpdater @Inject constructor(
if (roomSummaryEntity.isEncrypted) {
if (roomSummaryEntity.isEncrypted && otherRoomMembers.isNotEmpty()) {
// mmm maybe we could only refresh shield instead of checking trust also?
@ -44,6 +44,7 @@ import org.matrix.android.sdk.internal.database.query.findAllInRoomWithSendState
import org.matrix.android.sdk.internal.database.query.where
import org.matrix.android.sdk.internal.database.query.whereRoomId
import org.matrix.android.sdk.internal.session.room.membership.LoadRoomMembersTask
import org.matrix.android.sdk.internal.session.sync.ReadReceiptHandler
import org.matrix.android.sdk.internal.task.TaskExecutor
import org.matrix.android.sdk.internal.task.configureWith
import org.matrix.android.sdk.internal.util.Debouncer
@ -73,7 +74,8 @@ internal class DefaultTimeline(
private val timelineInput: TimelineInput,
private val eventDecryptor: TimelineEventDecryptor,
private val realmSessionProvider: RealmSessionProvider,
private val loadRoomMembersTask: LoadRoomMembersTask
private val loadRoomMembersTask: LoadRoomMembersTask,
private val readReceiptHandler: ReadReceiptHandler
) : Timeline,
@ -182,11 +184,27 @@ internal class DefaultTimeline(
// Ensure ReadReceipt from init sync are loaded
private fun ensureReadReceiptAreLoaded(realm: Realm) {
?.also {
Timber.w("INIT_SYNC Insert when opening timeline RR for room $roomId")
?.let { readReceiptContent ->
realm.executeTransactionAsync {
readReceiptHandler.handle(it, roomId, readReceiptContent, false, null)
private fun TimelineSettings.shouldHandleHiddenReadReceipts(): Boolean {
return buildReadReceipts && (filters.filterEdits || filters.filterTypes)
@ -17,10 +17,10 @@
package org.matrix.android.sdk.internal.session.room.timeline
import androidx.lifecycle.LiveData
import dagger.assisted.Assisted
import dagger.assisted.AssistedInject
import dagger.assisted.AssistedFactory
import com.zhuinden.monarchy.Monarchy
import dagger.assisted.Assisted
import dagger.assisted.AssistedFactory
import dagger.assisted.AssistedInject
import io.realm.Sort
import io.realm.kotlin.where
import org.matrix.android.sdk.api.session.events.model.isImageMessage
@ -38,20 +38,23 @@ import org.matrix.android.sdk.internal.database.model.TimelineEventEntityFields
import org.matrix.android.sdk.internal.database.query.where
import org.matrix.android.sdk.internal.di.SessionDatabase
import org.matrix.android.sdk.internal.session.room.membership.LoadRoomMembersTask
import org.matrix.android.sdk.internal.session.sync.ReadReceiptHandler
import org.matrix.android.sdk.internal.task.TaskExecutor
internal class DefaultTimelineService @AssistedInject constructor(@Assisted private val roomId: String,
@SessionDatabase private val monarchy: Monarchy,
private val realmSessionProvider: RealmSessionProvider,
private val timelineInput: TimelineInput,
private val taskExecutor: TaskExecutor,
private val contextOfEventTask: GetContextOfEventTask,
private val eventDecryptor: TimelineEventDecryptor,
private val paginationTask: PaginationTask,
private val fetchTokenAndPaginateTask: FetchTokenAndPaginateTask,
private val timelineEventMapper: TimelineEventMapper,
private val readReceiptsSummaryMapper: ReadReceiptsSummaryMapper,
private val loadRoomMembersTask: LoadRoomMembersTask
internal class DefaultTimelineService @AssistedInject constructor(
@Assisted private val roomId: String,
@SessionDatabase private val monarchy: Monarchy,
private val realmSessionProvider: RealmSessionProvider,
private val timelineInput: TimelineInput,
private val taskExecutor: TaskExecutor,
private val contextOfEventTask: GetContextOfEventTask,
private val eventDecryptor: TimelineEventDecryptor,
private val paginationTask: PaginationTask,
private val fetchTokenAndPaginateTask: FetchTokenAndPaginateTask,
private val timelineEventMapper: TimelineEventMapper,
private val readReceiptsSummaryMapper: ReadReceiptsSummaryMapper,
private val loadRoomMembersTask: LoadRoomMembersTask,
private val readReceiptHandler: ReadReceiptHandler
) : TimelineService {
@ -74,7 +77,8 @@ internal class DefaultTimelineService @AssistedInject constructor(@Assisted priv
eventDecryptor = eventDecryptor,
fetchTokenAndPaginateTask = fetchTokenAndPaginateTask,
realmSessionProvider = realmSessionProvider,
loadRoomMembersTask = loadRoomMembersTask
loadRoomMembersTask = loadRoomMembersTask,
readReceiptHandler = readReceiptHandler
@ -42,9 +42,9 @@ sealed class InitialSyncStrategy {
val minSizeToSplit: Long = 1024 * 1024,
* Limit per room to reach to decide to store a join room ephemeral Events into a file
* Empiric value: 6 kilobytes
* Empiric value: 1 kilobytes
val minSizeToStoreInFile: Long = 6 * 1024,
val minSizeToStoreInFile: Long = 1024,
* Max number of rooms to insert at a time in database (to avoid too much RAM usage)
@ -16,12 +16,13 @@
package org.matrix.android.sdk.internal.session.sync
import io.realm.Realm
import org.matrix.android.sdk.api.session.events.model.EventType
import org.matrix.android.sdk.internal.database.model.ReadReceiptEntity
import org.matrix.android.sdk.internal.database.model.ReadReceiptsSummaryEntity
import org.matrix.android.sdk.internal.database.query.createUnmanaged
import org.matrix.android.sdk.internal.database.query.getOrCreate
import org.matrix.android.sdk.internal.database.query.where
import io.realm.Realm
import timber.log.Timber
import javax.inject.Inject
@ -35,7 +36,9 @@ typealias ReadReceiptContent = Map<String, Map<String, Map<String, Map<String, D
private const val READ_KEY = "m.read"
private const val TIMESTAMP_KEY = "ts"
internal class ReadReceiptHandler @Inject constructor() {
internal class ReadReceiptHandler @Inject constructor(
private val roomSyncEphemeralTemporaryStore: RoomSyncEphemeralTemporaryStore
) {
companion object {
@ -52,22 +55,29 @@ internal class ReadReceiptHandler @Inject constructor() {
fun handle(realm: Realm, roomId: String, content: ReadReceiptContent?, isInitialSync: Boolean) {
if (content == null) {
fun handle(realm: Realm,
roomId: String,
content: ReadReceiptContent?,
isInitialSync: Boolean,
aggregator: SyncResponsePostTreatmentAggregator?) {
content ?: return
try {
handleReadReceiptContent(realm, roomId, content, isInitialSync)
handleReadReceiptContent(realm, roomId, content, isInitialSync, aggregator)
} catch (exception: Exception) {
Timber.e("Fail to handle read receipt for room $roomId")
private fun handleReadReceiptContent(realm: Realm, roomId: String, content: ReadReceiptContent, isInitialSync: Boolean) {
private fun handleReadReceiptContent(realm: Realm,
roomId: String,
content: ReadReceiptContent,
isInitialSync: Boolean,
aggregator: SyncResponsePostTreatmentAggregator?) {
if (isInitialSync) {
initialSyncStrategy(realm, roomId, content)
} else {
incrementalSyncStrategy(realm, roomId, content)
incrementalSyncStrategy(realm, roomId, content, aggregator)
@ -87,7 +97,21 @@ internal class ReadReceiptHandler @Inject constructor() {
private fun incrementalSyncStrategy(realm: Realm, roomId: String, content: ReadReceiptContent) {
private fun incrementalSyncStrategy(realm: Realm,
roomId: String,
content: ReadReceiptContent,
aggregator: SyncResponsePostTreatmentAggregator?) {
// First check if we have data from init sync to handle
getContentFromInitSync(roomId)?.let {
Timber.w("INIT_SYNC Insert during incremental sync RR for room $roomId")
doIncrementalSyncStrategy(realm, roomId, it)
doIncrementalSyncStrategy(realm, roomId, content)
private fun doIncrementalSyncStrategy(realm: Realm, roomId: String, content: ReadReceiptContent) {
for ((eventId, receiptDict) in content) {
val userIdsDict = receiptDict[READ_KEY] ?: continue
val readReceiptsSummary = ReadReceiptsSummaryEntity.where(realm, eventId).findFirst()
@ -110,4 +134,27 @@ internal class ReadReceiptHandler @Inject constructor() {
fun getContentFromInitSync(roomId: String): ReadReceiptContent? {
val dataFromFile = roomSyncEphemeralTemporaryStore.read(roomId)
dataFromFile ?: return null
val content = dataFromFile
.firstOrNull { it.type == EventType.RECEIPT }
?.content as? ReadReceiptContent
if (content == null) {
// We can delete the file now
return content
fun onContentFromInitSyncHandled(roomId: String) {
@ -0,0 +1,79 @@
* Copyright (c) 2021 The Matrix.org Foundation C.I.C.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.matrix.android.sdk.internal.session.sync
import com.squareup.moshi.JsonReader
import com.squareup.moshi.Moshi
import okio.buffer
import okio.source
import org.matrix.android.sdk.internal.di.SessionFilesDirectory
import org.matrix.android.sdk.internal.session.sync.model.RoomSyncEphemeral
import org.matrix.android.sdk.internal.util.md5
import timber.log.Timber
import java.io.File
import javax.inject.Inject
internal interface RoomSyncEphemeralTemporaryStore {
fun write(roomId: String, roomSyncEphemeralJson: String)
fun read(roomId: String): RoomSyncEphemeral?
fun reset()
fun delete(roomId: String)
internal class RoomSyncEphemeralTemporaryStoreFile @Inject constructor(
@SessionFilesDirectory fileDirectory: File,
moshi: Moshi
) : RoomSyncEphemeralTemporaryStore {
private val workingDir = File(fileDirectory, "rr")
.also { it.mkdirs() }
private val roomSyncEphemeralAdapter = moshi.adapter(RoomSyncEphemeral::class.java)
* Write RoomSyncEphemeral to a file
override fun write(roomId: String, roomSyncEphemeralJson: String) {
Timber.w("INIT_SYNC Store ephemeral events for room $roomId")
* Read RoomSyncEphemeral from a file, or null if there is no file to read
override fun read(roomId: String): RoomSyncEphemeral? {
return getFile(roomId)
.takeIf { it.exists() }
?.use { pos ->
override fun delete(roomId: String) {
override fun reset() {
private fun getFile(roomId: String): File {
return File(workingDir, "${roomId.md5()}.json")
@ -60,6 +60,7 @@ import org.matrix.android.sdk.internal.session.room.timeline.PaginationDirection
import org.matrix.android.sdk.internal.session.room.timeline.TimelineInput
import org.matrix.android.sdk.internal.session.room.typing.TypingEventContent
import org.matrix.android.sdk.internal.session.sync.model.InvitedRoomSync
import org.matrix.android.sdk.internal.session.sync.model.LazyRoomSyncEphemeral
import org.matrix.android.sdk.internal.session.sync.model.RoomSync
import org.matrix.android.sdk.internal.session.sync.model.RoomSyncAccountData
import org.matrix.android.sdk.internal.session.sync.model.RoomsSyncResponse
@ -87,29 +88,21 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
fun handle(realm: Realm,
roomsSyncResponse: RoomsSyncResponse,
isInitialSync: Boolean,
aggregator: SyncResponsePostTreatmentAggregator,
reporter: ProgressReporter? = null) {
Timber.v("Execute transaction from $this")
handleRoomSync(realm, HandlingStrategy.JOINED(roomsSyncResponse.join), isInitialSync, reporter)
handleRoomSync(realm, HandlingStrategy.INVITED(roomsSyncResponse.invite), isInitialSync, reporter)
handleRoomSync(realm, HandlingStrategy.LEFT(roomsSyncResponse.leave), isInitialSync, reporter)
fun handleInitSyncEphemeral(realm: Realm,
roomsSyncResponse: RoomsSyncResponse) {
roomsSyncResponse.join.forEach { roomSync ->
val ephemeralResult = roomSync.value.ephemeral
?.takeIf { it.isNotEmpty() }
?.let { events -> handleEphemeral(realm, roomSync.key, events, true) }
roomTypingUsersHandler.handle(realm, roomSync.key, ephemeralResult)
handleRoomSync(realm, HandlingStrategy.JOINED(roomsSyncResponse.join), isInitialSync, aggregator, reporter)
handleRoomSync(realm, HandlingStrategy.INVITED(roomsSyncResponse.invite), isInitialSync, aggregator, reporter)
handleRoomSync(realm, HandlingStrategy.LEFT(roomsSyncResponse.leave), isInitialSync, aggregator, reporter)
// PRIVATE METHODS *****************************************************************************
private fun handleRoomSync(realm: Realm, handlingStrategy: HandlingStrategy, isInitialSync: Boolean, reporter: ProgressReporter?) {
private fun handleRoomSync(realm: Realm,
handlingStrategy: HandlingStrategy,
isInitialSync: Boolean,
aggregator: SyncResponsePostTreatmentAggregator,
reporter: ProgressReporter?) {
val insertType = if (isInitialSync) {
} else {
@ -119,12 +112,12 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
val rooms = when (handlingStrategy) {
is HandlingStrategy.JOINED -> {
if (isInitialSync && initialSyncStrategy is InitialSyncStrategy.Optimized) {
insertJoinRoomsFromInitSync(realm, handlingStrategy, syncLocalTimeStampMillis, reporter)
insertJoinRoomsFromInitSync(realm, handlingStrategy, syncLocalTimeStampMillis, aggregator, reporter)
// Rooms are already inserted, return an empty list
} else {
handlingStrategy.data.mapWithProgress(reporter, InitSyncStep.ImportingAccountJoinedRooms, 0.6f) {
handleJoinedRoom(realm, it.key, it.value, true, insertType, syncLocalTimeStampMillis)
handleJoinedRoom(realm, it.key, it.value, insertType, syncLocalTimeStampMillis, aggregator)
@ -145,6 +138,7 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
private fun insertJoinRoomsFromInitSync(realm: Realm,
handlingStrategy: HandlingStrategy.JOINED,
syncLocalTimeStampMillis: Long,
aggregator: SyncResponsePostTreatmentAggregator,
reporter: ProgressReporter?) {
val maxSize = (initialSyncStrategy as? InitialSyncStrategy.Optimized)?.maxRoomsToInsert ?: Int.MAX_VALUE
val listSize = handlingStrategy.data.keys.size
@ -165,9 +159,9 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
realm = realm,
roomId = it,
roomSync = handlingStrategy.data[it] ?: error("Should not happen"),
handleEphemeralEvents = false,
insertType = EventInsertType.INITIAL_SYNC,
syncLocalTimestampMillis = syncLocalTimeStampMillis
syncLocalTimestampMillis = syncLocalTimeStampMillis,
@ -177,7 +171,7 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
} else {
// No need to split
val rooms = handlingStrategy.data.mapWithProgress(reporter, InitSyncStep.ImportingAccountJoinedRooms, 0.6f) {
handleJoinedRoom(realm, it.key, it.value, false, EventInsertType.INITIAL_SYNC, syncLocalTimeStampMillis)
handleJoinedRoom(realm, it.key, it.value, EventInsertType.INITIAL_SYNC, syncLocalTimeStampMillis, aggregator)
@ -186,17 +180,16 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
private fun handleJoinedRoom(realm: Realm,
roomId: String,
roomSync: RoomSync,
handleEphemeralEvents: Boolean,
insertType: EventInsertType,
syncLocalTimestampMillis: Long): RoomEntity {
syncLocalTimestampMillis: Long,
aggregator: SyncResponsePostTreatmentAggregator): RoomEntity {
Timber.v("Handle join sync for room $roomId")
var ephemeralResult: EphemeralResult? = null
if (handleEphemeralEvents) {
ephemeralResult = roomSync.ephemeral?.roomSyncEphemeral?.events
?.takeIf { it.isNotEmpty() }
?.let { handleEphemeral(realm, roomId, it, insertType == EventInsertType.INITIAL_SYNC) }
val ephemeralResult = (roomSync.ephemeral as? LazyRoomSyncEphemeral.Parsed)
?.takeIf { it.isNotEmpty() }
?.let { handleEphemeral(realm, roomId, it, insertType == EventInsertType.INITIAL_SYNC, aggregator) }
if (roomSync.accountData?.events?.isNotEmpty() == true) {
handleRoomAccountDataEvents(realm, roomId, roomSync.accountData)
@ -436,14 +429,15 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
private fun handleEphemeral(realm: Realm,
roomId: String,
ephemeralEvents: List<Event>,
isInitialSync: Boolean): EphemeralResult {
isInitialSync: Boolean,
aggregator: SyncResponsePostTreatmentAggregator): EphemeralResult {
var result = EphemeralResult()
for (event in ephemeralEvents) {
when (event.type) {
EventType.RECEIPT -> {
(event.content as? ReadReceiptContent)?.let { readReceiptContent ->
readReceiptHandler.handle(realm, roomId, readReceiptContent, isInitialSync)
readReceiptHandler.handle(realm, roomId, readReceiptContent, isInitialSync, aggregator)
EventType.TYPING -> {
@ -26,6 +26,7 @@ import javax.inject.Inject
internal class RoomTypingUsersHandler @Inject constructor(@UserId private val userId: String,
private val typingUsersTracker: DefaultTypingUsersTracker) {
// TODO This could be handled outside of the Realm transaction. Use the new aggregator?
fun handle(realm: Realm, roomId: String, ephemeralResult: RoomSyncHandler.EphemeralResult?) {
val roomMemberHelper = RoomMemberHelper(realm, roomId)
val typingIds = ephemeralResult?.typingUserIds?.filter { it != userId }.orEmpty()
@ -37,4 +37,7 @@ internal abstract class SyncModule {
abstract fun bindSyncTask(task: DefaultSyncTask): SyncTask
abstract fun bindRoomSyncEphemeralTemporaryStore(store: RoomSyncEphemeralTemporaryStoreFile): RoomSyncEphemeralTemporaryStore
@ -41,17 +41,19 @@ import kotlin.system.measureTimeMillis
internal class SyncResponseHandler @Inject constructor(@SessionDatabase private val monarchy: Monarchy,
@SessionId private val sessionId: String,
private val workManagerProvider: WorkManagerProvider,
private val roomSyncHandler: RoomSyncHandler,
private val userAccountDataSyncHandler: UserAccountDataSyncHandler,
private val groupSyncHandler: GroupSyncHandler,
private val cryptoSyncHandler: CryptoSyncHandler,
private val cryptoService: DefaultCryptoService,
private val tokenStore: SyncTokenStore,
private val processEventForPushTask: ProcessEventForPushTask,
private val pushRuleService: PushRuleService) {
internal class SyncResponseHandler @Inject constructor(
@SessionDatabase private val monarchy: Monarchy,
@SessionId private val sessionId: String,
private val workManagerProvider: WorkManagerProvider,
private val roomSyncHandler: RoomSyncHandler,
private val userAccountDataSyncHandler: UserAccountDataSyncHandler,
private val groupSyncHandler: GroupSyncHandler,
private val cryptoSyncHandler: CryptoSyncHandler,
private val aggregatorHandler: SyncResponsePostTreatmentAggregatorHandler,
private val cryptoService: DefaultCryptoService,
private val tokenStore: SyncTokenStore,
private val processEventForPushTask: ProcessEventForPushTask,
private val pushRuleService: PushRuleService) {
suspend fun handleResponse(syncResponse: SyncResponse,
fromToken: String?,
@ -81,13 +83,14 @@ internal class SyncResponseHandler @Inject constructor(@SessionDatabase private
}.also {
Timber.v("Finish handling toDevice in $it ms")
val aggregator = SyncResponsePostTreatmentAggregator()
// Start one big transaction
monarchy.awaitTransaction { realm ->
measureTimeMillis {
Timber.v("Handle rooms")
reportSubtask(reporter, InitSyncStep.ImportingAccountRoom, 1, 0.7f) {
if (syncResponse.rooms != null) {
roomSyncHandler.handle(realm, syncResponse.rooms, isInitialSync, reporter)
roomSyncHandler.handle(realm, syncResponse.rooms, isInitialSync, aggregator, reporter)
}.also {
@ -115,7 +118,10 @@ internal class SyncResponseHandler @Inject constructor(@SessionDatabase private
tokenStore.saveToken(realm, syncResponse.nextBatch)
// Everything else we need to do outside the transaction
syncResponse.rooms?.let {
checkPushRules(it, isInitialSync)
@ -128,15 +134,6 @@ internal class SyncResponseHandler @Inject constructor(@SessionDatabase private
suspend fun handleInitSyncSecondTransaction(syncResponse: SyncResponse) {
// Start another transaction to handle the ephemeral events
monarchy.awaitTransaction { realm ->
if (syncResponse.rooms != null) {
roomSyncHandler.handleInitSyncEphemeral(realm, syncResponse.rooms)
* At the moment we don't get any group data through the sync, so we poll where every hour.
* You can also force to refetch group data using [Group] API.
@ -0,0 +1,22 @@
* Copyright (c) 2021 The Matrix.org Foundation C.I.C.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.matrix.android.sdk.internal.session.sync
internal class SyncResponsePostTreatmentAggregator {
// List of RoomId
val ephemeralFilesToDelete = mutableListOf<String>()
@ -0,0 +1,33 @@
* Copyright (c) 2021 The Matrix.org Foundation C.I.C.
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* See the License for the specific language governing permissions and
* limitations under the License.
package org.matrix.android.sdk.internal.session.sync
import javax.inject.Inject
internal class SyncResponsePostTreatmentAggregatorHandler @Inject constructor(
private val ephemeralTemporaryStore: RoomSyncEphemeralTemporaryStore
) {
fun handle(synResHaResponsePostTreatmentAggregator: SyncResponsePostTreatmentAggregator) {
private fun cleanupEphemeralFiles(ephemeralFilesToDelete: List<String>) {
ephemeralFilesToDelete.forEach {
@ -62,7 +62,8 @@ internal class DefaultSyncTask @Inject constructor(
private val globalErrorReceiver: GlobalErrorReceiver,
private val fileDirectory: File,
private val syncResponseParser: InitialSyncResponseParser
private val syncResponseParser: InitialSyncResponseParser,
private val roomSyncEphemeralTemporaryStore: RoomSyncEphemeralTemporaryStore
) : SyncTask {
private val workingDir = File(fileDirectory, "is")
@ -102,13 +103,16 @@ internal class DefaultSyncTask @Inject constructor(
if (isInitialSync) {
Timber.v("INIT_SYNC with filter: ${requestParams["filter"]}")
val initSyncStrategy = initialSyncStrategy
var syncResp: SyncResponse? = null
logDuration("INIT_SYNC strategy: $initSyncStrategy") {
if (initSyncStrategy is InitialSyncStrategy.Optimized) {
val file = downloadInitSyncResponse(requestParams)
syncResp = reportSubtask(initialSyncProgressService, InitSyncStep.ImportingAccount, 1, 0.7F) {
reportSubtask(initialSyncProgressService, InitSyncStep.ImportingAccount, 1, 0.7F) {
handleSyncFile(file, initSyncStrategy)
// Delete all files
} else {
val syncResponse = logDuration("INIT_SYNC Request") {
executeRequest<SyncResponse>(globalErrorReceiver) {
@ -125,15 +129,6 @@ internal class DefaultSyncTask @Inject constructor(
if (initSyncStrategy is InitialSyncStrategy.Optimized) {
logDuration("INIT_SYNC Handle ephemeral") {
// Delete all files
} else {
val syncResponse = executeRequest<SyncResponse>(globalErrorReceiver) {
apiCall = syncAPI.sync(
@ -147,7 +142,6 @@ internal class DefaultSyncTask @Inject constructor(
private suspend fun downloadInitSyncResponse(requestParams: Map<String, String>): File {
val workingFile = File(workingDir, "initSync.json")
val status = initialSyncStatusRepository.getStep()
if (workingFile.exists() && status >= InitialSyncStatus.STEP_DOWNLOADED) {
@ -201,8 +195,8 @@ internal class DefaultSyncTask @Inject constructor(
private suspend fun handleSyncFile(workingFile: File, initSyncStrategy: InitialSyncStrategy.Optimized): SyncResponse {
return logDuration("INIT_SYNC handleSyncFile()") {
private suspend fun handleSyncFile(workingFile: File, initSyncStrategy: InitialSyncStrategy.Optimized) {
logDuration("INIT_SYNC handleSyncFile()") {
val syncResponse = logDuration("INIT_SYNC Read file and parse") {
syncResponseParser.parse(initSyncStrategy, workingFile)
@ -215,7 +209,7 @@ internal class DefaultSyncTask @Inject constructor(
logDuration("INIT_SYNC Database insertion") {
syncResponseHandler.handleResponse(syncResponse, null, initialSyncProgressService)
@ -16,28 +16,10 @@
package org.matrix.android.sdk.internal.session.sync.model
import com.squareup.moshi.JsonAdapter
import com.squareup.moshi.JsonClass
import com.squareup.moshi.JsonReader
import okio.buffer
import okio.source
import java.io.File
@JsonClass(generateAdapter = false)
internal sealed class LazyRoomSyncEphemeral {
data class Parsed(val _roomSyncEphemeral: RoomSyncEphemeral) : LazyRoomSyncEphemeral()
data class Stored(val roomSyncEphemeralAdapter: JsonAdapter<RoomSyncEphemeral>, val file: File) : LazyRoomSyncEphemeral()
val roomSyncEphemeral: RoomSyncEphemeral
get() {
return when (this) {
is Parsed -> _roomSyncEphemeral
is Stored -> {
// Parse the file now
file.inputStream().use { pos ->
object Stored : LazyRoomSyncEphemeral()
@ -22,11 +22,10 @@ import com.squareup.moshi.JsonReader
import com.squareup.moshi.JsonWriter
import com.squareup.moshi.ToJson
import org.matrix.android.sdk.internal.session.sync.InitialSyncStrategy
import org.matrix.android.sdk.internal.session.sync.RoomSyncEphemeralTemporaryStore
import org.matrix.android.sdk.internal.session.sync.model.LazyRoomSyncEphemeral
import org.matrix.android.sdk.internal.session.sync.model.RoomSyncEphemeral
import timber.log.Timber
import java.io.File
import java.util.concurrent.atomic.AtomicInteger
internal class DefaultLazyRoomSyncEphemeralJsonAdapter {
@ -44,20 +43,15 @@ internal class DefaultLazyRoomSyncEphemeralJsonAdapter {
internal class SplitLazyRoomSyncJsonAdapter(
private val workingDirectory: File,
internal class SplitLazyRoomSyncEphemeralJsonAdapter(
private val roomSyncEphemeralTemporaryStore: RoomSyncEphemeralTemporaryStore,
private val syncStrategy: InitialSyncStrategy.Optimized
) {
private val atomicInteger = AtomicInteger(0)
private fun createFile(): File {
val index = atomicInteger.getAndIncrement()
return File(workingDirectory, "room_$index.json")
fun fromJson(reader: JsonReader, delegate: JsonAdapter<RoomSyncEphemeral>): LazyRoomSyncEphemeral? {
val path = reader.path
val roomId = path.substringAfter("\$.rooms.join.").substringBeforeLast(".ephemeral")
val json = reader.nextSource().inputStream().bufferedReader().use {
@ -65,9 +59,8 @@ internal class SplitLazyRoomSyncJsonAdapter(
return if (json.length > limit) {
Timber.v("INIT_SYNC $path content length: ${json.length} copy to a file")
// Copy the source to a file
val file = createFile()
LazyRoomSyncEphemeral.Stored(delegate, file)
roomSyncEphemeralTemporaryStore.write(roomId, json)
} else {
Timber.v("INIT_SYNC $path content length: ${json.length} parse it now")
val roomSync = delegate.fromJson(json) ?: return null
@ -20,29 +20,33 @@ import com.squareup.moshi.Moshi
import okio.buffer
import okio.source
import org.matrix.android.sdk.internal.session.sync.InitialSyncStrategy
import org.matrix.android.sdk.internal.session.sync.RoomSyncEphemeralTemporaryStore
import org.matrix.android.sdk.internal.session.sync.model.SyncResponse
import timber.log.Timber
import java.io.File
import javax.inject.Inject
internal class InitialSyncResponseParser @Inject constructor(private val moshi: Moshi) {
internal class InitialSyncResponseParser @Inject constructor(
private val moshi: Moshi,
private val roomSyncEphemeralTemporaryStore: RoomSyncEphemeralTemporaryStore
) {
fun parse(syncStrategy: InitialSyncStrategy.Optimized, workingFile: File): SyncResponse {
val syncResponseLength = workingFile.length().toInt()
Timber.v("INIT_SYNC Sync file size is $syncResponseLength bytes")
val shouldSplit = syncResponseLength >= syncStrategy.minSizeToSplit
Timber.v("INIT_SYNC should split in several files: $shouldSplit")
return getMoshi(syncStrategy, workingFile.parentFile!!, shouldSplit)
return getMoshi(syncStrategy, shouldSplit)
private fun getMoshi(syncStrategy: InitialSyncStrategy.Optimized, workingDirectory: File, shouldSplit: Boolean): Moshi {
private fun getMoshi(syncStrategy: InitialSyncStrategy.Optimized, shouldSplit: Boolean): Moshi {
// If we don't have to split we'll rely on the already default moshi
if (!shouldSplit) return moshi
// Otherwise, we create a new adapter for handling Map of Lazy sync
return moshi.newBuilder()
.add(SplitLazyRoomSyncJsonAdapter(workingDirectory, syncStrategy))
.add(SplitLazyRoomSyncEphemeralJsonAdapter(roomSyncEphemeralTemporaryStore, syncStrategy))
@ -19,6 +19,18 @@ package org.matrix.android.sdk.internal.util
import org.matrix.android.sdk.BuildConfig
import timber.log.Timber
internal fun <T> Collection<T>.logLimit(maxQuantity: Int = 5): String {
return buildString {
append(" item(s)")
if (size > maxQuantity) {
append(", first $maxQuantity items")
append(": ")
internal suspend fun <T> logDuration(message: String,
block: suspend () -> T): T {
Timber.v("$message -- BEGIN")
@ -92,7 +92,8 @@ class RoomSummaryItemFactory @Inject constructor(private val displayableEventFor
return RoomSummaryItem_()
// We do not display shield in the room list anymore
// .encryptionTrustLevel(roomSummary.roomEncryptionTrustLevel)
Add table
Reference in a new issue