mirror of
https://github.com/element-hq/element-android
synced 2024-12-18 07:12:47 +03:00
Network: reword the strategy for handling NetworkConnectivity (remove Merlin)
This commit is contained in:
parent
c65f25d7ae
commit
4331d2ef47
14 changed files with 277 additions and 167 deletions
|
@ -111,7 +111,6 @@ dependencies {
|
|||
implementation 'com.squareup.retrofit2:converter-moshi:2.6.2'
|
||||
implementation 'com.squareup.okhttp3:okhttp:4.2.2'
|
||||
implementation 'com.squareup.okhttp3:logging-interceptor:4.2.2'
|
||||
implementation 'com.novoda:merlin:1.2.0'
|
||||
implementation "com.squareup.moshi:moshi-adapters:$moshi_version"
|
||||
kapt "com.squareup.moshi:moshi-kotlin-codegen:$moshi_version"
|
||||
|
||||
|
|
|
@ -0,0 +1,75 @@
|
|||
/*
|
||||
* Copyright 2020 New Vector Ltd
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package im.vector.matrix.android.internal.network
|
||||
|
||||
import android.annotation.TargetApi
|
||||
import android.content.Context
|
||||
import android.content.IntentFilter
|
||||
import android.net.ConnectivityManager
|
||||
import android.net.Network
|
||||
import javax.inject.Inject
|
||||
|
||||
internal interface NetworkCallbackStrategy {
|
||||
fun register(hasChanged: () -> Unit)
|
||||
fun unregister()
|
||||
}
|
||||
|
||||
internal class FallbackNetworkCallbackStrategy @Inject constructor(private val context: Context,
|
||||
private val networkInfoReceiver: NetworkInfoReceiver) : NetworkCallbackStrategy {
|
||||
|
||||
@Suppress("DEPRECATION")
|
||||
val filter = IntentFilter(ConnectivityManager.CONNECTIVITY_ACTION)
|
||||
|
||||
override fun register(hasChanged: () -> Unit) {
|
||||
networkInfoReceiver.isConnectedCallback = {
|
||||
hasChanged()
|
||||
}
|
||||
context.registerReceiver(networkInfoReceiver, filter)
|
||||
}
|
||||
|
||||
override fun unregister() {
|
||||
networkInfoReceiver.isConnectedCallback = null
|
||||
context.unregisterReceiver(networkInfoReceiver)
|
||||
}
|
||||
}
|
||||
|
||||
@TargetApi(android.os.Build.VERSION_CODES.N)
|
||||
internal class PreferredNetworkCallbackStrategy @Inject constructor(context: Context) : NetworkCallbackStrategy {
|
||||
|
||||
private var hasChangedCallback: (() -> Unit)? = null
|
||||
private val conn = context.getSystemService(Context.CONNECTIVITY_SERVICE) as ConnectivityManager
|
||||
private val networkCallback = object : ConnectivityManager.NetworkCallback() {
|
||||
|
||||
override fun onLost(network: Network) {
|
||||
hasChangedCallback?.invoke()
|
||||
}
|
||||
|
||||
override fun onAvailable(network: Network) {
|
||||
hasChangedCallback?.invoke()
|
||||
}
|
||||
}
|
||||
|
||||
override fun register(hasChanged: () -> Unit) {
|
||||
hasChangedCallback = hasChanged
|
||||
conn.registerDefaultNetworkCallback(networkCallback)
|
||||
}
|
||||
|
||||
override fun unregister() {
|
||||
hasChangedCallback = null
|
||||
conn.unregisterNetworkCallback(networkCallback)
|
||||
}
|
||||
}
|
|
@ -16,23 +16,15 @@
|
|||
|
||||
package im.vector.matrix.android.internal.network
|
||||
|
||||
import android.content.Context
|
||||
import androidx.annotation.WorkerThread
|
||||
import com.novoda.merlin.Endpoint
|
||||
import com.novoda.merlin.Merlin
|
||||
import com.novoda.merlin.MerlinsBeard
|
||||
import com.novoda.merlin.ResponseCodeValidator
|
||||
import im.vector.matrix.android.api.auth.data.HomeServerConnectionConfig
|
||||
import im.vector.matrix.android.internal.session.SessionScope
|
||||
import im.vector.matrix.android.internal.session.homeserver.HomeServerPinger
|
||||
import im.vector.matrix.android.internal.util.BackgroundDetectionObserver
|
||||
import im.vector.matrix.android.internal.util.MatrixCoroutineDispatchers
|
||||
import kotlinx.coroutines.withContext
|
||||
import timber.log.Timber
|
||||
import kotlinx.coroutines.runBlocking
|
||||
import java.util.Collections
|
||||
import java.util.concurrent.atomic.AtomicBoolean
|
||||
import javax.inject.Inject
|
||||
import kotlin.coroutines.resume
|
||||
import kotlin.coroutines.suspendCoroutine
|
||||
|
||||
interface NetworkConnectivityChecker {
|
||||
/**
|
||||
|
@ -41,60 +33,29 @@ interface NetworkConnectivityChecker {
|
|||
@WorkerThread
|
||||
fun hasInternetAccess(forcePing: Boolean): Boolean
|
||||
|
||||
/**
|
||||
* Wait until we get internet connection.
|
||||
*/
|
||||
suspend fun waitUntilConnected()
|
||||
|
||||
fun register(listener: Listener)
|
||||
fun unregister(listener: Listener)
|
||||
|
||||
interface Listener {
|
||||
fun onConnect() {
|
||||
}
|
||||
|
||||
fun onDisconnect() {
|
||||
}
|
||||
fun onConnectivityChanged()
|
||||
}
|
||||
}
|
||||
|
||||
@SessionScope
|
||||
internal class MerlinNetworkConnectivityChecker @Inject constructor(context: Context,
|
||||
homeServerConnectionConfig: HomeServerConnectionConfig,
|
||||
private val coroutineDispatchers: MatrixCoroutineDispatchers,
|
||||
private val backgroundDetectionObserver: BackgroundDetectionObserver)
|
||||
: NetworkConnectivityChecker {
|
||||
|
||||
private val waitingForNetwork = AtomicBoolean(false)
|
||||
private val isMerlinBounded = AtomicBoolean(false)
|
||||
private val endpointString = "${homeServerConnectionConfig.homeServerUri}/_matrix/client/versions"
|
||||
private val endpoint = Endpoint.from(endpointString)
|
||||
private val responseCodeValidator = ResponseCodeValidator { responseCode ->
|
||||
responseCode == 204 || responseCode == 400 || responseCode == 404
|
||||
}
|
||||
|
||||
private val merlin = Merlin.Builder()
|
||||
.withEndpoint(endpoint)
|
||||
.withResponseCodeValidator(responseCodeValidator)
|
||||
.withAllCallbacks()
|
||||
.build(context)
|
||||
|
||||
private val merlinsBeard = MerlinsBeard.Builder()
|
||||
.withEndpoint(endpoint)
|
||||
.withResponseCodeValidator(responseCodeValidator)
|
||||
.build(context)
|
||||
|
||||
private val hasInternetAccess = AtomicBoolean(merlinsBeard.isConnected)
|
||||
internal class DefaultNetworkConnectivityChecker @Inject constructor(private val coroutineDispatchers: MatrixCoroutineDispatchers,
|
||||
private val homeServerPinger: HomeServerPinger,
|
||||
private val backgroundDetectionObserver: BackgroundDetectionObserver,
|
||||
private val networkCallbackStrategy: NetworkCallbackStrategy) : NetworkConnectivityChecker {
|
||||
|
||||
private val hasInternetAccess = AtomicBoolean(true)
|
||||
private val listeners = Collections.synchronizedSet(LinkedHashSet<NetworkConnectivityChecker.Listener>())
|
||||
|
||||
private val backgroundDetectionObserverListener = object : BackgroundDetectionObserver.Listener {
|
||||
override fun onMoveToForeground() {
|
||||
bindMerlinIfNeeded()
|
||||
bind()
|
||||
}
|
||||
|
||||
override fun onMoveToBackground() {
|
||||
unbindMerlinIfNeeded()
|
||||
unbind()
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -104,81 +65,20 @@ internal class MerlinNetworkConnectivityChecker @Inject constructor(context: Con
|
|||
@WorkerThread
|
||||
override fun hasInternetAccess(forcePing: Boolean): Boolean {
|
||||
return if (forcePing) {
|
||||
merlinsBeard.hasInternetAccess()
|
||||
runBlocking {
|
||||
homeServerPinger.canReachHomeServer()
|
||||
}
|
||||
} else {
|
||||
hasInternetAccess.get()
|
||||
}
|
||||
}
|
||||
|
||||
private fun bindMerlinIfNeeded() {
|
||||
if (isMerlinBounded.get()) {
|
||||
return
|
||||
}
|
||||
Timber.v("Bind merlin")
|
||||
isMerlinBounded.set(true)
|
||||
merlin.bind()
|
||||
merlinsBeard.hasInternetAccess {
|
||||
hasInternetAccess.set(it)
|
||||
}
|
||||
merlin.registerBindable {
|
||||
Timber.v("On Network available: ${it.isAvailable}")
|
||||
}
|
||||
merlin.registerDisconnectable {
|
||||
Timber.v("On Disconnect")
|
||||
hasInternetAccess.set(false)
|
||||
val localListeners = listeners.toList()
|
||||
localListeners.forEach {
|
||||
it.onDisconnect()
|
||||
}
|
||||
}
|
||||
merlin.registerConnectable {
|
||||
Timber.v("On Connect")
|
||||
hasInternetAccess.set(true)
|
||||
val localListeners = listeners.toList()
|
||||
localListeners.forEach {
|
||||
it.onConnect()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun unbindMerlinIfNeeded() {
|
||||
if (backgroundDetectionObserver.isInBackground && !waitingForNetwork.get() && isMerlinBounded.get()) {
|
||||
isMerlinBounded.set(false)
|
||||
Timber.v("Unbind merlin")
|
||||
merlin.unbind()
|
||||
}
|
||||
}
|
||||
|
||||
override suspend fun waitUntilConnected() {
|
||||
val hasInternetAccess = withContext(coroutineDispatchers.io) {
|
||||
merlinsBeard.hasInternetAccess()
|
||||
}
|
||||
if (hasInternetAccess) {
|
||||
return
|
||||
} else {
|
||||
waitingForNetwork.set(true)
|
||||
bindMerlinIfNeeded()
|
||||
Timber.v("Waiting for network...")
|
||||
suspendCoroutine<Unit> { continuation ->
|
||||
register(object : NetworkConnectivityChecker.Listener {
|
||||
override fun onConnect() {
|
||||
unregister(this)
|
||||
waitingForNetwork.set(false)
|
||||
unbindMerlinIfNeeded()
|
||||
Timber.v("Connected to network...")
|
||||
continuation.resume(Unit)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override fun register(listener: NetworkConnectivityChecker.Listener) {
|
||||
if (listeners.isEmpty()) {
|
||||
if (backgroundDetectionObserver.isInBackground) {
|
||||
unbindMerlinIfNeeded()
|
||||
unbind()
|
||||
} else {
|
||||
bindMerlinIfNeeded()
|
||||
bind()
|
||||
}
|
||||
backgroundDetectionObserver.register(backgroundDetectionObserverListener)
|
||||
}
|
||||
|
@ -191,4 +91,20 @@ internal class MerlinNetworkConnectivityChecker @Inject constructor(context: Con
|
|||
backgroundDetectionObserver.unregister(backgroundDetectionObserverListener)
|
||||
}
|
||||
}
|
||||
|
||||
private fun bind() {
|
||||
networkCallbackStrategy.register {
|
||||
val localListeners = listeners.toList()
|
||||
localListeners.forEach {
|
||||
it.onConnectivityChanged()
|
||||
}
|
||||
}
|
||||
homeServerPinger.canReachHomeServer {
|
||||
hasInternetAccess.set(it)
|
||||
}
|
||||
}
|
||||
|
||||
private fun unbind() {
|
||||
networkCallbackStrategy.unregister()
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,35 @@
|
|||
/*
|
||||
* Copyright 2020 New Vector Ltd
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package im.vector.matrix.android.internal.network
|
||||
|
||||
import android.content.BroadcastReceiver
|
||||
import android.content.Context
|
||||
import android.content.Intent
|
||||
import android.net.ConnectivityManager
|
||||
import android.net.NetworkInfo
|
||||
import javax.inject.Inject
|
||||
|
||||
internal class NetworkInfoReceiver @Inject constructor() : BroadcastReceiver() {
|
||||
|
||||
var isConnectedCallback: ((Boolean) -> Unit)? = null
|
||||
|
||||
override fun onReceive(context: Context, intent: Intent) {
|
||||
val conn = context.getSystemService(Context.CONNECTIVITY_SERVICE) as ConnectivityManager
|
||||
val networkInfo: NetworkInfo? = conn.activeNetworkInfo
|
||||
isConnectedCallback?.invoke(networkInfo?.isConnected ?: false)
|
||||
}
|
||||
}
|
|
@ -18,6 +18,7 @@ package im.vector.matrix.android.internal.network
|
|||
|
||||
import im.vector.matrix.android.api.failure.Failure
|
||||
import kotlinx.coroutines.CancellationException
|
||||
import kotlinx.coroutines.delay
|
||||
import org.greenrobot.eventbus.EventBus
|
||||
import retrofit2.Call
|
||||
import java.io.IOException
|
||||
|
@ -27,6 +28,12 @@ internal suspend inline fun <DATA> executeRequest(eventBus: EventBus?,
|
|||
|
||||
internal class Request<DATA>(private val eventBus: EventBus?) {
|
||||
|
||||
var isRetryable = false
|
||||
var initialDelay: Long = 100L
|
||||
var maxDelay: Long = 10_000L
|
||||
var maxRetryCount = Int.MAX_VALUE
|
||||
private var currentRetryCount = 0
|
||||
private var currentDelay = initialDelay
|
||||
lateinit var apiCall: Call<DATA>
|
||||
|
||||
suspend fun execute(): DATA {
|
||||
|
@ -39,12 +46,18 @@ internal class Request<DATA>(private val eventBus: EventBus?) {
|
|||
throw response.toFailure(eventBus)
|
||||
}
|
||||
} catch (exception: Throwable) {
|
||||
throw when (exception) {
|
||||
is IOException -> Failure.NetworkConnection(exception)
|
||||
is Failure.ServerError,
|
||||
is Failure.OtherServerError -> exception
|
||||
is CancellationException -> Failure.Cancelled(exception)
|
||||
else -> Failure.Unknown(exception)
|
||||
if (isRetryable && currentRetryCount++ < maxRetryCount && exception is IOException) {
|
||||
delay(currentDelay)
|
||||
currentDelay = (currentDelay * 2L).coerceAtMost(maxDelay)
|
||||
return execute()
|
||||
} else {
|
||||
throw when (exception) {
|
||||
is IOException -> Failure.NetworkConnection(exception)
|
||||
is Failure.ServerError,
|
||||
is Failure.OtherServerError -> exception
|
||||
is CancellationException -> Failure.Cancelled(exception)
|
||||
else -> Failure.Unknown(exception)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -35,8 +35,11 @@ import im.vector.matrix.android.internal.database.LiveEntityObserver
|
|||
import im.vector.matrix.android.internal.database.SessionRealmConfigurationFactory
|
||||
import im.vector.matrix.android.internal.di.*
|
||||
import im.vector.matrix.android.internal.network.AccessTokenInterceptor
|
||||
import im.vector.matrix.android.internal.network.MerlinNetworkConnectivityChecker
|
||||
import im.vector.matrix.android.internal.network.DefaultNetworkConnectivityChecker
|
||||
import im.vector.matrix.android.internal.network.FallbackNetworkCallbackStrategy
|
||||
import im.vector.matrix.android.internal.network.NetworkCallbackStrategy
|
||||
import im.vector.matrix.android.internal.network.NetworkConnectivityChecker
|
||||
import im.vector.matrix.android.internal.network.PreferredNetworkCallbackStrategy
|
||||
import im.vector.matrix.android.internal.network.RetrofitFactory
|
||||
import im.vector.matrix.android.internal.network.interceptors.CurlLoggingInterceptor
|
||||
import im.vector.matrix.android.internal.session.group.GroupSummaryUpdater
|
||||
|
@ -52,6 +55,7 @@ import okhttp3.OkHttpClient
|
|||
import org.greenrobot.eventbus.EventBus
|
||||
import retrofit2.Retrofit
|
||||
import java.io.File
|
||||
import javax.inject.Provider
|
||||
|
||||
@Module
|
||||
internal abstract class SessionModule {
|
||||
|
@ -172,13 +176,26 @@ internal abstract class SessionModule {
|
|||
fun providesEventBus(): EventBus {
|
||||
return EventBus.builder().build()
|
||||
}
|
||||
|
||||
@JvmStatic
|
||||
@Provides
|
||||
@SessionScope
|
||||
fun providesNetworkCallbackStrategy(fallbackNetworkCallbackStrategy: Provider<FallbackNetworkCallbackStrategy>,
|
||||
preferredNetworkCallbackStrategy: Provider<PreferredNetworkCallbackStrategy>
|
||||
): NetworkCallbackStrategy {
|
||||
return if (android.os.Build.VERSION.SDK_INT >= android.os.Build.VERSION_CODES.N) {
|
||||
preferredNetworkCallbackStrategy.get()
|
||||
} else {
|
||||
fallbackNetworkCallbackStrategy.get()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Binds
|
||||
abstract fun bindSession(session: DefaultSession): Session
|
||||
|
||||
@Binds
|
||||
abstract fun bindNetworkConnectivityChecker(networkConnectivityChecker: MerlinNetworkConnectivityChecker): NetworkConnectivityChecker
|
||||
abstract fun bindNetworkConnectivityChecker(networkConnectivityChecker: DefaultNetworkConnectivityChecker): NetworkConnectivityChecker
|
||||
|
||||
@Binds
|
||||
@IntoSet
|
||||
|
@ -209,3 +226,4 @@ internal abstract class SessionModule {
|
|||
@Binds
|
||||
abstract fun bindHomeServerCapabilitiesService(homeServerCapabilitiesService: DefaultHomeServerCapabilitiesService): HomeServerCapabilitiesService
|
||||
}
|
||||
|
||||
|
|
|
@ -27,4 +27,10 @@ internal interface CapabilitiesAPI {
|
|||
*/
|
||||
@GET(NetworkConstants.URI_API_MEDIA_PREFIX_PATH_R0 + "config")
|
||||
fun getUploadCapabilities(): Call<GetUploadCapabilitiesResult>
|
||||
|
||||
/**
|
||||
* Request the versions
|
||||
*/
|
||||
@GET(NetworkConstants.URI_API_PREFIX_PATH_+"versions")
|
||||
fun getVersions(): Call<Unit>
|
||||
}
|
||||
|
|
|
@ -0,0 +1,50 @@
|
|||
/*
|
||||
* Copyright 2020 New Vector Ltd
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package im.vector.matrix.android.internal.session.homeserver
|
||||
|
||||
import im.vector.matrix.android.api.failure.Failure
|
||||
import im.vector.matrix.android.internal.network.executeRequest
|
||||
import im.vector.matrix.android.internal.task.TaskExecutor
|
||||
import kotlinx.coroutines.delay
|
||||
import kotlinx.coroutines.launch
|
||||
import javax.inject.Inject
|
||||
|
||||
internal class HomeServerPinger @Inject constructor(private val taskExecutor: TaskExecutor,
|
||||
private val capabilitiesAPI: CapabilitiesAPI) {
|
||||
|
||||
fun canReachHomeServer(callback: (Boolean) -> Unit) {
|
||||
taskExecutor.executorScope.launch {
|
||||
val canReach = canReachHomeServer()
|
||||
callback(canReach)
|
||||
}
|
||||
}
|
||||
|
||||
suspend fun canReachHomeServer(): Boolean {
|
||||
return try {
|
||||
executeRequest<Unit>(null) {
|
||||
apiCall = capabilitiesAPI.getVersions()
|
||||
}
|
||||
true
|
||||
} catch (throwable: Throwable) {
|
||||
if (throwable is Failure.OtherServerError) {
|
||||
(throwable.httpCode == 404 || throwable.httpCode == 400)
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -66,14 +66,14 @@ internal class DefaultSetReadMarkersTask @Inject constructor(
|
|||
val markers = HashMap<String, String>()
|
||||
Timber.v("Execute set read marker with params: $params")
|
||||
val latestSyncedEventId = latestSyncedEventId(params.roomId)
|
||||
val fullyReadEventId = if(params.forceReadMarker){
|
||||
val fullyReadEventId = if (params.forceReadMarker) {
|
||||
latestSyncedEventId
|
||||
}else {
|
||||
} else {
|
||||
params.fullyReadEventId
|
||||
}
|
||||
val readReceiptEventId = if(params.forceReadReceipt){
|
||||
val readReceiptEventId = if (params.forceReadReceipt) {
|
||||
latestSyncedEventId
|
||||
}else {
|
||||
} else {
|
||||
params.readReceiptEventId
|
||||
}
|
||||
if (fullyReadEventId != null && !isReadMarkerMoreRecent(monarchy, params.roomId, fullyReadEventId)) {
|
||||
|
@ -97,8 +97,8 @@ internal class DefaultSetReadMarkersTask @Inject constructor(
|
|||
if (markers.isEmpty()) {
|
||||
return
|
||||
}
|
||||
networkConnectivityChecker.waitUntilConnected()
|
||||
executeRequest<Unit>(eventBus) {
|
||||
isRetryable = true
|
||||
apiCall = roomAPI.sendReadMarker(params.roomId, markers)
|
||||
}
|
||||
}
|
||||
|
@ -119,7 +119,7 @@ internal class DefaultSetReadMarkersTask @Inject constructor(
|
|||
val readReceiptContent = ReadReceiptHandler.createContent(userId, readReceiptId)
|
||||
readReceiptHandler.handle(realm, roomId, readReceiptContent, false)
|
||||
}
|
||||
if(shouldUpdateRoomSummary){
|
||||
if (shouldUpdateRoomSummary) {
|
||||
val roomSummary = RoomSummaryEntity.where(realm, roomId).findFirst()
|
||||
?: return@awaitTransaction
|
||||
roomSummary.notificationCount = 0
|
||||
|
|
|
@ -16,7 +16,6 @@
|
|||
|
||||
package im.vector.matrix.android.internal.session.room.timeline
|
||||
|
||||
import im.vector.matrix.android.internal.network.NetworkConnectivityChecker
|
||||
import im.vector.matrix.android.internal.network.executeRequest
|
||||
import im.vector.matrix.android.internal.session.filter.FilterRepository
|
||||
import im.vector.matrix.android.internal.session.room.RoomAPI
|
||||
|
@ -38,14 +37,13 @@ internal class DefaultPaginationTask @Inject constructor(
|
|||
private val roomAPI: RoomAPI,
|
||||
private val filterRepository: FilterRepository,
|
||||
private val tokenChunkEventPersistor: TokenChunkEventPersistor,
|
||||
private val eventBus: EventBus,
|
||||
private val networkConnectivityChecker: NetworkConnectivityChecker
|
||||
private val eventBus: EventBus
|
||||
) : PaginationTask {
|
||||
|
||||
override suspend fun execute(params: PaginationTask.Params): TokenChunkEventPersistor.Result {
|
||||
networkConnectivityChecker.waitUntilConnected()
|
||||
val filter = filterRepository.getRoomFilter()
|
||||
val chunk = executeRequest<PaginationResponse>(eventBus) {
|
||||
isRetryable = true
|
||||
apiCall = roomAPI.getRoomMessagesFrom(params.roomId, params.from, params.direction.value, params.limit, filter)
|
||||
}
|
||||
return tokenChunkEventPersistor.insertInDb(chunk, params.roomId, params.direction)
|
||||
|
|
|
@ -25,6 +25,8 @@ import im.vector.matrix.android.api.session.sync.SyncState
|
|||
import im.vector.matrix.android.internal.network.NetworkConnectivityChecker
|
||||
import im.vector.matrix.android.internal.session.sync.SyncTask
|
||||
import im.vector.matrix.android.internal.util.BackgroundDetectionObserver
|
||||
import im.vector.matrix.android.internal.util.Debouncer
|
||||
import im.vector.matrix.android.internal.util.createUIHandler
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.SupervisorJob
|
||||
import kotlinx.coroutines.cancelChildren
|
||||
|
@ -33,7 +35,10 @@ import kotlinx.coroutines.launch
|
|||
import kotlinx.coroutines.runBlocking
|
||||
import timber.log.Timber
|
||||
import java.net.SocketTimeoutException
|
||||
import java.util.Timer
|
||||
import java.util.TimerTask
|
||||
import javax.inject.Inject
|
||||
import kotlin.concurrent.schedule
|
||||
|
||||
private const val RETRY_WAIT_TIME_MS = 10_000L
|
||||
private const val DEFAULT_LONG_POOL_TIMEOUT = 30_000L
|
||||
|
@ -47,9 +52,12 @@ internal class SyncThread @Inject constructor(private val syncTask: SyncTask,
|
|||
private var liveState = MutableLiveData<SyncState>()
|
||||
private val lock = Object()
|
||||
private val syncScope = CoroutineScope(SupervisorJob())
|
||||
private val debouncer = Debouncer(createUIHandler())
|
||||
|
||||
private var canReachServer = true
|
||||
private var isStarted = false
|
||||
private var isTokenValid = true
|
||||
private var retryNoNetworkTask: TimerTask? = null
|
||||
|
||||
init {
|
||||
updateStateTo(SyncState.Idle)
|
||||
|
@ -64,7 +72,8 @@ internal class SyncThread @Inject constructor(private val syncTask: SyncTask,
|
|||
if (!isStarted) {
|
||||
Timber.v("Resume sync...")
|
||||
isStarted = true
|
||||
// Check again the token validity
|
||||
// Check again server availability and the token validity
|
||||
canReachServer = true
|
||||
isTokenValid = true
|
||||
lock.notify()
|
||||
}
|
||||
|
@ -74,6 +83,7 @@ internal class SyncThread @Inject constructor(private val syncTask: SyncTask,
|
|||
if (isStarted) {
|
||||
Timber.v("Pause sync...")
|
||||
isStarted = false
|
||||
retryNoNetworkTask?.cancel()
|
||||
syncScope.coroutineContext.cancelChildren()
|
||||
}
|
||||
}
|
||||
|
@ -81,6 +91,7 @@ internal class SyncThread @Inject constructor(private val syncTask: SyncTask,
|
|||
fun kill() = synchronized(lock) {
|
||||
Timber.v("Kill sync...")
|
||||
updateStateTo(SyncState.Killing)
|
||||
retryNoNetworkTask?.cancel()
|
||||
syncScope.coroutineContext.cancelChildren()
|
||||
lock.notify()
|
||||
}
|
||||
|
@ -89,9 +100,10 @@ internal class SyncThread @Inject constructor(private val syncTask: SyncTask,
|
|||
return liveState
|
||||
}
|
||||
|
||||
override fun onConnect() {
|
||||
Timber.v("Network is back")
|
||||
override fun onConnectivityChanged() {
|
||||
retryNoNetworkTask?.cancel()
|
||||
synchronized(lock) {
|
||||
canReachServer = true
|
||||
lock.notify()
|
||||
}
|
||||
}
|
||||
|
@ -108,11 +120,18 @@ internal class SyncThread @Inject constructor(private val syncTask: SyncTask,
|
|||
updateStateTo(SyncState.Paused)
|
||||
synchronized(lock) { lock.wait() }
|
||||
Timber.v("...unlocked")
|
||||
} else if (!networkConnectivityChecker.hasInternetAccess(forcePing = false)) {
|
||||
} else if (!canReachServer) {
|
||||
Timber.v("No network. Waiting...")
|
||||
updateStateTo(SyncState.NoNetwork)
|
||||
// We force retrying in RETRY_WAIT_TIME_MS maximum. Otherwise it will be unlocked by onConnectivityChanged() or restart()
|
||||
retryNoNetworkTask = Timer(SyncState.NoNetwork.toString(), false).schedule(RETRY_WAIT_TIME_MS) {
|
||||
synchronized(lock) {
|
||||
canReachServer = true
|
||||
lock.notify()
|
||||
}
|
||||
}
|
||||
synchronized(lock) { lock.wait() }
|
||||
Timber.v("...unlocked")
|
||||
Timber.v("...retry")
|
||||
} else if (!isTokenValid) {
|
||||
Timber.v("Token is invalid. Waiting...")
|
||||
updateStateTo(SyncState.InvalidToken)
|
||||
|
@ -145,6 +164,9 @@ internal class SyncThread @Inject constructor(private val syncTask: SyncTask,
|
|||
try {
|
||||
syncTask.execute(params)
|
||||
} catch (failure: Throwable) {
|
||||
if (failure is Failure.NetworkConnection) {
|
||||
canReachServer = false
|
||||
}
|
||||
if (failure is Failure.NetworkConnection && failure.cause is SocketTimeoutException) {
|
||||
// Timeout are not critical
|
||||
Timber.v("Timeout")
|
||||
|
@ -175,7 +197,9 @@ internal class SyncThread @Inject constructor(private val syncTask: SyncTask,
|
|||
private fun updateStateTo(newState: SyncState) {
|
||||
Timber.v("Update state from $state to $newState")
|
||||
state = newState
|
||||
liveState.postValue(newState)
|
||||
debouncer.debounce("post_state", Runnable {
|
||||
liveState.value = newState
|
||||
}, 150)
|
||||
}
|
||||
|
||||
override fun onMoveToForeground() {
|
||||
|
|
|
@ -36,7 +36,6 @@ internal data class ConfigurableTask<PARAMS, RESULT>(
|
|||
val id: UUID,
|
||||
val callbackThread: TaskThread,
|
||||
val executionThread: TaskThread,
|
||||
val retryCount: Int,
|
||||
val callback: MatrixCallback<RESULT>
|
||||
|
||||
) : Task<PARAMS, RESULT> by task {
|
||||
|
@ -57,7 +56,6 @@ internal data class ConfigurableTask<PARAMS, RESULT>(
|
|||
id = id,
|
||||
callbackThread = callbackThread,
|
||||
executionThread = executionThread,
|
||||
retryCount = retryCount,
|
||||
callback = callback
|
||||
)
|
||||
}
|
||||
|
|
|
@ -19,13 +19,11 @@ package im.vector.matrix.android.internal.task
|
|||
import im.vector.matrix.android.api.util.Cancelable
|
||||
import im.vector.matrix.android.internal.di.MatrixScope
|
||||
import im.vector.matrix.android.internal.extensions.foldToCallback
|
||||
import im.vector.matrix.android.internal.network.NetworkConnectivityChecker
|
||||
import im.vector.matrix.android.internal.util.MatrixCoroutineDispatchers
|
||||
import im.vector.matrix.android.internal.util.toCancelable
|
||||
import kotlinx.coroutines.CoroutineScope
|
||||
import kotlinx.coroutines.SupervisorJob
|
||||
import kotlinx.coroutines.cancelChildren
|
||||
import kotlinx.coroutines.delay
|
||||
import kotlinx.coroutines.launch
|
||||
import kotlinx.coroutines.withContext
|
||||
import timber.log.Timber
|
||||
|
@ -43,10 +41,8 @@ internal class TaskExecutor @Inject constructor(private val coroutineDispatchers
|
|||
val resultOrFailure = runCatching {
|
||||
withContext(task.executionThread.toDispatcher()) {
|
||||
Timber.v("Enqueue task $task")
|
||||
retry(task.retryCount) {
|
||||
Timber.v("Execute task $task on ${Thread.currentThread().name}")
|
||||
task.execute(task.params)
|
||||
}
|
||||
Timber.v("Execute task $task on ${Thread.currentThread().name}")
|
||||
task.execute(task.params)
|
||||
}
|
||||
}
|
||||
resultOrFailure
|
||||
|
@ -60,25 +56,6 @@ internal class TaskExecutor @Inject constructor(private val coroutineDispatchers
|
|||
|
||||
fun cancelAll() = executorScope.coroutineContext.cancelChildren()
|
||||
|
||||
private suspend fun <T> retry(
|
||||
times: Int = Int.MAX_VALUE,
|
||||
initialDelay: Long = 100, // 0.1 second
|
||||
maxDelay: Long = 10_000, // 10 second
|
||||
factor: Double = 2.0,
|
||||
block: suspend () -> T): T {
|
||||
var currentDelay = initialDelay
|
||||
repeat(times - 1) {
|
||||
try {
|
||||
return block()
|
||||
} catch (e: Exception) {
|
||||
Timber.v("Retry task after $currentDelay ms")
|
||||
delay(currentDelay)
|
||||
currentDelay = (currentDelay * factor).toLong().coerceAtMost(maxDelay)
|
||||
}
|
||||
}
|
||||
return block()
|
||||
}
|
||||
|
||||
private fun TaskThread.toDispatcher() = when (this) {
|
||||
TaskThread.MAIN -> coroutineDispatchers.main
|
||||
TaskThread.COMPUTATION -> coroutineDispatchers.computation
|
||||
|
|
|
@ -67,6 +67,7 @@ class VectorSyncService : SyncService() {
|
|||
}
|
||||
|
||||
private fun reschedule(sessionId: String, delay: Long) {
|
||||
AlarmSyncBroadcastReceiver.cancelAlarm(applicationContext)
|
||||
AlarmSyncBroadcastReceiver.scheduleAlarm(applicationContext, sessionId, delay)
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue