mirror of
https://github.com/element-hq/element-android
synced 2024-11-27 11:59:12 +03:00
Merge pull request #2978 from vector-im/feature/fga/send_queue_rework
Feature/fga/send queue rework
This commit is contained in:
commit
1fe8dfa810
20 changed files with 552 additions and 270 deletions
|
@ -11,6 +11,7 @@ Improvements 🙌:
|
|||
- PIP support for Jitsi call (#2418)
|
||||
- Add tooltip for room quick actions
|
||||
- Pre-share session keys when opening a room or start typing (#2771)
|
||||
- Sending is now queuing by room and not uniquely to the session
|
||||
|
||||
Bugfix 🐛:
|
||||
- Try to fix crash about UrlPreview (#2640)
|
||||
|
|
|
@ -47,7 +47,7 @@ private const val MIN_NUMBER_OF_EVENTS_BY_CHUNK = 300
|
|||
internal class DatabaseCleaner @Inject constructor(@SessionDatabase private val realmConfiguration: RealmConfiguration,
|
||||
private val taskExecutor: TaskExecutor) : SessionLifecycleObserver {
|
||||
|
||||
override fun onStart() {
|
||||
override fun onSessionStarted() {
|
||||
taskExecutor.executorScope.launch(Dispatchers.Default) {
|
||||
awaitTransaction(realmConfiguration) { realm ->
|
||||
val allRooms = realm.where(RoomEntity::class.java).findAll()
|
||||
|
|
|
@ -46,7 +46,7 @@ internal abstract class RealmLiveEntityObserver<T : RealmObject>(protected val r
|
|||
private val backgroundRealm = AtomicReference<Realm>()
|
||||
private lateinit var results: AtomicReference<RealmResults<T>>
|
||||
|
||||
override fun onStart() {
|
||||
override fun onSessionStarted() {
|
||||
if (isStarted.compareAndSet(false, true)) {
|
||||
BACKGROUND_HANDLER.post {
|
||||
val realm = Realm.getInstance(realmConfiguration)
|
||||
|
@ -58,7 +58,7 @@ internal abstract class RealmLiveEntityObserver<T : RealmObject>(protected val r
|
|||
}
|
||||
}
|
||||
|
||||
override fun onStop() {
|
||||
override fun onSessionStopped() {
|
||||
if (isStarted.compareAndSet(true, false)) {
|
||||
BACKGROUND_HANDLER.post {
|
||||
results.getAndSet(null).removeAllChangeListeners()
|
||||
|
|
|
@ -44,14 +44,14 @@ internal class RealmSessionProvider @Inject constructor(@SessionDatabase private
|
|||
}
|
||||
|
||||
@MainThread
|
||||
override fun onStart() {
|
||||
override fun onSessionStarted() {
|
||||
realmThreadLocal.getOrSet {
|
||||
Realm.getInstance(monarchy.realmConfiguration)
|
||||
}
|
||||
}
|
||||
|
||||
@MainThread
|
||||
override fun onStop() {
|
||||
override fun onSessionStopped() {
|
||||
realmThreadLocal.get()?.close()
|
||||
realmThreadLocal.remove()
|
||||
}
|
||||
|
|
|
@ -65,7 +65,6 @@ import org.matrix.android.sdk.internal.di.UnauthenticatedWithCertificate
|
|||
import org.matrix.android.sdk.internal.di.WorkManagerProvider
|
||||
import org.matrix.android.sdk.internal.network.GlobalErrorHandler
|
||||
import org.matrix.android.sdk.internal.session.identity.DefaultIdentityService
|
||||
import org.matrix.android.sdk.internal.session.room.send.queue.EventSenderProcessor
|
||||
import org.matrix.android.sdk.internal.session.sync.SyncTokenStore
|
||||
import org.matrix.android.sdk.internal.session.sync.job.SyncThread
|
||||
import org.matrix.android.sdk.internal.session.sync.job.SyncWorker
|
||||
|
@ -120,8 +119,7 @@ internal class DefaultSession @Inject constructor(
|
|||
private val thirdPartyService: Lazy<ThirdPartyService>,
|
||||
private val callSignalingService: Lazy<CallSignalingService>,
|
||||
@UnauthenticatedWithCertificate
|
||||
private val unauthenticatedWithCertificateOkHttpClient: Lazy<OkHttpClient>,
|
||||
private val eventSenderProcessor: EventSenderProcessor
|
||||
private val unauthenticatedWithCertificateOkHttpClient: Lazy<OkHttpClient>
|
||||
) : Session,
|
||||
RoomService by roomService.get(),
|
||||
RoomDirectoryService by roomDirectoryService.get(),
|
||||
|
@ -158,10 +156,9 @@ internal class DefaultSession @Inject constructor(
|
|||
isOpen = true
|
||||
cryptoService.get().ensureDevice()
|
||||
uiHandler.post {
|
||||
lifecycleObservers.forEach { it.onStart() }
|
||||
lifecycleObservers.forEach { it.onSessionStarted() }
|
||||
}
|
||||
globalErrorHandler.listener = this
|
||||
eventSenderProcessor.start()
|
||||
}
|
||||
|
||||
override fun requireBackgroundSync() {
|
||||
|
@ -200,12 +197,11 @@ internal class DefaultSession @Inject constructor(
|
|||
stopSync()
|
||||
// timelineEventDecryptor.destroy()
|
||||
uiHandler.post {
|
||||
lifecycleObservers.forEach { it.onStop() }
|
||||
lifecycleObservers.forEach { it.onSessionStopped() }
|
||||
}
|
||||
cryptoService.get().close()
|
||||
isOpen = false
|
||||
globalErrorHandler.listener = null
|
||||
eventSenderProcessor.interrupt()
|
||||
}
|
||||
|
||||
override fun getSyncStateLive() = getSyncThread().liveState()
|
||||
|
|
|
@ -27,7 +27,7 @@ internal interface SessionLifecycleObserver {
|
|||
Called when the session is opened
|
||||
*/
|
||||
@MainThread
|
||||
fun onStart() {
|
||||
fun onSessionStarted() {
|
||||
// noop
|
||||
}
|
||||
|
||||
|
@ -43,7 +43,7 @@ internal interface SessionLifecycleObserver {
|
|||
Called when the session is closed
|
||||
*/
|
||||
@MainThread
|
||||
fun onStop() {
|
||||
fun onSessionStopped() {
|
||||
// noop
|
||||
}
|
||||
}
|
||||
|
|
|
@ -83,6 +83,8 @@ import org.matrix.android.sdk.internal.session.permalinks.DefaultPermalinkServic
|
|||
import org.matrix.android.sdk.internal.session.room.EventRelationsAggregationProcessor
|
||||
import org.matrix.android.sdk.internal.session.room.create.RoomCreateEventProcessor
|
||||
import org.matrix.android.sdk.internal.session.room.prune.RedactionEventProcessor
|
||||
import org.matrix.android.sdk.internal.session.room.send.queue.EventSenderProcessor
|
||||
import org.matrix.android.sdk.internal.session.room.send.queue.EventSenderProcessorCoroutine
|
||||
import org.matrix.android.sdk.internal.session.room.tombstone.RoomTombstoneEventProcessor
|
||||
import org.matrix.android.sdk.internal.session.securestorage.DefaultSecureStorageService
|
||||
import org.matrix.android.sdk.internal.session.typing.DefaultTypingUsersTracker
|
||||
|
@ -339,6 +341,10 @@ internal abstract class SessionModule {
|
|||
@IntoSet
|
||||
abstract fun bindRealmSessionProvider(provider: RealmSessionProvider): SessionLifecycleObserver
|
||||
|
||||
@Binds
|
||||
@IntoSet
|
||||
abstract fun bindEventSenderProcessorAsSessionLifecycleObserver(processor: EventSenderProcessorCoroutine): SessionLifecycleObserver
|
||||
|
||||
@Binds
|
||||
abstract fun bindInitialSyncProgressService(service: DefaultInitialSyncProgressService): InitialSyncProgressService
|
||||
|
||||
|
@ -362,4 +368,7 @@ internal abstract class SessionModule {
|
|||
|
||||
@Binds
|
||||
abstract fun bindRedactEventTask(task: DefaultRedactEventTask): RedactEventTask
|
||||
|
||||
@Binds
|
||||
abstract fun bindEventSenderProcessor(processor: EventSenderProcessorCoroutine): EventSenderProcessor
|
||||
}
|
||||
|
|
|
@ -92,7 +92,7 @@ internal class DefaultIdentityService @Inject constructor(
|
|||
|
||||
private val listeners = mutableSetOf<IdentityServiceListener>()
|
||||
|
||||
override fun onStart() {
|
||||
override fun onSessionStarted() {
|
||||
lifecycleRegistry.currentState = Lifecycle.State.STARTED
|
||||
// Observe the account data change
|
||||
accountDataDataSource
|
||||
|
@ -117,7 +117,7 @@ internal class DefaultIdentityService @Inject constructor(
|
|||
}
|
||||
}
|
||||
|
||||
override fun onStop() {
|
||||
override fun onSessionStopped() {
|
||||
lifecycleRegistry.currentState = Lifecycle.State.DESTROYED
|
||||
}
|
||||
|
||||
|
|
|
@ -77,7 +77,7 @@ internal class IntegrationManager @Inject constructor(matrixConfiguration: Matri
|
|||
currentConfigs.add(defaultConfig)
|
||||
}
|
||||
|
||||
override fun onStart() {
|
||||
override fun onSessionStarted() {
|
||||
lifecycleRegistry.currentState = Lifecycle.State.STARTED
|
||||
observeWellknownConfig()
|
||||
accountDataDataSource
|
||||
|
@ -105,7 +105,7 @@ internal class IntegrationManager @Inject constructor(matrixConfiguration: Matri
|
|||
}
|
||||
}
|
||||
|
||||
override fun onStop() {
|
||||
override fun onSessionStopped() {
|
||||
lifecycleRegistry.currentState = Lifecycle.State.DESTROYED
|
||||
}
|
||||
|
||||
|
|
|
@ -1,11 +1,11 @@
|
|||
/*
|
||||
* Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||
* Copyright 2021 The Matrix.org Foundation C.I.C.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
|
@ -16,235 +16,21 @@
|
|||
|
||||
package org.matrix.android.sdk.internal.session.room.send.queue
|
||||
|
||||
import kotlinx.coroutines.CancellationException
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlinx.coroutines.runBlocking
|
||||
import org.matrix.android.sdk.api.auth.data.SessionParams
|
||||
import org.matrix.android.sdk.api.auth.data.sessionId
|
||||
import org.matrix.android.sdk.api.extensions.tryOrNull
|
||||
import org.matrix.android.sdk.api.failure.Failure
|
||||
import org.matrix.android.sdk.api.failure.MatrixError
|
||||
import org.matrix.android.sdk.api.failure.isTokenError
|
||||
import org.matrix.android.sdk.api.session.crypto.CryptoService
|
||||
import org.matrix.android.sdk.api.session.events.model.Event
|
||||
import org.matrix.android.sdk.api.session.sync.SyncState
|
||||
import org.matrix.android.sdk.api.util.Cancelable
|
||||
import org.matrix.android.sdk.internal.session.SessionScope
|
||||
import org.matrix.android.sdk.internal.task.TaskExecutor
|
||||
import timber.log.Timber
|
||||
import java.io.IOException
|
||||
import java.net.InetAddress
|
||||
import java.net.InetSocketAddress
|
||||
import java.net.Socket
|
||||
import java.util.Timer
|
||||
import java.util.TimerTask
|
||||
import java.util.concurrent.LinkedBlockingQueue
|
||||
import javax.inject.Inject
|
||||
import kotlin.concurrent.schedule
|
||||
import org.matrix.android.sdk.internal.session.SessionLifecycleObserver
|
||||
|
||||
/**
|
||||
* A simple ever running thread unique for that session responsible of sending events in order.
|
||||
* Each send is retried 3 times, if there is no network (e.g if cannot ping home server) it will wait and
|
||||
* periodically test reachability before resume (does not count as a retry)
|
||||
*
|
||||
* If the app is killed before all event were sent, on next wakeup the scheduled events will be re posted
|
||||
*/
|
||||
@SessionScope
|
||||
internal class EventSenderProcessor @Inject constructor(
|
||||
private val cryptoService: CryptoService,
|
||||
private val sessionParams: SessionParams,
|
||||
private val queuedTaskFactory: QueuedTaskFactory,
|
||||
private val taskExecutor: TaskExecutor,
|
||||
private val memento: QueueMemento
|
||||
) : Thread("SENDER_THREAD_SID_${sessionParams.credentials.sessionId()}") {
|
||||
internal interface EventSenderProcessor: SessionLifecycleObserver {
|
||||
|
||||
private fun markAsManaged(task: QueuedTask) {
|
||||
memento.track(task)
|
||||
}
|
||||
fun postEvent(event: Event): Cancelable
|
||||
|
||||
private fun markAsFinished(task: QueuedTask) {
|
||||
memento.unTrack(task)
|
||||
}
|
||||
fun postEvent(event: Event, encrypt: Boolean): Cancelable
|
||||
|
||||
// API
|
||||
fun postEvent(event: Event): Cancelable {
|
||||
return postEvent(event, event.roomId?.let { cryptoService.isRoomEncrypted(it) } ?: false)
|
||||
}
|
||||
fun postRedaction(redactionLocalEcho: Event, reason: String?): Cancelable
|
||||
|
||||
override fun start() {
|
||||
super.start()
|
||||
// We should check for sending events not handled because app was killed
|
||||
// But we should be careful of only took those that was submitted to us, because if it's
|
||||
// for example it's a media event it is handled by some worker and he will handle it
|
||||
// This is a bit fragile :/
|
||||
// also some events cannot be retried manually by users, e.g reactions
|
||||
// they were previously relying on workers to do the work :/ and was expected to always finally succeed
|
||||
// Also some echos are not to be resent like redaction echos (fake event created for aggregation)
|
||||
fun postRedaction(redactionLocalEchoId: String, eventToRedactId: String, roomId: String, reason: String?): Cancelable
|
||||
|
||||
tryOrNull {
|
||||
taskExecutor.executorScope.launch {
|
||||
Timber.d("## Send relaunched pending events on restart")
|
||||
memento.restoreTasks(this@EventSenderProcessor)
|
||||
}
|
||||
}
|
||||
}
|
||||
fun postTask(task: QueuedTask): Cancelable
|
||||
|
||||
fun postEvent(event: Event, encrypt: Boolean): Cancelable {
|
||||
val task = queuedTaskFactory.createSendTask(event, encrypt)
|
||||
return postTask(task)
|
||||
}
|
||||
|
||||
fun postRedaction(redactionLocalEcho: Event, reason: String?): Cancelable {
|
||||
return postRedaction(redactionLocalEcho.eventId!!, redactionLocalEcho.redacts!!, redactionLocalEcho.roomId!!, reason)
|
||||
}
|
||||
|
||||
fun postRedaction(redactionLocalEchoId: String, eventToRedactId: String, roomId: String, reason: String?): Cancelable {
|
||||
val task = queuedTaskFactory.createRedactTask(redactionLocalEchoId, eventToRedactId, roomId, reason)
|
||||
return postTask(task)
|
||||
}
|
||||
|
||||
fun postTask(task: QueuedTask): Cancelable {
|
||||
// non blocking add to queue
|
||||
sendingQueue.add(task)
|
||||
markAsManaged(task)
|
||||
return task
|
||||
}
|
||||
|
||||
fun cancel(eventId: String, roomId: String) {
|
||||
(currentTask as? SendEventQueuedTask)
|
||||
?.takeIf { it -> it.event.eventId == eventId && it.event.roomId == roomId }
|
||||
?.cancel()
|
||||
}
|
||||
|
||||
companion object {
|
||||
private const val RETRY_WAIT_TIME_MS = 10_000L
|
||||
}
|
||||
|
||||
private var currentTask: QueuedTask? = null
|
||||
|
||||
private var sendingQueue = LinkedBlockingQueue<QueuedTask>()
|
||||
|
||||
private var networkAvailableLock = Object()
|
||||
private var canReachServer = true
|
||||
private var retryNoNetworkTask: TimerTask? = null
|
||||
|
||||
override fun run() {
|
||||
Timber.v("## SendThread started ts:${System.currentTimeMillis()}")
|
||||
try {
|
||||
while (!isInterrupted) {
|
||||
Timber.v("## SendThread wait for task to process")
|
||||
val task = sendingQueue.take()
|
||||
.also { currentTask = it }
|
||||
Timber.v("## SendThread Found task to process $task")
|
||||
|
||||
if (task.isCancelled()) {
|
||||
Timber.v("## SendThread send cancelled for $task")
|
||||
// we do not execute this one
|
||||
continue
|
||||
}
|
||||
// we check for network connectivity
|
||||
while (!canReachServer) {
|
||||
Timber.v("## SendThread cannot reach server, wait ts:${System.currentTimeMillis()}")
|
||||
// schedule to retry
|
||||
waitForNetwork()
|
||||
// if thread as been killed meanwhile
|
||||
// if (state == State.KILLING) break
|
||||
}
|
||||
Timber.v("## Server is Reachable")
|
||||
// so network is available
|
||||
|
||||
runBlocking {
|
||||
retryLoop@ while (task.retryCount < 3) {
|
||||
try {
|
||||
// SendPerformanceProfiler.startStage(task.event.eventId!!, SendPerformanceProfiler.Stages.SEND_WORKER)
|
||||
Timber.v("## SendThread retryLoop for $task retryCount ${task.retryCount}")
|
||||
task.execute()
|
||||
// sendEventTask.execute(SendEventTask.Params(task.event, task.encrypt, cryptoService))
|
||||
// SendPerformanceProfiler.stopStage(task.event.eventId, SendPerformanceProfiler.Stages.SEND_WORKER)
|
||||
break@retryLoop
|
||||
} catch (exception: Throwable) {
|
||||
when {
|
||||
exception is IOException || exception is Failure.NetworkConnection -> {
|
||||
canReachServer = false
|
||||
task.retryCount++
|
||||
if (task.retryCount >= 3) task.onTaskFailed()
|
||||
while (!canReachServer) {
|
||||
Timber.v("## SendThread retryLoop cannot reach server, wait ts:${System.currentTimeMillis()}")
|
||||
// schedule to retry
|
||||
waitForNetwork()
|
||||
}
|
||||
}
|
||||
(exception is Failure.ServerError && exception.error.code == MatrixError.M_LIMIT_EXCEEDED) -> {
|
||||
task.retryCount++
|
||||
if (task.retryCount >= 3) task.onTaskFailed()
|
||||
Timber.v("## SendThread retryLoop retryable error for $task reason: ${exception.localizedMessage}")
|
||||
// wait a bit
|
||||
// Todo if its a quota exception can we get timout?
|
||||
sleep(3_000)
|
||||
continue@retryLoop
|
||||
}
|
||||
exception.isTokenError() -> {
|
||||
Timber.v("## SendThread retryLoop retryable TOKEN error, interrupt")
|
||||
// we can exit the loop
|
||||
task.onTaskFailed()
|
||||
throw InterruptedException()
|
||||
}
|
||||
exception is CancellationException -> {
|
||||
Timber.v("## SendThread task has been cancelled")
|
||||
break@retryLoop
|
||||
}
|
||||
else -> {
|
||||
Timber.v("## SendThread retryLoop Un-Retryable error, try next task")
|
||||
// this task is in error, check next one?
|
||||
task.onTaskFailed()
|
||||
break@retryLoop
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
markAsFinished(task)
|
||||
}
|
||||
} catch (interruptionException: InterruptedException) {
|
||||
// will be thrown is thread is interrupted while seeping
|
||||
interrupt()
|
||||
Timber.v("## InterruptedException!! ${interruptionException.localizedMessage}")
|
||||
}
|
||||
// state = State.KILLED
|
||||
// is this needed?
|
||||
retryNoNetworkTask?.cancel()
|
||||
Timber.w("## SendThread finished ${System.currentTimeMillis()}")
|
||||
}
|
||||
|
||||
private fun waitForNetwork() {
|
||||
retryNoNetworkTask = Timer(SyncState.NoNetwork.toString(), false).schedule(RETRY_WAIT_TIME_MS) {
|
||||
synchronized(networkAvailableLock) {
|
||||
canReachServer = checkHostAvailable().also {
|
||||
Timber.v("## SendThread checkHostAvailable $it")
|
||||
}
|
||||
networkAvailableLock.notify()
|
||||
}
|
||||
}
|
||||
synchronized(networkAvailableLock) { networkAvailableLock.wait() }
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if homeserver is reachable.
|
||||
*/
|
||||
private fun checkHostAvailable(): Boolean {
|
||||
val host = sessionParams.homeServerConnectionConfig.homeServerUri.host ?: return false
|
||||
val port = sessionParams.homeServerConnectionConfig.homeServerUri.port.takeIf { it != -1 } ?: 80
|
||||
val timeout = 30_000
|
||||
try {
|
||||
Socket().use { socket ->
|
||||
val inetAddress: InetAddress = InetAddress.getByName(host)
|
||||
val inetSocketAddress = InetSocketAddress(inetAddress, port)
|
||||
socket.connect(inetSocketAddress, timeout)
|
||||
return true
|
||||
}
|
||||
} catch (e: IOException) {
|
||||
Timber.v("## EventSender isHostAvailable failure ${e.localizedMessage}")
|
||||
return false
|
||||
}
|
||||
}
|
||||
fun cancel(eventId: String, roomId: String)
|
||||
}
|
||||
|
|
|
@ -0,0 +1,200 @@
|
|||
/*
|
||||
* Copyright 2021 The Matrix.org Foundation C.I.C.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.matrix.android.sdk.internal.session.room.send.queue
|
||||
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.Dispatchers
|
||||
import kotlinx.coroutines.delay
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlinx.coroutines.withContext
|
||||
import org.matrix.android.sdk.api.auth.data.SessionParams
|
||||
import org.matrix.android.sdk.api.failure.Failure
|
||||
import org.matrix.android.sdk.api.failure.MatrixError
|
||||
import org.matrix.android.sdk.api.session.crypto.CryptoService
|
||||
import org.matrix.android.sdk.api.session.events.model.Event
|
||||
import org.matrix.android.sdk.api.util.Cancelable
|
||||
import org.matrix.android.sdk.internal.session.SessionScope
|
||||
import org.matrix.android.sdk.internal.task.CoroutineSequencer
|
||||
import org.matrix.android.sdk.internal.task.SemaphoreCoroutineSequencer
|
||||
import org.matrix.android.sdk.internal.task.TaskExecutor
|
||||
import org.matrix.android.sdk.internal.util.toCancelable
|
||||
import timber.log.Timber
|
||||
import java.io.IOException
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import java.util.concurrent.atomic.AtomicBoolean
|
||||
import javax.inject.Inject
|
||||
import kotlin.coroutines.cancellation.CancellationException
|
||||
|
||||
private const val RETRY_WAIT_TIME_MS = 10_000L
|
||||
private const val MAX_RETRY_COUNT = 3
|
||||
|
||||
/**
|
||||
* This class is responsible for sending events in order in each room. It uses the QueuedTask.queueIdentifier to execute tasks sequentially.
|
||||
* Each send is retried 3 times, if there is no network (e.g if cannot ping home server) it will wait and
|
||||
* periodically test reachability before resume (does not count as a retry)
|
||||
*
|
||||
* If the app is killed before all event were sent, on next wakeup the scheduled events will be re posted
|
||||
*
|
||||
*/
|
||||
@SessionScope
|
||||
internal class EventSenderProcessorCoroutine @Inject constructor(
|
||||
private val cryptoService: CryptoService,
|
||||
private val sessionParams: SessionParams,
|
||||
private val queuedTaskFactory: QueuedTaskFactory,
|
||||
private val taskExecutor: TaskExecutor,
|
||||
private val memento: QueueMemento
|
||||
) : EventSenderProcessor {
|
||||
|
||||
private val waitForNetworkSequencer = SemaphoreCoroutineSequencer()
|
||||
|
||||
/**
|
||||
* sequencers use QueuedTask.queueIdentifier as key
|
||||
*/
|
||||
private val sequencers = ConcurrentHashMap<String, CoroutineSequencer>()
|
||||
|
||||
/**
|
||||
* cancelableBag use QueuedTask.taskIdentifier as key
|
||||
*/
|
||||
private val cancelableBag = ConcurrentHashMap<String, Cancelable>()
|
||||
|
||||
override fun onSessionStarted() {
|
||||
// We should check for sending events not handled because app was killed
|
||||
// But we should be careful of only took those that was submitted to us, because if it's
|
||||
// for example it's a media event it is handled by some worker and he will handle it
|
||||
// This is a bit fragile :/
|
||||
// also some events cannot be retried manually by users, e.g reactions
|
||||
// they were previously relying on workers to do the work :/ and was expected to always finally succeed
|
||||
// Also some echos are not to be resent like redaction echos (fake event created for aggregation)
|
||||
taskExecutor.executorScope.launch {
|
||||
Timber.d("## Send relaunched pending events on restart")
|
||||
try {
|
||||
memento.restoreTasks(this@EventSenderProcessorCoroutine)
|
||||
} catch (failure: Throwable) {
|
||||
Timber.e(failure, "Fail restoring send tasks")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun postEvent(event: Event): Cancelable {
|
||||
return postEvent(event, event.roomId?.let { cryptoService.isRoomEncrypted(it) } ?: false)
|
||||
}
|
||||
|
||||
override fun postEvent(event: Event, encrypt: Boolean): Cancelable {
|
||||
val task = queuedTaskFactory.createSendTask(event, encrypt)
|
||||
return postTask(task)
|
||||
}
|
||||
|
||||
override fun postRedaction(redactionLocalEcho: Event, reason: String?): Cancelable {
|
||||
return postRedaction(redactionLocalEcho.eventId!!, redactionLocalEcho.redacts!!, redactionLocalEcho.roomId!!, reason)
|
||||
}
|
||||
|
||||
override fun postRedaction(redactionLocalEchoId: String, eventToRedactId: String, roomId: String, reason: String?): Cancelable {
|
||||
val task = queuedTaskFactory.createRedactTask(redactionLocalEchoId, eventToRedactId, roomId, reason)
|
||||
return postTask(task)
|
||||
}
|
||||
|
||||
override fun postTask(task: QueuedTask): Cancelable {
|
||||
markAsManaged(task)
|
||||
val sequencer = sequencers.getOrPut(task.queueIdentifier) {
|
||||
SemaphoreCoroutineSequencer()
|
||||
}
|
||||
Timber.v("## post $task")
|
||||
return taskExecutor.executorScope
|
||||
.launchWith(sequencer) {
|
||||
executeTask(task)
|
||||
}.toCancelable()
|
||||
.also {
|
||||
cancelableBag[task.taskIdentifier] = it
|
||||
}
|
||||
}
|
||||
|
||||
override fun cancel(eventId: String, roomId: String) {
|
||||
// eventId is most likely the taskIdentifier
|
||||
cancelableBag[eventId]?.cancel()
|
||||
}
|
||||
|
||||
private fun CoroutineScope.launchWith(sequencer: CoroutineSequencer, block: suspend CoroutineScope.() -> Unit) = launch {
|
||||
sequencer.post {
|
||||
block()
|
||||
}
|
||||
}
|
||||
|
||||
private suspend fun executeTask(task: QueuedTask) {
|
||||
try {
|
||||
if (task.isCancelled()) {
|
||||
Timber.v("## $task has been cancelled, try next task")
|
||||
return
|
||||
}
|
||||
task.waitForNetwork()
|
||||
task.execute()
|
||||
} catch (exception: Throwable) {
|
||||
when {
|
||||
exception is IOException || exception is Failure.NetworkConnection -> {
|
||||
canReachServer.set(false)
|
||||
task.markAsFailedOrRetry(exception, 0)
|
||||
}
|
||||
(exception is Failure.ServerError && exception.error.code == MatrixError.M_LIMIT_EXCEEDED) -> {
|
||||
val delay = exception.error.retryAfterMillis?.plus(100) ?: 3_000
|
||||
task.markAsFailedOrRetry(exception, delay)
|
||||
}
|
||||
exception is CancellationException -> {
|
||||
Timber.v("## $task has been cancelled, try next task")
|
||||
}
|
||||
else -> {
|
||||
Timber.v("## un-retryable error for $task, try next task")
|
||||
// this task is in error, check next one?
|
||||
task.onTaskFailed()
|
||||
}
|
||||
}
|
||||
}
|
||||
markAsFinished(task)
|
||||
}
|
||||
|
||||
private suspend fun QueuedTask.markAsFailedOrRetry(failure: Throwable, retryDelay: Long) {
|
||||
if (retryCount.incrementAndGet() >= MAX_RETRY_COUNT) {
|
||||
onTaskFailed()
|
||||
} else {
|
||||
Timber.v("## retryable error for $this reason: ${failure.localizedMessage}")
|
||||
// Wait if necessary
|
||||
delay(retryDelay)
|
||||
// And then retry
|
||||
executeTask(this)
|
||||
}
|
||||
}
|
||||
|
||||
private fun markAsManaged(task: QueuedTask) {
|
||||
memento.track(task)
|
||||
}
|
||||
|
||||
private fun markAsFinished(task: QueuedTask) {
|
||||
cancelableBag.remove(task.taskIdentifier)
|
||||
memento.unTrack(task)
|
||||
}
|
||||
|
||||
private val canReachServer = AtomicBoolean(true)
|
||||
|
||||
private suspend fun QueuedTask.waitForNetwork() = waitForNetworkSequencer.post {
|
||||
while (!canReachServer.get()) {
|
||||
Timber.v("## $this cannot reach server wait ts:${System.currentTimeMillis()}")
|
||||
delay(RETRY_WAIT_TIME_MS)
|
||||
withContext(Dispatchers.IO) {
|
||||
val hostAvailable = HomeServerAvailabilityChecker(sessionParams).check()
|
||||
canReachServer.set(hostAvailable)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,234 @@
|
|||
/*
|
||||
* Copyright 2020 The Matrix.org Foundation C.I.C.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.matrix.android.sdk.internal.session.room.send.queue
|
||||
|
||||
import kotlinx.coroutines.CancellationException
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlinx.coroutines.runBlocking
|
||||
import org.matrix.android.sdk.api.auth.data.SessionParams
|
||||
import org.matrix.android.sdk.api.auth.data.sessionId
|
||||
import org.matrix.android.sdk.api.extensions.tryOrNull
|
||||
import org.matrix.android.sdk.api.failure.Failure
|
||||
import org.matrix.android.sdk.api.failure.MatrixError
|
||||
import org.matrix.android.sdk.api.failure.isTokenError
|
||||
import org.matrix.android.sdk.api.session.crypto.CryptoService
|
||||
import org.matrix.android.sdk.api.session.events.model.Event
|
||||
import org.matrix.android.sdk.api.session.sync.SyncState
|
||||
import org.matrix.android.sdk.api.util.Cancelable
|
||||
import org.matrix.android.sdk.internal.session.SessionScope
|
||||
import org.matrix.android.sdk.internal.task.TaskExecutor
|
||||
import timber.log.Timber
|
||||
import java.io.IOException
|
||||
import java.util.Timer
|
||||
import java.util.TimerTask
|
||||
import java.util.concurrent.LinkedBlockingQueue
|
||||
import javax.inject.Inject
|
||||
import kotlin.concurrent.schedule
|
||||
|
||||
/**
|
||||
* A simple ever running thread unique for that session responsible of sending events in order.
|
||||
* Each send is retried 3 times, if there is no network (e.g if cannot ping home server) it will wait and
|
||||
* periodically test reachability before resume (does not count as a retry)
|
||||
*
|
||||
* If the app is killed before all event were sent, on next wakeup the scheduled events will be re posted
|
||||
*/
|
||||
@Deprecated("You should know use EventSenderProcessorCoroutine instead")
|
||||
@SessionScope
|
||||
internal class EventSenderProcessorThread @Inject constructor(
|
||||
private val cryptoService: CryptoService,
|
||||
private val sessionParams: SessionParams,
|
||||
private val queuedTaskFactory: QueuedTaskFactory,
|
||||
private val taskExecutor: TaskExecutor,
|
||||
private val memento: QueueMemento
|
||||
) : Thread("SENDER_THREAD_SID_${sessionParams.credentials.sessionId()}"), EventSenderProcessor {
|
||||
|
||||
private fun markAsManaged(task: QueuedTask) {
|
||||
memento.track(task)
|
||||
}
|
||||
|
||||
private fun markAsFinished(task: QueuedTask) {
|
||||
memento.unTrack(task)
|
||||
}
|
||||
|
||||
override fun onSessionStarted() {
|
||||
start()
|
||||
}
|
||||
|
||||
override fun onSessionStopped() {
|
||||
interrupt()
|
||||
}
|
||||
|
||||
override fun start() {
|
||||
super.start()
|
||||
// We should check for sending events not handled because app was killed
|
||||
// But we should be careful of only took those that was submitted to us, because if it's
|
||||
// for example it's a media event it is handled by some worker and he will handle it
|
||||
// This is a bit fragile :/
|
||||
// also some events cannot be retried manually by users, e.g reactions
|
||||
// they were previously relying on workers to do the work :/ and was expected to always finally succeed
|
||||
// Also some echos are not to be resent like redaction echos (fake event created for aggregation)
|
||||
|
||||
tryOrNull {
|
||||
taskExecutor.executorScope.launch {
|
||||
Timber.d("## Send relaunched pending events on restart")
|
||||
memento.restoreTasks(this@EventSenderProcessorThread)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// API
|
||||
override fun postEvent(event: Event): Cancelable {
|
||||
return postEvent(event, event.roomId?.let { cryptoService.isRoomEncrypted(it) } ?: false)
|
||||
}
|
||||
|
||||
override fun postEvent(event: Event, encrypt: Boolean): Cancelable {
|
||||
val task = queuedTaskFactory.createSendTask(event, encrypt)
|
||||
return postTask(task)
|
||||
}
|
||||
|
||||
override fun postRedaction(redactionLocalEcho: Event, reason: String?): Cancelable {
|
||||
return postRedaction(redactionLocalEcho.eventId!!, redactionLocalEcho.redacts!!, redactionLocalEcho.roomId!!, reason)
|
||||
}
|
||||
|
||||
override fun postRedaction(redactionLocalEchoId: String, eventToRedactId: String, roomId: String, reason: String?): Cancelable {
|
||||
val task = queuedTaskFactory.createRedactTask(redactionLocalEchoId, eventToRedactId, roomId, reason)
|
||||
return postTask(task)
|
||||
}
|
||||
|
||||
override fun postTask(task: QueuedTask): Cancelable {
|
||||
// non blocking add to queue
|
||||
sendingQueue.add(task)
|
||||
markAsManaged(task)
|
||||
return task
|
||||
}
|
||||
|
||||
override fun cancel(eventId: String, roomId: String) {
|
||||
(currentTask as? SendEventQueuedTask)
|
||||
?.takeIf { it -> it.event.eventId == eventId && it.event.roomId == roomId }
|
||||
?.cancel()
|
||||
}
|
||||
|
||||
companion object {
|
||||
private const val RETRY_WAIT_TIME_MS = 10_000L
|
||||
}
|
||||
|
||||
private var currentTask: QueuedTask? = null
|
||||
|
||||
private var sendingQueue = LinkedBlockingQueue<QueuedTask>()
|
||||
|
||||
private var networkAvailableLock = Object()
|
||||
private var canReachServer = true
|
||||
private var retryNoNetworkTask: TimerTask? = null
|
||||
|
||||
override fun run() {
|
||||
Timber.v("## SendThread started ts:${System.currentTimeMillis()}")
|
||||
try {
|
||||
while (!isInterrupted) {
|
||||
Timber.v("## SendThread wait for task to process")
|
||||
val task = sendingQueue.take()
|
||||
.also { currentTask = it }
|
||||
Timber.v("## SendThread Found task to process $task")
|
||||
|
||||
if (task.isCancelled()) {
|
||||
Timber.v("## SendThread send cancelled for $task")
|
||||
// we do not execute this one
|
||||
continue
|
||||
}
|
||||
// we check for network connectivity
|
||||
while (!canReachServer) {
|
||||
Timber.v("## SendThread cannot reach server, wait ts:${System.currentTimeMillis()}")
|
||||
// schedule to retry
|
||||
waitForNetwork()
|
||||
// if thread as been killed meanwhile
|
||||
// if (state == State.KILLING) break
|
||||
}
|
||||
Timber.v("## Server is Reachable")
|
||||
// so network is available
|
||||
|
||||
runBlocking {
|
||||
retryLoop@ while (task.retryCount.get() < 3) {
|
||||
try {
|
||||
// SendPerformanceProfiler.startStage(task.event.eventId!!, SendPerformanceProfiler.Stages.SEND_WORKER)
|
||||
Timber.v("## SendThread retryLoop for $task retryCount ${task.retryCount}")
|
||||
task.execute()
|
||||
// sendEventTask.execute(SendEventTask.Params(task.event, task.encrypt, cryptoService))
|
||||
// SendPerformanceProfiler.stopStage(task.event.eventId, SendPerformanceProfiler.Stages.SEND_WORKER)
|
||||
break@retryLoop
|
||||
} catch (exception: Throwable) {
|
||||
when {
|
||||
exception is IOException || exception is Failure.NetworkConnection -> {
|
||||
canReachServer = false
|
||||
if (task.retryCount.getAndIncrement() >= 3) task.onTaskFailed()
|
||||
while (!canReachServer) {
|
||||
Timber.v("## SendThread retryLoop cannot reach server, wait ts:${System.currentTimeMillis()}")
|
||||
// schedule to retry
|
||||
waitForNetwork()
|
||||
}
|
||||
}
|
||||
(exception is Failure.ServerError && exception.error.code == MatrixError.M_LIMIT_EXCEEDED) -> {
|
||||
if (task.retryCount.getAndIncrement() >= 3) task.onTaskFailed()
|
||||
Timber.v("## SendThread retryLoop retryable error for $task reason: ${exception.localizedMessage}")
|
||||
// wait a bit
|
||||
// Todo if its a quota exception can we get timout?
|
||||
sleep(3_000)
|
||||
continue@retryLoop
|
||||
}
|
||||
exception.isTokenError() -> {
|
||||
Timber.v("## SendThread retryLoop retryable TOKEN error, interrupt")
|
||||
// we can exit the loop
|
||||
task.onTaskFailed()
|
||||
throw InterruptedException()
|
||||
}
|
||||
exception is CancellationException -> {
|
||||
Timber.v("## SendThread task has been cancelled")
|
||||
break@retryLoop
|
||||
}
|
||||
else -> {
|
||||
Timber.v("## SendThread retryLoop Un-Retryable error, try next task")
|
||||
// this task is in error, check next one?
|
||||
task.onTaskFailed()
|
||||
break@retryLoop
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
markAsFinished(task)
|
||||
}
|
||||
} catch (interruptionException: InterruptedException) {
|
||||
// will be thrown is thread is interrupted while seeping
|
||||
interrupt()
|
||||
Timber.v("## InterruptedException!! ${interruptionException.localizedMessage}")
|
||||
}
|
||||
// state = State.KILLED
|
||||
// is this needed?
|
||||
retryNoNetworkTask?.cancel()
|
||||
Timber.w("## SendThread finished ${System.currentTimeMillis()}")
|
||||
}
|
||||
|
||||
private fun waitForNetwork() {
|
||||
retryNoNetworkTask = Timer(SyncState.NoNetwork.toString(), false).schedule(RETRY_WAIT_TIME_MS) {
|
||||
synchronized(networkAvailableLock) {
|
||||
canReachServer = HomeServerAvailabilityChecker(sessionParams).check().also {
|
||||
Timber.v("## SendThread checkHostAvailable $it")
|
||||
}
|
||||
networkAvailableLock.notify()
|
||||
}
|
||||
}
|
||||
synchronized(networkAvailableLock) { networkAvailableLock.wait() }
|
||||
}
|
||||
}
|
|
@ -0,0 +1,44 @@
|
|||
/*
|
||||
* Copyright 2021 The Matrix.org Foundation C.I.C.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.matrix.android.sdk.internal.session.room.send.queue
|
||||
|
||||
import org.matrix.android.sdk.api.auth.data.SessionParams
|
||||
import timber.log.Timber
|
||||
import java.io.IOException
|
||||
import java.net.InetAddress
|
||||
import java.net.InetSocketAddress
|
||||
import java.net.Socket
|
||||
|
||||
internal class HomeServerAvailabilityChecker(val sessionParams: SessionParams) {
|
||||
|
||||
fun check(): Boolean {
|
||||
val host = sessionParams.homeServerConnectionConfig.homeServerUri.host ?: return false
|
||||
val port = sessionParams.homeServerConnectionConfig.homeServerUri.port.takeIf { it != -1 } ?: 80
|
||||
val timeout = 30_000
|
||||
try {
|
||||
Socket().use { socket ->
|
||||
val inetAddress: InetAddress = InetAddress.getByName(host)
|
||||
val inetSocketAddress = InetSocketAddress(inetAddress, port)
|
||||
socket.connect(inetSocketAddress, timeout)
|
||||
return true
|
||||
}
|
||||
} catch (e: IOException) {
|
||||
Timber.v("## EventSender isHostAvailable failure ${e.localizedMessage}")
|
||||
return false
|
||||
}
|
||||
}
|
||||
}
|
|
@ -32,6 +32,9 @@ import javax.inject.Inject
|
|||
* It is just used to remember what events/localEchos was managed by the event sender in order to
|
||||
* reschedule them (and only them) on next restart
|
||||
*/
|
||||
|
||||
private const val PERSISTENCE_KEY = "ManagedBySender"
|
||||
|
||||
internal class QueueMemento @Inject constructor(context: Context,
|
||||
@SessionId sessionId: String,
|
||||
private val queuedTaskFactory: QueuedTaskFactory,
|
||||
|
@ -39,28 +42,27 @@ internal class QueueMemento @Inject constructor(context: Context,
|
|||
private val cryptoService: CryptoService) {
|
||||
|
||||
private val storage = context.getSharedPreferences("QueueMemento_$sessionId", Context.MODE_PRIVATE)
|
||||
private val managedTaskInfos = mutableListOf<QueuedTask>()
|
||||
private val trackedTasks = mutableListOf<QueuedTask>()
|
||||
|
||||
fun track(task: QueuedTask) {
|
||||
synchronized(managedTaskInfos) {
|
||||
managedTaskInfos.add(task)
|
||||
persist()
|
||||
}
|
||||
fun track(task: QueuedTask) = synchronized(trackedTasks) {
|
||||
trackedTasks.add(task)
|
||||
persist()
|
||||
}
|
||||
|
||||
fun unTrack(task: QueuedTask) {
|
||||
synchronized(managedTaskInfos) {
|
||||
managedTaskInfos.remove(task)
|
||||
persist()
|
||||
}
|
||||
fun unTrack(task: QueuedTask) = synchronized(trackedTasks) {
|
||||
trackedTasks.remove(task)
|
||||
persist()
|
||||
}
|
||||
|
||||
fun trackedTasks() = synchronized(trackedTasks) {
|
||||
}
|
||||
|
||||
private fun persist() {
|
||||
managedTaskInfos.mapIndexedNotNull { index, queuedTask ->
|
||||
trackedTasks.mapIndexedNotNull { index, queuedTask ->
|
||||
toTaskInfo(queuedTask, index)?.let { TaskInfo.map(it) }
|
||||
}.toSet().let { set ->
|
||||
storage.edit()
|
||||
.putStringSet("ManagedBySender", set)
|
||||
.putStringSet(PERSISTENCE_KEY, set)
|
||||
.apply()
|
||||
}
|
||||
}
|
||||
|
@ -82,7 +84,7 @@ internal class QueueMemento @Inject constructor(context: Context,
|
|||
|
||||
suspend fun restoreTasks(eventProcessor: EventSenderProcessor) {
|
||||
// events should be restarted in correct order
|
||||
storage.getStringSet("ManagedBySender", null)?.let { pending ->
|
||||
storage.getStringSet(PERSISTENCE_KEY, null)?.let { pending ->
|
||||
Timber.d("## Send - Recovering unsent events $pending")
|
||||
pending.mapNotNull { tryOrNull { TaskInfo.map(it) } }
|
||||
}
|
||||
|
|
|
@ -17,15 +17,29 @@
|
|||
package org.matrix.android.sdk.internal.session.room.send.queue
|
||||
|
||||
import org.matrix.android.sdk.api.util.Cancelable
|
||||
import timber.log.Timber
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
|
||||
abstract class QueuedTask : Cancelable {
|
||||
var retryCount = 0
|
||||
/**
|
||||
* @param queueIdentifier String value to identify a unique Queue
|
||||
* @param taskIdentifier String value to identify a unique Task. Should be different from queueIdentifier
|
||||
*/
|
||||
internal abstract class QueuedTask(
|
||||
val queueIdentifier: String,
|
||||
val taskIdentifier: String
|
||||
) : Cancelable {
|
||||
|
||||
override fun toString() = "${javaClass.simpleName} queueIdentifier: $queueIdentifier, taskIdentifier: $taskIdentifier)"
|
||||
|
||||
var retryCount = AtomicInteger(0)
|
||||
|
||||
private var hasBeenCancelled: Boolean = false
|
||||
|
||||
suspend fun execute() {
|
||||
if (!isCancelled()) {
|
||||
Timber.v("Execute: $this start")
|
||||
doExecute()
|
||||
Timber.v("Execute: $this finish")
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -29,9 +29,7 @@ internal class RedactQueuedTask(
|
|||
private val redactEventTask: RedactEventTask,
|
||||
private val localEchoRepository: LocalEchoRepository,
|
||||
private val cancelSendTracker: CancelSendTracker
|
||||
) : QueuedTask() {
|
||||
|
||||
override fun toString() = "[RedactQueuedTask $redactionLocalEchoId]"
|
||||
) : QueuedTask(queueIdentifier = roomId, taskIdentifier = redactionLocalEchoId) {
|
||||
|
||||
override suspend fun doExecute() {
|
||||
redactEventTask.execute(RedactEventTask.Params(redactionLocalEchoId, roomId, toRedactEventId, reason))
|
||||
|
|
|
@ -31,9 +31,7 @@ internal class SendEventQueuedTask(
|
|||
val cryptoService: CryptoService,
|
||||
val localEchoRepository: LocalEchoRepository,
|
||||
val cancelSendTracker: CancelSendTracker
|
||||
) : QueuedTask() {
|
||||
|
||||
override fun toString() = "[SendEventQueuedTask ${event.eventId}]"
|
||||
) : QueuedTask(queueIdentifier = event.roomId!!, taskIdentifier = event.eventId!!) {
|
||||
|
||||
override suspend fun doExecute() {
|
||||
sendEventTask.execute(SendEventTask.Params(event, encrypt))
|
||||
|
|
|
@ -36,7 +36,7 @@ internal interface TaskInfo {
|
|||
const val TYPE_SEND = "TYPE_SEND"
|
||||
const val TYPE_REDACT = "TYPE_REDACT"
|
||||
|
||||
val moshi = Moshi.Builder()
|
||||
private val moshi = Moshi.Builder()
|
||||
.add(RuntimeJsonAdapterFactory.of(TaskInfo::class.java, "type", FallbackTaskInfo::class.java)
|
||||
.registerSubtype(SendEventTaskInfo::class.java, TYPE_SEND)
|
||||
.registerSubtype(RedactEventTaskInfo::class.java, TYPE_REDACT)
|
||||
|
@ -71,6 +71,6 @@ internal data class RedactEventTaskInfo(
|
|||
|
||||
@JsonClass(generateAdapter = true)
|
||||
internal data class FallbackTaskInfo(
|
||||
@Json(name = "type") override val type: String = TaskInfo.TYPE_REDACT,
|
||||
@Json(name = "type") override val type: String = TaskInfo.TYPE_UNKNOWN,
|
||||
@Json(name = "order") override val order: Int
|
||||
) : TaskInfo
|
||||
|
|
|
@ -37,12 +37,12 @@ internal class DefaultWidgetURLFormatter @Inject constructor(private val integra
|
|||
private lateinit var currentConfig: IntegrationManagerConfig
|
||||
private var whiteListedUrls: List<String> = emptyList()
|
||||
|
||||
override fun onStart() {
|
||||
override fun onSessionStarted() {
|
||||
setupWithConfiguration()
|
||||
integrationManager.addListener(this)
|
||||
}
|
||||
|
||||
override fun onStop() {
|
||||
override fun onSessionStopped() {
|
||||
integrationManager.removeListener(this)
|
||||
}
|
||||
|
||||
|
|
|
@ -62,12 +62,12 @@ internal class WidgetManager @Inject constructor(private val integrationManager:
|
|||
private val lifecycleOwner: LifecycleOwner = LifecycleOwner { lifecycleRegistry }
|
||||
private val lifecycleRegistry: LifecycleRegistry = LifecycleRegistry(lifecycleOwner)
|
||||
|
||||
override fun onStart() {
|
||||
override fun onSessionStarted() {
|
||||
lifecycleRegistry.currentState = Lifecycle.State.STARTED
|
||||
integrationManager.addListener(this)
|
||||
}
|
||||
|
||||
override fun onStop() {
|
||||
override fun onSessionStopped() {
|
||||
integrationManager.removeListener(this)
|
||||
lifecycleRegistry.currentState = Lifecycle.State.DESTROYED
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue