LiveObservers: launch directly coroutines

This commit is contained in:
ganfra 2019-12-18 16:59:45 +01:00
parent 4c88c12cfe
commit 7697278bb2
9 changed files with 80 additions and 65 deletions

View file

@ -19,12 +19,16 @@ package im.vector.matrix.android.internal.database
import com.zhuinden.monarchy.Monarchy
import im.vector.matrix.android.internal.util.createBackgroundHandler
import io.realm.*
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancelChildren
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicReference
internal interface LiveEntityObserver {
fun start()
fun dispose()
fun cancelProcess()
fun isStarted(): Boolean
}
@ -35,6 +39,7 @@ internal abstract class RealmLiveEntityObserver<T : RealmObject>(protected val r
val BACKGROUND_HANDLER = createBackgroundHandler("LIVE_ENTITY_BACKGROUND")
}
protected val observerScope = CoroutineScope(SupervisorJob())
protected abstract val query: Monarchy.Query<T>
private val isStarted = AtomicBoolean(false)
private val backgroundRealm = AtomicReference<Realm>()
@ -59,10 +64,15 @@ internal abstract class RealmLiveEntityObserver<T : RealmObject>(protected val r
backgroundRealm.getAndSet(null).also {
it.close()
}
observerScope.coroutineContext.cancelChildren()
}
}
}
override fun cancelProcess() {
observerScope.coroutineContext.cancelChildren()
}
override fun isStarted(): Boolean {
return isStarted.get()
}

View file

@ -85,19 +85,19 @@ internal class DefaultSession @Inject constructor(override val sessionParams: Se
private val initialSyncProgressService: Lazy<InitialSyncProgressService>,
private val homeServerCapabilitiesService: Lazy<HomeServerCapabilitiesService>)
: Session,
RoomService by roomService.get(),
RoomDirectoryService by roomDirectoryService.get(),
GroupService by groupService.get(),
UserService by userService.get(),
CryptoService by cryptoService.get(),
SignOutService by signOutService.get(),
FilterService by filterService.get(),
PushRuleService by pushRuleService.get(),
PushersService by pushersService.get(),
FileService by fileService.get(),
InitialSyncProgressService by initialSyncProgressService.get(),
SecureStorageService by secureStorageService.get(),
HomeServerCapabilitiesService by homeServerCapabilitiesService.get() {
RoomService by roomService.get(),
RoomDirectoryService by roomDirectoryService.get(),
GroupService by groupService.get(),
UserService by userService.get(),
CryptoService by cryptoService.get(),
SignOutService by signOutService.get(),
FilterService by filterService.get(),
PushRuleService by pushRuleService.get(),
PushersService by pushersService.get(),
FileService by fileService.get(),
InitialSyncProgressService by initialSyncProgressService.get(),
SecureStorageService by secureStorageService.get(),
HomeServerCapabilitiesService by homeServerCapabilitiesService.get() {
private var isOpen = false
@ -173,13 +173,14 @@ internal class DefaultSession @Inject constructor(override val sessionParams: Se
override fun clearCache(callback: MatrixCallback<Unit>) {
stopSync()
stopAnyBackgroundSync()
liveEntityObservers.forEach { it.cancelProcess() }
cacheService.get().clearCache(callback)
}
@Subscribe(threadMode = ThreadMode.MAIN)
fun onGlobalError(globalError: GlobalError) {
if (globalError is GlobalError.InvalidToken
&& globalError.softLogout) {
&& globalError.softLogout) {
// Mark the token has invalid
GlobalScope.launch(Dispatchers.IO) {
sessionParamsStore.setTokenInvalid(myUserId)

View file

@ -22,6 +22,7 @@ import androidx.work.WorkManager
import com.zhuinden.monarchy.Monarchy
import im.vector.matrix.android.api.session.room.model.Membership
import im.vector.matrix.android.internal.database.RealmLiveEntityObserver
import im.vector.matrix.android.internal.database.awaitTransaction
import im.vector.matrix.android.internal.database.model.GroupEntity
import im.vector.matrix.android.internal.database.model.GroupSummaryEntity
import im.vector.matrix.android.internal.database.query.where
@ -31,6 +32,7 @@ import im.vector.matrix.android.internal.worker.WorkManagerUtil.matrixOneTimeWor
import im.vector.matrix.android.internal.worker.WorkerParamsFactory
import io.realm.OrderedCollectionChangeSet
import io.realm.RealmResults
import kotlinx.coroutines.launch
import javax.inject.Inject
private const val GET_GROUP_DATA_WORKER = "GET_GROUP_DATA_WORKER"
@ -49,14 +51,19 @@ internal class GroupSummaryUpdater @Inject constructor(private val context: Cont
.mapNotNull { results[it] }
fetchGroupsData(modifiedGroupEntity
.filter { it.membership == Membership.JOIN || it.membership == Membership.INVITE }
.map { it.groupId }
.toList())
.filter { it.membership == Membership.JOIN || it.membership == Membership.INVITE }
.map { it.groupId }
.toList())
deleteGroups(modifiedGroupEntity
modifiedGroupEntity
.filter { it.membership == Membership.LEAVE }
.map { it.groupId }
.toList())
.toList()
.also {
observerScope.launch {
deleteGroups(it)
}
}
}
private fun fetchGroupsData(groupIds: List<String>) {
@ -77,12 +84,9 @@ internal class GroupSummaryUpdater @Inject constructor(private val context: Cont
/**
* Delete the GroupSummaryEntity of left groups
*/
private fun deleteGroups(groupIds: List<String>) {
monarchy
.writeAsync { realm ->
GroupSummaryEntity.where(realm, groupIds)
.findAll()
.deleteAllFromRealm()
}
private suspend fun deleteGroups(groupIds: List<String>) = awaitTransaction(monarchy.realmConfiguration) { realm ->
GroupSummaryEntity.where(realm, groupIds)
.findAll()
.deleteAllFromRealm()
}
}

View file

@ -23,11 +23,10 @@ import im.vector.matrix.android.internal.database.model.EventEntity
import im.vector.matrix.android.internal.database.query.types
import im.vector.matrix.android.internal.di.SessionDatabase
import im.vector.matrix.android.internal.di.UserId
import im.vector.matrix.android.internal.task.TaskExecutor
import im.vector.matrix.android.internal.task.configureWith
import io.realm.OrderedCollectionChangeSet
import io.realm.RealmConfiguration
import io.realm.RealmResults
import kotlinx.coroutines.launch
import timber.log.Timber
import javax.inject.Inject
@ -39,8 +38,7 @@ import javax.inject.Inject
internal class EventRelationsAggregationUpdater @Inject constructor(@SessionDatabase realmConfiguration: RealmConfiguration,
@UserId private val userId: String,
private val task: EventRelationsAggregationTask,
private val taskExecutor: TaskExecutor) :
private val task: EventRelationsAggregationTask) :
RealmLiveEntityObserver<EventEntity>(realmConfiguration) {
override val query = Monarchy.Query<EventEntity> {
@ -63,6 +61,8 @@ internal class EventRelationsAggregationUpdater @Inject constructor(@SessionData
insertedDomains,
userId
)
task.configureWith(params).executeBy(taskExecutor)
observerScope.launch {
task.execute(params)
}
}
}

View file

@ -23,6 +23,7 @@ import im.vector.matrix.android.api.session.events.model.toModel
import im.vector.matrix.android.api.session.room.model.VersioningState
import im.vector.matrix.android.api.session.room.model.create.RoomCreateContent
import im.vector.matrix.android.internal.database.RealmLiveEntityObserver
import im.vector.matrix.android.internal.database.awaitTransaction
import im.vector.matrix.android.internal.database.mapper.asDomain
import im.vector.matrix.android.internal.database.model.EventEntity
import im.vector.matrix.android.internal.database.model.RoomSummaryEntity
@ -30,9 +31,9 @@ import im.vector.matrix.android.internal.database.query.types
import im.vector.matrix.android.internal.database.query.where
import im.vector.matrix.android.internal.di.SessionDatabase
import io.realm.OrderedCollectionChangeSet
import io.realm.Realm
import io.realm.RealmConfiguration
import io.realm.RealmResults
import kotlinx.coroutines.launch
import javax.inject.Inject
internal class RoomCreateEventLiveObserver @Inject constructor(@SessionDatabase
@ -51,21 +52,21 @@ internal class RoomCreateEventLiveObserver @Inject constructor(@SessionDatabase
}
.toList()
.also {
handleRoomCreateEvents(it)
observerScope.launch {
handleRoomCreateEvents(it)
}
}
}
private fun handleRoomCreateEvents(createEvents: List<Event>) = Realm.getInstance(realmConfiguration).use {
it.executeTransactionAsync { realm ->
for (event in createEvents) {
val createRoomContent = event.getClearContent().toModel<RoomCreateContent>()
val predecessorRoomId = createRoomContent?.predecessor?.roomId ?: continue
private suspend fun handleRoomCreateEvents(createEvents: List<Event>) = awaitTransaction(realmConfiguration) { realm ->
for (event in createEvents) {
val createRoomContent = event.getClearContent().toModel<RoomCreateContent>()
val predecessorRoomId = createRoomContent?.predecessor?.roomId ?: continue
val predecessorRoomSummary = RoomSummaryEntity.where(realm, predecessorRoomId).findFirst()
?: RoomSummaryEntity(predecessorRoomId)
predecessorRoomSummary.versioningState = VersioningState.UPGRADED_ROOM_JOINED
realm.insertOrUpdate(predecessorRoomSummary)
}
val predecessorRoomSummary = RoomSummaryEntity.where(realm, predecessorRoomId).findFirst()
?: RoomSummaryEntity(predecessorRoomId)
predecessorRoomSummary.versioningState = VersioningState.UPGRADED_ROOM_JOINED
realm.insertOrUpdate(predecessorRoomSummary)
}
}
}

View file

@ -23,11 +23,10 @@ import im.vector.matrix.android.internal.database.mapper.asDomain
import im.vector.matrix.android.internal.database.model.EventEntity
import im.vector.matrix.android.internal.database.query.types
import im.vector.matrix.android.internal.di.SessionDatabase
import im.vector.matrix.android.internal.task.TaskExecutor
import im.vector.matrix.android.internal.task.configureWith
import io.realm.OrderedCollectionChangeSet
import io.realm.RealmConfiguration
import io.realm.RealmResults
import kotlinx.coroutines.launch
import timber.log.Timber
import javax.inject.Inject
@ -36,8 +35,7 @@ import javax.inject.Inject
* As it will actually delete the content, it should be called last in the list of listener.
*/
internal class EventsPruner @Inject constructor(@SessionDatabase realmConfiguration: RealmConfiguration,
private val pruneEventTask: PruneEventTask,
private val taskExecutor: TaskExecutor) :
private val pruneEventTask: PruneEventTask) :
RealmLiveEntityObserver<EventEntity>(realmConfiguration) {
override val query = Monarchy.Query<EventEntity> { EventEntity.types(it, listOf(EventType.REDACTION)) }
@ -50,7 +48,9 @@ internal class EventsPruner @Inject constructor(@SessionDatabase realmConfigurat
.mapNotNull { results[it]?.asDomain() }
.toList()
val params = PruneEventTask.Params(insertedDomains)
pruneEventTask.configureWith(params).executeBy(taskExecutor)
observerScope.launch {
val params = PruneEventTask.Params(insertedDomains)
pruneEventTask.execute(params)
}
}
}

View file

@ -23,6 +23,7 @@ import im.vector.matrix.android.api.session.events.model.toModel
import im.vector.matrix.android.api.session.room.model.VersioningState
import im.vector.matrix.android.api.session.room.model.tombstone.RoomTombstoneContent
import im.vector.matrix.android.internal.database.RealmLiveEntityObserver
import im.vector.matrix.android.internal.database.awaitTransaction
import im.vector.matrix.android.internal.database.mapper.asDomain
import im.vector.matrix.android.internal.database.model.EventEntity
import im.vector.matrix.android.internal.database.model.RoomSummaryEntity
@ -30,9 +31,9 @@ import im.vector.matrix.android.internal.database.query.types
import im.vector.matrix.android.internal.database.query.where
import im.vector.matrix.android.internal.di.SessionDatabase
import io.realm.OrderedCollectionChangeSet
import io.realm.Realm
import io.realm.RealmConfiguration
import io.realm.RealmResults
import kotlinx.coroutines.launch
import javax.inject.Inject
internal class RoomTombstoneEventLiveObserver @Inject constructor(@SessionDatabase
@ -51,24 +52,24 @@ internal class RoomTombstoneEventLiveObserver @Inject constructor(@SessionDataba
}
.toList()
.also {
handleRoomTombstoneEvents(it)
observerScope.launch {
handleRoomTombstoneEvents(it)
}
}
}
private fun handleRoomTombstoneEvents(tombstoneEvents: List<Event>) = Realm.getInstance(realmConfiguration).use {
it.executeTransactionAsync { realm ->
for (event in tombstoneEvents) {
if (event.roomId == null) continue
val createRoomContent = event.getClearContent().toModel<RoomTombstoneContent>()
if (createRoomContent?.replacementRoom == null) continue
private suspend fun handleRoomTombstoneEvents(tombstoneEvents: List<Event>) = awaitTransaction(realmConfiguration) { realm ->
for (event in tombstoneEvents) {
if (event.roomId == null) continue
val createRoomContent = event.getClearContent().toModel<RoomTombstoneContent>()
if (createRoomContent?.replacementRoom == null) continue
val predecessorRoomSummary = RoomSummaryEntity.where(realm, event.roomId).findFirst()
?: RoomSummaryEntity(event.roomId)
if (predecessorRoomSummary.versioningState == VersioningState.NONE) {
predecessorRoomSummary.versioningState = VersioningState.UPGRADED_ROOM_NOT_JOINED
}
realm.insertOrUpdate(predecessorRoomSummary)
val predecessorRoomSummary = RoomSummaryEntity.where(realm, event.roomId).findFirst()
?: RoomSummaryEntity(event.roomId)
if (predecessorRoomSummary.versioningState == VersioningState.NONE) {
predecessorRoomSummary.versioningState = VersioningState.UPGRADED_ROOM_NOT_JOINED
}
realm.insertOrUpdate(predecessorRoomSummary)
}
}
}

View file

@ -95,7 +95,6 @@ internal class UserAccountDataSyncHandler @Inject constructor(private val monarc
}
}
private fun handlePushRules(realm: Realm, userAccountDataPushRules: UserAccountDataPushRules) {
val pushRules = userAccountDataPushRules.content
realm.where(PushRulesEntity::class.java)

View file

@ -20,7 +20,6 @@ import androidx.lifecycle.LiveData
import androidx.lifecycle.MutableLiveData
import com.squareup.moshi.JsonEncodingException
import im.vector.matrix.android.api.failure.Failure
import im.vector.matrix.android.api.failure.MatrixError
import im.vector.matrix.android.api.failure.isTokenError
import im.vector.matrix.android.api.session.sync.SyncState
import im.vector.matrix.android.internal.network.NetworkConnectivityChecker