fixes crash when accepting calls

- the event insert logic is designed to be single threaded however the scope will allow coroutine continuation which leads to unintended multiple thread access for processing and post processing
- the fix is to convert the launching to a flow which will sequentially process the launch logic on the single threaded scope
This commit is contained in:
Adam Brown 2022-05-03 12:13:28 +01:00
parent 0325754d12
commit c09a93c171
2 changed files with 36 additions and 21 deletions

1
changelog.d/5421.bugfix Normal file
View file

@ -0,0 +1 @@
Fixes crash when accepting or receiving VOIP calls

View file

@ -19,6 +19,9 @@ package org.matrix.android.sdk.internal.database
import com.zhuinden.monarchy.Monarchy import com.zhuinden.monarchy.Monarchy
import io.realm.RealmConfiguration import io.realm.RealmConfiguration
import io.realm.RealmResults import io.realm.RealmResults
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import org.matrix.android.sdk.internal.database.mapper.asDomain import org.matrix.android.sdk.internal.database.mapper.asDomain
import org.matrix.android.sdk.internal.database.model.EventEntity import org.matrix.android.sdk.internal.database.model.EventEntity
@ -38,10 +41,22 @@ internal class EventInsertLiveObserver @Inject constructor(@SessionDatabase real
it.where(EventInsertEntity::class.java).equalTo(EventInsertEntityFields.CAN_BE_PROCESSED, true) it.where(EventInsertEntity::class.java).equalTo(EventInsertEntityFields.CAN_BE_PROCESSED, true)
} }
private val onResultsChangedFlow = MutableSharedFlow<RealmResults<EventInsertEntity>>()
init {
onResultsChangedFlow
.onEach { handleChange(it) }
.launchIn(observerScope)
}
override fun onChange(results: RealmResults<EventInsertEntity>) { override fun onChange(results: RealmResults<EventInsertEntity>) {
if (!results.isLoaded || results.isEmpty()) { if (!results.isLoaded || results.isEmpty()) {
return return
} }
observerScope.launch { onResultsChangedFlow.emit(results) }
}
private suspend fun handleChange(results: RealmResults<EventInsertEntity>) {
val idsToDeleteAfterProcess = ArrayList<String>() val idsToDeleteAfterProcess = ArrayList<String>()
val filteredEvents = ArrayList<EventInsertEntity>(results.size) val filteredEvents = ArrayList<EventInsertEntity>(results.size)
Timber.v("EventInsertEntity updated with ${results.size} results in db") Timber.v("EventInsertEntity updated with ${results.size} results in db")
@ -58,30 +73,29 @@ internal class EventInsertLiveObserver @Inject constructor(@SessionDatabase real
} }
idsToDeleteAfterProcess.add(it.eventId) idsToDeleteAfterProcess.add(it.eventId)
} }
observerScope.launch {
awaitTransaction(realmConfiguration) { realm -> awaitTransaction(realmConfiguration) { realm ->
Timber.v("##Transaction: There are ${filteredEvents.size} events to process ") Timber.v("##Transaction: There are ${filteredEvents.size} events to process ")
filteredEvents.forEach { eventInsert -> filteredEvents.forEach { eventInsert ->
val eventId = eventInsert.eventId val eventId = eventInsert.eventId
val event = EventEntity.where(realm, eventId).findFirst() val event = EventEntity.where(realm, eventId).findFirst()
if (event == null) { if (event == null) {
Timber.v("Event $eventId not found") Timber.v("Event $eventId not found")
return@forEach return@forEach
} }
val domainEvent = event.asDomain() val domainEvent = event.asDomain()
processors.filter { processors.filter {
it.shouldProcess(eventId, domainEvent.getClearType(), eventInsert.insertType) it.shouldProcess(eventId, domainEvent.getClearType(), eventInsert.insertType)
}.forEach { }.forEach {
it.process(realm, domainEvent) it.process(realm, domainEvent)
}
} }
realm.where(EventInsertEntity::class.java)
.`in`(EventInsertEntityFields.EVENT_ID, idsToDeleteAfterProcess.toTypedArray())
.findAll()
.deleteAllFromRealm()
} }
processors.forEach { it.onPostProcess() } realm.where(EventInsertEntity::class.java)
.`in`(EventInsertEntityFields.EVENT_ID, idsToDeleteAfterProcess.toTypedArray())
.findAll()
.deleteAllFromRealm()
} }
processors.forEach { it.onPostProcess() }
} }
private fun shouldProcess(eventInsertEntity: EventInsertEntity): Boolean { private fun shouldProcess(eventInsertEntity: EventInsertEntity): Boolean {