Send: use coroutines and let room sending queues be independent of each others

This commit is contained in:
ganfra 2021-03-05 19:08:46 +01:00
parent 79c9c7105e
commit 9174632cfc
19 changed files with 549 additions and 268 deletions

View file

@ -47,7 +47,7 @@ private const val MIN_NUMBER_OF_EVENTS_BY_CHUNK = 300
internal class DatabaseCleaner @Inject constructor(@SessionDatabase private val realmConfiguration: RealmConfiguration,
private val taskExecutor: TaskExecutor) : SessionLifecycleObserver {
override fun onStart() {
override fun onSessionStarted() {
taskExecutor.executorScope.launch(Dispatchers.Default) {
awaitTransaction(realmConfiguration) { realm ->
val allRooms = realm.where(RoomEntity::class.java).findAll()

View file

@ -46,7 +46,7 @@ internal abstract class RealmLiveEntityObserver<T : RealmObject>(protected val r
private val backgroundRealm = AtomicReference<Realm>()
private lateinit var results: AtomicReference<RealmResults<T>>
override fun onStart() {
override fun onSessionStarted() {
if (isStarted.compareAndSet(false, true)) {
BACKGROUND_HANDLER.post {
val realm = Realm.getInstance(realmConfiguration)
@ -58,7 +58,7 @@ internal abstract class RealmLiveEntityObserver<T : RealmObject>(protected val r
}
}
override fun onStop() {
override fun onSessionStopped() {
if (isStarted.compareAndSet(true, false)) {
BACKGROUND_HANDLER.post {
results.getAndSet(null).removeAllChangeListeners()

View file

@ -44,14 +44,14 @@ internal class RealmSessionProvider @Inject constructor(@SessionDatabase private
}
@MainThread
override fun onStart() {
override fun onSessionStarted() {
realmThreadLocal.getOrSet {
Realm.getInstance(monarchy.realmConfiguration)
}
}
@MainThread
override fun onStop() {
override fun onSessionStopped() {
realmThreadLocal.get()?.close()
realmThreadLocal.remove()
}

View file

@ -65,7 +65,6 @@ import org.matrix.android.sdk.internal.di.UnauthenticatedWithCertificate
import org.matrix.android.sdk.internal.di.WorkManagerProvider
import org.matrix.android.sdk.internal.network.GlobalErrorHandler
import org.matrix.android.sdk.internal.session.identity.DefaultIdentityService
import org.matrix.android.sdk.internal.session.room.send.queue.EventSenderProcessor
import org.matrix.android.sdk.internal.session.sync.SyncTokenStore
import org.matrix.android.sdk.internal.session.sync.job.SyncThread
import org.matrix.android.sdk.internal.session.sync.job.SyncWorker
@ -120,8 +119,7 @@ internal class DefaultSession @Inject constructor(
private val thirdPartyService: Lazy<ThirdPartyService>,
private val callSignalingService: Lazy<CallSignalingService>,
@UnauthenticatedWithCertificate
private val unauthenticatedWithCertificateOkHttpClient: Lazy<OkHttpClient>,
private val eventSenderProcessor: EventSenderProcessor
private val unauthenticatedWithCertificateOkHttpClient: Lazy<OkHttpClient>
) : Session,
RoomService by roomService.get(),
RoomDirectoryService by roomDirectoryService.get(),
@ -158,10 +156,9 @@ internal class DefaultSession @Inject constructor(
isOpen = true
cryptoService.get().ensureDevice()
uiHandler.post {
lifecycleObservers.forEach { it.onStart() }
lifecycleObservers.forEach { it.onSessionStarted() }
}
globalErrorHandler.listener = this
eventSenderProcessor.start()
}
override fun requireBackgroundSync() {
@ -200,12 +197,11 @@ internal class DefaultSession @Inject constructor(
stopSync()
// timelineEventDecryptor.destroy()
uiHandler.post {
lifecycleObservers.forEach { it.onStop() }
lifecycleObservers.forEach { it.onSessionStopped() }
}
cryptoService.get().close()
isOpen = false
globalErrorHandler.listener = null
eventSenderProcessor.interrupt()
}
override fun getSyncStateLive() = getSyncThread().liveState()

View file

@ -27,7 +27,7 @@ internal interface SessionLifecycleObserver {
Called when the session is opened
*/
@MainThread
fun onStart() {
fun onSessionStarted() {
// noop
}
@ -43,7 +43,7 @@ internal interface SessionLifecycleObserver {
Called when the session is closed
*/
@MainThread
fun onStop() {
fun onSessionStopped() {
// noop
}
}

View file

@ -83,6 +83,8 @@ import org.matrix.android.sdk.internal.session.permalinks.DefaultPermalinkServic
import org.matrix.android.sdk.internal.session.room.EventRelationsAggregationProcessor
import org.matrix.android.sdk.internal.session.room.create.RoomCreateEventProcessor
import org.matrix.android.sdk.internal.session.room.prune.RedactionEventProcessor
import org.matrix.android.sdk.internal.session.room.send.queue.EventSenderProcessor
import org.matrix.android.sdk.internal.session.room.send.queue.EventSenderProcessorCoroutine
import org.matrix.android.sdk.internal.session.room.tombstone.RoomTombstoneEventProcessor
import org.matrix.android.sdk.internal.session.securestorage.DefaultSecureStorageService
import org.matrix.android.sdk.internal.session.typing.DefaultTypingUsersTracker
@ -339,6 +341,10 @@ internal abstract class SessionModule {
@IntoSet
abstract fun bindRealmSessionProvider(provider: RealmSessionProvider): SessionLifecycleObserver
@Binds
@IntoSet
abstract fun bindEventSenderProcessorAsSessionLifecycleObserver(processor: EventSenderProcessorCoroutine): SessionLifecycleObserver
@Binds
abstract fun bindInitialSyncProgressService(service: DefaultInitialSyncProgressService): InitialSyncProgressService
@ -362,4 +368,8 @@ internal abstract class SessionModule {
@Binds
abstract fun bindRedactEventTask(task: DefaultRedactEventTask): RedactEventTask
@Binds
abstract fun bindEventSenderProcessor(processor: EventSenderProcessorCoroutine): EventSenderProcessor
}

View file

@ -92,7 +92,7 @@ internal class DefaultIdentityService @Inject constructor(
private val listeners = mutableSetOf<IdentityServiceListener>()
override fun onStart() {
override fun onSessionStarted() {
lifecycleRegistry.currentState = Lifecycle.State.STARTED
// Observe the account data change
accountDataDataSource
@ -117,7 +117,7 @@ internal class DefaultIdentityService @Inject constructor(
}
}
override fun onStop() {
override fun onSessionStopped() {
lifecycleRegistry.currentState = Lifecycle.State.DESTROYED
}

View file

@ -77,7 +77,7 @@ internal class IntegrationManager @Inject constructor(matrixConfiguration: Matri
currentConfigs.add(defaultConfig)
}
override fun onStart() {
override fun onSessionStarted() {
lifecycleRegistry.currentState = Lifecycle.State.STARTED
observeWellknownConfig()
accountDataDataSource
@ -105,7 +105,7 @@ internal class IntegrationManager @Inject constructor(matrixConfiguration: Matri
}
}
override fun onStop() {
override fun onSessionStopped() {
lifecycleRegistry.currentState = Lifecycle.State.DESTROYED
}

View file

@ -1,11 +1,11 @@
/*
* Copyright 2020 The Matrix.org Foundation C.I.C.
* Copyright (c) 2021 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@ -16,235 +16,22 @@
package org.matrix.android.sdk.internal.session.room.send.queue
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.matrix.android.sdk.api.auth.data.SessionParams
import org.matrix.android.sdk.api.auth.data.sessionId
import org.matrix.android.sdk.api.extensions.tryOrNull
import org.matrix.android.sdk.api.failure.Failure
import org.matrix.android.sdk.api.failure.MatrixError
import org.matrix.android.sdk.api.failure.isTokenError
import org.matrix.android.sdk.api.session.crypto.CryptoService
import org.matrix.android.sdk.api.session.events.model.Event
import org.matrix.android.sdk.api.session.sync.SyncState
import org.matrix.android.sdk.api.util.Cancelable
import org.matrix.android.sdk.internal.session.SessionScope
import org.matrix.android.sdk.internal.task.TaskExecutor
import timber.log.Timber
import java.io.IOException
import java.net.InetAddress
import java.net.InetSocketAddress
import java.net.Socket
import java.util.Timer
import java.util.TimerTask
import java.util.concurrent.LinkedBlockingQueue
import javax.inject.Inject
import kotlin.concurrent.schedule
import org.matrix.android.sdk.internal.session.SessionLifecycleObserver
/**
* A simple ever running thread unique for that session responsible of sending events in order.
* Each send is retried 3 times, if there is no network (e.g if cannot ping home server) it will wait and
* periodically test reachability before resume (does not count as a retry)
*
* If the app is killed before all event were sent, on next wakeup the scheduled events will be re posted
*/
@SessionScope
internal class EventSenderProcessor @Inject constructor(
private val cryptoService: CryptoService,
private val sessionParams: SessionParams,
private val queuedTaskFactory: QueuedTaskFactory,
private val taskExecutor: TaskExecutor,
private val memento: QueueMemento
) : Thread("SENDER_THREAD_SID_${sessionParams.credentials.sessionId()}") {
internal interface EventSenderProcessor: SessionLifecycleObserver {
private fun markAsManaged(task: QueuedTask) {
memento.track(task)
}
fun postEvent(event: Event): Cancelable
private fun markAsFinished(task: QueuedTask) {
memento.unTrack(task)
}
fun postEvent(event: Event, encrypt: Boolean): Cancelable
// API
fun postEvent(event: Event): Cancelable {
return postEvent(event, event.roomId?.let { cryptoService.isRoomEncrypted(it) } ?: false)
}
fun postRedaction(redactionLocalEcho: Event, reason: String?): Cancelable
override fun start() {
super.start()
// We should check for sending events not handled because app was killed
// But we should be careful of only took those that was submitted to us, because if it's
// for example it's a media event it is handled by some worker and he will handle it
// This is a bit fragile :/
// also some events cannot be retried manually by users, e.g reactions
// they were previously relying on workers to do the work :/ and was expected to always finally succeed
// Also some echos are not to be resent like redaction echos (fake event created for aggregation)
fun postRedaction(redactionLocalEchoId: String, eventToRedactId: String, roomId: String, reason: String?): Cancelable
tryOrNull {
taskExecutor.executorScope.launch {
Timber.d("## Send relaunched pending events on restart")
memento.restoreTasks(this@EventSenderProcessor)
}
}
}
fun postTask(task: QueuedTask): Cancelable
fun postEvent(event: Event, encrypt: Boolean): Cancelable {
val task = queuedTaskFactory.createSendTask(event, encrypt)
return postTask(task)
}
fun cancel(eventId: String, roomId: String)
fun postRedaction(redactionLocalEcho: Event, reason: String?): Cancelable {
return postRedaction(redactionLocalEcho.eventId!!, redactionLocalEcho.redacts!!, redactionLocalEcho.roomId!!, reason)
}
fun postRedaction(redactionLocalEchoId: String, eventToRedactId: String, roomId: String, reason: String?): Cancelable {
val task = queuedTaskFactory.createRedactTask(redactionLocalEchoId, eventToRedactId, roomId, reason)
return postTask(task)
}
fun postTask(task: QueuedTask): Cancelable {
// non blocking add to queue
sendingQueue.add(task)
markAsManaged(task)
return task
}
fun cancel(eventId: String, roomId: String) {
(currentTask as? SendEventQueuedTask)
?.takeIf { it -> it.event.eventId == eventId && it.event.roomId == roomId }
?.cancel()
}
companion object {
private const val RETRY_WAIT_TIME_MS = 10_000L
}
private var currentTask: QueuedTask? = null
private var sendingQueue = LinkedBlockingQueue<QueuedTask>()
private var networkAvailableLock = Object()
private var canReachServer = true
private var retryNoNetworkTask: TimerTask? = null
override fun run() {
Timber.v("## SendThread started ts:${System.currentTimeMillis()}")
try {
while (!isInterrupted) {
Timber.v("## SendThread wait for task to process")
val task = sendingQueue.take()
.also { currentTask = it }
Timber.v("## SendThread Found task to process $task")
if (task.isCancelled()) {
Timber.v("## SendThread send cancelled for $task")
// we do not execute this one
continue
}
// we check for network connectivity
while (!canReachServer) {
Timber.v("## SendThread cannot reach server, wait ts:${System.currentTimeMillis()}")
// schedule to retry
waitForNetwork()
// if thread as been killed meanwhile
// if (state == State.KILLING) break
}
Timber.v("## Server is Reachable")
// so network is available
runBlocking {
retryLoop@ while (task.retryCount < 3) {
try {
// SendPerformanceProfiler.startStage(task.event.eventId!!, SendPerformanceProfiler.Stages.SEND_WORKER)
Timber.v("## SendThread retryLoop for $task retryCount ${task.retryCount}")
task.execute()
// sendEventTask.execute(SendEventTask.Params(task.event, task.encrypt, cryptoService))
// SendPerformanceProfiler.stopStage(task.event.eventId, SendPerformanceProfiler.Stages.SEND_WORKER)
break@retryLoop
} catch (exception: Throwable) {
when {
exception is IOException || exception is Failure.NetworkConnection -> {
canReachServer = false
task.retryCount++
if (task.retryCount >= 3) task.onTaskFailed()
while (!canReachServer) {
Timber.v("## SendThread retryLoop cannot reach server, wait ts:${System.currentTimeMillis()}")
// schedule to retry
waitForNetwork()
}
}
(exception is Failure.ServerError && exception.error.code == MatrixError.M_LIMIT_EXCEEDED) -> {
task.retryCount++
if (task.retryCount >= 3) task.onTaskFailed()
Timber.v("## SendThread retryLoop retryable error for $task reason: ${exception.localizedMessage}")
// wait a bit
// Todo if its a quota exception can we get timout?
sleep(3_000)
continue@retryLoop
}
exception.isTokenError() -> {
Timber.v("## SendThread retryLoop retryable TOKEN error, interrupt")
// we can exit the loop
task.onTaskFailed()
throw InterruptedException()
}
exception is CancellationException -> {
Timber.v("## SendThread task has been cancelled")
break@retryLoop
}
else -> {
Timber.v("## SendThread retryLoop Un-Retryable error, try next task")
// this task is in error, check next one?
task.onTaskFailed()
break@retryLoop
}
}
}
}
}
markAsFinished(task)
}
} catch (interruptionException: InterruptedException) {
// will be thrown is thread is interrupted while seeping
interrupt()
Timber.v("## InterruptedException!! ${interruptionException.localizedMessage}")
}
// state = State.KILLED
// is this needed?
retryNoNetworkTask?.cancel()
Timber.w("## SendThread finished ${System.currentTimeMillis()}")
}
private fun waitForNetwork() {
retryNoNetworkTask = Timer(SyncState.NoNetwork.toString(), false).schedule(RETRY_WAIT_TIME_MS) {
synchronized(networkAvailableLock) {
canReachServer = checkHostAvailable().also {
Timber.v("## SendThread checkHostAvailable $it")
}
networkAvailableLock.notify()
}
}
synchronized(networkAvailableLock) { networkAvailableLock.wait() }
}
/**
* Check if homeserver is reachable.
*/
private fun checkHostAvailable(): Boolean {
val host = sessionParams.homeServerConnectionConfig.homeServerUri.host ?: return false
val port = sessionParams.homeServerConnectionConfig.homeServerUri.port.takeIf { it != -1 } ?: 80
val timeout = 30_000
try {
Socket().use { socket ->
val inetAddress: InetAddress = InetAddress.getByName(host)
val inetSocketAddress = InetSocketAddress(inetAddress, port)
socket.connect(inetSocketAddress, timeout)
return true
}
} catch (e: IOException) {
Timber.v("## EventSender isHostAvailable failure ${e.localizedMessage}")
return false
}
}
}

View file

@ -0,0 +1,194 @@
/*
* Copyright (c) 2021 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.matrix.android.sdk.internal.session.room.send.queue
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import org.matrix.android.sdk.api.auth.data.SessionParams
import org.matrix.android.sdk.api.failure.Failure
import org.matrix.android.sdk.api.failure.MatrixError
import org.matrix.android.sdk.api.session.crypto.CryptoService
import org.matrix.android.sdk.api.session.events.model.Event
import org.matrix.android.sdk.api.util.Cancelable
import org.matrix.android.sdk.internal.session.SessionScope
import org.matrix.android.sdk.internal.task.CoroutineSequencer
import org.matrix.android.sdk.internal.task.SemaphoreCoroutineSequencer
import org.matrix.android.sdk.internal.task.TaskExecutor
import org.matrix.android.sdk.internal.util.toCancelable
import timber.log.Timber
import java.io.IOException
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicBoolean
import javax.inject.Inject
import kotlin.coroutines.cancellation.CancellationException
private const val RETRY_WAIT_TIME_MS = 10_000L
/**
* This class is responsible for sending events in order in each room. It uses the QueuedTask.queueIdentifier to execute tasks sequentially.
* Each send is retried 3 times, if there is no network (e.g if cannot ping home server) it will wait and
* periodically test reachability before resume (does not count as a retry)
*
* If the app is killed before all event were sent, on next wakeup the scheduled events will be re posted
*
*/
@SessionScope
internal class EventSenderProcessorCoroutine @Inject constructor(
private val cryptoService: CryptoService,
private val sessionParams: SessionParams,
private val queuedTaskFactory: QueuedTaskFactory,
private val taskExecutor: TaskExecutor,
private val memento: QueueMemento,
) : EventSenderProcessor {
/**
* sequencers use QueuedTask.queueIdentifier as key
*/
private val sequencers = ConcurrentHashMap<String, CoroutineSequencer>()
/**
* cancelableBag use QueuedTask.taskIdentifier as key
*/
private val cancelableBag = ConcurrentHashMap<String, Cancelable>()
override fun onSessionStarted() {
// We should check for sending events not handled because app was killed
// But we should be careful of only took those that was submitted to us, because if it's
// for example it's a media event it is handled by some worker and he will handle it
// This is a bit fragile :/
// also some events cannot be retried manually by users, e.g reactions
// they were previously relying on workers to do the work :/ and was expected to always finally succeed
// Also some echos are not to be resent like redaction echos (fake event created for aggregation)
taskExecutor.executorScope.launch {
Timber.d("## Send relaunched pending events on restart")
try {
memento.restoreTasks(this@EventSenderProcessorCoroutine)
} catch (failure: Throwable) {
Timber.e(failure, "Fail restoring send tasks")
}
}
}
override fun postEvent(event: Event): Cancelable {
return postEvent(event, event.roomId?.let { cryptoService.isRoomEncrypted(it) } ?: false)
}
override fun postEvent(event: Event, encrypt: Boolean): Cancelable {
val task = queuedTaskFactory.createSendTask(event, encrypt)
return postTask(task)
}
override fun postRedaction(redactionLocalEcho: Event, reason: String?): Cancelable {
return postRedaction(redactionLocalEcho.eventId!!, redactionLocalEcho.redacts!!, redactionLocalEcho.roomId!!, reason)
}
override fun postRedaction(redactionLocalEchoId: String, eventToRedactId: String, roomId: String, reason: String?): Cancelable {
val task = queuedTaskFactory.createRedactTask(redactionLocalEchoId, eventToRedactId, roomId, reason)
return postTask(task)
}
override fun postTask(task: QueuedTask): Cancelable {
markAsManaged(task)
val sequencer = sequencers.getOrPut(task.queueIdentifier) {
SemaphoreCoroutineSequencer()
}
return taskExecutor.executorScope
.launchWith(sequencer) {
executeTask(task)
}.toCancelable()
.also {
cancelableBag[task.taskIdentifier]
}
}
override fun cancel(eventId: String, roomId: String) {
// eventId is the taskIdentifier
cancelableBag[eventId]?.cancel()
}
private fun CoroutineScope.launchWith(sequencer: CoroutineSequencer, block: suspend CoroutineScope.() -> Unit) = launch {
sequencer.post {
block()
}
}
private suspend fun executeTask(task: QueuedTask) {
try {
if (task.isCancelled()) {
Timber.v("## task has been cancelled")
return
}
waitForNetwork()
Timber.v("## execute task with id: ${task.queueIdentifier}")
task.execute()
} catch (exception: Throwable) {
when {
exception is IOException || exception is Failure.NetworkConnection -> {
canReachServer.set(false)
task.retryCount++
if (task.retryCount >= 3) task.onTaskFailed()
Timber.v("## retryable error for $task reason: ${exception.localizedMessage}")
// Retry
executeTask(task)
}
(exception is Failure.ServerError && exception.error.code == MatrixError.M_LIMIT_EXCEEDED) -> {
task.retryCount++
if (task.retryCount >= 3) task.onTaskFailed()
Timber.v("## retryable error for $task reason: ${exception.localizedMessage}")
// Wait some time
delay(exception.error.retryAfterMillis?.plus(100) ?: 3_000)
// and retry
executeTask(task)
}
exception is CancellationException -> {
Timber.v("## task has been cancelled")
}
else -> {
Timber.v("## Un-Retryable error, try next task")
// this task is in error, check next one?
task.onTaskFailed()
}
}
}
markAsFinished(task)
}
private fun markAsManaged(task: QueuedTask) {
memento.track(task)
}
private fun markAsFinished(task: QueuedTask) {
cancelableBag.remove(task.taskIdentifier)
memento.unTrack(task)
}
private val canReachServer = AtomicBoolean(true)
private suspend fun waitForNetwork() {
while (!canReachServer.get()) {
Timber.v("## Cannot reach server, wait ts:${System.currentTimeMillis()}")
delay(RETRY_WAIT_TIME_MS)
withContext(Dispatchers.IO) {
val hostAvailable = HomeServerAvailabilityChecker(sessionParams).check()
canReachServer.set(hostAvailable)
}
}
}
}

View file

@ -0,0 +1,240 @@
/*
* Copyright 2020 The Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.matrix.android.sdk.internal.session.room.send.queue
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.matrix.android.sdk.api.auth.data.SessionParams
import org.matrix.android.sdk.api.auth.data.sessionId
import org.matrix.android.sdk.api.extensions.tryOrNull
import org.matrix.android.sdk.api.failure.Failure
import org.matrix.android.sdk.api.failure.MatrixError
import org.matrix.android.sdk.api.failure.isTokenError
import org.matrix.android.sdk.api.session.crypto.CryptoService
import org.matrix.android.sdk.api.session.events.model.Event
import org.matrix.android.sdk.api.session.sync.SyncState
import org.matrix.android.sdk.api.util.Cancelable
import org.matrix.android.sdk.internal.session.SessionScope
import org.matrix.android.sdk.internal.task.TaskExecutor
import timber.log.Timber
import java.io.IOException
import java.net.InetAddress
import java.net.InetSocketAddress
import java.net.Socket
import java.util.Timer
import java.util.TimerTask
import java.util.concurrent.LinkedBlockingQueue
import javax.inject.Inject
import kotlin.concurrent.schedule
/**
* A simple ever running thread unique for that session responsible of sending events in order.
* Each send is retried 3 times, if there is no network (e.g if cannot ping home server) it will wait and
* periodically test reachability before resume (does not count as a retry)
*
* If the app is killed before all event were sent, on next wakeup the scheduled events will be re posted
*/
@SessionScope
internal class EventSenderProcessorThread @Inject constructor(
private val cryptoService: CryptoService,
private val sessionParams: SessionParams,
private val queuedTaskFactory: QueuedTaskFactory,
private val taskExecutor: TaskExecutor,
private val memento: QueueMemento
) : Thread("SENDER_THREAD_SID_${sessionParams.credentials.sessionId()}"), EventSenderProcessor {
private fun markAsManaged(task: QueuedTask) {
memento.track(task)
}
private fun markAsFinished(task: QueuedTask) {
memento.unTrack(task)
}
override fun onSessionStarted() {
start()
}
override fun onSessionStopped() {
interrupt()
}
override fun start() {
super.start()
// We should check for sending events not handled because app was killed
// But we should be careful of only took those that was submitted to us, because if it's
// for example it's a media event it is handled by some worker and he will handle it
// This is a bit fragile :/
// also some events cannot be retried manually by users, e.g reactions
// they were previously relying on workers to do the work :/ and was expected to always finally succeed
// Also some echos are not to be resent like redaction echos (fake event created for aggregation)
tryOrNull {
taskExecutor.executorScope.launch {
Timber.d("## Send relaunched pending events on restart")
memento.restoreTasks(this@EventSenderProcessorThread)
}
}
}
// API
override fun postEvent(event: Event): Cancelable {
return postEvent(event, event.roomId?.let { cryptoService.isRoomEncrypted(it) } ?: false)
}
override fun postEvent(event: Event, encrypt: Boolean): Cancelable {
val task = queuedTaskFactory.createSendTask(event, encrypt)
return postTask(task)
}
override fun postRedaction(redactionLocalEcho: Event, reason: String?): Cancelable {
return postRedaction(redactionLocalEcho.eventId!!, redactionLocalEcho.redacts!!, redactionLocalEcho.roomId!!, reason)
}
override fun postRedaction(redactionLocalEchoId: String, eventToRedactId: String, roomId: String, reason: String?): Cancelable {
val task = queuedTaskFactory.createRedactTask(redactionLocalEchoId, eventToRedactId, roomId, reason)
return postTask(task)
}
override fun postTask(task: QueuedTask): Cancelable {
// non blocking add to queue
sendingQueue.add(task)
markAsManaged(task)
return task
}
override fun cancel(eventId: String, roomId: String) {
(currentTask as? SendEventQueuedTask)
?.takeIf { it -> it.event.eventId == eventId && it.event.roomId == roomId }
?.cancel()
}
companion object {
private const val RETRY_WAIT_TIME_MS = 10_000L
}
private var currentTask: QueuedTask? = null
private var sendingQueue = LinkedBlockingQueue<QueuedTask>()
private var networkAvailableLock = Object()
private var canReachServer = true
private var retryNoNetworkTask: TimerTask? = null
override fun run() {
Timber.v("## SendThread started ts:${System.currentTimeMillis()}")
try {
while (!isInterrupted) {
Timber.v("## SendThread wait for task to process")
val task = sendingQueue.take()
.also { currentTask = it }
Timber.v("## SendThread Found task to process $task")
if (task.isCancelled()) {
Timber.v("## SendThread send cancelled for $task")
// we do not execute this one
continue
}
// we check for network connectivity
while (!canReachServer) {
Timber.v("## SendThread cannot reach server, wait ts:${System.currentTimeMillis()}")
// schedule to retry
waitForNetwork()
// if thread as been killed meanwhile
// if (state == State.KILLING) break
}
Timber.v("## Server is Reachable")
// so network is available
runBlocking {
retryLoop@ while (task.retryCount < 3) {
try {
// SendPerformanceProfiler.startStage(task.event.eventId!!, SendPerformanceProfiler.Stages.SEND_WORKER)
Timber.v("## SendThread retryLoop for $task retryCount ${task.retryCount}")
task.execute()
// sendEventTask.execute(SendEventTask.Params(task.event, task.encrypt, cryptoService))
// SendPerformanceProfiler.stopStage(task.event.eventId, SendPerformanceProfiler.Stages.SEND_WORKER)
break@retryLoop
} catch (exception: Throwable) {
when {
exception is IOException || exception is Failure.NetworkConnection -> {
canReachServer = false
task.retryCount++
if (task.retryCount >= 3) task.onTaskFailed()
while (!canReachServer) {
Timber.v("## SendThread retryLoop cannot reach server, wait ts:${System.currentTimeMillis()}")
// schedule to retry
waitForNetwork()
}
}
(exception is Failure.ServerError && exception.error.code == MatrixError.M_LIMIT_EXCEEDED) -> {
task.retryCount++
if (task.retryCount >= 3) task.onTaskFailed()
Timber.v("## SendThread retryLoop retryable error for $task reason: ${exception.localizedMessage}")
// wait a bit
// Todo if its a quota exception can we get timout?
sleep(3_000)
continue@retryLoop
}
exception.isTokenError() -> {
Timber.v("## SendThread retryLoop retryable TOKEN error, interrupt")
// we can exit the loop
task.onTaskFailed()
throw InterruptedException()
}
exception is CancellationException -> {
Timber.v("## SendThread task has been cancelled")
break@retryLoop
}
else -> {
Timber.v("## SendThread retryLoop Un-Retryable error, try next task")
// this task is in error, check next one?
task.onTaskFailed()
break@retryLoop
}
}
}
}
}
markAsFinished(task)
}
} catch (interruptionException: InterruptedException) {
// will be thrown is thread is interrupted while seeping
interrupt()
Timber.v("## InterruptedException!! ${interruptionException.localizedMessage}")
}
// state = State.KILLED
// is this needed?
retryNoNetworkTask?.cancel()
Timber.w("## SendThread finished ${System.currentTimeMillis()}")
}
private fun waitForNetwork() {
retryNoNetworkTask = Timer(SyncState.NoNetwork.toString(), false).schedule(RETRY_WAIT_TIME_MS) {
synchronized(networkAvailableLock) {
canReachServer = HomeServerAvailabilityChecker(sessionParams).check().also {
Timber.v("## SendThread checkHostAvailable $it")
}
networkAvailableLock.notify()
}
}
synchronized(networkAvailableLock) { networkAvailableLock.wait() }
}
}

View file

@ -0,0 +1,45 @@
/*
* Copyright (c) 2021 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.matrix.android.sdk.internal.session.room.send.queue
import org.matrix.android.sdk.api.auth.data.SessionParams
import timber.log.Timber
import java.io.IOException
import java.net.InetAddress
import java.net.InetSocketAddress
import java.net.Socket
internal class HomeServerAvailabilityChecker(val sessionParams: SessionParams) {
fun check(): Boolean {
val host = sessionParams.homeServerConnectionConfig.homeServerUri.host ?: return false
val port = sessionParams.homeServerConnectionConfig.homeServerUri.port.takeIf { it != -1 } ?: 80
val timeout = 30_000
try {
Socket().use { socket ->
val inetAddress: InetAddress = InetAddress.getByName(host)
val inetSocketAddress = InetSocketAddress(inetAddress, port)
socket.connect(inetSocketAddress, timeout)
return true
}
} catch (e: IOException) {
Timber.v("## EventSender isHostAvailable failure ${e.localizedMessage}")
return false
}
}
}

View file

@ -32,6 +32,9 @@ import javax.inject.Inject
* It is just used to remember what events/localEchos was managed by the event sender in order to
* reschedule them (and only them) on next restart
*/
private const val PERSISTENCE_KEY = "ManagedBySender"
internal class QueueMemento @Inject constructor(context: Context,
@SessionId sessionId: String,
private val queuedTaskFactory: QueuedTaskFactory,
@ -39,28 +42,28 @@ internal class QueueMemento @Inject constructor(context: Context,
private val cryptoService: CryptoService) {
private val storage = context.getSharedPreferences("QueueMemento_$sessionId", Context.MODE_PRIVATE)
private val managedTaskInfos = mutableListOf<QueuedTask>()
private val trackedTasks = mutableListOf<QueuedTask>()
fun track(task: QueuedTask) {
synchronized(managedTaskInfos) {
managedTaskInfos.add(task)
persist()
}
fun track(task: QueuedTask) = synchronized(trackedTasks) {
trackedTasks.add(task)
persist()
}
fun unTrack(task: QueuedTask) {
synchronized(managedTaskInfos) {
managedTaskInfos.remove(task)
persist()
}
fun unTrack(task: QueuedTask) = synchronized(trackedTasks) {
trackedTasks.remove(task)
persist()
}
fun trackedTasks() = synchronized(trackedTasks){
}
private fun persist() {
managedTaskInfos.mapIndexedNotNull { index, queuedTask ->
trackedTasks.mapIndexedNotNull { index, queuedTask ->
toTaskInfo(queuedTask, index)?.let { TaskInfo.map(it) }
}.toSet().let { set ->
storage.edit()
.putStringSet("ManagedBySender", set)
.putStringSet(PERSISTENCE_KEY, set)
.apply()
}
}
@ -82,7 +85,7 @@ internal class QueueMemento @Inject constructor(context: Context,
suspend fun restoreTasks(eventProcessor: EventSenderProcessor) {
// events should be restarted in correct order
storage.getStringSet("ManagedBySender", null)?.let { pending ->
storage.getStringSet(PERSISTENCE_KEY, null)?.let { pending ->
Timber.d("## Send - Recovering unsent events $pending")
pending.mapNotNull { tryOrNull { TaskInfo.map(it) } }
}

View file

@ -18,7 +18,17 @@ package org.matrix.android.sdk.internal.session.room.send.queue
import org.matrix.android.sdk.api.util.Cancelable
abstract class QueuedTask : Cancelable {
/**
* @param queueIdentifier String value to identify a unique Queue
* @param taskIdentifier String value to identify a unique Task. Should be different from queueIdentifier
*/
internal abstract class QueuedTask(
val queueIdentifier: String,
val taskIdentifier: String
) : Cancelable {
override fun toString() = "queueIdentifier: $queueIdentifier, taskIdentifier: ${taskIdentifier})"
var retryCount = 0
private var hasBeenCancelled: Boolean = false

View file

@ -29,9 +29,7 @@ internal class RedactQueuedTask(
private val redactEventTask: RedactEventTask,
private val localEchoRepository: LocalEchoRepository,
private val cancelSendTracker: CancelSendTracker
) : QueuedTask() {
override fun toString() = "[RedactQueuedTask $redactionLocalEchoId]"
) : QueuedTask(queueIdentifier = roomId, taskIdentifier = redactionLocalEchoId) {
override suspend fun doExecute() {
redactEventTask.execute(RedactEventTask.Params(redactionLocalEchoId, roomId, toRedactEventId, reason))

View file

@ -31,9 +31,7 @@ internal class SendEventQueuedTask(
val cryptoService: CryptoService,
val localEchoRepository: LocalEchoRepository,
val cancelSendTracker: CancelSendTracker
) : QueuedTask() {
override fun toString() = "[SendEventQueuedTask ${event.eventId}]"
) : QueuedTask(queueIdentifier = event.roomId!!, taskIdentifier = event.eventId!!) {
override suspend fun doExecute() {
sendEventTask.execute(SendEventTask.Params(event, encrypt))

View file

@ -36,7 +36,7 @@ internal interface TaskInfo {
const val TYPE_SEND = "TYPE_SEND"
const val TYPE_REDACT = "TYPE_REDACT"
val moshi = Moshi.Builder()
private val moshi = Moshi.Builder()
.add(RuntimeJsonAdapterFactory.of(TaskInfo::class.java, "type", FallbackTaskInfo::class.java)
.registerSubtype(SendEventTaskInfo::class.java, TYPE_SEND)
.registerSubtype(RedactEventTaskInfo::class.java, TYPE_REDACT)
@ -71,6 +71,6 @@ internal data class RedactEventTaskInfo(
@JsonClass(generateAdapter = true)
internal data class FallbackTaskInfo(
@Json(name = "type") override val type: String = TaskInfo.TYPE_REDACT,
@Json(name = "type") override val type: String = TaskInfo.TYPE_UNKNOWN,
@Json(name = "order") override val order: Int
) : TaskInfo

View file

@ -37,12 +37,12 @@ internal class DefaultWidgetURLFormatter @Inject constructor(private val integra
private lateinit var currentConfig: IntegrationManagerConfig
private var whiteListedUrls: List<String> = emptyList()
override fun onStart() {
override fun onSessionStarted() {
setupWithConfiguration()
integrationManager.addListener(this)
}
override fun onStop() {
override fun onSessionStopped() {
integrationManager.removeListener(this)
}

View file

@ -62,12 +62,12 @@ internal class WidgetManager @Inject constructor(private val integrationManager:
private val lifecycleOwner: LifecycleOwner = LifecycleOwner { lifecycleRegistry }
private val lifecycleRegistry: LifecycleRegistry = LifecycleRegistry(lifecycleOwner)
override fun onStart() {
override fun onSessionStarted() {
lifecycleRegistry.currentState = Lifecycle.State.STARTED
integrationManager.addListener(this)
}
override fun onStop() {
override fun onSessionStopped() {
integrationManager.removeListener(this)
lifecycleRegistry.currentState = Lifecycle.State.DESTROYED
}