New SDK Api for to device and event stream listener

This commit is contained in:
Valere 2021-10-22 18:09:11 +02:00
parent 32f2e7d508
commit 6a34b999f2
14 changed files with 354 additions and 11 deletions

View file

@ -0,0 +1,24 @@
/*
* Copyright 2021 The Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.matrix.android.sdk.api.session
interface EventStreamService {
fun addEventStreamListener(streamListener: LiveEventListener)
fun removeEventStreamListener(streamListener: LiveEventListener)
}

View file

@ -0,0 +1,35 @@
/*
* Copyright 2021 The Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.matrix.android.sdk.api.session
import org.matrix.android.sdk.api.session.events.model.Event
import org.matrix.android.sdk.api.util.JsonDict
interface LiveEventListener {
fun onLiveEvent(roomId: String, event: Event)
fun onPaginatedEvent(roomId: String, event: Event)
fun onEventDecrypted(eventId: String, roomId: String, clearEvent: JsonDict)
fun onEventDecryptionError(eventId: String, roomId: String, throwable: Throwable)
fun onLiveToDeviceEvent(event: Event)
// Maybe later add more, like onJoin, onLeave..
}

View file

@ -84,7 +84,9 @@ interface Session :
SyncStatusService,
HomeServerCapabilitiesService,
SecureStorageService,
AccountService {
AccountService,
ToDeviceService,
EventStreamService {
val coroutineDispatchers: MatrixCoroutineDispatchers

View file

@ -0,0 +1,37 @@
/*
* Copyright 2021 The Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.matrix.android.sdk.api.session
import org.matrix.android.sdk.api.session.events.model.Content
import org.matrix.android.sdk.internal.crypto.model.MXUsersDevicesMap
import java.util.UUID
interface ToDeviceService {
/**
* Send an event to a specific list of devices
*/
suspend fun sendToDevice(eventType: String, contentMap: MXUsersDevicesMap<Any>, txnId: String? = UUID.randomUUID().toString())
suspend fun sendToDevice(eventType: String, userId: String, deviceId: String, content: Content, txnId: String? = UUID.randomUUID().toString()) {
sendToDevice(eventType, mapOf(userId to listOf(deviceId)), content, txnId)
}
suspend fun sendToDevice(eventType: String, targets: Map<String, List<String>>, content: Content, txnId: String? = UUID.randomUUID().toString())
suspend fun sendEncryptedToDevice(eventType: String, targets: Map<String, List<String>>, content: Content, txnId: String? = UUID.randomUUID().toString())
}

View file

@ -90,6 +90,7 @@ import org.matrix.android.sdk.internal.di.MoshiProvider
import org.matrix.android.sdk.internal.di.UserId
import org.matrix.android.sdk.internal.extensions.foldToCallback
import org.matrix.android.sdk.internal.session.SessionScope
import org.matrix.android.sdk.internal.session.StreamEventsManager
import org.matrix.android.sdk.internal.session.room.membership.LoadRoomMembersTask
import org.matrix.android.sdk.internal.task.TaskExecutor
import org.matrix.android.sdk.internal.task.TaskThread
@ -168,7 +169,8 @@ internal class DefaultCryptoService @Inject constructor(
private val coroutineDispatchers: MatrixCoroutineDispatchers,
private val taskExecutor: TaskExecutor,
private val cryptoCoroutineScope: CoroutineScope,
private val eventDecryptor: EventDecryptor
private val eventDecryptor: EventDecryptor,
private val liveEventManager: Lazy<StreamEventsManager>
) : CryptoService {
private val isStarting = AtomicBoolean(false)
@ -782,6 +784,7 @@ internal class DefaultCryptoService @Inject constructor(
}
}
}
liveEventManager.get().dispatchOnLiveToDevice(event)
}
/**

View file

@ -16,6 +16,7 @@
package org.matrix.android.sdk.internal.crypto.algorithms.megolm
import dagger.Lazy
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.launch
import org.matrix.android.sdk.api.MatrixCoroutineDispatchers
@ -43,6 +44,7 @@ import org.matrix.android.sdk.internal.crypto.model.rest.ForwardedRoomKeyContent
import org.matrix.android.sdk.internal.crypto.model.rest.RoomKeyRequestBody
import org.matrix.android.sdk.internal.crypto.store.IMXCryptoStore
import org.matrix.android.sdk.internal.crypto.tasks.SendToDeviceTask
import org.matrix.android.sdk.internal.session.StreamEventsManager
import timber.log.Timber
private val loggerTag = LoggerTag("MXMegolmDecryption", LoggerTag.CRYPTO)
@ -56,7 +58,8 @@ internal class MXMegolmDecryption(private val userId: String,
private val cryptoStore: IMXCryptoStore,
private val sendToDeviceTask: SendToDeviceTask,
private val coroutineDispatchers: MatrixCoroutineDispatchers,
private val cryptoCoroutineScope: CoroutineScope
private val cryptoCoroutineScope: CoroutineScope,
private val liveEventManager: Lazy<StreamEventsManager>
) : IMXDecrypting, IMXWithHeldExtension {
var newSessionListener: NewSessionListener? = null
@ -108,12 +111,15 @@ internal class MXMegolmDecryption(private val userId: String,
claimedEd25519Key = olmDecryptionResult.keysClaimed?.get("ed25519"),
forwardingCurve25519KeyChain = olmDecryptionResult.forwardingCurve25519KeyChain
.orEmpty()
)
).also {
liveEventManager.get().dispatchLiveEventDecrypted(event, it)
}
} else {
throw MXCryptoError.Base(MXCryptoError.ErrorType.MISSING_FIELDS, MXCryptoError.MISSING_FIELDS_REASON)
}
},
{ throwable ->
liveEventManager.get().dispatchLiveEventDecryptionFailed(event, throwable)
if (throwable is MXCryptoError.OlmError) {
// TODO Check the value of .message
if (throwable.olmException.message == "UNKNOWN_MESSAGE_INDEX") {

View file

@ -16,6 +16,7 @@
package org.matrix.android.sdk.internal.crypto.algorithms.megolm
import dagger.Lazy
import kotlinx.coroutines.CoroutineScope
import org.matrix.android.sdk.api.MatrixCoroutineDispatchers
import org.matrix.android.sdk.internal.crypto.DeviceListManager
@ -26,6 +27,7 @@ import org.matrix.android.sdk.internal.crypto.actions.MessageEncrypter
import org.matrix.android.sdk.internal.crypto.store.IMXCryptoStore
import org.matrix.android.sdk.internal.crypto.tasks.SendToDeviceTask
import org.matrix.android.sdk.internal.di.UserId
import org.matrix.android.sdk.internal.session.StreamEventsManager
import javax.inject.Inject
internal class MXMegolmDecryptionFactory @Inject constructor(
@ -38,7 +40,8 @@ internal class MXMegolmDecryptionFactory @Inject constructor(
private val cryptoStore: IMXCryptoStore,
private val sendToDeviceTask: SendToDeviceTask,
private val coroutineDispatchers: MatrixCoroutineDispatchers,
private val cryptoCoroutineScope: CoroutineScope
private val cryptoCoroutineScope: CoroutineScope,
private val eventsManager: Lazy<StreamEventsManager>
) {
fun create(): MXMegolmDecryption {
@ -52,6 +55,7 @@ internal class MXMegolmDecryptionFactory @Inject constructor(
cryptoStore,
sendToDeviceTask,
coroutineDispatchers,
cryptoCoroutineScope)
cryptoCoroutineScope,
eventsManager)
}
}

View file

@ -0,0 +1,34 @@
/*
* Copyright 2021 The Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.matrix.android.sdk.internal.session
import org.matrix.android.sdk.api.session.EventStreamService
import org.matrix.android.sdk.api.session.LiveEventListener
import javax.inject.Inject
internal class DefaultEventStreamService @Inject constructor(
private val streamEventsManager: StreamEventsManager
) : EventStreamService {
override fun addEventStreamListener(streamListener: LiveEventListener) {
streamEventsManager.addLiveEventListener(streamListener)
}
override fun removeEventStreamListener(streamListener: LiveEventListener) {
streamEventsManager.removeLiveEventListener(streamListener)
}
}

View file

@ -27,8 +27,10 @@ import org.matrix.android.sdk.api.auth.data.SessionParams
import org.matrix.android.sdk.api.failure.GlobalError
import org.matrix.android.sdk.api.federation.FederationService
import org.matrix.android.sdk.api.pushrules.PushRuleService
import org.matrix.android.sdk.api.session.EventStreamService
import org.matrix.android.sdk.api.session.Session
import org.matrix.android.sdk.api.session.SessionLifecycleObserver
import org.matrix.android.sdk.api.session.ToDeviceService
import org.matrix.android.sdk.api.session.account.AccountService
import org.matrix.android.sdk.api.session.accountdata.SessionAccountDataService
import org.matrix.android.sdk.api.session.cache.CacheService
@ -133,6 +135,8 @@ internal class DefaultSession @Inject constructor(
private val spaceService: Lazy<SpaceService>,
private val openIdService: Lazy<OpenIdService>,
private val presenceService: Lazy<PresenceService>,
private val toDeviceService: Lazy<ToDeviceService>,
private val eventStreamService: Lazy<EventStreamService>,
@UnauthenticatedWithCertificate
private val unauthenticatedWithCertificateOkHttpClient: Lazy<OkHttpClient>
) : Session,
@ -152,7 +156,9 @@ internal class DefaultSession @Inject constructor(
HomeServerCapabilitiesService by homeServerCapabilitiesService.get(),
ProfileService by profileService.get(),
PresenceService by presenceService.get(),
AccountService by accountService.get() {
AccountService by accountService.get(),
ToDeviceService by toDeviceService.get(),
EventStreamService by eventStreamService.get() {
override val sharedSecretStorageService: SharedSecretStorageService
get() = _sharedSecretStorageService.get()

View file

@ -0,0 +1,74 @@
/*
* Copyright 2021 The Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.matrix.android.sdk.internal.session
import org.matrix.android.sdk.api.session.ToDeviceService
import org.matrix.android.sdk.api.session.events.model.Content
import org.matrix.android.sdk.api.session.events.model.EventType
import org.matrix.android.sdk.internal.crypto.actions.MessageEncrypter
import org.matrix.android.sdk.internal.crypto.model.MXUsersDevicesMap
import org.matrix.android.sdk.internal.crypto.store.IMXCryptoStore
import org.matrix.android.sdk.internal.crypto.tasks.DefaultSendToDeviceTask
import org.matrix.android.sdk.internal.crypto.tasks.SendToDeviceTask
import javax.inject.Inject
internal class DefaultToDeviceService @Inject constructor(
private val sendToDeviceTask: DefaultSendToDeviceTask,
private val messageEncrypter: MessageEncrypter,
private val cryptoStore: IMXCryptoStore
) : ToDeviceService {
override suspend fun sendToDevice(eventType: String, targets: Map<String, List<String>>, content: Content, txnId: String?) {
val sendToDeviceMap = MXUsersDevicesMap<Any>()
targets.forEach { (userId, deviceIdList) ->
deviceIdList.forEach { deviceId ->
sendToDeviceMap.setObject(userId, deviceId, content)
}
}
sendToDevice(eventType, sendToDeviceMap, txnId)
}
override suspend fun sendToDevice(eventType: String, contentMap: MXUsersDevicesMap<Any>, txnId: String?) {
sendToDeviceTask.executeRetry(
SendToDeviceTask.Params(
eventType = eventType,
contentMap = contentMap,
transactionId = txnId
),
3
)
}
override suspend fun sendEncryptedToDevice(eventType: String, targets: Map<String, List<String>>, content: Content, txnId: String?) {
val payloadJson = mapOf(
"type" to eventType,
"content" to content
)
val sendToDeviceMap = MXUsersDevicesMap<Any>()
// Should I do an ensure olm session?
targets.forEach { (userId, deviceIdList) ->
deviceIdList.forEach { deviceId ->
cryptoStore.getUserDevice(userId, deviceId)?.let { deviceInfo ->
sendToDeviceMap.setObject(userId, deviceId, messageEncrypter.encryptMessage(payloadJson, listOf(deviceInfo)))
}
}
}
sendToDevice(EventType.ENCRYPTED, sendToDeviceMap, txnId)
}
}

View file

@ -32,8 +32,10 @@ import org.matrix.android.sdk.api.auth.data.HomeServerConnectionConfig
import org.matrix.android.sdk.api.auth.data.SessionParams
import org.matrix.android.sdk.api.auth.data.sessionId
import org.matrix.android.sdk.api.crypto.MXCryptoConfig
import org.matrix.android.sdk.api.session.EventStreamService
import org.matrix.android.sdk.api.session.Session
import org.matrix.android.sdk.api.session.SessionLifecycleObserver
import org.matrix.android.sdk.api.session.ToDeviceService
import org.matrix.android.sdk.api.session.accountdata.SessionAccountDataService
import org.matrix.android.sdk.api.session.events.EventService
import org.matrix.android.sdk.api.session.homeserver.HomeServerCapabilitiesService
@ -382,4 +384,10 @@ internal abstract class SessionModule {
@Binds
abstract fun bindEventSenderProcessor(processor: EventSenderProcessorCoroutine): EventSenderProcessor
@Binds
abstract fun bindToDeviceService(deviceService: DefaultToDeviceService): ToDeviceService
@Binds
abstract fun bindEventStreamService(deviceService: DefaultEventStreamService): EventStreamService
}

View file

@ -0,0 +1,103 @@
/*
* Copyright 2021 The Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.matrix.android.sdk.internal.session
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.launch
import org.matrix.android.sdk.api.extensions.tryOrNull
import org.matrix.android.sdk.api.session.LiveEventListener
import org.matrix.android.sdk.api.session.events.model.Event
import org.matrix.android.sdk.internal.crypto.MXEventDecryptionResult
import timber.log.Timber
import javax.inject.Inject
@SessionScope
internal class StreamEventsManager @Inject constructor() {
private val coroutineScope = CoroutineScope(SupervisorJob() + Dispatchers.Default)
private val listeners = mutableListOf<LiveEventListener>()
fun addLiveEventListener(listener: LiveEventListener) {
Timber.v("## VALR: addLiveEventListener")
listeners.add(listener)
}
fun removeLiveEventListener(listener: LiveEventListener) {
Timber.v("## VALR: removeLiveEventListener")
listeners.remove(listener)
}
fun dispatchLiveEventReceived(event: Event, roomId: String, initialSync: Boolean) {
Timber.v("## VALR: dispatchLiveEventReceived ${event.eventId}")
coroutineScope.launch {
if (!initialSync) {
listeners.forEach {
tryOrNull {
it.onLiveEvent(roomId, event)
}
}
}
}
}
fun dispatchPaginatedEventReceived(event: Event, roomId: String) {
Timber.v("## VALR: dispatchPaginatedEventReceived ${event.eventId}")
coroutineScope.launch {
listeners.forEach {
tryOrNull {
it.onPaginatedEvent(roomId, event)
}
}
}
}
fun dispatchLiveEventDecrypted(event: Event, result: MXEventDecryptionResult) {
Timber.v("## VALR: dispatchLiveEventDecrypted ${event.eventId}")
coroutineScope.launch {
listeners.forEach {
tryOrNull {
it.onEventDecrypted(event.eventId ?: "", event.roomId ?: "", result.clearEvent)
}
}
}
}
fun dispatchLiveEventDecryptionFailed(event: Event, error: Throwable) {
Timber.v("## VALR: dispatchLiveEventDecryptionFailed ${event.eventId}")
coroutineScope.launch {
listeners.forEach {
tryOrNull {
it.onEventDecryptionError(event.eventId ?: "", event.roomId ?: "", error)
}
}
}
}
fun dispatchOnLiveToDevice(event: Event) {
Timber.v("## VALR: dispatchOnLiveToDevice ${event.eventId}")
coroutineScope.launch {
listeners.forEach {
tryOrNull {
it.onLiveToDeviceEvent(event)
}
}
}
}
}

View file

@ -17,12 +17,12 @@
package org.matrix.android.sdk.internal.session.room.timeline
import com.zhuinden.monarchy.Monarchy
import dagger.Lazy
import io.realm.Realm
import org.matrix.android.sdk.api.session.events.model.EventType
import org.matrix.android.sdk.api.session.events.model.toModel
import org.matrix.android.sdk.api.session.room.model.RoomMemberContent
import org.matrix.android.sdk.api.session.room.send.SendState
import org.matrix.android.sdk.internal.database.helper.addIfNecessary
import org.matrix.android.sdk.internal.database.helper.addStateEvent
import org.matrix.android.sdk.internal.database.helper.addTimelineEvent
import org.matrix.android.sdk.internal.database.mapper.toEntity
@ -35,14 +35,16 @@ import org.matrix.android.sdk.internal.database.query.create
import org.matrix.android.sdk.internal.database.query.find
import org.matrix.android.sdk.internal.database.query.where
import org.matrix.android.sdk.internal.di.SessionDatabase
import org.matrix.android.sdk.internal.util.awaitTransaction
import org.matrix.android.sdk.internal.session.StreamEventsManager
import timber.log.Timber
import javax.inject.Inject
/**
* Insert Chunk in DB, and eventually link next and previous chunk in db.
*/
internal class TokenChunkEventPersistor @Inject constructor(@SessionDatabase private val monarchy: Monarchy) {
internal class TokenChunkEventPersistor @Inject constructor(
@SessionDatabase private val monarchy: Monarchy,
private val liveEventManager: Lazy<StreamEventsManager>) {
enum class Result {
SHOULD_FETCH_MORE,
@ -170,6 +172,7 @@ internal class TokenChunkEventPersistor @Inject constructor(@SessionDatabase pri
}
roomMemberContentsByUser[event.stateKey] = contentToUse.toModel<RoomMemberContent>()
}
liveEventManager.get().dispatchPaginatedEventReceived(event, roomId)
currentChunk.addTimelineEvent(roomId, eventEntity, direction, roomMemberContentsByUser)
}
}

View file

@ -16,6 +16,7 @@
package org.matrix.android.sdk.internal.session.sync.handler.room
import dagger.Lazy
import io.realm.Realm
import io.realm.kotlin.createObject
import org.matrix.android.sdk.api.session.crypto.MXCryptoError
@ -52,6 +53,7 @@ import org.matrix.android.sdk.internal.database.query.where
import org.matrix.android.sdk.internal.di.MoshiProvider
import org.matrix.android.sdk.internal.di.UserId
import org.matrix.android.sdk.internal.extensions.clearWith
import org.matrix.android.sdk.internal.session.StreamEventsManager
import org.matrix.android.sdk.internal.session.events.getFixedRoomMemberContent
import org.matrix.android.sdk.internal.session.initsync.ProgressReporter
import org.matrix.android.sdk.internal.session.initsync.mapWithProgress
@ -79,7 +81,8 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
private val threadsAwarenessHandler: ThreadsAwarenessHandler,
private val roomChangeMembershipStateDataSource: RoomChangeMembershipStateDataSource,
@UserId private val userId: String,
private val timelineInput: TimelineInput) {
private val timelineInput: TimelineInput,
private val liveEventService: Lazy<StreamEventsManager>) {
sealed class HandlingStrategy {
data class JOINED(val data: Map<String, RoomSync>) : HandlingStrategy()
@ -364,6 +367,7 @@ internal class RoomSyncHandler @Inject constructor(private val readReceiptHandle
continue
}
eventIds.add(event.eventId)
liveEventService.get().dispatchLiveEventReceived(event, roomId, insertType == EventInsertType.INITIAL_SYNC)
val isInitialSync = insertType == EventInsertType.INITIAL_SYNC