Flow: restore read receipts

This commit is contained in:
ganfra 2021-10-26 15:57:18 +02:00
parent 9479342a64
commit 8cf5b727e1
2 changed files with 91 additions and 8 deletions

View file

@ -0,0 +1,79 @@
/*
* Copyright (c) 2021 New Vector Ltd
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package im.vector.app.core.flow
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.isActive
import kotlinx.coroutines.selects.select
@ExperimentalCoroutinesApi
fun <T> Flow<T>.chunk(durationInMillis: Long): Flow<List<T>> {
require(durationInMillis> 0) { "Duration should be greater than 0" }
return flow {
coroutineScope {
val events = ArrayList<T>()
val ticker = fixedPeriodTicker(durationInMillis)
try {
val upstreamValues = produce(capacity = Channel.CONFLATED) {
collect { value -> send(value) }
}
while (isActive) {
var hasTimedOut = false
select<Unit> {
upstreamValues.onReceive {
events.add(it)
}
ticker.onReceive {
hasTimedOut = true
}
}
if (hasTimedOut && events.isNotEmpty()) {
emit(events.toList())
events.clear()
}
}
} catch (e: ClosedReceiveChannelException) {
// drain remaining events
if (events.isNotEmpty()) emit(events.toList())
} finally {
ticker.cancel()
}
}
}
}
private fun CoroutineScope.fixedPeriodTicker(delayMillis: Long, initialDelayMillis: Long = delayMillis): ReceiveChannel<Unit> {
require(delayMillis >= 0) { "Expected non-negative delay, but has $delayMillis ms" }
require(initialDelayMillis >= 0) { "Expected non-negative initial delay, but has $initialDelayMillis ms" }
return produce(capacity = 0) {
delay(initialDelayMillis)
while (true) {
channel.send(Unit)
delay(delayMillis)
}
}
}

View file

@ -33,6 +33,7 @@ import dagger.assisted.AssistedInject
import im.vector.app.BuildConfig import im.vector.app.BuildConfig
import im.vector.app.R import im.vector.app.R
import im.vector.app.core.extensions.exhaustive import im.vector.app.core.extensions.exhaustive
import im.vector.app.core.flow.chunk
import im.vector.app.core.mvrx.runCatchingToAsync import im.vector.app.core.mvrx.runCatchingToAsync
import im.vector.app.core.platform.VectorViewModel import im.vector.app.core.platform.VectorViewModel
import im.vector.app.core.resources.StringProvider import im.vector.app.core.resources.StringProvider
@ -58,13 +59,17 @@ import im.vector.app.features.settings.VectorPreferences
import im.vector.app.features.voice.VoicePlayerHelper import im.vector.app.features.voice.VoicePlayerHelper
import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.MutableSharedFlow import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.combine import kotlinx.coroutines.flow.combine
import kotlinx.coroutines.flow.distinctUntilChanged import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.filterIsInstance import kotlinx.coroutines.flow.filterIsInstance
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.launchIn
import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.flow.sample
import kotlinx.coroutines.launch import kotlinx.coroutines.launch
import kotlinx.coroutines.withContext import kotlinx.coroutines.withContext
import org.matrix.android.sdk.api.MatrixPatterns import org.matrix.android.sdk.api.MatrixPatterns
@ -863,13 +868,13 @@ class RoomDetailViewModel @AssistedInject constructor(
private fun observeEventDisplayedActions() { private fun observeEventDisplayedActions() {
// We are buffering scroll events for one second // We are buffering scroll events for one second
// and keep the most recent one to set the read receipt on. // and keep the most recent one to set the read receipt on.
/*
visibleEventsSource visibleEventsSource
.stream() .stream()
.buffer(1, TimeUnit.SECONDS) .chunk(1000)
.filter { it.isNotEmpty() } .filter { it.isNotEmpty() }
.subscribeBy(onNext = { actions -> .onEach { actions ->
val bufferedMostRecentDisplayedEvent = actions.maxByOrNull { it.event.displayIndex }?.event ?: return@subscribeBy val bufferedMostRecentDisplayedEvent = actions.maxByOrNull { it.event.displayIndex }?.event ?: return@onEach
val globalMostRecentDisplayedEvent = mostRecentDisplayedEvent val globalMostRecentDisplayedEvent = mostRecentDisplayedEvent
if (trackUnreadMessages.get()) { if (trackUnreadMessages.get()) {
if (globalMostRecentDisplayedEvent == null) { if (globalMostRecentDisplayedEvent == null) {
@ -883,10 +888,9 @@ class RoomDetailViewModel @AssistedInject constructor(
tryOrNull { room.setReadReceipt(eventId) } tryOrNull { room.setReadReceipt(eventId) }
} }
} }
}) }
.disposeOnClear() .flowOn(Dispatchers.Default)
.launchIn(viewModelScope)
*/
} }
private fun handleMarkAllAsRead() { private fun handleMarkAllAsRead() {