Merge pull request #785 from vector-im/feature/initial_sync

Feature/initial sync
This commit is contained in:
Benoit Marty 2019-12-20 18:18:32 +01:00 committed by GitHub
commit 90f2199eb7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
61 changed files with 975 additions and 565 deletions

View file

@ -5,6 +5,7 @@ Features ✨:
-
Improvements 🙌:
- The initial sync is now handled by a foreground service
- Render aliases and canonical alias change in the timeline
- Fix autocompletion issues and add support for rooms and groups
@ -12,7 +13,8 @@ Other changes:
-
Bugfix 🐛:
-
- Fix avatar image disappearing (#777)
- Fix read marker banner when permalink
Translations 🗣:
-

View file

@ -19,4 +19,4 @@ package im.vector.matrix.android
import im.vector.matrix.android.internal.util.MatrixCoroutineDispatchers
import kotlinx.coroutines.Dispatchers.Main
internal val testCoroutineDispatchers = MatrixCoroutineDispatchers(Main, Main, Main, Main, Main)
internal val testCoroutineDispatchers = MatrixCoroutineDispatchers(Main, Main, Main, Main)

View file

@ -1,60 +0,0 @@
/*
* Copyright 2019 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package im.vector.matrix.android.auth
import androidx.test.annotation.UiThreadTest
import androidx.test.ext.junit.runners.AndroidJUnit4
import androidx.test.rule.GrantPermissionRule
import im.vector.matrix.android.InstrumentedTest
import im.vector.matrix.android.OkReplayRuleChainNoActivity
import im.vector.matrix.android.api.auth.AuthenticationService
import okreplay.*
import org.junit.ClassRule
import org.junit.Rule
import org.junit.Test
import org.junit.runner.RunWith
@RunWith(AndroidJUnit4::class)
internal class AuthenticationServiceTest : InstrumentedTest {
lateinit var authenticationService: AuthenticationService
lateinit var okReplayInterceptor: OkReplayInterceptor
private val okReplayConfig = OkReplayConfig.Builder()
.tapeRoot(AndroidTapeRoot(
context(), javaClass))
.defaultMode(TapeMode.READ_WRITE) // or TapeMode.READ_ONLY
.sslEnabled(true)
.interceptor(okReplayInterceptor)
.build()
@get:Rule
val testRule = OkReplayRuleChainNoActivity(okReplayConfig).get()
@Test
@UiThreadTest
@OkReplay(tape = "auth", mode = TapeMode.READ_WRITE)
fun auth() {
}
companion object {
@ClassRule
@JvmField
val grantExternalStoragePermissionRule: GrantPermissionRule =
GrantPermissionRule.grant(android.Manifest.permission.WRITE_EXTERNAL_STORAGE)
}
}

View file

@ -16,20 +16,31 @@
package im.vector.matrix.android.internal.crypto
import androidx.test.ext.junit.runners.AndroidJUnit4
import im.vector.matrix.android.InstrumentedTest
import im.vector.matrix.android.internal.crypto.model.OlmSessionWrapper
import im.vector.matrix.android.internal.crypto.store.IMXCryptoStore
import io.realm.Realm
import org.junit.Assert.*
import org.junit.Before
import org.junit.Test
import org.junit.runner.RunWith
import org.matrix.olm.OlmAccount
import org.matrix.olm.OlmManager
import org.matrix.olm.OlmSession
private const val DUMMY_DEVICE_KEY = "DeviceKey"
class CryptoStoreTest {
@RunWith(AndroidJUnit4::class)
class CryptoStoreTest : InstrumentedTest {
private val cryptoStoreHelper = CryptoStoreHelper()
@Before
fun setup() {
Realm.init(context())
}
@Test
fun test_metadata_realm_ok() {
val cryptoStore: IMXCryptoStore = cryptoStoreHelper.createStore()

View file

@ -21,6 +21,7 @@ import com.zhuinden.monarchy.Monarchy
import im.vector.matrix.android.InstrumentedTest
import im.vector.matrix.android.internal.database.helper.*
import im.vector.matrix.android.internal.database.model.ChunkEntity
import im.vector.matrix.android.internal.database.model.SessionRealmModule
import im.vector.matrix.android.internal.session.room.timeline.PaginationDirection
import im.vector.matrix.android.session.room.timeline.RoomDataHelper.createFakeListOfEvents
import im.vector.matrix.android.session.room.timeline.RoomDataHelper.createFakeMessageEvent
@ -43,7 +44,11 @@ internal class ChunkEntityTest : InstrumentedTest {
@Before
fun setup() {
Realm.init(context())
val testConfig = RealmConfiguration.Builder().inMemory().name("test-realm").build()
val testConfig = RealmConfiguration.Builder()
.inMemory()
.name("test-realm")
.modules(SessionRealmModule())
.build()
monarchy = Monarchy.Builder().setRealmConfiguration(testConfig).build()
}

View file

@ -66,7 +66,7 @@ internal class TimelineTest : InstrumentedTest {
// val latch = CountDownLatch(2)
// var timelineEvents: List<TimelineEvent> = emptyList()
// timeline.listener = object : Timeline.Listener {
// override fun onUpdated(snapshot: List<TimelineEvent>) {
// override fun onTimelineUpdated(snapshot: List<TimelineEvent>) {
// if (snapshot.isNotEmpty()) {
// if (initialLoad == 0) {
// initialLoad = snapshot.size

View file

@ -14,13 +14,15 @@
* limitations under the License.
*/
package im.vector.riotx.core.error
package im.vector.matrix.android.api.failure
import im.vector.matrix.android.api.failure.Failure
import im.vector.matrix.android.api.failure.MatrixError
import javax.net.ssl.HttpsURLConnection
fun Throwable.is401(): Boolean {
return (this is Failure.ServerError && httpCode == HttpsURLConnection.HTTP_UNAUTHORIZED /* 401 */
&& error.code == MatrixError.M_UNAUTHORIZED)
}
fun Throwable.is401() =
this is Failure.ServerError
&& httpCode == HttpsURLConnection.HTTP_UNAUTHORIZED /* 401 */
&& error.code == MatrixError.M_UNAUTHORIZED
fun Throwable.isTokenError() =
this is Failure.ServerError
&& (error.code == MatrixError.M_UNKNOWN_TOKEN || error.code == MatrixError.M_MISSING_TOKEN)

View file

@ -109,6 +109,11 @@ interface Session :
*/
fun syncState(): LiveData<SyncState>
/**
* This methods return true if an initial sync has been processed
*/
fun hasAlreadySynced(): Boolean
/**
* This method allow to close a session. It does stop some services.
*/

View file

@ -24,7 +24,7 @@ import im.vector.matrix.android.api.MatrixCallback
interface CacheService {
/**
* Clear the whole cached data, except credentials. Once done, the session is closed and has to be opened again
* Clear the whole cached data, except credentials. Once done, the sync has to be restarted by the sdk user.
*/
fun clearCache(callback: MatrixCallback<Unit>)
}

View file

@ -65,7 +65,7 @@ interface Timeline {
/**
* This is the main method to enrich the timeline with new data.
* It will call the onUpdated method from [Listener] when the data will be processed.
* It will call the onTimelineUpdated method from [Listener] when the data will be processed.
* It also ensures only one pagination by direction is launched at a time, so you can safely call this multiple time in a row.
*/
fun paginate(direction: Direction, count: Int)
@ -106,7 +106,12 @@ interface Timeline {
* Call when the timeline has been updated through pagination or sync.
* @param snapshot the most up to date snapshot
*/
fun onUpdated(snapshot: List<TimelineEvent>)
fun onTimelineUpdated(snapshot: List<TimelineEvent>)
/**
* Called whenever an error we can't recover from occurred
*/
fun onTimelineFailure(throwable: Throwable)
}
/**

View file

@ -19,12 +19,16 @@ package im.vector.matrix.android.internal.database
import com.zhuinden.monarchy.Monarchy
import im.vector.matrix.android.internal.util.createBackgroundHandler
import io.realm.*
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancelChildren
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicReference
internal interface LiveEntityObserver {
fun start()
fun dispose()
fun cancelProcess()
fun isStarted(): Boolean
}
@ -35,6 +39,7 @@ internal abstract class RealmLiveEntityObserver<T : RealmObject>(protected val r
val BACKGROUND_HANDLER = createBackgroundHandler("LIVE_ENTITY_BACKGROUND")
}
protected val observerScope = CoroutineScope(SupervisorJob())
protected abstract val query: Monarchy.Query<T>
private val isStarted = AtomicBoolean(false)
private val backgroundRealm = AtomicReference<Realm>()
@ -59,10 +64,15 @@ internal abstract class RealmLiveEntityObserver<T : RealmObject>(protected val r
backgroundRealm.getAndSet(null).also {
it.close()
}
observerScope.coroutineContext.cancelChildren()
}
}
}
override fun cancelProcess() {
observerScope.coroutineContext.cancelChildren()
}
override fun isStarted(): Boolean {
return isStarted.get()
}

View file

@ -21,12 +21,7 @@ import im.vector.matrix.android.api.session.events.model.EventType
import im.vector.matrix.android.api.session.room.send.SendState
import im.vector.matrix.android.internal.database.mapper.asDomain
import im.vector.matrix.android.internal.database.mapper.toEntity
import im.vector.matrix.android.internal.database.model.ChunkEntity
import im.vector.matrix.android.internal.database.model.EventAnnotationsSummaryEntity
import im.vector.matrix.android.internal.database.model.ReadReceiptEntity
import im.vector.matrix.android.internal.database.model.ReadReceiptsSummaryEntity
import im.vector.matrix.android.internal.database.model.TimelineEventEntity
import im.vector.matrix.android.internal.database.model.TimelineEventEntityFields
import im.vector.matrix.android.internal.database.model.*
import im.vector.matrix.android.internal.database.query.find
import im.vector.matrix.android.internal.database.query.getOrCreate
import im.vector.matrix.android.internal.database.query.where

View file

@ -26,7 +26,6 @@ import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.android.asCoroutineDispatcher
import kotlinx.coroutines.asCoroutineDispatcher
import org.matrix.olm.OlmManager
import java.util.concurrent.Executors
@Module
internal object MatrixModule {
@ -38,8 +37,7 @@ internal object MatrixModule {
return MatrixCoroutineDispatchers(io = Dispatchers.IO,
computation = Dispatchers.Default,
main = Dispatchers.Main,
crypto = createBackgroundHandler("Crypto_Thread").asCoroutineDispatcher(),
sync = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
crypto = createBackgroundHandler("Crypto_Thread").asCoroutineDispatcher()
)
}

View file

@ -17,6 +17,7 @@
package im.vector.matrix.android.internal.network
import android.content.Context
import androidx.annotation.WorkerThread
import com.novoda.merlin.Merlin
import com.novoda.merlin.MerlinsBeard
import im.vector.matrix.android.internal.di.MatrixScope
@ -28,8 +29,8 @@ import kotlin.coroutines.resume
import kotlin.coroutines.suspendCoroutine
@MatrixScope
internal class NetworkConnectivityChecker @Inject constructor(context: Context,
backgroundDetectionObserver: BackgroundDetectionObserver)
internal class NetworkConnectivityChecker @Inject constructor(private val context: Context,
private val backgroundDetectionObserver: BackgroundDetectionObserver)
: BackgroundDetectionObserver.Listener {
private val merlin = Merlin.Builder()
@ -37,19 +38,33 @@ internal class NetworkConnectivityChecker @Inject constructor(context: Context,
.withDisconnectableCallbacks()
.build(context)
private val listeners = Collections.synchronizedSet(LinkedHashSet<Listener>())
private val merlinsBeard = MerlinsBeard.Builder().build(context)
// True when internet is available
var hasInternetAccess = MerlinsBeard.Builder().build(context).isConnected
private set
private val listeners = Collections.synchronizedSet(LinkedHashSet<Listener>())
private var hasInternetAccess = merlinsBeard.isConnected
init {
backgroundDetectionObserver.register(this)
}
/**
* Returns true when internet is available
*/
@WorkerThread
fun hasInternetAccess(): Boolean {
// If we are in background we have unbound merlin, so we have to check
return if (backgroundDetectionObserver.isInBackground) {
merlinsBeard.hasInternetAccess()
} else {
hasInternetAccess
}
}
override fun onMoveToForeground() {
merlin.bind()
merlinsBeard.hasInternetAccess {
hasInternetAccess = it
}
merlin.registerDisconnectable {
if (hasInternetAccess) {
Timber.v("On Disconnect")
@ -76,14 +91,17 @@ internal class NetworkConnectivityChecker @Inject constructor(context: Context,
merlin.unbind()
}
// In background you won't get notification as merlin is unbound
suspend fun waitUntilConnected() {
if (hasInternetAccess) {
return
} else {
Timber.v("Waiting for network...")
suspendCoroutine<Unit> { continuation ->
register(object : Listener {
override fun onConnect() {
unregister(this)
Timber.v("Connected to network...")
continuation.resume(Unit)
}
})

View file

@ -45,6 +45,8 @@ import im.vector.matrix.android.api.session.user.UserService
import im.vector.matrix.android.internal.auth.SessionParamsStore
import im.vector.matrix.android.internal.crypto.DefaultCryptoService
import im.vector.matrix.android.internal.database.LiveEntityObserver
import im.vector.matrix.android.internal.session.sync.SyncTaskSequencer
import im.vector.matrix.android.internal.session.sync.SyncTokenStore
import im.vector.matrix.android.internal.session.sync.job.SyncThread
import im.vector.matrix.android.internal.session.sync.job.SyncWorker
import kotlinx.coroutines.Dispatchers
@ -76,24 +78,26 @@ internal class DefaultSession @Inject constructor(override val sessionParams: Se
private val secureStorageService: Lazy<SecureStorageService>,
private val syncThreadProvider: Provider<SyncThread>,
private val contentUrlResolver: ContentUrlResolver,
private val syncTokenStore: SyncTokenStore,
private val syncTaskSequencer: SyncTaskSequencer,
private val sessionParamsStore: SessionParamsStore,
private val contentUploadProgressTracker: ContentUploadStateTracker,
private val initialSyncProgressService: Lazy<InitialSyncProgressService>,
private val homeServerCapabilitiesService: Lazy<HomeServerCapabilitiesService>)
: Session,
RoomService by roomService.get(),
RoomDirectoryService by roomDirectoryService.get(),
GroupService by groupService.get(),
UserService by userService.get(),
CryptoService by cryptoService.get(),
SignOutService by signOutService.get(),
FilterService by filterService.get(),
PushRuleService by pushRuleService.get(),
PushersService by pushersService.get(),
FileService by fileService.get(),
InitialSyncProgressService by initialSyncProgressService.get(),
SecureStorageService by secureStorageService.get(),
HomeServerCapabilitiesService by homeServerCapabilitiesService.get() {
RoomService by roomService.get(),
RoomDirectoryService by roomDirectoryService.get(),
GroupService by groupService.get(),
UserService by userService.get(),
CryptoService by cryptoService.get(),
SignOutService by signOutService.get(),
FilterService by filterService.get(),
PushRuleService by pushRuleService.get(),
PushersService by pushersService.get(),
FileService by fileService.get(),
InitialSyncProgressService by initialSyncProgressService.get(),
SecureStorageService by secureStorageService.get(),
HomeServerCapabilitiesService by homeServerCapabilitiesService.get() {
private var isOpen = false
@ -149,12 +153,17 @@ internal class DefaultSession @Inject constructor(override val sessionParams: Se
cryptoService.get().close()
isOpen = false
EventBus.getDefault().unregister(this)
syncTaskSequencer.close()
}
override fun syncState(): LiveData<SyncState> {
return getSyncThread().liveState()
}
override fun hasAlreadySynced(): Boolean {
return syncTokenStore.getLastToken() != null
}
private fun getSyncThread(): SyncThread {
return syncThread ?: syncThreadProvider.get().also {
syncThread = it
@ -164,23 +173,14 @@ internal class DefaultSession @Inject constructor(override val sessionParams: Se
override fun clearCache(callback: MatrixCallback<Unit>) {
stopSync()
stopAnyBackgroundSync()
cacheService.get().clearCache(object : MatrixCallback<Unit> {
override fun onSuccess(data: Unit) {
startSync(true)
callback.onSuccess(data)
}
override fun onFailure(failure: Throwable) {
startSync(true)
callback.onFailure(failure)
}
})
liveEntityObservers.forEach { it.cancelProcess() }
cacheService.get().clearCache(callback)
}
@Subscribe(threadMode = ThreadMode.MAIN)
fun onGlobalError(globalError: GlobalError) {
if (globalError is GlobalError.InvalidToken
&& globalError.softLogout) {
&& globalError.softLogout) {
// Mark the token has invalid
GlobalScope.launch(Dispatchers.IO) {
sessionParamsStore.setTokenInvalid(myUserId)

View file

@ -46,6 +46,7 @@ import im.vector.matrix.android.internal.session.sync.job.SyncWorker
import im.vector.matrix.android.internal.session.user.UserModule
import im.vector.matrix.android.internal.session.user.accountdata.AccountDataModule
import im.vector.matrix.android.internal.task.TaskExecutor
import im.vector.matrix.android.internal.util.MatrixCoroutineDispatchers
@Component(dependencies = [MatrixComponent::class],
modules = [
@ -69,6 +70,8 @@ import im.vector.matrix.android.internal.task.TaskExecutor
@SessionScope
internal interface SessionComponent {
fun coroutineDispatchers(): MatrixCoroutineDispatchers
fun session(): Session
fun syncTask(): SyncTask

View file

@ -22,6 +22,7 @@ import androidx.work.WorkManager
import com.zhuinden.monarchy.Monarchy
import im.vector.matrix.android.api.session.room.model.Membership
import im.vector.matrix.android.internal.database.RealmLiveEntityObserver
import im.vector.matrix.android.internal.database.awaitTransaction
import im.vector.matrix.android.internal.database.model.GroupEntity
import im.vector.matrix.android.internal.database.model.GroupSummaryEntity
import im.vector.matrix.android.internal.database.query.where
@ -31,6 +32,7 @@ import im.vector.matrix.android.internal.worker.WorkManagerUtil.matrixOneTimeWor
import im.vector.matrix.android.internal.worker.WorkerParamsFactory
import io.realm.OrderedCollectionChangeSet
import io.realm.RealmResults
import kotlinx.coroutines.launch
import javax.inject.Inject
private const val GET_GROUP_DATA_WORKER = "GET_GROUP_DATA_WORKER"
@ -49,14 +51,19 @@ internal class GroupSummaryUpdater @Inject constructor(private val context: Cont
.mapNotNull { results[it] }
fetchGroupsData(modifiedGroupEntity
.filter { it.membership == Membership.JOIN || it.membership == Membership.INVITE }
.map { it.groupId }
.toList())
.filter { it.membership == Membership.JOIN || it.membership == Membership.INVITE }
.map { it.groupId }
.toList())
deleteGroups(modifiedGroupEntity
modifiedGroupEntity
.filter { it.membership == Membership.LEAVE }
.map { it.groupId }
.toList())
.toList()
.also {
observerScope.launch {
deleteGroups(it)
}
}
}
private fun fetchGroupsData(groupIds: List<String>) {
@ -77,12 +84,9 @@ internal class GroupSummaryUpdater @Inject constructor(private val context: Cont
/**
* Delete the GroupSummaryEntity of left groups
*/
private fun deleteGroups(groupIds: List<String>) {
monarchy
.writeAsync { realm ->
GroupSummaryEntity.where(realm, groupIds)
.findAll()
.deleteAllFromRealm()
}
private suspend fun deleteGroups(groupIds: List<String>) = awaitTransaction(monarchy.realmConfiguration) { realm ->
GroupSummaryEntity.where(realm, groupIds)
.findAll()
.deleteAllFromRealm()
}
}

View file

@ -23,11 +23,10 @@ import im.vector.matrix.android.internal.database.model.EventEntity
import im.vector.matrix.android.internal.database.query.types
import im.vector.matrix.android.internal.di.SessionDatabase
import im.vector.matrix.android.internal.di.UserId
import im.vector.matrix.android.internal.task.TaskExecutor
import im.vector.matrix.android.internal.task.configureWith
import io.realm.OrderedCollectionChangeSet
import io.realm.RealmConfiguration
import io.realm.RealmResults
import kotlinx.coroutines.launch
import timber.log.Timber
import javax.inject.Inject
@ -39,8 +38,7 @@ import javax.inject.Inject
internal class EventRelationsAggregationUpdater @Inject constructor(@SessionDatabase realmConfiguration: RealmConfiguration,
@UserId private val userId: String,
private val task: EventRelationsAggregationTask,
private val taskExecutor: TaskExecutor) :
private val task: EventRelationsAggregationTask) :
RealmLiveEntityObserver<EventEntity>(realmConfiguration) {
override val query = Monarchy.Query<EventEntity> {
@ -63,6 +61,8 @@ internal class EventRelationsAggregationUpdater @Inject constructor(@SessionData
insertedDomains,
userId
)
task.configureWith(params).executeBy(taskExecutor)
observerScope.launch {
task.execute(params)
}
}
}

View file

@ -20,6 +20,7 @@ import com.zhuinden.monarchy.Monarchy
import im.vector.matrix.android.api.session.crypto.CryptoService
import im.vector.matrix.android.api.session.room.Room
import im.vector.matrix.android.internal.database.mapper.RoomSummaryMapper
import im.vector.matrix.android.internal.session.SessionScope
import im.vector.matrix.android.internal.session.room.draft.DefaultDraftService
import im.vector.matrix.android.internal.session.room.membership.DefaultMembershipService
import im.vector.matrix.android.internal.session.room.notification.DefaultRoomPushRuleService
@ -35,6 +36,7 @@ internal interface RoomFactory {
fun create(roomId: String): Room
}
@SessionScope
internal class DefaultRoomFactory @Inject constructor(private val monarchy: Monarchy,
private val roomSummaryMapper: RoomSummaryMapper,
private val cryptoService: CryptoService,

View file

@ -28,6 +28,7 @@ import im.vector.matrix.android.internal.database.model.EventEntity
import im.vector.matrix.android.internal.database.model.EventEntityFields
import im.vector.matrix.android.internal.database.model.RoomSummaryEntity
import im.vector.matrix.android.internal.database.model.TimelineEventEntity
import im.vector.matrix.android.internal.database.query.*
import im.vector.matrix.android.internal.database.query.isEventRead
import im.vector.matrix.android.internal.database.query.latestEvent
import im.vector.matrix.android.internal.database.query.prev
@ -38,7 +39,6 @@ import im.vector.matrix.android.internal.session.room.membership.RoomMembers
import im.vector.matrix.android.internal.session.sync.model.RoomSyncSummary
import im.vector.matrix.android.internal.session.sync.model.RoomSyncUnreadNotifications
import io.realm.Realm
import io.realm.kotlin.createObject
import javax.inject.Inject
internal class RoomSummaryUpdater @Inject constructor(@UserId private val userId: String,
@ -69,9 +69,7 @@ internal class RoomSummaryUpdater @Inject constructor(@UserId private val userId
roomSummary: RoomSyncSummary? = null,
unreadNotifications: RoomSyncUnreadNotifications? = null,
updateMembers: Boolean = false) {
val roomSummaryEntity = RoomSummaryEntity.where(realm, roomId).findFirst()
?: realm.createObject(roomId)
val roomSummaryEntity = RoomSummaryEntity.getOrCreate(realm, roomId)
if (roomSummary != null) {
if (roomSummary.heroes.isNotEmpty()) {
roomSummaryEntity.heroes.clear()

View file

@ -23,6 +23,7 @@ import im.vector.matrix.android.api.session.events.model.toModel
import im.vector.matrix.android.api.session.room.model.VersioningState
import im.vector.matrix.android.api.session.room.model.create.RoomCreateContent
import im.vector.matrix.android.internal.database.RealmLiveEntityObserver
import im.vector.matrix.android.internal.database.awaitTransaction
import im.vector.matrix.android.internal.database.mapper.asDomain
import im.vector.matrix.android.internal.database.model.EventEntity
import im.vector.matrix.android.internal.database.model.RoomSummaryEntity
@ -30,9 +31,9 @@ import im.vector.matrix.android.internal.database.query.types
import im.vector.matrix.android.internal.database.query.where
import im.vector.matrix.android.internal.di.SessionDatabase
import io.realm.OrderedCollectionChangeSet
import io.realm.Realm
import io.realm.RealmConfiguration
import io.realm.RealmResults
import kotlinx.coroutines.launch
import javax.inject.Inject
internal class RoomCreateEventLiveObserver @Inject constructor(@SessionDatabase
@ -51,21 +52,21 @@ internal class RoomCreateEventLiveObserver @Inject constructor(@SessionDatabase
}
.toList()
.also {
handleRoomCreateEvents(it)
observerScope.launch {
handleRoomCreateEvents(it)
}
}
}
private fun handleRoomCreateEvents(createEvents: List<Event>) = Realm.getInstance(realmConfiguration).use {
it.executeTransactionAsync { realm ->
for (event in createEvents) {
val createRoomContent = event.getClearContent().toModel<RoomCreateContent>()
val predecessorRoomId = createRoomContent?.predecessor?.roomId ?: continue
private suspend fun handleRoomCreateEvents(createEvents: List<Event>) = awaitTransaction(realmConfiguration) { realm ->
for (event in createEvents) {
val createRoomContent = event.getClearContent().toModel<RoomCreateContent>()
val predecessorRoomId = createRoomContent?.predecessor?.roomId ?: continue
val predecessorRoomSummary = RoomSummaryEntity.where(realm, predecessorRoomId).findFirst()
?: RoomSummaryEntity(predecessorRoomId)
predecessorRoomSummary.versioningState = VersioningState.UPGRADED_ROOM_JOINED
realm.insertOrUpdate(predecessorRoomSummary)
}
val predecessorRoomSummary = RoomSummaryEntity.where(realm, predecessorRoomId).findFirst()
?: RoomSummaryEntity(predecessorRoomId)
predecessorRoomSummary.versioningState = VersioningState.UPGRADED_ROOM_JOINED
realm.insertOrUpdate(predecessorRoomSummary)
}
}
}

View file

@ -23,11 +23,10 @@ import im.vector.matrix.android.internal.database.mapper.asDomain
import im.vector.matrix.android.internal.database.model.EventEntity
import im.vector.matrix.android.internal.database.query.types
import im.vector.matrix.android.internal.di.SessionDatabase
import im.vector.matrix.android.internal.task.TaskExecutor
import im.vector.matrix.android.internal.task.configureWith
import io.realm.OrderedCollectionChangeSet
import io.realm.RealmConfiguration
import io.realm.RealmResults
import kotlinx.coroutines.launch
import timber.log.Timber
import javax.inject.Inject
@ -36,8 +35,7 @@ import javax.inject.Inject
* As it will actually delete the content, it should be called last in the list of listener.
*/
internal class EventsPruner @Inject constructor(@SessionDatabase realmConfiguration: RealmConfiguration,
private val pruneEventTask: PruneEventTask,
private val taskExecutor: TaskExecutor) :
private val pruneEventTask: PruneEventTask) :
RealmLiveEntityObserver<EventEntity>(realmConfiguration) {
override val query = Monarchy.Query<EventEntity> { EventEntity.types(it, listOf(EventType.REDACTION)) }
@ -50,7 +48,9 @@ internal class EventsPruner @Inject constructor(@SessionDatabase realmConfigurat
.mapNotNull { results[it]?.asDomain() }
.toList()
val params = PruneEventTask.Params(insertedDomains)
pruneEventTask.configureWith(params).executeBy(taskExecutor)
observerScope.launch {
val params = PruneEventTask.Params(insertedDomains)
pruneEventTask.execute(params)
}
}
}

View file

@ -504,7 +504,6 @@ internal class DefaultTimeline(
Timber.v("Should fetch $limit items $direction")
cancelableBag += paginationTask
.configureWith(params) {
this.retryCount = Int.MAX_VALUE
this.constraints = TaskConstraints(connectedToNetwork = true)
this.callback = object : MatrixCallback<TokenChunkEventPersistor.Result> {
override fun onSuccess(data: TokenChunkEventPersistor.Result) {
@ -524,6 +523,8 @@ internal class DefaultTimeline(
}
override fun onFailure(failure: Throwable) {
updateState(direction) { it.copy(isPaginating = false, requestedPaginationCount = 0) }
postSnapshot()
Timber.v("Failure fetching $limit items $direction from pagination request")
}
}
@ -637,7 +638,14 @@ internal class DefaultTimeline(
private fun fetchEvent(eventId: String) {
val params = GetContextOfEventTask.Params(roomId, eventId, settings.initialSize)
cancelableBag += contextOfEventTask.configureWith(params).executeBy(taskExecutor)
cancelableBag += contextOfEventTask.configureWith(params) {
callback = object : MatrixCallback<TokenChunkEventPersistor.Result> {
override fun onFailure(failure: Throwable) {
postFailure(failure)
}
}
}
.executeBy(taskExecutor)
}
private fun postSnapshot() {
@ -650,7 +658,7 @@ internal class DefaultTimeline(
val runnable = Runnable {
synchronized(listeners) {
listeners.forEach {
it.onUpdated(snapshot)
it.onTimelineUpdated(snapshot)
}
}
}
@ -658,6 +666,20 @@ internal class DefaultTimeline(
}
}
private fun postFailure(throwable: Throwable) {
if (isReady.get().not()) {
return
}
val runnable = Runnable {
synchronized(listeners) {
listeners.forEach {
it.onTimelineFailure(throwable)
}
}
}
mainHandler.post(runnable)
}
private fun clearAllValues() {
prevDisplayIndex = null
nextDisplayIndex = null

View file

@ -18,6 +18,10 @@ package im.vector.matrix.android.internal.session.room.timeline
import com.zhuinden.monarchy.Monarchy
import im.vector.matrix.android.internal.database.helper.*
import im.vector.matrix.android.internal.database.helper.add
import im.vector.matrix.android.internal.database.helper.addOrUpdate
import im.vector.matrix.android.internal.database.helper.addStateEvent
import im.vector.matrix.android.internal.database.helper.deleteOnCascade
import im.vector.matrix.android.internal.database.model.ChunkEntity
import im.vector.matrix.android.internal.database.model.RoomEntity
import im.vector.matrix.android.internal.database.query.create
@ -112,7 +116,7 @@ internal class TokenChunkEventPersistor @Inject constructor(private val monarchy
Timber.v("Start persisting ${receivedChunk.events.size} events in $roomId towards $direction")
val roomEntity = RoomEntity.where(realm, roomId).findFirst()
?: realm.createObject(roomId)
?: realm.createObject(roomId)
val nextToken: String?
val prevToken: String?
@ -125,7 +129,7 @@ internal class TokenChunkEventPersistor @Inject constructor(private val monarchy
}
val shouldSkip = ChunkEntity.find(realm, roomId, nextToken = nextToken) != null
|| ChunkEntity.find(realm, roomId, prevToken = prevToken) != null
|| ChunkEntity.find(realm, roomId, prevToken = prevToken) != null
val prevChunk = ChunkEntity.find(realm, roomId, nextToken = prevToken)
val nextChunk = ChunkEntity.find(realm, roomId, prevToken = nextToken)
@ -139,7 +143,7 @@ internal class TokenChunkEventPersistor @Inject constructor(private val monarchy
} else {
nextChunk?.apply { this.prevToken = prevToken }
}
?: ChunkEntity.create(realm, prevToken, nextToken)
?: ChunkEntity.create(realm, prevToken, nextToken)
if (receivedChunk.events.isEmpty() && receivedChunk.end == receivedChunk.start) {
Timber.v("Reach end of $roomId")

View file

@ -23,6 +23,7 @@ import im.vector.matrix.android.api.session.events.model.toModel
import im.vector.matrix.android.api.session.room.model.VersioningState
import im.vector.matrix.android.api.session.room.model.tombstone.RoomTombstoneContent
import im.vector.matrix.android.internal.database.RealmLiveEntityObserver
import im.vector.matrix.android.internal.database.awaitTransaction
import im.vector.matrix.android.internal.database.mapper.asDomain
import im.vector.matrix.android.internal.database.model.EventEntity
import im.vector.matrix.android.internal.database.model.RoomSummaryEntity
@ -30,9 +31,9 @@ import im.vector.matrix.android.internal.database.query.types
import im.vector.matrix.android.internal.database.query.where
import im.vector.matrix.android.internal.di.SessionDatabase
import io.realm.OrderedCollectionChangeSet
import io.realm.Realm
import io.realm.RealmConfiguration
import io.realm.RealmResults
import kotlinx.coroutines.launch
import javax.inject.Inject
internal class RoomTombstoneEventLiveObserver @Inject constructor(@SessionDatabase
@ -51,24 +52,24 @@ internal class RoomTombstoneEventLiveObserver @Inject constructor(@SessionDataba
}
.toList()
.also {
handleRoomTombstoneEvents(it)
observerScope.launch {
handleRoomTombstoneEvents(it)
}
}
}
private fun handleRoomTombstoneEvents(tombstoneEvents: List<Event>) = Realm.getInstance(realmConfiguration).use {
it.executeTransactionAsync { realm ->
for (event in tombstoneEvents) {
if (event.roomId == null) continue
val createRoomContent = event.getClearContent().toModel<RoomTombstoneContent>()
if (createRoomContent?.replacementRoom == null) continue
private suspend fun handleRoomTombstoneEvents(tombstoneEvents: List<Event>) = awaitTransaction(realmConfiguration) { realm ->
for (event in tombstoneEvents) {
if (event.roomId == null) continue
val createRoomContent = event.getClearContent().toModel<RoomTombstoneContent>()
if (createRoomContent?.replacementRoom == null) continue
val predecessorRoomSummary = RoomSummaryEntity.where(realm, event.roomId).findFirst()
?: RoomSummaryEntity(event.roomId)
if (predecessorRoomSummary.versioningState == VersioningState.NONE) {
predecessorRoomSummary.versioningState = VersioningState.UPGRADED_ROOM_NOT_JOINED
}
realm.insertOrUpdate(predecessorRoomSummary)
val predecessorRoomSummary = RoomSummaryEntity.where(realm, event.roomId).findFirst()
?: RoomSummaryEntity(event.roomId)
if (predecessorRoomSummary.versioningState == VersioningState.NONE) {
predecessorRoomSummary.versioningState = VersioningState.UPGRADED_ROOM_NOT_JOINED
}
realm.insertOrUpdate(predecessorRoomSummary)
}
}
}

View file

@ -16,7 +16,6 @@
package im.vector.matrix.android.internal.session.sync
import com.zhuinden.monarchy.Monarchy
import im.vector.matrix.android.R
import im.vector.matrix.android.api.session.room.model.Membership
import im.vector.matrix.android.internal.database.model.GroupEntity
@ -25,11 +24,10 @@ import im.vector.matrix.android.internal.session.DefaultInitialSyncProgressServi
import im.vector.matrix.android.internal.session.mapWithProgress
import im.vector.matrix.android.internal.session.sync.model.GroupsSyncResponse
import im.vector.matrix.android.internal.session.sync.model.InvitedGroupSync
import im.vector.matrix.android.internal.util.awaitTransaction
import io.realm.Realm
import javax.inject.Inject
internal class GroupSyncHandler @Inject constructor(private val monarchy: Monarchy) {
internal class GroupSyncHandler @Inject constructor() {
sealed class HandlingStrategy {
data class JOINED(val data: Map<String, Any>) : HandlingStrategy()
@ -37,12 +35,14 @@ internal class GroupSyncHandler @Inject constructor(private val monarchy: Monarc
data class LEFT(val data: Map<String, Any>) : HandlingStrategy()
}
suspend fun handle(roomsSyncResponse: GroupsSyncResponse, reporter: DefaultInitialSyncProgressService? = null) {
monarchy.awaitTransaction { realm ->
handleGroupSync(realm, HandlingStrategy.JOINED(roomsSyncResponse.join), reporter)
handleGroupSync(realm, HandlingStrategy.INVITED(roomsSyncResponse.invite), reporter)
handleGroupSync(realm, HandlingStrategy.LEFT(roomsSyncResponse.leave), reporter)
}
fun handle(
realm: Realm,
roomsSyncResponse: GroupsSyncResponse,
reporter: DefaultInitialSyncProgressService? = null
) {
handleGroupSync(realm, HandlingStrategy.JOINED(roomsSyncResponse.join), reporter)
handleGroupSync(realm, HandlingStrategy.INVITED(roomsSyncResponse.invite), reporter)
handleGroupSync(realm, HandlingStrategy.LEFT(roomsSyncResponse.leave), reporter)
}
// PRIVATE METHODS *****************************************************************************

View file

@ -16,9 +16,7 @@
package im.vector.matrix.android.internal.session.sync
import com.zhuinden.monarchy.Monarchy
import im.vector.matrix.android.R
import im.vector.matrix.android.api.pushrules.RuleScope
import im.vector.matrix.android.api.session.events.model.Event
import im.vector.matrix.android.api.session.events.model.EventType
import im.vector.matrix.android.api.session.events.model.toModel
@ -34,31 +32,21 @@ import im.vector.matrix.android.internal.database.query.findLastLiveChunkFromRoo
import im.vector.matrix.android.internal.database.query.where
import im.vector.matrix.android.internal.session.DefaultInitialSyncProgressService
import im.vector.matrix.android.internal.session.mapWithProgress
import im.vector.matrix.android.internal.session.notification.DefaultPushRuleService
import im.vector.matrix.android.internal.session.notification.ProcessEventForPushTask
import im.vector.matrix.android.internal.session.room.RoomSummaryUpdater
import im.vector.matrix.android.internal.session.room.read.FullyReadContent
import im.vector.matrix.android.internal.session.room.timeline.PaginationDirection
import im.vector.matrix.android.internal.session.sync.model.*
import im.vector.matrix.android.internal.session.user.UserEntityFactory
import im.vector.matrix.android.internal.task.TaskExecutor
import im.vector.matrix.android.internal.task.configureWith
import im.vector.matrix.android.internal.util.awaitTransaction
import io.realm.Realm
import io.realm.kotlin.createObject
import timber.log.Timber
import javax.inject.Inject
internal class RoomSyncHandler @Inject constructor(private val monarchy: Monarchy,
private val readReceiptHandler: ReadReceiptHandler,
internal class RoomSyncHandler @Inject constructor(private val readReceiptHandler: ReadReceiptHandler,
private val roomSummaryUpdater: RoomSummaryUpdater,
private val roomTagHandler: RoomTagHandler,
private val roomFullyReadHandler: RoomFullyReadHandler,
private val cryptoService: DefaultCryptoService,
private val tokenStore: SyncTokenStore,
private val pushRuleService: DefaultPushRuleService,
private val processForPushTask: ProcessEventForPushTask,
private val taskExecutor: TaskExecutor) {
private val cryptoService: DefaultCryptoService) {
sealed class HandlingStrategy {
data class JOINED(val data: Map<String, RoomSync>) : HandlingStrategy()
@ -66,28 +54,16 @@ internal class RoomSyncHandler @Inject constructor(private val monarchy: Monarch
data class LEFT(val data: Map<String, RoomSync>) : HandlingStrategy()
}
suspend fun handle(roomsSyncResponse: RoomsSyncResponse, isInitialSync: Boolean, reporter: DefaultInitialSyncProgressService? = null) {
fun handle(
realm: Realm,
roomsSyncResponse: RoomsSyncResponse,
isInitialSync: Boolean,
reporter: DefaultInitialSyncProgressService? = null
) {
Timber.v("Execute transaction from $this")
monarchy.awaitTransaction { realm ->
handleRoomSync(realm, HandlingStrategy.JOINED(roomsSyncResponse.join), isInitialSync, reporter)
handleRoomSync(realm, HandlingStrategy.INVITED(roomsSyncResponse.invite), isInitialSync, reporter)
handleRoomSync(realm, HandlingStrategy.LEFT(roomsSyncResponse.leave), isInitialSync, reporter)
}
// handle event for bing rule checks
checkPushRules(roomsSyncResponse)
}
private fun checkPushRules(roomsSyncResponse: RoomsSyncResponse) {
Timber.v("[PushRules] --> checkPushRules")
if (tokenStore.getLastToken() == null) {
Timber.v("[PushRules] <-- No push rule check on initial sync")
return
} // nothing on initial sync
val rules = pushRuleService.getPushRules(RuleScope.GLOBAL)
processForPushTask.configureWith(ProcessEventForPushTask.Params(roomsSyncResponse, rules))
.executeBy(taskExecutor)
Timber.v("[PushRules] <-- Push task scheduled")
handleRoomSync(realm, HandlingStrategy.JOINED(roomsSyncResponse.join), isInitialSync, reporter)
handleRoomSync(realm, HandlingStrategy.INVITED(roomsSyncResponse.invite), isInitialSync, reporter)
handleRoomSync(realm, HandlingStrategy.LEFT(roomsSyncResponse.leave), isInitialSync, reporter)
}
// PRIVATE METHODS *****************************************************************************
@ -137,7 +113,7 @@ internal class RoomSyncHandler @Inject constructor(private val monarchy: Monarch
if (roomSync.state != null && roomSync.state.events.isNotEmpty()) {
val minStateIndex = roomEntity.untimelinedStateEvents.where().min(EventEntityFields.STATE_INDEX)?.toInt()
?: Int.MIN_VALUE
?: Int.MIN_VALUE
val untimelinedStateIndex = minStateIndex + 1
roomSync.state.events.forEach { event ->
roomEntity.addStateEvent(event, filterDuplicates = true, stateIndex = untimelinedStateIndex)

View file

@ -16,20 +16,30 @@
package im.vector.matrix.android.internal.session.sync
import com.zhuinden.monarchy.Monarchy
import im.vector.matrix.android.R
import im.vector.matrix.android.api.pushrules.PushRuleService
import im.vector.matrix.android.api.pushrules.RuleScope
import im.vector.matrix.android.internal.crypto.DefaultCryptoService
import im.vector.matrix.android.internal.session.DefaultInitialSyncProgressService
import im.vector.matrix.android.internal.session.notification.ProcessEventForPushTask
import im.vector.matrix.android.internal.session.reportSubtask
import im.vector.matrix.android.internal.session.sync.model.RoomsSyncResponse
import im.vector.matrix.android.internal.session.sync.model.SyncResponse
import im.vector.matrix.android.internal.util.awaitTransaction
import timber.log.Timber
import javax.inject.Inject
import kotlin.system.measureTimeMillis
internal class SyncResponseHandler @Inject constructor(private val roomSyncHandler: RoomSyncHandler,
internal class SyncResponseHandler @Inject constructor(private val monarchy: Monarchy,
private val roomSyncHandler: RoomSyncHandler,
private val userAccountDataSyncHandler: UserAccountDataSyncHandler,
private val groupSyncHandler: GroupSyncHandler,
private val cryptoSyncHandler: CryptoSyncHandler,
private val cryptoService: DefaultCryptoService,
private val tokenStore: SyncTokenStore,
private val processEventForPushTask: ProcessEventForPushTask,
private val pushRuleService: PushRuleService,
private val initialSyncProgressService: DefaultInitialSyncProgressService) {
suspend fun handleResponse(syncResponse: SyncResponse, fromToken: String?) {
@ -45,26 +55,27 @@ internal class SyncResponseHandler @Inject constructor(private val roomSyncHandl
}.also {
Timber.v("Finish handling start cryptoService in $it ms")
}
val measure = measureTimeMillis {
// Handle the to device events before the room ones
// to ensure to decrypt them properly
measureTimeMillis {
Timber.v("Handle toDevice")
reportSubtask(reporter, R.string.initial_sync_start_importing_account_crypto, 100, 0.1f) {
if (syncResponse.toDevice != null) {
cryptoSyncHandler.handleToDevice(syncResponse.toDevice, reporter)
}
}
}.also {
Timber.v("Finish handling toDevice in $it ms")
}
// Handle the to device events before the room ones
// to ensure to decrypt them properly
measureTimeMillis {
Timber.v("Handle toDevice")
reportSubtask(reporter, R.string.initial_sync_start_importing_account_crypto, 100, 0.1f) {
if (syncResponse.toDevice != null) {
cryptoSyncHandler.handleToDevice(syncResponse.toDevice, reporter)
}
}
}.also {
Timber.v("Finish handling toDevice in $it ms")
}
// Start one big transaction
monarchy.awaitTransaction { realm ->
measureTimeMillis {
Timber.v("Handle rooms")
reportSubtask(reporter, R.string.initial_sync_start_importing_account_rooms, 100, 0.7f) {
if (syncResponse.rooms != null) {
roomSyncHandler.handle(syncResponse.rooms, isInitialSync, reporter)
roomSyncHandler.handle(realm, syncResponse.rooms, isInitialSync, reporter)
}
}
}.also {
@ -75,7 +86,7 @@ internal class SyncResponseHandler @Inject constructor(private val roomSyncHandl
reportSubtask(reporter, R.string.initial_sync_start_importing_account_groups, 100, 0.1f) {
Timber.v("Handle groups")
if (syncResponse.groups != null) {
groupSyncHandler.handle(syncResponse.groups, reporter)
groupSyncHandler.handle(realm, syncResponse.groups, reporter)
}
}
}.also {
@ -85,15 +96,32 @@ internal class SyncResponseHandler @Inject constructor(private val roomSyncHandl
measureTimeMillis {
reportSubtask(reporter, R.string.initial_sync_start_importing_account_data, 100, 0.1f) {
Timber.v("Handle accountData")
userAccountDataSyncHandler.handle(syncResponse.accountData, syncResponse.rooms?.invite)
userAccountDataSyncHandler.handle(realm, syncResponse.accountData)
}
}.also {
Timber.v("Finish handling accountData in $it ms")
}
Timber.v("On sync completed")
cryptoSyncHandler.onSyncCompleted(syncResponse)
tokenStore.saveToken(realm, syncResponse.nextBatch)
}
Timber.v("Finish handling sync in $measure ms")
// Everything else we need to do outside the transaction
syncResponse.rooms?.also {
checkPushRules(it, isInitialSync)
userAccountDataSyncHandler.synchronizeWithServerIfNeeded(it.invite)
}
Timber.v("On sync completed")
cryptoSyncHandler.onSyncCompleted(syncResponse)
}
private suspend fun checkPushRules(roomsSyncResponse: RoomsSyncResponse, isInitialSync: Boolean) {
Timber.v("[PushRules] --> checkPushRules")
if (isInitialSync) {
Timber.v("[PushRules] <-- No push rule check on initial sync")
return
} // nothing on initial sync
val rules = pushRuleService.getPushRules(RuleScope.GLOBAL)
processEventForPushTask.execute(ProcessEventForPushTask.Params(roomsSyncResponse, rules))
Timber.v("[PushRules] <-- Push task scheduled")
}
}

View file

@ -17,7 +17,6 @@
package im.vector.matrix.android.internal.session.sync
import im.vector.matrix.android.R
import im.vector.matrix.android.internal.auth.SessionParamsStore
import im.vector.matrix.android.internal.di.UserId
import im.vector.matrix.android.internal.network.executeRequest
import im.vector.matrix.android.internal.session.DefaultInitialSyncProgressService
@ -26,6 +25,7 @@ import im.vector.matrix.android.internal.session.homeserver.GetHomeServerCapabil
import im.vector.matrix.android.internal.session.sync.model.SyncResponse
import im.vector.matrix.android.internal.session.user.UserStore
import im.vector.matrix.android.internal.task.Task
import timber.log.Timber
import javax.inject.Inject
internal interface SyncTask : Task<SyncTask.Params, Unit> {
@ -37,14 +37,19 @@ internal class DefaultSyncTask @Inject constructor(private val syncAPI: SyncAPI,
@UserId private val userId: String,
private val filterRepository: FilterRepository,
private val syncResponseHandler: SyncResponseHandler,
private val sessionParamsStore: SessionParamsStore,
private val initialSyncProgressService: DefaultInitialSyncProgressService,
private val syncTokenStore: SyncTokenStore,
private val getHomeServerCapabilitiesTask: GetHomeServerCapabilitiesTask,
private val userStore: UserStore
private val userStore: UserStore,
private val syncTaskSequencer: SyncTaskSequencer
) : SyncTask {
override suspend fun execute(params: SyncTask.Params) {
override suspend fun execute(params: SyncTask.Params) = syncTaskSequencer.post {
doSync(params)
}
private suspend fun doSync(params: SyncTask.Params) {
Timber.v("Sync task started on Thread: ${Thread.currentThread().name}")
// Maybe refresh the home server capabilities data we know
getHomeServerCapabilitiesTask.execute(Unit)
@ -69,9 +74,9 @@ internal class DefaultSyncTask @Inject constructor(private val syncAPI: SyncAPI,
apiCall = syncAPI.sync(requestParams)
}
syncResponseHandler.handleResponse(syncResponse, token)
syncTokenStore.saveToken(syncResponse.nextBatch)
if (isInitialSync) {
initialSyncProgressService.endAll()
}
Timber.v("Sync task finished on Thread: ${Thread.currentThread().name}")
}
}

View file

@ -0,0 +1,24 @@
/*
* Copyright 2019 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package im.vector.matrix.android.internal.session.sync
import im.vector.matrix.android.internal.session.SessionScope
import im.vector.matrix.android.internal.task.ChannelCoroutineSequencer
import javax.inject.Inject
@SessionScope
internal class SyncTaskSequencer @Inject constructor() : ChannelCoroutineSequencer<Unit>()

View file

@ -18,7 +18,6 @@ package im.vector.matrix.android.internal.session.sync
import com.zhuinden.monarchy.Monarchy
import im.vector.matrix.android.internal.database.model.SyncEntity
import im.vector.matrix.android.internal.util.awaitTransaction
import io.realm.Realm
import javax.inject.Inject
@ -30,10 +29,8 @@ internal class SyncTokenStore @Inject constructor(private val monarchy: Monarchy
}
}
suspend fun saveToken(token: String?) {
monarchy.awaitTransaction {
val sync = SyncEntity(token)
it.insertOrUpdate(sync)
}
fun saveToken(realm: Realm, token: String?) {
val sync = SyncEntity(token)
realm.insertOrUpdate(sync)
}
}

View file

@ -17,44 +17,39 @@
package im.vector.matrix.android.internal.session.sync
import com.zhuinden.monarchy.Monarchy
import im.vector.matrix.android.api.pushrules.RuleScope
import im.vector.matrix.android.api.pushrules.RuleSetKey
import im.vector.matrix.android.api.session.events.model.toModel
import im.vector.matrix.android.api.session.room.model.RoomMember
import im.vector.matrix.android.internal.database.mapper.PushRulesMapper
import im.vector.matrix.android.internal.database.mapper.asDomain
import im.vector.matrix.android.internal.database.model.RoomSummaryEntity
import im.vector.matrix.android.internal.database.model.*
import im.vector.matrix.android.internal.database.query.getDirectRooms
import im.vector.matrix.android.internal.database.query.getOrCreate
import im.vector.matrix.android.internal.database.query.where
import im.vector.matrix.android.internal.di.UserId
import im.vector.matrix.android.internal.session.pushers.SavePushRulesTask
import im.vector.matrix.android.internal.session.room.membership.RoomMembers
import im.vector.matrix.android.internal.session.sync.model.InvitedRoomSync
import im.vector.matrix.android.internal.session.sync.model.accountdata.*
import im.vector.matrix.android.internal.session.user.accountdata.DirectChatsHelper
import im.vector.matrix.android.internal.session.user.accountdata.SaveBreadcrumbsTask
import im.vector.matrix.android.internal.session.user.accountdata.SaveIgnoredUsersTask
import im.vector.matrix.android.internal.session.user.accountdata.UpdateUserAccountDataTask
import im.vector.matrix.android.internal.task.TaskExecutor
import im.vector.matrix.android.internal.task.configureWith
import im.vector.matrix.android.internal.util.awaitTransaction
import io.realm.Realm
import io.realm.RealmList
import timber.log.Timber
import javax.inject.Inject
internal class UserAccountDataSyncHandler @Inject constructor(private val monarchy: Monarchy,
@UserId private val userId: String,
private val directChatsHelper: DirectChatsHelper,
private val updateUserAccountDataTask: UpdateUserAccountDataTask,
private val savePushRulesTask: SavePushRulesTask,
private val saveIgnoredUsersTask: SaveIgnoredUsersTask,
private val saveBreadcrumbsTask: SaveBreadcrumbsTask,
private val taskExecutor: TaskExecutor) {
private val updateUserAccountDataTask: UpdateUserAccountDataTask) {
suspend fun handle(accountData: UserAccountDataSync?, invites: Map<String, InvitedRoomSync>?) {
fun handle(realm: Realm, accountData: UserAccountDataSync?) {
accountData?.list?.forEach {
when (it) {
is UserAccountDataDirectMessages -> handleDirectChatRooms(it)
is UserAccountDataPushRules -> handlePushRules(it)
is UserAccountDataIgnoredUsers -> handleIgnoredUsers(it)
is UserAccountDataBreadcrumbs -> handleBreadcrumbs(it)
is UserAccountDataDirectMessages -> handleDirectChatRooms(realm, it)
is UserAccountDataPushRules -> handlePushRules(realm, it)
is UserAccountDataIgnoredUsers -> handleIgnoredUsers(realm, it)
is UserAccountDataBreadcrumbs -> handleBreadcrumbs(realm, it)
is UserAccountDataFallback -> Timber.d("Receive account data of unhandled type ${it.type}")
else -> error("Missing code here!")
}
@ -65,78 +60,133 @@ internal class UserAccountDataSyncHandler @Inject constructor(private val monarc
// it.toString()
// MoshiProvider.providesMoshi()
// }
monarchy.doWithRealm { realm ->
synchronizeWithServerIfNeeded(realm, invites)
}
}
private suspend fun handlePushRules(userAccountDataPushRules: UserAccountDataPushRules) {
savePushRulesTask.execute(SavePushRulesTask.Params(userAccountDataPushRules.content))
}
private suspend fun handleDirectChatRooms(directMessages: UserAccountDataDirectMessages) {
monarchy.awaitTransaction { realm ->
val oldDirectRooms = RoomSummaryEntity.getDirectRooms(realm)
oldDirectRooms.forEach {
it.isDirect = false
it.directUserId = null
}
directMessages.content.forEach {
val userId = it.key
it.value.forEach { roomId ->
val roomSummaryEntity = RoomSummaryEntity.where(realm, roomId).findFirst()
if (roomSummaryEntity != null) {
roomSummaryEntity.isDirect = true
roomSummaryEntity.directUserId = userId
realm.insertOrUpdate(roomSummaryEntity)
}
}
}
}
}
// If we get some direct chat invites, we synchronize the user account data including those.
private fun synchronizeWithServerIfNeeded(realm: Realm, invites: Map<String, InvitedRoomSync>?) {
suspend fun synchronizeWithServerIfNeeded(invites: Map<String, InvitedRoomSync>) {
if (invites.isNullOrEmpty()) return
val directChats = directChatsHelper.getLocalUserAccount()
var hasUpdate = false
invites.forEach { (roomId, _) ->
val myUserStateEvent = RoomMembers(realm, roomId).getStateEvent(userId)
val inviterId = myUserStateEvent?.sender
val myUserRoomMember: RoomMember? = myUserStateEvent?.let { it.asDomain().content?.toModel() }
val isDirect = myUserRoomMember?.isDirect
if (inviterId != null && inviterId != userId && isDirect == true) {
directChats
.getOrPut(inviterId, { arrayListOf() })
.apply {
if (contains(roomId)) {
Timber.v("Direct chats already include room $roomId with user $inviterId")
} else {
add(roomId)
hasUpdate = true
monarchy.doWithRealm { realm ->
invites.forEach { (roomId, _) ->
val myUserStateEvent = RoomMembers(realm, roomId).getStateEvent(userId)
val inviterId = myUserStateEvent?.sender
val myUserRoomMember: RoomMember? = myUserStateEvent?.let { it.asDomain().content?.toModel() }
val isDirect = myUserRoomMember?.isDirect
if (inviterId != null && inviterId != userId && isDirect == true) {
directChats
.getOrPut(inviterId, { arrayListOf() })
.apply {
if (contains(roomId)) {
Timber.v("Direct chats already include room $roomId with user $inviterId")
} else {
add(roomId)
hasUpdate = true
}
}
}
}
}
}
if (hasUpdate) {
val updateUserAccountParams = UpdateUserAccountDataTask.DirectChatParams(
directMessages = directChats
)
updateUserAccountDataTask.configureWith(updateUserAccountParams).executeBy(taskExecutor)
updateUserAccountDataTask.execute(updateUserAccountParams)
}
}
private fun handleIgnoredUsers(userAccountDataIgnoredUsers: UserAccountDataIgnoredUsers) {
saveIgnoredUsersTask
.configureWith(SaveIgnoredUsersTask.Params(userAccountDataIgnoredUsers.content.ignoredUsers.keys.toList()))
.executeBy(taskExecutor)
private fun handlePushRules(realm: Realm, userAccountDataPushRules: UserAccountDataPushRules) {
val pushRules = userAccountDataPushRules.content
realm.where(PushRulesEntity::class.java)
.findAll()
.deleteAllFromRealm()
// Save only global rules for the moment
val globalRules = pushRules.global
val content = PushRulesEntity(RuleScope.GLOBAL).apply { kind = RuleSetKey.CONTENT }
globalRules.content?.forEach { rule ->
content.pushRules.add(PushRulesMapper.map(rule))
}
realm.insertOrUpdate(content)
val override = PushRulesEntity(RuleScope.GLOBAL).apply { kind = RuleSetKey.OVERRIDE }
globalRules.override?.forEach { rule ->
PushRulesMapper.map(rule).also {
override.pushRules.add(it)
}
}
realm.insertOrUpdate(override)
val rooms = PushRulesEntity(RuleScope.GLOBAL).apply { kind = RuleSetKey.ROOM }
globalRules.room?.forEach { rule ->
rooms.pushRules.add(PushRulesMapper.map(rule))
}
realm.insertOrUpdate(rooms)
val senders = PushRulesEntity(RuleScope.GLOBAL).apply { kind = RuleSetKey.SENDER }
globalRules.sender?.forEach { rule ->
senders.pushRules.add(PushRulesMapper.map(rule))
}
realm.insertOrUpdate(senders)
val underrides = PushRulesEntity(RuleScope.GLOBAL).apply { kind = RuleSetKey.UNDERRIDE }
globalRules.underride?.forEach { rule ->
underrides.pushRules.add(PushRulesMapper.map(rule))
}
realm.insertOrUpdate(underrides)
}
private fun handleDirectChatRooms(realm: Realm, directMessages: UserAccountDataDirectMessages) {
val oldDirectRooms = RoomSummaryEntity.getDirectRooms(realm)
oldDirectRooms.forEach {
it.isDirect = false
it.directUserId = null
}
directMessages.content.forEach {
val userId = it.key
it.value.forEach { roomId ->
val roomSummaryEntity = RoomSummaryEntity.where(realm, roomId).findFirst()
if (roomSummaryEntity != null) {
roomSummaryEntity.isDirect = true
roomSummaryEntity.directUserId = userId
realm.insertOrUpdate(roomSummaryEntity)
}
}
}
}
private fun handleIgnoredUsers(realm: Realm, userAccountDataIgnoredUsers: UserAccountDataIgnoredUsers) {
val userIds = userAccountDataIgnoredUsers.content.ignoredUsers.keys
realm.where(IgnoredUserEntity::class.java)
.findAll()
.deleteAllFromRealm()
// And save the new received list
userIds.forEach { realm.createObject(IgnoredUserEntity::class.java).apply { userId = it } }
// TODO If not initial sync, we should execute a init sync
}
private fun handleBreadcrumbs(userAccountDataBreadcrumbs: UserAccountDataBreadcrumbs) {
saveBreadcrumbsTask
.configureWith(SaveBreadcrumbsTask.Params(userAccountDataBreadcrumbs.content.recentRoomIds))
.executeBy(taskExecutor)
private fun handleBreadcrumbs(realm: Realm, userAccountDataBreadcrumbs: UserAccountDataBreadcrumbs) {
val recentRoomIds = userAccountDataBreadcrumbs.content.recentRoomIds
val entity = BreadcrumbsEntity.getOrCreate(realm)
// And save the new received list
entity.recentRoomIds = RealmList<String>().apply { addAll(recentRoomIds) }
// Update the room summaries
// Reset all the indexes...
RoomSummaryEntity.where(realm)
.greaterThan(RoomSummaryEntityFields.BREADCRUMBS_INDEX, RoomSummaryEntity.NOT_IN_BREADCRUMBS)
.findAll()
.forEach {
it.breadcrumbsIndex = RoomSummaryEntity.NOT_IN_BREADCRUMBS
}
// ...and apply new indexes
recentRoomIds.forEachIndexed { index, roomId ->
RoomSummaryEntity.where(realm, roomId)
.findFirst()
?.breadcrumbsIndex = index
}
}
}

View file

@ -18,21 +18,18 @@ package im.vector.matrix.android.internal.session.sync.job
import android.app.Service
import android.content.Intent
import android.os.IBinder
import com.squareup.moshi.JsonEncodingException
import im.vector.matrix.android.api.Matrix
import im.vector.matrix.android.api.MatrixCallback
import im.vector.matrix.android.api.failure.Failure
import im.vector.matrix.android.api.failure.MatrixError
import im.vector.matrix.android.api.util.Cancelable
import im.vector.matrix.android.api.failure.isTokenError
import im.vector.matrix.android.api.session.Session
import im.vector.matrix.android.api.session.sync.SyncState
import im.vector.matrix.android.internal.network.NetworkConnectivityChecker
import im.vector.matrix.android.internal.session.sync.SyncTask
import im.vector.matrix.android.internal.task.TaskExecutor
import im.vector.matrix.android.internal.task.TaskThread
import im.vector.matrix.android.internal.task.configureWith
import im.vector.matrix.android.internal.util.BackgroundDetectionObserver
import im.vector.matrix.android.internal.util.MatrixCoroutineDispatchers
import kotlinx.coroutines.*
import timber.log.Timber
import java.net.SocketTimeoutException
import java.util.Timer
import java.util.TimerTask
import java.util.concurrent.atomic.AtomicBoolean
/**
* Can execute periodic sync task.
@ -40,33 +37,46 @@ import java.util.TimerTask
* in order to be able to perform a sync even if the app is not running.
* The <receiver> and <service> must be declared in the Manifest or the app using the SDK
*/
open class SyncService : Service() {
abstract class SyncService : Service() {
private var userId: String? = null
private var mIsSelfDestroyed: Boolean = false
private var cancelableTask: Cancelable? = null
private var isInitialSync: Boolean = false
private lateinit var session: Session
private lateinit var syncTask: SyncTask
private lateinit var networkConnectivityChecker: NetworkConnectivityChecker
private lateinit var taskExecutor: TaskExecutor
private lateinit var coroutineDispatchers: MatrixCoroutineDispatchers
private lateinit var backgroundDetectionObserver: BackgroundDetectionObserver
var timer = Timer()
private val isRunning = AtomicBoolean(false)
private val serviceScope = CoroutineScope(SupervisorJob())
override fun onStartCommand(intent: Intent?, flags: Int, startId: Int): Int {
Timber.i("onStartCommand $intent")
intent?.let {
val userId = it.getStringExtra(EXTRA_USER_ID)
val sessionComponent = Matrix.getInstance(applicationContext).sessionManager.getSessionComponent(userId)
val matrix = Matrix.getInstance(applicationContext)
val safeUserId = it.getStringExtra(EXTRA_USER_ID) ?: return@let
val sessionComponent = matrix.sessionManager.getSessionComponent(safeUserId)
?: return@let
session = sessionComponent.session()
userId = safeUserId
syncTask = sessionComponent.syncTask()
isInitialSync = !session.hasAlreadySynced()
networkConnectivityChecker = sessionComponent.networkConnectivityChecker()
taskExecutor = sessionComponent.taskExecutor()
if (cancelableTask == null) {
timer.cancel()
timer = Timer()
doSync(true)
coroutineDispatchers = sessionComponent.coroutineDispatchers()
backgroundDetectionObserver = matrix.backgroundDetectionObserver
onStart(isInitialSync)
if (isRunning.get()) {
Timber.i("Received a start while was already syncing... ignore")
} else {
// Already syncing ignore
Timber.i("Received a start while was already syncking... ignore")
isRunning.set(true)
serviceScope.launch(coroutineDispatchers.io) {
doSync()
}
}
}
// No intent just start the service, an alarm will should call with intent
@ -75,98 +85,61 @@ open class SyncService : Service() {
override fun onDestroy() {
Timber.i("## onDestroy() : $this")
if (!mIsSelfDestroyed) {
Timber.w("## Destroy by the system : $this")
}
cancelableTask?.cancel()
serviceScope.coroutineContext.cancelChildren()
isRunning.set(false)
super.onDestroy()
}
fun stopMe() {
timer.cancel()
timer = Timer()
cancelableTask?.cancel()
private fun stopMe() {
mIsSelfDestroyed = true
stopSelf()
}
fun doSync(once: Boolean = false) {
if (!networkConnectivityChecker.hasInternetAccess) {
Timber.v("No internet access. Waiting...")
// TODO Retry in ?
timer.schedule(object : TimerTask() {
override fun run() {
doSync()
}
}, NO_NETWORK_DELAY)
} else {
Timber.v("Execute sync request with timeout 0")
val params = SyncTask.Params(TIME_OUT)
cancelableTask = syncTask
.configureWith(params) {
callbackThread = TaskThread.SYNC
executionThread = TaskThread.SYNC
callback = object : MatrixCallback<Unit> {
override fun onSuccess(data: Unit) {
cancelableTask = null
if (!once) {
timer.schedule(object : TimerTask() {
override fun run() {
doSync()
}
}, NEXT_BATCH_DELAY)
} else {
// stop
stopMe()
}
}
override fun onFailure(failure: Throwable) {
Timber.e(failure)
cancelableTask = null
if (failure is Failure.NetworkConnection
&& failure.cause is SocketTimeoutException) {
// Timeout are not critical
timer.schedule(object : TimerTask() {
override fun run() {
doSync()
}
}, 5_000L)
}
if (failure !is Failure.NetworkConnection
|| failure.cause is JsonEncodingException) {
// Wait 10s before retrying
timer.schedule(object : TimerTask() {
override fun run() {
doSync()
}
}, 5_000L)
}
if (failure is Failure.ServerError
&& (failure.error.code == MatrixError.M_UNKNOWN_TOKEN || failure.error.code == MatrixError.M_MISSING_TOKEN)) {
// No token or invalid token, stop the thread
stopSelf()
}
}
}
}
.executeBy(taskExecutor)
private suspend fun doSync() {
if (!networkConnectivityChecker.hasInternetAccess()) {
Timber.v("No network reschedule to avoid wasting resources")
userId?.also {
onRescheduleAsked(it, isInitialSync, delay = 10_000L)
}
stopMe()
return
}
Timber.v("Execute sync request with timeout 0")
val params = SyncTask.Params(TIME_OUT)
try {
syncTask.execute(params)
// Start sync if we were doing an initial sync and the syncThread is not launched yet
if (isInitialSync && session.syncState().value == SyncState.Idle) {
val isForeground = !backgroundDetectionObserver.isInBackground
session.startSync(isForeground)
}
stopMe()
} catch (throwable: Throwable) {
Timber.e(throwable)
if (throwable.isTokenError()) {
stopMe()
} else {
Timber.v("Retry to sync in 5s")
delay(DELAY_FAILURE)
doSync()
}
}
}
abstract fun onStart(isInitialSync: Boolean)
abstract fun onRescheduleAsked(userId: String, isInitialSync: Boolean, delay: Long)
override fun onBind(intent: Intent?): IBinder? {
return null
}
companion object {
const val EXTRA_USER_ID = "EXTRA_USER_ID"
const val TIME_OUT = 0L
const val NEXT_BATCH_DELAY = 60_000L
const val NO_NETWORK_DELAY = 5_000L
private const val TIME_OUT = 0L
private const val DELAY_FAILURE = 5_000L
}
}

View file

@ -19,20 +19,15 @@ package im.vector.matrix.android.internal.session.sync.job
import androidx.lifecycle.LiveData
import androidx.lifecycle.MutableLiveData
import com.squareup.moshi.JsonEncodingException
import im.vector.matrix.android.api.MatrixCallback
import im.vector.matrix.android.api.failure.Failure
import im.vector.matrix.android.api.failure.MatrixError
import im.vector.matrix.android.api.failure.isTokenError
import im.vector.matrix.android.api.session.sync.SyncState
import im.vector.matrix.android.api.util.Cancelable
import im.vector.matrix.android.internal.network.NetworkConnectivityChecker
import im.vector.matrix.android.internal.session.sync.SyncTask
import im.vector.matrix.android.internal.task.TaskExecutor
import im.vector.matrix.android.internal.task.TaskThread
import im.vector.matrix.android.internal.task.configureWith
import im.vector.matrix.android.internal.util.BackgroundDetectionObserver
import kotlinx.coroutines.*
import timber.log.Timber
import java.net.SocketTimeoutException
import java.util.concurrent.CountDownLatch
import javax.inject.Inject
private const val RETRY_WAIT_TIME_MS = 10_000L
@ -40,14 +35,13 @@ private const val DEFAULT_LONG_POOL_TIMEOUT = 30_000L
internal class SyncThread @Inject constructor(private val syncTask: SyncTask,
private val networkConnectivityChecker: NetworkConnectivityChecker,
private val backgroundDetectionObserver: BackgroundDetectionObserver,
private val taskExecutor: TaskExecutor
) : Thread(), NetworkConnectivityChecker.Listener, BackgroundDetectionObserver.Listener {
private val backgroundDetectionObserver: BackgroundDetectionObserver)
: Thread(), NetworkConnectivityChecker.Listener, BackgroundDetectionObserver.Listener {
private var state: SyncState = SyncState.Idle
private var liveState = MutableLiveData<SyncState>()
private val lock = Object()
private var cancelableTask: Cancelable? = null
private val syncScope = CoroutineScope(SupervisorJob())
private var isStarted = false
private var isTokenValid = true
@ -75,14 +69,14 @@ internal class SyncThread @Inject constructor(private val syncTask: SyncTask,
if (isStarted) {
Timber.v("Pause sync...")
isStarted = false
cancelableTask?.cancel()
syncScope.coroutineContext.cancelChildren()
}
}
fun kill() = synchronized(lock) {
Timber.v("Kill sync...")
updateStateTo(SyncState.Killing)
cancelableTask?.cancel()
syncScope.coroutineContext.cancelChildren()
lock.notify()
}
@ -102,11 +96,9 @@ internal class SyncThread @Inject constructor(private val syncTask: SyncTask,
isStarted = true
networkConnectivityChecker.register(this)
backgroundDetectionObserver.register(this)
while (state != SyncState.Killing) {
Timber.v("Entering loop, state: $state")
if (!networkConnectivityChecker.hasInternetAccess) {
if (!networkConnectivityChecker.hasInternetAccess()) {
Timber.v("No network. Waiting...")
updateStateTo(SyncState.NoNetwork)
synchronized(lock) { lock.wait() }
@ -125,58 +117,16 @@ internal class SyncThread @Inject constructor(private val syncTask: SyncTask,
if (state !is SyncState.Running) {
updateStateTo(SyncState.Running(afterPause = true))
}
// No timeout after a pause
val timeout = state.let { if (it is SyncState.Running && it.afterPause) 0 else DEFAULT_LONG_POOL_TIMEOUT }
Timber.v("Execute sync request with timeout $timeout")
val latch = CountDownLatch(1)
val params = SyncTask.Params(timeout)
cancelableTask = syncTask.configureWith(params) {
this.callbackThread = TaskThread.SYNC
this.executionThread = TaskThread.SYNC
this.callback = object : MatrixCallback<Unit> {
override fun onSuccess(data: Unit) {
Timber.v("onSuccess")
latch.countDown()
}
override fun onFailure(failure: Throwable) {
if (failure is Failure.NetworkConnection && failure.cause is SocketTimeoutException) {
// Timeout are not critical
Timber.v("Timeout")
} else if (failure is Failure.Cancelled) {
Timber.v("Cancelled")
} else if (failure is Failure.ServerError
&& (failure.error.code == MatrixError.M_UNKNOWN_TOKEN || failure.error.code == MatrixError.M_MISSING_TOKEN)) {
// No token or invalid token
Timber.w(failure)
isTokenValid = false
isStarted = false
} else {
Timber.e(failure)
if (failure !is Failure.NetworkConnection || failure.cause is JsonEncodingException) {
// Wait 10s before retrying
Timber.v("Wait 10s")
sleep(RETRY_WAIT_TIME_MS)
}
}
latch.countDown()
}
}
val sync = syncScope.launch {
doSync(params)
}
.executeBy(taskExecutor)
latch.await()
state.let {
if (it is SyncState.Running && it.afterPause) {
updateStateTo(SyncState.Running(afterPause = false))
}
runBlocking {
sync.join()
}
Timber.v("...Continue")
}
}
@ -186,6 +136,37 @@ internal class SyncThread @Inject constructor(private val syncTask: SyncTask,
networkConnectivityChecker.unregister(this)
}
private suspend fun doSync(params: SyncTask.Params) {
try {
syncTask.execute(params)
} catch (failure: Throwable) {
if (failure is Failure.NetworkConnection && failure.cause is SocketTimeoutException) {
// Timeout are not critical
Timber.v("Timeout")
} else if (failure is Failure.Cancelled) {
Timber.v("Cancelled")
} else if (failure.isTokenError()) {
// No token or invalid token, stop the thread
Timber.w(failure)
isStarted = false
isTokenValid = false
} else {
Timber.e(failure)
if (failure !is Failure.NetworkConnection || failure.cause is JsonEncodingException) {
// Wait 10s before retrying
Timber.v("Wait 10s")
delay(RETRY_WAIT_TIME_MS)
}
}
} finally {
state.let {
if (it is SyncState.Running && it.afterPause) {
updateStateTo(SyncState.Running(afterPause = false))
}
}
}
}
private fun updateStateTo(newState: SyncState) {
Timber.v("Update state from $state to $newState")
state = newState

View file

@ -18,14 +18,14 @@ package im.vector.matrix.android.internal.session.sync.job
import android.content.Context
import androidx.work.*
import com.squareup.moshi.JsonClass
import im.vector.matrix.android.api.failure.isTokenError
import im.vector.matrix.android.internal.network.NetworkConnectivityChecker
import im.vector.matrix.android.internal.session.sync.SyncTask
import im.vector.matrix.android.internal.task.TaskExecutor
import im.vector.matrix.android.internal.util.MatrixCoroutineDispatchers
import im.vector.matrix.android.internal.worker.WorkManagerUtil
import im.vector.matrix.android.internal.worker.WorkManagerUtil.matrixOneTimeWorkRequestBuilder
import im.vector.matrix.android.internal.worker.WorkerParamsFactory
import im.vector.matrix.android.internal.worker.getSessionComponent
import kotlinx.coroutines.withContext
import timber.log.Timber
import java.util.concurrent.TimeUnit
import javax.inject.Inject
@ -45,46 +45,58 @@ internal class SyncWorker(context: Context,
@Inject lateinit var syncTask: SyncTask
@Inject lateinit var taskExecutor: TaskExecutor
@Inject lateinit var coroutineDispatchers: MatrixCoroutineDispatchers
@Inject lateinit var networkConnectivityChecker: NetworkConnectivityChecker
override suspend fun doWork(): Result {
Timber.i("Sync work starting")
val params = WorkerParamsFactory.fromData<Params>(inputData) ?: return Result.success()
val sessionComponent = getSessionComponent(params.userId) ?: return Result.success()
sessionComponent.inject(this)
runCatching {
withContext(coroutineDispatchers.sync) {
val taskParams = SyncTask.Params(0)
syncTask.execute(taskParams)
}
}
return Result.success()
return runCatching {
doSync(params.timeout)
}.fold(
{ Result.success() },
{ failure ->
if (failure.isTokenError() || !params.automaticallyRetry) {
Result.failure()
} else {
Result.retry()
}
}
)
}
private suspend fun doSync(timeout: Long) {
val taskParams = SyncTask.Params(timeout)
syncTask.execute(taskParams)
}
companion object {
const val BG_SYNC_WORK_NAME = "BG_SYNCP"
fun requireBackgroundSync(context: Context, userId: String, serverTimeout: Long = 0) {
val data = WorkerParamsFactory.toData(Params(userId, serverTimeout, false))
val workRequest = matrixOneTimeWorkRequestBuilder<SyncWorker>()
.setInputData(data)
.setConstraints(WorkManagerUtil.workConstraints)
.setBackoffCriteria(BackoffPolicy.LINEAR, 1_000, TimeUnit.MILLISECONDS)
.setInputData(data)
.build()
WorkManager.getInstance(context).enqueueUniqueWork("BG_SYNCP", ExistingWorkPolicy.REPLACE, workRequest)
WorkManager.getInstance(context).enqueueUniqueWork(BG_SYNC_WORK_NAME, ExistingWorkPolicy.REPLACE, workRequest)
}
fun automaticallyBackgroundSync(context: Context, userId: String, serverTimeout: Long = 0, delay: Long = 30_000) {
val data = WorkerParamsFactory.toData(Params(userId, serverTimeout, true))
val workRequest = matrixOneTimeWorkRequestBuilder<SyncWorker>()
.setInputData(data)
.setConstraints(WorkManagerUtil.workConstraints)
.setInputData(data)
.setBackoffCriteria(BackoffPolicy.LINEAR, delay, TimeUnit.MILLISECONDS)
.build()
WorkManager.getInstance(context).enqueueUniqueWork("BG_SYNCP", ExistingWorkPolicy.REPLACE, workRequest)
WorkManager.getInstance(context).enqueueUniqueWork(BG_SYNC_WORK_NAME, ExistingWorkPolicy.REPLACE, workRequest)
}
fun stopAnyBackgroundSync(context: Context) {
WorkManager.getInstance(context).cancelUniqueWork("BG_SYNCP")
WorkManager.getInstance(context).cancelUniqueWork(BG_SYNC_WORK_NAME)
}
}
}

View file

@ -0,0 +1,87 @@
/*
* Copyright 2019 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package im.vector.matrix.android.internal.task
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import java.util.concurrent.Executors
/**
* This class intends to be used for ensure suspendable methods are played sequentially all the way long.
*/
internal interface CoroutineSequencer<T> {
/**
* @param block the suspendable block to execute
* @return the result of the block
*/
suspend fun post(block: suspend () -> T): T
/**
* Cancel all and close, so you won't be able to post anything else after
*/
fun close()
}
internal open class ChannelCoroutineSequencer<T> : CoroutineSequencer<T> {
private data class Message<T>(
val block: suspend () -> T,
val deferred: CompletableDeferred<T>
)
private var messageChannel: Channel<Message<T>> = Channel()
private val coroutineScope = CoroutineScope(SupervisorJob())
// This will ensure
private val singleDispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
init {
launchCoroutine()
}
private fun launchCoroutine() {
coroutineScope.launch(singleDispatcher) {
for (message in messageChannel) {
try {
val result = message.block()
message.deferred.complete(result)
} catch (exception: Throwable) {
message.deferred.completeExceptionally(exception)
}
}
}
}
override fun close() {
coroutineScope.coroutineContext.cancelChildren()
messageChannel.close()
}
override suspend fun post(block: suspend () -> T): T {
val deferred = CompletableDeferred<T>()
val message = Message(block, deferred)
messageChannel.send(message)
return try {
deferred.await()
} catch (cancellation: CancellationException) {
// In case of cancellation, we stop the current coroutine context
// and relaunch one to consume next messages
coroutineScope.coroutineContext.cancelChildren()
launchCoroutine()
throw cancellation
}
}
}

View file

@ -85,6 +85,5 @@ internal class TaskExecutor @Inject constructor(private val coroutineDispatchers
TaskThread.IO -> coroutineDispatchers.io
TaskThread.CALLER -> EmptyCoroutineContext
TaskThread.CRYPTO -> coroutineDispatchers.crypto
TaskThread.SYNC -> coroutineDispatchers.sync
}
}

View file

@ -21,6 +21,5 @@ internal enum class TaskThread {
COMPUTATION,
IO,
CALLER,
CRYPTO,
SYNC
CRYPTO
}

View file

@ -29,7 +29,7 @@ import javax.inject.Inject
@MatrixScope
internal class BackgroundDetectionObserver @Inject constructor() : LifecycleObserver {
var isIsBackground: Boolean = false
var isInBackground: Boolean = false
private set
private
@ -46,14 +46,14 @@ internal class BackgroundDetectionObserver @Inject constructor() : LifecycleObse
@OnLifecycleEvent(Lifecycle.Event.ON_START)
fun onMoveToForeground() {
Timber.v("App returning to foreground…")
isIsBackground = false
isInBackground = false
listeners.forEach { it.onMoveToForeground() }
}
@OnLifecycleEvent(Lifecycle.Event.ON_STOP)
fun onMoveToBackground() {
Timber.v("App going to background…")
isIsBackground = true
isInBackground = true
listeners.forEach { it.onMoveToBackground() }
}

View file

@ -22,6 +22,5 @@ internal data class MatrixCoroutineDispatchers(
val io: CoroutineDispatcher,
val computation: CoroutineDispatcher,
val main: CoroutineDispatcher,
val crypto: CoroutineDispatcher,
val sync: CoroutineDispatcher
val crypto: CoroutineDispatcher
)

View file

@ -0,0 +1,129 @@
/*
* Copyright 2019 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package im.vector.matrix.android.internal.task
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.delay
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.junit.Assert.assertEquals
import org.junit.Test
import java.util.concurrent.Executors
class CoroutineSequencersTest {
private val dispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
@Test
fun sequencer_should_run_sequential() {
val sequencer = ChannelCoroutineSequencer<String>()
val results = ArrayList<String>()
val jobs = listOf(
GlobalScope.launch(dispatcher) {
sequencer.post { suspendingMethod("#1") }.also {
results.add(it)
}
},
GlobalScope.launch(dispatcher) {
sequencer.post { suspendingMethod("#2") }.also {
results.add(it)
}
},
GlobalScope.launch(dispatcher) {
sequencer.post { suspendingMethod("#3") }.also {
results.add(it)
}
}
)
runBlocking {
jobs.joinAll()
}
assertEquals(3, results.size)
assertEquals(results[0], "#1")
assertEquals(results[1], "#2")
assertEquals(results[2], "#3")
}
@Test
fun sequencer_should_run_parallel() {
val sequencer1 = ChannelCoroutineSequencer<String>()
val sequencer2 = ChannelCoroutineSequencer<String>()
val sequencer3 = ChannelCoroutineSequencer<String>()
val results = ArrayList<String>()
val jobs = listOf(
GlobalScope.launch(dispatcher) {
sequencer1.post { suspendingMethod("#1") }.also {
results.add(it)
}
},
GlobalScope.launch(dispatcher) {
sequencer2.post { suspendingMethod("#2") }.also {
results.add(it)
}
},
GlobalScope.launch(dispatcher) {
sequencer3.post { suspendingMethod("#3") }.also {
results.add(it)
}
}
)
runBlocking {
jobs.joinAll()
}
assertEquals(3, results.size)
}
@Test
fun sequencer_should_jump_to_next_when_current_job_canceled() {
val sequencer = ChannelCoroutineSequencer<String>()
val results = ArrayList<String>()
val jobs = listOf(
GlobalScope.launch(dispatcher) {
sequencer.post { suspendingMethod("#1") }.also {
results.add(it)
}
},
GlobalScope.launch(dispatcher) {
val result = sequencer.post { suspendingMethod("#2") }.also {
results.add(it)
}
println("Result: $result")
},
GlobalScope.launch(dispatcher) {
sequencer.post { suspendingMethod("#3") }.also {
results.add(it)
}
}
)
// We are canceling the second job
jobs[1].cancel()
runBlocking {
jobs.joinAll()
}
assertEquals(2, results.size)
}
private suspend fun suspendingMethod(name: String): String {
println("BLOCKING METHOD $name STARTS on ${Thread.currentThread().name}")
delay(1000)
println("BLOCKING METHOD $name ENDS on ${Thread.currentThread().name}")
return name
}
}

View file

@ -4,7 +4,6 @@
<uses-permission android:name="android.permission.WAKE_LOCK" />
<uses-permission android:name="android.permission.RECEIVE_BOOT_COMPLETED" />
<uses-permission android:name="android.permission.FOREGROUND_SERVICE" />
<application>
@ -20,10 +19,6 @@
android:enabled="true"
android:exported="false" />
<service
android:name=".fdroid.service.VectorSyncService"
android:exported="false" />
</application>
</manifest>

View file

@ -25,7 +25,7 @@ import android.os.Build
import android.os.PowerManager
import androidx.core.content.ContextCompat
import im.vector.matrix.android.internal.session.sync.job.SyncService
import im.vector.riotx.fdroid.service.VectorSyncService
import im.vector.riotx.core.services.VectorSyncService
import timber.log.Timber
class AlarmSyncBroadcastReceiver : BroadcastReceiver() {
@ -41,14 +41,9 @@ class AlarmSyncBroadcastReceiver : BroadcastReceiver() {
val userId = intent.getStringExtra(SyncService.EXTRA_USER_ID)
// This method is called when the BroadcastReceiver is receiving an Intent broadcast.
Timber.d("RestartBroadcastReceiver received intent")
Intent(context, VectorSyncService::class.java).also {
it.putExtra(SyncService.EXTRA_USER_ID, userId)
VectorSyncService.newIntent(context, userId).also {
try {
if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.O) {
ContextCompat.startForegroundService(context, it)
} else {
context.startService(it)
}
ContextCompat.startForegroundService(context, it)
} catch (ex: Throwable) {
// TODO
Timber.e(ex)
@ -79,6 +74,7 @@ class AlarmSyncBroadcastReceiver : BroadcastReceiver() {
}
fun cancelAlarm(context: Context) {
Timber.v("Cancel alarm")
val intent = Intent(context, AlarmSyncBroadcastReceiver::class.java)
val pIntent = PendingIntent.getBroadcast(context, REQUEST_CODE, intent, PendingIntent.FLAG_UPDATE_CURRENT)
val alarmMgr = context.getSystemService(Context.ALARM_SERVICE) as AlarmManager

View file

@ -5,6 +5,7 @@
<uses-permission android:name="android.permission.INTERNET" />
<uses-permission android:name="android.permission.READ_CONTACTS" />
<uses-permission android:name="android.permission.FOREGROUND_SERVICE" />
<application
android:name=".VectorApplication"
@ -126,6 +127,10 @@
android:name=".core.services.CallService"
android:exported="false" />
<service
android:name=".core.services.VectorSyncService"
android:exported="false" />
<!-- Receivers -->
<!-- Exported false, should only be accessible from this app!! -->

View file

@ -116,11 +116,12 @@ class VectorApplication : Application(), HasVectorInjector, MatrixConfiguration.
if (authenticationService.hasAuthenticatedSessions() && !activeSessionHolder.hasActiveSession()) {
val lastAuthenticatedSession = authenticationService.getLastAuthenticatedSession()!!
activeSessionHolder.setActiveSession(lastAuthenticatedSession)
lastAuthenticatedSession.configureAndStart(pushRuleTriggerListener, sessionListener)
lastAuthenticatedSession.configureAndStart(applicationContext, pushRuleTriggerListener, sessionListener)
}
ProcessLifecycleOwner.get().lifecycle.addObserver(object : LifecycleObserver {
@OnLifecycleEvent(Lifecycle.Event.ON_RESUME)
fun entersForeground() {
Timber.i("App entered foreground")
FcmHelper.onEnterForeground(appContext)
activeSessionHolder.getSafeActiveSession()?.also {
it.stopAnyBackgroundSync()

View file

@ -16,24 +16,26 @@
package im.vector.riotx.core.extensions
import android.content.Context
import androidx.core.content.ContextCompat
import androidx.lifecycle.Lifecycle
import androidx.lifecycle.ProcessLifecycleOwner
import im.vector.matrix.android.api.session.Session
import im.vector.matrix.android.api.session.crypto.keysbackup.KeysBackupState
import im.vector.matrix.android.api.session.sync.FilterService
import im.vector.riotx.core.services.VectorSyncService
import im.vector.riotx.features.notifications.PushRuleTriggerListener
import im.vector.riotx.features.session.SessionListener
import timber.log.Timber
fun Session.configureAndStart(pushRuleTriggerListener: PushRuleTriggerListener,
fun Session.configureAndStart(context: Context,
pushRuleTriggerListener: PushRuleTriggerListener,
sessionListener: SessionListener) {
open()
addListener(sessionListener)
setFilter(FilterService.FilterPreset.RiotFilter)
Timber.i("Configure and start session for ${this.myUserId}")
val isAtLeastStarted = ProcessLifecycleOwner.get().lifecycle.currentState.isAtLeast(Lifecycle.State.STARTED)
Timber.v("--> is at least started? $isAtLeastStarted")
startSync(isAtLeastStarted)
startSyncing(context)
refreshPushers()
pushRuleTriggerListener.startWithSession(this)
@ -42,6 +44,24 @@ fun Session.configureAndStart(pushRuleTriggerListener: PushRuleTriggerListener,
// @Inject lateinit var keyRequestHandler: KeyRequestHandler
}
fun Session.startSyncing(context: Context) {
val applicationContext = context.applicationContext
if (!hasAlreadySynced()) {
VectorSyncService.newIntent(applicationContext, myUserId).also {
try {
ContextCompat.startForegroundService(applicationContext, it)
} catch (ex: Throwable) {
// TODO
Timber.e(ex)
}
}
} else {
val isAtLeastStarted = ProcessLifecycleOwner.get().lifecycle.currentState.isAtLeast(Lifecycle.State.STARTED)
Timber.v("--> is at least started? $isAtLeastStarted")
startSync(isAtLeastStarted)
}
}
/**
* Tell is the session has unsaved e2e keys in the backup
*/

View file

@ -31,6 +31,7 @@ import butterknife.Unbinder
import com.airbnb.mvrx.BaseMvRxFragment
import com.airbnb.mvrx.MvRx
import com.bumptech.glide.util.Util.assertMainThread
import com.google.android.material.snackbar.Snackbar
import im.vector.riotx.core.di.DaggerScreenComponent
import im.vector.riotx.core.di.HasScreenInjector
import im.vector.riotx.core.di.ScreenComponent
@ -167,6 +168,13 @@ abstract class VectorBaseFragment : BaseMvRxFragment(), HasScreenInjector {
return this
}
protected fun showErrorInSnackbar(throwable: Throwable) {
vectorBaseActivity.coordinatorLayout?.let {
Snackbar.make(it, errorFormatter.toHumanReadable(throwable), Snackbar.LENGTH_SHORT)
.show()
}
}
/* ==========================================================================================
* Toolbar
* ========================================================================================== */

View file

@ -13,9 +13,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package im.vector.riotx.fdroid.service
package im.vector.riotx.core.services
import android.app.AlarmManager
import android.app.NotificationManager
import android.app.PendingIntent
import android.content.Context
import android.content.Intent
import android.os.Build
@ -23,10 +25,18 @@ import im.vector.matrix.android.internal.session.sync.job.SyncService
import im.vector.riotx.R
import im.vector.riotx.core.extensions.vectorComponent
import im.vector.riotx.features.notifications.NotificationUtils
import timber.log.Timber
class VectorSyncService : SyncService() {
companion object {
fun newIntent(context: Context, userId: String): Intent {
return Intent(context, VectorSyncService::class.java).also {
it.putExtra(EXTRA_USER_ID, userId)
}
}
}
private lateinit var notificationUtils: NotificationUtils
override fun onCreate() {
@ -34,6 +44,22 @@ class VectorSyncService : SyncService() {
notificationUtils = vectorComponent().notificationUtils()
}
override fun onStart(isInitialSync: Boolean) {
if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.O) {
val notificationSubtitleRes = if (isInitialSync) {
R.string.notification_initial_sync
} else {
R.string.notification_listening_for_events
}
val notification = notificationUtils.buildForegroundServiceNotification(notificationSubtitleRes, false)
startForeground(NotificationUtils.NOTIFICATION_ID_FOREGROUND_SERVICE, notification)
}
}
override fun onRescheduleAsked(userId: String, isInitialSync: Boolean, delay: Long) {
reschedule(userId, delay)
}
override fun onDestroy() {
removeForegroundNotif()
super.onDestroy()
@ -44,16 +70,18 @@ class VectorSyncService : SyncService() {
notificationManager.cancel(NotificationUtils.NOTIFICATION_ID_FOREGROUND_SERVICE)
}
/**
* Service is started only in fdroid mode when no FCM is available
* Otherwise it is bounded
*/
override fun onStartCommand(intent: Intent?, flags: Int, startId: Int): Int {
Timber.v("VectorSyncService - onStartCommand ")
if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.O) {
val notification = notificationUtils.buildForegroundServiceNotification(R.string.notification_listening_for_events, false)
startForeground(NotificationUtils.NOTIFICATION_ID_FOREGROUND_SERVICE, notification)
private fun reschedule(userId: String, delay: Long) {
val pendingIntent = if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.O) {
PendingIntent.getForegroundService(this, 0, newIntent(this, userId), 0)
} else {
PendingIntent.getService(this, 0, newIntent(this, userId), 0)
}
val firstMillis = System.currentTimeMillis() + delay
val alarmMgr = getSystemService(Context.ALARM_SERVICE) as AlarmManager
if (Build.VERSION.SDK_INT >= Build.VERSION_CODES.M) {
alarmMgr.setAndAllowWhileIdle(AlarmManager.RTC_WAKEUP, firstMillis, pendingIntent)
} else {
alarmMgr.set(AlarmManager.RTC_WAKEUP, firstMillis, pendingIntent)
}
return super.onStartCommand(intent, flags, startId)
}
}

View file

@ -28,6 +28,7 @@ import im.vector.riotx.R
import im.vector.riotx.core.di.ActiveSessionHolder
import im.vector.riotx.core.di.ScreenComponent
import im.vector.riotx.core.error.ErrorFormatter
import im.vector.riotx.core.extensions.startSyncing
import im.vector.riotx.core.platform.VectorBaseActivity
import im.vector.riotx.core.utils.deleteAllFiles
import im.vector.riotx.features.home.HomeActivity
@ -84,11 +85,9 @@ class MainActivity : VectorBaseActivity() {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
args = parseArgs()
if (args.clearCredentials || args.isUserLoggedOut) {
clearNotifications()
}
// Handle some wanted cleanup
if (args.clearCache || args.clearCredentials) {
doCleanUp()
@ -116,24 +115,32 @@ class MainActivity : VectorBaseActivity() {
}
private fun doCleanUp() {
val session = sessionHolder.getSafeActiveSession()
if (session == null) {
startNextActivityAndFinish()
return
}
when {
args.clearCredentials -> sessionHolder.getActiveSession().signOut(
args.clearCredentials -> session.signOut(
!args.isUserLoggedOut,
object : MatrixCallback<Unit> {
override fun onSuccess(data: Unit) {
Timber.w("SIGN_OUT: success, start app")
sessionHolder.clearActiveSession()
doLocalCleanupAndStart()
doLocalCleanup()
startNextActivityAndFinish()
}
override fun onFailure(failure: Throwable) {
displayError(failure)
}
})
args.clearCache -> sessionHolder.getActiveSession().clearCache(
args.clearCache -> session.clearCache(
object : MatrixCallback<Unit> {
override fun onSuccess(data: Unit) {
doLocalCleanupAndStart()
doLocalCleanup()
session.startSyncing(applicationContext)
startNextActivityAndFinish()
}
override fun onFailure(failure: Throwable) {
@ -148,7 +155,7 @@ class MainActivity : VectorBaseActivity() {
Timber.w("Ignoring invalid token global error")
}
private fun doLocalCleanupAndStart() {
private fun doLocalCleanup() {
GlobalScope.launch(Dispatchers.Main) {
// On UI Thread
Glide.get(this@MainActivity).clearMemory()
@ -160,8 +167,6 @@ class MainActivity : VectorBaseActivity() {
deleteAllFiles(this@MainActivity.cacheDir)
}
}
startNextActivityAndFinish()
}
private fun displayError(failure: Throwable) {

View file

@ -52,6 +52,14 @@ class AvatarRenderer @Inject constructor(private val activeSessionHolder: Active
DrawableImageViewTarget(imageView))
}
@UiThread
fun render(matrixItem: MatrixItem, imageView: ImageView, glideRequests: GlideRequests) {
render(imageView.context,
glideRequests,
matrixItem,
DrawableImageViewTarget(imageView))
}
@UiThread
fun render(context: Context,
glideRequest: GlideRequests,

View file

@ -30,6 +30,7 @@ import im.vector.matrix.android.api.session.group.model.GroupSummary
import im.vector.matrix.android.api.util.toMatrixItem
import im.vector.riotx.R
import im.vector.riotx.core.extensions.commitTransactionNow
import im.vector.riotx.core.glide.GlideApp
import im.vector.riotx.core.platform.ToolbarConfigurable
import im.vector.riotx.core.platform.VectorBaseFragment
import im.vector.riotx.core.ui.views.KeysBackupBanner
@ -75,7 +76,8 @@ class HomeDetailFragment @Inject constructor(
private fun onGroupChange(groupSummary: GroupSummary?) {
groupSummary?.let {
avatarRenderer.render(it.toMatrixItem(), groupToolbarAvatarImageView)
// Use GlideApp with activity context to avoid the glideRequests to be paused
avatarRenderer.render(it.toMatrixItem(), groupToolbarAvatarImageView, GlideApp.with(requireActivity()))
}
}

View file

@ -283,6 +283,16 @@ class RoomDetailFragment @Inject constructor(
roomDetailViewModel.requestLiveData.observeEvent(this) {
displayRoomDetailActionResult(it)
}
roomDetailViewModel.viewEvents
.observe()
.observeOn(AndroidSchedulers.mainThread())
.subscribe {
when (it) {
is RoomDetailViewEvents.Failure -> showErrorInSnackbar(it.throwable)
}
}
.disposeOnDestroyView()
}
override fun onActivityCreated(savedInstanceState: Bundle?) {

View file

@ -0,0 +1,24 @@
/*
* Copyright 2019 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package im.vector.riotx.features.home.room.detail
/**
* Transient events for RoomDetail
*/
sealed class RoomDetailViewEvents {
data class Failure(val throwable: Throwable) : RoomDetailViewEvents()
}

View file

@ -27,6 +27,7 @@ import com.squareup.inject.assisted.Assisted
import com.squareup.inject.assisted.AssistedInject
import im.vector.matrix.android.api.MatrixCallback
import im.vector.matrix.android.api.MatrixPatterns
import im.vector.matrix.android.api.failure.Failure
import im.vector.matrix.android.api.session.Session
import im.vector.matrix.android.api.session.events.model.EventType
import im.vector.matrix.android.api.session.events.model.isImageMessage
@ -56,7 +57,9 @@ import im.vector.riotx.core.extensions.postLiveEvent
import im.vector.riotx.core.platform.VectorViewModel
import im.vector.riotx.core.resources.StringProvider
import im.vector.riotx.core.resources.UserPreferencesProvider
import im.vector.riotx.core.utils.DataSource
import im.vector.riotx.core.utils.LiveEvent
import im.vector.riotx.core.utils.PublishDataSource
import im.vector.riotx.core.utils.subscribeLogError
import im.vector.riotx.features.command.CommandParser
import im.vector.riotx.features.command.ParsedCommand
@ -101,6 +104,9 @@ class RoomDetailViewModel @AssistedInject constructor(@Assisted initialState: Ro
private var timelineEvents = PublishRelay.create<List<TimelineEvent>>()
private var timeline = room.createTimeline(eventId, timelineSettings)
private val _viewEvents = PublishDataSource<RoomDetailViewEvents>()
val viewEvents: DataSource<RoomDetailViewEvents> = _viewEvents
// Can be used for several actions, for a one shot result
private val _requestLiveData = MutableLiveData<LiveEvent<Async<RoomDetailAction>>>()
val requestLiveData: LiveData<LiveEvent<Async<RoomDetailAction>>>
@ -819,9 +825,14 @@ class RoomDetailViewModel @AssistedInject constructor(@Assisted initialState: Ro
if (events.isEmpty()) return UnreadState.Unknown
val readMarkerIdSnapshot = roomSummary.readMarkerId ?: return UnreadState.Unknown
val firstDisplayableEventId = timeline.getFirstDisplayableEventId(readMarkerIdSnapshot)
?: return UnreadState.ReadMarkerNotLoaded(readMarkerIdSnapshot)
val firstDisplayableEventIndex = timeline.getIndexOfEvent(firstDisplayableEventId)
?: return UnreadState.ReadMarkerNotLoaded(readMarkerIdSnapshot)
if (firstDisplayableEventId == null || firstDisplayableEventIndex == null) {
return if (timeline.isLive) {
UnreadState.ReadMarkerNotLoaded(readMarkerIdSnapshot)
} else {
UnreadState.Unknown
}
}
for (i in (firstDisplayableEventIndex - 1) downTo 0) {
val timelineEvent = events.getOrNull(i) ?: return UnreadState.Unknown
val eventId = timelineEvent.root.eventId ?: return UnreadState.Unknown
@ -857,10 +868,16 @@ class RoomDetailViewModel @AssistedInject constructor(@Assisted initialState: Ro
}
}
override fun onUpdated(snapshot: List<TimelineEvent>) {
override fun onTimelineUpdated(snapshot: List<TimelineEvent>) {
timelineEvents.accept(snapshot)
}
override fun onTimelineFailure(throwable: Throwable) {
// If we have a critical timeline issue, we get back to live.
timeline.restartWithEventId(null)
_viewEvents.post(RoomDetailViewEvents.Failure(throwable))
}
override fun onCleared() {
timeline.dispose()
timeline.removeListener(this)

View file

@ -220,7 +220,7 @@ class TimelineEventController @Inject constructor(private val dateFormatter: Vec
showingForwardLoader = LoadingItem_()
.id("forward_loading_item_$timestamp")
.setVisibilityStateChangedListener(Timeline.Direction.FORWARDS)
.addWhen(Timeline.Direction.FORWARDS)
.addWhenLoading(Timeline.Direction.FORWARDS)
val timelineModels = getModels()
add(timelineModels)
@ -230,16 +230,20 @@ class TimelineEventController @Inject constructor(private val dateFormatter: Vec
LoadingItem_()
.id("backward_loading_item_$timestamp")
.setVisibilityStateChangedListener(Timeline.Direction.BACKWARDS)
.addWhen(Timeline.Direction.BACKWARDS)
.addWhenLoading(Timeline.Direction.BACKWARDS)
}
}
// Timeline.LISTENER ***************************************************************************
override fun onUpdated(snapshot: List<TimelineEvent>) {
override fun onTimelineUpdated(snapshot: List<TimelineEvent>) {
submitSnapshot(snapshot)
}
override fun onTimelineFailure(throwable: Throwable) {
// no-op, already handled
}
private fun submitSnapshot(newSnapshot: List<TimelineEvent>) {
backgroundHandler.post {
inSubmitList = true
@ -247,6 +251,7 @@ class TimelineEventController @Inject constructor(private val dateFormatter: Vec
currentSnapshot = newSnapshot
val diffResult = DiffUtil.calculateDiff(diffCallback)
diffResult.dispatchUpdatesTo(listUpdateCallback)
requestDelayedModelBuild(100)
inSubmitList = false
}
}
@ -319,7 +324,7 @@ class TimelineEventController @Inject constructor(private val dateFormatter: Vec
/**
* Return true if added
*/
private fun LoadingItem_.addWhen(direction: Timeline.Direction): Boolean {
private fun LoadingItem_.addWhenLoading(direction: Timeline.Direction): Boolean {
val shouldAdd = timeline?.hasMoreToLoad(direction) ?: false
addIf(shouldAdd, this@TimelineEventController)
return shouldAdd

View file

@ -28,7 +28,6 @@ import androidx.recyclerview.widget.LinearLayoutManager
import androidx.recyclerview.widget.RecyclerView
import com.airbnb.epoxy.OnModelBuildFinishedListener
import com.airbnb.mvrx.*
import com.google.android.material.snackbar.Snackbar
import im.vector.matrix.android.api.failure.Failure
import im.vector.matrix.android.api.session.room.model.Membership
import im.vector.matrix.android.api.session.room.model.RoomSummary
@ -104,7 +103,7 @@ class RoomListFragment @Inject constructor(
.subscribe {
when (it) {
is RoomListViewEvents.SelectRoom -> openSelectedRoom(it)
is RoomListViewEvents.Failure -> showError(it)
is RoomListViewEvents.Failure -> showErrorInSnackbar(it.throwable)
}
}
.disposeOnDestroyView()
@ -135,13 +134,6 @@ class RoomListFragment @Inject constructor(
}
}
private fun showError(event: RoomListViewEvents.Failure) {
vectorBaseActivity.coordinatorLayout?.let {
Snackbar.make(it, errorFormatter.toHumanReadable(event.throwable), Snackbar.LENGTH_SHORT)
.show()
}
}
private fun setupCreateRoomButton() {
when (roomListParams.displayMode) {
RoomListDisplayMode.HOME -> createChatFabMenu.isVisible = true

View file

@ -30,8 +30,8 @@ import com.google.i18n.phonenumbers.PhoneNumberUtil
import com.jakewharton.rxbinding3.widget.textChanges
import im.vector.matrix.android.api.auth.registration.RegisterThreePid
import im.vector.matrix.android.api.failure.Failure
import im.vector.matrix.android.api.failure.is401
import im.vector.riotx.R
import im.vector.riotx.core.error.is401
import im.vector.riotx.core.extensions.hideKeyboard
import im.vector.riotx.core.extensions.isEmail
import im.vector.riotx.core.extensions.setTextOrHide

View file

@ -20,8 +20,8 @@ import androidx.appcompat.app.AlertDialog
import butterknife.OnClick
import com.airbnb.mvrx.Fail
import com.airbnb.mvrx.Success
import im.vector.matrix.android.api.failure.is401
import im.vector.riotx.R
import im.vector.riotx.core.error.is401
import kotlinx.android.synthetic.main.fragment_login_reset_password_mail_confirmation.*
import javax.inject.Inject

View file

@ -16,8 +16,15 @@
package im.vector.riotx.features.login
import android.content.Context
import androidx.fragment.app.FragmentActivity
import com.airbnb.mvrx.*
import com.airbnb.mvrx.ActivityViewModelContext
import com.airbnb.mvrx.Fail
import com.airbnb.mvrx.Loading
import com.airbnb.mvrx.MvRxViewModelFactory
import com.airbnb.mvrx.Success
import com.airbnb.mvrx.Uninitialized
import com.airbnb.mvrx.ViewModelContext
import com.squareup.inject.assisted.Assisted
import com.squareup.inject.assisted.AssistedInject
import im.vector.matrix.android.api.MatrixCallback
@ -46,6 +53,7 @@ import java.util.concurrent.CancellationException
*
*/
class LoginViewModel @AssistedInject constructor(@Assisted initialState: LoginViewState,
private val applicationContext: Context,
private val authenticationService: AuthenticationService,
private val activeSessionHolder: ActiveSessionHolder,
private val pushRuleTriggerListener: PushRuleTriggerListener,
@ -486,7 +494,7 @@ class LoginViewModel @AssistedInject constructor(@Assisted initialState: LoginVi
private fun onSessionCreated(session: Session) {
activeSessionHolder.setActiveSession(session)
session.configureAndStart(pushRuleTriggerListener, sessionListener)
session.configureAndStart(applicationContext, pushRuleTriggerListener, sessionListener)
setState {
copy(
asyncLoginAction = Success(Unit)

View file

@ -20,8 +20,8 @@ import android.os.Bundle
import android.os.Parcelable
import android.view.View
import com.airbnb.mvrx.args
import im.vector.matrix.android.api.failure.is401
import im.vector.riotx.R
import im.vector.riotx.core.error.is401
import kotlinx.android.parcel.Parcelize
import kotlinx.android.synthetic.main.fragment_login_wait_for_email.*
import javax.inject.Inject

View file

@ -3,5 +3,6 @@
<!-- Strings not defined in Riot -->
<string name="notification_initial_sync">Initial Sync…</string>
</resources>