From a968a848b0cf7ac9c0d740f9a18115c937125bfa Mon Sep 17 00:00:00 2001 From: ganfra Date: Mon, 23 Aug 2021 16:46:37 +0200 Subject: [PATCH] Sync: exposes ShareFlow from the SyncThread --- .../matrix/android/sdk/api/session/Session.kt | 7 +++++++ .../sdk/internal/session/DefaultSession.kt | 2 ++ .../sdk/internal/session/sync/SyncTask.kt | 21 ++++++++++++------- .../internal/session/sync/job/SyncThread.kt | 10 ++++++++- 4 files changed, 31 insertions(+), 9 deletions(-) diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/api/session/Session.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/api/session/Session.kt index 2f981ffbbe..e0b48f1e07 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/api/session/Session.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/api/session/Session.kt @@ -18,6 +18,7 @@ package org.matrix.android.sdk.api.session import androidx.annotation.MainThread import androidx.lifecycle.LiveData +import kotlinx.coroutines.flow.SharedFlow import okhttp3.OkHttpClient import org.matrix.android.sdk.api.auth.data.SessionParams import org.matrix.android.sdk.api.failure.GlobalError @@ -57,6 +58,7 @@ import org.matrix.android.sdk.api.session.thirdparty.ThirdPartyService import org.matrix.android.sdk.api.session.typing.TypingUsersTracker import org.matrix.android.sdk.api.session.user.UserService import org.matrix.android.sdk.api.session.widgets.WidgetService +import org.matrix.android.sdk.api.session.sync.model.SyncResponse /** * This interface defines interactions with a session. @@ -143,6 +145,11 @@ interface Session : */ fun getSyncState(): SyncState + /** + * This method returns a flow of SyncResponse. New value will be pushed through the sync thread. + */ + fun syncFlow(): SharedFlow + /** * This methods return true if an initial sync has been processed */ diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/DefaultSession.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/DefaultSession.kt index c2bd1e24ed..2975cc7ad0 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/DefaultSession.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/DefaultSession.kt @@ -223,6 +223,8 @@ internal class DefaultSession @Inject constructor( override fun getSyncStateLive() = getSyncThread().liveState() + override fun syncFlow() = getSyncThread().syncFlow() + override fun getSyncState() = getSyncThread().currentState() override fun hasAlreadySynced(): Boolean { diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/SyncTask.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/SyncTask.kt index f033fe31d7..5684c2b1d6 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/SyncTask.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/SyncTask.kt @@ -41,7 +41,7 @@ import java.io.File import java.net.SocketTimeoutException import javax.inject.Inject -internal interface SyncTask : Task { +internal interface SyncTask : Task { data class Params( val timeout: Long, @@ -69,13 +69,13 @@ internal class DefaultSyncTask @Inject constructor( private val workingDir = File(fileDirectory, "is") private val initialSyncStatusRepository: InitialSyncStatusRepository = FileInitialSyncStatusRepository(workingDir) - override suspend fun execute(params: SyncTask.Params) { - syncTaskSequencer.post { + override suspend fun execute(params: SyncTask.Params) : SyncResponse { + return syncTaskSequencer.post { doSync(params) } } - private suspend fun doSync(params: SyncTask.Params) { + private suspend fun doSync(params: SyncTask.Params): SyncResponse { Timber.v("Sync task started on Thread: ${Thread.currentThread().name}") val requestParams = HashMap() @@ -100,6 +100,7 @@ internal class DefaultSyncTask @Inject constructor( val readTimeOut = (params.timeout + TIMEOUT_MARGIN).coerceAtLeast(TimeOutInterceptor.DEFAULT_LONG_TIMEOUT) + var syncResponseToReturn: SyncResponse? = null if (isInitialSync) { Timber.d("INIT_SYNC with filter: ${requestParams["filter"]}") val initSyncStrategy = initialSyncStrategy @@ -108,7 +109,7 @@ internal class DefaultSyncTask @Inject constructor( roomSyncEphemeralTemporaryStore.reset() workingDir.mkdirs() val file = downloadInitSyncResponse(requestParams) - reportSubtask(initialSyncProgressService, InitSyncStep.ImportingAccount, 1, 0.7F) { + syncResponseToReturn = reportSubtask(initialSyncProgressService, InitSyncStep.ImportingAccount, 1, 0.7F) { handleSyncFile(file, initSyncStrategy) } // Delete all files @@ -122,10 +123,10 @@ internal class DefaultSyncTask @Inject constructor( ) } } - logDuration("INIT_SYNC Database insertion") { syncResponseHandler.handleResponse(syncResponse, token, initialSyncProgressService) } + syncResponseToReturn = syncResponse } } initialSyncProgressService.endAll() @@ -137,8 +138,11 @@ internal class DefaultSyncTask @Inject constructor( ) } syncResponseHandler.handleResponse(syncResponse, token, null) + syncResponseToReturn = syncResponse } Timber.v("Sync task finished on Thread: ${Thread.currentThread().name}") + // Should throw if null as it's a mandatory value. + return syncResponseToReturn!! } private suspend fun downloadInitSyncResponse(requestParams: Map): File { @@ -195,8 +199,8 @@ internal class DefaultSyncTask @Inject constructor( } } - private suspend fun handleSyncFile(workingFile: File, initSyncStrategy: InitialSyncStrategy.Optimized) { - logDuration("INIT_SYNC handleSyncFile()") { + private suspend fun handleSyncFile(workingFile: File, initSyncStrategy: InitialSyncStrategy.Optimized): SyncResponse { + return logDuration("INIT_SYNC handleSyncFile()") { val syncResponse = logDuration("INIT_SYNC Read file and parse") { syncResponseParser.parse(initSyncStrategy, workingFile) } @@ -210,6 +214,7 @@ internal class DefaultSyncTask @Inject constructor( syncResponseHandler.handleResponse(syncResponse, null, initialSyncProgressService) } initialSyncStatusRepository.setStep(InitialSyncStatus.STEP_SUCCESS) + syncResponse } } diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/job/SyncThread.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/job/SyncThread.kt index de8d009892..1b34b625ff 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/job/SyncThread.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/sync/job/SyncThread.kt @@ -34,11 +34,14 @@ import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.cancelChildren import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.SharedFlow import kotlinx.coroutines.launch import kotlinx.coroutines.runBlocking import org.matrix.android.sdk.api.session.call.MxCall import org.matrix.android.sdk.internal.session.call.ActiveCallHandler import org.matrix.android.sdk.internal.session.sync.SyncPresence +import org.matrix.android.sdk.api.session.sync.model.SyncResponse import timber.log.Timber import java.net.SocketTimeoutException import java.util.Timer @@ -72,6 +75,8 @@ internal class SyncThread @Inject constructor(private val syncTask: SyncTask, } } + private val _syncFlow = MutableSharedFlow() + init { updateStateTo(SyncState.Idle) } @@ -115,6 +120,8 @@ internal class SyncThread @Inject constructor(private val syncTask: SyncTask, return liveState } + fun syncFlow(): SharedFlow = _syncFlow + override fun onConnectivityChanged() { retryNoNetworkTask?.cancel() synchronized(lock) { @@ -192,7 +199,8 @@ internal class SyncThread @Inject constructor(private val syncTask: SyncTask, private suspend fun doSync(params: SyncTask.Params) { try { - syncTask.execute(params) + val syncResponse = syncTask.execute(params) + _syncFlow.emit(syncResponse) } catch (failure: Throwable) { if (failure is Failure.NetworkConnection) { canReachServer = false