Clean code after review

This commit is contained in:
ganfra 2019-08-06 11:45:06 +02:00
parent fe884dba2d
commit d3ce4c491c
7 changed files with 67 additions and 109 deletions

View file

@ -17,74 +17,23 @@ package im.vector.matrix.android.internal.database
import io.realm.Realm import io.realm.Realm
import io.realm.RealmConfiguration import io.realm.RealmConfiguration
import io.realm.internal.OsSharedRealm import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.CancellableContinuation import kotlinx.coroutines.isActive
import kotlinx.coroutines.suspendCancellableCoroutine import kotlinx.coroutines.withContext
import timber.log.Timber
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
import java.util.concurrent.Future
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
private object AsyncTransactionThreadHolder {
val EXECUTOR: ExecutorService by lazy { suspend fun awaitTransaction(config: RealmConfiguration, transaction: suspend (realm: Realm) -> Unit) = withContext(Dispatchers.IO) {
Executors.newSingleThreadExecutor() Realm.getInstance(config).use { bgRealm ->
}
}
private class AsyncTransactionRunnable(private val continuation: CancellableContinuation<Unit>,
private val realmConfiguration: RealmConfiguration,
private val transaction: (realm: Realm) -> Unit) : Runnable {
override fun run() {
if (Thread.currentThread().isInterrupted) {
return
}
var versionID: OsSharedRealm.VersionID? = null
var exception: Throwable? = null
val bgRealm = Realm.getInstance(realmConfiguration)
bgRealm.beginTransaction() bgRealm.beginTransaction()
try { try {
transaction(bgRealm) transaction(bgRealm)
if (Thread.currentThread().isInterrupted) { if (isActive) {
return
}
bgRealm.commitTransaction() bgRealm.commitTransaction()
versionID = bgRealm.sharedRealm.versionID }
} catch (e: Throwable) {
exception = e
} finally { } finally {
try {
if (bgRealm.isInTransaction) { if (bgRealm.isInTransaction) {
bgRealm.cancelTransaction() bgRealm.cancelTransaction()
} }
} finally {
bgRealm.close()
} }
} }
val backgroundException = exception
val backgroundVersionID = versionID
when {
backgroundVersionID != null -> continuation.resume(Unit)
backgroundException != null -> continuation.resumeWithException(backgroundException)
}
}
}
suspend fun awaitTransaction(realmConfiguration: RealmConfiguration, transaction: (realm: Realm) -> Unit) {
return suspendCancellableCoroutine { continuation ->
var futureTask: Future<*>? = null
continuation.invokeOnCancellation {
Timber.v("Cancel database transaction")
futureTask?.cancel(true)
}
val runnable = AsyncTransactionRunnable(continuation, realmConfiguration, transaction)
futureTask = AsyncTransactionThreadHolder.EXECUTOR.submit(runnable)
}
} }

View file

@ -36,20 +36,20 @@ internal class NetworkConnectivityChecker @Inject constructor(context: Context)
.build(context) .build(context)
private val merlinsBeard = MerlinsBeard.Builder().build(context) private val merlinsBeard = MerlinsBeard.Builder().build(context)
private val listeners = ArrayList<Listener>() private val listeners = Collections.synchronizedList(ArrayList<Listener>())
init { init {
merlin.bind() merlin.bind()
merlin.registerDisconnectable { merlin.registerDisconnectable {
Timber.v("On Disconnect") Timber.v("On Disconnect")
val localListeners = Collections.synchronizedList(listeners) val localListeners = listeners.toList()
localListeners.forEach { localListeners.forEach {
it.onDisconnect() it.onDisconnect()
} }
} }
merlin.registerConnectable { merlin.registerConnectable {
Timber.v("On Connect") Timber.v("On Connect")
val localListeners = Collections.synchronizedList(listeners) val localListeners = listeners.toList()
localListeners.forEach { localListeners.forEach {
it.onConnect() it.onConnect()
} }

View file

@ -20,17 +20,16 @@ import android.content.Context
import androidx.work.CoroutineWorker import androidx.work.CoroutineWorker
import androidx.work.WorkerParameters import androidx.work.WorkerParameters
import com.squareup.moshi.JsonClass import com.squareup.moshi.JsonClass
import im.vector.matrix.android.api.MatrixCallback
import im.vector.matrix.android.api.failure.Failure import im.vector.matrix.android.api.failure.Failure
import im.vector.matrix.android.api.session.crypto.CryptoService import im.vector.matrix.android.api.session.crypto.CryptoService
import im.vector.matrix.android.api.session.events.model.Event import im.vector.matrix.android.api.session.events.model.Event
import im.vector.matrix.android.api.session.room.send.SendState import im.vector.matrix.android.api.session.room.send.SendState
import im.vector.matrix.android.internal.crypto.model.MXEncryptEventContentResult import im.vector.matrix.android.internal.crypto.model.MXEncryptEventContentResult
import im.vector.matrix.android.internal.util.awaitCallback
import im.vector.matrix.android.internal.worker.SessionWorkerParams import im.vector.matrix.android.internal.worker.SessionWorkerParams
import im.vector.matrix.android.internal.worker.WorkerParamsFactory import im.vector.matrix.android.internal.worker.WorkerParamsFactory
import im.vector.matrix.android.internal.worker.getSessionComponent import im.vector.matrix.android.internal.worker.getSessionComponent
import timber.log.Timber import timber.log.Timber
import java.util.concurrent.CountDownLatch
import javax.inject.Inject import javax.inject.Inject
internal class EncryptEventWorker(context: Context, params: WorkerParameters) internal class EncryptEventWorker(context: Context, params: WorkerParameters)
@ -71,44 +70,30 @@ internal class EncryptEventWorker(context: Context, params: WorkerParameters)
} }
localEchoUpdater.updateSendState(localEvent.eventId, SendState.ENCRYPTING) localEchoUpdater.updateSendState(localEvent.eventId, SendState.ENCRYPTING)
// TODO Better async handling
val latch = CountDownLatch(1)
var result: MXEncryptEventContentResult? = null
var error: Throwable? = null
val localMutableContent = HashMap(localEvent.content) val localMutableContent = HashMap(localEvent.content)
params.keepKeys?.forEach { params.keepKeys?.forEach {
localMutableContent.remove(it) localMutableContent.remove(it)
} }
var error: Throwable? = null
var result: MXEncryptEventContentResult? = null
try { try {
crypto.encryptEventContent(localMutableContent, localEvent.type, params.roomId, object : MatrixCallback<MXEncryptEventContentResult> { result = awaitCallback {
override fun onSuccess(data: MXEncryptEventContentResult) { crypto.encryptEventContent(localMutableContent, localEvent.type, params.roomId, it)
result = data
latch.countDown()
} }
} catch (throwable: Throwable) {
override fun onFailure(failure: Throwable) { error = throwable
error = failure
latch.countDown()
} }
})
} catch (e: Throwable) {
error = e
latch.countDown()
}
latch.await()
if (result != null) { if (result != null) {
val modifiedContent = HashMap(result?.eventContent) val modifiedContent = HashMap(result.eventContent)
params.keepKeys?.forEach { toKeep -> params.keepKeys?.forEach { toKeep ->
localEvent.content?.get(toKeep)?.let { localEvent.content?.get(toKeep)?.let {
//put it back in the encrypted thing //put it back in the encrypted thing
modifiedContent[toKeep] = it modifiedContent[toKeep] = it
} }
} }
val safeResult = result!!.copy(eventContent = modifiedContent) val safeResult = result.copy(eventContent = modifiedContent)
val encryptedEvent = localEvent.copy( val encryptedEvent = localEvent.copy(
type = safeResult.eventType, type = safeResult.eventType,
content = safeResult.eventContent content = safeResult.eventContent
@ -122,7 +107,8 @@ internal class EncryptEventWorker(context: Context, params: WorkerParameters)
} }
localEchoUpdater.updateSendState(localEvent.eventId, sendState) localEchoUpdater.updateSendState(localEvent.eventId, sendState)
//always return success, or the chain will be stuck for ever! //always return success, or the chain will be stuck for ever!
val nextWorkerParams = SendEventWorker.Params(params.userId, params.roomId, localEvent, error?.localizedMessage ?: "Error") val nextWorkerParams = SendEventWorker.Params(params.userId, params.roomId, localEvent, error?.localizedMessage
?: "Error")
return Result.success(WorkerParamsFactory.toData(nextWorkerParams)) return Result.success(WorkerParamsFactory.toData(nextWorkerParams))
} }
} }

View file

@ -40,7 +40,7 @@ internal data class ConfigurableTask<PARAMS, RESULT>(
val retryCount: Int, val retryCount: Int,
val callback: MatrixCallback<RESULT> val callback: MatrixCallback<RESULT>
) : Task<PARAMS, RESULT> { ) : Task<PARAMS, RESULT> by task {
class Builder<PARAMS, RESULT>( class Builder<PARAMS, RESULT>(
@ -66,11 +66,6 @@ internal data class ConfigurableTask<PARAMS, RESULT>(
) )
} }
override suspend fun execute(params: PARAMS): RESULT {
return task.execute(params)
}
fun executeBy(taskExecutor: TaskExecutor): Cancelable { fun executeBy(taskExecutor: TaskExecutor): Cancelable {
return taskExecutor.execute(this) return taskExecutor.execute(this)
} }

View file

@ -18,16 +18,12 @@ package im.vector.matrix.android.internal.task
import im.vector.matrix.android.api.util.Cancelable import im.vector.matrix.android.api.util.Cancelable
import im.vector.matrix.android.api.util.CancelableBag
import im.vector.matrix.android.internal.di.MatrixScope import im.vector.matrix.android.internal.di.MatrixScope
import im.vector.matrix.android.internal.extensions.foldToCallback import im.vector.matrix.android.internal.extensions.foldToCallback
import im.vector.matrix.android.internal.network.NetworkConnectivityChecker import im.vector.matrix.android.internal.network.NetworkConnectivityChecker
import im.vector.matrix.android.internal.util.CancelableCoroutine import im.vector.matrix.android.internal.util.CancelableCoroutine
import im.vector.matrix.android.internal.util.MatrixCoroutineDispatchers import im.vector.matrix.android.internal.util.MatrixCoroutineDispatchers
import kotlinx.coroutines.GlobalScope import kotlinx.coroutines.*
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import timber.log.Timber import timber.log.Timber
import javax.inject.Inject import javax.inject.Inject
import kotlin.coroutines.EmptyCoroutineContext import kotlin.coroutines.EmptyCoroutineContext
@ -36,11 +32,11 @@ import kotlin.coroutines.EmptyCoroutineContext
internal class TaskExecutor @Inject constructor(private val coroutineDispatchers: MatrixCoroutineDispatchers, internal class TaskExecutor @Inject constructor(private val coroutineDispatchers: MatrixCoroutineDispatchers,
private val networkConnectivityChecker: NetworkConnectivityChecker) { private val networkConnectivityChecker: NetworkConnectivityChecker) {
private val cancelableBag = CancelableBag() private val executorScope = CoroutineScope(SupervisorJob())
fun <PARAMS, RESULT> execute(task: ConfigurableTask<PARAMS, RESULT>): Cancelable { fun <PARAMS, RESULT> execute(task: ConfigurableTask<PARAMS, RESULT>): Cancelable {
val job = GlobalScope.launch(task.callbackThread.toDispatcher()) { val job = executorScope.launch(task.callbackThread.toDispatcher()) {
val resultOrFailure = runCatching { val resultOrFailure = runCatching {
withContext(task.executionThread.toDispatcher()) { withContext(task.executionThread.toDispatcher()) {
Timber.v("Enqueue task $task") Timber.v("Enqueue task $task")
@ -60,14 +56,11 @@ internal class TaskExecutor @Inject constructor(private val coroutineDispatchers
} }
.foldToCallback(task.callback) .foldToCallback(task.callback)
} }
return CancelableCoroutine(job).also { return CancelableCoroutine(job)
cancelableBag += it
}
} }
fun cancelAll() = synchronized(this) { fun cancelAll() = executorScope.coroutineContext.cancelChildren()
cancelableBag.cancel()
}
private suspend fun <T> retry( private suspend fun <T> retry(
times: Int = Int.MAX_VALUE, times: Int = Int.MAX_VALUE,

View file

@ -22,7 +22,7 @@ import io.realm.Realm
import io.realm.RealmModel import io.realm.RealmModel
import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.atomic.AtomicReference
internal suspend fun Monarchy.awaitTransaction(transaction: (realm: Realm) -> Unit) { internal suspend fun Monarchy.awaitTransaction(transaction: suspend (realm: Realm) -> Unit) {
awaitTransaction(realmConfiguration, transaction) awaitTransaction(realmConfiguration, transaction)
} }

View file

@ -0,0 +1,35 @@
/*
* Copyright 2019 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package im.vector.matrix.android.internal.util
import im.vector.matrix.android.api.MatrixCallback
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine
suspend inline fun <T> awaitCallback(crossinline callback: (MatrixCallback<T>) -> Unit) = suspendCoroutine<T> { cont ->
callback(object : MatrixCallback<T> {
override fun onFailure(failure: Throwable) {
cont.resumeWithException(failure)
}
override fun onSuccess(data: T) {
cont.resume(data)
}
})
}