Merge pull request #1116 from vector-im/feature/worker_manager

Add some documentation on Workers
This commit is contained in:
Benoit Marty 2020-04-16 16:28:20 +02:00 committed by GitHub
commit cc94b6cf7d
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 91 additions and 25 deletions

View file

@ -30,6 +30,10 @@ import im.vector.matrix.android.internal.worker.getSessionComponent
import timber.log.Timber import timber.log.Timber
import javax.inject.Inject import javax.inject.Inject
/**
* Possible previous worker: None
* Possible next worker : None
*/
internal class SendVerificationMessageWorker(context: Context, internal class SendVerificationMessageWorker(context: Context,
params: WorkerParameters) params: WorkerParameters)
: CoroutineWorker(context, params) { : CoroutineWorker(context, params) {
@ -48,7 +52,7 @@ internal class SendVerificationMessageWorker(context: Context,
lateinit var cryptoService: CryptoService lateinit var cryptoService: CryptoService
override suspend fun doWork(): Result { override suspend fun doWork(): Result {
val errorOutputData = Data.Builder().putBoolean("failed", true).build() val errorOutputData = Data.Builder().putBoolean(OUTPUT_KEY_FAILED, true).build()
val params = WorkerParamsFactory.fromData<Params>(inputData) val params = WorkerParamsFactory.fromData<Params>(inputData)
?: return Result.success(errorOutputData) ?: return Result.success(errorOutputData)
@ -76,4 +80,12 @@ internal class SendVerificationMessageWorker(context: Context,
} }
} }
} }
companion object {
private const val OUTPUT_KEY_FAILED = "failed"
fun hasFailed(outputData: Data): Boolean {
return outputData.getBoolean(SendVerificationMessageWorker.OUTPUT_KEY_FAILED, false)
}
}
} }

View file

@ -34,6 +34,9 @@ internal interface VerificationTransport {
onErrorReason: CancelCode, onErrorReason: CancelCode,
onDone: (() -> Unit)?) onDone: (() -> Unit)?)
/**
* @param callback will be called with eventId and ValidVerificationInfoRequest in case of success
*/
fun sendVerificationRequest(supportedMethods: List<String>, fun sendVerificationRequest(supportedMethods: List<String>,
localId: String, localId: String,
otherUserId: String, otherUserId: String,

View file

@ -115,7 +115,7 @@ internal class VerificationTransportRoomMessage(
?.filter { it.state == WorkInfo.State.SUCCEEDED } ?.filter { it.state == WorkInfo.State.SUCCEEDED }
?.firstOrNull { it.id == enqueueInfo.second } ?.firstOrNull { it.id == enqueueInfo.second }
?.let { wInfo -> ?.let { wInfo ->
if (wInfo.outputData.getBoolean("failed", false)) { if (SendVerificationMessageWorker.hasFailed(wInfo.outputData)) {
Timber.e("## SAS verification [${tx?.transactionId}] failed to send verification message in state : ${tx?.state}") Timber.e("## SAS verification [${tx?.transactionId}] failed to send verification message in state : ${tx?.state}")
tx?.cancel(onErrorReason) tx?.cancel(onErrorReason)
} else { } else {
@ -196,12 +196,15 @@ internal class VerificationTransportRoomMessage(
?.filter { it.state == WorkInfo.State.SUCCEEDED } ?.filter { it.state == WorkInfo.State.SUCCEEDED }
?.firstOrNull { it.id == workRequest.id } ?.firstOrNull { it.id == workRequest.id }
?.let { wInfo -> ?.let { wInfo ->
if (wInfo.outputData.getBoolean("failed", false)) { if (SendVerificationMessageWorker.hasFailed(wInfo.outputData)) {
callback(null, null) callback(null, null)
} else if (wInfo.outputData.getString(localId) != null) {
callback(wInfo.outputData.getString(localId), validInfo)
} else { } else {
callback(null, null) val eventId = wInfo.outputData.getString(localId)
if (eventId != null) {
callback(eventId, validInfo)
} else {
callback(null, null)
}
} }
workLiveData.removeObserver(this) workLiveData.removeObserver(this)
} }

View file

@ -46,6 +46,10 @@ private data class NewImageAttributes(
val newFileSize: Int val newFileSize: Int
) )
/**
* Possible previous worker: None
* Possible next worker : Always [MultipleEventSendingDispatcherWorker]
*/
internal class UploadContentWorker(val context: Context, params: WorkerParameters) : CoroutineWorker(context, params) { internal class UploadContentWorker(val context: Context, params: WorkerParameters) : CoroutineWorker(context, params) {
@JsonClass(generateAdapter = true) @JsonClass(generateAdapter = true)
@ -64,12 +68,14 @@ internal class UploadContentWorker(val context: Context, params: WorkerParameter
override suspend fun doWork(): Result { override suspend fun doWork(): Result {
val params = WorkerParamsFactory.fromData<Params>(inputData) val params = WorkerParamsFactory.fromData<Params>(inputData)
?: return Result.success() ?: return Result.success()
.also { Timber.e("Unable to parse work parameters") }
Timber.v("Starting upload media work with params $params") Timber.v("Starting upload media work with params $params")
if (params.lastFailureMessage != null) { if (params.lastFailureMessage != null) {
// Transmit the error // Transmit the error
Timber.v("Stop upload media work due to input failure")
return Result.success(inputData) return Result.success(inputData)
.also { Timber.e("Work cancelled due to input error from parent") }
} }
// Just defensive code to ensure that we never have an uncaught exception that could break the queue // Just defensive code to ensure that we never have an uncaught exception that could break the queue

View file

@ -23,8 +23,13 @@ import com.squareup.moshi.JsonClass
import im.vector.matrix.android.internal.worker.SessionWorkerParams import im.vector.matrix.android.internal.worker.SessionWorkerParams
import im.vector.matrix.android.internal.worker.WorkerParamsFactory import im.vector.matrix.android.internal.worker.WorkerParamsFactory
import im.vector.matrix.android.internal.worker.getSessionComponent import im.vector.matrix.android.internal.worker.getSessionComponent
import timber.log.Timber
import javax.inject.Inject import javax.inject.Inject
/**
* Possible previous worker: None
* Possible next worker : None
*/
internal class GetGroupDataWorker(context: Context, params: WorkerParameters) : CoroutineWorker(context, params) { internal class GetGroupDataWorker(context: Context, params: WorkerParameters) : CoroutineWorker(context, params) {
@JsonClass(generateAdapter = true) @JsonClass(generateAdapter = true)
@ -39,6 +44,7 @@ internal class GetGroupDataWorker(context: Context, params: WorkerParameters) :
override suspend fun doWork(): Result { override suspend fun doWork(): Result {
val params = WorkerParamsFactory.fromData<Params>(inputData) val params = WorkerParamsFactory.fromData<Params>(inputData)
?: return Result.failure() ?: return Result.failure()
.also { Timber.e("Unable to parse work parameters") }
val sessionComponent = getSessionComponent(params.sessionId) ?: return Result.success() val sessionComponent = getSessionComponent(params.sessionId) ?: return Result.success()
sessionComponent.inject(this) sessionComponent.inject(this)

View file

@ -69,13 +69,13 @@ internal class GroupSummaryUpdater @Inject constructor(
val workData = WorkerParamsFactory.toData(getGroupDataWorkerParams) val workData = WorkerParamsFactory.toData(getGroupDataWorkerParams)
val sendWork = workManagerProvider.matrixOneTimeWorkRequestBuilder<GetGroupDataWorker>() val getGroupWork = workManagerProvider.matrixOneTimeWorkRequestBuilder<GetGroupDataWorker>()
.setInputData(workData) .setInputData(workData)
.setConstraints(WorkManagerProvider.workConstraints) .setConstraints(WorkManagerProvider.workConstraints)
.build() .build()
workManagerProvider.workManager workManagerProvider.workManager
.beginUniqueWork(GET_GROUP_DATA_WORKER, ExistingWorkPolicy.APPEND, sendWork) .beginUniqueWork(GET_GROUP_DATA_WORKER, ExistingWorkPolicy.APPEND, getGroupWork)
.enqueue() .enqueue()
} }

View file

@ -31,6 +31,7 @@ import im.vector.matrix.android.internal.worker.SessionWorkerParams
import im.vector.matrix.android.internal.worker.WorkerParamsFactory import im.vector.matrix.android.internal.worker.WorkerParamsFactory
import im.vector.matrix.android.internal.worker.getSessionComponent import im.vector.matrix.android.internal.worker.getSessionComponent
import org.greenrobot.eventbus.EventBus import org.greenrobot.eventbus.EventBus
import timber.log.Timber
import javax.inject.Inject import javax.inject.Inject
internal class AddHttpPusherWorker(context: Context, params: WorkerParameters) internal class AddHttpPusherWorker(context: Context, params: WorkerParameters)
@ -50,6 +51,7 @@ internal class AddHttpPusherWorker(context: Context, params: WorkerParameters)
override suspend fun doWork(): Result { override suspend fun doWork(): Result {
val params = WorkerParamsFactory.fromData<Params>(inputData) val params = WorkerParamsFactory.fromData<Params>(inputData)
?: return Result.failure() ?: return Result.failure()
.also { Timber.e("Unable to parse work parameters") }
val sessionComponent = getSessionComponent(params.sessionId) ?: return Result.success() val sessionComponent = getSessionComponent(params.sessionId) ?: return Result.success()
sessionComponent.inject(this) sessionComponent.inject(this)

View file

@ -31,6 +31,7 @@ import im.vector.matrix.android.internal.worker.SessionWorkerParams
import im.vector.matrix.android.internal.worker.WorkerParamsFactory import im.vector.matrix.android.internal.worker.WorkerParamsFactory
import im.vector.matrix.android.internal.worker.getSessionComponent import im.vector.matrix.android.internal.worker.getSessionComponent
import org.greenrobot.eventbus.EventBus import org.greenrobot.eventbus.EventBus
import timber.log.Timber
import javax.inject.Inject import javax.inject.Inject
// TODO This is not used. Delete? // TODO This is not used. Delete?
@ -51,10 +52,12 @@ internal class SendRelationWorker(context: Context, params: WorkerParameters) :
override suspend fun doWork(): Result { override suspend fun doWork(): Result {
val params = WorkerParamsFactory.fromData<Params>(inputData) val params = WorkerParamsFactory.fromData<Params>(inputData)
?: return Result.failure() ?: return Result.failure()
.also { Timber.e("Unable to parse work parameters") }
if (params.lastFailureMessage != null) { if (params.lastFailureMessage != null) {
// Transmit the error // Transmit the error
return Result.success(inputData) return Result.success(inputData)
.also { Timber.e("Work cancelled due to input error from parent") }
} }
val sessionComponent = getSessionComponent(params.sessionId) ?: return Result.success() val sessionComponent = getSessionComponent(params.sessionId) ?: return Result.success()

View file

@ -228,7 +228,7 @@ internal class DefaultSendService @AssistedInject constructor(
keys.forEach { isRoomEncrypted -> keys.forEach { isRoomEncrypted ->
// Should never be empty // Should never be empty
val localEchoes = get(isRoomEncrypted).orEmpty() val localEchoes = get(isRoomEncrypted).orEmpty()
val uploadWork = createUploadMediaWork(localEchoes, attachment, isRoomEncrypted, compressBeforeSending, startChain = true) val uploadWork = createUploadMediaWork(localEchoes, attachment, isRoomEncrypted, compressBeforeSending)
val dispatcherWork = createMultipleEventDispatcherWork(isRoomEncrypted) val dispatcherWork = createMultipleEventDispatcherWork(isRoomEncrypted)
@ -293,14 +293,13 @@ internal class DefaultSendService @AssistedInject constructor(
private fun createUploadMediaWork(allLocalEchos: List<Event>, private fun createUploadMediaWork(allLocalEchos: List<Event>,
attachment: ContentAttachmentData, attachment: ContentAttachmentData,
isRoomEncrypted: Boolean, isRoomEncrypted: Boolean,
compressBeforeSending: Boolean, compressBeforeSending: Boolean): OneTimeWorkRequest {
startChain: Boolean): OneTimeWorkRequest {
val uploadMediaWorkerParams = UploadContentWorker.Params(sessionId, allLocalEchos, attachment, isRoomEncrypted, compressBeforeSending) val uploadMediaWorkerParams = UploadContentWorker.Params(sessionId, allLocalEchos, attachment, isRoomEncrypted, compressBeforeSending)
val uploadWorkData = WorkerParamsFactory.toData(uploadMediaWorkerParams) val uploadWorkData = WorkerParamsFactory.toData(uploadMediaWorkerParams)
return workManagerProvider.matrixOneTimeWorkRequestBuilder<UploadContentWorker>() return workManagerProvider.matrixOneTimeWorkRequestBuilder<UploadContentWorker>()
.setConstraints(WorkManagerProvider.workConstraints) .setConstraints(WorkManagerProvider.workConstraints)
.startChain(startChain) .startChain(true)
.setInputData(uploadWorkData) .setInputData(uploadWorkData)
.setBackoffCriteria(BackoffPolicy.LINEAR, WorkManagerProvider.BACKOFF_DELAY, TimeUnit.MILLISECONDS) .setBackoffCriteria(BackoffPolicy.LINEAR, WorkManagerProvider.BACKOFF_DELAY, TimeUnit.MILLISECONDS)
.build() .build()

View file

@ -35,6 +35,10 @@ import im.vector.matrix.android.internal.worker.getSessionComponent
import timber.log.Timber import timber.log.Timber
import javax.inject.Inject import javax.inject.Inject
/**
* Possible previous worker: None
* Possible next worker : Always [SendEventWorker]
*/
internal class EncryptEventWorker(context: Context, params: WorkerParameters) internal class EncryptEventWorker(context: Context, params: WorkerParameters)
: CoroutineWorker(context, params) { : CoroutineWorker(context, params) {
@ -53,14 +57,14 @@ internal class EncryptEventWorker(context: Context, params: WorkerParameters)
override suspend fun doWork(): Result { override suspend fun doWork(): Result {
Timber.v("Start Encrypt work") Timber.v("Start Encrypt work")
val params = WorkerParamsFactory.fromData<Params>(inputData) val params = WorkerParamsFactory.fromData<Params>(inputData)
?: return Result.success().also { ?: return Result.success()
Timber.e("Work cancelled due to input error from parent") .also { Timber.e("Unable to parse work parameters") }
}
Timber.v("Start Encrypt work for event ${params.event.eventId}") Timber.v("Start Encrypt work for event ${params.event.eventId}")
if (params.lastFailureMessage != null) { if (params.lastFailureMessage != null) {
// Transmit the error // Transmit the error
return Result.success(inputData) return Result.success(inputData)
.also { Timber.e("Work cancelled due to input error from parent") }
} }
val sessionComponent = getSessionComponent(params.sessionId) ?: return Result.success() val sessionComponent = getSessionComponent(params.sessionId) ?: return Result.success()

View file

@ -25,6 +25,7 @@ import com.squareup.moshi.JsonClass
import im.vector.matrix.android.api.session.events.model.Event import im.vector.matrix.android.api.session.events.model.Event
import im.vector.matrix.android.api.session.room.send.SendState import im.vector.matrix.android.api.session.room.send.SendState
import im.vector.matrix.android.internal.di.WorkManagerProvider import im.vector.matrix.android.internal.di.WorkManagerProvider
import im.vector.matrix.android.internal.session.content.UploadContentWorker
import im.vector.matrix.android.internal.session.room.timeline.TimelineSendEventWorkCommon import im.vector.matrix.android.internal.session.room.timeline.TimelineSendEventWorkCommon
import im.vector.matrix.android.internal.worker.SessionWorkerParams import im.vector.matrix.android.internal.worker.SessionWorkerParams
import im.vector.matrix.android.internal.worker.WorkerParamsFactory import im.vector.matrix.android.internal.worker.WorkerParamsFactory
@ -36,6 +37,9 @@ import javax.inject.Inject
/** /**
* This worker creates a new work for each events passed in parameter * This worker creates a new work for each events passed in parameter
*
* Possible previous worker: Always [UploadContentWorker]
* Possible next worker : None, but it will post new work to send events, encrypted or not
*/ */
internal class MultipleEventSendingDispatcherWorker(context: Context, params: WorkerParameters) internal class MultipleEventSendingDispatcherWorker(context: Context, params: WorkerParameters)
: CoroutineWorker(context, params) { : CoroutineWorker(context, params) {
@ -55,9 +59,8 @@ internal class MultipleEventSendingDispatcherWorker(context: Context, params: Wo
override suspend fun doWork(): Result { override suspend fun doWork(): Result {
Timber.v("Start dispatch sending multiple event work") Timber.v("Start dispatch sending multiple event work")
val params = WorkerParamsFactory.fromData<Params>(inputData) val params = WorkerParamsFactory.fromData<Params>(inputData)
?: return Result.success().also { ?: return Result.success()
Timber.e("Work cancelled due to input error from parent") .also { Timber.e("Unable to parse work parameters") }
}
val sessionComponent = getSessionComponent(params.sessionId) ?: return Result.success() val sessionComponent = getSessionComponent(params.sessionId) ?: return Result.success()
sessionComponent.inject(this) sessionComponent.inject(this)
@ -68,6 +71,7 @@ internal class MultipleEventSendingDispatcherWorker(context: Context, params: Wo
} }
// Transmit the error if needed? // Transmit the error if needed?
return Result.success(inputData) return Result.success(inputData)
.also { Timber.e("Work cancelled due to input error from parent") }
} }
// Create a work for every event // Create a work for every event

View file

@ -26,8 +26,13 @@ import im.vector.matrix.android.internal.worker.SessionWorkerParams
import im.vector.matrix.android.internal.worker.WorkerParamsFactory import im.vector.matrix.android.internal.worker.WorkerParamsFactory
import im.vector.matrix.android.internal.worker.getSessionComponent import im.vector.matrix.android.internal.worker.getSessionComponent
import org.greenrobot.eventbus.EventBus import org.greenrobot.eventbus.EventBus
import timber.log.Timber
import javax.inject.Inject import javax.inject.Inject
/**
* Possible previous worker: None
* Possible next worker : None
*/
internal class RedactEventWorker(context: Context, params: WorkerParameters) : CoroutineWorker(context, params) { internal class RedactEventWorker(context: Context, params: WorkerParameters) : CoroutineWorker(context, params) {
@JsonClass(generateAdapter = true) @JsonClass(generateAdapter = true)
@ -46,10 +51,12 @@ internal class RedactEventWorker(context: Context, params: WorkerParameters) : C
override suspend fun doWork(): Result { override suspend fun doWork(): Result {
val params = WorkerParamsFactory.fromData<Params>(inputData) val params = WorkerParamsFactory.fromData<Params>(inputData)
?: return Result.failure() ?: return Result.failure()
.also { Timber.e("Unable to parse work parameters") }
if (params.lastFailureMessage != null) { if (params.lastFailureMessage != null) {
// Transmit the error // Transmit the error
return Result.success(inputData) return Result.success(inputData)
.also { Timber.e("Work cancelled due to input error from parent") }
} }
val sessionComponent = getSessionComponent(params.sessionId) ?: return Result.success() val sessionComponent = getSessionComponent(params.sessionId) ?: return Result.success()

View file

@ -32,6 +32,10 @@ import org.greenrobot.eventbus.EventBus
import timber.log.Timber import timber.log.Timber
import javax.inject.Inject import javax.inject.Inject
/**
* Possible previous worker: [EncryptEventWorker] or first worker
* Possible next worker : None
*/
internal class SendEventWorker(context: Context, internal class SendEventWorker(context: Context,
params: WorkerParameters) params: WorkerParameters)
: CoroutineWorker(context, params) { : CoroutineWorker(context, params) {
@ -49,9 +53,8 @@ internal class SendEventWorker(context: Context,
override suspend fun doWork(): Result { override suspend fun doWork(): Result {
val params = WorkerParamsFactory.fromData<Params>(inputData) val params = WorkerParamsFactory.fromData<Params>(inputData)
?: return Result.success().also { ?: return Result.success()
Timber.e("Work cancelled due to input error from parent") .also { Timber.e("Unable to parse work parameters") }
}
val sessionComponent = getSessionComponent(params.sessionId) ?: return Result.success() val sessionComponent = getSessionComponent(params.sessionId) ?: return Result.success()
sessionComponent.inject(this) sessionComponent.inject(this)
@ -65,6 +68,7 @@ internal class SendEventWorker(context: Context,
localEchoUpdater.updateSendState(event.eventId, SendState.UNDELIVERED) localEchoUpdater.updateSendState(event.eventId, SendState.UNDELIVERED)
// Transmit the error // Transmit the error
return Result.success(inputData) return Result.success(inputData)
.also { Timber.e("Work cancelled due to input error from parent") }
} }
return try { return try {
sendEvent(event) sendEvent(event)

View file

@ -35,6 +35,10 @@ import javax.inject.Inject
private const val DEFAULT_LONG_POOL_TIMEOUT = 0L private const val DEFAULT_LONG_POOL_TIMEOUT = 0L
/**
* Possible previous worker: None
* Possible next worker : None
*/
internal class SyncWorker(context: Context, internal class SyncWorker(context: Context,
workerParameters: WorkerParameters workerParameters: WorkerParameters
) : CoroutineWorker(context, workerParameters) { ) : CoroutineWorker(context, workerParameters) {
@ -53,7 +57,10 @@ internal class SyncWorker(context: Context,
override suspend fun doWork(): Result { override suspend fun doWork(): Result {
Timber.i("Sync work starting") Timber.i("Sync work starting")
val params = WorkerParamsFactory.fromData<Params>(inputData) ?: return Result.success() val params = WorkerParamsFactory.fromData<Params>(inputData)
?: return Result.success()
.also { Timber.e("Unable to parse work parameters") }
val sessionComponent = getSessionComponent(params.sessionId) ?: return Result.success() val sessionComponent = getSessionComponent(params.sessionId) ?: return Result.success()
sessionComponent.inject(this) sessionComponent.inject(this)
return runCatching { return runCatching {
@ -76,7 +83,6 @@ internal class SyncWorker(context: Context,
} }
companion object { companion object {
private const val BG_SYNC_WORK_NAME = "BG_SYNCP" private const val BG_SYNC_WORK_NAME = "BG_SYNCP"
fun requireBackgroundSync(workManagerProvider: WorkManagerProvider, sessionId: String, serverTimeout: Long = 0) { fun requireBackgroundSync(workManagerProvider: WorkManagerProvider, sessionId: String, serverTimeout: Long = 0) {

View file

@ -16,9 +16,16 @@
package im.vector.matrix.android.internal.worker package im.vector.matrix.android.internal.worker
/**
* Note about the Worker usage:
* The workers we chain, or when using the append strategy, should never return Result.Failure(), else the chain will be broken forever
*/
interface SessionWorkerParams { interface SessionWorkerParams {
val sessionId: String val sessionId: String
// Null is no error occurs. When chaining Workers, first step is to check that there is no lastFailureMessage from the previous workers /**
* Null when no error occurs. When chaining Workers, first step is to check that there is no lastFailureMessage from the previous workers
* If it is the case, the worker should just transmit the error and shouldn't do anything else
*/
val lastFailureMessage: String? val lastFailureMessage: String?
} }