Merge pull request #4396 from vector-im/feature/aris/thread_aware

Feature/aris/thread aware
This commit is contained in:
Benoit Marty 2021-11-18 10:16:56 +01:00 committed by GitHub
commit e98dd2e663
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 328 additions and 8 deletions

1
changelog.d/4246.feature Normal file
View file

@ -0,0 +1 @@
Make Element Android Thread aware

View file

@ -28,6 +28,10 @@ object RelationType {
/** Lets you define an event which references an existing event.*/
const val REFERENCE = "m.reference"
/** Lets you define an thread event that belongs to another existing event.*/
// const val THREAD = "m.thread" // m.thread is not yet released in the backend
const val THREAD = "io.element.thread" // io.element.thread will be replaced by m.thread when it is released
/** Lets you define an event which adds a response to an existing event.*/
const val RESPONSE = "org.matrix.response"
}

View file

@ -19,6 +19,7 @@ package org.matrix.android.sdk.internal.database.model
import io.realm.RealmObject
import io.realm.annotations.Index
import org.matrix.android.sdk.api.session.room.send.SendState
import org.matrix.android.sdk.api.util.JsonDict
import org.matrix.android.sdk.internal.crypto.MXEventDecryptionResult
import org.matrix.android.sdk.internal.crypto.algorithms.olm.OlmDecryptionResult
import org.matrix.android.sdk.internal.di.MoshiProvider
@ -56,10 +57,10 @@ internal open class EventEntity(@Index var eventId: String = "",
companion object
fun setDecryptionResult(result: MXEventDecryptionResult) {
fun setDecryptionResult(result: MXEventDecryptionResult, clearEvent: JsonDict? = null) {
assertIsManaged()
val decryptionResult = OlmDecryptionResult(
payload = result.clearEvent,
payload = clearEvent ?: result.clearEvent,
senderKey = result.senderCurve25519Key,
keysClaimed = result.claimedEd25519Key?.let { mapOf("ed25519" to it) },
forwardingCurve25519KeyChain = result.forwardingCurve25519KeyChain

View file

@ -29,7 +29,6 @@ internal class DefaultEventService @Inject constructor(
override suspend fun getEvent(roomId: String, eventId: String): Event {
val event = getEventTask.execute(GetEventTask.Params(roomId, eventId))
event.ageLocalTs = event.unsignedData?.age?.let { System.currentTimeMillis() - it }
// Fast lane to the call event processors: try to make the incoming call ring faster
if (callEventProcessor.shouldProcessFastLane(event.getClearType())) {
callEventProcessor.processFastLane(event)

View file

@ -23,6 +23,7 @@ import io.realm.RealmConfiguration
import io.realm.RealmQuery
import io.realm.RealmResults
import io.realm.Sort
import kotlinx.coroutines.runBlocking
import org.matrix.android.sdk.api.MatrixCallback
import org.matrix.android.sdk.api.extensions.orFalse
import org.matrix.android.sdk.api.extensions.tryOrNull
@ -33,6 +34,7 @@ import org.matrix.android.sdk.api.session.room.timeline.TimelineEvent
import org.matrix.android.sdk.api.session.room.timeline.TimelineSettings
import org.matrix.android.sdk.api.util.CancelableBag
import org.matrix.android.sdk.internal.database.RealmSessionProvider
import org.matrix.android.sdk.internal.database.mapper.EventMapper
import org.matrix.android.sdk.internal.database.mapper.TimelineEventMapper
import org.matrix.android.sdk.internal.database.model.ChunkEntity
import org.matrix.android.sdk.internal.database.model.RoomEntity
@ -43,6 +45,7 @@ import org.matrix.android.sdk.internal.database.query.where
import org.matrix.android.sdk.internal.database.query.whereRoomId
import org.matrix.android.sdk.internal.session.room.membership.LoadRoomMembersTask
import org.matrix.android.sdk.internal.session.sync.handler.room.ReadReceiptHandler
import org.matrix.android.sdk.internal.session.sync.handler.room.ThreadsAwarenessHandler
import org.matrix.android.sdk.internal.task.TaskExecutor
import org.matrix.android.sdk.internal.task.configureWith
import org.matrix.android.sdk.internal.util.Debouncer
@ -72,6 +75,7 @@ internal class DefaultTimeline(
private val eventDecryptor: TimelineEventDecryptor,
private val realmSessionProvider: RealmSessionProvider,
private val loadRoomMembersTask: LoadRoomMembersTask,
private val threadsAwarenessHandler: ThreadsAwarenessHandler,
private val readReceiptHandler: ReadReceiptHandler
) : Timeline,
TimelineInput.Listener,
@ -577,6 +581,10 @@ internal class DefaultTimeline(
} else {
nextDisplayIndex = offsetIndex + 1
}
// Prerequisite to in order for the ThreadsAwarenessHandler to work properly
fetchRootThreadEventsIfNeeded(offsetResults)
offsetResults.forEach { eventEntity ->
val timelineEvent = buildTimelineEvent(eventEntity)
@ -601,6 +609,20 @@ internal class DefaultTimeline(
return offsetResults.size
}
/**
* This function is responsible to fetch and store the root event of a thread event
* in order to be able to display the event to the user appropriately
*/
private fun fetchRootThreadEventsIfNeeded(offsetResults: RealmResults<TimelineEventEntity>) = runBlocking {
val eventEntityList = offsetResults
.mapNotNull {
it?.root
}.map {
EventMapper.map(it)
}
threadsAwarenessHandler.fetchRootThreadEventsIfNeeded(eventEntityList)
}
private fun buildTimelineEvent(eventEntity: TimelineEventEntity): TimelineEvent {
return timelineEventMapper.map(
timelineEventEntity = eventEntity,

View file

@ -38,6 +38,7 @@ import org.matrix.android.sdk.internal.database.query.where
import org.matrix.android.sdk.internal.di.SessionDatabase
import org.matrix.android.sdk.internal.session.room.membership.LoadRoomMembersTask
import org.matrix.android.sdk.internal.session.sync.handler.room.ReadReceiptHandler
import org.matrix.android.sdk.internal.session.sync.handler.room.ThreadsAwarenessHandler
import org.matrix.android.sdk.internal.task.TaskExecutor
internal class DefaultTimelineService @AssistedInject constructor(
@ -52,6 +53,7 @@ internal class DefaultTimelineService @AssistedInject constructor(
private val fetchTokenAndPaginateTask: FetchTokenAndPaginateTask,
private val timelineEventMapper: TimelineEventMapper,
private val loadRoomMembersTask: LoadRoomMembersTask,
private val threadsAwarenessHandler: ThreadsAwarenessHandler,
private val readReceiptHandler: ReadReceiptHandler
) : TimelineService {
@ -75,6 +77,7 @@ internal class DefaultTimelineService @AssistedInject constructor(
fetchTokenAndPaginateTask = fetchTokenAndPaginateTask,
realmSessionProvider = realmSessionProvider,
loadRoomMembersTask = loadRoomMembersTask,
threadsAwarenessHandler = threadsAwarenessHandler,
readReceiptHandler = readReceiptHandler
)
}

View file

@ -59,6 +59,8 @@ internal class DefaultGetEventTask @Inject constructor(
}
}
event.ageLocalTs = event.unsignedData?.age?.let { System.currentTimeMillis() - it }
return event
}
}

View file

@ -26,6 +26,7 @@ import org.matrix.android.sdk.internal.crypto.model.event.EncryptedEventContent
import org.matrix.android.sdk.internal.database.model.EventEntity
import org.matrix.android.sdk.internal.database.query.where
import org.matrix.android.sdk.internal.di.SessionDatabase
import org.matrix.android.sdk.internal.session.sync.handler.room.ThreadsAwarenessHandler
import timber.log.Timber
import java.util.concurrent.ExecutorService
import java.util.concurrent.Executors
@ -34,7 +35,8 @@ import javax.inject.Inject
internal class TimelineEventDecryptor @Inject constructor(
@SessionDatabase
private val realmConfiguration: RealmConfiguration,
private val cryptoService: CryptoService
private val cryptoService: CryptoService,
private val threadsAwarenessHandler: ThreadsAwarenessHandler
) {
private val newSessionListener = object : NewSessionListener {
@ -106,10 +108,19 @@ internal class TimelineEventDecryptor @Inject constructor(
val result = cryptoService.decryptEvent(request.event, timelineId)
Timber.v("Successfully decrypted event ${event.eventId}")
realm.executeTransaction {
val eventId = event.eventId ?: ""
EventEntity.where(it, eventId = eventId)
val eventId = event.eventId ?: return@executeTransaction
val eventEntity = EventEntity
.where(it, eventId = eventId)
.findFirst()
?.setDecryptionResult(result)
eventEntity?.apply {
val decryptedPayload = threadsAwarenessHandler.handleIfNeededDuringDecryption(
it,
roomId = event.roomId,
event,
result)
setDecryptionResult(result, decryptedPayload)
}
}
} catch (e: MXCryptoError) {
Timber.v("Failed to decrypt event ${event.eventId} : ${e.localizedMessage}")

View file

@ -41,6 +41,7 @@ import org.matrix.android.sdk.internal.session.sync.handler.PresenceSyncHandler
import org.matrix.android.sdk.internal.session.sync.handler.SyncResponsePostTreatmentAggregatorHandler
import org.matrix.android.sdk.internal.session.sync.handler.UserAccountDataSyncHandler
import org.matrix.android.sdk.internal.session.sync.handler.room.RoomSyncHandler
import org.matrix.android.sdk.internal.session.sync.handler.room.ThreadsAwarenessHandler
import org.matrix.android.sdk.internal.util.awaitTransaction
import org.matrix.android.sdk.internal.worker.WorkerParamsFactory
import timber.log.Timber
@ -65,6 +66,7 @@ internal class SyncResponseHandler @Inject constructor(
private val tokenStore: SyncTokenStore,
private val processEventForPushTask: ProcessEventForPushTask,
private val pushRuleService: PushRuleService,
private val threadsAwarenessHandler: ThreadsAwarenessHandler,
private val presenceSyncHandler: PresenceSyncHandler
) {
@ -97,6 +99,10 @@ internal class SyncResponseHandler @Inject constructor(
Timber.v("Finish handling toDevice in $it ms")
}
val aggregator = SyncResponsePostTreatmentAggregator()
// Prerequisite for thread events handling in RoomSyncHandler
threadsAwarenessHandler.fetchRootThreadEventsIfNeeded(syncResponse)
// Start one big transaction
monarchy.awaitTransaction { realm ->
measureTimeMillis {

View file

@ -76,6 +76,7 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
private val cryptoService: DefaultCryptoService,
private val roomMemberEventHandler: RoomMemberEventHandler,
private val roomTypingUsersHandler: RoomTypingUsersHandler,
private val threadsAwarenessHandler: ThreadsAwarenessHandler,
private val roomChangeMembershipStateDataSource: RoomChangeMembershipStateDataSource,
@UserId private val userId: String,
private val timelineInput: TimelineInput) {
@ -362,10 +363,17 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
}
eventIds.add(event.eventId)
if (event.isEncrypted() && insertType != EventInsertType.INITIAL_SYNC) {
val isInitialSync = insertType == EventInsertType.INITIAL_SYNC
if (event.isEncrypted() && !isInitialSync) {
decryptIfNeeded(event, roomId)
}
threadsAwarenessHandler.handleIfNeeded(
realm = realm,
roomId = roomId,
event = event)
val ageLocalTs = event.unsignedData?.age?.let { syncLocalTimestampMillis - it }
val eventEntity = event.toEntity(roomId, SendState.SYNCED, ageLocalTs).copyToRealmOrIgnore(realm, insertType)
if (event.stateKey != null) {

View file

@ -0,0 +1,263 @@
/*
* Copyright 2021 The Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.matrix.android.sdk.internal.session.sync.handler.room
import com.zhuinden.monarchy.Monarchy
import io.realm.Realm
import org.matrix.android.sdk.api.session.crypto.CryptoService
import org.matrix.android.sdk.api.session.crypto.MXCryptoError
import org.matrix.android.sdk.api.session.events.model.Event
import org.matrix.android.sdk.api.session.events.model.RelationType
import org.matrix.android.sdk.api.session.events.model.toContent
import org.matrix.android.sdk.api.session.events.model.toModel
import org.matrix.android.sdk.api.session.room.model.message.MessageFormat
import org.matrix.android.sdk.api.session.room.model.message.MessageRelationContent
import org.matrix.android.sdk.api.session.room.model.message.MessageTextContent
import org.matrix.android.sdk.api.session.room.send.SendState
import org.matrix.android.sdk.api.session.sync.model.SyncResponse
import org.matrix.android.sdk.api.util.JsonDict
import org.matrix.android.sdk.internal.crypto.MXEventDecryptionResult
import org.matrix.android.sdk.internal.crypto.algorithms.olm.OlmDecryptionResult
import org.matrix.android.sdk.internal.database.mapper.EventMapper
import org.matrix.android.sdk.internal.database.mapper.toEntity
import org.matrix.android.sdk.internal.database.model.EventEntity
import org.matrix.android.sdk.internal.database.model.EventInsertType
import org.matrix.android.sdk.internal.database.query.copyToRealmOrIgnore
import org.matrix.android.sdk.internal.database.query.where
import org.matrix.android.sdk.internal.di.SessionDatabase
import org.matrix.android.sdk.internal.session.permalinks.PermalinkFactory
import org.matrix.android.sdk.internal.session.room.send.LocalEchoEventFactory
import org.matrix.android.sdk.internal.session.room.timeline.GetEventTask
import org.matrix.android.sdk.internal.util.awaitTransaction
import javax.inject.Inject
/**
* This handler is responsible for a smooth threads migration. It will map all incoming
* threads as replies. So a device without threads enabled/updated will be able to view
* threads response as replies to the original message
*/
internal class ThreadsAwarenessHandler @Inject constructor(
private val permalinkFactory: PermalinkFactory,
private val cryptoService: CryptoService,
@SessionDatabase private val monarchy: Monarchy,
private val getEventTask: GetEventTask
) {
/**
* Fetch root thread events if they are missing from the local storage
* @param syncResponse the sync response
*/
suspend fun fetchRootThreadEventsIfNeeded(syncResponse: SyncResponse) {
val handlingStrategy = syncResponse.rooms?.join?.let {
RoomSyncHandler.HandlingStrategy.JOINED(it)
}
if (handlingStrategy !is RoomSyncHandler.HandlingStrategy.JOINED) return
val eventList = handlingStrategy.data
.mapNotNull { (roomId, roomSync) ->
roomSync.timeline?.events?.map {
it.copy(roomId = roomId)
}
}.flatten()
fetchRootThreadEventsIfNeeded(eventList)
}
/**
* Fetch root thread events if they are missing from the local storage
* @param eventList a list with the events to examine
*/
suspend fun fetchRootThreadEventsIfNeeded(eventList: List<Event>) {
if (eventList.isNullOrEmpty()) return
val threadsToFetch = emptyMap<String, String>().toMutableMap()
Realm.getInstance(monarchy.realmConfiguration).use { realm ->
eventList.asSequence()
.filter {
isThreadEvent(it) && it.roomId != null
}.mapNotNull { event ->
getRootThreadEventId(event)?.let {
Pair(it, event.roomId!!)
}
}.forEach { (rootThreadEventId, roomId) ->
EventEntity.where(realm, rootThreadEventId).findFirst() ?: run { threadsToFetch[rootThreadEventId] = roomId }
}
}
fetchThreadsEvents(threadsToFetch)
}
/**
* Fetch multiple unique events using the fetchEvent function
*/
private suspend fun fetchThreadsEvents(threadsToFetch: Map<String, String>) {
val eventEntityList = threadsToFetch.mapNotNull { (eventId, roomId) ->
fetchEvent(eventId, roomId)?.let {
it.toEntity(roomId, SendState.SYNCED, it.ageLocalTs)
}
}
if (eventEntityList.isNullOrEmpty()) return
// Transaction should be done on its own thread, like below
monarchy.awaitTransaction { realm ->
eventEntityList.forEach {
it.copyToRealmOrIgnore(realm, EventInsertType.INCREMENTAL_SYNC)
}
}
}
/**
* This function will fetch the event from the homeserver, this is mandatory when the
* initial thread message is too old and is not saved in the device, so in order to
* construct the "reply to" format we need to know the event thread.
* @return the Event or null otherwise
*/
private suspend fun fetchEvent(eventId: String, roomId: String): Event? {
return runCatching {
getEventTask.execute(GetEventTask.Params(roomId = roomId, eventId = eventId))
}.fold(
onSuccess = {
it
},
onFailure = {
null
})
}
/**
* Handle events mainly coming from the RoomSyncHandler
*/
fun handleIfNeeded(realm: Realm,
roomId: String,
event: Event) {
val payload = transformThreadToReplyIfNeeded(
realm = realm,
roomId = roomId,
event = event,
decryptedResult = event.mxDecryptionResult?.payload) ?: return
event.mxDecryptionResult = event.mxDecryptionResult?.copy(payload = payload)
}
/**
* Handle events while they are being decrypted
*/
fun handleIfNeededDuringDecryption(realm: Realm,
roomId: String?,
event: Event,
result: MXEventDecryptionResult): JsonDict? {
return transformThreadToReplyIfNeeded(
realm = realm,
roomId = roomId,
event = event,
decryptedResult = result.clearEvent)
}
/**
* If the event is a thread event then transform/enhance it to a visual Reply Event,
* If the event is not a thread event, null value will be returned
* If there is an error (ex. the root/origin thread event is not found), null willl be returend
*/
private fun transformThreadToReplyIfNeeded(realm: Realm, roomId: String?, event: Event, decryptedResult: JsonDict?): JsonDict? {
roomId ?: return null
if (!isThreadEvent(event)) return null
val rootThreadEventId = getRootThreadEventId(event) ?: return null
val payload = decryptedResult?.toMutableMap() ?: return null
val body = getValueFromPayload(payload, "body") ?: return null
val msgType = getValueFromPayload(payload, "msgtype") ?: return null
val rootThreadEvent = getEventFromDB(realm, rootThreadEventId) ?: return null
val rootThreadEventSenderId = rootThreadEvent.senderId ?: return null
decryptIfNeeded(rootThreadEvent, roomId)
val rootThreadEventBody = getValueFromPayload(rootThreadEvent.mxDecryptionResult?.payload?.toMutableMap(), "body")
val permalink = permalinkFactory.createPermalink(roomId, rootThreadEventId, false)
val userLink = permalinkFactory.createPermalink(rootThreadEventSenderId, false) ?: ""
val replyFormatted = LocalEchoEventFactory.REPLY_PATTERN.format(
permalink,
userLink,
rootThreadEventSenderId,
// Remove inner mx_reply tags if any
rootThreadEventBody,
body)
val messageTextContent = MessageTextContent(
msgType = msgType,
format = MessageFormat.FORMAT_MATRIX_HTML,
body = body,
formattedBody = replyFormatted
).toContent()
payload["content"] = messageTextContent
return payload
}
/**
* Decrypt the event
*/
private fun decryptIfNeeded(event: Event, roomId: String) {
try {
if (!event.isEncrypted() || event.mxDecryptionResult != null) return
// Event from sync does not have roomId, so add it to the event first
val result = cryptoService.decryptEvent(event.copy(roomId = roomId), "")
event.mxDecryptionResult = OlmDecryptionResult(
payload = result.clearEvent,
senderKey = result.senderCurve25519Key,
keysClaimed = result.claimedEd25519Key?.let { k -> mapOf("ed25519" to k) },
forwardingCurve25519KeyChain = result.forwardingCurve25519KeyChain
)
} catch (e: MXCryptoError) {
if (e is MXCryptoError.Base) {
event.mCryptoError = e.errorType
event.mCryptoErrorReason = e.technicalMessage.takeIf { it.isNotEmpty() } ?: e.detailedErrorDescription
}
}
}
/**
* Try to get the event form the local DB, if the event does not exist null
* will be returned
*/
private fun getEventFromDB(realm: Realm, eventId: String): Event? {
val eventEntity = EventEntity.where(realm, eventId = eventId).findFirst() ?: return null
return EventMapper.map(eventEntity)
}
/**
* Returns True if the event is a thread
* @param event
*/
private fun isThreadEvent(event: Event): Boolean =
event.content.toModel<MessageRelationContent>()?.relatesTo?.type == RelationType.THREAD
/**
* Returns the root thread eventId or null otherwise
* @param event
*/
private fun getRootThreadEventId(event: Event): String? =
event.content.toModel<MessageRelationContent>()?.relatesTo?.eventId
@Suppress("UNCHECKED_CAST")
private fun getValueFromPayload(payload: JsonDict?, key: String): String? {
val content = payload?.get("content") as? JsonDict
return content?.get(key) as? String
}
}