mirror of
https://github.com/SchildiChat/SchildiChat-android.git
synced 2025-02-18 13:00:18 +03:00
Merge pull request #2963 from vector-im/feature/bma/init_sync_noRR
Improve init sync performance
This commit is contained in:
commit
ed963d86f9
15 changed files with 114 additions and 83 deletions
|
@ -7,7 +7,7 @@ Features ✨:
|
||||||
Improvements 🙌:
|
Improvements 🙌:
|
||||||
- Allow non-HTTPS connections to homeservers on Tor (#2941)
|
- Allow non-HTTPS connections to homeservers on Tor (#2941)
|
||||||
- Fetch homeserver type and version and display in a new setting screen and add info in rageshakes (#2831)
|
- Fetch homeserver type and version and display in a new setting screen and add info in rageshakes (#2831)
|
||||||
- Improve initial sync performance (#983)
|
- Improve initial sync performance - split into 2 transactions (#983)
|
||||||
- PIP support for Jitsi call (#2418)
|
- PIP support for Jitsi call (#2418)
|
||||||
- Add tooltip for room quick actions
|
- Add tooltip for room quick actions
|
||||||
- Pre-share session keys when opening a room or start typing (#2771)
|
- Pre-share session keys when opening a room or start typing (#2771)
|
||||||
|
|
|
@ -22,9 +22,9 @@ interface FilterService {
|
||||||
NoFilter,
|
NoFilter,
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Filter for Riot, will include only known event type
|
* Filter for Element, will include only known event type
|
||||||
*/
|
*/
|
||||||
RiotFilter
|
ElementFilter
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -36,7 +36,7 @@ import org.matrix.android.sdk.internal.network.parsing.ForceToBooleanJsonAdapter
|
||||||
import org.matrix.android.sdk.internal.network.parsing.RuntimeJsonAdapterFactory
|
import org.matrix.android.sdk.internal.network.parsing.RuntimeJsonAdapterFactory
|
||||||
import org.matrix.android.sdk.internal.network.parsing.TlsVersionMoshiAdapter
|
import org.matrix.android.sdk.internal.network.parsing.TlsVersionMoshiAdapter
|
||||||
import org.matrix.android.sdk.internal.network.parsing.UriMoshiAdapter
|
import org.matrix.android.sdk.internal.network.parsing.UriMoshiAdapter
|
||||||
import org.matrix.android.sdk.internal.session.sync.parsing.DefaultLazyRoomSyncJsonAdapter
|
import org.matrix.android.sdk.internal.session.sync.parsing.DefaultLazyRoomSyncEphemeralJsonAdapter
|
||||||
|
|
||||||
object MoshiProvider {
|
object MoshiProvider {
|
||||||
|
|
||||||
|
@ -46,7 +46,7 @@ object MoshiProvider {
|
||||||
.add(CipherSuiteMoshiAdapter())
|
.add(CipherSuiteMoshiAdapter())
|
||||||
.add(TlsVersionMoshiAdapter())
|
.add(TlsVersionMoshiAdapter())
|
||||||
// Use addLast here so we can inject a SplitLazyRoomSyncJsonAdapter later to override the default parsing.
|
// Use addLast here so we can inject a SplitLazyRoomSyncJsonAdapter later to override the default parsing.
|
||||||
.addLast(DefaultLazyRoomSyncJsonAdapter())
|
.addLast(DefaultLazyRoomSyncEphemeralJsonAdapter())
|
||||||
.add(RuntimeJsonAdapterFactory.of(MessageContent::class.java, "msgtype", MessageDefaultContent::class.java)
|
.add(RuntimeJsonAdapterFactory.of(MessageContent::class.java, "msgtype", MessageDefaultContent::class.java)
|
||||||
.registerSubtype(MessageTextContent::class.java, MessageType.MSGTYPE_TEXT)
|
.registerSubtype(MessageTextContent::class.java, MessageType.MSGTYPE_TEXT)
|
||||||
.registerSubtype(MessageNoticeContent::class.java, MessageType.MSGTYPE_NOTICE)
|
.registerSubtype(MessageNoticeContent::class.java, MessageType.MSGTYPE_NOTICE)
|
||||||
|
|
|
@ -33,11 +33,11 @@ internal object FilterFactory {
|
||||||
return FilterUtil.enableLazyLoading(Filter(), true)
|
return FilterUtil.enableLazyLoading(Filter(), true)
|
||||||
}
|
}
|
||||||
|
|
||||||
fun createRiotFilter(): Filter {
|
fun createElementFilter(): Filter {
|
||||||
return Filter(
|
return Filter(
|
||||||
room = RoomFilter(
|
room = RoomFilter(
|
||||||
timeline = createRiotTimelineFilter(),
|
timeline = createElementTimelineFilter(),
|
||||||
state = createRiotStateFilter()
|
state = createElementStateFilter()
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
@ -48,7 +48,7 @@ internal object FilterFactory {
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
fun createRiotRoomFilter(): RoomEventFilter {
|
fun createElementRoomFilter(): RoomEventFilter {
|
||||||
return RoomEventFilter(
|
return RoomEventFilter(
|
||||||
lazyLoadMembers = true
|
lazyLoadMembers = true
|
||||||
// TODO Enable this for optimization
|
// TODO Enable this for optimization
|
||||||
|
@ -56,26 +56,26 @@ internal object FilterFactory {
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun createRiotTimelineFilter(): RoomEventFilter {
|
private fun createElementTimelineFilter(): RoomEventFilter? {
|
||||||
return RoomEventFilter().apply {
|
return null // RoomEventFilter().apply {
|
||||||
// TODO Enable this for optimization
|
// TODO Enable this for optimization
|
||||||
// types = listOfSupportedEventTypes.toMutableList()
|
// types = listOfSupportedEventTypes.toMutableList()
|
||||||
}
|
// }
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun createRiotStateFilter(): RoomEventFilter {
|
private fun createElementStateFilter(): RoomEventFilter {
|
||||||
return RoomEventFilter(
|
return RoomEventFilter(
|
||||||
lazyLoadMembers = true
|
lazyLoadMembers = true
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get only managed types by Riot
|
// Get only managed types by Element
|
||||||
private val listOfSupportedEventTypes = listOf(
|
private val listOfSupportedEventTypes = listOf(
|
||||||
// TODO Complete the list
|
// TODO Complete the list
|
||||||
EventType.MESSAGE
|
EventType.MESSAGE
|
||||||
)
|
)
|
||||||
|
|
||||||
// Get only managed types by Riot
|
// Get only managed types by Element
|
||||||
private val listOfSupportedStateEventTypes = listOf(
|
private val listOfSupportedStateEventTypes = listOf(
|
||||||
// TODO Complete the list
|
// TODO Complete the list
|
||||||
EventType.STATE_ROOM_MEMBER
|
EventType.STATE_ROOM_MEMBER
|
||||||
|
|
|
@ -42,18 +42,18 @@ internal class DefaultSaveFilterTask @Inject constructor(
|
||||||
|
|
||||||
override suspend fun execute(params: SaveFilterTask.Params) {
|
override suspend fun execute(params: SaveFilterTask.Params) {
|
||||||
val filterBody = when (params.filterPreset) {
|
val filterBody = when (params.filterPreset) {
|
||||||
FilterService.FilterPreset.RiotFilter -> {
|
FilterService.FilterPreset.ElementFilter -> {
|
||||||
FilterFactory.createRiotFilter()
|
FilterFactory.createElementFilter()
|
||||||
}
|
}
|
||||||
FilterService.FilterPreset.NoFilter -> {
|
FilterService.FilterPreset.NoFilter -> {
|
||||||
FilterFactory.createDefaultFilter()
|
FilterFactory.createDefaultFilter()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
val roomFilter = when (params.filterPreset) {
|
val roomFilter = when (params.filterPreset) {
|
||||||
FilterService.FilterPreset.RiotFilter -> {
|
FilterService.FilterPreset.ElementFilter -> {
|
||||||
FilterFactory.createRiotRoomFilter()
|
FilterFactory.createElementRoomFilter()
|
||||||
}
|
}
|
||||||
FilterService.FilterPreset.NoFilter -> {
|
FilterService.FilterPreset.NoFilter -> {
|
||||||
FilterFactory.createDefaultRoomFilter()
|
FilterFactory.createDefaultRoomFilter()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -50,7 +50,7 @@ internal class DefaultProcessEventForPushTask @Inject constructor(
|
||||||
}
|
}
|
||||||
val newJoinEvents = params.syncResponse.join
|
val newJoinEvents = params.syncResponse.join
|
||||||
.mapNotNull { (key, value) ->
|
.mapNotNull { (key, value) ->
|
||||||
value.roomSync.timeline?.events?.map { it.copy(roomId = key) }
|
value.timeline?.events?.map { it.copy(roomId = key) }
|
||||||
}
|
}
|
||||||
.flatten()
|
.flatten()
|
||||||
val inviteEvents = params.syncResponse.invite
|
val inviteEvents = params.syncResponse.invite
|
||||||
|
@ -80,7 +80,7 @@ internal class DefaultProcessEventForPushTask @Inject constructor(
|
||||||
|
|
||||||
val allRedactedEvents = params.syncResponse.join
|
val allRedactedEvents = params.syncResponse.join
|
||||||
.asSequence()
|
.asSequence()
|
||||||
.mapNotNull { (_, value) -> value.roomSync.timeline?.events }
|
.mapNotNull { it.value.timeline?.events }
|
||||||
.flatten()
|
.flatten()
|
||||||
.filter { it.type == EventType.REDACTION }
|
.filter { it.type == EventType.REDACTION }
|
||||||
.mapNotNull { it.redacts }
|
.mapNotNull { it.redacts }
|
||||||
|
|
|
@ -41,10 +41,10 @@ sealed class InitialSyncStrategy {
|
||||||
*/
|
*/
|
||||||
val minSizeToSplit: Long = 1024 * 1024,
|
val minSizeToSplit: Long = 1024 * 1024,
|
||||||
/**
|
/**
|
||||||
* Limit per room to reach to decide to store a join room into a file
|
* Limit per room to reach to decide to store a join room ephemeral Events into a file
|
||||||
* Empiric value: 10 kilobytes
|
* Empiric value: 6 kilobytes
|
||||||
*/
|
*/
|
||||||
val minSizeToStoreInFile: Long = 10 * 1024,
|
val minSizeToStoreInFile: Long = 6 * 1024,
|
||||||
/**
|
/**
|
||||||
* Max number of rooms to insert at a time in database (to avoid too much RAM usage)
|
* Max number of rooms to insert at a time in database (to avoid too much RAM usage)
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -60,10 +60,8 @@ import org.matrix.android.sdk.internal.session.room.timeline.PaginationDirection
|
||||||
import org.matrix.android.sdk.internal.session.room.timeline.TimelineInput
|
import org.matrix.android.sdk.internal.session.room.timeline.TimelineInput
|
||||||
import org.matrix.android.sdk.internal.session.room.typing.TypingEventContent
|
import org.matrix.android.sdk.internal.session.room.typing.TypingEventContent
|
||||||
import org.matrix.android.sdk.internal.session.sync.model.InvitedRoomSync
|
import org.matrix.android.sdk.internal.session.sync.model.InvitedRoomSync
|
||||||
import org.matrix.android.sdk.internal.session.sync.model.LazyRoomSync
|
|
||||||
import org.matrix.android.sdk.internal.session.sync.model.RoomSync
|
import org.matrix.android.sdk.internal.session.sync.model.RoomSync
|
||||||
import org.matrix.android.sdk.internal.session.sync.model.RoomSyncAccountData
|
import org.matrix.android.sdk.internal.session.sync.model.RoomSyncAccountData
|
||||||
import org.matrix.android.sdk.internal.session.sync.model.RoomSyncEphemeral
|
|
||||||
import org.matrix.android.sdk.internal.session.sync.model.RoomsSyncResponse
|
import org.matrix.android.sdk.internal.session.sync.model.RoomsSyncResponse
|
||||||
import timber.log.Timber
|
import timber.log.Timber
|
||||||
import javax.inject.Inject
|
import javax.inject.Inject
|
||||||
|
@ -81,7 +79,7 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
|
||||||
private val timelineInput: TimelineInput) {
|
private val timelineInput: TimelineInput) {
|
||||||
|
|
||||||
sealed class HandlingStrategy {
|
sealed class HandlingStrategy {
|
||||||
data class JOINED(val data: Map<String, LazyRoomSync>) : HandlingStrategy()
|
data class JOINED(val data: Map<String, RoomSync>) : HandlingStrategy()
|
||||||
data class INVITED(val data: Map<String, InvitedRoomSync>) : HandlingStrategy()
|
data class INVITED(val data: Map<String, InvitedRoomSync>) : HandlingStrategy()
|
||||||
data class LEFT(val data: Map<String, RoomSync>) : HandlingStrategy()
|
data class LEFT(val data: Map<String, RoomSync>) : HandlingStrategy()
|
||||||
}
|
}
|
||||||
|
@ -96,6 +94,19 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
|
||||||
handleRoomSync(realm, HandlingStrategy.LEFT(roomsSyncResponse.leave), isInitialSync, reporter)
|
handleRoomSync(realm, HandlingStrategy.LEFT(roomsSyncResponse.leave), isInitialSync, reporter)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fun handleInitSyncEphemeral(realm: Realm,
|
||||||
|
roomsSyncResponse: RoomsSyncResponse) {
|
||||||
|
roomsSyncResponse.join.forEach { roomSync ->
|
||||||
|
val ephemeralResult = roomSync.value.ephemeral
|
||||||
|
?.roomSyncEphemeral
|
||||||
|
?.events
|
||||||
|
?.takeIf { it.isNotEmpty() }
|
||||||
|
?.let { events -> handleEphemeral(realm, roomSync.key, events, true) }
|
||||||
|
|
||||||
|
roomTypingUsersHandler.handle(realm, roomSync.key, ephemeralResult)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// PRIVATE METHODS *****************************************************************************
|
// PRIVATE METHODS *****************************************************************************
|
||||||
|
|
||||||
private fun handleRoomSync(realm: Realm, handlingStrategy: HandlingStrategy, isInitialSync: Boolean, reporter: ProgressReporter?) {
|
private fun handleRoomSync(realm: Realm, handlingStrategy: HandlingStrategy, isInitialSync: Boolean, reporter: ProgressReporter?) {
|
||||||
|
@ -108,12 +119,12 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
|
||||||
val rooms = when (handlingStrategy) {
|
val rooms = when (handlingStrategy) {
|
||||||
is HandlingStrategy.JOINED -> {
|
is HandlingStrategy.JOINED -> {
|
||||||
if (isInitialSync && initialSyncStrategy is InitialSyncStrategy.Optimized) {
|
if (isInitialSync && initialSyncStrategy is InitialSyncStrategy.Optimized) {
|
||||||
insertJoinRooms(realm, handlingStrategy, insertType, syncLocalTimeStampMillis, reporter)
|
insertJoinRoomsFromInitSync(realm, handlingStrategy, syncLocalTimeStampMillis, reporter)
|
||||||
// Rooms are already inserted, return an empty list
|
// Rooms are already inserted, return an empty list
|
||||||
emptyList()
|
emptyList()
|
||||||
} else {
|
} else {
|
||||||
handlingStrategy.data.mapWithProgress(reporter, InitSyncStep.ImportingAccountJoinedRooms, 0.6f) {
|
handlingStrategy.data.mapWithProgress(reporter, InitSyncStep.ImportingAccountJoinedRooms, 0.6f) {
|
||||||
handleJoinedRoom(realm, it.key, it.value.roomSync, insertType, syncLocalTimeStampMillis)
|
handleJoinedRoom(realm, it.key, it.value, true, insertType, syncLocalTimeStampMillis)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -131,11 +142,10 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
|
||||||
realm.insertOrUpdate(rooms)
|
realm.insertOrUpdate(rooms)
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun insertJoinRooms(realm: Realm,
|
private fun insertJoinRoomsFromInitSync(realm: Realm,
|
||||||
handlingStrategy: HandlingStrategy.JOINED,
|
handlingStrategy: HandlingStrategy.JOINED,
|
||||||
insertType: EventInsertType,
|
syncLocalTimeStampMillis: Long,
|
||||||
syncLocalTimeStampMillis: Long,
|
reporter: ProgressReporter?) {
|
||||||
reporter: ProgressReporter?) {
|
|
||||||
val maxSize = (initialSyncStrategy as? InitialSyncStrategy.Optimized)?.maxRoomsToInsert ?: Int.MAX_VALUE
|
val maxSize = (initialSyncStrategy as? InitialSyncStrategy.Optimized)?.maxRoomsToInsert ?: Int.MAX_VALUE
|
||||||
val listSize = handlingStrategy.data.keys.size
|
val listSize = handlingStrategy.data.keys.size
|
||||||
val numberOfChunks = ceil(listSize / maxSize.toDouble()).toInt()
|
val numberOfChunks = ceil(listSize / maxSize.toDouble()).toInt()
|
||||||
|
@ -152,11 +162,12 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
|
||||||
.also { Timber.v("INIT_SYNC insert ${roomIds.size} rooms") }
|
.also { Timber.v("INIT_SYNC insert ${roomIds.size} rooms") }
|
||||||
.map {
|
.map {
|
||||||
handleJoinedRoom(
|
handleJoinedRoom(
|
||||||
realm,
|
realm = realm,
|
||||||
it,
|
roomId = it,
|
||||||
(handlingStrategy.data[it] ?: error("Should not happen")).roomSync,
|
roomSync = handlingStrategy.data[it] ?: error("Should not happen"),
|
||||||
insertType,
|
handleEphemeralEvents = false,
|
||||||
syncLocalTimeStampMillis
|
insertType = EventInsertType.INITIAL_SYNC,
|
||||||
|
syncLocalTimestampMillis = syncLocalTimeStampMillis
|
||||||
)
|
)
|
||||||
}
|
}
|
||||||
realm.insertOrUpdate(roomEntities)
|
realm.insertOrUpdate(roomEntities)
|
||||||
|
@ -166,7 +177,7 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
|
||||||
} else {
|
} else {
|
||||||
// No need to split
|
// No need to split
|
||||||
val rooms = handlingStrategy.data.mapWithProgress(reporter, InitSyncStep.ImportingAccountJoinedRooms, 0.6f) {
|
val rooms = handlingStrategy.data.mapWithProgress(reporter, InitSyncStep.ImportingAccountJoinedRooms, 0.6f) {
|
||||||
handleJoinedRoom(realm, it.key, it.value.roomSync, insertType, syncLocalTimeStampMillis)
|
handleJoinedRoom(realm, it.key, it.value, false, EventInsertType.INITIAL_SYNC, syncLocalTimeStampMillis)
|
||||||
}
|
}
|
||||||
realm.insertOrUpdate(rooms)
|
realm.insertOrUpdate(rooms)
|
||||||
}
|
}
|
||||||
|
@ -175,13 +186,16 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
|
||||||
private fun handleJoinedRoom(realm: Realm,
|
private fun handleJoinedRoom(realm: Realm,
|
||||||
roomId: String,
|
roomId: String,
|
||||||
roomSync: RoomSync,
|
roomSync: RoomSync,
|
||||||
|
handleEphemeralEvents: Boolean,
|
||||||
insertType: EventInsertType,
|
insertType: EventInsertType,
|
||||||
syncLocalTimestampMillis: Long): RoomEntity {
|
syncLocalTimestampMillis: Long): RoomEntity {
|
||||||
Timber.v("Handle join sync for room $roomId")
|
Timber.v("Handle join sync for room $roomId")
|
||||||
|
|
||||||
var ephemeralResult: EphemeralResult? = null
|
var ephemeralResult: EphemeralResult? = null
|
||||||
if (roomSync.ephemeral?.events?.isNotEmpty() == true) {
|
if (handleEphemeralEvents) {
|
||||||
ephemeralResult = handleEphemeral(realm, roomId, roomSync.ephemeral, insertType == EventInsertType.INITIAL_SYNC)
|
ephemeralResult = roomSync.ephemeral?.roomSyncEphemeral?.events
|
||||||
|
?.takeIf { it.isNotEmpty() }
|
||||||
|
?.let { handleEphemeral(realm, roomId, it, insertType == EventInsertType.INITIAL_SYNC) }
|
||||||
}
|
}
|
||||||
|
|
||||||
if (roomSync.accountData?.events?.isNotEmpty() == true) {
|
if (roomSync.accountData?.events?.isNotEmpty() == true) {
|
||||||
|
@ -421,10 +435,10 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
|
||||||
|
|
||||||
private fun handleEphemeral(realm: Realm,
|
private fun handleEphemeral(realm: Realm,
|
||||||
roomId: String,
|
roomId: String,
|
||||||
ephemeral: RoomSyncEphemeral,
|
ephemeralEvents: List<Event>,
|
||||||
isInitialSync: Boolean): EphemeralResult {
|
isInitialSync: Boolean): EphemeralResult {
|
||||||
var result = EphemeralResult()
|
var result = EphemeralResult()
|
||||||
for (event in ephemeral.events) {
|
for (event in ephemeralEvents) {
|
||||||
when (event.type) {
|
when (event.type) {
|
||||||
EventType.RECEIPT -> {
|
EventType.RECEIPT -> {
|
||||||
@Suppress("UNCHECKED_CAST")
|
@Suppress("UNCHECKED_CAST")
|
||||||
|
|
|
@ -25,10 +25,10 @@ import org.matrix.android.sdk.internal.crypto.DefaultCryptoService
|
||||||
import org.matrix.android.sdk.internal.di.SessionDatabase
|
import org.matrix.android.sdk.internal.di.SessionDatabase
|
||||||
import org.matrix.android.sdk.internal.di.SessionId
|
import org.matrix.android.sdk.internal.di.SessionId
|
||||||
import org.matrix.android.sdk.internal.di.WorkManagerProvider
|
import org.matrix.android.sdk.internal.di.WorkManagerProvider
|
||||||
import org.matrix.android.sdk.internal.session.initsync.ProgressReporter
|
|
||||||
import org.matrix.android.sdk.internal.session.group.GetGroupDataWorker
|
import org.matrix.android.sdk.internal.session.group.GetGroupDataWorker
|
||||||
import org.matrix.android.sdk.internal.session.notification.ProcessEventForPushTask
|
import org.matrix.android.sdk.internal.session.initsync.ProgressReporter
|
||||||
import org.matrix.android.sdk.internal.session.initsync.reportSubtask
|
import org.matrix.android.sdk.internal.session.initsync.reportSubtask
|
||||||
|
import org.matrix.android.sdk.internal.session.notification.ProcessEventForPushTask
|
||||||
import org.matrix.android.sdk.internal.session.sync.model.GroupsSyncResponse
|
import org.matrix.android.sdk.internal.session.sync.model.GroupsSyncResponse
|
||||||
import org.matrix.android.sdk.internal.session.sync.model.RoomsSyncResponse
|
import org.matrix.android.sdk.internal.session.sync.model.RoomsSyncResponse
|
||||||
import org.matrix.android.sdk.internal.session.sync.model.SyncResponse
|
import org.matrix.android.sdk.internal.session.sync.model.SyncResponse
|
||||||
|
@ -128,6 +128,15 @@ internal class SyncResponseHandler @Inject constructor(@SessionDatabase private
|
||||||
cryptoSyncHandler.onSyncCompleted(syncResponse)
|
cryptoSyncHandler.onSyncCompleted(syncResponse)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
suspend fun handleInitSyncSecondTransaction(syncResponse: SyncResponse) {
|
||||||
|
// Start another transaction to handle the ephemeral events
|
||||||
|
monarchy.awaitTransaction { realm ->
|
||||||
|
if (syncResponse.rooms != null) {
|
||||||
|
roomSyncHandler.handleInitSyncEphemeral(realm, syncResponse.rooms)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* At the moment we don't get any group data through the sync, so we poll where every hour.
|
* At the moment we don't get any group data through the sync, so we poll where every hour.
|
||||||
* You can also force to refetch group data using [Group] API.
|
* You can also force to refetch group data using [Group] API.
|
||||||
|
|
|
@ -28,7 +28,7 @@ import org.matrix.android.sdk.internal.session.filter.FilterRepository
|
||||||
import org.matrix.android.sdk.internal.session.homeserver.GetHomeServerCapabilitiesTask
|
import org.matrix.android.sdk.internal.session.homeserver.GetHomeServerCapabilitiesTask
|
||||||
import org.matrix.android.sdk.internal.session.initsync.DefaultInitialSyncProgressService
|
import org.matrix.android.sdk.internal.session.initsync.DefaultInitialSyncProgressService
|
||||||
import org.matrix.android.sdk.internal.session.initsync.reportSubtask
|
import org.matrix.android.sdk.internal.session.initsync.reportSubtask
|
||||||
import org.matrix.android.sdk.internal.session.sync.model.LazyRoomSync
|
import org.matrix.android.sdk.internal.session.sync.model.LazyRoomSyncEphemeral
|
||||||
import org.matrix.android.sdk.internal.session.sync.model.SyncResponse
|
import org.matrix.android.sdk.internal.session.sync.model.SyncResponse
|
||||||
import org.matrix.android.sdk.internal.session.sync.parsing.InitialSyncResponseParser
|
import org.matrix.android.sdk.internal.session.sync.parsing.InitialSyncResponseParser
|
||||||
import org.matrix.android.sdk.internal.session.user.UserStore
|
import org.matrix.android.sdk.internal.session.user.UserStore
|
||||||
|
@ -98,10 +98,15 @@ internal class DefaultSyncTask @Inject constructor(
|
||||||
val readTimeOut = (params.timeout + TIMEOUT_MARGIN).coerceAtLeast(TimeOutInterceptor.DEFAULT_LONG_TIMEOUT)
|
val readTimeOut = (params.timeout + TIMEOUT_MARGIN).coerceAtLeast(TimeOutInterceptor.DEFAULT_LONG_TIMEOUT)
|
||||||
|
|
||||||
if (isInitialSync) {
|
if (isInitialSync) {
|
||||||
|
Timber.v("INIT_SYNC with filter: ${requestParams["filter"]}")
|
||||||
val initSyncStrategy = initialSyncStrategy
|
val initSyncStrategy = initialSyncStrategy
|
||||||
|
var syncResp: SyncResponse? = null
|
||||||
logDuration("INIT_SYNC strategy: $initSyncStrategy") {
|
logDuration("INIT_SYNC strategy: $initSyncStrategy") {
|
||||||
if (initSyncStrategy is InitialSyncStrategy.Optimized) {
|
if (initSyncStrategy is InitialSyncStrategy.Optimized) {
|
||||||
safeInitialSync(requestParams, initSyncStrategy)
|
val file = downloadInitSyncResponse(requestParams)
|
||||||
|
syncResp = reportSubtask(initialSyncProgressService, InitSyncStep.ImportingAccount, 1, 0.7F) {
|
||||||
|
handleSyncFile(file, initSyncStrategy)
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
val syncResponse = logDuration("INIT_SYNC Request") {
|
val syncResponse = logDuration("INIT_SYNC Request") {
|
||||||
executeRequest<SyncResponse>(globalErrorReceiver) {
|
executeRequest<SyncResponse>(globalErrorReceiver) {
|
||||||
|
@ -118,6 +123,15 @@ internal class DefaultSyncTask @Inject constructor(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
initialSyncProgressService.endAll()
|
initialSyncProgressService.endAll()
|
||||||
|
|
||||||
|
if (initSyncStrategy is InitialSyncStrategy.Optimized) {
|
||||||
|
logDuration("INIT_SYNC Handle ephemeral") {
|
||||||
|
syncResponseHandler.handleInitSyncSecondTransaction(syncResp!!)
|
||||||
|
}
|
||||||
|
initialSyncStatusRepository.setStep(InitialSyncStatus.STEP_SUCCESS)
|
||||||
|
// Delete all files
|
||||||
|
workingDir.deleteRecursively()
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
val syncResponse = executeRequest<SyncResponse>(globalErrorReceiver) {
|
val syncResponse = executeRequest<SyncResponse>(globalErrorReceiver) {
|
||||||
apiCall = syncAPI.sync(
|
apiCall = syncAPI.sync(
|
||||||
|
@ -130,12 +144,11 @@ internal class DefaultSyncTask @Inject constructor(
|
||||||
Timber.v("Sync task finished on Thread: ${Thread.currentThread().name}")
|
Timber.v("Sync task finished on Thread: ${Thread.currentThread().name}")
|
||||||
}
|
}
|
||||||
|
|
||||||
private suspend fun safeInitialSync(requestParams: Map<String, String>, initSyncStrategy: InitialSyncStrategy.Optimized) {
|
private suspend fun downloadInitSyncResponse(requestParams: Map<String, String>): File {
|
||||||
workingDir.mkdirs()
|
workingDir.mkdirs()
|
||||||
val workingFile = File(workingDir, "initSync.json")
|
val workingFile = File(workingDir, "initSync.json")
|
||||||
val status = initialSyncStatusRepository.getStep()
|
val status = initialSyncStatusRepository.getStep()
|
||||||
if (workingFile.exists() && status >= InitialSyncStatus.STEP_DOWNLOADED) {
|
if (workingFile.exists() && status >= InitialSyncStatus.STEP_DOWNLOADED) {
|
||||||
// Go directly to the parse step
|
|
||||||
Timber.v("INIT_SYNC file is already here")
|
Timber.v("INIT_SYNC file is already here")
|
||||||
reportSubtask(initialSyncProgressService, InitSyncStep.Downloading, 1, 0.3f) {
|
reportSubtask(initialSyncProgressService, InitSyncStep.Downloading, 1, 0.3f) {
|
||||||
// Empty task
|
// Empty task
|
||||||
|
@ -164,12 +177,7 @@ internal class DefaultSyncTask @Inject constructor(
|
||||||
}
|
}
|
||||||
initialSyncStatusRepository.setStep(InitialSyncStatus.STEP_DOWNLOADED)
|
initialSyncStatusRepository.setStep(InitialSyncStatus.STEP_DOWNLOADED)
|
||||||
}
|
}
|
||||||
reportSubtask(initialSyncProgressService, InitSyncStep.ImportingAccount, 1, 0.7F) {
|
return workingFile
|
||||||
handleSyncFile(workingFile, initSyncStrategy)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Delete all files
|
|
||||||
workingDir.deleteRecursively()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private suspend fun getSyncResponse(requestParams: Map<String, String>, maxNumberOfRetries: Int): Response<ResponseBody> {
|
private suspend fun getSyncResponse(requestParams: Map<String, String>, maxNumberOfRetries: Int): Response<ResponseBody> {
|
||||||
|
@ -191,21 +199,21 @@ internal class DefaultSyncTask @Inject constructor(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private suspend fun handleSyncFile(workingFile: File, initSyncStrategy: InitialSyncStrategy.Optimized) {
|
private suspend fun handleSyncFile(workingFile: File, initSyncStrategy: InitialSyncStrategy.Optimized): SyncResponse {
|
||||||
logDuration("INIT_SYNC handleSyncFile()") {
|
return logDuration("INIT_SYNC handleSyncFile()") {
|
||||||
val syncResponse = logDuration("INIT_SYNC Read file and parse") {
|
val syncResponse = logDuration("INIT_SYNC Read file and parse") {
|
||||||
syncResponseParser.parse(initSyncStrategy, workingFile)
|
syncResponseParser.parse(initSyncStrategy, workingFile)
|
||||||
}
|
}
|
||||||
initialSyncStatusRepository.setStep(InitialSyncStatus.STEP_PARSED)
|
initialSyncStatusRepository.setStep(InitialSyncStatus.STEP_PARSED)
|
||||||
// Log some stats
|
// Log some stats
|
||||||
val nbOfJoinedRooms = syncResponse.rooms?.join?.size ?: 0
|
val nbOfJoinedRooms = syncResponse.rooms?.join?.size ?: 0
|
||||||
val nbOfJoinedRoomsInFile = syncResponse.rooms?.join?.values?.count { it is LazyRoomSync.Stored }
|
val nbOfJoinedRoomsInFile = syncResponse.rooms?.join?.values?.count { it.ephemeral is LazyRoomSyncEphemeral.Stored }
|
||||||
Timber.v("INIT_SYNC $nbOfJoinedRooms rooms, $nbOfJoinedRoomsInFile stored into files")
|
Timber.v("INIT_SYNC $nbOfJoinedRooms rooms, $nbOfJoinedRoomsInFile ephemeral stored into files")
|
||||||
|
|
||||||
logDuration("INIT_SYNC Database insertion") {
|
logDuration("INIT_SYNC Database insertion") {
|
||||||
syncResponseHandler.handleResponse(syncResponse, null, initialSyncProgressService)
|
syncResponseHandler.handleResponse(syncResponse, null, initialSyncProgressService)
|
||||||
}
|
}
|
||||||
initialSyncStatusRepository.setStep(InitialSyncStatus.STEP_SUCCESS)
|
syncResponse
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -24,18 +24,18 @@ import okio.source
|
||||||
import java.io.File
|
import java.io.File
|
||||||
|
|
||||||
@JsonClass(generateAdapter = false)
|
@JsonClass(generateAdapter = false)
|
||||||
internal sealed class LazyRoomSync {
|
internal sealed class LazyRoomSyncEphemeral {
|
||||||
data class Parsed(val _roomSync: RoomSync) : LazyRoomSync()
|
data class Parsed(val _roomSyncEphemeral: RoomSyncEphemeral) : LazyRoomSyncEphemeral()
|
||||||
data class Stored(val roomSyncAdapter: JsonAdapter<RoomSync>, val file: File) : LazyRoomSync()
|
data class Stored(val roomSyncEphemeralAdapter: JsonAdapter<RoomSyncEphemeral>, val file: File) : LazyRoomSyncEphemeral()
|
||||||
|
|
||||||
val roomSync: RoomSync
|
val roomSyncEphemeral: RoomSyncEphemeral
|
||||||
get() {
|
get() {
|
||||||
return when (this) {
|
return when (this) {
|
||||||
is Parsed -> _roomSync
|
is Parsed -> _roomSyncEphemeral
|
||||||
is Stored -> {
|
is Stored -> {
|
||||||
// Parse the file now
|
// Parse the file now
|
||||||
file.inputStream().use { pos ->
|
file.inputStream().use { pos ->
|
||||||
roomSyncAdapter.fromJson(JsonReader.of(pos.source().buffer()))!!
|
roomSyncEphemeralAdapter.fromJson(JsonReader.of(pos.source().buffer()))!!
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
|
@ -34,7 +34,7 @@ internal data class RoomSync(
|
||||||
/**
|
/**
|
||||||
* The ephemeral events in the room that aren't recorded in the timeline or state of the room (e.g. typing, receipts).
|
* The ephemeral events in the room that aren't recorded in the timeline or state of the room (e.g. typing, receipts).
|
||||||
*/
|
*/
|
||||||
@Json(name = "ephemeral") val ephemeral: RoomSyncEphemeral? = null,
|
@Json(name = "ephemeral") val ephemeral: LazyRoomSyncEphemeral? = null,
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The account data events for the room (e.g. tags).
|
* The account data events for the room (e.g. tags).
|
||||||
|
|
|
@ -24,7 +24,7 @@ internal data class RoomsSyncResponse(
|
||||||
/**
|
/**
|
||||||
* Joined rooms: keys are rooms ids.
|
* Joined rooms: keys are rooms ids.
|
||||||
*/
|
*/
|
||||||
@Json(name = "join") val join: Map<String, LazyRoomSync> = emptyMap(),
|
@Json(name = "join") val join: Map<String, RoomSync> = emptyMap(),
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Invitations. The rooms that the user has been invited to: keys are rooms ids.
|
* Invitations. The rooms that the user has been invited to: keys are rooms ids.
|
||||||
|
|
|
@ -22,22 +22,22 @@ import com.squareup.moshi.JsonReader
|
||||||
import com.squareup.moshi.JsonWriter
|
import com.squareup.moshi.JsonWriter
|
||||||
import com.squareup.moshi.ToJson
|
import com.squareup.moshi.ToJson
|
||||||
import org.matrix.android.sdk.internal.session.sync.InitialSyncStrategy
|
import org.matrix.android.sdk.internal.session.sync.InitialSyncStrategy
|
||||||
import org.matrix.android.sdk.internal.session.sync.model.LazyRoomSync
|
import org.matrix.android.sdk.internal.session.sync.model.LazyRoomSyncEphemeral
|
||||||
import org.matrix.android.sdk.internal.session.sync.model.RoomSync
|
import org.matrix.android.sdk.internal.session.sync.model.RoomSyncEphemeral
|
||||||
import timber.log.Timber
|
import timber.log.Timber
|
||||||
import java.io.File
|
import java.io.File
|
||||||
import java.util.concurrent.atomic.AtomicInteger
|
import java.util.concurrent.atomic.AtomicInteger
|
||||||
|
|
||||||
internal class DefaultLazyRoomSyncJsonAdapter {
|
internal class DefaultLazyRoomSyncEphemeralJsonAdapter {
|
||||||
|
|
||||||
@FromJson
|
@FromJson
|
||||||
fun fromJson(reader: JsonReader, delegate: JsonAdapter<RoomSync>): LazyRoomSync? {
|
fun fromJson(reader: JsonReader, delegate: JsonAdapter<RoomSyncEphemeral>): LazyRoomSyncEphemeral? {
|
||||||
val roomSync = delegate.fromJson(reader) ?: return null
|
val roomSyncEphemeral = delegate.fromJson(reader) ?: return null
|
||||||
return LazyRoomSync.Parsed(roomSync)
|
return LazyRoomSyncEphemeral.Parsed(roomSyncEphemeral)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ToJson
|
@ToJson
|
||||||
fun toJson(writer: JsonWriter, value: LazyRoomSync?) {
|
fun toJson(writer: JsonWriter, value: LazyRoomSyncEphemeral?) {
|
||||||
// This Adapter is not supposed to serialize object
|
// This Adapter is not supposed to serialize object
|
||||||
Timber.v("To json $value with $writer")
|
Timber.v("To json $value with $writer")
|
||||||
throw UnsupportedOperationException()
|
throw UnsupportedOperationException()
|
||||||
|
@ -56,7 +56,7 @@ internal class SplitLazyRoomSyncJsonAdapter(
|
||||||
}
|
}
|
||||||
|
|
||||||
@FromJson
|
@FromJson
|
||||||
fun fromJson(reader: JsonReader, delegate: JsonAdapter<RoomSync>): LazyRoomSync? {
|
fun fromJson(reader: JsonReader, delegate: JsonAdapter<RoomSyncEphemeral>): LazyRoomSyncEphemeral? {
|
||||||
val path = reader.path
|
val path = reader.path
|
||||||
val json = reader.nextSource().inputStream().bufferedReader().use {
|
val json = reader.nextSource().inputStream().bufferedReader().use {
|
||||||
it.readText()
|
it.readText()
|
||||||
|
@ -67,16 +67,16 @@ internal class SplitLazyRoomSyncJsonAdapter(
|
||||||
// Copy the source to a file
|
// Copy the source to a file
|
||||||
val file = createFile()
|
val file = createFile()
|
||||||
file.writeText(json)
|
file.writeText(json)
|
||||||
LazyRoomSync.Stored(delegate, file)
|
LazyRoomSyncEphemeral.Stored(delegate, file)
|
||||||
} else {
|
} else {
|
||||||
Timber.v("INIT_SYNC $path content length: ${json.length} parse it now")
|
Timber.v("INIT_SYNC $path content length: ${json.length} parse it now")
|
||||||
val roomSync = delegate.fromJson(json) ?: return null
|
val roomSync = delegate.fromJson(json) ?: return null
|
||||||
LazyRoomSync.Parsed(roomSync)
|
LazyRoomSyncEphemeral.Parsed(roomSync)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ToJson
|
@ToJson
|
||||||
fun toJson(writer: JsonWriter, value: LazyRoomSync?) {
|
fun toJson(writer: JsonWriter, value: LazyRoomSyncEphemeral?) {
|
||||||
// This Adapter is not supposed to serialize object
|
// This Adapter is not supposed to serialize object
|
||||||
Timber.v("To json $value with $writer")
|
Timber.v("To json $value with $writer")
|
||||||
throw UnsupportedOperationException()
|
throw UnsupportedOperationException()
|
|
@ -29,7 +29,7 @@ import timber.log.Timber
|
||||||
fun Session.configureAndStart(context: Context) {
|
fun Session.configureAndStart(context: Context) {
|
||||||
Timber.i("Configure and start session for $myUserId")
|
Timber.i("Configure and start session for $myUserId")
|
||||||
open()
|
open()
|
||||||
setFilter(FilterService.FilterPreset.RiotFilter)
|
setFilter(FilterService.FilterPreset.ElementFilter)
|
||||||
startSyncing(context)
|
startSyncing(context)
|
||||||
refreshPushers()
|
refreshPushers()
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Reference in a new issue