Start reworking networkConnectivityCheck (WIP)

This commit is contained in:
ganfra 2020-01-21 18:48:19 +01:00 committed by ganfra
parent 2bddf61afe
commit d93050240a
11 changed files with 189 additions and 155 deletions

View file

@ -56,8 +56,6 @@ internal interface MatrixComponent {
fun sessionParamsStore(): SessionParamsStore
fun networkConnectivityChecker(): NetworkConnectivityChecker
fun backgroundDetectionObserver(): BackgroundDetectionObserver
fun sessionManager(): SessionManager

View file

@ -18,104 +18,36 @@ package im.vector.matrix.android.internal.network
import android.content.Context
import androidx.annotation.WorkerThread
import com.novoda.merlin.Endpoint
import com.novoda.merlin.Merlin
import com.novoda.merlin.MerlinsBeard
import im.vector.matrix.android.internal.di.MatrixScope
import com.novoda.merlin.ResponseCodeValidator
import im.vector.matrix.android.api.auth.data.HomeServerConnectionConfig
import im.vector.matrix.android.internal.session.SessionScope
import im.vector.matrix.android.internal.util.BackgroundDetectionObserver
import im.vector.matrix.android.internal.util.MatrixCoroutineDispatchers
import kotlinx.coroutines.withContext
import timber.log.Timber
import java.util.Collections
import java.util.concurrent.atomic.AtomicBoolean
import javax.inject.Inject
import kotlin.coroutines.resume
import kotlin.coroutines.suspendCoroutine
@MatrixScope
internal class NetworkConnectivityChecker @Inject constructor(private val context: Context,
private val backgroundDetectionObserver: BackgroundDetectionObserver)
: BackgroundDetectionObserver.Listener {
private val merlin = Merlin.Builder()
.withConnectableCallbacks()
.withDisconnectableCallbacks()
.build(context)
private val merlinsBeard = MerlinsBeard.Builder().build(context)
private val listeners = Collections.synchronizedSet(LinkedHashSet<Listener>())
private var hasInternetAccess = merlinsBeard.isConnected
init {
backgroundDetectionObserver.register(this)
}
interface NetworkConnectivityChecker {
/**
* Returns true when internet is available
*/
@WorkerThread
fun hasInternetAccess(): Boolean {
// If we are in background we have unbound merlin, so we have to check
return if (backgroundDetectionObserver.isInBackground) {
merlinsBeard.hasInternetAccess()
} else {
hasInternetAccess
}
}
fun hasInternetAccess(forcePing: Boolean): Boolean
override fun onMoveToForeground() {
merlin.bind()
merlinsBeard.hasInternetAccess {
hasInternetAccess = it
}
merlin.registerDisconnectable {
if (hasInternetAccess) {
Timber.v("On Disconnect")
hasInternetAccess = false
val localListeners = listeners.toList()
localListeners.forEach {
it.onDisconnect()
}
}
}
merlin.registerConnectable {
if (!hasInternetAccess) {
Timber.v("On Connect")
hasInternetAccess = true
val localListeners = listeners.toList()
localListeners.forEach {
it.onConnect()
}
}
}
}
/**
* Wait until we get internet connection.
*/
suspend fun waitUntilConnected()
override fun onMoveToBackground() {
merlin.unbind()
}
// In background you won't get notification as merlin is unbound
suspend fun waitUntilConnected() {
if (hasInternetAccess) {
return
} else {
Timber.v("Waiting for network...")
suspendCoroutine<Unit> { continuation ->
register(object : Listener {
override fun onConnect() {
unregister(this)
Timber.v("Connected to network...")
continuation.resume(Unit)
}
})
}
}
}
fun register(listener: Listener) {
listeners.add(listener)
}
fun unregister(listener: Listener) {
listeners.remove(listener)
}
fun register(listener: Listener)
fun unregister(listener: Listener)
interface Listener {
fun onConnect() {
@ -125,3 +57,138 @@ internal class NetworkConnectivityChecker @Inject constructor(private val contex
}
}
}
@SessionScope
internal class MerlinNetworkConnectivityChecker @Inject constructor(context: Context,
homeServerConnectionConfig: HomeServerConnectionConfig,
private val coroutineDispatchers: MatrixCoroutineDispatchers,
private val backgroundDetectionObserver: BackgroundDetectionObserver)
: NetworkConnectivityChecker {
private val waitingForNetwork = AtomicBoolean(false)
private val isMerlinBounded = AtomicBoolean(false)
private val endpointString = "${homeServerConnectionConfig.homeServerUri}/_matrix/client/versions"
private val endpoint = Endpoint.from(endpointString)
private val responseCodeValidator = ResponseCodeValidator { responseCode ->
responseCode == 204 || responseCode == 400 || responseCode == 404
}
private val merlin = Merlin.Builder()
.withEndpoint(endpoint)
.withResponseCodeValidator(responseCodeValidator)
.withAllCallbacks()
.build(context)
private val merlinsBeard = MerlinsBeard.Builder()
.withEndpoint(endpoint)
.withResponseCodeValidator(responseCodeValidator)
.build(context)
private val hasInternetAccess = AtomicBoolean(merlinsBeard.isConnected)
private val listeners = Collections.synchronizedSet(LinkedHashSet<NetworkConnectivityChecker.Listener>())
private val backgroundDetectionObserverListener = object : BackgroundDetectionObserver.Listener {
override fun onMoveToForeground() {
bindMerlinIfNeeded()
}
override fun onMoveToBackground() {
unbindMerlinIfNeeded()
}
}
/**
* Returns true when internet is available
*/
@WorkerThread
override fun hasInternetAccess(forcePing: Boolean): Boolean {
return if (forcePing) {
merlinsBeard.hasInternetAccess()
} else {
hasInternetAccess.get()
}
}
private fun bindMerlinIfNeeded() {
if (isMerlinBounded.get()) {
return
}
Timber.v("Bind merlin")
isMerlinBounded.set(true)
merlin.bind()
merlinsBeard.hasInternetAccess {
hasInternetAccess.set(it)
}
merlin.registerBindable {
Timber.v("On Network available: ${it.isAvailable}")
}
merlin.registerDisconnectable {
Timber.v("On Disconnect")
hasInternetAccess.set(false)
val localListeners = listeners.toList()
localListeners.forEach {
it.onDisconnect()
}
}
merlin.registerConnectable {
Timber.v("On Connect")
hasInternetAccess.set(true)
val localListeners = listeners.toList()
localListeners.forEach {
it.onConnect()
}
}
}
private fun unbindMerlinIfNeeded() {
if (backgroundDetectionObserver.isInBackground && !waitingForNetwork.get() && isMerlinBounded.get()) {
isMerlinBounded.set(false)
Timber.v("Unbind merlin")
merlin.unbind()
}
}
override suspend fun waitUntilConnected() {
val hasInternetAccess = withContext(coroutineDispatchers.io) {
merlinsBeard.hasInternetAccess()
}
if (hasInternetAccess) {
return
} else {
waitingForNetwork.set(true)
bindMerlinIfNeeded()
Timber.v("Waiting for network...")
suspendCoroutine<Unit> { continuation ->
register(object : NetworkConnectivityChecker.Listener {
override fun onConnect() {
unregister(this)
waitingForNetwork.set(false)
unbindMerlinIfNeeded()
Timber.v("Connected to network...")
continuation.resume(Unit)
}
})
}
}
}
override fun register(listener: NetworkConnectivityChecker.Listener) {
if (listeners.isEmpty()) {
if (backgroundDetectionObserver.isInBackground) {
unbindMerlinIfNeeded()
} else {
bindMerlinIfNeeded()
}
backgroundDetectionObserver.register(backgroundDetectionObserverListener)
}
listeners.add(listener)
}
override fun unregister(listener: NetworkConnectivityChecker.Listener) {
listeners.remove(listener)
if (listeners.isEmpty()) {
backgroundDetectionObserver.unregister(backgroundDetectionObserverListener)
}
}
}

View file

@ -35,6 +35,8 @@ import im.vector.matrix.android.internal.database.LiveEntityObserver
import im.vector.matrix.android.internal.database.SessionRealmConfigurationFactory
import im.vector.matrix.android.internal.di.*
import im.vector.matrix.android.internal.network.AccessTokenInterceptor
import im.vector.matrix.android.internal.network.MerlinNetworkConnectivityChecker
import im.vector.matrix.android.internal.network.NetworkConnectivityChecker
import im.vector.matrix.android.internal.network.RetrofitFactory
import im.vector.matrix.android.internal.network.interceptors.CurlLoggingInterceptor
import im.vector.matrix.android.internal.session.group.GroupSummaryUpdater
@ -175,6 +177,9 @@ internal abstract class SessionModule {
@Binds
abstract fun bindSession(session: DefaultSession): Session
@Binds
abstract fun bindNetworkConnectivityChecker(networkConnectivityChecker: MerlinNetworkConnectivityChecker): NetworkConnectivityChecker
@Binds
@IntoSet
abstract fun bindGroupSummaryUpdater(groupSummaryUpdater: GroupSummaryUpdater): LiveEntityObserver

View file

@ -16,6 +16,7 @@
package im.vector.matrix.android.internal.session.room.timeline
import im.vector.matrix.android.internal.network.NetworkConnectivityChecker
import im.vector.matrix.android.internal.network.executeRequest
import im.vector.matrix.android.internal.session.filter.FilterRepository
import im.vector.matrix.android.internal.session.room.RoomAPI
@ -37,10 +38,12 @@ internal class DefaultPaginationTask @Inject constructor(
private val roomAPI: RoomAPI,
private val filterRepository: FilterRepository,
private val tokenChunkEventPersistor: TokenChunkEventPersistor,
private val eventBus: EventBus
private val eventBus: EventBus,
private val networkConnectivityChecker: NetworkConnectivityChecker
) : PaginationTask {
override suspend fun execute(params: PaginationTask.Params): TokenChunkEventPersistor.Result {
networkConnectivityChecker.waitUntilConnected()
val filter = filterRepository.getRoomFilter()
val chunk = executeRequest<PaginationResponse>(eventBus) {
apiCall = roomAPI.getRoomMessagesFrom(params.roomId, params.from, params.direction.value, params.limit, filter)

View file

@ -38,7 +38,6 @@ import im.vector.matrix.android.internal.database.query.FilterContent
import im.vector.matrix.android.internal.database.query.findAllInRoomWithSendStates
import im.vector.matrix.android.internal.database.query.where
import im.vector.matrix.android.internal.database.query.whereInRoom
import im.vector.matrix.android.internal.task.TaskConstraints
import im.vector.matrix.android.internal.task.TaskExecutor
import im.vector.matrix.android.internal.task.configureWith
import im.vector.matrix.android.internal.util.Debouncer
@ -504,7 +503,6 @@ internal class DefaultTimeline(
Timber.v("Should fetch $limit items $direction")
cancelableBag += paginationTask
.configureWith(params) {
this.constraints = TaskConstraints(connectedToNetwork = true)
this.callback = object : MatrixCallback<TokenChunkEventPersistor.Result> {
override fun onSuccess(data: TokenChunkEventPersistor.Result) {
when (data) {

View file

@ -28,7 +28,10 @@ import im.vector.matrix.android.internal.session.sync.SyncTask
import im.vector.matrix.android.internal.task.TaskExecutor
import im.vector.matrix.android.internal.util.BackgroundDetectionObserver
import im.vector.matrix.android.internal.util.MatrixCoroutineDispatchers
import kotlinx.coroutines.*
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancelChildren
import kotlinx.coroutines.launch
import timber.log.Timber
import java.util.concurrent.atomic.AtomicBoolean
@ -97,14 +100,6 @@ abstract class SyncService : Service() {
}
private suspend fun doSync() {
if (!networkConnectivityChecker.hasInternetAccess()) {
Timber.v("No network reschedule to avoid wasting resources")
sessionId?.also {
onRescheduleAsked(it, isInitialSync, delay = 10_000L)
}
stopMe()
return
}
Timber.v("Execute sync request with timeout 0")
val params = SyncTask.Params(TIME_OUT)
try {
@ -120,9 +115,11 @@ abstract class SyncService : Service() {
if (throwable.isTokenError()) {
stopMe()
} else {
Timber.v("Retry to sync in 5s")
delay(DELAY_FAILURE)
doSync()
Timber.v("Should be rescheduled to avoid wasting resources")
sessionId?.also {
onRescheduleAsked(it, isInitialSync, delay = 10_000L)
}
stopMe()
}
}
}
@ -165,6 +162,5 @@ abstract class SyncService : Service() {
companion object {
const val EXTRA_SESSION_ID = "EXTRA_SESSION_ID"
private const val TIME_OUT = 0L
private const val DELAY_FAILURE = 5_000L
}
}

View file

@ -25,7 +25,12 @@ import im.vector.matrix.android.api.session.sync.SyncState
import im.vector.matrix.android.internal.network.NetworkConnectivityChecker
import im.vector.matrix.android.internal.session.sync.SyncTask
import im.vector.matrix.android.internal.util.BackgroundDetectionObserver
import kotlinx.coroutines.*
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancelChildren
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import timber.log.Timber
import java.net.SocketTimeoutException
import javax.inject.Inject
@ -98,16 +103,16 @@ internal class SyncThread @Inject constructor(private val syncTask: SyncTask,
backgroundDetectionObserver.register(this)
while (state != SyncState.Killing) {
Timber.v("Entering loop, state: $state")
if (!networkConnectivityChecker.hasInternetAccess()) {
Timber.v("No network. Waiting...")
updateStateTo(SyncState.NoNetwork)
synchronized(lock) { lock.wait() }
Timber.v("...unlocked")
} else if (!isStarted) {
if (!isStarted) {
Timber.v("Sync is Paused. Waiting...")
updateStateTo(SyncState.Paused)
synchronized(lock) { lock.wait() }
Timber.v("...unlocked")
} else if (!networkConnectivityChecker.hasInternetAccess(forcePing = false)) {
Timber.v("No network. Waiting...")
updateStateTo(SyncState.NoNetwork)
synchronized(lock) { lock.wait() }
Timber.v("...unlocked")
} else if (!isTokenValid) {
Timber.v("Token is invalid. Waiting...")
updateStateTo(SyncState.InvalidToken)

View file

@ -36,7 +36,6 @@ internal data class ConfigurableTask<PARAMS, RESULT>(
val id: UUID,
val callbackThread: TaskThread,
val executionThread: TaskThread,
val constraints: TaskConstraints,
val retryCount: Int,
val callback: MatrixCallback<RESULT>
@ -48,7 +47,6 @@ internal data class ConfigurableTask<PARAMS, RESULT>(
var id: UUID = UUID.randomUUID(),
var callbackThread: TaskThread = TaskThread.MAIN,
var executionThread: TaskThread = TaskThread.IO,
var constraints: TaskConstraints = TaskConstraints(),
var retryCount: Int = 0,
var callback: MatrixCallback<RESULT> = object : MatrixCallback<RESULT> {}
) {
@ -59,7 +57,6 @@ internal data class ConfigurableTask<PARAMS, RESULT>(
id = id,
callbackThread = callbackThread,
executionThread = executionThread,
constraints = constraints,
retryCount = retryCount,
callback = callback
)

View file

@ -1,22 +0,0 @@
/*
* Copyright 2019 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package im.vector.matrix.android.internal.task
data class TaskConstraints(
val connectedToNetwork: Boolean = false
)

View file

@ -22,14 +22,18 @@ import im.vector.matrix.android.internal.extensions.foldToCallback
import im.vector.matrix.android.internal.network.NetworkConnectivityChecker
import im.vector.matrix.android.internal.util.MatrixCoroutineDispatchers
import im.vector.matrix.android.internal.util.toCancelable
import kotlinx.coroutines.*
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancelChildren
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext
import timber.log.Timber
import javax.inject.Inject
import kotlin.coroutines.EmptyCoroutineContext
@MatrixScope
internal class TaskExecutor @Inject constructor(private val coroutineDispatchers: MatrixCoroutineDispatchers,
private val networkConnectivityChecker: NetworkConnectivityChecker) {
internal class TaskExecutor @Inject constructor(private val coroutineDispatchers: MatrixCoroutineDispatchers) {
private val executorScope = CoroutineScope(SupervisorJob())
@ -40,10 +44,6 @@ internal class TaskExecutor @Inject constructor(private val coroutineDispatchers
withContext(task.executionThread.toDispatcher()) {
Timber.v("Enqueue task $task")
retry(task.retryCount) {
if (task.constraints.connectedToNetwork) {
Timber.v("Waiting network for $task")
networkConnectivityChecker.waitUntilConnected()
}
Timber.v("Execute task $task on ${Thread.currentThread().name}")
task.execute(task.params)
}

View file

@ -15,15 +15,13 @@
*/
package im.vector.riotx.core.services
import android.app.AlarmManager
import android.app.NotificationManager
import android.app.PendingIntent
import android.content.Context
import android.content.Intent
import android.os.Build
import im.vector.matrix.android.internal.session.sync.job.SyncService
import im.vector.riotx.R
import im.vector.riotx.core.extensions.vectorComponent
import im.vector.riotx.fdroid.receiver.AlarmSyncBroadcastReceiver
import im.vector.riotx.features.notifications.NotificationUtils
class VectorSyncService : SyncService() {
@ -69,17 +67,6 @@ class VectorSyncService : SyncService() {
}
private fun reschedule(sessionId: String, delay: Long) {
val pendingIntent = if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.O) {
PendingIntent.getForegroundService(this, 0, newIntent(this, sessionId), 0)
} else {
PendingIntent.getService(this, 0, newIntent(this, sessionId), 0)
}
val firstMillis = System.currentTimeMillis() + delay
val alarmMgr = getSystemService(Context.ALARM_SERVICE) as AlarmManager
if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.M) {
alarmMgr.setAndAllowWhileIdle(AlarmManager.RTC_WAKEUP, firstMillis, pendingIntent)
} else {
alarmMgr.set(AlarmManager.RTC_WAKEUP, firstMillis, pendingIntent)
}
AlarmSyncBroadcastReceiver.scheduleAlarm(applicationContext, sessionId, delay)
}
}