From f6b38d2b494cb18b4be6c32a5c57e1717bf88649 Mon Sep 17 00:00:00 2001 From: ariskotsomitopoulos Date: Mon, 21 Mar 2022 13:13:09 +0200 Subject: [PATCH 1/4] Add runBlocking when decrypt events to avoid thread switching when accessing the realm instance (thread local) --- .../database/helper/ThreadSummaryHelper.kt | 19 ++++-- .../sync/handler/room/RoomSyncHandler.kt | 66 ++++++++++--------- 2 files changed, 48 insertions(+), 37 deletions(-) diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/database/helper/ThreadSummaryHelper.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/database/helper/ThreadSummaryHelper.kt index 7087f07162..8d9304f61b 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/database/helper/ThreadSummaryHelper.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/database/helper/ThreadSummaryHelper.kt @@ -20,6 +20,7 @@ import io.realm.Realm import io.realm.RealmQuery import io.realm.Sort import io.realm.kotlin.createObject +import kotlinx.coroutines.runBlocking import org.matrix.android.sdk.api.session.crypto.CryptoService import org.matrix.android.sdk.api.session.crypto.MXCryptoError import org.matrix.android.sdk.api.session.events.model.Event @@ -127,7 +128,7 @@ private fun EventEntity.toTimelineEventEntity(roomMemberContentsByUser: HashMap< return timelineEventEntity } -internal suspend fun ThreadSummaryEntity.Companion.createOrUpdate( +internal fun ThreadSummaryEntity.Companion.createOrUpdate( threadSummaryType: ThreadSummaryUpdateType, realm: Realm, roomId: String, @@ -153,10 +154,18 @@ internal suspend fun ThreadSummaryEntity.Companion.createOrUpdate( } val rootThreadEventEntity = createEventEntity(roomId, rootThreadEvent, realm).also { - decryptIfNeeded(cryptoService, it, roomId) + try { + decryptIfNeeded(cryptoService, it, roomId) + } catch (e: InterruptedException) { + Timber.i("Decryption got interrupted") + } } val latestThreadEventEntity = createLatestEventEntity(roomId, rootThreadEvent, roomMemberContentsByUser, realm)?.also { - decryptIfNeeded(cryptoService, it, roomId) + try { + decryptIfNeeded(cryptoService, it, roomId) + } catch (e: InterruptedException) { + Timber.i("Decryption got interrupted") + } } val isUserParticipating = rootThreadEvent.unsignedData.relations.latestThread.isUserParticipating == true || rootThreadEvent.senderId == userId roomMemberContentsByUser.addSenderState(realm, roomId, rootThreadEvent.senderId) @@ -204,8 +213,8 @@ internal suspend fun ThreadSummaryEntity.Companion.createOrUpdate( } } -private suspend fun decryptIfNeeded(cryptoService: CryptoService?, eventEntity: EventEntity, roomId: String) { - cryptoService ?: return +private fun decryptIfNeeded(cryptoService: CryptoService?, eventEntity: EventEntity, roomId: String) = runBlocking { + cryptoService ?: return@runBlocking val event = eventEntity.asDomain() if (event.isEncrypted() && event.mxDecryptionResult == null && event.eventId != null) { try { diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/handler/room/RoomSyncHandler.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/handler/room/RoomSyncHandler.kt index 8fe85f0d31..51e32b25cb 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/handler/room/RoomSyncHandler.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/handler/room/RoomSyncHandler.kt @@ -102,11 +102,11 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle data class LEFT(val data: Map) : HandlingStrategy() } - suspend fun handle(realm: Realm, - roomsSyncResponse: RoomsSyncResponse, - isInitialSync: Boolean, - aggregator: SyncResponsePostTreatmentAggregator, - reporter: ProgressReporter? = null) { + fun handle(realm: Realm, + roomsSyncResponse: RoomsSyncResponse, + isInitialSync: Boolean, + aggregator: SyncResponsePostTreatmentAggregator, + reporter: ProgressReporter? = null) { Timber.v("Execute transaction from $this") handleRoomSync(realm, HandlingStrategy.JOINED(roomsSyncResponse.join), isInitialSync, aggregator, reporter) handleRoomSync(realm, HandlingStrategy.INVITED(roomsSyncResponse.invite), isInitialSync, aggregator, reporter) @@ -121,11 +121,11 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle } // PRIVATE METHODS ***************************************************************************** - private suspend fun handleRoomSync(realm: Realm, - handlingStrategy: HandlingStrategy, - isInitialSync: Boolean, - aggregator: SyncResponsePostTreatmentAggregator, - reporter: ProgressReporter?) { + private fun handleRoomSync(realm: Realm, + handlingStrategy: HandlingStrategy, + isInitialSync: Boolean, + aggregator: SyncResponsePostTreatmentAggregator, + reporter: ProgressReporter?) { val insertType = if (isInitialSync) { EventInsertType.INITIAL_SYNC } else { @@ -158,11 +158,11 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle realm.insertOrUpdate(rooms) } - private suspend fun insertJoinRoomsFromInitSync(realm: Realm, - handlingStrategy: HandlingStrategy.JOINED, - syncLocalTimeStampMillis: Long, - aggregator: SyncResponsePostTreatmentAggregator, - reporter: ProgressReporter?) { + private fun insertJoinRoomsFromInitSync(realm: Realm, + handlingStrategy: HandlingStrategy.JOINED, + syncLocalTimeStampMillis: Long, + aggregator: SyncResponsePostTreatmentAggregator, + reporter: ProgressReporter?) { val bestChunkSize = computeBestChunkSize( listSize = handlingStrategy.data.keys.size, limit = (initialSyncStrategy as? InitialSyncStrategy.Optimized)?.maxRoomsToInsert ?: Int.MAX_VALUE @@ -200,12 +200,12 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle } } - private suspend fun handleJoinedRoom(realm: Realm, - roomId: String, - roomSync: RoomSync, - insertType: EventInsertType, - syncLocalTimestampMillis: Long, - aggregator: SyncResponsePostTreatmentAggregator): RoomEntity { + private fun handleJoinedRoom(realm: Realm, + roomId: String, + roomSync: RoomSync, + insertType: EventInsertType, + syncLocalTimestampMillis: Long, + aggregator: SyncResponsePostTreatmentAggregator): RoomEntity { Timber.v("Handle join sync for room $roomId") val ephemeralResult = (roomSync.ephemeral as? LazyRoomSyncEphemeral.Parsed) @@ -351,15 +351,15 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle return roomEntity } - private suspend fun handleTimelineEvents(realm: Realm, - roomId: String, - roomEntity: RoomEntity, - eventList: List, - prevToken: String? = null, - isLimited: Boolean = true, - insertType: EventInsertType, - syncLocalTimestampMillis: Long, - aggregator: SyncResponsePostTreatmentAggregator): ChunkEntity { + private fun handleTimelineEvents(realm: Realm, + roomId: String, + roomEntity: RoomEntity, + eventList: List, + prevToken: String? = null, + isLimited: Boolean = true, + insertType: EventInsertType, + syncLocalTimestampMillis: Long, + aggregator: SyncResponsePostTreatmentAggregator): ChunkEntity { val lastChunk = ChunkEntity.findLastForwardChunkOfRoom(realm, roomEntity.roomId) if (isLimited && lastChunk != null) { lastChunk.deleteOnCascade(deleteStateEvents = false, canDeleteRoot = true) @@ -387,8 +387,10 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle val isInitialSync = insertType == EventInsertType.INITIAL_SYNC if (event.isEncrypted() && !isInitialSync) { - runBlocking { + try { decryptIfNeeded(event, roomId) + } catch (e: InterruptedException) { + Timber.i("Decryption got interrupted") } } var contentToInject: String? = null @@ -504,7 +506,7 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle } } - private suspend fun decryptIfNeeded(event: Event, roomId: String) { + private fun decryptIfNeeded(event: Event, roomId: String) = runBlocking { try { // Event from sync does not have roomId, so add it to the event first val result = cryptoService.decryptEvent(event.copy(roomId = roomId), "") From a959a15636cf7ad783aa8ca2ab8c07d611941d25 Mon Sep 17 00:00:00 2001 From: ariskotsomitopoulos Date: Mon, 21 Mar 2022 13:18:20 +0200 Subject: [PATCH 2/4] Add changelog --- changelog.d/5592.bugfix | 1 + 1 file changed, 1 insertion(+) create mode 100644 changelog.d/5592.bugfix diff --git a/changelog.d/5592.bugfix b/changelog.d/5592.bugfix new file mode 100644 index 0000000000..f0df3dc646 --- /dev/null +++ b/changelog.d/5592.bugfix @@ -0,0 +1 @@ +Improve/fix crashes on messages decryption From 277619d833ac703710f6a1ecc6c897d6311898ed Mon Sep 17 00:00:00 2001 From: ariskotsomitopoulos Date: Mon, 21 Mar 2022 13:39:15 +0200 Subject: [PATCH 3/4] Format code & add comments --- .../database/helper/ThreadSummaryHelper.kt | 1 + .../room/timeline/TimelineEventDecryptor.kt | 24 +++++++++---------- .../sync/handler/room/RoomSyncHandler.kt | 1 + 3 files changed, 14 insertions(+), 12 deletions(-) diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/database/helper/ThreadSummaryHelper.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/database/helper/ThreadSummaryHelper.kt index 8d9304f61b..bfe310cd8b 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/database/helper/ThreadSummaryHelper.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/database/helper/ThreadSummaryHelper.kt @@ -213,6 +213,7 @@ internal fun ThreadSummaryEntity.Companion.createOrUpdate( } } +// note: runBlocking should be used here while we are in realm single thread executor, to avoid thread switching private fun decryptIfNeeded(cryptoService: CryptoService?, eventEntity: EventEntity, roomId: String) = runBlocking { cryptoService ?: return@runBlocking val event = eventEntity.asDomain() diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/timeline/TimelineEventDecryptor.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/timeline/TimelineEventDecryptor.kt index 3ddd877b78..e225b31f45 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/timeline/TimelineEventDecryptor.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/timeline/TimelineEventDecryptor.kt @@ -101,9 +101,7 @@ internal class TimelineEventDecryptor @Inject constructor( executor?.execute { Realm.getInstance(realmConfiguration).use { realm -> try { - runBlocking { - processDecryptRequest(request, realm) - } + processDecryptRequest(request, realm) } catch (e: InterruptedException) { Timber.i("Decryption got interrupted") } @@ -113,16 +111,18 @@ internal class TimelineEventDecryptor @Inject constructor( private fun threadAwareNonEncryptedEvents(request: DecryptionRequest, realm: Realm) { val event = request.event - realm.executeTransaction { - val eventId = event.eventId ?: return@executeTransaction - val eventEntity = EventEntity - .where(it, eventId = eventId) - .findFirst() - val decryptedEvent = eventEntity?.asDomain() - threadsAwarenessHandler.makeEventThreadAware(realm, event.roomId, decryptedEvent, eventEntity) + realm.executeTransaction { + val eventId = event.eventId ?: return@executeTransaction + val eventEntity = EventEntity + .where(it, eventId = eventId) + .findFirst() + val decryptedEvent = eventEntity?.asDomain() + threadsAwarenessHandler.makeEventThreadAware(realm, event.roomId, decryptedEvent, eventEntity) } } - private suspend fun processDecryptRequest(request: DecryptionRequest, realm: Realm) { + + // note: runBlocking should be used here while we are in realm single thread executor, to avoid thread switching + private fun processDecryptRequest(request: DecryptionRequest, realm: Realm) = runBlocking { val event = request.event val timelineId = request.timelineId @@ -130,7 +130,7 @@ internal class TimelineEventDecryptor @Inject constructor( // Here we have requested a decryption to an event that is not encrypted // We will simply make this event thread aware threadAwareNonEncryptedEvents(request, realm) - return + return@runBlocking } try { val result = cryptoService.decryptEvent(request.event, timelineId) diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/handler/room/RoomSyncHandler.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/handler/room/RoomSyncHandler.kt index 51e32b25cb..e488fd0671 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/handler/room/RoomSyncHandler.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/handler/room/RoomSyncHandler.kt @@ -506,6 +506,7 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle } } + // note: runBlocking should be used here while we are in realm single thread executor, to avoid thread switching private fun decryptIfNeeded(event: Event, roomId: String) = runBlocking { try { // Event from sync does not have roomId, so add it to the event first From 5cfe218634d33a9fda18ccc7796f50b1375aad07 Mon Sep 17 00:00:00 2001 From: ariskotsomitopoulos Date: Wed, 27 Apr 2022 12:38:25 +0300 Subject: [PATCH 4/4] Wrap only cryptoService.decryptEvent with runBlocking instead of the whole methods --- .../database/helper/ThreadSummaryHelper.kt | 8 ++++---- .../room/timeline/TimelineEventDecryptor.kt | 8 ++++---- .../session/sync/handler/room/RoomSyncHandler.kt | 15 +++++++++------ 3 files changed, 17 insertions(+), 14 deletions(-) diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/database/helper/ThreadSummaryHelper.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/database/helper/ThreadSummaryHelper.kt index 66a0309bf4..89474a1bc5 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/database/helper/ThreadSummaryHelper.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/database/helper/ThreadSummaryHelper.kt @@ -213,15 +213,15 @@ internal fun ThreadSummaryEntity.Companion.createOrUpdate( } } -// note: runBlocking should be used here while we are in realm single thread executor, to avoid thread switching -private fun decryptIfNeeded(cryptoService: CryptoService?, eventEntity: EventEntity, roomId: String) = runBlocking { - cryptoService ?: return@runBlocking +private fun decryptIfNeeded(cryptoService: CryptoService?, eventEntity: EventEntity, roomId: String) { + cryptoService ?: return val event = eventEntity.asDomain() if (event.isEncrypted() && event.mxDecryptionResult == null && event.eventId != null) { try { Timber.i("###THREADS ThreadSummaryHelper request decryption for eventId:${event.eventId}") // Event from sync does not have roomId, so add it to the event first - val result = cryptoService.decryptEvent(event.copy(roomId = roomId), "") + // note: runBlocking should be used here while we are in realm single thread executor, to avoid thread switching + val result = runBlocking { cryptoService.decryptEvent(event.copy(roomId = roomId), "") } event.mxDecryptionResult = OlmDecryptionResult( payload = result.clearEvent, senderKey = result.senderCurve25519Key, diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/timeline/TimelineEventDecryptor.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/timeline/TimelineEventDecryptor.kt index 41e173a5db..de79661de0 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/timeline/TimelineEventDecryptor.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/timeline/TimelineEventDecryptor.kt @@ -119,8 +119,7 @@ internal class TimelineEventDecryptor @Inject constructor( } } - // note: runBlocking should be used here while we are in realm single thread executor, to avoid thread switching - private fun processDecryptRequest(request: DecryptionRequest, realm: Realm) = runBlocking { + private fun processDecryptRequest(request: DecryptionRequest, realm: Realm) { val event = request.event val timelineId = request.timelineId @@ -128,10 +127,11 @@ internal class TimelineEventDecryptor @Inject constructor( // Here we have requested a decryption to an event that is not encrypted // We will simply make this event thread aware threadAwareNonEncryptedEvents(request, realm) - return@runBlocking + return } try { - val result = cryptoService.decryptEvent(request.event, timelineId) + // note: runBlocking should be used here while we are in realm single thread executor, to avoid thread switching + val result = runBlocking { cryptoService.decryptEvent(request.event, timelineId) } Timber.v("Successfully decrypted event ${event.eventId}") realm.executeTransaction { val eventId = event.eventId ?: return@executeTransaction diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/handler/room/RoomSyncHandler.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/handler/room/RoomSyncHandler.kt index 6f8e0bbbb8..05dad983da 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/handler/room/RoomSyncHandler.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/handler/room/RoomSyncHandler.kt @@ -423,7 +423,8 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle roomId = roomId, eventEntity = eventEntity, direction = PaginationDirection.FORWARDS, - roomMemberContentsByUser = roomMemberContentsByUser) + roomMemberContentsByUser = roomMemberContentsByUser + ) if (lightweightSettingsStorage.areThreadMessagesEnabled()) { eventEntity.rootThreadEventId?.let { // This is a thread event @@ -439,7 +440,8 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle threadEventEntity = eventEntity, roomMemberContentsByUser = roomMemberContentsByUser, userId = userId, - roomEntity = roomEntity) + roomEntity = roomEntity + ) } } ?: run { // This is a normal event or a root thread one @@ -477,7 +479,8 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle roomId = roomId, realm = realm, chunkEntity = chunkEntity, - currentUserId = userId) + currentUserId = userId + ) } // posting new events to timeline if any is registered @@ -507,11 +510,11 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle } } - // note: runBlocking should be used here while we are in realm single thread executor, to avoid thread switching - private fun decryptIfNeeded(event: Event, roomId: String) = runBlocking { + private fun decryptIfNeeded(event: Event, roomId: String) { try { // Event from sync does not have roomId, so add it to the event first - val result = cryptoService.decryptEvent(event.copy(roomId = roomId), "") + // note: runBlocking should be used here while we are in realm single thread executor, to avoid thread switching + val result = runBlocking { cryptoService.decryptEvent(event.copy(roomId = roomId), "") } event.mxDecryptionResult = OlmDecryptionResult( payload = result.clearEvent, senderKey = result.senderCurve25519Key,