mirror of
https://github.com/element-hq/element-android
synced 2024-11-25 10:55:38 +03:00
Merge pull request #765 from vector-im/sdk_reference_aggregation
Aggregate Event References for DM verifications
This commit is contained in:
commit
b473aeb475
26 changed files with 364 additions and 61 deletions
|
@ -18,5 +18,8 @@ package im.vector.matrix.android
|
|||
|
||||
import im.vector.matrix.android.internal.util.MatrixCoroutineDispatchers
|
||||
import kotlinx.coroutines.Dispatchers.Main
|
||||
import kotlinx.coroutines.asCoroutineDispatcher
|
||||
import java.util.concurrent.Executors
|
||||
|
||||
internal val testCoroutineDispatchers = MatrixCoroutineDispatchers(Main, Main, Main, Main, Main)
|
||||
internal val testCoroutineDispatchers = MatrixCoroutineDispatchers(Main, Main, Main, Main, Main,
|
||||
Executors.newSingleThreadExecutor().asCoroutineDispatcher())
|
||||
|
|
|
@ -85,6 +85,14 @@ data class Event(
|
|||
@Transient
|
||||
var sendState: SendState = SendState.UNKNOWN
|
||||
|
||||
/**
|
||||
The `age` value transcoded in a timestamp based on the device clock when the SDK received
|
||||
the event from the home server.
|
||||
Unlike `age`, this value is static.
|
||||
*/
|
||||
@Transient
|
||||
var ageLocalTs: Long? = null
|
||||
|
||||
/**
|
||||
* Check if event is a state event.
|
||||
* @return true if event is state event.
|
||||
|
|
|
@ -21,9 +21,23 @@ import com.squareup.moshi.JsonClass
|
|||
|
||||
@JsonClass(generateAdapter = true)
|
||||
data class UnsignedData(
|
||||
/**
|
||||
* The time in milliseconds that has elapsed since the event was sent.
|
||||
* This field is generated by the local homeserver, and may be incorrect if the local time on at least one of the two servers
|
||||
* is out of sync, which can cause the age to either be negative or greater than it actually is.
|
||||
*/
|
||||
@Json(name = "age") val age: Long?,
|
||||
/**
|
||||
* Optional. The event that redacted this event, if any.
|
||||
*/
|
||||
@Json(name = "redacted_because") val redactedEvent: Event? = null,
|
||||
/**
|
||||
* The client-supplied transaction ID, if the client being given the event is the same one which sent it.
|
||||
*/
|
||||
@Json(name = "transaction_id") val transactionId: String? = null,
|
||||
/**
|
||||
* Optional. The previous content for this event. If there is no previous content, this key will be missing.
|
||||
*/
|
||||
@Json(name = "prev_content") val prevContent: Map<String, Any>? = null,
|
||||
@Json(name = "m.relations") val relations: AggregatedRelations? = null
|
||||
)
|
||||
|
|
|
@ -18,5 +18,6 @@ package im.vector.matrix.android.api.session.room.model
|
|||
data class EventAnnotationsSummary(
|
||||
var eventId: String,
|
||||
var reactionsSummary: List<ReactionAggregatedSummary>,
|
||||
var editSummary: EditAggregatedSummary?
|
||||
var editSummary: EditAggregatedSummary?,
|
||||
var referencesAggregatedSummary: ReferencesAggregatedSummary? = null
|
||||
)
|
||||
|
|
|
@ -0,0 +1,31 @@
|
|||
/*
|
||||
* Copyright 2019 New Vector Ltd
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package im.vector.matrix.android.api.session.room.model
|
||||
|
||||
import com.squareup.moshi.Json
|
||||
import com.squareup.moshi.JsonClass
|
||||
|
||||
/**
|
||||
* Contains an aggregated summary info of the references.
|
||||
* Put pre-computed info that you want to access quickly without having
|
||||
* to go through all references events
|
||||
*/
|
||||
@JsonClass(generateAdapter = true)
|
||||
data class ReferencesAggregatedContent(
|
||||
// Verification status info for m.key.verification.request msgType events
|
||||
@Json(name = "verif_sum") val verificationSummary: String
|
||||
// Add more fields for future summary info.
|
||||
)
|
|
@ -0,0 +1,29 @@
|
|||
/*
|
||||
* Copyright 2019 New Vector Ltd
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package im.vector.matrix.android.api.session.room.model
|
||||
|
||||
import im.vector.matrix.android.api.session.events.model.Content
|
||||
|
||||
/**
|
||||
* Events can relates to other events, this object keeps a summary
|
||||
* of all events that are referencing the 'eventId' event via the RelationType.REFERENCE
|
||||
*/
|
||||
data class ReferencesAggregatedSummary(
|
||||
val eventId: String,
|
||||
val content: Content?,
|
||||
val sourceEvents: List<String>,
|
||||
val localEchos: List<String>
|
||||
)
|
|
@ -21,6 +21,6 @@ import com.squareup.moshi.JsonClass
|
|||
import im.vector.matrix.android.api.session.room.model.relation.RelationDefaultContent
|
||||
|
||||
@JsonClass(generateAdapter = true)
|
||||
internal data class MessageRelationContent(
|
||||
data class MessageRelationContent(
|
||||
@Json(name = "m.relates_to") val relatesTo: RelationDefaultContent?
|
||||
)
|
||||
|
|
|
@ -20,8 +20,8 @@ import com.squareup.moshi.JsonClass
|
|||
import im.vector.matrix.android.api.session.events.model.RelationType
|
||||
import im.vector.matrix.android.api.session.events.model.toContent
|
||||
import im.vector.matrix.android.api.session.room.model.relation.RelationDefaultContent
|
||||
import im.vector.matrix.android.internal.crypto.verification.VerificationInfoAcceptFactory
|
||||
import im.vector.matrix.android.internal.crypto.verification.VerificationInfoAccept
|
||||
import im.vector.matrix.android.internal.crypto.verification.VerificationInfoAcceptFactory
|
||||
import timber.log.Timber
|
||||
|
||||
@JsonClass(generateAdapter = true)
|
||||
|
|
|
@ -29,6 +29,7 @@ import im.vector.matrix.android.internal.crypto.tasks.SendVerificationMessageTas
|
|||
import im.vector.matrix.android.internal.session.room.send.SendResponse
|
||||
import im.vector.matrix.android.internal.task.TaskConstraints
|
||||
import im.vector.matrix.android.internal.task.TaskExecutor
|
||||
import im.vector.matrix.android.internal.task.TaskThread
|
||||
import im.vector.matrix.android.internal.task.configureWith
|
||||
import timber.log.Timber
|
||||
import javax.inject.Inject
|
||||
|
@ -56,6 +57,8 @@ internal class SasTransportRoomMessage(
|
|||
cryptoService
|
||||
)
|
||||
) {
|
||||
callbackThread = TaskThread.DM_VERIF
|
||||
executionThread = TaskThread.DM_VERIF
|
||||
constraints = TaskConstraints(true)
|
||||
callback = object : MatrixCallback<SendResponse> {
|
||||
override fun onSuccess(data: SendResponse) {
|
||||
|
@ -86,6 +89,8 @@ internal class SasTransportRoomMessage(
|
|||
cryptoService
|
||||
)
|
||||
) {
|
||||
callbackThread = TaskThread.DM_VERIF
|
||||
executionThread = TaskThread.DM_VERIF
|
||||
constraints = TaskConstraints(true)
|
||||
retryCount = 3
|
||||
callback = object : MatrixCallback<SendResponse> {
|
||||
|
@ -115,6 +120,8 @@ internal class SasTransportRoomMessage(
|
|||
cryptoService
|
||||
)
|
||||
) {
|
||||
callbackThread = TaskThread.DM_VERIF
|
||||
executionThread = TaskThread.DM_VERIF
|
||||
constraints = TaskConstraints(true)
|
||||
retryCount = 3
|
||||
}
|
||||
|
|
|
@ -61,7 +61,6 @@ internal class SasTransportToDevice(
|
|||
|
||||
override fun onFailure(failure: Throwable) {
|
||||
Timber.e("## SAS verification [$tx.transactionId] failed to send toDevice in state : $tx.state")
|
||||
|
||||
tx.cancel(onErrorReason)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -37,13 +37,12 @@ import timber.log.Timber
|
|||
import java.util.*
|
||||
import javax.inject.Inject
|
||||
|
||||
internal class VerificationMessageLiveObserver @Inject constructor(
|
||||
@SessionDatabase realmConfiguration: RealmConfiguration,
|
||||
@UserId private val userId: String,
|
||||
private val cryptoService: CryptoService,
|
||||
private val sasVerificationService: DefaultSasVerificationService,
|
||||
private val taskExecutor: TaskExecutor
|
||||
) : RealmLiveEntityObserver<EventEntity>(realmConfiguration) {
|
||||
internal class VerificationMessageLiveObserver @Inject constructor(@SessionDatabase realmConfiguration: RealmConfiguration,
|
||||
@UserId private val userId: String,
|
||||
private val cryptoService: CryptoService,
|
||||
private val sasVerificationService: DefaultSasVerificationService,
|
||||
private val taskExecutor: TaskExecutor) :
|
||||
RealmLiveEntityObserver<EventEntity>(realmConfiguration) {
|
||||
|
||||
override val query = Monarchy.Query<EventEntity> {
|
||||
EventEntity.types(it, listOf(
|
||||
|
@ -70,22 +69,31 @@ internal class VerificationMessageLiveObserver @Inject constructor(
|
|||
}
|
||||
.toList()
|
||||
|
||||
// TODO use age also, ignore initial sync or back pagination?
|
||||
// TODO ignore initial sync or back pagination?
|
||||
|
||||
val now = System.currentTimeMillis()
|
||||
val tooInThePast = now - (10 * 60 * 1000 * 1000)
|
||||
val tooInTheFuture = System.currentTimeMillis() + (5 * 60 * 1000 * 1000)
|
||||
val tooInThePast = now - (10 * 60 * 1000)
|
||||
val fiveMinInMs = 5 * 60 * 1000
|
||||
val tooInTheFuture = System.currentTimeMillis() + fiveMinInMs
|
||||
|
||||
events.forEach { event ->
|
||||
Timber.d("## SAS Verification live observer: received msgId: ${event.eventId} msgtype: ${event.type} from ${event.senderId}")
|
||||
Timber.d("## SAS Verification live observer: received msgId: ${event.eventId} msgtype: ${event.type} from ${event.senderId}")
|
||||
Timber.v("## SAS Verification live observer: received msgId: $event")
|
||||
|
||||
// If the request is in the future by more than 5 minutes or more than 10 minutes in the past,
|
||||
// the message should be ignored by the receiver.
|
||||
val eventOrigin = event.originServerTs ?: -1
|
||||
if (eventOrigin < tooInThePast || eventOrigin > tooInTheFuture) {
|
||||
Timber.d("## SAS Verification live observer: msgId: ${event.eventId} is out of time ^^")
|
||||
val ageLocalTs = event.ageLocalTs
|
||||
if (ageLocalTs != null && (now - ageLocalTs) > fiveMinInMs) {
|
||||
Timber.d("## SAS Verification live observer: msgId: ${event.eventId} is too old (age: ${(now - ageLocalTs)})")
|
||||
return@forEach
|
||||
} else {
|
||||
val eventOrigin = event.originServerTs ?: -1
|
||||
if (eventOrigin < tooInThePast || eventOrigin > tooInTheFuture) {
|
||||
Timber.d("## SAS Verification live observer: msgId: ${event.eventId} is too old (ts: $eventOrigin")
|
||||
return@forEach
|
||||
}
|
||||
}
|
||||
|
||||
// decrypt if needed?
|
||||
if (event.isEncrypted() && event.mxDecryptionResult == null) {
|
||||
// TODO use a global event decryptor? attache to session and that listen to new sessionId?
|
||||
|
|
|
@ -19,9 +19,11 @@ package im.vector.matrix.android.internal.database.mapper
|
|||
import im.vector.matrix.android.api.session.room.model.EditAggregatedSummary
|
||||
import im.vector.matrix.android.api.session.room.model.EventAnnotationsSummary
|
||||
import im.vector.matrix.android.api.session.room.model.ReactionAggregatedSummary
|
||||
import im.vector.matrix.android.api.session.room.model.ReferencesAggregatedSummary
|
||||
import im.vector.matrix.android.internal.database.model.EditAggregatedSummaryEntity
|
||||
import im.vector.matrix.android.internal.database.model.EventAnnotationsSummaryEntity
|
||||
import im.vector.matrix.android.internal.database.model.ReactionAggregatedSummaryEntity
|
||||
import im.vector.matrix.android.internal.database.model.ReferencesAggregatedSummaryEntity
|
||||
import io.realm.RealmList
|
||||
|
||||
internal object EventAnnotationsSummaryMapper {
|
||||
|
@ -45,6 +47,14 @@ internal object EventAnnotationsSummaryMapper {
|
|||
it.sourceLocalEchoEvents.toList(),
|
||||
it.lastEditTs
|
||||
)
|
||||
},
|
||||
referencesAggregatedSummary = annotationsSummary.referencesSummaryEntity?.let {
|
||||
ReferencesAggregatedSummary(
|
||||
it.eventId,
|
||||
ContentMapper.map(it.content),
|
||||
it.sourceEvents.toList(),
|
||||
it.sourceLocalEcho.toList()
|
||||
)
|
||||
}
|
||||
)
|
||||
}
|
||||
|
@ -75,6 +85,14 @@ internal object EventAnnotationsSummaryMapper {
|
|||
})
|
||||
}
|
||||
}
|
||||
eventAnnotationsSummaryEntity.referencesSummaryEntity = annotationsSummary.referencesAggregatedSummary?.let {
|
||||
ReferencesAggregatedSummaryEntity(
|
||||
it.eventId,
|
||||
ContentMapper.map(it.content),
|
||||
RealmList<String>().apply { addAll(it.sourceEvents) },
|
||||
RealmList<String>().apply { addAll(it.localEchos) }
|
||||
)
|
||||
}
|
||||
return eventAnnotationsSummaryEntity
|
||||
}
|
||||
}
|
||||
|
|
|
@ -43,6 +43,7 @@ internal object EventMapper {
|
|||
eventEntity.redacts = event.redacts
|
||||
eventEntity.age = event.unsignedData?.age ?: event.originServerTs
|
||||
eventEntity.unsignedData = uds
|
||||
eventEntity.ageLocalTs = event.ageLocalTs
|
||||
return eventEntity
|
||||
}
|
||||
|
||||
|
@ -70,6 +71,7 @@ internal object EventMapper {
|
|||
unsignedData = ud,
|
||||
redacts = eventEntity.redacts
|
||||
).also {
|
||||
it.ageLocalTs = eventEntity.ageLocalTs
|
||||
it.sendState = eventEntity.sendState
|
||||
eventEntity.decryptionResultJson?.let { json ->
|
||||
try {
|
||||
|
|
|
@ -24,7 +24,8 @@ internal open class EventAnnotationsSummaryEntity(
|
|||
var eventId: String = "",
|
||||
var roomId: String? = null,
|
||||
var reactionsSummary: RealmList<ReactionAggregatedSummaryEntity> = RealmList(),
|
||||
var editSummary: EditAggregatedSummaryEntity? = null
|
||||
var editSummary: EditAggregatedSummaryEntity? = null,
|
||||
var referencesSummaryEntity: ReferencesAggregatedSummaryEntity? = null
|
||||
) : RealmObject() {
|
||||
|
||||
companion object
|
||||
|
|
|
@ -40,7 +40,8 @@ internal open class EventEntity(@Index var eventId: String = "",
|
|||
@Index var displayIndex: Int = 0,
|
||||
@Index var isUnlinked: Boolean = false,
|
||||
var decryptionResultJson: String? = null,
|
||||
var decryptionErrorCode: String? = null
|
||||
var decryptionErrorCode: String? = null,
|
||||
var ageLocalTs: Long? = null
|
||||
) : RealmObject() {
|
||||
|
||||
enum class LinkFilterMode {
|
||||
|
|
|
@ -0,0 +1,31 @@
|
|||
/*
|
||||
* Copyright 2019 New Vector Ltd
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package im.vector.matrix.android.internal.database.model
|
||||
|
||||
import io.realm.RealmList
|
||||
import io.realm.RealmObject
|
||||
|
||||
internal open class ReferencesAggregatedSummaryEntity(
|
||||
var eventId: String = "",
|
||||
var content: String? = null,
|
||||
// The list of the eventIDs used to build the summary (might be out of sync if chunked received from message chunk)
|
||||
var sourceEvents: RealmList<String> = RealmList(),
|
||||
// List of transaction ids for local echos
|
||||
var sourceLocalEcho: RealmList<String> = RealmList()
|
||||
) : RealmObject() {
|
||||
|
||||
companion object
|
||||
}
|
|
@ -38,6 +38,7 @@ import io.realm.annotations.RealmModule
|
|||
IgnoredUserEntity::class,
|
||||
BreadcrumbsEntity::class,
|
||||
EventAnnotationsSummaryEntity::class,
|
||||
ReferencesAggregatedSummaryEntity::class,
|
||||
ReactionAggregatedSummaryEntity::class,
|
||||
EditAggregatedSummaryEntity::class,
|
||||
PushRulesEntity::class,
|
||||
|
|
|
@ -47,3 +47,7 @@ internal fun EventAnnotationsSummaryEntity.Companion.create(realm: Realm, roomId
|
|||
}
|
||||
return obj
|
||||
}
|
||||
internal fun EventAnnotationsSummaryEntity.Companion.getOrCreate(realm: Realm, roomId: String, eventId: String): EventAnnotationsSummaryEntity {
|
||||
return EventAnnotationsSummaryEntity.where(realm, eventId).findFirst()
|
||||
?: EventAnnotationsSummaryEntity.create(realm, roomId, eventId).apply { this.roomId = roomId }
|
||||
}
|
||||
|
|
|
@ -0,0 +1,35 @@
|
|||
/*
|
||||
* Copyright 2019 New Vector Ltd
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package im.vector.matrix.android.internal.database.query
|
||||
|
||||
import im.vector.matrix.android.internal.database.model.ReferencesAggregatedSummaryEntity
|
||||
import im.vector.matrix.android.internal.database.model.ReferencesAggregatedSummaryEntityFields
|
||||
import io.realm.Realm
|
||||
import io.realm.RealmQuery
|
||||
import io.realm.kotlin.where
|
||||
|
||||
internal fun ReferencesAggregatedSummaryEntity.Companion.where(realm: Realm, eventId: String): RealmQuery<ReferencesAggregatedSummaryEntity> {
|
||||
val query = realm.where<ReferencesAggregatedSummaryEntity>()
|
||||
query.equalTo(ReferencesAggregatedSummaryEntityFields.EVENT_ID, eventId)
|
||||
return query
|
||||
}
|
||||
|
||||
internal fun ReferencesAggregatedSummaryEntity.Companion.create(realm: Realm, txID: String): ReferencesAggregatedSummaryEntity {
|
||||
return realm.createObject(ReferencesAggregatedSummaryEntity::class.java).apply {
|
||||
this.eventId = txID
|
||||
}
|
||||
}
|
|
@ -36,10 +36,11 @@ internal object MatrixModule {
|
|||
@MatrixScope
|
||||
fun providesMatrixCoroutineDispatchers(): MatrixCoroutineDispatchers {
|
||||
return MatrixCoroutineDispatchers(io = Dispatchers.IO,
|
||||
computation = Dispatchers.Default,
|
||||
main = Dispatchers.Main,
|
||||
crypto = createBackgroundHandler("Crypto_Thread").asCoroutineDispatcher(),
|
||||
sync = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
|
||||
computation = Dispatchers.Default,
|
||||
main = Dispatchers.Main,
|
||||
crypto = createBackgroundHandler("Crypto_Thread").asCoroutineDispatcher(),
|
||||
sync = Executors.newSingleThreadExecutor().asCoroutineDispatcher(),
|
||||
dmVerif = Executors.newSingleThreadExecutor().asCoroutineDispatcher()
|
||||
)
|
||||
}
|
||||
|
||||
|
|
|
@ -19,7 +19,9 @@ import com.zhuinden.monarchy.Monarchy
|
|||
import im.vector.matrix.android.api.session.crypto.CryptoService
|
||||
import im.vector.matrix.android.api.session.crypto.MXCryptoError
|
||||
import im.vector.matrix.android.api.session.events.model.*
|
||||
import im.vector.matrix.android.api.session.room.model.ReferencesAggregatedContent
|
||||
import im.vector.matrix.android.api.session.room.model.message.MessageContent
|
||||
import im.vector.matrix.android.api.session.room.model.message.MessageRelationContent
|
||||
import im.vector.matrix.android.api.session.room.model.relation.ReactionContent
|
||||
import im.vector.matrix.android.internal.crypto.algorithms.olm.OlmDecryptionResult
|
||||
import im.vector.matrix.android.internal.crypto.model.event.EncryptedEventContent
|
||||
|
@ -27,6 +29,7 @@ import im.vector.matrix.android.internal.database.mapper.ContentMapper
|
|||
import im.vector.matrix.android.internal.database.mapper.EventMapper
|
||||
import im.vector.matrix.android.internal.database.model.*
|
||||
import im.vector.matrix.android.internal.database.query.create
|
||||
import im.vector.matrix.android.internal.database.query.getOrCreate
|
||||
import im.vector.matrix.android.internal.database.query.where
|
||||
import im.vector.matrix.android.internal.task.Task
|
||||
import im.vector.matrix.android.internal.util.awaitTransaction
|
||||
|
@ -42,6 +45,14 @@ internal interface EventRelationsAggregationTask : Task<EventRelationsAggregatio
|
|||
)
|
||||
}
|
||||
|
||||
enum class VerificationState {
|
||||
REQUEST,
|
||||
WAITING,
|
||||
CANCELED_BY_ME,
|
||||
CANCELED_BY_OTHER,
|
||||
DONE
|
||||
}
|
||||
|
||||
/**
|
||||
* Called by EventRelationAggregationUpdater, when new events that can affect relations are inserted in base.
|
||||
*/
|
||||
|
@ -72,12 +83,12 @@ internal class DefaultEventRelationsAggregationTask @Inject constructor(
|
|||
}
|
||||
val isLocalEcho = LocalEcho.isLocalEchoId(event.eventId ?: "")
|
||||
when (event.type) {
|
||||
EventType.REACTION -> {
|
||||
EventType.REACTION -> {
|
||||
// we got a reaction!!
|
||||
Timber.v("###REACTION in room $roomId , reaction eventID ${event.eventId}")
|
||||
handleReaction(event, roomId, realm, userId, isLocalEcho)
|
||||
}
|
||||
EventType.MESSAGE -> {
|
||||
EventType.MESSAGE -> {
|
||||
if (event.unsignedData?.relations?.annotations != null) {
|
||||
Timber.v("###REACTION Agreggation in room $roomId for event ${event.eventId}")
|
||||
handleInitialAggregatedRelations(event, roomId, event.unsignedData.relations.annotations, realm)
|
||||
|
@ -99,33 +110,49 @@ internal class DefaultEventRelationsAggregationTask @Inject constructor(
|
|||
}
|
||||
}
|
||||
|
||||
EventType.ENCRYPTED -> {
|
||||
EventType.KEY_VERIFICATION_DONE,
|
||||
EventType.KEY_VERIFICATION_CANCEL,
|
||||
EventType.KEY_VERIFICATION_ACCEPT,
|
||||
EventType.KEY_VERIFICATION_START,
|
||||
EventType.KEY_VERIFICATION_MAC,
|
||||
EventType.KEY_VERIFICATION_KEY -> {
|
||||
Timber.v("## SAS REF in room $roomId for event ${event.eventId}")
|
||||
event.content.toModel<MessageRelationContent>()?.relatesTo?.let {
|
||||
if (it.type == RelationType.REFERENCE && it.eventId != null) {
|
||||
handleVerification(realm, event, roomId, isLocalEcho, it.eventId, userId)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
EventType.ENCRYPTED -> {
|
||||
// Relation type is in clear
|
||||
val encryptedEventContent = event.content.toModel<EncryptedEventContent>()
|
||||
if (encryptedEventContent?.relatesTo?.type == RelationType.REPLACE) {
|
||||
// we need to decrypt if needed
|
||||
if (event.mxDecryptionResult == null) {
|
||||
try {
|
||||
val result = cryptoService.decryptEvent(event, event.roomId)
|
||||
event.mxDecryptionResult = OlmDecryptionResult(
|
||||
payload = result.clearEvent,
|
||||
senderKey = result.senderCurve25519Key,
|
||||
keysClaimed = result.claimedEd25519Key?.let { k -> mapOf("ed25519" to k) },
|
||||
forwardingCurve25519KeyChain = result.forwardingCurve25519KeyChain
|
||||
)
|
||||
} catch (e: MXCryptoError) {
|
||||
Timber.w("Failed to decrypt e2e replace")
|
||||
// TODO -> we should keep track of this and retry, or aggregation will be broken
|
||||
}
|
||||
}
|
||||
decryptIfNeeded(event)
|
||||
event.getClearContent().toModel<MessageContent>()?.let {
|
||||
Timber.v("###REPLACE in room $roomId for event ${event.eventId}")
|
||||
// A replace!
|
||||
handleReplace(realm, event, it, roomId, isLocalEcho, encryptedEventContent.relatesTo.eventId)
|
||||
}
|
||||
} else if (encryptedEventContent?.relatesTo?.type == RelationType.REFERENCE) {
|
||||
decryptIfNeeded(event)
|
||||
when (event.getClearType()) {
|
||||
EventType.KEY_VERIFICATION_DONE,
|
||||
EventType.KEY_VERIFICATION_CANCEL,
|
||||
EventType.KEY_VERIFICATION_ACCEPT,
|
||||
EventType.KEY_VERIFICATION_START,
|
||||
EventType.KEY_VERIFICATION_MAC,
|
||||
EventType.KEY_VERIFICATION_KEY -> {
|
||||
Timber.v("## SAS REF in room $roomId for event ${event.eventId}")
|
||||
encryptedEventContent.relatesTo.eventId?.let {
|
||||
handleVerification(realm, event, roomId, isLocalEcho, it, userId)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
EventType.REDACTION -> {
|
||||
EventType.REDACTION -> {
|
||||
val eventToPrune = event.redacts?.let { EventEntity.where(realm, eventId = it).findFirst() }
|
||||
?: return@forEach
|
||||
when (eventToPrune.type) {
|
||||
|
@ -145,7 +172,7 @@ internal class DefaultEventRelationsAggregationTask @Inject constructor(
|
|||
}
|
||||
}
|
||||
}
|
||||
else -> Timber.v("UnHandled event ${event.eventId}")
|
||||
else -> Timber.v("UnHandled event ${event.eventId}")
|
||||
}
|
||||
} catch (t: Throwable) {
|
||||
Timber.e(t, "## Should not happen ")
|
||||
|
@ -153,16 +180,29 @@ internal class DefaultEventRelationsAggregationTask @Inject constructor(
|
|||
}
|
||||
}
|
||||
|
||||
private fun decryptIfNeeded(event: Event) {
|
||||
if (event.mxDecryptionResult == null) {
|
||||
try {
|
||||
val result = cryptoService.decryptEvent(event, event.roomId ?: "")
|
||||
event.mxDecryptionResult = OlmDecryptionResult(
|
||||
payload = result.clearEvent,
|
||||
senderKey = result.senderCurve25519Key,
|
||||
keysClaimed = result.claimedEd25519Key?.let { k -> mapOf("ed25519" to k) },
|
||||
forwardingCurve25519KeyChain = result.forwardingCurve25519KeyChain
|
||||
)
|
||||
} catch (e: MXCryptoError) {
|
||||
Timber.w("Failed to decrypt e2e replace")
|
||||
// TODO -> we should keep track of this and retry, or aggregation will be broken
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private fun handleReplace(realm: Realm, event: Event, content: MessageContent, roomId: String, isLocalEcho: Boolean, relatedEventId: String? = null) {
|
||||
val eventId = event.eventId ?: return
|
||||
val targetEventId = relatedEventId ?: content.relatesTo?.eventId ?: return
|
||||
val newContent = content.newContent ?: return
|
||||
// ok, this is a replace
|
||||
var existing = EventAnnotationsSummaryEntity.where(realm, targetEventId).findFirst()
|
||||
if (existing == null) {
|
||||
Timber.v("###REPLACE creating new relation summary for $targetEventId")
|
||||
existing = EventAnnotationsSummaryEntity.create(realm, roomId, targetEventId)
|
||||
}
|
||||
val existing = EventAnnotationsSummaryEntity.getOrCreate(realm, roomId, targetEventId)
|
||||
|
||||
// we have it
|
||||
val existingSummary = existing.editSummary
|
||||
|
@ -228,7 +268,8 @@ internal class DefaultEventRelationsAggregationTask @Inject constructor(
|
|||
val eventSummary = EventAnnotationsSummaryEntity.create(realm, roomId, eventId)
|
||||
val sum = realm.createObject(ReactionAggregatedSummaryEntity::class.java)
|
||||
sum.key = it.key
|
||||
sum.firstTimestamp = event.originServerTs ?: 0 // TODO how to maintain order?
|
||||
sum.firstTimestamp = event.originServerTs
|
||||
?: 0 // TODO how to maintain order?
|
||||
sum.count = it.count
|
||||
eventSummary.reactionsSummary.add(sum)
|
||||
} else {
|
||||
|
@ -251,8 +292,7 @@ internal class DefaultEventRelationsAggregationTask @Inject constructor(
|
|||
val relatedEventID = content.relatesTo.eventId
|
||||
val reactionEventId = event.eventId
|
||||
Timber.v("Reaction $reactionEventId relates to $relatedEventID")
|
||||
val eventSummary = EventAnnotationsSummaryEntity.where(realm, relatedEventID).findFirst()
|
||||
?: EventAnnotationsSummaryEntity.create(realm, roomId, relatedEventID).apply { this.roomId = roomId }
|
||||
val eventSummary = EventAnnotationsSummaryEntity.getOrCreate(realm, roomId, relatedEventID)
|
||||
|
||||
var sum = eventSummary.reactionsSummary.find { it.key == reaction }
|
||||
val txId = event.unsignedData?.transactionId
|
||||
|
@ -374,4 +414,58 @@ internal class DefaultEventRelationsAggregationTask @Inject constructor(
|
|||
Timber.e("## Cannot find summary for key $reactionKey")
|
||||
}
|
||||
}
|
||||
|
||||
private fun handleVerification(realm: Realm, event: Event, roomId: String, isLocalEcho: Boolean, relatedEventId: String, userId: String) {
|
||||
val eventSummary = EventAnnotationsSummaryEntity.getOrCreate(realm, roomId, relatedEventId)
|
||||
|
||||
val verifSummary = eventSummary.referencesSummaryEntity
|
||||
?: ReferencesAggregatedSummaryEntity.create(realm, relatedEventId).also {
|
||||
eventSummary.referencesSummaryEntity = it
|
||||
}
|
||||
|
||||
val txId = event.unsignedData?.transactionId
|
||||
|
||||
if (!isLocalEcho && verifSummary.sourceLocalEcho.contains(txId)) {
|
||||
// ok it has already been handled
|
||||
} else {
|
||||
ContentMapper.map(verifSummary.content)?.toModel<ReferencesAggregatedContent>()
|
||||
var data = ContentMapper.map(verifSummary.content)?.toModel<ReferencesAggregatedContent>()
|
||||
?: ReferencesAggregatedContent(VerificationState.REQUEST.name)
|
||||
// TODO ignore invalid messages? e.g a START after a CANCEL?
|
||||
// i.e. never change state if already canceled/done
|
||||
val newState = when (event.getClearType()) {
|
||||
EventType.KEY_VERIFICATION_START -> {
|
||||
VerificationState.WAITING
|
||||
}
|
||||
EventType.KEY_VERIFICATION_ACCEPT -> {
|
||||
VerificationState.WAITING
|
||||
}
|
||||
EventType.KEY_VERIFICATION_KEY -> {
|
||||
VerificationState.WAITING
|
||||
}
|
||||
EventType.KEY_VERIFICATION_MAC -> {
|
||||
VerificationState.WAITING
|
||||
}
|
||||
EventType.KEY_VERIFICATION_CANCEL -> {
|
||||
if (event.senderId == userId) {
|
||||
VerificationState.CANCELED_BY_ME
|
||||
} else VerificationState.CANCELED_BY_OTHER
|
||||
}
|
||||
EventType.KEY_VERIFICATION_DONE -> {
|
||||
VerificationState.DONE
|
||||
}
|
||||
else -> VerificationState.REQUEST
|
||||
}
|
||||
|
||||
data = data.copy(verificationSummary = newState.name)
|
||||
verifSummary.content = ContentMapper.map(data.toContent())
|
||||
}
|
||||
|
||||
if (isLocalEcho) {
|
||||
verifSummary.sourceLocalEcho.add(event.eventId)
|
||||
} else {
|
||||
verifSummary.sourceLocalEcho.remove(txId)
|
||||
verifSummary.sourceEvents.add(event.eventId)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -48,6 +48,12 @@ internal class EventRelationsAggregationUpdater @Inject constructor(@SessionData
|
|||
EventType.MESSAGE,
|
||||
EventType.REDACTION,
|
||||
EventType.REACTION,
|
||||
EventType.KEY_VERIFICATION_DONE,
|
||||
EventType.KEY_VERIFICATION_CANCEL,
|
||||
EventType.KEY_VERIFICATION_ACCEPT,
|
||||
EventType.KEY_VERIFICATION_START,
|
||||
EventType.KEY_VERIFICATION_MAC,
|
||||
EventType.KEY_VERIFICATION_KEY,
|
||||
EventType.ENCRYPTED)
|
||||
)
|
||||
}
|
||||
|
|
|
@ -93,14 +93,15 @@ internal class RoomSyncHandler @Inject constructor(private val monarchy: Monarch
|
|||
// PRIVATE METHODS *****************************************************************************
|
||||
|
||||
private fun handleRoomSync(realm: Realm, handlingStrategy: HandlingStrategy, isInitialSync: Boolean, reporter: DefaultInitialSyncProgressService?) {
|
||||
val syncLocalTimeStampMillis = System.currentTimeMillis()
|
||||
val rooms = when (handlingStrategy) {
|
||||
is HandlingStrategy.JOINED ->
|
||||
handlingStrategy.data.mapWithProgress(reporter, R.string.initial_sync_start_importing_account_joined_rooms, 0.6f) {
|
||||
handleJoinedRoom(realm, it.key, it.value, isInitialSync)
|
||||
handleJoinedRoom(realm, it.key, it.value, isInitialSync, syncLocalTimeStampMillis)
|
||||
}
|
||||
is HandlingStrategy.INVITED ->
|
||||
handlingStrategy.data.mapWithProgress(reporter, R.string.initial_sync_start_importing_account_invited_rooms, 0.1f) {
|
||||
handleInvitedRoom(realm, it.key, it.value)
|
||||
handleInvitedRoom(realm, it.key, it.value, syncLocalTimeStampMillis)
|
||||
}
|
||||
|
||||
is HandlingStrategy.LEFT -> {
|
||||
|
@ -115,7 +116,8 @@ internal class RoomSyncHandler @Inject constructor(private val monarchy: Monarch
|
|||
private fun handleJoinedRoom(realm: Realm,
|
||||
roomId: String,
|
||||
roomSync: RoomSync,
|
||||
isInitialSync: Boolean): RoomEntity {
|
||||
isInitialSync: Boolean,
|
||||
syncLocalTimestampMillis: Long): RoomEntity {
|
||||
Timber.v("Handle join sync for room $roomId")
|
||||
|
||||
if (roomSync.ephemeral != null && roomSync.ephemeral.events.isNotEmpty()) {
|
||||
|
@ -154,7 +156,8 @@ internal class RoomSyncHandler @Inject constructor(private val monarchy: Monarch
|
|||
roomEntity,
|
||||
roomSync.timeline.events,
|
||||
roomSync.timeline.prevToken,
|
||||
roomSync.timeline.limited
|
||||
roomSync.timeline.limited,
|
||||
syncLocalTimestampMillis
|
||||
)
|
||||
roomEntity.addOrUpdate(chunkEntity)
|
||||
}
|
||||
|
@ -170,12 +173,13 @@ internal class RoomSyncHandler @Inject constructor(private val monarchy: Monarch
|
|||
|
||||
private fun handleInvitedRoom(realm: Realm,
|
||||
roomId: String,
|
||||
roomSync: InvitedRoomSync): RoomEntity {
|
||||
roomSync: InvitedRoomSync,
|
||||
syncLocalTimestampMillis: Long): RoomEntity {
|
||||
Timber.v("Handle invited sync for room $roomId")
|
||||
val roomEntity = RoomEntity.where(realm, roomId).findFirst() ?: realm.createObject(roomId)
|
||||
roomEntity.membership = Membership.INVITE
|
||||
if (roomSync.inviteState != null && roomSync.inviteState.events.isNotEmpty()) {
|
||||
val chunkEntity = handleTimelineEvents(realm, roomEntity, roomSync.inviteState.events)
|
||||
val chunkEntity = handleTimelineEvents(realm, roomEntity, roomSync.inviteState.events, syncLocalTimestampMillis = syncLocalTimestampMillis)
|
||||
roomEntity.addOrUpdate(chunkEntity)
|
||||
}
|
||||
val hasRoomMember = roomSync.inviteState?.events?.firstOrNull {
|
||||
|
@ -200,7 +204,8 @@ internal class RoomSyncHandler @Inject constructor(private val monarchy: Monarch
|
|||
roomEntity: RoomEntity,
|
||||
eventList: List<Event>,
|
||||
prevToken: String? = null,
|
||||
isLimited: Boolean = true): ChunkEntity {
|
||||
isLimited: Boolean = true,
|
||||
syncLocalTimestampMillis: Long): ChunkEntity {
|
||||
val lastChunk = ChunkEntity.findLastLiveChunkFromRoom(realm, roomEntity.roomId)
|
||||
var stateIndexOffset = 0
|
||||
val chunkEntity = if (!isLimited && lastChunk != null) {
|
||||
|
@ -216,6 +221,7 @@ internal class RoomSyncHandler @Inject constructor(private val monarchy: Monarch
|
|||
|
||||
val eventIds = ArrayList<String>(eventList.size)
|
||||
for (event in eventList) {
|
||||
event.ageLocalTs = event.unsignedData?.age?.let { syncLocalTimestampMillis - it }
|
||||
event.eventId?.also { eventIds.add(it) }
|
||||
chunkEntity.add(roomEntity.roomId, event, PaginationDirection.FORWARDS, stateIndexOffset)
|
||||
// Give info to crypto module
|
||||
|
|
|
@ -86,5 +86,6 @@ internal class TaskExecutor @Inject constructor(private val coroutineDispatchers
|
|||
TaskThread.CALLER -> EmptyCoroutineContext
|
||||
TaskThread.CRYPTO -> coroutineDispatchers.crypto
|
||||
TaskThread.SYNC -> coroutineDispatchers.sync
|
||||
TaskThread.DM_VERIF -> coroutineDispatchers.dmVerif
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,5 +22,6 @@ internal enum class TaskThread {
|
|||
IO,
|
||||
CALLER,
|
||||
CRYPTO,
|
||||
SYNC
|
||||
SYNC,
|
||||
DM_VERIF
|
||||
}
|
||||
|
|
|
@ -23,5 +23,6 @@ internal data class MatrixCoroutineDispatchers(
|
|||
val computation: CoroutineDispatcher,
|
||||
val main: CoroutineDispatcher,
|
||||
val crypto: CoroutineDispatcher,
|
||||
val sync: CoroutineDispatcher
|
||||
val sync: CoroutineDispatcher,
|
||||
val dmVerif: CoroutineDispatcher
|
||||
)
|
||||
|
|
Loading…
Reference in a new issue