diff --git a/vector/src/main/java/im/vector/app/core/flow/ChunkOperator.kt b/vector/src/main/java/im/vector/app/core/flow/ChunkOperator.kt new file mode 100644 index 0000000000..533a060883 --- /dev/null +++ b/vector/src/main/java/im/vector/app/core/flow/ChunkOperator.kt @@ -0,0 +1,79 @@ +/* + * Copyright (c) 2021 New Vector Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package im.vector.app.core.flow + +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.channels.ClosedReceiveChannelException +import kotlinx.coroutines.channels.ReceiveChannel +import kotlinx.coroutines.channels.produce +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.isActive +import kotlinx.coroutines.selects.select + +@ExperimentalCoroutinesApi +fun Flow.chunk(durationInMillis: Long): Flow> { + require(durationInMillis> 0) { "Duration should be greater than 0" } + return flow { + coroutineScope { + val events = ArrayList() + val ticker = fixedPeriodTicker(durationInMillis) + try { + val upstreamValues = produce(capacity = Channel.CONFLATED) { + collect { value -> send(value) } + } + while (isActive) { + var hasTimedOut = false + select { + upstreamValues.onReceive { + events.add(it) + } + ticker.onReceive { + hasTimedOut = true + } + } + if (hasTimedOut && events.isNotEmpty()) { + emit(events.toList()) + events.clear() + } + } + } catch (e: ClosedReceiveChannelException) { + // drain remaining events + if (events.isNotEmpty()) emit(events.toList()) + } finally { + ticker.cancel() + } + } + } +} + +private fun CoroutineScope.fixedPeriodTicker(delayMillis: Long, initialDelayMillis: Long = delayMillis): ReceiveChannel { + require(delayMillis >= 0) { "Expected non-negative delay, but has $delayMillis ms" } + require(initialDelayMillis >= 0) { "Expected non-negative initial delay, but has $initialDelayMillis ms" } + return produce(capacity = 0) { + delay(initialDelayMillis) + while (true) { + channel.send(Unit) + delay(delayMillis) + } + } +} diff --git a/vector/src/main/java/im/vector/app/features/home/room/detail/RoomDetailViewModel.kt b/vector/src/main/java/im/vector/app/features/home/room/detail/RoomDetailViewModel.kt index ee929243b5..1b765f81b9 100644 --- a/vector/src/main/java/im/vector/app/features/home/room/detail/RoomDetailViewModel.kt +++ b/vector/src/main/java/im/vector/app/features/home/room/detail/RoomDetailViewModel.kt @@ -33,6 +33,7 @@ import dagger.assisted.AssistedInject import im.vector.app.BuildConfig import im.vector.app.R import im.vector.app.core.extensions.exhaustive +import im.vector.app.core.flow.chunk import im.vector.app.core.mvrx.runCatchingToAsync import im.vector.app.core.platform.VectorViewModel import im.vector.app.core.resources.StringProvider @@ -58,13 +59,17 @@ import im.vector.app.features.settings.VectorPreferences import im.vector.app.features.voice.VoicePlayerHelper import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.buffer import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.combine import kotlinx.coroutines.flow.distinctUntilChanged +import kotlinx.coroutines.flow.filter import kotlinx.coroutines.flow.filterIsInstance +import kotlinx.coroutines.flow.flowOn import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.onEach +import kotlinx.coroutines.flow.sample import kotlinx.coroutines.launch import kotlinx.coroutines.withContext import org.matrix.android.sdk.api.MatrixPatterns @@ -863,13 +868,13 @@ class RoomDetailViewModel @AssistedInject constructor( private fun observeEventDisplayedActions() { // We are buffering scroll events for one second // and keep the most recent one to set the read receipt on. - /* + visibleEventsSource .stream() - .buffer(1, TimeUnit.SECONDS) + .chunk(1000) .filter { it.isNotEmpty() } - .subscribeBy(onNext = { actions -> - val bufferedMostRecentDisplayedEvent = actions.maxByOrNull { it.event.displayIndex }?.event ?: return@subscribeBy + .onEach { actions -> + val bufferedMostRecentDisplayedEvent = actions.maxByOrNull { it.event.displayIndex }?.event ?: return@onEach val globalMostRecentDisplayedEvent = mostRecentDisplayedEvent if (trackUnreadMessages.get()) { if (globalMostRecentDisplayedEvent == null) { @@ -883,10 +888,9 @@ class RoomDetailViewModel @AssistedInject constructor( tryOrNull { room.setReadReceipt(eventId) } } } - }) - .disposeOnClear() - - */ + } + .flowOn(Dispatchers.Default) + .launchIn(viewModelScope) } private fun handleMarkAllAsRead() {