Merge pull request #2543 from vector-im/feature/ons/wait_members_before_sending_event

Make sure to load all members in the room before sending the event
This commit is contained in:
Benoit Marty 2020-12-29 09:31:30 +01:00 committed by GitHub
commit 7ff45738e0
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 272 additions and 37 deletions

View file

@ -8,6 +8,7 @@ Improvements 🙌:
-
Bugfix 🐛:
- Wait for all room members to be known before sending a message to a e2e room (#2518)
- Url previews sometimes attached to wrong message (#2561)
Translations 🗣:

View file

@ -86,7 +86,7 @@ class CommonTestHelper(context: Context) {
*
* @param session the session to sync
*/
fun syncSession(session: Session) {
fun syncSession(session: Session, timeout: Long = TestConstants.timeOutMillis) {
val lock = CountDownLatch(1)
val job = GlobalScope.launch(Dispatchers.Main) {
@ -109,7 +109,7 @@ class CommonTestHelper(context: Context) {
}
GlobalScope.launch(Dispatchers.Main) { syncLiveData.observeForever(syncObserver) }
await(lock)
await(lock, timeout)
}
/**
@ -119,7 +119,7 @@ class CommonTestHelper(context: Context) {
* @param message the message to send
* @param nbOfMessages the number of time the message will be sent
*/
fun sendTextMessage(room: Room, message: String, nbOfMessages: Int): List<TimelineEvent> {
fun sendTextMessage(room: Room, message: String, nbOfMessages: Int, timeout: Long = TestConstants.timeOutMillis): List<TimelineEvent> {
val timeline = room.createTimeline(null, TimelineSettings(10))
val sentEvents = ArrayList<TimelineEvent>(nbOfMessages)
val latch = CountDownLatch(1)
@ -151,7 +151,7 @@ class CommonTestHelper(context: Context) {
room.sendTextMessage(message + " #" + (i + 1))
}
// Wait 3 second more per message
await(latch, timeout = TestConstants.timeOutMillis + 3_000L * nbOfMessages)
await(latch, timeout = timeout + 3_000L * nbOfMessages)
timeline.dispose()
// Check that all events has been created
@ -215,14 +215,14 @@ class CommonTestHelper(context: Context) {
.getLoginFlow(hs, it)
}
doSync<RegistrationResult> {
doSync<RegistrationResult>(timeout = 60_000) {
matrix.authenticationService
.getRegistrationWizard()
.createAccount(userName, password, null, it)
}
// Perform dummy step
val registrationResult = doSync<RegistrationResult> {
val registrationResult = doSync<RegistrationResult>(timeout = 60_000) {
matrix.authenticationService
.getRegistrationWizard()
.dummy(it)
@ -231,7 +231,7 @@ class CommonTestHelper(context: Context) {
assertTrue(registrationResult is RegistrationResult.Success)
val session = (registrationResult as RegistrationResult.Success).session
if (sessionTestParams.withInitialSync) {
syncSession(session)
syncSession(session, 60_000)
}
return session

View file

@ -18,14 +18,21 @@ package org.matrix.android.sdk.common
import org.matrix.android.sdk.api.session.Session
data class CryptoTestData(val firstSession: Session,
val roomId: String,
val secondSession: Session? = null,
val thirdSession: Session? = null) {
data class CryptoTestData(val roomId: String,
val sessions: List<Session>) {
val firstSession: Session
get() = sessions.first()
val secondSession: Session?
get() = sessions.getOrNull(1)
val thirdSession: Session?
get() = sessions.getOrNull(2)
fun cleanUp(testHelper: CommonTestHelper) {
testHelper.signOutAndClose(firstSession)
secondSession?.let { testHelper.signOutAndClose(it) }
thirdSession?.let { testHelper.signOutAndClose(it) }
sessions.forEach {
testHelper.signOutAndClose(it)
}
}
}

View file

@ -73,7 +73,7 @@ class CryptoTestHelper(private val mTestHelper: CommonTestHelper) {
}
}
return CryptoTestData(aliceSession, roomId)
return CryptoTestData(roomId, listOf(aliceSession))
}
/**
@ -139,7 +139,7 @@ class CryptoTestHelper(private val mTestHelper: CommonTestHelper) {
// assertNotNull(roomFromBobPOV.powerLevels)
// assertTrue(roomFromBobPOV.powerLevels.maySendMessage(bobSession.myUserId))
return CryptoTestData(aliceSession, aliceRoomId, bobSession)
return CryptoTestData(aliceRoomId, listOf(aliceSession, bobSession))
}
/**
@ -157,7 +157,7 @@ class CryptoTestHelper(private val mTestHelper: CommonTestHelper) {
// wait the initial sync
SystemClock.sleep(1000)
return CryptoTestData(aliceSession, aliceRoomId, cryptoTestData.secondSession, samSession)
return CryptoTestData(aliceRoomId, listOf(aliceSession, cryptoTestData.secondSession!!, samSession))
}
/**
@ -381,4 +381,30 @@ class CryptoTestHelper(private val mTestHelper: CommonTestHelper) {
}
}
}
fun doE2ETestWithManyMembers(numberOfMembers: Int): CryptoTestData {
val aliceSession = mTestHelper.createAccount(TestConstants.USER_ALICE, defaultSessionParams)
aliceSession.cryptoService().setWarnOnUnknownDevices(false)
val roomId = mTestHelper.doSync<String> {
aliceSession.createRoom(CreateRoomParams().apply { name = "MyRoom" }, it)
}
val room = aliceSession.getRoom(roomId)!!
mTestHelper.runBlockingTest {
room.enableEncryption()
}
val sessions = mutableListOf(aliceSession)
for (index in 1 until numberOfMembers) {
val session = mTestHelper.createAccount("User_$index", defaultSessionParams)
mTestHelper.doSync<Unit>(timeout = 600_000) { room.invite(session.myUserId, null, it) }
println("TEST -> " + session.myUserId + " invited")
mTestHelper.doSync<Unit> { session.joinRoom(room.roomId, null, emptyList(), it) }
println("TEST -> " + session.myUserId + " joined")
sessions.add(session)
}
return CryptoTestData(roomId, sessions)
}
}

View file

@ -0,0 +1,92 @@
/*
* Copyright 2020 The Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.matrix.android.sdk.session.room.timeline
import org.junit.FixMethodOrder
import org.junit.Test
import org.junit.runner.RunWith
import org.junit.runners.JUnit4
import org.junit.runners.MethodSorters
import org.matrix.android.sdk.InstrumentedTest
import org.matrix.android.sdk.api.extensions.orFalse
import org.matrix.android.sdk.api.session.events.model.toModel
import org.matrix.android.sdk.api.session.room.model.message.MessageContent
import org.matrix.android.sdk.api.session.room.timeline.TimelineSettings
import org.matrix.android.sdk.common.CommonTestHelper
import org.matrix.android.sdk.common.CryptoTestHelper
import java.util.concurrent.CountDownLatch
import kotlin.test.fail
@RunWith(JUnit4::class)
@FixMethodOrder(MethodSorters.JVM)
class TimelineWithManyMembersTest : InstrumentedTest {
companion object {
private const val NUMBER_OF_MEMBERS = 6
}
private val commonTestHelper = CommonTestHelper(context())
private val cryptoTestHelper = CryptoTestHelper(commonTestHelper)
/**
* Ensures when someone sends a message to a crowded room, everyone can decrypt the message.
*/
@Test
fun everyone_should_decrypt_message_in_a_crowded_room() {
val cryptoTestData = cryptoTestHelper.doE2ETestWithManyMembers(NUMBER_OF_MEMBERS)
val sessionForFirstMember = cryptoTestData.firstSession
val roomForFirstMember = sessionForFirstMember.getRoom(cryptoTestData.roomId)!!
val firstMessage = "First messages from Alice"
commonTestHelper.sendTextMessage(
roomForFirstMember,
firstMessage,
1,
600_000
)
for (index in 1 until cryptoTestData.sessions.size) {
val session = cryptoTestData.sessions[index]
val roomForCurrentMember = session.getRoom(cryptoTestData.roomId)!!
val timelineForCurrentMember = roomForCurrentMember.createTimeline(null, TimelineSettings(30))
timelineForCurrentMember.start()
session.startSync(true)
run {
val lock = CountDownLatch(1)
val eventsListener = commonTestHelper.createEventListener(lock) { snapshot ->
snapshot
.find { it.isEncrypted() }
?.let {
val body = it.root.getClearContent()?.toModel<MessageContent>()?.body
if (body?.startsWith(firstMessage).orFalse()) {
println("User " + session.myUserId + " decrypted as " + body)
return@createEventListener true
} else {
fail("User " + session.myUserId + " decrypted as " + body + " CryptoError: " + it.root.mCryptoError)
}
} ?: return@createEventListener false
}
timelineForCurrentMember.addListener(eventsListener)
commonTestHelper.await(lock, 600_000)
}
session.stopSync()
}
}
}

View file

@ -20,6 +20,7 @@ import org.matrix.android.sdk.api.session.events.model.Event
import org.matrix.android.sdk.api.session.room.send.SendState
import org.matrix.android.sdk.internal.network.executeRequest
import org.matrix.android.sdk.internal.session.room.RoomAPI
import org.matrix.android.sdk.internal.session.room.membership.LoadRoomMembersTask
import org.matrix.android.sdk.internal.session.room.send.LocalEchoRepository
import org.matrix.android.sdk.internal.session.room.send.SendResponse
import org.matrix.android.sdk.internal.task.Task
@ -35,11 +36,19 @@ internal interface SendEventTask : Task<SendEventTask.Params, String> {
internal class DefaultSendEventTask @Inject constructor(
private val localEchoRepository: LocalEchoRepository,
private val encryptEventTask: DefaultEncryptEventTask,
private val loadRoomMembersTask: LoadRoomMembersTask,
private val roomAPI: RoomAPI,
private val eventBus: EventBus) : SendEventTask {
override suspend fun execute(params: SendEventTask.Params): String {
try {
// Make sure to load all members in the room before sending the event.
params.event.roomId
?.takeIf { params.encrypt }
?.let { roomId ->
loadRoomMembersTask.execute(LoadRoomMembersTask.Params(roomId))
}
val event = handleEncryption(params)
val localId = event.eventId!!

View file

@ -21,6 +21,8 @@ import io.realm.RealmMigration
import org.matrix.android.sdk.internal.database.model.HomeServerCapabilitiesEntityFields
import org.matrix.android.sdk.internal.database.model.PendingThreePidEntityFields
import org.matrix.android.sdk.internal.database.model.PreviewUrlCacheEntityFields
import org.matrix.android.sdk.internal.database.model.RoomEntityFields
import org.matrix.android.sdk.internal.database.model.RoomMembersLoadStatusType
import org.matrix.android.sdk.internal.database.model.RoomSummaryEntityFields
import timber.log.Timber
import javax.inject.Inject
@ -28,7 +30,7 @@ import javax.inject.Inject
class RealmSessionStoreMigration @Inject constructor() : RealmMigration {
companion object {
const val SESSION_STORE_SCHEMA_VERSION = 6L
const val SESSION_STORE_SCHEMA_VERSION = 7L
}
override fun migrate(realm: DynamicRealm, oldVersion: Long, newVersion: Long) {
@ -40,6 +42,7 @@ class RealmSessionStoreMigration @Inject constructor() : RealmMigration {
if (oldVersion <= 3) migrateTo4(realm)
if (oldVersion <= 4) migrateTo5(realm)
if (oldVersion <= 5) migrateTo6(realm)
if (oldVersion <= 6) migrateTo7(realm)
}
private fun migrateTo1(realm: DynamicRealm) {
@ -105,4 +108,18 @@ class RealmSessionStoreMigration @Inject constructor() : RealmMigration {
.addField(PreviewUrlCacheEntityFields.MXC_URL, String::class.java)
.addField(PreviewUrlCacheEntityFields.LAST_UPDATED_TIMESTAMP, Long::class.java)
}
private fun migrateTo7(realm: DynamicRealm) {
Timber.d("Step 6 -> 7")
realm.schema.get("RoomEntity")
?.addField(RoomEntityFields.MEMBERS_LOAD_STATUS_STR, String::class.java)
?.transform { obj ->
if (obj.getBoolean("areAllMembersLoaded")) {
obj.setString("membersLoadStatusStr", RoomMembersLoadStatusType.LOADED.name)
} else {
obj.setString("membersLoadStatusStr", RoomMembersLoadStatusType.NONE.name)
}
}
?.removeField("areAllMembersLoaded")
}
}

View file

@ -23,8 +23,7 @@ import io.realm.annotations.PrimaryKey
internal open class RoomEntity(@PrimaryKey var roomId: String = "",
var chunks: RealmList<ChunkEntity> = RealmList(),
var sendingTimelineEvents: RealmList<TimelineEventEntity> = RealmList(),
var areAllMembersLoaded: Boolean = false
var sendingTimelineEvents: RealmList<TimelineEventEntity> = RealmList()
) : RealmObject() {
private var membershipStr: String = Membership.NONE.name
@ -36,5 +35,14 @@ internal open class RoomEntity(@PrimaryKey var roomId: String = "",
membershipStr = value.name
}
private var membersLoadStatusStr: String = RoomMembersLoadStatusType.NONE.name
var membersLoadStatus: RoomMembersLoadStatusType
get() {
return RoomMembersLoadStatusType.valueOf(membersLoadStatusStr)
}
set(value) {
membersLoadStatusStr = value.name
}
companion object
}

View file

@ -0,0 +1,23 @@
/*
* Copyright 2020 The Matrix.org Foundation C.I.C.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.matrix.android.sdk.internal.database.model
internal enum class RoomMembersLoadStatusType {
NONE,
LOADING,
LOADED
}

View file

@ -17,12 +17,19 @@
package org.matrix.android.sdk.internal.session.room.membership
import com.zhuinden.monarchy.Monarchy
import io.realm.Realm
import io.realm.kotlin.createObject
import kotlinx.coroutines.TimeoutCancellationException
import org.greenrobot.eventbus.EventBus
import org.matrix.android.sdk.api.session.room.model.Membership
import org.matrix.android.sdk.api.session.room.send.SendState
import org.matrix.android.sdk.internal.database.awaitNotEmptyResult
import org.matrix.android.sdk.internal.database.mapper.toEntity
import org.matrix.android.sdk.internal.database.model.CurrentStateEventEntity
import org.matrix.android.sdk.internal.database.model.EventInsertType
import org.matrix.android.sdk.internal.database.model.RoomEntity
import org.matrix.android.sdk.internal.database.model.RoomEntityFields
import org.matrix.android.sdk.internal.database.model.RoomMembersLoadStatusType
import org.matrix.android.sdk.internal.database.query.copyToRealmOrIgnore
import org.matrix.android.sdk.internal.database.query.getOrCreate
import org.matrix.android.sdk.internal.database.query.where
@ -33,9 +40,7 @@ import org.matrix.android.sdk.internal.session.room.summary.RoomSummaryUpdater
import org.matrix.android.sdk.internal.session.sync.SyncTokenStore
import org.matrix.android.sdk.internal.task.Task
import org.matrix.android.sdk.internal.util.awaitTransaction
import io.realm.Realm
import io.realm.kotlin.createObject
import org.greenrobot.eventbus.EventBus
import java.util.concurrent.TimeUnit
import javax.inject.Inject
internal interface LoadRoomMembersTask : Task<LoadRoomMembersTask.Params, Unit> {
@ -56,13 +61,40 @@ internal class DefaultLoadRoomMembersTask @Inject constructor(
) : LoadRoomMembersTask {
override suspend fun execute(params: LoadRoomMembersTask.Params) {
if (areAllMembersAlreadyLoaded(params.roomId)) {
return
when (getRoomMembersLoadStatus(params.roomId)) {
RoomMembersLoadStatusType.NONE -> doRequest(params)
RoomMembersLoadStatusType.LOADING -> waitPreviousRequestToFinish(params)
RoomMembersLoadStatusType.LOADED -> Unit
}
}
private suspend fun waitPreviousRequestToFinish(params: LoadRoomMembersTask.Params) {
try {
awaitNotEmptyResult(monarchy.realmConfiguration, TimeUnit.MINUTES.toMillis(1L)) { realm ->
realm.where(RoomEntity::class.java)
.equalTo(RoomEntityFields.ROOM_ID, params.roomId)
.equalTo(RoomEntityFields.MEMBERS_LOAD_STATUS_STR, RoomMembersLoadStatusType.LOADED.name)
}
} catch (exception: TimeoutCancellationException) {
// Timeout, do the request anyway (?)
doRequest(params)
}
}
private suspend fun doRequest(params: LoadRoomMembersTask.Params) {
setRoomMembersLoadStatus(params.roomId, RoomMembersLoadStatusType.LOADING)
val lastToken = syncTokenStore.getLastToken()
val response = executeRequest<RoomMembersResponse>(eventBus) {
apiCall = roomAPI.getMembers(params.roomId, lastToken, null, params.excludeMembership?.value)
val response = try {
executeRequest<RoomMembersResponse>(eventBus) {
apiCall = roomAPI.getMembers(params.roomId, lastToken, null, params.excludeMembership?.value)
}
} catch (throwable: Throwable) {
// Revert status to NONE
setRoomMembersLoadStatus(params.roomId, RoomMembersLoadStatusType.NONE)
throw throwable
}
// This will also set the status to LOADED
insertInDb(response, params.roomId)
}
@ -84,14 +116,23 @@ internal class DefaultLoadRoomMembersTask @Inject constructor(
}
roomMemberEventHandler.handle(realm, roomId, roomMemberEvent)
}
roomEntity.areAllMembersLoaded = true
roomEntity.membersLoadStatus = RoomMembersLoadStatusType.LOADED
roomSummaryUpdater.update(realm, roomId, updateMembers = true)
}
}
private fun areAllMembersAlreadyLoaded(roomId: String): Boolean {
return Realm.getInstance(monarchy.realmConfiguration).use {
RoomEntity.where(it, roomId).findFirst()?.areAllMembersLoaded ?: false
private fun getRoomMembersLoadStatus(roomId: String): RoomMembersLoadStatusType {
var result: RoomMembersLoadStatusType?
Realm.getInstance(monarchy.realmConfiguration).use {
result = RoomEntity.where(it, roomId).findFirst()?.membersLoadStatus
}
return result ?: RoomMembersLoadStatusType.NONE
}
private suspend fun setRoomMembersLoadStatus(roomId: String, status: RoomMembersLoadStatusType) {
monarchy.awaitTransaction { realm ->
val roomEntity = RoomEntity.where(realm, roomId).findFirst() ?: realm.createObject(roomId)
roomEntity.membersLoadStatus = status
}
}
}

View file

@ -27,6 +27,7 @@ import org.greenrobot.eventbus.EventBus
import org.greenrobot.eventbus.Subscribe
import org.greenrobot.eventbus.ThreadMode
import org.matrix.android.sdk.api.MatrixCallback
import org.matrix.android.sdk.api.NoOpMatrixCallback
import org.matrix.android.sdk.api.extensions.orFalse
import org.matrix.android.sdk.api.extensions.tryOrNull
import org.matrix.android.sdk.api.session.events.model.EventType
@ -53,6 +54,7 @@ import org.matrix.android.sdk.internal.database.query.filterEvents
import org.matrix.android.sdk.internal.database.query.findAllInRoomWithSendStates
import org.matrix.android.sdk.internal.database.query.where
import org.matrix.android.sdk.internal.database.query.whereRoomId
import org.matrix.android.sdk.internal.session.room.membership.LoadRoomMembersTask
import org.matrix.android.sdk.internal.task.TaskExecutor
import org.matrix.android.sdk.internal.task.configureWith
import org.matrix.android.sdk.internal.util.Debouncer
@ -81,7 +83,8 @@ internal class DefaultTimeline(
private val hiddenReadReceipts: TimelineHiddenReadReceipts,
private val eventBus: EventBus,
private val eventDecryptor: TimelineEventDecryptor,
private val realmSessionProvider: RealmSessionProvider
private val realmSessionProvider: RealmSessionProvider,
private val loadRoomMembersTask: LoadRoomMembersTask
) : Timeline, TimelineHiddenReadReceipts.Delegate {
data class OnNewTimelineEvents(val roomId: String, val eventIds: List<String>)
@ -184,6 +187,13 @@ internal class DefaultTimeline(
if (settings.shouldHandleHiddenReadReceipts()) {
hiddenReadReceipts.start(realm, filteredEvents, nonFilteredEvents, this)
}
loadRoomMembersTask
.configureWith(LoadRoomMembersTask.Params(roomId)) {
this.callback = NoOpMatrixCallback()
}
.executeBy(taskExecutor)
isReady.set(true)
}
}

View file

@ -39,6 +39,7 @@ import org.matrix.android.sdk.internal.database.model.TimelineEventEntity
import org.matrix.android.sdk.internal.database.model.TimelineEventEntityFields
import org.matrix.android.sdk.internal.database.query.where
import org.matrix.android.sdk.internal.di.SessionDatabase
import org.matrix.android.sdk.internal.session.room.membership.LoadRoomMembersTask
import org.matrix.android.sdk.internal.task.TaskExecutor
internal class DefaultTimelineService @AssistedInject constructor(@Assisted private val roomId: String,
@ -51,7 +52,8 @@ internal class DefaultTimelineService @AssistedInject constructor(@Assisted priv
private val paginationTask: PaginationTask,
private val fetchTokenAndPaginateTask: FetchTokenAndPaginateTask,
private val timelineEventMapper: TimelineEventMapper,
private val readReceiptsSummaryMapper: ReadReceiptsSummaryMapper
private val readReceiptsSummaryMapper: ReadReceiptsSummaryMapper,
private val loadRoomMembersTask: LoadRoomMembersTask
) : TimelineService {
@AssistedInject.Factory
@ -73,7 +75,8 @@ internal class DefaultTimelineService @AssistedInject constructor(@Assisted priv
eventBus = eventBus,
eventDecryptor = eventDecryptor,
fetchTokenAndPaginateTask = fetchTokenAndPaginateTask,
realmSessionProvider = realmSessionProvider
realmSessionProvider = realmSessionProvider,
loadRoomMembersTask = loadRoomMembersTask
)
}

View file

@ -161,7 +161,7 @@ Formatter\.formatShortFileSize===1
# android\.text\.TextUtils
### This is not a rule, but a warning: the number of "enum class" has changed. For Json classes, it is mandatory that they have `@JsonClass(generateAdapter = false)`. If the enum is not used as a Json class, change the value in file forbidden_strings_in_code.txt
enum class===84
enum class===85
### Do not import temporary legacy classes
import org.matrix.android.sdk.internal.legacy.riot===3

View file

@ -31,7 +31,6 @@ import im.vector.app.R
import im.vector.app.core.extensions.exhaustive
import im.vector.app.core.platform.VectorViewModel
import im.vector.app.core.resources.StringProvider
import im.vector.app.core.utils.subscribeLogError
import im.vector.app.features.call.WebRtcPeerConnectionManager
import im.vector.app.features.command.CommandParser
import im.vector.app.features.command.ParsedCommand
@ -168,7 +167,6 @@ class RoomDetailViewModel @AssistedInject constructor(
observePowerLevel()
room.getRoomSummaryLive()
room.markAsRead(ReadService.MarkAsReadParams.READ_RECEIPT, NoOpMatrixCallback())
room.rx().loadRoomMembersIfNeeded().subscribeLogError().disposeOnClear()
// Inform the SDK that the room is displayed
session.onRoomDisplayed(initialState.roomId)
chatEffectManager.delegate = this