Send: clean code and add more logs

This commit is contained in:
ganfra 2021-03-05 21:03:52 +01:00
parent 9174632cfc
commit a0df20fcd2
3 changed files with 36 additions and 27 deletions

View file

@ -34,12 +34,14 @@ import org.matrix.android.sdk.internal.task.TaskExecutor
import org.matrix.android.sdk.internal.util.toCancelable import org.matrix.android.sdk.internal.util.toCancelable
import timber.log.Timber import timber.log.Timber
import java.io.IOException import java.io.IOException
import java.util.Queue
import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicBoolean
import javax.inject.Inject import javax.inject.Inject
import kotlin.coroutines.cancellation.CancellationException import kotlin.coroutines.cancellation.CancellationException
private const val RETRY_WAIT_TIME_MS = 10_000L private const val RETRY_WAIT_TIME_MS = 10_000L
private const val MAX_RETRY_COUNT = 3
/** /**
* This class is responsible for sending events in order in each room. It uses the QueuedTask.queueIdentifier to execute tasks sequentially. * This class is responsible for sending events in order in each room. It uses the QueuedTask.queueIdentifier to execute tasks sequentially.
@ -58,6 +60,8 @@ internal class EventSenderProcessorCoroutine @Inject constructor(
private val memento: QueueMemento, private val memento: QueueMemento,
) : EventSenderProcessor { ) : EventSenderProcessor {
private val waitForNetworkSequencer = SemaphoreCoroutineSequencer()
/** /**
* sequencers use QueuedTask.queueIdentifier as key * sequencers use QueuedTask.queueIdentifier as key
*/ */
@ -109,6 +113,7 @@ internal class EventSenderProcessorCoroutine @Inject constructor(
val sequencer = sequencers.getOrPut(task.queueIdentifier) { val sequencer = sequencers.getOrPut(task.queueIdentifier) {
SemaphoreCoroutineSequencer() SemaphoreCoroutineSequencer()
} }
Timber.v("## post $task")
return taskExecutor.executorScope return taskExecutor.executorScope
.launchWith(sequencer) { .launchWith(sequencer) {
executeTask(task) executeTask(task)
@ -119,7 +124,7 @@ internal class EventSenderProcessorCoroutine @Inject constructor(
} }
override fun cancel(eventId: String, roomId: String) { override fun cancel(eventId: String, roomId: String) {
// eventId is the taskIdentifier // eventId is most likely the taskIdentifier
cancelableBag[eventId]?.cancel() cancelableBag[eventId]?.cancel()
} }
@ -132,36 +137,26 @@ internal class EventSenderProcessorCoroutine @Inject constructor(
private suspend fun executeTask(task: QueuedTask) { private suspend fun executeTask(task: QueuedTask) {
try { try {
if (task.isCancelled()) { if (task.isCancelled()) {
Timber.v("## task has been cancelled") Timber.v("## $task has been cancelled, try next task")
return return
} }
waitForNetwork() task.waitForNetwork()
Timber.v("## execute task with id: ${task.queueIdentifier}")
task.execute() task.execute()
} catch (exception: Throwable) { } catch (exception: Throwable) {
when { when {
exception is IOException || exception is Failure.NetworkConnection -> { exception is IOException || exception is Failure.NetworkConnection -> {
canReachServer.set(false) canReachServer.set(false)
task.retryCount++ task.markAsFailedOrRetry(exception, 0)
if (task.retryCount >= 3) task.onTaskFailed()
Timber.v("## retryable error for $task reason: ${exception.localizedMessage}")
// Retry
executeTask(task)
} }
(exception is Failure.ServerError && exception.error.code == MatrixError.M_LIMIT_EXCEEDED) -> { (exception is Failure.ServerError && exception.error.code == MatrixError.M_LIMIT_EXCEEDED) -> {
task.retryCount++ val delay = exception.error.retryAfterMillis?.plus(100) ?: 3_000
if (task.retryCount >= 3) task.onTaskFailed() task.markAsFailedOrRetry(exception, delay)
Timber.v("## retryable error for $task reason: ${exception.localizedMessage}")
// Wait some time
delay(exception.error.retryAfterMillis?.plus(100) ?: 3_000)
// and retry
executeTask(task)
} }
exception is CancellationException -> { exception is CancellationException -> {
Timber.v("## task has been cancelled") Timber.v("## $task has been cancelled, try next task")
} }
else -> { else -> {
Timber.v("## Un-Retryable error, try next task") Timber.v("## un-retryable error for $task, try next task")
// this task is in error, check next one? // this task is in error, check next one?
task.onTaskFailed() task.onTaskFailed()
} }
@ -170,6 +165,18 @@ internal class EventSenderProcessorCoroutine @Inject constructor(
markAsFinished(task) markAsFinished(task)
} }
private suspend fun QueuedTask.markAsFailedOrRetry(failure: Throwable, retryDelay: Long){
if (retryCount.incrementAndGet() >= MAX_RETRY_COUNT) {
onTaskFailed()
} else {
Timber.v("## retryable error for $this reason: ${failure.localizedMessage}")
// Wait if necessary
delay(retryDelay)
// And then retry
executeTask(this)
}
}
private fun markAsManaged(task: QueuedTask) { private fun markAsManaged(task: QueuedTask) {
memento.track(task) memento.track(task)
} }
@ -181,9 +188,9 @@ internal class EventSenderProcessorCoroutine @Inject constructor(
private val canReachServer = AtomicBoolean(true) private val canReachServer = AtomicBoolean(true)
private suspend fun waitForNetwork() { private suspend fun QueuedTask.waitForNetwork() = waitForNetworkSequencer.post {
while (!canReachServer.get()) { while (!canReachServer.get()) {
Timber.v("## Cannot reach server, wait ts:${System.currentTimeMillis()}") Timber.v("## $this cannot reach server wait ts:${System.currentTimeMillis()}")
delay(RETRY_WAIT_TIME_MS) delay(RETRY_WAIT_TIME_MS)
withContext(Dispatchers.IO) { withContext(Dispatchers.IO) {
val hostAvailable = HomeServerAvailabilityChecker(sessionParams).check() val hostAvailable = HomeServerAvailabilityChecker(sessionParams).check()

View file

@ -163,7 +163,7 @@ internal class EventSenderProcessorThread @Inject constructor(
// so network is available // so network is available
runBlocking { runBlocking {
retryLoop@ while (task.retryCount < 3) { retryLoop@ while (task.retryCount.get() < 3) {
try { try {
// SendPerformanceProfiler.startStage(task.event.eventId!!, SendPerformanceProfiler.Stages.SEND_WORKER) // SendPerformanceProfiler.startStage(task.event.eventId!!, SendPerformanceProfiler.Stages.SEND_WORKER)
Timber.v("## SendThread retryLoop for $task retryCount ${task.retryCount}") Timber.v("## SendThread retryLoop for $task retryCount ${task.retryCount}")
@ -175,8 +175,7 @@ internal class EventSenderProcessorThread @Inject constructor(
when { when {
exception is IOException || exception is Failure.NetworkConnection -> { exception is IOException || exception is Failure.NetworkConnection -> {
canReachServer = false canReachServer = false
task.retryCount++ if (task.retryCount.getAndIncrement() >= 3) task.onTaskFailed()
if (task.retryCount >= 3) task.onTaskFailed()
while (!canReachServer) { while (!canReachServer) {
Timber.v("## SendThread retryLoop cannot reach server, wait ts:${System.currentTimeMillis()}") Timber.v("## SendThread retryLoop cannot reach server, wait ts:${System.currentTimeMillis()}")
// schedule to retry // schedule to retry
@ -184,8 +183,7 @@ internal class EventSenderProcessorThread @Inject constructor(
} }
} }
(exception is Failure.ServerError && exception.error.code == MatrixError.M_LIMIT_EXCEEDED) -> { (exception is Failure.ServerError && exception.error.code == MatrixError.M_LIMIT_EXCEEDED) -> {
task.retryCount++ if (task.retryCount.getAndIncrement() >= 3) task.onTaskFailed()
if (task.retryCount >= 3) task.onTaskFailed()
Timber.v("## SendThread retryLoop retryable error for $task reason: ${exception.localizedMessage}") Timber.v("## SendThread retryLoop retryable error for $task reason: ${exception.localizedMessage}")
// wait a bit // wait a bit
// Todo if its a quota exception can we get timout? // Todo if its a quota exception can we get timout?

View file

@ -17,6 +17,8 @@
package org.matrix.android.sdk.internal.session.room.send.queue package org.matrix.android.sdk.internal.session.room.send.queue
import org.matrix.android.sdk.api.util.Cancelable import org.matrix.android.sdk.api.util.Cancelable
import timber.log.Timber
import java.util.concurrent.atomic.AtomicInteger
/** /**
* @param queueIdentifier String value to identify a unique Queue * @param queueIdentifier String value to identify a unique Queue
@ -27,15 +29,17 @@ internal abstract class QueuedTask(
val taskIdentifier: String val taskIdentifier: String
) : Cancelable { ) : Cancelable {
override fun toString() = "queueIdentifier: $queueIdentifier, taskIdentifier: ${taskIdentifier})" override fun toString() = "${javaClass.simpleName} queueIdentifier: $queueIdentifier, taskIdentifier: ${taskIdentifier})"
var retryCount = 0 var retryCount = AtomicInteger(0)
private var hasBeenCancelled: Boolean = false private var hasBeenCancelled: Boolean = false
suspend fun execute() { suspend fun execute() {
if (!isCancelled()) { if (!isCancelled()) {
Timber.v("Execute: $this start")
doExecute() doExecute()
Timber.v("Execute: $this finish")
} }
} }