diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/database/DatabaseCleaner.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/database/DatabaseCleaner.kt index 8b4ce6106b..f11ecc5d75 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/database/DatabaseCleaner.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/database/DatabaseCleaner.kt @@ -47,7 +47,7 @@ private const val MIN_NUMBER_OF_EVENTS_BY_CHUNK = 300 internal class DatabaseCleaner @Inject constructor(@SessionDatabase private val realmConfiguration: RealmConfiguration, private val taskExecutor: TaskExecutor) : SessionLifecycleObserver { - override fun onStart() { + override fun onSessionStarted() { taskExecutor.executorScope.launch(Dispatchers.Default) { awaitTransaction(realmConfiguration) { realm -> val allRooms = realm.where(RoomEntity::class.java).findAll() diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/database/RealmLiveEntityObserver.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/database/RealmLiveEntityObserver.kt index 3e2160e666..2a0cd963b2 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/database/RealmLiveEntityObserver.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/database/RealmLiveEntityObserver.kt @@ -46,7 +46,7 @@ internal abstract class RealmLiveEntityObserver(protected val r private val backgroundRealm = AtomicReference() private lateinit var results: AtomicReference> - override fun onStart() { + override fun onSessionStarted() { if (isStarted.compareAndSet(false, true)) { BACKGROUND_HANDLER.post { val realm = Realm.getInstance(realmConfiguration) @@ -58,7 +58,7 @@ internal abstract class RealmLiveEntityObserver(protected val r } } - override fun onStop() { + override fun onSessionStopped() { if (isStarted.compareAndSet(true, false)) { BACKGROUND_HANDLER.post { results.getAndSet(null).removeAllChangeListeners() diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/database/RealmSessionProvider.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/database/RealmSessionProvider.kt index 1947cc83e3..f8d5d323a5 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/database/RealmSessionProvider.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/database/RealmSessionProvider.kt @@ -44,14 +44,14 @@ internal class RealmSessionProvider @Inject constructor(@SessionDatabase private } @MainThread - override fun onStart() { + override fun onSessionStarted() { realmThreadLocal.getOrSet { Realm.getInstance(monarchy.realmConfiguration) } } @MainThread - override fun onStop() { + override fun onSessionStopped() { realmThreadLocal.get()?.close() realmThreadLocal.remove() } diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/DefaultSession.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/DefaultSession.kt index 1204a9ccac..45fcc5af2d 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/DefaultSession.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/DefaultSession.kt @@ -65,7 +65,6 @@ import org.matrix.android.sdk.internal.di.UnauthenticatedWithCertificate import org.matrix.android.sdk.internal.di.WorkManagerProvider import org.matrix.android.sdk.internal.network.GlobalErrorHandler import org.matrix.android.sdk.internal.session.identity.DefaultIdentityService -import org.matrix.android.sdk.internal.session.room.send.queue.EventSenderProcessor import org.matrix.android.sdk.internal.session.sync.SyncTokenStore import org.matrix.android.sdk.internal.session.sync.job.SyncThread import org.matrix.android.sdk.internal.session.sync.job.SyncWorker @@ -120,8 +119,7 @@ internal class DefaultSession @Inject constructor( private val thirdPartyService: Lazy, private val callSignalingService: Lazy, @UnauthenticatedWithCertificate - private val unauthenticatedWithCertificateOkHttpClient: Lazy, - private val eventSenderProcessor: EventSenderProcessor + private val unauthenticatedWithCertificateOkHttpClient: Lazy ) : Session, RoomService by roomService.get(), RoomDirectoryService by roomDirectoryService.get(), @@ -158,10 +156,9 @@ internal class DefaultSession @Inject constructor( isOpen = true cryptoService.get().ensureDevice() uiHandler.post { - lifecycleObservers.forEach { it.onStart() } + lifecycleObservers.forEach { it.onSessionStarted() } } globalErrorHandler.listener = this - eventSenderProcessor.start() } override fun requireBackgroundSync() { @@ -200,12 +197,11 @@ internal class DefaultSession @Inject constructor( stopSync() // timelineEventDecryptor.destroy() uiHandler.post { - lifecycleObservers.forEach { it.onStop() } + lifecycleObservers.forEach { it.onSessionStopped() } } cryptoService.get().close() isOpen = false globalErrorHandler.listener = null - eventSenderProcessor.interrupt() } override fun getSyncStateLive() = getSyncThread().liveState() diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/SessionLifecycleObserver.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/SessionLifecycleObserver.kt index d26e9861d0..cb37fbec75 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/SessionLifecycleObserver.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/SessionLifecycleObserver.kt @@ -27,7 +27,7 @@ internal interface SessionLifecycleObserver { Called when the session is opened */ @MainThread - fun onStart() { + fun onSessionStarted() { // noop } @@ -43,7 +43,7 @@ internal interface SessionLifecycleObserver { Called when the session is closed */ @MainThread - fun onStop() { + fun onSessionStopped() { // noop } } diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/SessionModule.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/SessionModule.kt index 57c2336331..1b0a2fa027 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/SessionModule.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/SessionModule.kt @@ -83,6 +83,8 @@ import org.matrix.android.sdk.internal.session.permalinks.DefaultPermalinkServic import org.matrix.android.sdk.internal.session.room.EventRelationsAggregationProcessor import org.matrix.android.sdk.internal.session.room.create.RoomCreateEventProcessor import org.matrix.android.sdk.internal.session.room.prune.RedactionEventProcessor +import org.matrix.android.sdk.internal.session.room.send.queue.EventSenderProcessor +import org.matrix.android.sdk.internal.session.room.send.queue.EventSenderProcessorCoroutine import org.matrix.android.sdk.internal.session.room.tombstone.RoomTombstoneEventProcessor import org.matrix.android.sdk.internal.session.securestorage.DefaultSecureStorageService import org.matrix.android.sdk.internal.session.typing.DefaultTypingUsersTracker @@ -339,6 +341,10 @@ internal abstract class SessionModule { @IntoSet abstract fun bindRealmSessionProvider(provider: RealmSessionProvider): SessionLifecycleObserver + @Binds + @IntoSet + abstract fun bindEventSenderProcessorAsSessionLifecycleObserver(processor: EventSenderProcessorCoroutine): SessionLifecycleObserver + @Binds abstract fun bindInitialSyncProgressService(service: DefaultInitialSyncProgressService): InitialSyncProgressService @@ -362,4 +368,8 @@ internal abstract class SessionModule { @Binds abstract fun bindRedactEventTask(task: DefaultRedactEventTask): RedactEventTask + + @Binds + abstract fun bindEventSenderProcessor(processor: EventSenderProcessorCoroutine): EventSenderProcessor + } diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/identity/DefaultIdentityService.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/identity/DefaultIdentityService.kt index c6fb34151c..948e387cb1 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/identity/DefaultIdentityService.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/identity/DefaultIdentityService.kt @@ -92,7 +92,7 @@ internal class DefaultIdentityService @Inject constructor( private val listeners = mutableSetOf() - override fun onStart() { + override fun onSessionStarted() { lifecycleRegistry.currentState = Lifecycle.State.STARTED // Observe the account data change accountDataDataSource @@ -117,7 +117,7 @@ internal class DefaultIdentityService @Inject constructor( } } - override fun onStop() { + override fun onSessionStopped() { lifecycleRegistry.currentState = Lifecycle.State.DESTROYED } diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/integrationmanager/IntegrationManager.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/integrationmanager/IntegrationManager.kt index 19a87103f4..e34615d269 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/integrationmanager/IntegrationManager.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/integrationmanager/IntegrationManager.kt @@ -77,7 +77,7 @@ internal class IntegrationManager @Inject constructor(matrixConfiguration: Matri currentConfigs.add(defaultConfig) } - override fun onStart() { + override fun onSessionStarted() { lifecycleRegistry.currentState = Lifecycle.State.STARTED observeWellknownConfig() accountDataDataSource @@ -105,7 +105,7 @@ internal class IntegrationManager @Inject constructor(matrixConfiguration: Matri } } - override fun onStop() { + override fun onSessionStopped() { lifecycleRegistry.currentState = Lifecycle.State.DESTROYED } diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/EventSenderProcessor.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/EventSenderProcessor.kt index 62338a1d07..05d0876ef0 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/EventSenderProcessor.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/EventSenderProcessor.kt @@ -1,11 +1,11 @@ /* - * Copyright 2020 The Matrix.org Foundation C.I.C. + * Copyright (c) 2021 New Vector Ltd * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -16,235 +16,22 @@ package org.matrix.android.sdk.internal.session.room.send.queue -import kotlinx.coroutines.CancellationException -import kotlinx.coroutines.launch -import kotlinx.coroutines.runBlocking -import org.matrix.android.sdk.api.auth.data.SessionParams -import org.matrix.android.sdk.api.auth.data.sessionId -import org.matrix.android.sdk.api.extensions.tryOrNull -import org.matrix.android.sdk.api.failure.Failure -import org.matrix.android.sdk.api.failure.MatrixError -import org.matrix.android.sdk.api.failure.isTokenError -import org.matrix.android.sdk.api.session.crypto.CryptoService import org.matrix.android.sdk.api.session.events.model.Event -import org.matrix.android.sdk.api.session.sync.SyncState import org.matrix.android.sdk.api.util.Cancelable -import org.matrix.android.sdk.internal.session.SessionScope -import org.matrix.android.sdk.internal.task.TaskExecutor -import timber.log.Timber -import java.io.IOException -import java.net.InetAddress -import java.net.InetSocketAddress -import java.net.Socket -import java.util.Timer -import java.util.TimerTask -import java.util.concurrent.LinkedBlockingQueue -import javax.inject.Inject -import kotlin.concurrent.schedule +import org.matrix.android.sdk.internal.session.SessionLifecycleObserver -/** - * A simple ever running thread unique for that session responsible of sending events in order. - * Each send is retried 3 times, if there is no network (e.g if cannot ping home server) it will wait and - * periodically test reachability before resume (does not count as a retry) - * - * If the app is killed before all event were sent, on next wakeup the scheduled events will be re posted - */ -@SessionScope -internal class EventSenderProcessor @Inject constructor( - private val cryptoService: CryptoService, - private val sessionParams: SessionParams, - private val queuedTaskFactory: QueuedTaskFactory, - private val taskExecutor: TaskExecutor, - private val memento: QueueMemento -) : Thread("SENDER_THREAD_SID_${sessionParams.credentials.sessionId()}") { +internal interface EventSenderProcessor: SessionLifecycleObserver { - private fun markAsManaged(task: QueuedTask) { - memento.track(task) - } + fun postEvent(event: Event): Cancelable - private fun markAsFinished(task: QueuedTask) { - memento.unTrack(task) - } + fun postEvent(event: Event, encrypt: Boolean): Cancelable - // API - fun postEvent(event: Event): Cancelable { - return postEvent(event, event.roomId?.let { cryptoService.isRoomEncrypted(it) } ?: false) - } + fun postRedaction(redactionLocalEcho: Event, reason: String?): Cancelable - override fun start() { - super.start() - // We should check for sending events not handled because app was killed - // But we should be careful of only took those that was submitted to us, because if it's - // for example it's a media event it is handled by some worker and he will handle it - // This is a bit fragile :/ - // also some events cannot be retried manually by users, e.g reactions - // they were previously relying on workers to do the work :/ and was expected to always finally succeed - // Also some echos are not to be resent like redaction echos (fake event created for aggregation) + fun postRedaction(redactionLocalEchoId: String, eventToRedactId: String, roomId: String, reason: String?): Cancelable - tryOrNull { - taskExecutor.executorScope.launch { - Timber.d("## Send relaunched pending events on restart") - memento.restoreTasks(this@EventSenderProcessor) - } - } - } + fun postTask(task: QueuedTask): Cancelable - fun postEvent(event: Event, encrypt: Boolean): Cancelable { - val task = queuedTaskFactory.createSendTask(event, encrypt) - return postTask(task) - } + fun cancel(eventId: String, roomId: String) - fun postRedaction(redactionLocalEcho: Event, reason: String?): Cancelable { - return postRedaction(redactionLocalEcho.eventId!!, redactionLocalEcho.redacts!!, redactionLocalEcho.roomId!!, reason) - } - - fun postRedaction(redactionLocalEchoId: String, eventToRedactId: String, roomId: String, reason: String?): Cancelable { - val task = queuedTaskFactory.createRedactTask(redactionLocalEchoId, eventToRedactId, roomId, reason) - return postTask(task) - } - - fun postTask(task: QueuedTask): Cancelable { - // non blocking add to queue - sendingQueue.add(task) - markAsManaged(task) - return task - } - - fun cancel(eventId: String, roomId: String) { - (currentTask as? SendEventQueuedTask) - ?.takeIf { it -> it.event.eventId == eventId && it.event.roomId == roomId } - ?.cancel() - } - - companion object { - private const val RETRY_WAIT_TIME_MS = 10_000L - } - - private var currentTask: QueuedTask? = null - - private var sendingQueue = LinkedBlockingQueue() - - private var networkAvailableLock = Object() - private var canReachServer = true - private var retryNoNetworkTask: TimerTask? = null - - override fun run() { - Timber.v("## SendThread started ts:${System.currentTimeMillis()}") - try { - while (!isInterrupted) { - Timber.v("## SendThread wait for task to process") - val task = sendingQueue.take() - .also { currentTask = it } - Timber.v("## SendThread Found task to process $task") - - if (task.isCancelled()) { - Timber.v("## SendThread send cancelled for $task") - // we do not execute this one - continue - } - // we check for network connectivity - while (!canReachServer) { - Timber.v("## SendThread cannot reach server, wait ts:${System.currentTimeMillis()}") - // schedule to retry - waitForNetwork() - // if thread as been killed meanwhile -// if (state == State.KILLING) break - } - Timber.v("## Server is Reachable") - // so network is available - - runBlocking { - retryLoop@ while (task.retryCount < 3) { - try { - // SendPerformanceProfiler.startStage(task.event.eventId!!, SendPerformanceProfiler.Stages.SEND_WORKER) - Timber.v("## SendThread retryLoop for $task retryCount ${task.retryCount}") - task.execute() - // sendEventTask.execute(SendEventTask.Params(task.event, task.encrypt, cryptoService)) - // SendPerformanceProfiler.stopStage(task.event.eventId, SendPerformanceProfiler.Stages.SEND_WORKER) - break@retryLoop - } catch (exception: Throwable) { - when { - exception is IOException || exception is Failure.NetworkConnection -> { - canReachServer = false - task.retryCount++ - if (task.retryCount >= 3) task.onTaskFailed() - while (!canReachServer) { - Timber.v("## SendThread retryLoop cannot reach server, wait ts:${System.currentTimeMillis()}") - // schedule to retry - waitForNetwork() - } - } - (exception is Failure.ServerError && exception.error.code == MatrixError.M_LIMIT_EXCEEDED) -> { - task.retryCount++ - if (task.retryCount >= 3) task.onTaskFailed() - Timber.v("## SendThread retryLoop retryable error for $task reason: ${exception.localizedMessage}") - // wait a bit - // Todo if its a quota exception can we get timout? - sleep(3_000) - continue@retryLoop - } - exception.isTokenError() -> { - Timber.v("## SendThread retryLoop retryable TOKEN error, interrupt") - // we can exit the loop - task.onTaskFailed() - throw InterruptedException() - } - exception is CancellationException -> { - Timber.v("## SendThread task has been cancelled") - break@retryLoop - } - else -> { - Timber.v("## SendThread retryLoop Un-Retryable error, try next task") - // this task is in error, check next one? - task.onTaskFailed() - break@retryLoop - } - } - } - } - } - markAsFinished(task) - } - } catch (interruptionException: InterruptedException) { - // will be thrown is thread is interrupted while seeping - interrupt() - Timber.v("## InterruptedException!! ${interruptionException.localizedMessage}") - } -// state = State.KILLED - // is this needed? - retryNoNetworkTask?.cancel() - Timber.w("## SendThread finished ${System.currentTimeMillis()}") - } - - private fun waitForNetwork() { - retryNoNetworkTask = Timer(SyncState.NoNetwork.toString(), false).schedule(RETRY_WAIT_TIME_MS) { - synchronized(networkAvailableLock) { - canReachServer = checkHostAvailable().also { - Timber.v("## SendThread checkHostAvailable $it") - } - networkAvailableLock.notify() - } - } - synchronized(networkAvailableLock) { networkAvailableLock.wait() } - } - - /** - * Check if homeserver is reachable. - */ - private fun checkHostAvailable(): Boolean { - val host = sessionParams.homeServerConnectionConfig.homeServerUri.host ?: return false - val port = sessionParams.homeServerConnectionConfig.homeServerUri.port.takeIf { it != -1 } ?: 80 - val timeout = 30_000 - try { - Socket().use { socket -> - val inetAddress: InetAddress = InetAddress.getByName(host) - val inetSocketAddress = InetSocketAddress(inetAddress, port) - socket.connect(inetSocketAddress, timeout) - return true - } - } catch (e: IOException) { - Timber.v("## EventSender isHostAvailable failure ${e.localizedMessage}") - return false - } - } } diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/EventSenderProcessorCoroutine.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/EventSenderProcessorCoroutine.kt new file mode 100644 index 0000000000..77e156f90a --- /dev/null +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/EventSenderProcessorCoroutine.kt @@ -0,0 +1,194 @@ +/* + * Copyright (c) 2021 New Vector Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.matrix.android.sdk.internal.session.room.send.queue + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import kotlinx.coroutines.withContext +import org.matrix.android.sdk.api.auth.data.SessionParams +import org.matrix.android.sdk.api.failure.Failure +import org.matrix.android.sdk.api.failure.MatrixError +import org.matrix.android.sdk.api.session.crypto.CryptoService +import org.matrix.android.sdk.api.session.events.model.Event +import org.matrix.android.sdk.api.util.Cancelable +import org.matrix.android.sdk.internal.session.SessionScope +import org.matrix.android.sdk.internal.task.CoroutineSequencer +import org.matrix.android.sdk.internal.task.SemaphoreCoroutineSequencer +import org.matrix.android.sdk.internal.task.TaskExecutor +import org.matrix.android.sdk.internal.util.toCancelable +import timber.log.Timber +import java.io.IOException +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.atomic.AtomicBoolean +import javax.inject.Inject +import kotlin.coroutines.cancellation.CancellationException + +private const val RETRY_WAIT_TIME_MS = 10_000L + +/** + * This class is responsible for sending events in order in each room. It uses the QueuedTask.queueIdentifier to execute tasks sequentially. + * Each send is retried 3 times, if there is no network (e.g if cannot ping home server) it will wait and + * periodically test reachability before resume (does not count as a retry) + * + * If the app is killed before all event were sent, on next wakeup the scheduled events will be re posted + * + */ +@SessionScope +internal class EventSenderProcessorCoroutine @Inject constructor( + private val cryptoService: CryptoService, + private val sessionParams: SessionParams, + private val queuedTaskFactory: QueuedTaskFactory, + private val taskExecutor: TaskExecutor, + private val memento: QueueMemento, +) : EventSenderProcessor { + + /** + * sequencers use QueuedTask.queueIdentifier as key + */ + private val sequencers = ConcurrentHashMap() + + /** + * cancelableBag use QueuedTask.taskIdentifier as key + */ + private val cancelableBag = ConcurrentHashMap() + + override fun onSessionStarted() { + // We should check for sending events not handled because app was killed + // But we should be careful of only took those that was submitted to us, because if it's + // for example it's a media event it is handled by some worker and he will handle it + // This is a bit fragile :/ + // also some events cannot be retried manually by users, e.g reactions + // they were previously relying on workers to do the work :/ and was expected to always finally succeed + // Also some echos are not to be resent like redaction echos (fake event created for aggregation) + taskExecutor.executorScope.launch { + Timber.d("## Send relaunched pending events on restart") + try { + memento.restoreTasks(this@EventSenderProcessorCoroutine) + } catch (failure: Throwable) { + Timber.e(failure, "Fail restoring send tasks") + } + } + } + + override fun postEvent(event: Event): Cancelable { + return postEvent(event, event.roomId?.let { cryptoService.isRoomEncrypted(it) } ?: false) + } + + override fun postEvent(event: Event, encrypt: Boolean): Cancelable { + val task = queuedTaskFactory.createSendTask(event, encrypt) + return postTask(task) + } + + override fun postRedaction(redactionLocalEcho: Event, reason: String?): Cancelable { + return postRedaction(redactionLocalEcho.eventId!!, redactionLocalEcho.redacts!!, redactionLocalEcho.roomId!!, reason) + } + + override fun postRedaction(redactionLocalEchoId: String, eventToRedactId: String, roomId: String, reason: String?): Cancelable { + val task = queuedTaskFactory.createRedactTask(redactionLocalEchoId, eventToRedactId, roomId, reason) + return postTask(task) + } + + override fun postTask(task: QueuedTask): Cancelable { + markAsManaged(task) + val sequencer = sequencers.getOrPut(task.queueIdentifier) { + SemaphoreCoroutineSequencer() + } + return taskExecutor.executorScope + .launchWith(sequencer) { + executeTask(task) + }.toCancelable() + .also { + cancelableBag[task.taskIdentifier] + } + } + + override fun cancel(eventId: String, roomId: String) { + // eventId is the taskIdentifier + cancelableBag[eventId]?.cancel() + } + + private fun CoroutineScope.launchWith(sequencer: CoroutineSequencer, block: suspend CoroutineScope.() -> Unit) = launch { + sequencer.post { + block() + } + } + + private suspend fun executeTask(task: QueuedTask) { + try { + if (task.isCancelled()) { + Timber.v("## task has been cancelled") + return + } + waitForNetwork() + Timber.v("## execute task with id: ${task.queueIdentifier}") + task.execute() + } catch (exception: Throwable) { + when { + exception is IOException || exception is Failure.NetworkConnection -> { + canReachServer.set(false) + task.retryCount++ + if (task.retryCount >= 3) task.onTaskFailed() + Timber.v("## retryable error for $task reason: ${exception.localizedMessage}") + // Retry + executeTask(task) + } + (exception is Failure.ServerError && exception.error.code == MatrixError.M_LIMIT_EXCEEDED) -> { + task.retryCount++ + if (task.retryCount >= 3) task.onTaskFailed() + Timber.v("## retryable error for $task reason: ${exception.localizedMessage}") + // Wait some time + delay(exception.error.retryAfterMillis?.plus(100) ?: 3_000) + // and retry + executeTask(task) + } + exception is CancellationException -> { + Timber.v("## task has been cancelled") + } + else -> { + Timber.v("## Un-Retryable error, try next task") + // this task is in error, check next one? + task.onTaskFailed() + } + } + } + markAsFinished(task) + } + + private fun markAsManaged(task: QueuedTask) { + memento.track(task) + } + + private fun markAsFinished(task: QueuedTask) { + cancelableBag.remove(task.taskIdentifier) + memento.unTrack(task) + } + + private val canReachServer = AtomicBoolean(true) + + private suspend fun waitForNetwork() { + while (!canReachServer.get()) { + Timber.v("## Cannot reach server, wait ts:${System.currentTimeMillis()}") + delay(RETRY_WAIT_TIME_MS) + withContext(Dispatchers.IO) { + val hostAvailable = HomeServerAvailabilityChecker(sessionParams).check() + canReachServer.set(hostAvailable) + } + } + } +} diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/EventSenderProcessorThread.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/EventSenderProcessorThread.kt new file mode 100644 index 0000000000..c3a9eb4a07 --- /dev/null +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/EventSenderProcessorThread.kt @@ -0,0 +1,240 @@ +/* + * Copyright 2020 The Matrix.org Foundation C.I.C. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.matrix.android.sdk.internal.session.room.send.queue + +import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking +import org.matrix.android.sdk.api.auth.data.SessionParams +import org.matrix.android.sdk.api.auth.data.sessionId +import org.matrix.android.sdk.api.extensions.tryOrNull +import org.matrix.android.sdk.api.failure.Failure +import org.matrix.android.sdk.api.failure.MatrixError +import org.matrix.android.sdk.api.failure.isTokenError +import org.matrix.android.sdk.api.session.crypto.CryptoService +import org.matrix.android.sdk.api.session.events.model.Event +import org.matrix.android.sdk.api.session.sync.SyncState +import org.matrix.android.sdk.api.util.Cancelable +import org.matrix.android.sdk.internal.session.SessionScope +import org.matrix.android.sdk.internal.task.TaskExecutor +import timber.log.Timber +import java.io.IOException +import java.net.InetAddress +import java.net.InetSocketAddress +import java.net.Socket +import java.util.Timer +import java.util.TimerTask +import java.util.concurrent.LinkedBlockingQueue +import javax.inject.Inject +import kotlin.concurrent.schedule + +/** + * A simple ever running thread unique for that session responsible of sending events in order. + * Each send is retried 3 times, if there is no network (e.g if cannot ping home server) it will wait and + * periodically test reachability before resume (does not count as a retry) + * + * If the app is killed before all event were sent, on next wakeup the scheduled events will be re posted + */ +@SessionScope +internal class EventSenderProcessorThread @Inject constructor( + private val cryptoService: CryptoService, + private val sessionParams: SessionParams, + private val queuedTaskFactory: QueuedTaskFactory, + private val taskExecutor: TaskExecutor, + private val memento: QueueMemento +) : Thread("SENDER_THREAD_SID_${sessionParams.credentials.sessionId()}"), EventSenderProcessor { + + private fun markAsManaged(task: QueuedTask) { + memento.track(task) + } + + private fun markAsFinished(task: QueuedTask) { + memento.unTrack(task) + } + + override fun onSessionStarted() { + start() + } + + override fun onSessionStopped() { + interrupt() + } + + + override fun start() { + super.start() + // We should check for sending events not handled because app was killed + // But we should be careful of only took those that was submitted to us, because if it's + // for example it's a media event it is handled by some worker and he will handle it + // This is a bit fragile :/ + // also some events cannot be retried manually by users, e.g reactions + // they were previously relying on workers to do the work :/ and was expected to always finally succeed + // Also some echos are not to be resent like redaction echos (fake event created for aggregation) + + tryOrNull { + taskExecutor.executorScope.launch { + Timber.d("## Send relaunched pending events on restart") + memento.restoreTasks(this@EventSenderProcessorThread) + } + } + } + + // API + override fun postEvent(event: Event): Cancelable { + return postEvent(event, event.roomId?.let { cryptoService.isRoomEncrypted(it) } ?: false) + } + + override fun postEvent(event: Event, encrypt: Boolean): Cancelable { + val task = queuedTaskFactory.createSendTask(event, encrypt) + return postTask(task) + } + + override fun postRedaction(redactionLocalEcho: Event, reason: String?): Cancelable { + return postRedaction(redactionLocalEcho.eventId!!, redactionLocalEcho.redacts!!, redactionLocalEcho.roomId!!, reason) + } + + override fun postRedaction(redactionLocalEchoId: String, eventToRedactId: String, roomId: String, reason: String?): Cancelable { + val task = queuedTaskFactory.createRedactTask(redactionLocalEchoId, eventToRedactId, roomId, reason) + return postTask(task) + } + + override fun postTask(task: QueuedTask): Cancelable { + // non blocking add to queue + sendingQueue.add(task) + markAsManaged(task) + return task + } + + override fun cancel(eventId: String, roomId: String) { + (currentTask as? SendEventQueuedTask) + ?.takeIf { it -> it.event.eventId == eventId && it.event.roomId == roomId } + ?.cancel() + } + + companion object { + private const val RETRY_WAIT_TIME_MS = 10_000L + } + + private var currentTask: QueuedTask? = null + + private var sendingQueue = LinkedBlockingQueue() + + private var networkAvailableLock = Object() + private var canReachServer = true + private var retryNoNetworkTask: TimerTask? = null + + override fun run() { + Timber.v("## SendThread started ts:${System.currentTimeMillis()}") + try { + while (!isInterrupted) { + Timber.v("## SendThread wait for task to process") + val task = sendingQueue.take() + .also { currentTask = it } + Timber.v("## SendThread Found task to process $task") + + if (task.isCancelled()) { + Timber.v("## SendThread send cancelled for $task") + // we do not execute this one + continue + } + // we check for network connectivity + while (!canReachServer) { + Timber.v("## SendThread cannot reach server, wait ts:${System.currentTimeMillis()}") + // schedule to retry + waitForNetwork() + // if thread as been killed meanwhile +// if (state == State.KILLING) break + } + Timber.v("## Server is Reachable") + // so network is available + + runBlocking { + retryLoop@ while (task.retryCount < 3) { + try { + // SendPerformanceProfiler.startStage(task.event.eventId!!, SendPerformanceProfiler.Stages.SEND_WORKER) + Timber.v("## SendThread retryLoop for $task retryCount ${task.retryCount}") + task.execute() + // sendEventTask.execute(SendEventTask.Params(task.event, task.encrypt, cryptoService)) + // SendPerformanceProfiler.stopStage(task.event.eventId, SendPerformanceProfiler.Stages.SEND_WORKER) + break@retryLoop + } catch (exception: Throwable) { + when { + exception is IOException || exception is Failure.NetworkConnection -> { + canReachServer = false + task.retryCount++ + if (task.retryCount >= 3) task.onTaskFailed() + while (!canReachServer) { + Timber.v("## SendThread retryLoop cannot reach server, wait ts:${System.currentTimeMillis()}") + // schedule to retry + waitForNetwork() + } + } + (exception is Failure.ServerError && exception.error.code == MatrixError.M_LIMIT_EXCEEDED) -> { + task.retryCount++ + if (task.retryCount >= 3) task.onTaskFailed() + Timber.v("## SendThread retryLoop retryable error for $task reason: ${exception.localizedMessage}") + // wait a bit + // Todo if its a quota exception can we get timout? + sleep(3_000) + continue@retryLoop + } + exception.isTokenError() -> { + Timber.v("## SendThread retryLoop retryable TOKEN error, interrupt") + // we can exit the loop + task.onTaskFailed() + throw InterruptedException() + } + exception is CancellationException -> { + Timber.v("## SendThread task has been cancelled") + break@retryLoop + } + else -> { + Timber.v("## SendThread retryLoop Un-Retryable error, try next task") + // this task is in error, check next one? + task.onTaskFailed() + break@retryLoop + } + } + } + } + } + markAsFinished(task) + } + } catch (interruptionException: InterruptedException) { + // will be thrown is thread is interrupted while seeping + interrupt() + Timber.v("## InterruptedException!! ${interruptionException.localizedMessage}") + } +// state = State.KILLED + // is this needed? + retryNoNetworkTask?.cancel() + Timber.w("## SendThread finished ${System.currentTimeMillis()}") + } + + private fun waitForNetwork() { + retryNoNetworkTask = Timer(SyncState.NoNetwork.toString(), false).schedule(RETRY_WAIT_TIME_MS) { + synchronized(networkAvailableLock) { + canReachServer = HomeServerAvailabilityChecker(sessionParams).check().also { + Timber.v("## SendThread checkHostAvailable $it") + } + networkAvailableLock.notify() + } + } + synchronized(networkAvailableLock) { networkAvailableLock.wait() } + } + +} diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/HomeServerAvailabilityChecker.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/HomeServerAvailabilityChecker.kt new file mode 100644 index 0000000000..c68be74a64 --- /dev/null +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/HomeServerAvailabilityChecker.kt @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2021 New Vector Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.matrix.android.sdk.internal.session.room.send.queue + +import org.matrix.android.sdk.api.auth.data.SessionParams +import timber.log.Timber +import java.io.IOException +import java.net.InetAddress +import java.net.InetSocketAddress +import java.net.Socket + +internal class HomeServerAvailabilityChecker(val sessionParams: SessionParams) { + + fun check(): Boolean { + val host = sessionParams.homeServerConnectionConfig.homeServerUri.host ?: return false + val port = sessionParams.homeServerConnectionConfig.homeServerUri.port.takeIf { it != -1 } ?: 80 + val timeout = 30_000 + try { + Socket().use { socket -> + val inetAddress: InetAddress = InetAddress.getByName(host) + val inetSocketAddress = InetSocketAddress(inetAddress, port) + socket.connect(inetSocketAddress, timeout) + return true + } + } catch (e: IOException) { + Timber.v("## EventSender isHostAvailable failure ${e.localizedMessage}") + return false + } + } + +} diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/QueueMemento.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/QueueMemento.kt index a6836c8086..472e4d440f 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/QueueMemento.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/QueueMemento.kt @@ -32,6 +32,9 @@ import javax.inject.Inject * It is just used to remember what events/localEchos was managed by the event sender in order to * reschedule them (and only them) on next restart */ + +private const val PERSISTENCE_KEY = "ManagedBySender" + internal class QueueMemento @Inject constructor(context: Context, @SessionId sessionId: String, private val queuedTaskFactory: QueuedTaskFactory, @@ -39,28 +42,28 @@ internal class QueueMemento @Inject constructor(context: Context, private val cryptoService: CryptoService) { private val storage = context.getSharedPreferences("QueueMemento_$sessionId", Context.MODE_PRIVATE) - private val managedTaskInfos = mutableListOf() + private val trackedTasks = mutableListOf() - fun track(task: QueuedTask) { - synchronized(managedTaskInfos) { - managedTaskInfos.add(task) - persist() - } + fun track(task: QueuedTask) = synchronized(trackedTasks) { + trackedTasks.add(task) + persist() } - fun unTrack(task: QueuedTask) { - synchronized(managedTaskInfos) { - managedTaskInfos.remove(task) - persist() - } + fun unTrack(task: QueuedTask) = synchronized(trackedTasks) { + trackedTasks.remove(task) + persist() + } + + fun trackedTasks() = synchronized(trackedTasks){ + } private fun persist() { - managedTaskInfos.mapIndexedNotNull { index, queuedTask -> + trackedTasks.mapIndexedNotNull { index, queuedTask -> toTaskInfo(queuedTask, index)?.let { TaskInfo.map(it) } }.toSet().let { set -> storage.edit() - .putStringSet("ManagedBySender", set) + .putStringSet(PERSISTENCE_KEY, set) .apply() } } @@ -82,7 +85,7 @@ internal class QueueMemento @Inject constructor(context: Context, suspend fun restoreTasks(eventProcessor: EventSenderProcessor) { // events should be restarted in correct order - storage.getStringSet("ManagedBySender", null)?.let { pending -> + storage.getStringSet(PERSISTENCE_KEY, null)?.let { pending -> Timber.d("## Send - Recovering unsent events $pending") pending.mapNotNull { tryOrNull { TaskInfo.map(it) } } } diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/QueuedTask.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/QueuedTask.kt index 9a7fcd8d91..a4eb9e9323 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/QueuedTask.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/QueuedTask.kt @@ -18,7 +18,17 @@ package org.matrix.android.sdk.internal.session.room.send.queue import org.matrix.android.sdk.api.util.Cancelable -abstract class QueuedTask : Cancelable { +/** + * @param queueIdentifier String value to identify a unique Queue + * @param taskIdentifier String value to identify a unique Task. Should be different from queueIdentifier + */ +internal abstract class QueuedTask( + val queueIdentifier: String, + val taskIdentifier: String +) : Cancelable { + + override fun toString() = "queueIdentifier: $queueIdentifier, taskIdentifier: ${taskIdentifier})" + var retryCount = 0 private var hasBeenCancelled: Boolean = false diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/RedactQueuedTask.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/RedactQueuedTask.kt index 8e7ba2f155..0e3d88aa79 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/RedactQueuedTask.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/RedactQueuedTask.kt @@ -29,9 +29,7 @@ internal class RedactQueuedTask( private val redactEventTask: RedactEventTask, private val localEchoRepository: LocalEchoRepository, private val cancelSendTracker: CancelSendTracker -) : QueuedTask() { - - override fun toString() = "[RedactQueuedTask $redactionLocalEchoId]" +) : QueuedTask(queueIdentifier = roomId, taskIdentifier = redactionLocalEchoId) { override suspend fun doExecute() { redactEventTask.execute(RedactEventTask.Params(redactionLocalEchoId, roomId, toRedactEventId, reason)) diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/SendEventQueuedTask.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/SendEventQueuedTask.kt index ea097082c7..49492e7990 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/SendEventQueuedTask.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/SendEventQueuedTask.kt @@ -31,9 +31,7 @@ internal class SendEventQueuedTask( val cryptoService: CryptoService, val localEchoRepository: LocalEchoRepository, val cancelSendTracker: CancelSendTracker -) : QueuedTask() { - - override fun toString() = "[SendEventQueuedTask ${event.eventId}]" +) : QueuedTask(queueIdentifier = event.roomId!!, taskIdentifier = event.eventId!!) { override suspend fun doExecute() { sendEventTask.execute(SendEventTask.Params(event, encrypt)) diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/TaskInfo.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/TaskInfo.kt index 87c6299c4d..a03ab778f6 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/TaskInfo.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/room/send/queue/TaskInfo.kt @@ -36,7 +36,7 @@ internal interface TaskInfo { const val TYPE_SEND = "TYPE_SEND" const val TYPE_REDACT = "TYPE_REDACT" - val moshi = Moshi.Builder() + private val moshi = Moshi.Builder() .add(RuntimeJsonAdapterFactory.of(TaskInfo::class.java, "type", FallbackTaskInfo::class.java) .registerSubtype(SendEventTaskInfo::class.java, TYPE_SEND) .registerSubtype(RedactEventTaskInfo::class.java, TYPE_REDACT) @@ -71,6 +71,6 @@ internal data class RedactEventTaskInfo( @JsonClass(generateAdapter = true) internal data class FallbackTaskInfo( - @Json(name = "type") override val type: String = TaskInfo.TYPE_REDACT, + @Json(name = "type") override val type: String = TaskInfo.TYPE_UNKNOWN, @Json(name = "order") override val order: Int ) : TaskInfo diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/widgets/DefaultWidgetURLFormatter.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/widgets/DefaultWidgetURLFormatter.kt index db74e76b31..0937f6d18b 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/widgets/DefaultWidgetURLFormatter.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/widgets/DefaultWidgetURLFormatter.kt @@ -37,12 +37,12 @@ internal class DefaultWidgetURLFormatter @Inject constructor(private val integra private lateinit var currentConfig: IntegrationManagerConfig private var whiteListedUrls: List = emptyList() - override fun onStart() { + override fun onSessionStarted() { setupWithConfiguration() integrationManager.addListener(this) } - override fun onStop() { + override fun onSessionStopped() { integrationManager.removeListener(this) } diff --git a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/widgets/WidgetManager.kt b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/widgets/WidgetManager.kt index f841a2a245..73a4cc697d 100644 --- a/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/widgets/WidgetManager.kt +++ b/matrix-sdk-android/src/main/java/org/matrix/android/sdk/internal/session/widgets/WidgetManager.kt @@ -62,12 +62,12 @@ internal class WidgetManager @Inject constructor(private val integrationManager: private val lifecycleOwner: LifecycleOwner = LifecycleOwner { lifecycleRegistry } private val lifecycleRegistry: LifecycleRegistry = LifecycleRegistry(lifecycleOwner) - override fun onStart() { + override fun onSessionStarted() { lifecycleRegistry.currentState = Lifecycle.State.STARTED integrationManager.addListener(this) } - override fun onStop() { + override fun onSessionStopped() { integrationManager.removeListener(this) lifecycleRegistry.currentState = Lifecycle.State.DESTROYED }