From 78fec6b3c9dc4dd40cc4d33b5d0d2eebe5981be8 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 12 Nov 2018 16:20:14 +0000 Subject: [PATCH] Add flag to sync to exclude threads --- synapse/rest/client/v2_alpha/sync.py | 42 ++++++++++++++++++++++++---- synapse/state/__init__.py | 2 ++ synapse/storage/events_worker.py | 3 -- 3 files changed, 38 insertions(+), 9 deletions(-) diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py index 0251146722..a6b1121e8c 100644 --- a/synapse/rest/client/v2_alpha/sync.py +++ b/synapse/rest/client/v2_alpha/sync.py @@ -28,10 +28,12 @@ from synapse.events.utils import ( format_event_raw, serialize_event, ) +from synapse.events import FrozenEvent from synapse.handlers.presence import format_user_presence_state from synapse.handlers.sync import SyncConfig from synapse.http.servlet import RestServlet, parse_boolean, parse_integer, parse_string from synapse.types import StreamToken +from synapse.util.stringutils import random_string from ._base import client_v2_patterns, set_timeline_upper_limit @@ -118,6 +120,8 @@ class SyncRestServlet(RestServlet): ) ) + exclude_threaded = b"exclude_threaded" in request.args + request_key = (user, timeout, since, filter_id, full_state, device_id) if filter_id: @@ -169,13 +173,14 @@ class SyncRestServlet(RestServlet): time_now = self.clock.time_msec() response_content = self.encode_response( - time_now, sync_result, requester.access_token_id, filter + time_now, sync_result, requester.access_token_id, filter, + exclude_threaded=exclude_threaded, ) defer.returnValue((200, response_content)) @staticmethod - def encode_response(time_now, sync_result, access_token_id, filter): + def encode_response(time_now, sync_result, access_token_id, filter, exclude_threaded): if filter.event_format == 'client': event_formatter = format_event_for_client_v2_without_room_id elif filter.event_format == 'federation': @@ -187,6 +192,7 @@ class SyncRestServlet(RestServlet): sync_result.joined, time_now, access_token_id, filter.event_fields, event_formatter, + exclude_threaded=exclude_threaded, ) invited = SyncRestServlet.encode_invited( @@ -240,7 +246,7 @@ class SyncRestServlet(RestServlet): } @staticmethod - def encode_joined(rooms, time_now, token_id, event_fields, event_formatter): + def encode_joined(rooms, time_now, token_id, event_fields, event_formatter, exclude_threaded): """ Encode the joined rooms in a sync result @@ -263,7 +269,7 @@ class SyncRestServlet(RestServlet): for room in rooms: joined[room.room_id] = SyncRestServlet.encode_room( room, time_now, token_id, joined=True, only_fields=event_fields, - event_formatter=event_formatter, + event_formatter=event_formatter, exclude_threaded=exclude_threaded, ) return joined @@ -337,7 +343,7 @@ class SyncRestServlet(RestServlet): @staticmethod def encode_room( room, time_now, token_id, joined, - only_fields, event_formatter, + only_fields, event_formatter, exclude_threaded, ): """ Args: @@ -377,7 +383,31 @@ class SyncRestServlet(RestServlet): ) serialized_state = [serialize(e) for e in state_events] - serialized_timeline = [serialize(e) for e in timeline_events] + + if exclude_threaded: + serialized_timeline = [] + seen_threads = set() + for e in reversed(timeline_events): + thread_id = e.internal_metadata.thread_id + if thread_id != 0: + if thread_id not in seen_threads: + serialized_timeline.append({ + "type": "org.matrix.new_thread", + "content": { + "thread_id": thread_id, + "latest_event": e.event_id, + }, + "event_id": random_string(24), + "origin_server_ts": e.origin_server_ts, + "sender": "@server", + }) + seen_threads.add(thread_id) + else: + serialized_timeline.append(serialize(e)) + + serialized_timeline.reverse() + else: + serialized_timeline = [serialize(e) for e in timeline_events] account_data = room.account_data diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py index 35041028fe..5c9f306af8 100644 --- a/synapse/state/__init__.py +++ b/synapse/state/__init__.py @@ -194,6 +194,8 @@ class StateHandler(object): synapse.events.snapshot.EventContext: """ + event.internal_metadata.thread_id = thread_id + if event.internal_metadata.is_outlier(): # If this is an outlier, then we know it shouldn't have any current # state. Certainly store.get_current_state won't return any, and diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py index 5b9cad5522..9397c92dbb 100644 --- a/synapse/storage/events_worker.py +++ b/synapse/storage/events_worker.py @@ -398,7 +398,6 @@ class EventsWorkerStore(SQLBaseStore): with Measure(self._clock, "_get_event_from_row"): d = json.loads(js) internal_metadata = json.loads(internal_metadata) - internal_metadata["thread_id"] = thread_id if rejected_reason: rejected_reason = yield self._simple_select_one_onecol( @@ -414,8 +413,6 @@ class EventsWorkerStore(SQLBaseStore): rejected_reason=rejected_reason, ) - original_ev.unsigned["thread_id"] = thread_id - redacted_event = None if redacted: redacted_event = prune_event(original_ev)