Merge pull request #2845 from nextcloud/bugfix/2814/avoidDuplicatedMessagesAfterHistoryPull

Bugfix/2814/avoid duplicated messages after history pull
This commit is contained in:
Marcel Hibbe 2023-03-09 16:13:08 +01:00 committed by GitHub
commit 0fabb73058
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 144 additions and 172 deletions

View file

@ -257,14 +257,12 @@ class ChatController(args: Bundle) :
private val roomPassword: String
var credentials: String? = null
var currentConversation: Conversation? = null
private var historyRead = false
private var globalLastKnownFutureMessageId = -1
private var globalLastKnownPastMessageId = -1
var adapter: TalkMessagesListAdapter<ChatMessage>? = null
private var mentionAutocomplete: Autocomplete<*>? = null
var layoutManager: LinearLayoutManager? = null
var pullChatMessagesPending = false
private var lookingIntoFuture = false
var newMessagesCount = 0
var startCallFromNotification: Boolean? = null
var startCallFromRoomSwitch: Boolean = false
@ -1414,10 +1412,10 @@ class ChatController(args: Bundle) :
binding?.messageInputView?.inputEditText?.visibility = View.VISIBLE
if (isFirstMessagesProcessing && pastPreconditionFailed) {
pastPreconditionFailed = false
pullChatMessages(0)
pullChatMessages(false)
} else if (futurePreconditionFailed) {
futurePreconditionFailed = false
pullChatMessages(1)
pullChatMessages(true)
}
}
} else {
@ -2014,9 +2012,9 @@ class ChatController(args: Bundle) :
webSocketInstance?.getSignalingMessageReceiver()?.addListener(localParticipantMessageListener)
if (isFirstMessagesProcessing) {
pullChatMessages(0)
pullChatMessages(false)
} else {
pullChatMessages(1, 0)
pullChatMessages(true, false)
}
if (webSocketInstance != null) {
@ -2054,9 +2052,9 @@ class ChatController(args: Bundle) :
}
if (isFirstMessagesProcessing) {
pullChatMessages(0)
pullChatMessages(false)
} else {
pullChatMessages(1)
pullChatMessages(true)
}
}
}
@ -2234,11 +2232,17 @@ class ChatController(args: Bundle) :
}
}
fun pullChatMessages(lookIntoFuture: Int, setReadMarker: Int = 1, xChatLastCommonRead: Int? = null) {
fun pullChatMessages(
lookIntoFuture: Boolean,
setReadMarker: Boolean = true,
xChatLastCommonRead: Int? = null
) {
if (!validSessionId()) {
return
}
Log.d(TAG, "pullChatMessages. lookIntoFuture= $lookIntoFuture")
if (pullChatMessagesPending) {
// Sometimes pullChatMessages may be called before response to a previous call is received.
// In such cases just ignore the second call. Message processing will continue when response to the
@ -2249,12 +2253,118 @@ class ChatController(args: Bundle) :
}
pullChatMessagesPending = true
val pullChatMessagesFieldMap = setupFieldsForPullChatMessages(
lookIntoFuture,
xChatLastCommonRead,
setReadMarker
)
var apiVersion = 1
// FIXME this is a best guess, guests would need to get the capabilities themselves
if (conversationUser != null) {
apiVersion = ApiUtils.getChatApiVersion(conversationUser, intArrayOf(1))
}
ncApi.pullChatMessages(
credentials,
ApiUtils.getUrlForChat(apiVersion, conversationUser?.baseUrl, roomToken),
pullChatMessagesFieldMap
)
?.subscribeOn(Schedulers.io())
?.observeOn(AndroidSchedulers.mainThread())
?.subscribe(object : Observer<Response<*>> {
override fun onSubscribe(d: Disposable) {
disposables.add(d)
}
@SuppressLint("NotifyDataSetChanged")
@Suppress("Detekt.TooGenericExceptionCaught")
override fun onNext(response: Response<*>) {
pullChatMessagesPending = false
when (response.code()) {
HTTP_CODE_NOT_MODIFIED -> {
Log.d(TAG, "pullChatMessages - HTTP_CODE_NOT_MODIFIED.")
if (lookIntoFuture) {
Log.d(TAG, "recursive call to pullChatMessages.")
pullChatMessages(true, setReadMarker, xChatLastCommonRead)
}
}
HTTP_CODE_PRECONDITION_FAILED -> {
Log.d(TAG, "pullChatMessages - HTTP_CODE_PRECONDITION_FAILED.")
if (lookIntoFuture) {
futurePreconditionFailed = true
} else {
pastPreconditionFailed = true
}
}
HTTP_CODE_OK -> {
Log.d(TAG, "pullChatMessages - HTTP_CODE_OK.")
val chatOverall = response.body() as ChatOverall?
val chatMessageList = handleSystemMessages(chatOverall?.ocs!!.data!!)
processHeaderChatLastGiven(response, lookIntoFuture)
if (chatMessageList.isNotEmpty() &&
ChatMessage.SystemMessageType.CLEARED_CHAT == chatMessageList[0].systemMessageType
) {
adapter?.clear()
adapter?.notifyDataSetChanged()
}
if (lookIntoFuture) {
processMessagesFromTheFuture(chatMessageList)
} else {
processMessagesNotFromTheFuture(chatMessageList)
}
val newXChatLastCommonRead = response.headers()["X-Chat-Last-Common-Read"]?.let {
Integer.parseInt(it)
}
updateReadStatusOfAllMessages(newXChatLastCommonRead)
adapter?.notifyDataSetChanged()
if (isFirstMessagesProcessing || lookIntoFuture) {
Log.d(TAG, "recursive call to pullChatMessages")
pullChatMessages(true, true, newXChatLastCommonRead)
}
}
}
processExpiredMessages()
if (isFirstMessagesProcessing) {
cancelNotificationsForCurrentConversation()
isFirstMessagesProcessing = false
binding?.progressBar?.visibility = View.GONE
binding?.messagesListView?.visibility = View.VISIBLE
}
}
override fun onError(e: Throwable) {
Log.e(TAG, "pullChatMessages - pullChatMessages ERROR", e)
pullChatMessagesPending = false
}
override fun onComplete() {
pullChatMessagesPending = false
}
})
}
private fun setupFieldsForPullChatMessages(
lookIntoFuture: Boolean,
xChatLastCommonRead: Int?,
setReadMarker: Boolean
): HashMap<String, Int> {
val fieldMap = HashMap<String, Int>()
fieldMap["includeLastKnown"] = 0
if (lookIntoFuture > 0) {
lookingIntoFuture = true
} else if (isFirstMessagesProcessing) {
if (!lookIntoFuture && isFirstMessagesProcessing) {
if (currentConversation != null) {
globalLastKnownFutureMessageId = currentConversation!!.lastReadMessage
globalLastKnownPastMessageId = currentConversation!!.lastReadMessage
@ -2262,19 +2372,7 @@ class ChatController(args: Bundle) :
}
}
val timeout = if (lookingIntoFuture) {
LOOKING_INTO_FUTURE_TIMEOUT
} else {
0
}
fieldMap["timeout"] = timeout
fieldMap["lookIntoFuture"] = lookIntoFuture
fieldMap["limit"] = MESSAGE_PULL_LIMIT
fieldMap["setReadMarker"] = setReadMarker
val lastKnown = if (lookIntoFuture > 0) {
val lastKnown = if (lookIntoFuture) {
globalLastKnownFutureMessageId
} else {
globalLastKnownPastMessageId
@ -2285,88 +2383,27 @@ class ChatController(args: Bundle) :
fieldMap["lastCommonReadId"] = it
}
var apiVersion = 1
// FIXME this is a best guess, guests would need to get the capabilities themselves
if (conversationUser != null) {
apiVersion = ApiUtils.getChatApiVersion(conversationUser, intArrayOf(1))
}
if (lookIntoFuture > 0) {
Log.d(TAG, "pullChatMessages - pullChatMessages[lookIntoFuture > 0] - calling")
ncApi.pullChatMessages(
credentials,
ApiUtils.getUrlForChat(apiVersion, conversationUser?.baseUrl, roomToken),
fieldMap
)
?.subscribeOn(Schedulers.io())
?.observeOn(AndroidSchedulers.mainThread())
?.subscribe(object : Observer<Response<*>> {
override fun onSubscribe(d: Disposable) {
disposables.add(d)
}
@Suppress("Detekt.TooGenericExceptionCaught")
override fun onNext(response: Response<*>) {
Log.d(TAG, "pullChatMessages - pullChatMessages[lookIntoFuture > 0] - got response")
pullChatMessagesPending = false
if (response.code() == HTTP_CODE_NOT_MODIFIED) {
Log.d(TAG, "pullChatMessages - quasi recursive call to pullChatMessages")
pullChatMessages(1, setReadMarker, xChatLastCommonRead)
} else if (response.code() == HTTP_CODE_PRECONDITION_FAILED) {
futurePreconditionFailed = true
} else {
processMessagesResponse(response, true)
}
processExpiredMessages()
}
override fun onError(e: Throwable) {
Log.e(TAG, "pullChatMessages - pullChatMessages[lookIntoFuture > 0] - ERROR", e)
pullChatMessagesPending = false
}
override fun onComplete() {
pullChatMessagesPending = false
}
})
val timeout = if (lookIntoFuture) {
LOOKING_INTO_FUTURE_TIMEOUT
} else {
Log.d(TAG, "pullChatMessages - pullChatMessages[lookIntoFuture <= 0] - calling")
ncApi.pullChatMessages(
credentials,
ApiUtils.getUrlForChat(apiVersion, conversationUser?.baseUrl, roomToken),
fieldMap
)
?.subscribeOn(Schedulers.io())
?.observeOn(AndroidSchedulers.mainThread())
?.subscribe(object : Observer<Response<*>> {
override fun onSubscribe(d: Disposable) {
disposables.add(d)
}
@Suppress("Detekt.TooGenericExceptionCaught")
override fun onNext(response: Response<*>) {
Log.d(TAG, "pullChatMessages - pullChatMessages[lookIntoFuture <= 0] - got response")
pullChatMessagesPending = false
if (response.code() == HTTP_CODE_PRECONDITION_FAILED) {
pastPreconditionFailed = true
} else {
processMessagesResponse(response, false)
}
processExpiredMessages()
}
override fun onError(e: Throwable) {
Log.e(TAG, "pullChatMessages - pullChatMessages[lookIntoFuture <= 0] - ERROR", e)
pullChatMessagesPending = false
}
override fun onComplete() {
pullChatMessagesPending = false
}
})
0
}
fieldMap["timeout"] = timeout
fieldMap["limit"] = MESSAGE_PULL_LIMIT
if (lookIntoFuture) {
fieldMap["lookIntoFuture"] = 1
} else {
fieldMap["lookIntoFuture"] = 0
}
if (setReadMarker) {
fieldMap["setReadMarker"] = 1
} else {
fieldMap["setReadMarker"] = 0
}
return fieldMap
}
private fun processExpiredMessages() {
@ -2393,69 +2430,6 @@ class ChatController(args: Bundle) :
}
}
private fun processMessagesResponse(response: Response<*>, isFromTheFuture: Boolean) {
val xChatLastCommonRead = response.headers()["X-Chat-Last-Common-Read"]?.let {
Integer.parseInt(it)
}
processHeaderChatLastGiven(response, isFromTheFuture)
if (response.code() == HTTP_CODE_OK) {
val chatOverall = response.body() as ChatOverall?
val chatMessageList = handleSystemMessages(chatOverall?.ocs!!.data!!)
processMessages(chatMessageList, isFromTheFuture, xChatLastCommonRead)
} else if (response.code() == HTTP_CODE_NOT_MODIFIED && !isFromTheFuture) {
if (isFirstMessagesProcessing) {
cancelNotificationsForCurrentConversation()
isFirstMessagesProcessing = false
binding?.progressBar?.visibility = View.GONE
}
historyRead = true
if (!lookingIntoFuture && validSessionId()) {
pullChatMessages(1)
}
}
}
private fun processMessages(
chatMessageList: List<ChatMessage>,
isFromTheFuture: Boolean,
xChatLastCommonRead: Int?
) {
if (chatMessageList.isNotEmpty() &&
ChatMessage.SystemMessageType.CLEARED_CHAT == chatMessageList[0].systemMessageType
) {
adapter?.clear()
adapter?.notifyDataSetChanged()
}
if (isFirstMessagesProcessing) {
cancelNotificationsForCurrentConversation()
isFirstMessagesProcessing = false
binding?.progressBar?.visibility = View.GONE
binding?.messagesListView?.visibility = View.VISIBLE
}
if (isFromTheFuture) {
processMessagesFromTheFuture(chatMessageList)
} else {
processMessagesNotFromTheFuture(chatMessageList)
}
updateReadStatusOfAllMessages(xChatLastCommonRead)
adapter?.notifyDataSetChanged()
if (validSessionId()) {
pullChatMessages(1, 1, xChatLastCommonRead)
}
}
private fun updateReadStatusOfAllMessages(xChatLastCommonRead: Int?) {
if (adapter != null) {
for (message in adapter!!.items) {
@ -2644,9 +2618,7 @@ class ChatController(args: Bundle) :
}
override fun onLoadMore(page: Int, totalItemsCount: Int) {
if (!historyRead && validSessionId()) {
pullChatMessages(0)
}
pullChatMessages(false)
}
override fun format(date: Date): String {

View file

@ -1,2 +1,2 @@
DO NOT TOUCH; GENERATED BY DRONE
<span class="mdl-layout-title">Lint Report: 110 warnings</span>
<span class="mdl-layout-title">Lint Report: 108 warnings</span>