diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/SyncResponseHandler.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/SyncResponseHandler.kt index eab7b91e4d..d21b1de3b0 100644 --- a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/SyncResponseHandler.kt +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/SyncResponseHandler.kt @@ -18,7 +18,6 @@ package im.vector.matrix.android.internal.session.sync import arrow.core.Try import im.vector.matrix.android.internal.crypto.CryptoManager -import im.vector.matrix.android.internal.session.SessionScope import im.vector.matrix.android.internal.session.sync.model.SyncResponse import timber.log.Timber import javax.inject.Inject diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/SyncTask.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/SyncTask.kt index 78f949de33..e29f2eeac5 100644 --- a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/SyncTask.kt +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/SyncTask.kt @@ -29,9 +29,9 @@ import im.vector.matrix.android.internal.session.sync.model.SyncResponse import im.vector.matrix.android.internal.task.Task import javax.inject.Inject -internal interface SyncTask : Task { +internal interface SyncTask : Task { - data class Params(val token: String?, var timeout: Long = 30_000L) + data class Params(var timeout: Long = 30_000L) } @@ -39,15 +39,17 @@ internal class DefaultSyncTask @Inject constructor(private val syncAPI: SyncAPI, private val credentials: Credentials, private val filterRepository: FilterRepository, private val syncResponseHandler: SyncResponseHandler, - private val sessionParamsStore: SessionParamsStore + private val sessionParamsStore: SessionParamsStore, + private val syncTokenStore: SyncTokenStore ) : SyncTask { - override suspend fun execute(params: SyncTask.Params): Try { + override suspend fun execute(params: SyncTask.Params): Try { val requestParams = HashMap() var timeout = 0L - if (params.token != null) { - requestParams["since"] = params.token + val token = syncTokenStore.getLastToken() + if (token != null) { + requestParams["since"] = token timeout = params.timeout } requestParams["timeout"] = timeout.toString() @@ -65,9 +67,9 @@ internal class DefaultSyncTask @Inject constructor(private val syncAPI: SyncAPI, // Transmit the throwable throwable.failure() }.flatMap { syncResponse -> - syncResponseHandler.handleResponse(syncResponse, params.token, false) + syncResponseHandler.handleResponse(syncResponse, token, false) + }.map { + syncTokenStore.saveToken(it.nextBatch) } } - - } \ No newline at end of file diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/job/SyncService.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/job/SyncService.kt index 7aa38b472d..3fe850d675 100644 --- a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/job/SyncService.kt +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/job/SyncService.kt @@ -26,8 +26,6 @@ import im.vector.matrix.android.api.failure.MatrixError import im.vector.matrix.android.api.util.Cancelable import im.vector.matrix.android.internal.network.NetworkConnectivityChecker import im.vector.matrix.android.internal.session.sync.SyncTask -import im.vector.matrix.android.internal.session.sync.SyncTokenStore -import im.vector.matrix.android.internal.session.sync.model.SyncResponse import im.vector.matrix.android.internal.task.TaskExecutor import im.vector.matrix.android.internal.task.TaskThread import im.vector.matrix.android.internal.task.configureWith @@ -35,9 +33,6 @@ import timber.log.Timber import java.net.SocketTimeoutException import java.util.* -private const val DEFAULT_LONG_POOL_TIMEOUT = 10_000L -private const val BACKGROUND_LONG_POOL_TIMEOUT = 0L - /** * Can execute periodic sync task. * An IntentService is used in conjunction with the AlarmManager and a Broadcast Receiver @@ -49,7 +44,6 @@ open class SyncService : Service() { private var mIsSelfDestroyed: Boolean = false private var cancelableTask: Cancelable? = null - private lateinit var syncTokenStore: SyncTokenStore private lateinit var syncTask: SyncTask private lateinit var networkConnectivityChecker: NetworkConnectivityChecker private lateinit var taskExecutor: TaskExecutor @@ -57,18 +51,12 @@ open class SyncService : Service() { var timer = Timer() - var nextBatchDelay = 0L - var timeout = 10_000L - override fun onStartCommand(intent: Intent?, flags: Int, startId: Int): Int { Timber.i("onStartCommand ${intent}") - nextBatchDelay = 60_000L - timeout = 0 intent?.let { val userId = it.getStringExtra(EXTRA_USER_ID) val sessionComponent = Matrix.getInstance(applicationContext).sessionManager.getSessionComponent(userId) ?: return@let - syncTokenStore = sessionComponent.syncTokenStore() syncTask = sessionComponent.syncTask() networkConnectivityChecker = sessionComponent.networkConnectivityChecker() taskExecutor = sessionComponent.taskExecutor() @@ -105,7 +93,6 @@ open class SyncService : Service() { } fun doSync(once: Boolean = false) { - var nextBatch = syncTokenStore.getLastToken() if (!networkConnectivityChecker.isConnected()) { Timber.v("Sync is Paused. Waiting...") //TODO Retry in ? @@ -113,24 +100,22 @@ open class SyncService : Service() { override fun run() { doSync() } - }, 5_000L) + }, NO_NETWORK_DELAY) } else { - Timber.v("Execute sync request with token $nextBatch and timeout $timeout") - val params = SyncTask.Params(nextBatch, timeout) + Timber.v("Execute sync request with timeout 0") + val params = SyncTask.Params(TIME_OUT) cancelableTask = syncTask.configureWith(params) .callbackOn(TaskThread.CALLER) .executeOn(TaskThread.CALLER) - .dispatchTo(object : MatrixCallback { - override fun onSuccess(data: SyncResponse) { + .dispatchTo(object : MatrixCallback { + override fun onSuccess(data: Unit) { cancelableTask = null - nextBatch = data.nextBatch - syncTokenStore.saveToken(nextBatch) if (!once) { timer.schedule(object : TimerTask() { override fun run() { doSync() } - }, nextBatchDelay) + }, NEXT_BATCH_DELAY) } else { //stop stopMe() @@ -180,6 +165,10 @@ open class SyncService : Service() { companion object { const val EXTRA_USER_ID = "EXTRA_USER_ID" + + const val TIME_OUT = 0L + const val NEXT_BATCH_DELAY = 60_000L + const val NO_NETWORK_DELAY = 5_000L } } \ No newline at end of file diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/job/SyncThread.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/job/SyncThread.kt index ba4c009aa9..56d590d7ba 100644 --- a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/job/SyncThread.kt +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/job/SyncThread.kt @@ -25,10 +25,7 @@ import im.vector.matrix.android.api.failure.MatrixError import im.vector.matrix.android.api.session.sync.SyncState import im.vector.matrix.android.api.util.Cancelable import im.vector.matrix.android.internal.network.NetworkConnectivityChecker -import im.vector.matrix.android.internal.session.SessionScope import im.vector.matrix.android.internal.session.sync.SyncTask -import im.vector.matrix.android.internal.session.sync.SyncTokenStore -import im.vector.matrix.android.internal.session.sync.model.SyncResponse import im.vector.matrix.android.internal.task.TaskExecutor import im.vector.matrix.android.internal.task.TaskThread import im.vector.matrix.android.internal.task.configureWith @@ -44,7 +41,6 @@ private const val DEFAULT_LONG_POOL_DELAY = 0L internal class SyncThread @Inject constructor(private val syncTask: SyncTask, private val networkConnectivityChecker: NetworkConnectivityChecker, - private val syncTokenStore: SyncTokenStore, private val backgroundDetectionObserver: BackgroundDetectionObserver, private val taskExecutor: TaskExecutor ) : Thread(), NetworkConnectivityChecker.Listener, BackgroundDetectionObserver.Listener { @@ -52,7 +48,6 @@ internal class SyncThread @Inject constructor(private val syncTask: SyncTask, private var state: SyncState = SyncState.IDLE private var liveState = MutableLiveData() private val lock = Object() - private var nextBatch = syncTokenStore.getLastToken() private var cancelableTask: Cancelable? = null init { @@ -62,8 +57,6 @@ internal class SyncThread @Inject constructor(private val syncTask: SyncTask, fun restart() = synchronized(lock) { if (state is SyncState.PAUSED) { Timber.v("Resume sync...") - // Retrieve the last token, it may have been deleted in case of a clear cache - nextBatch = syncTokenStore.getLastToken() updateStateTo(SyncState.RUNNING(catchingUp = true)) lock.notify() } @@ -100,22 +93,20 @@ internal class SyncThread @Inject constructor(private val syncTask: SyncTask, lock.wait() } } else { - Timber.v("Execute sync request with token $nextBatch and timeout $DEFAULT_LONG_POOL_TIMEOUT") + Timber.v("Execute sync request with timeout $DEFAULT_LONG_POOL_TIMEOUT") val latch = CountDownLatch(1) - val params = SyncTask.Params(nextBatch, DEFAULT_LONG_POOL_TIMEOUT) + val params = SyncTask.Params(DEFAULT_LONG_POOL_TIMEOUT) cancelableTask = syncTask.configureWith(params) .callbackOn(TaskThread.CALLER) .executeOn(TaskThread.CALLER) - .dispatchTo(object : MatrixCallback { - override fun onSuccess(data: SyncResponse) { - nextBatch = data.nextBatch - syncTokenStore.saveToken(nextBatch) + .dispatchTo(object : MatrixCallback { + override fun onSuccess(data: Unit) { latch.countDown() } override fun onFailure(failure: Throwable) { if (failure is Failure.NetworkConnection - && failure.cause is SocketTimeoutException) { + && failure.cause is SocketTimeoutException) { // Timeout are not critical Timber.v("Timeout") } else { @@ -123,13 +114,13 @@ internal class SyncThread @Inject constructor(private val syncTask: SyncTask, } if (failure !is Failure.NetworkConnection - || failure.cause is JsonEncodingException) { + || failure.cause is JsonEncodingException) { // Wait 10s before retrying sleep(RETRY_WAIT_TIME_MS) } if (failure is Failure.ServerError - && (failure.error.code == MatrixError.UNKNOWN_TOKEN || failure.error.code == MatrixError.MISSING_TOKEN)) { + && (failure.error.code == MatrixError.UNKNOWN_TOKEN || failure.error.code == MatrixError.MISSING_TOKEN)) { // No token or invalid token, stop the thread updateStateTo(SyncState.KILLING) } @@ -140,7 +131,7 @@ internal class SyncThread @Inject constructor(private val syncTask: SyncTask, }) .executeBy(taskExecutor) - latch.await() + latch.await() if (state is SyncState.RUNNING) { updateStateTo(SyncState.RUNNING(catchingUp = false)) } diff --git a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/job/SyncWorker.kt b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/job/SyncWorker.kt index 6b2af8fcfd..8a741eb04f 100644 --- a/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/job/SyncWorker.kt +++ b/matrix-sdk-android/src/main/java/im/vector/matrix/android/internal/session/sync/job/SyncWorker.kt @@ -18,18 +18,16 @@ package im.vector.matrix.android.internal.session.sync.job import android.content.Context import androidx.work.* import com.squareup.moshi.JsonClass -import im.vector.matrix.android.api.failure.Failure -import im.vector.matrix.android.api.failure.MatrixError -import im.vector.matrix.android.internal.auth.SessionParamsStore -import im.vector.matrix.android.internal.network.executeRequest -import im.vector.matrix.android.internal.session.filter.FilterRepository -import im.vector.matrix.android.internal.session.sync.SyncAPI -import im.vector.matrix.android.internal.session.sync.SyncResponseHandler -import im.vector.matrix.android.internal.session.sync.SyncTokenStore -import im.vector.matrix.android.internal.session.sync.model.SyncResponse +import im.vector.matrix.android.api.MatrixCallback +import im.vector.matrix.android.api.util.Cancelable +import im.vector.matrix.android.internal.session.sync.SyncTask +import im.vector.matrix.android.internal.task.TaskExecutor +import im.vector.matrix.android.internal.task.TaskThread +import im.vector.matrix.android.internal.task.configureWith import im.vector.matrix.android.internal.worker.WorkerParamsFactory import im.vector.matrix.android.internal.worker.getSessionComponent import timber.log.Timber +import java.util.concurrent.CountDownLatch import java.util.concurrent.TimeUnit import javax.inject.Inject @@ -47,12 +45,10 @@ internal class SyncWorker(context: Context, val automaticallyRetry: Boolean = false ) - @Inject lateinit var syncAPI: SyncAPI - @Inject lateinit var filterRepository: FilterRepository - @Inject lateinit var syncResponseHandler: SyncResponseHandler - @Inject lateinit var sessionParamsStore: SessionParamsStore - @Inject lateinit var syncTokenStore: SyncTokenStore - + @Inject + lateinit var syncTask: SyncTask + @Inject + lateinit var taskExecutor: TaskExecutor override suspend fun doWork(): Result { Timber.i("Sync work starting") @@ -60,37 +56,34 @@ internal class SyncWorker(context: Context, val sessionComponent = getSessionComponent(params.userId) ?: return Result.success() sessionComponent.inject(this) - val requestParams = HashMap() - requestParams["timeout"] = params.timeout.toString() - requestParams["filter"] = filterRepository.getFilter() - val token = syncTokenStore.getLastToken()?.also { requestParams["since"] = it } - Timber.i("Sync work last token $token") - return executeRequest { - apiCall = syncAPI.sync(requestParams) - }.fold( - { - if (it is Failure.ServerError - && it.error.code == MatrixError.UNKNOWN_TOKEN) { - sessionParamsStore.delete(params.userId) - Result.failure() - } else { - Timber.i("Sync work failed $it") - Result.retry() + val latch = CountDownLatch(1) + val taskParams = SyncTask.Params(0) + cancelableTask = syncTask.configureWith(taskParams) + .callbackOn(TaskThread.CALLER) + .executeOn(TaskThread.CALLER) + .dispatchTo(object : MatrixCallback { + override fun onSuccess(data: Unit) { + latch.countDown() } - }, - { - Timber.i("Sync work success next batch ${it.nextBatch}") - if (!isStopped) { - syncResponseHandler.handleResponse(it, token, false) - syncTokenStore.saveToken(it.nextBatch) + + override fun onFailure(failure: Throwable) { + Timber.e(failure) + latch.countDown() } - if (params.automaticallyRetry) Result.retry() else Result.success() - } - ) + + }) + .executeBy(taskExecutor) + + latch.await() + return Result.success() } companion object { + + + private var cancelableTask: Cancelable? = null + fun requireBackgroundSync(context: Context, userId: String, serverTimeout: Long = 0) { val data = WorkerParamsFactory.toData(Params(userId, serverTimeout, false)) val workRequest = OneTimeWorkRequestBuilder() @@ -116,6 +109,7 @@ internal class SyncWorker(context: Context, } fun stopAnyBackgroundSync(context: Context) { + cancelableTask?.cancel() WorkManager.getInstance(context).cancelUniqueWork("BG_SYNCP") } }