Sync: exposes ShareFlow<SyncResponse> from the SyncThread

This commit is contained in:
ganfra 2021-08-23 16:46:37 +02:00
parent ebe1e28689
commit a968a848b0
4 changed files with 31 additions and 9 deletions

View file

@ -18,6 +18,7 @@ package org.matrix.android.sdk.api.session
import androidx.annotation.MainThread import androidx.annotation.MainThread
import androidx.lifecycle.LiveData import androidx.lifecycle.LiveData
import kotlinx.coroutines.flow.SharedFlow
import okhttp3.OkHttpClient import okhttp3.OkHttpClient
import org.matrix.android.sdk.api.auth.data.SessionParams import org.matrix.android.sdk.api.auth.data.SessionParams
import org.matrix.android.sdk.api.failure.GlobalError import org.matrix.android.sdk.api.failure.GlobalError
@ -57,6 +58,7 @@ import org.matrix.android.sdk.api.session.thirdparty.ThirdPartyService
import org.matrix.android.sdk.api.session.typing.TypingUsersTracker import org.matrix.android.sdk.api.session.typing.TypingUsersTracker
import org.matrix.android.sdk.api.session.user.UserService import org.matrix.android.sdk.api.session.user.UserService
import org.matrix.android.sdk.api.session.widgets.WidgetService import org.matrix.android.sdk.api.session.widgets.WidgetService
import org.matrix.android.sdk.api.session.sync.model.SyncResponse
/** /**
* This interface defines interactions with a session. * This interface defines interactions with a session.
@ -143,6 +145,11 @@ interface Session :
*/ */
fun getSyncState(): SyncState fun getSyncState(): SyncState
/**
* This method returns a flow of SyncResponse. New value will be pushed through the sync thread.
*/
fun syncFlow(): SharedFlow<SyncResponse>
/** /**
* This methods return true if an initial sync has been processed * This methods return true if an initial sync has been processed
*/ */

View file

@ -223,6 +223,8 @@ internal class DefaultSession @Inject constructor(
override fun getSyncStateLive() = getSyncThread().liveState() override fun getSyncStateLive() = getSyncThread().liveState()
override fun syncFlow() = getSyncThread().syncFlow()
override fun getSyncState() = getSyncThread().currentState() override fun getSyncState() = getSyncThread().currentState()
override fun hasAlreadySynced(): Boolean { override fun hasAlreadySynced(): Boolean {

View file

@ -41,7 +41,7 @@ import java.io.File
import java.net.SocketTimeoutException import java.net.SocketTimeoutException
import javax.inject.Inject import javax.inject.Inject
internal interface SyncTask : Task<SyncTask.Params, Unit> { internal interface SyncTask : Task<SyncTask.Params, SyncResponse> {
data class Params( data class Params(
val timeout: Long, val timeout: Long,
@ -69,13 +69,13 @@ internal class DefaultSyncTask @Inject constructor(
private val workingDir = File(fileDirectory, "is") private val workingDir = File(fileDirectory, "is")
private val initialSyncStatusRepository: InitialSyncStatusRepository = FileInitialSyncStatusRepository(workingDir) private val initialSyncStatusRepository: InitialSyncStatusRepository = FileInitialSyncStatusRepository(workingDir)
override suspend fun execute(params: SyncTask.Params) { override suspend fun execute(params: SyncTask.Params) : SyncResponse {
syncTaskSequencer.post { return syncTaskSequencer.post {
doSync(params) doSync(params)
} }
} }
private suspend fun doSync(params: SyncTask.Params) { private suspend fun doSync(params: SyncTask.Params): SyncResponse {
Timber.v("Sync task started on Thread: ${Thread.currentThread().name}") Timber.v("Sync task started on Thread: ${Thread.currentThread().name}")
val requestParams = HashMap<String, String>() val requestParams = HashMap<String, String>()
@ -100,6 +100,7 @@ internal class DefaultSyncTask @Inject constructor(
val readTimeOut = (params.timeout + TIMEOUT_MARGIN).coerceAtLeast(TimeOutInterceptor.DEFAULT_LONG_TIMEOUT) val readTimeOut = (params.timeout + TIMEOUT_MARGIN).coerceAtLeast(TimeOutInterceptor.DEFAULT_LONG_TIMEOUT)
var syncResponseToReturn: SyncResponse? = null
if (isInitialSync) { if (isInitialSync) {
Timber.d("INIT_SYNC with filter: ${requestParams["filter"]}") Timber.d("INIT_SYNC with filter: ${requestParams["filter"]}")
val initSyncStrategy = initialSyncStrategy val initSyncStrategy = initialSyncStrategy
@ -108,7 +109,7 @@ internal class DefaultSyncTask @Inject constructor(
roomSyncEphemeralTemporaryStore.reset() roomSyncEphemeralTemporaryStore.reset()
workingDir.mkdirs() workingDir.mkdirs()
val file = downloadInitSyncResponse(requestParams) val file = downloadInitSyncResponse(requestParams)
reportSubtask(initialSyncProgressService, InitSyncStep.ImportingAccount, 1, 0.7F) { syncResponseToReturn = reportSubtask(initialSyncProgressService, InitSyncStep.ImportingAccount, 1, 0.7F) {
handleSyncFile(file, initSyncStrategy) handleSyncFile(file, initSyncStrategy)
} }
// Delete all files // Delete all files
@ -122,10 +123,10 @@ internal class DefaultSyncTask @Inject constructor(
) )
} }
} }
logDuration("INIT_SYNC Database insertion") { logDuration("INIT_SYNC Database insertion") {
syncResponseHandler.handleResponse(syncResponse, token, initialSyncProgressService) syncResponseHandler.handleResponse(syncResponse, token, initialSyncProgressService)
} }
syncResponseToReturn = syncResponse
} }
} }
initialSyncProgressService.endAll() initialSyncProgressService.endAll()
@ -137,8 +138,11 @@ internal class DefaultSyncTask @Inject constructor(
) )
} }
syncResponseHandler.handleResponse(syncResponse, token, null) syncResponseHandler.handleResponse(syncResponse, token, null)
syncResponseToReturn = syncResponse
} }
Timber.v("Sync task finished on Thread: ${Thread.currentThread().name}") Timber.v("Sync task finished on Thread: ${Thread.currentThread().name}")
// Should throw if null as it's a mandatory value.
return syncResponseToReturn!!
} }
private suspend fun downloadInitSyncResponse(requestParams: Map<String, String>): File { private suspend fun downloadInitSyncResponse(requestParams: Map<String, String>): File {
@ -195,8 +199,8 @@ internal class DefaultSyncTask @Inject constructor(
} }
} }
private suspend fun handleSyncFile(workingFile: File, initSyncStrategy: InitialSyncStrategy.Optimized) { private suspend fun handleSyncFile(workingFile: File, initSyncStrategy: InitialSyncStrategy.Optimized): SyncResponse {
logDuration("INIT_SYNC handleSyncFile()") { return logDuration("INIT_SYNC handleSyncFile()") {
val syncResponse = logDuration("INIT_SYNC Read file and parse") { val syncResponse = logDuration("INIT_SYNC Read file and parse") {
syncResponseParser.parse(initSyncStrategy, workingFile) syncResponseParser.parse(initSyncStrategy, workingFile)
} }
@ -210,6 +214,7 @@ internal class DefaultSyncTask @Inject constructor(
syncResponseHandler.handleResponse(syncResponse, null, initialSyncProgressService) syncResponseHandler.handleResponse(syncResponse, null, initialSyncProgressService)
} }
initialSyncStatusRepository.setStep(InitialSyncStatus.STEP_SUCCESS) initialSyncStatusRepository.setStep(InitialSyncStatus.STEP_SUCCESS)
syncResponse
} }
} }

View file

@ -34,11 +34,14 @@ import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancelChildren import kotlinx.coroutines.cancelChildren
import kotlinx.coroutines.delay import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.SharedFlow
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking import kotlinx.coroutines.runBlocking
import org.matrix.android.sdk.api.session.call.MxCall import org.matrix.android.sdk.api.session.call.MxCall
import org.matrix.android.sdk.internal.session.call.ActiveCallHandler import org.matrix.android.sdk.internal.session.call.ActiveCallHandler
import org.matrix.android.sdk.internal.session.sync.SyncPresence import org.matrix.android.sdk.internal.session.sync.SyncPresence
import org.matrix.android.sdk.api.session.sync.model.SyncResponse
import timber.log.Timber import timber.log.Timber
import java.net.SocketTimeoutException import java.net.SocketTimeoutException
import java.util.Timer import java.util.Timer
@ -72,6 +75,8 @@ internal class SyncThread @Inject constructor(private val syncTask: SyncTask,
} }
} }
private val _syncFlow = MutableSharedFlow<SyncResponse>()
init { init {
updateStateTo(SyncState.Idle) updateStateTo(SyncState.Idle)
} }
@ -115,6 +120,8 @@ internal class SyncThread @Inject constructor(private val syncTask: SyncTask,
return liveState return liveState
} }
fun syncFlow(): SharedFlow<SyncResponse> = _syncFlow
override fun onConnectivityChanged() { override fun onConnectivityChanged() {
retryNoNetworkTask?.cancel() retryNoNetworkTask?.cancel()
synchronized(lock) { synchronized(lock) {
@ -192,7 +199,8 @@ internal class SyncThread @Inject constructor(private val syncTask: SyncTask,
private suspend fun doSync(params: SyncTask.Params) { private suspend fun doSync(params: SyncTask.Params) {
try { try {
syncTask.execute(params) val syncResponse = syncTask.execute(params)
_syncFlow.emit(syncResponse)
} catch (failure: Throwable) { } catch (failure: Throwable) {
if (failure is Failure.NetworkConnection) { if (failure is Failure.NetworkConnection) {
canReachServer = false canReachServer = false