diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/EventSenderProcessorCoroutine.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/EventSenderProcessorCoroutine.kt
index 77e156f90a..56fb3b8539 100644
--- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/EventSenderProcessorCoroutine.kt
+++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/EventSenderProcessorCoroutine.kt
@@ -34,12 +34,14 @@ import org.matrix.android.sdk.internal.task.TaskExecutor
 import org.matrix.android.sdk.internal.util.toCancelable
 import timber.log.Timber
 import java.io.IOException
+import java.util.Queue
 import java.util.concurrent.ConcurrentHashMap
 import java.util.concurrent.atomic.AtomicBoolean
 import javax.inject.Inject
 import kotlin.coroutines.cancellation.CancellationException
 
 private const val RETRY_WAIT_TIME_MS = 10_000L
+private const val MAX_RETRY_COUNT = 3
 
 /**
  * This class is responsible for sending events in order in each room. It uses the QueuedTask.queueIdentifier to execute tasks sequentially.
@@ -58,6 +60,8 @@ internal class EventSenderProcessorCoroutine @Inject constructor(
         private val memento: QueueMemento,
 ) : EventSenderProcessor {
 
+    private val waitForNetworkSequencer = SemaphoreCoroutineSequencer()
+
     /**
      * sequencers use QueuedTask.queueIdentifier as key
      */
@@ -109,6 +113,7 @@ internal class EventSenderProcessorCoroutine @Inject constructor(
         val sequencer = sequencers.getOrPut(task.queueIdentifier) {
             SemaphoreCoroutineSequencer()
         }
+        Timber.v("## post $task")
         return taskExecutor.executorScope
                 .launchWith(sequencer) {
                     executeTask(task)
@@ -119,7 +124,7 @@ internal class EventSenderProcessorCoroutine @Inject constructor(
     }
 
     override fun cancel(eventId: String, roomId: String) {
-        // eventId is the taskIdentifier
+        // eventId is most likely the taskIdentifier
         cancelableBag[eventId]?.cancel()
     }
 
@@ -132,36 +137,26 @@ internal class EventSenderProcessorCoroutine @Inject constructor(
     private suspend fun executeTask(task: QueuedTask) {
         try {
             if (task.isCancelled()) {
-                Timber.v("## task has been cancelled")
+                Timber.v("## $task has been cancelled, try next task")
                 return
             }
-            waitForNetwork()
-            Timber.v("## execute task with id: ${task.queueIdentifier}")
+            task.waitForNetwork()
             task.execute()
         } catch (exception: Throwable) {
             when {
                 exception is IOException || exception is Failure.NetworkConnection -> {
                     canReachServer.set(false)
-                    task.retryCount++
-                    if (task.retryCount >= 3) task.onTaskFailed()
-                    Timber.v("## retryable error for $task reason: ${exception.localizedMessage}")
-                    // Retry
-                    executeTask(task)
+                    task.markAsFailedOrRetry(exception, 0)
                 }
                 (exception is Failure.ServerError && exception.error.code == MatrixError.M_LIMIT_EXCEEDED) -> {
-                    task.retryCount++
-                    if (task.retryCount >= 3) task.onTaskFailed()
-                    Timber.v("## retryable error for $task reason: ${exception.localizedMessage}")
-                    // Wait some time
-                    delay(exception.error.retryAfterMillis?.plus(100) ?: 3_000)
-                    // and retry
-                    executeTask(task)
+                    val retryDelay = exception.error.retryAfterMillis?.plus(100) ?: 3_000L
+                    task.markAsFailedOrRetry(exception, retryDelay)
                 }
                 exception is CancellationException -> {
-                    Timber.v("## task has been cancelled")
+                    Timber.v("## $task has been cancelled, try next task")
                 }
                 else -> {
-                    Timber.v("## Un-Retryable error, try next task")
+                    Timber.v("## un-retryable error for $task, try next task")
                     // this task is in error, check next one?
                     task.onTaskFailed()
                 }
@@ -170,6 +165,18 @@ internal class EventSenderProcessorCoroutine @Inject constructor(
         markAsFinished(task)
     }
 
+    private suspend fun QueuedTask.markAsFailedOrRetry(failure: Throwable, retryDelay: Long) {
+        if (retryCount.incrementAndGet() >= MAX_RETRY_COUNT) {
+            onTaskFailed()
+        } else {
+            Timber.v("## retryable error for $this reason: ${failure.localizedMessage}")
+            // Wait if necessary
+            delay(retryDelay)
+            // And then retry
+            executeTask(this)
+        }
+    }
+
     private fun markAsManaged(task: QueuedTask) {
         memento.track(task)
     }
@@ -181,9 +188,9 @@ internal class EventSenderProcessorCoroutine @Inject constructor(
 
     private val canReachServer = AtomicBoolean(true)
 
-    private suspend fun waitForNetwork() {
+    private suspend fun QueuedTask.waitForNetwork() = waitForNetworkSequencer.post {
         while (!canReachServer.get()) {
-            Timber.v("## Cannot reach server, wait ts:${System.currentTimeMillis()}")
+            Timber.v("## $this cannot reach server wait ts:${System.currentTimeMillis()}")
             delay(RETRY_WAIT_TIME_MS)
             withContext(Dispatchers.IO) {
                 val hostAvailable = HomeServerAvailabilityChecker(sessionParams).check()
diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/EventSenderProcessorThread.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/EventSenderProcessorThread.kt
index c3a9eb4a07..2e1acf710c 100644
--- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/EventSenderProcessorThread.kt
+++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/EventSenderProcessorThread.kt
@@ -163,7 +163,7 @@ internal class EventSenderProcessorThread @Inject constructor(
                     // so network is available
 
                     runBlocking {
-                        retryLoop@ while (task.retryCount < 3) {
+                        retryLoop@ while (task.retryCount.get() < 3) {
                             try {
                                 // SendPerformanceProfiler.startStage(task.event.eventId!!, SendPerformanceProfiler.Stages.SEND_WORKER)
                                 Timber.v("## SendThread retryLoop for $task retryCount ${task.retryCount}")
@@ -175,8 +175,7 @@ internal class EventSenderProcessorThread @Inject constructor(
                                 when {
                                     exception is IOException || exception is Failure.NetworkConnection -> {
                                         canReachServer = false
-                                        task.retryCount++
-                                        if (task.retryCount >= 3) task.onTaskFailed()
+                                        if (task.retryCount.incrementAndGet() >= 3) task.onTaskFailed()
                                         while (!canReachServer) {
                                             Timber.v("## SendThread retryLoop cannot reach server, wait ts:${System.currentTimeMillis()}")
                                             // schedule to retry
@@ -184,8 +183,7 @@ internal class EventSenderProcessorThread @Inject constructor(
                                         }
                                     }
                                     (exception is Failure.ServerError && exception.error.code == MatrixError.M_LIMIT_EXCEEDED) -> {
-                                        task.retryCount++
-                                        if (task.retryCount >= 3) task.onTaskFailed()
+                                        if (task.retryCount.incrementAndGet() >= 3) task.onTaskFailed()
                                         Timber.v("## SendThread retryLoop retryable error for $task reason: ${exception.localizedMessage}")
                                         // wait a bit
                                         // Todo if its a quota exception can we get timout?
diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/QueuedTask.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/QueuedTask.kt
index a4eb9e9323..e5302c171c 100644
--- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/QueuedTask.kt
+++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/QueuedTask.kt
@@ -17,6 +17,8 @@
 package org.matrix.android.sdk.internal.session.room.send.queue
 
 import org.matrix.android.sdk.api.util.Cancelable
+import timber.log.Timber
+import java.util.concurrent.atomic.AtomicInteger
 
 /**
  * @param queueIdentifier String value to identify a unique Queue
@@ -27,15 +29,17 @@ internal abstract class QueuedTask(
         val taskIdentifier: String
 ) : Cancelable {
 
-    override fun toString() = "queueIdentifier: $queueIdentifier, taskIdentifier: ${taskIdentifier})"
+    override fun toString() = "${javaClass.simpleName} queueIdentifier: $queueIdentifier, taskIdentifier: $taskIdentifier"
 
-    var retryCount = 0
+    val retryCount = AtomicInteger(0)
 
     private var hasBeenCancelled: Boolean = false
 
     suspend fun execute() {
         if (!isCancelled()) {
+            Timber.v("Execute: $this start")
             doExecute()
+            Timber.v("Execute: $this finish")
         }
     }
 