Introduce EventInsertEntity to handle db updates

This commit is contained in:
ganfra 2020-07-02 15:32:58 +02:00
parent 3db26bcae1
commit 2f6b38eb39
27 changed files with 617 additions and 844 deletions

View file

@ -1,161 +0,0 @@
/*
* Copyright 2019 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package im.vector.matrix.android.internal.crypto.tasks
import im.vector.matrix.android.api.session.crypto.CryptoService
import im.vector.matrix.android.api.session.crypto.MXCryptoError
import im.vector.matrix.android.api.session.crypto.verification.VerificationService
import im.vector.matrix.android.api.session.events.model.Event
import im.vector.matrix.android.api.session.events.model.EventType
import im.vector.matrix.android.api.session.events.model.toModel
import im.vector.matrix.android.api.session.room.model.message.MessageContent
import im.vector.matrix.android.api.session.room.model.message.MessageRelationContent
import im.vector.matrix.android.api.session.room.model.message.MessageType
import im.vector.matrix.android.api.session.room.model.message.MessageVerificationReadyContent
import im.vector.matrix.android.api.session.room.model.message.MessageVerificationRequestContent
import im.vector.matrix.android.api.session.room.model.message.MessageVerificationStartContent
import im.vector.matrix.android.internal.crypto.algorithms.olm.OlmDecryptionResult
import im.vector.matrix.android.internal.crypto.verification.DefaultVerificationService
import im.vector.matrix.android.internal.di.DeviceId
import im.vector.matrix.android.internal.di.UserId
import im.vector.matrix.android.internal.task.Task
import timber.log.Timber
import java.util.ArrayList
import javax.inject.Inject
internal interface RoomVerificationUpdateTask : Task<RoomVerificationUpdateTask.Params, Unit> {
data class Params(
val events: List<Event>,
val verificationService: DefaultVerificationService,
val cryptoService: CryptoService
)
}
internal class DefaultRoomVerificationUpdateTask @Inject constructor(
@UserId private val userId: String,
@DeviceId private val deviceId: String?,
private val cryptoService: CryptoService) : RoomVerificationUpdateTask {
companion object {
// XXX what about multi-account?
private val transactionsHandledByOtherDevice = ArrayList<String>()
}
override suspend fun execute(params: RoomVerificationUpdateTask.Params) {
// TODO ignore initial sync or back pagination?
params.events.forEach { event ->
Timber.v("## SAS Verification live observer: received msgId: ${event.eventId} msgtype: ${event.type} from ${event.senderId}")
// If the request is in the future by more than 5 minutes or more than 10 minutes in the past,
// the message should be ignored by the receiver.
if (!VerificationService.isValidRequest(event.ageLocalTs
?: event.originServerTs)) return@forEach Unit.also {
Timber.d("## SAS Verification live observer: msgId: ${event.eventId} is outdated")
}
// decrypt if needed?
if (event.isEncrypted() && event.mxDecryptionResult == null) {
// TODO use a global event decryptor? attache to session and that listen to new sessionId?
// for now decrypt sync
try {
val result = cryptoService.decryptEvent(event, "")
event.mxDecryptionResult = OlmDecryptionResult(
payload = result.clearEvent,
senderKey = result.senderCurve25519Key,
keysClaimed = result.claimedEd25519Key?.let { mapOf("ed25519" to it) },
forwardingCurve25519KeyChain = result.forwardingCurve25519KeyChain
)
} catch (e: MXCryptoError) {
Timber.e("## SAS Failed to decrypt event: ${event.eventId}")
params.verificationService.onPotentiallyInterestingEventRoomFailToDecrypt(event)
}
}
Timber.v("## SAS Verification live observer: received msgId: ${event.eventId} type: ${event.getClearType()}")
// Relates to is not encrypted
val relatesToEventId = event.content.toModel<MessageRelationContent>()?.relatesTo?.eventId
if (event.senderId == userId) {
// If it's send from me, we need to keep track of Requests or Start
// done from another device of mine
if (EventType.MESSAGE == event.getClearType()) {
val msgType = event.getClearContent().toModel<MessageContent>()?.msgType
if (MessageType.MSGTYPE_VERIFICATION_REQUEST == msgType) {
event.getClearContent().toModel<MessageVerificationRequestContent>()?.let {
if (it.fromDevice != deviceId) {
// The verification is requested from another device
Timber.v("## SAS Verification live observer: Transaction requested from other device tid:${event.eventId} ")
event.eventId?.let { txId -> transactionsHandledByOtherDevice.add(txId) }
}
}
}
} else if (EventType.KEY_VERIFICATION_START == event.getClearType()) {
event.getClearContent().toModel<MessageVerificationStartContent>()?.let {
if (it.fromDevice != deviceId) {
// The verification is started from another device
Timber.v("## SAS Verification live observer: Transaction started by other device tid:$relatesToEventId ")
relatesToEventId?.let { txId -> transactionsHandledByOtherDevice.add(txId) }
params.verificationService.onRoomRequestHandledByOtherDevice(event)
}
}
} else if (EventType.KEY_VERIFICATION_READY == event.getClearType()) {
event.getClearContent().toModel<MessageVerificationReadyContent>()?.let {
if (it.fromDevice != deviceId) {
// The verification is started from another device
Timber.v("## SAS Verification live observer: Transaction started by other device tid:$relatesToEventId ")
relatesToEventId?.let { txId -> transactionsHandledByOtherDevice.add(txId) }
params.verificationService.onRoomRequestHandledByOtherDevice(event)
}
}
} else if (EventType.KEY_VERIFICATION_CANCEL == event.getClearType() || EventType.KEY_VERIFICATION_DONE == event.getClearType()) {
relatesToEventId?.let {
transactionsHandledByOtherDevice.remove(it)
params.verificationService.onRoomRequestHandledByOtherDevice(event)
}
}
Timber.v("## SAS Verification ignoring message sent by me: ${event.eventId} type: ${event.getClearType()}")
return@forEach
}
if (relatesToEventId != null && transactionsHandledByOtherDevice.contains(relatesToEventId)) {
// Ignore this event, it is directed to another of my devices
Timber.v("## SAS Verification live observer: Ignore Transaction handled by other device tid:$relatesToEventId ")
return@forEach
}
when (event.getClearType()) {
EventType.KEY_VERIFICATION_START,
EventType.KEY_VERIFICATION_ACCEPT,
EventType.KEY_VERIFICATION_KEY,
EventType.KEY_VERIFICATION_MAC,
EventType.KEY_VERIFICATION_CANCEL,
EventType.KEY_VERIFICATION_READY,
EventType.KEY_VERIFICATION_DONE -> {
params.verificationService.onRoomEvent(event)
}
EventType.MESSAGE -> {
if (MessageType.MSGTYPE_VERIFICATION_REQUEST == event.getClearContent().toModel<MessageContent>()?.msgType) {
params.verificationService.onRoomRequestReceived(event)
}
}
}
}
}
}

View file

@ -15,34 +15,38 @@
*/ */
package im.vector.matrix.android.internal.crypto.verification package im.vector.matrix.android.internal.crypto.verification
import com.zhuinden.monarchy.Monarchy
import im.vector.matrix.android.api.session.crypto.CryptoService import im.vector.matrix.android.api.session.crypto.CryptoService
import im.vector.matrix.android.api.session.crypto.MXCryptoError
import im.vector.matrix.android.api.session.crypto.verification.VerificationService
import im.vector.matrix.android.api.session.events.model.Event
import im.vector.matrix.android.api.session.events.model.EventType import im.vector.matrix.android.api.session.events.model.EventType
import im.vector.matrix.android.api.session.events.model.LocalEcho import im.vector.matrix.android.api.session.events.model.LocalEcho
import im.vector.matrix.android.internal.crypto.tasks.DefaultRoomVerificationUpdateTask import im.vector.matrix.android.api.session.events.model.toModel
import im.vector.matrix.android.internal.crypto.tasks.RoomVerificationUpdateTask import im.vector.matrix.android.api.session.room.model.message.MessageContent
import im.vector.matrix.android.internal.database.RealmLiveEntityObserver import im.vector.matrix.android.api.session.room.model.message.MessageRelationContent
import im.vector.matrix.android.internal.database.mapper.asDomain import im.vector.matrix.android.api.session.room.model.message.MessageType
import im.vector.matrix.android.internal.database.model.EventEntity import im.vector.matrix.android.api.session.room.model.message.MessageVerificationReadyContent
import im.vector.matrix.android.internal.database.query.whereTypes import im.vector.matrix.android.api.session.room.model.message.MessageVerificationRequestContent
import im.vector.matrix.android.internal.di.SessionDatabase import im.vector.matrix.android.api.session.room.model.message.MessageVerificationStartContent
import im.vector.matrix.android.internal.task.TaskExecutor import im.vector.matrix.android.internal.crypto.algorithms.olm.OlmDecryptionResult
import im.vector.matrix.android.internal.task.configureWith import im.vector.matrix.android.internal.di.DeviceId
import io.realm.OrderedCollectionChangeSet import im.vector.matrix.android.internal.di.UserId
import io.realm.RealmConfiguration import im.vector.matrix.android.internal.session.EventInsertLiveProcessor
import io.realm.RealmResults import io.realm.Realm
import timber.log.Timber
import java.util.ArrayList
import javax.inject.Inject import javax.inject.Inject
internal class VerificationMessageLiveObserver @Inject constructor( internal class VerificationMessageProcessor @Inject constructor(
@SessionDatabase realmConfiguration: RealmConfiguration,
private val roomVerificationUpdateTask: DefaultRoomVerificationUpdateTask,
private val cryptoService: CryptoService, private val cryptoService: CryptoService,
private val verificationService: DefaultVerificationService, private val verificationService: DefaultVerificationService,
private val taskExecutor: TaskExecutor @UserId private val userId: String,
) : RealmLiveEntityObserver<EventEntity>(realmConfiguration) { @DeviceId private val deviceId: String?
) : EventInsertLiveProcessor {
override val query = Monarchy.Query { private val transactionsHandledByOtherDevice = ArrayList<String>()
EventEntity.whereTypes(it, listOf(
private val allowedTypes = listOf(
EventType.KEY_VERIFICATION_START, EventType.KEY_VERIFICATION_START,
EventType.KEY_VERIFICATION_ACCEPT, EventType.KEY_VERIFICATION_ACCEPT,
EventType.KEY_VERIFICATION_KEY, EventType.KEY_VERIFICATION_KEY,
@ -51,23 +55,110 @@ internal class VerificationMessageLiveObserver @Inject constructor(
EventType.KEY_VERIFICATION_DONE, EventType.KEY_VERIFICATION_DONE,
EventType.KEY_VERIFICATION_READY, EventType.KEY_VERIFICATION_READY,
EventType.MESSAGE, EventType.MESSAGE,
EventType.ENCRYPTED) EventType.ENCRYPTED
) )
override fun shouldProcess(eventId: String, eventType: String): Boolean {
return allowedTypes.contains(eventType) && !LocalEcho.isLocalEchoId(eventId)
} }
override fun onChange(results: RealmResults<EventEntity>, changeSet: OrderedCollectionChangeSet) { override suspend fun process(realm: Realm, event: Event) {
// Should we ignore when it's an initial sync? Timber.v("## SAS Verification live observer: received msgId: ${event.eventId} msgtype: ${event.type} from ${event.senderId}")
val events = changeSet.insertions
.asSequence()
.mapNotNull { results[it]?.asDomain() }
.filterNot {
// ignore local echos
LocalEcho.isLocalEchoId(it.eventId ?: "")
}
.toList()
roomVerificationUpdateTask.configureWith( // If the request is in the future by more than 5 minutes or more than 10 minutes in the past,
RoomVerificationUpdateTask.Params(events, verificationService, cryptoService) // the message should be ignored by the receiver.
).executeBy(taskExecutor)
if (!VerificationService.isValidRequest(event.ageLocalTs
?: event.originServerTs)) return Unit.also {
Timber.d("## SAS Verification live observer: msgId: ${event.eventId} is outdated")
}
// decrypt if needed?
if (event.isEncrypted() && event.mxDecryptionResult == null) {
// TODO use a global event decryptor? attache to session and that listen to new sessionId?
// for now decrypt sync
try {
val result = cryptoService.decryptEvent(event, "")
event.mxDecryptionResult = OlmDecryptionResult(
payload = result.clearEvent,
senderKey = result.senderCurve25519Key,
keysClaimed = result.claimedEd25519Key?.let { mapOf("ed25519" to it) },
forwardingCurve25519KeyChain = result.forwardingCurve25519KeyChain
)
} catch (e: MXCryptoError) {
Timber.e("## SAS Failed to decrypt event: ${event.eventId}")
verificationService.onPotentiallyInterestingEventRoomFailToDecrypt(event)
}
}
Timber.v("## SAS Verification live observer: received msgId: ${event.eventId} type: ${event.getClearType()}")
// Relates to is not encrypted
val relatesToEventId = event.content.toModel<MessageRelationContent>()?.relatesTo?.eventId
if (event.senderId == userId) {
// If it's send from me, we need to keep track of Requests or Start
// done from another device of mine
if (EventType.MESSAGE == event.getClearType()) {
val msgType = event.getClearContent().toModel<MessageContent>()?.msgType
if (MessageType.MSGTYPE_VERIFICATION_REQUEST == msgType) {
event.getClearContent().toModel<MessageVerificationRequestContent>()?.let {
if (it.fromDevice != deviceId) {
// The verification is requested from another device
Timber.v("## SAS Verification live observer: Transaction requested from other device tid:${event.eventId} ")
event.eventId?.let { txId -> transactionsHandledByOtherDevice.add(txId) }
}
}
}
} else if (EventType.KEY_VERIFICATION_START == event.getClearType()) {
event.getClearContent().toModel<MessageVerificationStartContent>()?.let {
if (it.fromDevice != deviceId) {
// The verification is started from another device
Timber.v("## SAS Verification live observer: Transaction started by other device tid:$relatesToEventId ")
relatesToEventId?.let { txId -> transactionsHandledByOtherDevice.add(txId) }
verificationService.onRoomRequestHandledByOtherDevice(event)
}
}
} else if (EventType.KEY_VERIFICATION_READY == event.getClearType()) {
event.getClearContent().toModel<MessageVerificationReadyContent>()?.let {
if (it.fromDevice != deviceId) {
// The verification is started from another device
Timber.v("## SAS Verification live observer: Transaction started by other device tid:$relatesToEventId ")
relatesToEventId?.let { txId -> transactionsHandledByOtherDevice.add(txId) }
verificationService.onRoomRequestHandledByOtherDevice(event)
}
}
} else if (EventType.KEY_VERIFICATION_CANCEL == event.getClearType() || EventType.KEY_VERIFICATION_DONE == event.getClearType()) {
relatesToEventId?.let {
transactionsHandledByOtherDevice.remove(it)
verificationService.onRoomRequestHandledByOtherDevice(event)
}
}
Timber.v("## SAS Verification ignoring message sent by me: ${event.eventId} type: ${event.getClearType()}")
return
}
if (relatesToEventId != null && transactionsHandledByOtherDevice.contains(relatesToEventId)) {
// Ignore this event, it is directed to another of my devices
Timber.v("## SAS Verification live observer: Ignore Transaction handled by other device tid:$relatesToEventId ")
return
}
when (event.getClearType()) {
EventType.KEY_VERIFICATION_START,
EventType.KEY_VERIFICATION_ACCEPT,
EventType.KEY_VERIFICATION_KEY,
EventType.KEY_VERIFICATION_MAC,
EventType.KEY_VERIFICATION_CANCEL,
EventType.KEY_VERIFICATION_READY,
EventType.KEY_VERIFICATION_DONE -> {
verificationService.onRoomEvent(event)
}
EventType.MESSAGE -> {
if (MessageType.MSGTYPE_VERIFICATION_REQUEST == event.getClearContent().toModel<MessageContent>()?.msgType) {
verificationService.onRoomRequestReceived(event)
}
}
}
} }
} }

View file

@ -0,0 +1,101 @@
/*
* Copyright (c) 2020 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package im.vector.matrix.android.internal.database
import com.zhuinden.monarchy.Monarchy
import im.vector.matrix.android.api.session.crypto.CryptoService
import im.vector.matrix.android.api.session.crypto.MXCryptoError
import im.vector.matrix.android.api.session.events.model.Event
import im.vector.matrix.android.internal.crypto.algorithms.olm.OlmDecryptionResult
import im.vector.matrix.android.internal.database.mapper.asDomain
import im.vector.matrix.android.internal.database.model.EventEntity
import im.vector.matrix.android.internal.database.model.EventInsertEntity
import im.vector.matrix.android.internal.database.query.where
import im.vector.matrix.android.internal.di.SessionDatabase
import im.vector.matrix.android.internal.session.EventInsertLiveProcessor
import io.realm.OrderedCollectionChangeSet
import io.realm.RealmConfiguration
import io.realm.RealmResults
import kotlinx.coroutines.launch
import timber.log.Timber
import javax.inject.Inject
internal class EventInsertLiveObserver @Inject constructor(@SessionDatabase realmConfiguration: RealmConfiguration,
private val processors: Set<@JvmSuppressWildcards EventInsertLiveProcessor>,
private val cryptoService: CryptoService)
: RealmLiveEntityObserver<EventInsertEntity>(realmConfiguration) {
override val query = Monarchy.Query<EventInsertEntity> {
it.where(EventInsertEntity::class.java)
}
override fun onChange(results: RealmResults<EventInsertEntity>) {
if (!results.isLoaded || results.isEmpty()) {
return
}
Timber.v("EventInsertEntity updated with ${results.size} results in db")
val filteredEventIds = results.mapNotNull {
val shouldProcess = shouldProcess(it)
if (shouldProcess) {
it.eventId
} else {
null
}
}
Timber.v("There are ${filteredEventIds.size} events to process")
observerScope.launch {
awaitTransaction(realmConfiguration) { realm ->
filteredEventIds.forEach { eventId ->
val event = EventEntity.where(realm, eventId).findFirst()
if (event == null) {
Timber.v("Event $eventId not found")
return@forEach
}
val domainEvent = event.asDomain()
decryptIfNeeded(domainEvent)
processors.forEach {
it.process(realm, domainEvent)
}
}
realm.where(EventInsertEntity::class.java).findAll().deleteAllFromRealm()
}
}
}
private fun decryptIfNeeded(event: Event) {
if (event.isEncrypted() && event.mxDecryptionResult == null) {
try {
val result = cryptoService.decryptEvent(event, event.roomId ?: "")
event.mxDecryptionResult = OlmDecryptionResult(
payload = result.clearEvent,
senderKey = result.senderCurve25519Key,
keysClaimed = result.claimedEd25519Key?.let { k -> mapOf("ed25519" to k) },
forwardingCurve25519KeyChain = result.forwardingCurve25519KeyChain
)
} catch (e: MXCryptoError) {
Timber.v("Call service: Failed to decrypt event")
// TODO -> we should keep track of this and retry, or aggregation will be broken
}
}
}
private fun shouldProcess(eventInsertEntity: EventInsertEntity): Boolean {
return processors.any {
it.shouldProcess(eventInsertEntity.eventId, eventInsertEntity.eventType)
}
}
}

View file

@ -21,6 +21,7 @@ import im.vector.matrix.android.internal.session.SessionLifecycleObserver
import im.vector.matrix.android.internal.util.createBackgroundHandler import im.vector.matrix.android.internal.util.createBackgroundHandler
import io.realm.OrderedRealmCollectionChangeListener import io.realm.OrderedRealmCollectionChangeListener
import io.realm.Realm import io.realm.Realm
import io.realm.RealmChangeListener
import io.realm.RealmConfiguration import io.realm.RealmConfiguration
import io.realm.RealmObject import io.realm.RealmObject
import io.realm.RealmResults import io.realm.RealmResults
@ -33,7 +34,7 @@ import java.util.concurrent.atomic.AtomicReference
internal interface LiveEntityObserver: SessionLifecycleObserver internal interface LiveEntityObserver: SessionLifecycleObserver
internal abstract class RealmLiveEntityObserver<T : RealmObject>(protected val realmConfiguration: RealmConfiguration) internal abstract class RealmLiveEntityObserver<T : RealmObject>(protected val realmConfiguration: RealmConfiguration)
: LiveEntityObserver, OrderedRealmCollectionChangeListener<RealmResults<T>> { : LiveEntityObserver, RealmChangeListener<RealmResults<T>> {
private companion object { private companion object {
val BACKGROUND_HANDLER = createBackgroundHandler("LIVE_ENTITY_BACKGROUND") val BACKGROUND_HANDLER = createBackgroundHandler("LIVE_ENTITY_BACKGROUND")

View file

@ -45,7 +45,6 @@ internal object EventMapper {
eventEntity.redacts = event.redacts eventEntity.redacts = event.redacts
eventEntity.age = event.unsignedData?.age ?: event.originServerTs eventEntity.age = event.unsignedData?.age ?: event.originServerTs
eventEntity.unsignedData = uds eventEntity.unsignedData = uds
eventEntity.decryptionResultJson = event.mxDecryptionResult?.let { eventEntity.decryptionResultJson = event.mxDecryptionResult?.let {
MoshiProvider.providesMoshi().adapter<OlmDecryptionResult>(OlmDecryptionResult::class.java).toJson(it) MoshiProvider.providesMoshi().adapter<OlmDecryptionResult>(OlmDecryptionResult::class.java).toJson(it)
} }

View file

@ -0,0 +1,28 @@
/*
* Copyright (c) 2020 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package im.vector.matrix.android.internal.database.model
import io.realm.RealmObject
import io.realm.annotations.Index
internal open class EventInsertEntity(var eventId: String = "",
var eventType: String = ""
) : RealmObject() {
companion object
}

View file

@ -25,6 +25,7 @@ import io.realm.annotations.RealmModule
classes = [ classes = [
ChunkEntity::class, ChunkEntity::class,
EventEntity::class, EventEntity::class,
EventInsertEntity::class,
TimelineEventEntity::class, TimelineEventEntity::class,
FilterEntity::class, FilterEntity::class,
GroupEntity::class, GroupEntity::class,

View file

@ -18,16 +18,25 @@ package im.vector.matrix.android.internal.database.query
import im.vector.matrix.android.internal.database.model.EventEntity import im.vector.matrix.android.internal.database.model.EventEntity
import im.vector.matrix.android.internal.database.model.EventEntityFields import im.vector.matrix.android.internal.database.model.EventEntityFields
import im.vector.matrix.android.internal.database.model.EventInsertEntity
import io.realm.Realm import io.realm.Realm
import io.realm.RealmList import io.realm.RealmList
import io.realm.RealmQuery import io.realm.RealmQuery
import io.realm.kotlin.where import io.realm.kotlin.where
internal fun EventEntity.copyToRealmOrIgnore(realm: Realm): EventEntity { internal fun EventEntity.copyToRealmOrIgnore(realm: Realm): EventEntity {
return realm.where<EventEntity>() val eventEntity = realm.where<EventEntity>()
.equalTo(EventEntityFields.EVENT_ID, eventId) .equalTo(EventEntityFields.EVENT_ID, eventId)
.equalTo(EventEntityFields.ROOM_ID, roomId) .equalTo(EventEntityFields.ROOM_ID, roomId)
.findFirst() ?: realm.copyToRealm(this) .findFirst()
return if (eventEntity == null) {
val insertEntity = EventInsertEntity(eventId = eventId, eventType = type)
realm.insert(insertEntity)
// copy this event entity and return it
realm.copyToRealm(this)
} else {
eventEntity
}
} }
internal fun EventEntity.Companion.where(realm: Realm, eventId: String): RealmQuery<EventEntity> { internal fun EventEntity.Companion.where(realm: Realm, eventId: String): RealmQuery<EventEntity> {

View file

@ -166,6 +166,7 @@ internal class DefaultSession @Inject constructor(
} }
eventBus.register(this) eventBus.register(this)
timelineEventDecryptor.start() timelineEventDecryptor.start()
taskExecutor.executorScope.launch(Dispatchers.Default) { taskExecutor.executorScope.launch(Dispatchers.Default) {
awaitTransaction(realmConfiguration) { realm -> awaitTransaction(realmConfiguration) { realm ->
val allRooms = realm.where(RoomEntity::class.java).findAll() val allRooms = realm.where(RoomEntity::class.java).findAll()
@ -176,9 +177,9 @@ internal class DefaultSession @Inject constructor(
if (numberOfTimelineEvents < 30_000L) { if (numberOfTimelineEvents < 30_000L) {
Timber.v("Db is low enough") Timber.v("Db is low enough")
} else { } else {
val hugeChunks = realm.where(ChunkEntity::class.java).greaterThan(ChunkEntityFields.NUMBER_OF_TIMELINE_EVENTS, 250).findAll() val hugeChunks = realm.where(ChunkEntity::class.java).greaterThan(ChunkEntityFields.NUMBER_OF_TIMELINE_EVENTS, 250).findAll()
Timber.v("There are ${hugeChunks.size} chunks to clean") Timber.v("There are ${hugeChunks.size} chunks to clean")
/*
for (chunk in hugeChunks) { for (chunk in hugeChunks) {
val maxDisplayIndex = chunk.nextDisplayIndex(PaginationDirection.FORWARDS) val maxDisplayIndex = chunk.nextDisplayIndex(PaginationDirection.FORWARDS)
val thresholdDisplayIndex = maxDisplayIndex - 250 val thresholdDisplayIndex = maxDisplayIndex - 250
@ -203,8 +204,9 @@ internal class DefaultSession @Inject constructor(
it.deleteFromRealm() it.deleteFromRealm()
} }
} }
}
*/
}
} }
} }
} }

View file

@ -0,0 +1,27 @@
/*
* Copyright (c) 2020 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package im.vector.matrix.android.internal.session
import im.vector.matrix.android.api.session.events.model.Event
import io.realm.Realm
internal interface EventInsertLiveProcessor {
fun shouldProcess(eventId: String, eventType: String): Boolean
suspend fun process(realm: Realm, event: Event)
}

View file

@ -39,7 +39,8 @@ import im.vector.matrix.android.api.session.securestorage.SharedSecretStorageSer
import im.vector.matrix.android.api.session.typing.TypingUsersTracker import im.vector.matrix.android.api.session.typing.TypingUsersTracker
import im.vector.matrix.android.internal.crypto.crosssigning.ShieldTrustUpdater import im.vector.matrix.android.internal.crypto.crosssigning.ShieldTrustUpdater
import im.vector.matrix.android.internal.crypto.secrets.DefaultSharedSecretStorageService import im.vector.matrix.android.internal.crypto.secrets.DefaultSharedSecretStorageService
import im.vector.matrix.android.internal.crypto.verification.VerificationMessageLiveObserver import im.vector.matrix.android.internal.crypto.verification.VerificationMessageProcessor
import im.vector.matrix.android.internal.database.EventInsertLiveObserver
import im.vector.matrix.android.internal.database.SessionRealmConfigurationFactory import im.vector.matrix.android.internal.database.SessionRealmConfigurationFactory
import im.vector.matrix.android.internal.di.Authenticated import im.vector.matrix.android.internal.di.Authenticated
import im.vector.matrix.android.internal.di.DeviceId import im.vector.matrix.android.internal.di.DeviceId
@ -64,16 +65,16 @@ import im.vector.matrix.android.internal.network.httpclient.addSocketFactory
import im.vector.matrix.android.internal.network.interceptors.CurlLoggingInterceptor import im.vector.matrix.android.internal.network.interceptors.CurlLoggingInterceptor
import im.vector.matrix.android.internal.network.token.AccessTokenProvider import im.vector.matrix.android.internal.network.token.AccessTokenProvider
import im.vector.matrix.android.internal.network.token.HomeserverAccessTokenProvider import im.vector.matrix.android.internal.network.token.HomeserverAccessTokenProvider
import im.vector.matrix.android.internal.session.call.CallEventObserver import im.vector.matrix.android.internal.session.call.CallEventProcessor
import im.vector.matrix.android.internal.session.download.DownloadProgressInterceptor import im.vector.matrix.android.internal.session.download.DownloadProgressInterceptor
import im.vector.matrix.android.internal.session.group.GroupSummaryUpdater import im.vector.matrix.android.internal.session.group.GroupSummaryUpdater
import im.vector.matrix.android.internal.session.homeserver.DefaultHomeServerCapabilitiesService import im.vector.matrix.android.internal.session.homeserver.DefaultHomeServerCapabilitiesService
import im.vector.matrix.android.internal.session.identity.DefaultIdentityService import im.vector.matrix.android.internal.session.identity.DefaultIdentityService
import im.vector.matrix.android.internal.session.integrationmanager.IntegrationManager import im.vector.matrix.android.internal.session.integrationmanager.IntegrationManager
import im.vector.matrix.android.internal.session.room.EventRelationsAggregationUpdater import im.vector.matrix.android.internal.session.room.EventRelationsAggregationProcessor
import im.vector.matrix.android.internal.session.room.create.RoomCreateEventLiveObserver import im.vector.matrix.android.internal.session.room.create.RoomCreateEventProcessor
import im.vector.matrix.android.internal.session.room.prune.EventsPruner import im.vector.matrix.android.internal.session.room.prune.RedactionEventProcessor
import im.vector.matrix.android.internal.session.room.tombstone.RoomTombstoneEventLiveObserver import im.vector.matrix.android.internal.session.room.tombstone.RoomTombstoneEventProcessor
import im.vector.matrix.android.internal.session.securestorage.DefaultSecureStorageService import im.vector.matrix.android.internal.session.securestorage.DefaultSecureStorageService
import im.vector.matrix.android.internal.session.typing.DefaultTypingUsersTracker import im.vector.matrix.android.internal.session.typing.DefaultTypingUsersTracker
import im.vector.matrix.android.internal.session.user.accountdata.DefaultAccountDataService import im.vector.matrix.android.internal.session.user.accountdata.DefaultAccountDataService
@ -297,27 +298,31 @@ internal abstract class SessionModule {
@Binds @Binds
@IntoSet @IntoSet
abstract fun bindEventsPruner(pruner: EventsPruner): SessionLifecycleObserver abstract fun bindEventRedactionProcessor(processor: RedactionEventProcessor): EventInsertLiveProcessor
@Binds @Binds
@IntoSet @IntoSet
abstract fun bindEventRelationsAggregationUpdater(updater: EventRelationsAggregationUpdater): SessionLifecycleObserver abstract fun bindEventRelationsAggregationProcessor(processor: EventRelationsAggregationProcessor): EventInsertLiveProcessor
@Binds @Binds
@IntoSet @IntoSet
abstract fun bindRoomTombstoneEventLiveObserver(observer: RoomTombstoneEventLiveObserver): SessionLifecycleObserver abstract fun bindRoomTombstoneEventProcessor(processor: RoomTombstoneEventProcessor): EventInsertLiveProcessor
@Binds @Binds
@IntoSet @IntoSet
abstract fun bindRoomCreateEventLiveObserver(observer: RoomCreateEventLiveObserver): SessionLifecycleObserver abstract fun bindRoomCreateEventProcessor(processor: RoomCreateEventProcessor): EventInsertLiveProcessor
@Binds @Binds
@IntoSet @IntoSet
abstract fun bindVerificationMessageLiveObserver(observer: VerificationMessageLiveObserver): SessionLifecycleObserver abstract fun bindVerificationMessageProcessor(processor: VerificationMessageProcessor): EventInsertLiveProcessor
@Binds @Binds
@IntoSet @IntoSet
abstract fun bindCallEventObserver(observer: CallEventObserver): SessionLifecycleObserver abstract fun bindCallEventProcessor(processor: CallEventProcessor): EventInsertLiveProcessor
@Binds
@IntoSet
abstract fun bindEventInsertObserver(observer: EventInsertLiveObserver): SessionLifecycleObserver
@Binds @Binds
@IntoSet @IntoSet

View file

@ -1,66 +0,0 @@
/*
* Copyright (c) 2020 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package im.vector.matrix.android.internal.session.call
import com.zhuinden.monarchy.Monarchy
import im.vector.matrix.android.api.session.events.model.EventType
import im.vector.matrix.android.internal.database.RealmLiveEntityObserver
import im.vector.matrix.android.internal.database.mapper.asDomain
import im.vector.matrix.android.internal.database.model.EventEntity
import im.vector.matrix.android.internal.database.query.whereTypes
import im.vector.matrix.android.internal.di.SessionDatabase
import im.vector.matrix.android.internal.di.UserId
import io.realm.OrderedCollectionChangeSet
import io.realm.RealmConfiguration
import io.realm.RealmResults
import kotlinx.coroutines.launch
import timber.log.Timber
import javax.inject.Inject
internal class CallEventObserver @Inject constructor(
@SessionDatabase realmConfiguration: RealmConfiguration,
@UserId private val userId: String,
private val task: CallEventsObserverTask
) : RealmLiveEntityObserver<EventEntity>(realmConfiguration) {
override val query = Monarchy.Query<EventEntity> {
EventEntity.whereTypes(it, listOf(
EventType.CALL_ANSWER,
EventType.CALL_CANDIDATES,
EventType.CALL_INVITE,
EventType.CALL_HANGUP,
EventType.ENCRYPTED)
)
}
override fun onChange(results: RealmResults<EventEntity>, changeSet: OrderedCollectionChangeSet) {
Timber.v("EventRelationsAggregationUpdater called with ${changeSet.insertions.size} insertions")
val insertedDomains = changeSet.insertions
.asSequence()
.mapNotNull { results[it]?.asDomain() }
.toList()
val params = CallEventsObserverTask.Params(
insertedDomains,
userId
)
observerScope.launch {
task.execute(params)
}
}
}

View file

@ -0,0 +1,65 @@
/*
* Copyright (c) 2020 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package im.vector.matrix.android.internal.session.call
import im.vector.matrix.android.api.session.events.model.Event
import im.vector.matrix.android.api.session.events.model.EventType
import im.vector.matrix.android.internal.di.UserId
import im.vector.matrix.android.internal.session.EventInsertLiveProcessor
import io.realm.Realm
import timber.log.Timber
import javax.inject.Inject
internal class CallEventProcessor @Inject constructor(
@UserId private val userId: String,
private val callService: DefaultCallSignalingService
) : EventInsertLiveProcessor {
private val allowedTypes = listOf(
EventType.CALL_ANSWER,
EventType.CALL_CANDIDATES,
EventType.CALL_INVITE,
EventType.CALL_HANGUP,
EventType.ENCRYPTED
)
override fun shouldProcess(eventId: String, eventType: String): Boolean {
return allowedTypes.contains(eventType)
}
override suspend fun process(realm: Realm, event: Event) {
update(realm, event)
}
private fun update(realm: Realm, event: Event) {
val now = System.currentTimeMillis()
// TODO might check if an invite is not closed (hangup/answsered) in the same event batch?
event.roomId ?: return Unit.also {
Timber.w("Event with no room id ${event.eventId}")
}
val age = now - (event.ageLocalTs ?: now)
if (age > 40_000) {
// To old to ring?
return
}
event.ageLocalTs
if (EventType.isCallEvent(event.getClearType())) {
callService.onCallEvent(event)
}
Timber.v("$realm : $userId")
}
}

View file

@ -1,92 +0,0 @@
/*
* Copyright (c) 2020 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package im.vector.matrix.android.internal.session.call
import com.zhuinden.monarchy.Monarchy
import im.vector.matrix.android.api.session.crypto.CryptoService
import im.vector.matrix.android.api.session.crypto.MXCryptoError
import im.vector.matrix.android.api.session.events.model.Event
import im.vector.matrix.android.api.session.events.model.EventType
import im.vector.matrix.android.internal.crypto.algorithms.olm.OlmDecryptionResult
import im.vector.matrix.android.internal.di.SessionDatabase
import im.vector.matrix.android.internal.task.Task
import im.vector.matrix.android.internal.util.awaitTransaction
import io.realm.Realm
import timber.log.Timber
import javax.inject.Inject
internal interface CallEventsObserverTask : Task<CallEventsObserverTask.Params, Unit> {
data class Params(
val events: List<Event>,
val userId: String
)
}
internal class DefaultCallEventsObserverTask @Inject constructor(
@SessionDatabase private val monarchy: Monarchy,
private val cryptoService: CryptoService,
private val callService: DefaultCallSignalingService) : CallEventsObserverTask {
override suspend fun execute(params: CallEventsObserverTask.Params) {
val events = params.events
val userId = params.userId
monarchy.awaitTransaction { realm ->
Timber.v(">>> DefaultCallEventsObserverTask[${params.hashCode()}] called with ${events.size} events")
update(realm, events, userId)
Timber.v("<<< DefaultCallEventsObserverTask[${params.hashCode()}] finished")
}
}
private fun update(realm: Realm, events: List<Event>, userId: String) {
val now = System.currentTimeMillis()
// TODO might check if an invite is not closed (hangup/answsered) in the same event batch?
events.forEach { event ->
event.roomId ?: return@forEach Unit.also {
Timber.w("Event with no room id ${event.eventId}")
}
val age = now - (event.ageLocalTs ?: now)
if (age > 40_000) {
// To old to ring?
return@forEach
}
event.ageLocalTs
decryptIfNeeded(event)
if (EventType.isCallEvent(event.getClearType())) {
callService.onCallEvent(event)
}
}
Timber.v("$realm : $userId")
}
private fun decryptIfNeeded(event: Event) {
if (event.isEncrypted() && event.mxDecryptionResult == null) {
try {
val result = cryptoService.decryptEvent(event, event.roomId ?: "")
event.mxDecryptionResult = OlmDecryptionResult(
payload = result.clearEvent,
senderKey = result.senderCurve25519Key,
keysClaimed = result.claimedEd25519Key?.let { k -> mapOf("ed25519" to k) },
forwardingCurve25519KeyChain = result.forwardingCurve25519KeyChain
)
} catch (e: MXCryptoError) {
Timber.v("Call service: Failed to decrypt event")
// TODO -> we should keep track of this and retry, or aggregation will be broken
}
}
}
}

View file

@ -42,6 +42,4 @@ internal abstract class CallModule {
@Binds @Binds
abstract fun bindGetTurnServerTask(task: DefaultGetTurnServerTask): GetTurnServerTask abstract fun bindGetTurnServerTask(task: DefaultGetTurnServerTask): GetTurnServerTask
@Binds
abstract fun bindCallEventsObserverTask(task: DefaultCallEventsObserverTask): CallEventsObserverTask
} }

View file

@ -43,7 +43,8 @@ internal class GroupSummaryUpdater @Inject constructor(
override val query = Monarchy.Query { GroupEntity.where(it) } override val query = Monarchy.Query { GroupEntity.where(it) }
override fun onChange(results: RealmResults<GroupEntity>, changeSet: OrderedCollectionChangeSet) { override fun onChange(results: RealmResults<GroupEntity>) {
/*
// `insertions` for new groups and `changes` to handle left groups // `insertions` for new groups and `changes` to handle left groups
val modifiedGroupEntity = (changeSet.insertions + changeSet.changes) val modifiedGroupEntity = (changeSet.insertions + changeSet.changes)
.asSequence() .asSequence()
@ -63,6 +64,7 @@ internal class GroupSummaryUpdater @Inject constructor(
deleteGroups(it) deleteGroups(it)
} }
} }
*/
} }
private fun fetchGroupsData(groupIds: List<String>) { private fun fetchGroupsData(groupIds: List<String>) {

View file

@ -15,9 +15,7 @@
*/ */
package im.vector.matrix.android.internal.session.room package im.vector.matrix.android.internal.session.room
import com.zhuinden.monarchy.Monarchy
import im.vector.matrix.android.api.session.crypto.CryptoService import im.vector.matrix.android.api.session.crypto.CryptoService
import im.vector.matrix.android.api.session.crypto.MXCryptoError
import im.vector.matrix.android.api.session.events.model.AggregatedAnnotation import im.vector.matrix.android.api.session.events.model.AggregatedAnnotation
import im.vector.matrix.android.api.session.events.model.Event import im.vector.matrix.android.api.session.events.model.Event
import im.vector.matrix.android.api.session.events.model.EventType import im.vector.matrix.android.api.session.events.model.EventType
@ -32,7 +30,6 @@ import im.vector.matrix.android.api.session.room.model.message.MessageContent
import im.vector.matrix.android.api.session.room.model.message.MessagePollResponseContent import im.vector.matrix.android.api.session.room.model.message.MessagePollResponseContent
import im.vector.matrix.android.api.session.room.model.message.MessageRelationContent import im.vector.matrix.android.api.session.room.model.message.MessageRelationContent
import im.vector.matrix.android.api.session.room.model.relation.ReactionContent import im.vector.matrix.android.api.session.room.model.relation.ReactionContent
import im.vector.matrix.android.internal.crypto.algorithms.olm.OlmDecryptionResult
import im.vector.matrix.android.internal.crypto.model.event.EncryptedEventContent import im.vector.matrix.android.internal.crypto.model.event.EncryptedEventContent
import im.vector.matrix.android.internal.database.mapper.ContentMapper import im.vector.matrix.android.internal.database.mapper.ContentMapper
import im.vector.matrix.android.internal.database.mapper.EventMapper import im.vector.matrix.android.internal.database.mapper.EventMapper
@ -47,21 +44,12 @@ import im.vector.matrix.android.internal.database.model.TimelineEventEntity
import im.vector.matrix.android.internal.database.query.create import im.vector.matrix.android.internal.database.query.create
import im.vector.matrix.android.internal.database.query.getOrCreate import im.vector.matrix.android.internal.database.query.getOrCreate
import im.vector.matrix.android.internal.database.query.where import im.vector.matrix.android.internal.database.query.where
import im.vector.matrix.android.internal.di.SessionDatabase import im.vector.matrix.android.internal.di.UserId
import im.vector.matrix.android.internal.task.Task import im.vector.matrix.android.internal.session.EventInsertLiveProcessor
import im.vector.matrix.android.internal.util.awaitTransaction
import io.realm.Realm import io.realm.Realm
import timber.log.Timber import timber.log.Timber
import javax.inject.Inject import javax.inject.Inject
internal interface EventRelationsAggregationTask : Task<EventRelationsAggregationTask.Params, Unit> {
data class Params(
val events: List<Event>,
val userId: String
)
}
enum class VerificationState { enum class VerificationState {
REQUEST, REQUEST,
WAITING, WAITING,
@ -89,36 +77,38 @@ private fun VerificationState?.toState(newState: VerificationState): Verificatio
return newState return newState
} }
/** internal class EventRelationsAggregationProcessor @Inject constructor(@UserId private val userId: String,
* Called by EventRelationAggregationUpdater, when new events that can affect relations are inserted in base. private val cryptoService: CryptoService
*/ ) : EventInsertLiveProcessor {
internal class DefaultEventRelationsAggregationTask @Inject constructor(
@SessionDatabase private val monarchy: Monarchy,
private val cryptoService: CryptoService) : EventRelationsAggregationTask {
// OPT OUT serer aggregation until API mature enough private val allowedTypes = listOf(
private val SHOULD_HANDLE_SERVER_AGREGGATION = false EventType.MESSAGE,
EventType.REDACTION,
EventType.REACTION,
EventType.KEY_VERIFICATION_DONE,
EventType.KEY_VERIFICATION_CANCEL,
EventType.KEY_VERIFICATION_ACCEPT,
EventType.KEY_VERIFICATION_START,
EventType.KEY_VERIFICATION_MAC,
// TODO Add ?
// EventType.KEY_VERIFICATION_READY,
EventType.KEY_VERIFICATION_KEY,
EventType.ENCRYPTED
)
override suspend fun execute(params: EventRelationsAggregationTask.Params) { override fun shouldProcess(eventId: String, eventType: String): Boolean {
val events = params.events return allowedTypes.contains(eventType)
val userId = params.userId
monarchy.awaitTransaction { realm ->
Timber.v(">>> DefaultEventRelationsAggregationTask[${params.hashCode()}] called with ${events.size} events")
update(realm, events, userId)
Timber.v("<<< DefaultEventRelationsAggregationTask[${params.hashCode()}] finished")
}
} }
private fun update(realm: Realm, events: List<Event>, userId: String) { override suspend fun process(realm: Realm, event: Event) {
events.forEach { event ->
try { // Temporary catch, should be removed try { // Temporary catch, should be removed
val roomId = event.roomId val roomId = event.roomId
if (roomId == null) { if (roomId == null) {
Timber.w("Event has no room id ${event.eventId}") Timber.w("Event has no room id ${event.eventId}")
return@forEach return
} }
val isLocalEcho = LocalEcho.isLocalEchoId(event.eventId ?: "") val isLocalEcho = LocalEcho.isLocalEchoId(event.eventId ?: "")
when (event.type) { when (event.getClearType()) {
EventType.REACTION -> { EventType.REACTION -> {
// we got a reaction!! // we got a reaction!!
Timber.v("###REACTION in room $roomId , reaction eventID ${event.eventId}") Timber.v("###REACTION in room $roomId , reaction eventID ${event.eventId}")
@ -171,7 +161,6 @@ internal class DefaultEventRelationsAggregationTask @Inject constructor(
|| encryptedEventContent?.relatesTo?.type == RelationType.RESPONSE || encryptedEventContent?.relatesTo?.type == RelationType.RESPONSE
) { ) {
// we need to decrypt if needed // we need to decrypt if needed
decryptIfNeeded(event)
event.getClearContent().toModel<MessageContent>()?.let { event.getClearContent().toModel<MessageContent>()?.let {
if (encryptedEventContent.relatesTo.type == RelationType.REPLACE) { if (encryptedEventContent.relatesTo.type == RelationType.REPLACE) {
Timber.v("###REPLACE in room $roomId for event ${event.eventId}") Timber.v("###REPLACE in room $roomId for event ${event.eventId}")
@ -183,7 +172,6 @@ internal class DefaultEventRelationsAggregationTask @Inject constructor(
} }
} }
} else if (encryptedEventContent?.relatesTo?.type == RelationType.REFERENCE) { } else if (encryptedEventContent?.relatesTo?.type == RelationType.REFERENCE) {
decryptIfNeeded(event)
when (event.getClearType()) { when (event.getClearType()) {
EventType.KEY_VERIFICATION_DONE, EventType.KEY_VERIFICATION_DONE,
EventType.KEY_VERIFICATION_CANCEL, EventType.KEY_VERIFICATION_CANCEL,
@ -202,7 +190,7 @@ internal class DefaultEventRelationsAggregationTask @Inject constructor(
} }
EventType.REDACTION -> { EventType.REDACTION -> {
val eventToPrune = event.redacts?.let { EventEntity.where(realm, eventId = it).findFirst() } val eventToPrune = event.redacts?.let { EventEntity.where(realm, eventId = it).findFirst() }
?: return@forEach ?: return
when (eventToPrune.type) { when (eventToPrune.type) {
EventType.MESSAGE -> { EventType.MESSAGE -> {
Timber.d("REDACTION for message ${eventToPrune.eventId}") Timber.d("REDACTION for message ${eventToPrune.eventId}")
@ -226,24 +214,9 @@ internal class DefaultEventRelationsAggregationTask @Inject constructor(
Timber.e(t, "## Should not happen ") Timber.e(t, "## Should not happen ")
} }
} }
}
private fun decryptIfNeeded(event: Event) { // OPT OUT serer aggregation until API mature enough
if (event.mxDecryptionResult == null) { private val SHOULD_HANDLE_SERVER_AGREGGATION = false
try {
val result = cryptoService.decryptEvent(event, "")
event.mxDecryptionResult = OlmDecryptionResult(
payload = result.clearEvent,
senderKey = result.senderCurve25519Key,
keysClaimed = result.claimedEd25519Key?.let { k -> mapOf("ed25519" to k) },
forwardingCurve25519KeyChain = result.forwardingCurve25519KeyChain
)
} catch (e: MXCryptoError) {
Timber.v("Failed to decrypt e2e replace")
// TODO -> we should keep track of this and retry, or aggregation will be broken
}
}
}
private fun handleReplace(realm: Realm, event: Event, content: MessageContent, roomId: String, isLocalEcho: Boolean, relatedEventId: String? = null) { private fun handleReplace(realm: Realm, event: Event, content: MessageContent, roomId: String, isLocalEcho: Boolean, relatedEventId: String? = null) {
val eventId = event.eventId ?: return val eventId = event.eventId ?: return

View file

@ -1,76 +0,0 @@
/*
* Copyright 2019 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package im.vector.matrix.android.internal.session.room
import com.zhuinden.monarchy.Monarchy
import im.vector.matrix.android.api.session.events.model.EventType
import im.vector.matrix.android.internal.database.RealmLiveEntityObserver
import im.vector.matrix.android.internal.database.mapper.asDomain
import im.vector.matrix.android.internal.database.model.EventEntity
import im.vector.matrix.android.internal.database.query.whereTypes
import im.vector.matrix.android.internal.di.SessionDatabase
import im.vector.matrix.android.internal.di.UserId
import io.realm.OrderedCollectionChangeSet
import io.realm.RealmConfiguration
import io.realm.RealmResults
import kotlinx.coroutines.launch
import timber.log.Timber
import javax.inject.Inject
/**
* Acts as a listener of incoming messages in order to incrementally computes a summary of annotations.
* For reactions will build a EventAnnotationsSummaryEntity, ans for edits a EditAggregatedSummaryEntity.
* The summaries can then be extracted and added (as a decoration) to a TimelineEvent for final display.
*/
internal class EventRelationsAggregationUpdater @Inject constructor(
@SessionDatabase realmConfiguration: RealmConfiguration,
@UserId private val userId: String,
private val task: EventRelationsAggregationTask) :
RealmLiveEntityObserver<EventEntity>(realmConfiguration) {
override val query = Monarchy.Query<EventEntity> {
EventEntity.whereTypes(it, listOf(
EventType.MESSAGE,
EventType.REDACTION,
EventType.REACTION,
EventType.KEY_VERIFICATION_DONE,
EventType.KEY_VERIFICATION_CANCEL,
EventType.KEY_VERIFICATION_ACCEPT,
EventType.KEY_VERIFICATION_START,
EventType.KEY_VERIFICATION_MAC,
// TODO Add ?
// EventType.KEY_VERIFICATION_READY,
EventType.KEY_VERIFICATION_KEY,
EventType.ENCRYPTED)
)
}
override fun onChange(results: RealmResults<EventEntity>, changeSet: OrderedCollectionChangeSet) {
Timber.v("EventRelationsAggregationUpdater called with ${changeSet.insertions.size} insertions")
val insertedDomains = changeSet.insertions
.asSequence()
.mapNotNull { results[it]?.asDomain() }
.toList()
val params = EventRelationsAggregationTask.Params(
insertedDomains,
userId
)
observerScope.launch {
task.execute(params)
}
}
}

View file

@ -44,8 +44,6 @@ import im.vector.matrix.android.internal.session.room.membership.joining.InviteT
import im.vector.matrix.android.internal.session.room.membership.joining.JoinRoomTask import im.vector.matrix.android.internal.session.room.membership.joining.JoinRoomTask
import im.vector.matrix.android.internal.session.room.membership.leaving.DefaultLeaveRoomTask import im.vector.matrix.android.internal.session.room.membership.leaving.DefaultLeaveRoomTask
import im.vector.matrix.android.internal.session.room.membership.leaving.LeaveRoomTask import im.vector.matrix.android.internal.session.room.membership.leaving.LeaveRoomTask
import im.vector.matrix.android.internal.session.room.prune.DefaultPruneEventTask
import im.vector.matrix.android.internal.session.room.prune.PruneEventTask
import im.vector.matrix.android.internal.session.room.read.DefaultMarkAllRoomsReadTask import im.vector.matrix.android.internal.session.room.read.DefaultMarkAllRoomsReadTask
import im.vector.matrix.android.internal.session.room.read.DefaultSetReadMarkersTask import im.vector.matrix.android.internal.session.room.read.DefaultSetReadMarkersTask
import im.vector.matrix.android.internal.session.room.read.MarkAllRoomsReadTask import im.vector.matrix.android.internal.session.room.read.MarkAllRoomsReadTask
@ -129,9 +127,6 @@ internal abstract class RoomModule {
@Binds @Binds
abstract fun bindFileService(service: DefaultFileService): FileService abstract fun bindFileService(service: DefaultFileService): FileService
@Binds
abstract fun bindEventRelationsAggregationTask(task: DefaultEventRelationsAggregationTask): EventRelationsAggregationTask
@Binds @Binds
abstract fun bindCreateRoomTask(task: DefaultCreateRoomTask): CreateRoomTask abstract fun bindCreateRoomTask(task: DefaultCreateRoomTask): CreateRoomTask
@ -156,9 +151,6 @@ internal abstract class RoomModule {
@Binds @Binds
abstract fun bindLoadRoomMembersTask(task: DefaultLoadRoomMembersTask): LoadRoomMembersTask abstract fun bindLoadRoomMembersTask(task: DefaultLoadRoomMembersTask): LoadRoomMembersTask
@Binds
abstract fun bindPruneEventTask(task: DefaultPruneEventTask): PruneEventTask
@Binds @Binds
abstract fun bindSetReadMarkersTask(task: DefaultSetReadMarkersTask): SetReadMarkersTask abstract fun bindSetReadMarkersTask(task: DefaultSetReadMarkersTask): SetReadMarkersTask

View file

@ -1,72 +0,0 @@
/*
* Copyright 2019 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package im.vector.matrix.android.internal.session.room.create
import com.zhuinden.monarchy.Monarchy
import im.vector.matrix.android.api.session.events.model.Event
import im.vector.matrix.android.api.session.events.model.EventType
import im.vector.matrix.android.api.session.events.model.toModel
import im.vector.matrix.android.api.session.room.model.VersioningState
import im.vector.matrix.android.api.session.room.model.create.RoomCreateContent
import im.vector.matrix.android.internal.database.RealmLiveEntityObserver
import im.vector.matrix.android.internal.database.awaitTransaction
import im.vector.matrix.android.internal.database.mapper.asDomain
import im.vector.matrix.android.internal.database.model.EventEntity
import im.vector.matrix.android.internal.database.model.RoomSummaryEntity
import im.vector.matrix.android.internal.database.query.where
import im.vector.matrix.android.internal.database.query.whereTypes
import im.vector.matrix.android.internal.di.SessionDatabase
import io.realm.OrderedCollectionChangeSet
import io.realm.RealmConfiguration
import io.realm.RealmResults
import kotlinx.coroutines.launch
import javax.inject.Inject
internal class RoomCreateEventLiveObserver @Inject constructor(@SessionDatabase
realmConfiguration: RealmConfiguration)
: RealmLiveEntityObserver<EventEntity>(realmConfiguration) {
override val query = Monarchy.Query<EventEntity> {
EventEntity.whereTypes(it, listOf(EventType.STATE_ROOM_CREATE))
}
override fun onChange(results: RealmResults<EventEntity>, changeSet: OrderedCollectionChangeSet) {
changeSet.insertions
.asSequence()
.mapNotNull {
results[it]?.asDomain()
}
.toList()
.also {
observerScope.launch {
handleRoomCreateEvents(it)
}
}
}
private suspend fun handleRoomCreateEvents(createEvents: List<Event>) = awaitTransaction(realmConfiguration) { realm ->
for (event in createEvents) {
val createRoomContent = event.getClearContent().toModel<RoomCreateContent>()
val predecessorRoomId = createRoomContent?.predecessor?.roomId ?: continue
val predecessorRoomSummary = RoomSummaryEntity.where(realm, predecessorRoomId).findFirst()
?: RoomSummaryEntity(predecessorRoomId)
predecessorRoomSummary.versioningState = VersioningState.UPGRADED_ROOM_JOINED
realm.insertOrUpdate(predecessorRoomSummary)
}
}
}

View file

@ -0,0 +1,45 @@
/*
* Copyright (c) 2020 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package im.vector.matrix.android.internal.session.room.create
import im.vector.matrix.android.api.session.events.model.Event
import im.vector.matrix.android.api.session.events.model.EventType
import im.vector.matrix.android.api.session.events.model.toModel
import im.vector.matrix.android.api.session.room.model.VersioningState
import im.vector.matrix.android.api.session.room.model.create.RoomCreateContent
import im.vector.matrix.android.internal.database.model.RoomSummaryEntity
import im.vector.matrix.android.internal.database.query.where
import im.vector.matrix.android.internal.session.EventInsertLiveProcessor
import io.realm.Realm
import javax.inject.Inject
internal class RoomCreateEventProcessor @Inject constructor() : EventInsertLiveProcessor {
override suspend fun process(realm: Realm, event: Event) {
val createRoomContent = event.getClearContent().toModel<RoomCreateContent>()
val predecessorRoomId = createRoomContent?.predecessor?.roomId ?: return
val predecessorRoomSummary = RoomSummaryEntity.where(realm, predecessorRoomId).findFirst()
?: RoomSummaryEntity(predecessorRoomId)
predecessorRoomSummary.versioningState = VersioningState.UPGRADED_ROOM_JOINED
realm.insertOrUpdate(predecessorRoomSummary)
}
override fun shouldProcess(eventId: String, eventType: String): Boolean {
return eventType == EventType.STATE_ROOM_CREATE
}
}

View file

@ -1,56 +0,0 @@
/*
* Copyright 2019 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package im.vector.matrix.android.internal.session.room.prune
import com.zhuinden.monarchy.Monarchy
import im.vector.matrix.android.api.session.events.model.EventType
import im.vector.matrix.android.internal.database.RealmLiveEntityObserver
import im.vector.matrix.android.internal.database.mapper.asDomain
import im.vector.matrix.android.internal.database.model.EventEntity
import im.vector.matrix.android.internal.database.query.whereTypes
import im.vector.matrix.android.internal.di.SessionDatabase
import io.realm.OrderedCollectionChangeSet
import io.realm.RealmConfiguration
import io.realm.RealmResults
import kotlinx.coroutines.launch
import timber.log.Timber
import javax.inject.Inject
/**
* Listens to the database for the insertion of any redaction event.
* As it will actually delete the content, it should be called last in the list of listener.
*/
internal class EventsPruner @Inject constructor(@SessionDatabase realmConfiguration: RealmConfiguration,
private val pruneEventTask: PruneEventTask) :
RealmLiveEntityObserver<EventEntity>(realmConfiguration) {
override val query = Monarchy.Query<EventEntity> { EventEntity.whereTypes(it, listOf(EventType.REDACTION)) }
override fun onChange(results: RealmResults<EventEntity>, changeSet: OrderedCollectionChangeSet) {
Timber.v("Event pruner called with ${changeSet.insertions.size} insertions")
val insertedDomains = changeSet.insertions
.asSequence()
.mapNotNull { results[it]?.asDomain() }
.toList()
observerScope.launch {
val params = PruneEventTask.Params(insertedDomains)
pruneEventTask.execute(params)
}
}
}

View file

@ -13,9 +13,9 @@
* See the License for the specific language governing permissions and * See the License for the specific language governing permissions and
* limitations under the License. * limitations under the License.
*/ */
package im.vector.matrix.android.internal.session.room.prune package im.vector.matrix.android.internal.session.room.prune
import com.zhuinden.monarchy.Monarchy
import im.vector.matrix.android.api.session.events.model.Event import im.vector.matrix.android.api.session.events.model.Event
import im.vector.matrix.android.api.session.events.model.EventType import im.vector.matrix.android.api.session.events.model.EventType
import im.vector.matrix.android.api.session.events.model.LocalEcho import im.vector.matrix.android.api.session.events.model.LocalEcho
@ -27,29 +27,24 @@ import im.vector.matrix.android.internal.database.model.TimelineEventEntity
import im.vector.matrix.android.internal.database.query.findWithSenderMembershipEvent import im.vector.matrix.android.internal.database.query.findWithSenderMembershipEvent
import im.vector.matrix.android.internal.database.query.where import im.vector.matrix.android.internal.database.query.where
import im.vector.matrix.android.internal.di.MoshiProvider import im.vector.matrix.android.internal.di.MoshiProvider
import im.vector.matrix.android.internal.di.SessionDatabase import im.vector.matrix.android.internal.session.EventInsertLiveProcessor
import im.vector.matrix.android.internal.task.Task
import im.vector.matrix.android.internal.util.awaitTransaction
import io.realm.Realm import io.realm.Realm
import timber.log.Timber import timber.log.Timber
import javax.inject.Inject import javax.inject.Inject
internal interface PruneEventTask : Task<PruneEventTask.Params, Unit> { /**
* Listens to the database for the insertion of any redaction event.
* As it will actually delete the content, it should be called last in the list of listener.
*/
internal class RedactionEventProcessor @Inject constructor() : EventInsertLiveProcessor {
data class Params( override fun shouldProcess(eventId: String, eventType: String): Boolean {
val redactionEvents: List<Event> return eventType == EventType.REDACTION
)
} }
internal class DefaultPruneEventTask @Inject constructor(@SessionDatabase private val monarchy: Monarchy) : PruneEventTask { override suspend fun process(realm: Realm, event: Event) {
override suspend fun execute(params: PruneEventTask.Params) {
monarchy.awaitTransaction { realm ->
params.redactionEvents.forEach { event ->
pruneEvent(realm, event) pruneEvent(realm, event)
} }
}
}
private fun pruneEvent(realm: Realm, redactionEvent: Event) { private fun pruneEvent(realm: Realm, redactionEvent: Event) {
if (redactionEvent.redacts.isNullOrBlank()) { if (redactionEvent.redacts.isNullOrBlank()) {

View file

@ -174,24 +174,13 @@ internal class DefaultTimeline(
backgroundRealm.set(realm) backgroundRealm.set(realm)
roomEntity = RoomEntity.where(realm, roomId = roomId).findFirst() roomEntity = RoomEntity.where(realm, roomId = roomId).findFirst()
roomEntity?.sendingTimelineEvents?.addChangeListener { events ->
// Remove in memory as soon as they are known by database
events.forEach { te ->
inMemorySendingEvents.removeAll { te.eventId == it.eventId }
}
postSnapshot()
}
nonFilteredEvents = buildEventQuery(realm).sort(TimelineEventEntityFields.DISPLAY_INDEX, Sort.DESCENDING).findAll() nonFilteredEvents = buildEventQuery(realm).sort(TimelineEventEntityFields.DISPLAY_INDEX, Sort.DESCENDING).findAll()
filteredEvents = nonFilteredEvents.where() filteredEvents = nonFilteredEvents.where()
.filterEventsWithSettings() .filterEventsWithSettings()
.findAll() .findAll()
handleInitialLoad() handleInitialLoad()
nonFilteredEvents.addChangeListener(eventsChangeListener)
eventRelations = EventAnnotationsSummaryEntity.whereInRoom(realm, roomId) eventRelations = EventAnnotationsSummaryEntity.whereInRoom(realm, roomId)
.findAllAsync() .findAllAsync()
.also { it.addChangeListener(relationsListener) }
if (settings.shouldHandleHiddenReadReceipts()) { if (settings.shouldHandleHiddenReadReceipts()) {
hiddenReadReceipts.start(realm, filteredEvents, nonFilteredEvents, this) hiddenReadReceipts.start(realm, filteredEvents, nonFilteredEvents, this)

View file

@ -125,7 +125,7 @@ internal class TimelineHiddenReadReceipts constructor(private val readReceiptsSu
.isNotEmpty(ReadReceiptsSummaryEntityFields.READ_RECEIPTS.`$`) .isNotEmpty(ReadReceiptsSummaryEntityFields.READ_RECEIPTS.`$`)
.filterReceiptsWithSettings() .filterReceiptsWithSettings()
.findAllAsync() .findAllAsync()
.also { it.addChangeListener(hiddenReadReceiptsListener) } //.also { it.addChangeListener(hiddenReadReceiptsListener) }
} }
/** /**

View file

@ -1,75 +0,0 @@
/*
* Copyright 2019 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package im.vector.matrix.android.internal.session.room.tombstone
import com.zhuinden.monarchy.Monarchy
import im.vector.matrix.android.api.session.events.model.Event
import im.vector.matrix.android.api.session.events.model.EventType
import im.vector.matrix.android.api.session.events.model.toModel
import im.vector.matrix.android.api.session.room.model.VersioningState
import im.vector.matrix.android.api.session.room.model.tombstone.RoomTombstoneContent
import im.vector.matrix.android.internal.database.RealmLiveEntityObserver
import im.vector.matrix.android.internal.database.awaitTransaction
import im.vector.matrix.android.internal.database.mapper.asDomain
import im.vector.matrix.android.internal.database.model.EventEntity
import im.vector.matrix.android.internal.database.model.RoomSummaryEntity
import im.vector.matrix.android.internal.database.query.where
import im.vector.matrix.android.internal.database.query.whereTypes
import im.vector.matrix.android.internal.di.SessionDatabase
import io.realm.OrderedCollectionChangeSet
import io.realm.RealmConfiguration
import io.realm.RealmResults
import kotlinx.coroutines.launch
import javax.inject.Inject
internal class RoomTombstoneEventLiveObserver @Inject constructor(@SessionDatabase
realmConfiguration: RealmConfiguration)
: RealmLiveEntityObserver<EventEntity>(realmConfiguration) {
override val query = Monarchy.Query<EventEntity> {
EventEntity.whereTypes(it, listOf(EventType.STATE_ROOM_TOMBSTONE))
}
override fun onChange(results: RealmResults<EventEntity>, changeSet: OrderedCollectionChangeSet) {
changeSet.insertions
.asSequence()
.mapNotNull {
results[it]?.asDomain()
}
.toList()
.also {
observerScope.launch {
handleRoomTombstoneEvents(it)
}
}
}
private suspend fun handleRoomTombstoneEvents(tombstoneEvents: List<Event>) = awaitTransaction(realmConfiguration) { realm ->
for (event in tombstoneEvents) {
if (event.roomId == null) continue
val createRoomContent = event.getClearContent().toModel<RoomTombstoneContent>()
if (createRoomContent?.replacementRoomId == null) continue
val predecessorRoomSummary = RoomSummaryEntity.where(realm, event.roomId).findFirst()
?: RoomSummaryEntity(event.roomId)
if (predecessorRoomSummary.versioningState == VersioningState.NONE) {
predecessorRoomSummary.versioningState = VersioningState.UPGRADED_ROOM_NOT_JOINED
}
realm.insertOrUpdate(predecessorRoomSummary)
}
}
}

View file

@ -0,0 +1,48 @@
/*
* Copyright (c) 2020 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package im.vector.matrix.android.internal.session.room.tombstone
import im.vector.matrix.android.api.session.events.model.Event
import im.vector.matrix.android.api.session.events.model.EventType
import im.vector.matrix.android.api.session.events.model.toModel
import im.vector.matrix.android.api.session.room.model.VersioningState
import im.vector.matrix.android.api.session.room.model.tombstone.RoomTombstoneContent
import im.vector.matrix.android.internal.database.model.RoomSummaryEntity
import im.vector.matrix.android.internal.database.query.where
import im.vector.matrix.android.internal.session.EventInsertLiveProcessor
import io.realm.Realm
import javax.inject.Inject
internal class RoomTombstoneEventProcessor @Inject constructor() : EventInsertLiveProcessor {
override suspend fun process(realm: Realm, event: Event) {
if (event.roomId == null) return
val createRoomContent = event.getClearContent().toModel<RoomTombstoneContent>()
if (createRoomContent?.replacementRoomId == null) return
val predecessorRoomSummary = RoomSummaryEntity.where(realm, event.roomId).findFirst()
?: RoomSummaryEntity(event.roomId)
if (predecessorRoomSummary.versioningState == VersioningState.NONE) {
predecessorRoomSummary.versioningState = VersioningState.UPGRADED_ROOM_NOT_JOINED
}
realm.insertOrUpdate(predecessorRoomSummary)
}
override fun shouldProcess(eventId: String, eventType: String): Boolean {
return eventType == EventType.STATE_ROOM_TOMBSTONE
}
}