From fd1c18c0887321934f89e38ab9d62b677128fffb Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 30 Jan 2017 17:00:24 +0000 Subject: [PATCH 01/18] Use DB cache of joined users for presence --- synapse/handlers/presence.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 1b89dc6274..9982ae0fed 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -574,7 +574,7 @@ class PresenceHandler(object): if not local_states: continue - users = yield self.state.get_current_user_in_room(room_id) + users = yield self.store.get_users_in_room(room_id) hosts = set(get_domain_from_id(u) for u in users) for host in hosts: @@ -766,7 +766,7 @@ class PresenceHandler(object): # don't need to send to local clients here, as that is done as part # of the event stream/sync. # TODO: Only send to servers not already in the room. - user_ids = yield self.state.get_current_user_in_room(room_id) + user_ids = yield self.store.get_users_in_room(room_id) if self.is_mine(user): state = yield self.current_state_for_user(user.to_string()) @@ -1069,7 +1069,7 @@ class PresenceEventSource(object): user_ids_to_check = set() for room_id in room_ids: - users = yield self.state.get_current_user_in_room(room_id) + users = yield self.store.get_users_in_room(room_id) user_ids_to_check.update(users) user_ids_to_check.update(friends) From 4b3403ca9b87a8187ea597027a82be9fe005cfb9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 30 Jan 2017 17:24:32 +0000 Subject: [PATCH 02/18] Stream cache invalidations for room membership storage functions --- synapse/storage/events.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 8712d7e18c..6685b9da1c 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -564,9 +564,13 @@ class EventsStore(SQLBaseStore): ) for member in members_changed: - txn.call_after(self.get_rooms_for_user.invalidate, (member,)) + self._invalidate_cache_and_stream( + txn, self.get_rooms_for_user, (member,) + ) - txn.call_after(self.get_users_in_room.invalidate, (room_id,)) + self._invalidate_cache_and_stream( + txn, self.get_users_in_room, (room_id,) + ) # Add an entry to the current_state_resets table to record the point # where we clobbered the current state From 05b9f48ee577f1cbdd5c5837f22c0d9cbe4c44cc Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 31 Jan 2017 10:08:55 +0000 Subject: [PATCH 03/18] Fix clearing out old device list outbound pokes --- synapse/storage/devices.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index f0353929da..e6fe67ee25 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -545,7 +545,7 @@ class DeviceStore(SQLBaseStore): (destination, user_id) tuple to ensure that the prev_ids remain correct if the server does come back. """ - now = self._clock.time_msec() + yesterday = self._clock.time_msec() - 24 * 60 * 60 * 1000 def _prune_txn(txn): select_sql = """ @@ -557,6 +557,9 @@ class DeviceStore(SQLBaseStore): txn.execute(select_sql) rows = txn.fetchall() + if not rows: + return + delete_sql = """ DELETE FROM device_lists_outbound_pokes WHERE ts < ? AND destination = ? AND user_id = ? AND stream_id < ? 
@@ -565,11 +568,13 @@ class DeviceStore(SQLBaseStore):
         txn.executemany(
             delete_sql,
             (
-                (now, row["destination"], row["user_id"], row["stream_id"])
+                (yesterday, row[0], row[1], row[2])
                 for row in rows
             )
         )

+        logger.info("Pruned %d device list outbound pokes", txn.rowcount)
+
         return self.runInteraction(
             "_prune_old_outbound_device_pokes", _prune_txn
         )

From d3169e8d28a4b7238256ff4d3151e3cc8feef0e1 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Tue, 31 Jan 2017 11:20:03 +0000
Subject: [PATCH 04/18] Only fetch rows with an old ts and count > 1

---
 synapse/storage/devices.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index e6fe67ee25..cccefdd3d2 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -552,9 +552,10 @@ class DeviceStore(SQLBaseStore):
             SELECT destination, user_id, max(stream_id) as stream_id
             FROM device_lists_outbound_pokes
             GROUP BY destination, user_id
+            HAVING min(ts) < ? AND count(*) > 1
         """

-        txn.execute(select_sql)
+        txn.execute(select_sql, (yesterday,))
         rows = txn.fetchall()

         if not rows:
             return

From ab55794b6f57988204605f3b1e7245a66e91dcec Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Tue, 31 Jan 2017 13:22:41 +0000
Subject: [PATCH 05/18] Correctly delete old sent device pokes

---
 synapse/storage/devices.py | 22 +++++++++++++++++-----
 1 file changed, 17 insertions(+), 5 deletions(-)

diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index cccefdd3d2..8e17800364 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -436,15 +436,27 @@ class DeviceStore(SQLBaseStore):
         )

     def _mark_as_sent_devices_by_remote_txn(self, txn, destination, stream_id):
+        # First we DELETE all rows such that only the latest row for each
+        # (destination, user_id) is left. We do this by selecting first and
+        # then deleting.
+        sql = """
+            SELECT user_id, coalesce(max(stream_id), 0) FROM device_lists_outbound_pokes
+            WHERE destination = ? AND stream_id <= ?
+            GROUP BY user_id
+            HAVING count(*) > 1
+        """
+        txn.execute(sql, (destination, stream_id,))
+        rows = txn.fetchall()
+
         sql = """
             DELETE FROM device_lists_outbound_pokes
-            WHERE destination = ? AND stream_id < (
-                SELECT coalesce(max(stream_id), 0) FROM device_lists_outbound_pokes
-                WHERE destination = ? AND stream_id <= ?
-            )
+            WHERE destination = ? AND user_id = ? AND stream_id < ?
         """
-        txn.execute(sql, (destination, destination, stream_id,))
+        txn.executemany(
+            sql, ((destination, row[0], row[1],) for row in rows)
+        )

+        # Mark everything that is left as sent
         sql = """
             UPDATE device_lists_outbound_pokes SET sent = ?
             WHERE destination = ? AND stream_id <= ?
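A note on the device-poke fixes above: the old single-statement DELETE in
_mark_as_sent_devices_by_remote_txn compared every row against the
destination-wide max(stream_id), so one busy user's newer pokes could wipe
out another user's only remaining row. The rewritten version first selects
the newest row per (destination, user_id) and then deletes only the rows it
supersedes. A minimal, self-contained sketch of that select-then-delete
pattern against an in-memory SQLite database (illustrative only: the schema
is trimmed to the relevant columns, and this is not Synapse code):

    import sqlite3

    conn = sqlite3.connect(":memory:")
    txn = conn.cursor()
    txn.execute(
        "CREATE TABLE device_lists_outbound_pokes"
        " (destination TEXT, user_id TEXT, stream_id BIGINT, sent BOOLEAN DEFAULT 0)"
    )
    txn.executemany(
        "INSERT INTO device_lists_outbound_pokes"
        " (destination, user_id, stream_id) VALUES (?, ?, ?)",
        [
            ("remote.example", "@alice:hs", 1),  # superseded by stream_id 3
            ("remote.example", "@alice:hs", 3),
            ("remote.example", "@bob:hs", 2),    # bob's only row: must survive
        ],
    )
    destination, stream_id = "remote.example", 4

    # Step 1: per user, find the newest poke at or below stream_id, but only
    # for users that actually have rows to prune (HAVING count(*) > 1).
    txn.execute(
        "SELECT user_id, coalesce(max(stream_id), 0)"
        " FROM device_lists_outbound_pokes"
        " WHERE destination = ? AND stream_id <= ?"
        " GROUP BY user_id HAVING count(*) > 1",
        (destination, stream_id),
    )
    rows = txn.fetchall()

    # Step 2: delete only the superseded rows, never a user's newest one.
    txn.executemany(
        "DELETE FROM device_lists_outbound_pokes"
        " WHERE destination = ? AND user_id = ? AND stream_id < ?",
        ((destination, user_id, max_id) for user_id, max_id in rows),
    )

    # Step 3: mark everything that is left as sent.
    txn.execute(
        "UPDATE device_lists_outbound_pokes SET sent = ?"
        " WHERE destination = ? AND stream_id <= ?",
        (True, destination, stream_id),
    )

    print(txn.execute(
        "SELECT user_id, stream_id, sent FROM device_lists_outbound_pokes"
        " ORDER BY user_id"
    ).fetchall())
    # -> [('@alice:hs', 3, 1), ('@bob:hs', 2, 1)]

Running the sketch leaves exactly one row per user, both marked sent, which
is what the real transaction needs so that later pokes carry correct
prev_ids.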
From ae7a132f38404e9f654ab1b7c5dd84ba6a3efda6 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Tue, 31 Jan 2017 13:40:09 +0000
Subject: [PATCH 06/18] Better handle 404 response for federation /send/

---
 synapse/federation/transaction_queue.py |  1 +
 synapse/util/retryutils.py              | 15 +++++++++++++--
 2 files changed, 14 insertions(+), 2 deletions(-)

diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index d18f6b6cfd..cb106c6a1b 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -319,6 +319,7 @@ class TransactionQueue(object):
                 destination,
                 self.clock,
                 self.store,
+                backoff_on_404=True,  # If we get a 404 the other side has gone
             )

             device_message_edus, device_stream_id, dev_list_id = (
diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py
index e2de7fce91..cc88a0b532 100644
--- a/synapse/util/retryutils.py
+++ b/synapse/util/retryutils.py
@@ -88,7 +88,7 @@ class RetryDestinationLimiter(object):
     def __init__(self, destination, clock, store, retry_interval,
                  min_retry_interval=10 * 60 * 1000,
                  max_retry_interval=24 * 60 * 60 * 1000,
-                 multiplier_retry_interval=5,):
+                 multiplier_retry_interval=5, backoff_on_404=False):
         """Marks the destination as "down" if an exception is thrown in the
         context, except for CodeMessageException with code < 500.

@@ -107,6 +107,7 @@ class RetryDestinationLimiter(object):
                 a failed request, in milliseconds.
             multiplier_retry_interval (int): The multiplier to use to increase
                 the retry interval after a failed request.
+            backoff_on_404 (bool): Back off if we get a 404
         """
         self.clock = clock
         self.store = store
@@ -116,6 +117,7 @@ class RetryDestinationLimiter(object):
         self.min_retry_interval = min_retry_interval
         self.max_retry_interval = max_retry_interval
         self.multiplier_retry_interval = multiplier_retry_interval
+        self.backoff_on_404 = backoff_on_404

     def __enter__(self):
         pass
@@ -123,7 +125,16 @@ class RetryDestinationLimiter(object):
     def __exit__(self, exc_type, exc_val, exc_tb):
         valid_err_code = False
         if exc_type is not None and issubclass(exc_type, CodeMessageException):
-            valid_err_code = exc_val.code != 429 and 0 <= exc_val.code < 500
+            if exc_val.code < 400:
+                valid_err_code = True
+            elif exc_val.code == 404 and self.backoff_on_404:
+                valid_err_code = False
+            elif exc_val.code == 429:
+                valid_err_code = False
+            elif exc_val.code < 500:
+                valid_err_code = True
+            else:
+                valid_err_code = False

         if exc_type is None or valid_err_code:
             # We connected successfully.

From 85c590105f87a6cd138f1509f70087aa0881cf2d Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Tue, 31 Jan 2017 13:46:38 +0000
Subject: [PATCH 07/18] Comment

---
 synapse/util/retryutils.py | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py
index cc88a0b532..0961dd5b25 100644
--- a/synapse/util/retryutils.py
+++ b/synapse/util/retryutils.py
@@ -125,6 +125,10 @@ class RetryDestinationLimiter(object):
     def __exit__(self, exc_type, exc_val, exc_tb):
         valid_err_code = False
         if exc_type is not None and issubclass(exc_type, CodeMessageException):
+            # Some error codes are perfectly fine for some APIs, whereas other
+            # APIs may expect never to receive e.g. a 404. It's important to
+            # handle 404 as some remote servers will return a 404 when the HS
+            # has been decommissioned.
             if exc_val.code < 400:
                 valid_err_code = True
             elif exc_val.code == 404 and self.backoff_on_404:
                 valid_err_code = False

From 4c0ec15bdcb8fbecf5e4f6cdd3017c9c53076972 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Tue, 31 Jan 2017 13:53:46 +0000
Subject: [PATCH 08/18] Comment

---
 synapse/util/retryutils.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py
index 0961dd5b25..5c7fc1afb4 100644
--- a/synapse/util/retryutils.py
+++ b/synapse/util/retryutils.py
@@ -134,6 +134,8 @@ class RetryDestinationLimiter(object):
             elif exc_val.code == 404 and self.backoff_on_404:
                 valid_err_code = False
             elif exc_val.code == 429:
+                # 429 is us being aggressively rate limited, so let's rate limit
+                # ourselves.
                 valid_err_code = False
             elif exc_val.code < 500:
                 valid_err_code = True

From 21b73757780cc8609e895cd851a3b5072c8a7e32 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Tue, 31 Jan 2017 15:15:41 +0000
Subject: [PATCH 09/18] Add an index to make membership queries faster

---
 synapse/storage/roommember.py                 |  2 +-
 .../schema/delta/40/current_state_idx.sql     | 17 +++++++++++++++++
 synapse/storage/state.py                      |  8 ++++++++
 3 files changed, 26 insertions(+), 1 deletion(-)
 create mode 100644 synapse/storage/schema/delta/40/current_state_idx.sql

diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 0fdcf29085..10f7c7a4bc 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -220,7 +220,7 @@ class RoomMemberStore(SQLBaseStore):
             " ON e.event_id = c.event_id"
             " AND m.room_id = c.room_id"
             " AND m.user_id = c.state_key"
-            " WHERE %s"
+            " WHERE c.type = 'm.room.member' AND %s"
         ) % (where_clause,)

         txn.execute(sql, args)
diff --git a/synapse/storage/schema/delta/40/current_state_idx.sql b/synapse/storage/schema/delta/40/current_state_idx.sql
new file mode 100644
index 0000000000..7ffa189f39
--- /dev/null
+++ b/synapse/storage/schema/delta/40/current_state_idx.sql
@@ -0,0 +1,17 @@
+/* Copyright 2017 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+INSERT INTO background_updates (update_name, progress_json) VALUES
+  ('current_state_members_idx', '{}');
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index d1d653327c..1b3800eb6a 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -49,6 +49,7 @@ class StateStore(SQLBaseStore):

     STATE_GROUP_DEDUPLICATION_UPDATE_NAME = "state_group_state_deduplication"
     STATE_GROUP_INDEX_UPDATE_NAME = "state_group_state_type_index"
+    CURRENT_STATE_INDEX_UPDATE_NAME = "current_state_members_idx"

     def __init__(self, hs):
         super(StateStore, self).__init__(hs)
@@ -60,6 +61,13 @@ class StateStore(SQLBaseStore):
             self.STATE_GROUP_INDEX_UPDATE_NAME,
             self._background_index_state,
         )
+        self.register_background_index_update(
+            self.CURRENT_STATE_INDEX_UPDATE_NAME,
+            index_name="current_state_events_member_index",
+            table="current_state_events",
+            columns=["state_key"],
+            where_clause="type='m.room.member'",
+        )

     @defer.inlineCallbacks
     def get_state_groups_ids(self, room_id, event_ids):

From fe08db2713cb35e1424034d58d750ebdc52cedbc Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Tue, 31 Jan 2017 15:21:32 +0000
Subject: [PATCH 10/18] Remove explicit < 400 check as apparently this is confusing

---
 synapse/util/retryutils.py | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py
index 5c7fc1afb4..b94ae369cf 100644
--- a/synapse/util/retryutils.py
+++ b/synapse/util/retryutils.py
@@ -129,9 +129,7 @@ class RetryDestinationLimiter(object):
             # APIs may expect never to receive e.g. a 404. It's important to
             # handle 404 as some remote servers will return a 404 when the HS
             # has been decommissioned.
-            if exc_val.code < 400:
-                valid_err_code = True
-            elif exc_val.code == 404 and self.backoff_on_404:
+            if exc_val.code == 404 and self.backoff_on_404:
                 valid_err_code = False
             elif exc_val.code == 429:
                 # 429 is us being aggressively rate limited, so let's rate limit
                 # ourselves.
                 valid_err_code = False
             elif exc_val.code < 500:
                 valid_err_code = True

From 458b6f473314a81d7e671fc2fc8c30d3259924c4 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Tue, 31 Jan 2017 16:09:03 +0000
Subject: [PATCH 11/18] Only invalidate membership caches based on the cache
 stream

Previously we completely invalidated get_users_in_room whenever we
updated the current_state_events table. This was way too aggressive.
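To sketch the new flow (a toy model for illustration only, not code from
this patch set; the real helper is the _invalidate_cache_and_stream method
used in PATCH 02/18): the writer drops the entry from its own in-memory
cache once the transaction commits and appends a row to the caches stream,
and each worker replays rows from that stream instead of inferring
invalidations from event rows:

    from itertools import count

    class ToyCache:
        """Stand-in for a @cached function's cache (e.g. get_users_in_room)."""
        def __init__(self, name):
            self.name = name
            self.entries = {}

        def invalidate(self, key):
            self.entries.pop(key, None)

    class ToyCacheStream:
        """Stand-in for the DB-backed caches stream that workers poll."""
        def __init__(self):
            self._ids = count(1)
            self.rows = []  # (stream_id, cache_name, key)

        def append_invalidation(self, cache_name, key):
            self.rows.append((next(self._ids), cache_name, key))

    def invalidate_cache_and_stream(after_commit, stream, cache, key):
        # 1. Drop the entry from the writer's own in-memory cache once the
        #    transaction commits (mirrors txn.call_after in the real code).
        after_commit.append(lambda: cache.invalidate(key))
        # 2. Record the invalidation so workers can replay it later.
        stream.append_invalidation(cache.name, key)

    def worker_catch_up(stream, caches, from_id):
        # Each worker replays every invalidation newer than its stream token.
        for stream_id, cache_name, key in stream.rows:
            if stream_id > from_id:
                caches[cache_name].invalidate(key)
        return stream.rows[-1][0] if stream.rows else from_id

    # Example: invalidate get_users_in_room on the master, then catch a
    # worker up from token 0.
    master = ToyCache("get_users_in_room")
    worker = ToyCache("get_users_in_room")
    master.entries[("!room:hs",)] = worker.entries[("!room:hs",)] = ["@alice:hs"]

    stream, after_commit = ToyCacheStream(), []
    invalidate_cache_and_stream(after_commit, stream, master, ("!room:hs",))
    for cb in after_commit:  # "commit" the transaction
        cb()
    token = worker_catch_up(stream, {"get_users_in_room": worker}, 0)
    assert master.entries == worker.entries == {} and token == 1

With that in place, the state_resets plumbing removed below becomes
redundant: workers learn about membership-cache invalidations from the
stream itself.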
--- synapse/replication/resource.py | 3 --- synapse/replication/slave/storage/events.py | 21 +++++---------------- synapse/storage/events.py | 20 -------------------- synapse/storage/roommember.py | 2 -- 4 files changed, 5 insertions(+), 41 deletions(-) diff --git a/synapse/replication/resource.py b/synapse/replication/resource.py index a30e647474..d8eb14592b 100644 --- a/synapse/replication/resource.py +++ b/synapse/replication/resource.py @@ -299,9 +299,6 @@ class ReplicationResource(Resource): "backward_ex_outliers", res.backward_ex_outliers, ("position", "event_id", "state_group"), ) - writer.write_header_and_rows( - "state_resets", res.state_resets, ("position",), - ) @defer.inlineCallbacks def presence(self, writer, current_token, request_streams): diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index b3f3bf7488..15a025a019 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -192,10 +192,6 @@ class SlavedEventStore(BaseSlavedStore): return result def process_replication(self, result): - state_resets = set( - r[0] for r in result.get("state_resets", {"rows": []})["rows"] - ) - stream = result.get("events") if stream: self._stream_id_gen.advance(int(stream["position"])) @@ -205,7 +201,7 @@ class SlavedEventStore(BaseSlavedStore): for row in stream["rows"]: self._process_replication_row( - row, backfilled=False, state_resets=state_resets + row, backfilled=False, ) stream = result.get("backfill") @@ -213,7 +209,7 @@ class SlavedEventStore(BaseSlavedStore): self._backfill_id_gen.advance(-int(stream["position"])) for row in stream["rows"]: self._process_replication_row( - row, backfilled=True, state_resets=state_resets + row, backfilled=True, ) stream = result.get("forward_ex_outliers") @@ -232,20 +228,15 @@ class SlavedEventStore(BaseSlavedStore): return super(SlavedEventStore, self).process_replication(result) - def _process_replication_row(self, row, backfilled, state_resets): - position = row[0] + def _process_replication_row(self, row, backfilled): internal = json.loads(row[1]) event_json = json.loads(row[2]) event = FrozenEvent(event_json, internal_metadata_dict=internal) self.invalidate_caches_for_event( - event, backfilled, reset_state=position in state_resets + event, backfilled, ) - def invalidate_caches_for_event(self, event, backfilled, reset_state): - if reset_state: - self.get_rooms_for_user.invalidate_all() - self.get_users_in_room.invalidate((event.room_id,)) - + def invalidate_caches_for_event(self, event, backfilled): self._invalidate_get_event_cache(event.event_id) self.get_latest_event_ids_in_room.invalidate((event.room_id,)) @@ -267,8 +258,6 @@ class SlavedEventStore(BaseSlavedStore): self._invalidate_get_event_cache(event.redacts) if event.type == EventTypes.Member: - self.get_rooms_for_user.invalidate((event.state_key,)) - self.get_users_in_room.invalidate((event.room_id,)) self._membership_stream_cache.entity_has_changed( event.state_key, event.internal_metadata.stream_ordering ) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 6685b9da1c..f4352b326b 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -572,14 +572,6 @@ class EventsStore(SQLBaseStore): txn, self.get_users_in_room, (room_id,) ) - # Add an entry to the current_state_resets table to record the point - # where we clobbered the current state - self._simple_insert_txn( - txn, - table="current_state_resets", - values={"event_stream_ordering": 
max_stream_order}
-        )
-
         for room_id, new_extrem in new_forward_extremeties.items():
             self._simple_delete_txn(
                 txn,
@@ -1610,15 +1602,6 @@ class EventsStore(SQLBaseStore):
                 else:
                     upper_bound = current_forward_id

-                sql = (
-                    "SELECT event_stream_ordering FROM current_state_resets"
-                    " WHERE ? < event_stream_ordering"
-                    " AND event_stream_ordering <= ?"
-                    " ORDER BY event_stream_ordering ASC"
-                )
-                txn.execute(sql, (last_forward_id, upper_bound))
-                state_resets = txn.fetchall()
-
                 sql = (
                     "SELECT event_stream_ordering, event_id, state_group"
                     " FROM ex_outlier_stream"
@@ -1630,7 +1613,6 @@ class EventsStore(SQLBaseStore):
                 forward_ex_outliers = txn.fetchall()
             else:
                 new_forward_events = []
-                state_resets = []
                 forward_ex_outliers = []

             sql = (
@@ -1670,7 +1652,6 @@ class EventsStore(SQLBaseStore):
             return AllNewEventsResult(
                 new_forward_events, new_backfill_events,
                 forward_ex_outliers, backward_ex_outliers,
-                state_resets,
             )
         return self.runInteraction("get_all_new_events", get_all_new_events_txn)
@@ -1896,5 +1877,4 @@ class EventsStore(SQLBaseStore):
 AllNewEventsResult = namedtuple("AllNewEventsResult", [
     "new_forward_events", "new_backfill_events",
     "forward_ex_outliers", "backward_ex_outliers",
-    "state_resets"
 ])
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 0fdcf29085..845def8467 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -66,8 +66,6 @@ class RoomMemberStore(SQLBaseStore):
         )

         for event in events:
-            txn.call_after(self.get_rooms_for_user.invalidate, (event.state_key,))
-            txn.call_after(self.get_users_in_room.invalidate, (event.room_id,))
             txn.call_after(
                 self._membership_stream_cache.entity_has_changed,
                 event.state_key, event.internal_metadata.stream_ordering

From 692daf6f5439c3c4852934f3bc950ccac2ec6d92 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Tue, 31 Jan 2017 16:10:16 +0000
Subject: [PATCH 12/18] Remove membership tests for replication

This is because it now relies on the caches stream, which only works on
postgres, whereas the tests run against sqlite.

---
 .../replication/slave/storage/test_events.py | 43 -------------------
 1 file changed, 43 deletions(-)

diff --git a/tests/replication/slave/storage/test_events.py b/tests/replication/slave/storage/test_events.py
index 6acb8ab758..105e1228bb 100644
--- a/tests/replication/slave/storage/test_events.py
+++ b/tests/replication/slave/storage/test_events.py
@@ -58,49 +58,6 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase):
     def tearDown(self):
         [unpatch() for unpatch in self.unpatches]

-    @defer.inlineCallbacks
-    def test_room_members(self):
-        yield self.persist(type="m.room.create", key="", creator=USER_ID)
-        yield self.replicate()
-        yield self.check("get_rooms_for_user", (USER_ID,), [])
-        yield self.check("get_users_in_room", (ROOM_ID,), [])
-
-        # Join the room.
-        join = yield self.persist(type="m.room.member", key=USER_ID, membership="join")
-        yield self.replicate()
-        yield self.check("get_rooms_for_user", (USER_ID,), [RoomsForUser(
-            room_id=ROOM_ID,
-            sender=USER_ID,
-            membership="join",
-            event_id=join.event_id,
-            stream_ordering=join.internal_metadata.stream_ordering,
-        )])
-        yield self.check("get_users_in_room", (ROOM_ID,), [USER_ID])
-
-        # Leave the room.
-        yield self.persist(type="m.room.member", key=USER_ID, membership="leave")
-        yield self.replicate()
-        yield self.check("get_rooms_for_user", (USER_ID,), [])
-        yield self.check("get_users_in_room", (ROOM_ID,), [])
-
-        # Add some other user to the room.
- join = yield self.persist(type="m.room.member", key=USER_ID_2, membership="join") - yield self.replicate() - yield self.check("get_rooms_for_user", (USER_ID_2,), [RoomsForUser( - room_id=ROOM_ID, - sender=USER_ID, - membership="join", - event_id=join.event_id, - stream_ordering=join.internal_metadata.stream_ordering, - )]) - yield self.check("get_users_in_room", (ROOM_ID,), [USER_ID_2]) - - yield self.persist( - type="m.room.member", key=USER_ID, membership="join", - ) - yield self.replicate() - yield self.check("get_users_in_room", (ROOM_ID,), [USER_ID_2, USER_ID]) - @defer.inlineCallbacks def test_get_latest_event_ids_in_room(self): create = yield self.persist(type="m.room.create", key="", creator=USER_ID) From 97479d0c5442f3a644b356c5dbc920bf2ca2c925 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 1 Feb 2017 10:30:03 +0000 Subject: [PATCH 13/18] Implement /keys/changes --- synapse/handlers/device.py | 16 ++++++++++++ synapse/rest/client/v2_alpha/keys.py | 38 ++++++++++++++++++++++++++++ 2 files changed, 54 insertions(+) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 7245d14fab..4a28d95967 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -220,6 +220,22 @@ class DeviceHandler(BaseHandler): for host in hosts: self.federation_sender.send_device_messages(host) + @defer.inlineCallbacks + def get_user_ids_changed(self, user_id, from_device_key): + rooms = yield self.store.get_rooms_for_user(user_id) + room_ids = set(r.room_id for r in rooms) + + user_ids_changed = set() + changed = yield self.store.get_user_whose_devices_changed( + from_device_key + ) + for other_user_id in changed: + other_rooms = yield self.store.get_rooms_for_user(other_user_id) + if room_ids.intersection(e.room_id for e in other_rooms): + user_ids_changed.add(other_user_id) + + defer.returnValue(user_ids_changed) + @defer.inlineCallbacks def _incoming_device_list_update(self, origin, edu_content): user_id = edu_content["user_id"] diff --git a/synapse/rest/client/v2_alpha/keys.py b/synapse/rest/client/v2_alpha/keys.py index 46789775b9..5080101f18 100644 --- a/synapse/rest/client/v2_alpha/keys.py +++ b/synapse/rest/client/v2_alpha/keys.py @@ -21,6 +21,8 @@ from synapse.api.errors import SynapseError from synapse.http.servlet import ( RestServlet, parse_json_object_from_request, parse_integer ) +from synapse.http.servlet import parse_string +from synapse.types import StreamToken from ._base import client_v2_patterns logger = logging.getLogger(__name__) @@ -149,6 +151,41 @@ class KeyQueryServlet(RestServlet): defer.returnValue((200, result)) +class KeyChangesServlet(RestServlet): + PATTERNS = client_v2_patterns( + "/keys/changes$", + releases=() + ) + + def __init__(self, hs): + """ + Args: + hs (synapse.server.HomeServer): + """ + super(KeyChangesServlet, self).__init__() + self.auth = hs.get_auth() + self.device_handler = hs.get_device_handler() + + @defer.inlineCallbacks + def on_GET(self, request): + requester = yield self.auth.get_user_by_req(request, allow_guest=True) + + from_token_string = parse_string(request, "from") + parse_string(request, "to") # We want to enforce they do pass us one. 
+ + from_token = StreamToken.from_string(from_token_string) + + user_id = requester.user.to_string() + + changed = yield self.device_handler.get_user_ids_changed( + user_id, from_token.device_list_key, + ) + + defer.returnValue((200, { + "changed": changed + })) + + class OneTimeKeyServlet(RestServlet): """ POST /keys/claim HTTP/1.1 @@ -192,4 +229,5 @@ class OneTimeKeyServlet(RestServlet): def register_servlets(hs, http_server): KeyUploadServlet(hs).register(http_server) KeyQueryServlet(hs).register(http_server) + KeyChangesServlet(hs).register(http_server) OneTimeKeyServlet(hs).register(http_server) From acb501c46d75247329f49a1eef3baf6d8af0cba1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 1 Feb 2017 10:32:49 +0000 Subject: [PATCH 14/18] Comment --- synapse/rest/client/v2_alpha/keys.py | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/synapse/rest/client/v2_alpha/keys.py b/synapse/rest/client/v2_alpha/keys.py index 5080101f18..2e855e5e04 100644 --- a/synapse/rest/client/v2_alpha/keys.py +++ b/synapse/rest/client/v2_alpha/keys.py @@ -152,6 +152,14 @@ class KeyQueryServlet(RestServlet): class KeyChangesServlet(RestServlet): + """Returns the list of changes of keys between two stream tokens (may return + spurious results). + + GET /keys/changes?from=...&to=... + + 200 OK + { "changed": ["@foo:example.com"] } + """ PATTERNS = client_v2_patterns( "/keys/changes$", releases=() @@ -171,7 +179,10 @@ class KeyChangesServlet(RestServlet): requester = yield self.auth.get_user_by_req(request, allow_guest=True) from_token_string = parse_string(request, "from") - parse_string(request, "to") # We want to enforce they do pass us one. + + # We want to enforce they do pass us one, but we ignore it and return + # changes after the "to" as well as before. 
+        parse_string(request, "to")

         from_token = StreamToken.from_string(from_token_string)

From 5deaf9e30bcf37b765e80f08c242e74ca8ac93b3 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Wed, 1 Feb 2017 10:39:41 +0000
Subject: [PATCH 15/18] Increase the get_latest_event_ids_in_room cache size

---
 synapse/storage/event_federation.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index f0aa2193fb..ee88c61954 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -129,7 +129,7 @@ class EventFederationStore(SQLBaseStore):
             room_id,
         )

-    @cached()
+    @cached(max_entries=5000, iterable=True)
     def get_latest_event_ids_in_room(self, room_id):
         return self._simple_select_onecol(
             table="event_forward_extremities",

From 368c88c4870f797cee7775acaa2caec2753b7f91 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Wed, 1 Feb 2017 10:42:37 +0000
Subject: [PATCH 16/18] Add a small cache to get_all_new_events

---
 synapse/storage/events.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 6685b9da1c..332da25783 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -28,6 +28,7 @@ from synapse.util.metrics import Measure
 from synapse.api.constants import EventTypes
 from synapse.api.errors import SynapseError
 from synapse.state import resolve_events
+from synapse.util.caches.descriptors import cached

 from canonicaljson import encode_canonical_json
 from collections import deque, namedtuple, OrderedDict
@@ -1579,6 +1580,7 @@ class EventsStore(SQLBaseStore):
         """The current minimum token that backfilled events have reached"""
         return -self._backfill_id_gen.get_current_token()

+    @cached(num_args=5, max_entries=10)
     def get_all_new_events(self, last_backfill_id, last_forward_id,
                            current_backfill_id, current_forward_id, limit):
         """Get all the new events that have arrived at the server either as

From f6124311fd8893a306e7443cd725b1c25b007d39 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Wed, 1 Feb 2017 11:59:17 +0000
Subject: [PATCH 17/18] Add m.room.member type to query

---
 synapse/storage/roommember.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 10f7c7a4bc..3a99dc2349 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -266,7 +266,7 @@ class RoomMemberStore(SQLBaseStore):
             " ON m.event_id = c.event_id "
             " AND m.room_id = c.room_id "
             " AND m.user_id = c.state_key"
-            " WHERE %(where)s"
+            " WHERE c.type = 'm.room.member' AND %(where)s"
         ) % {
             "where": where_clause,
         }

From 73d676dc8b38e8b16d35b9557480117a6c072ef7 Mon Sep 17 00:00:00 2001
From: Erik Johnston
Date: Wed, 1 Feb 2017 13:17:17 +0000
Subject: [PATCH 18/18] Comment

---
 synapse/rest/client/v2_alpha/keys.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/synapse/rest/client/v2_alpha/keys.py b/synapse/rest/client/v2_alpha/keys.py
index 2e855e5e04..4590efa6bf 100644
--- a/synapse/rest/client/v2_alpha/keys.py
+++ b/synapse/rest/client/v2_alpha/keys.py
@@ -153,7 +153,7 @@ class KeyQueryServlet(RestServlet):

 class KeyChangesServlet(RestServlet):
     """Returns the list of changes of keys between two stream tokens (may return
-    spurious results).
+    spurious extra results, since we currently ignore the `to` param).

     GET /keys/changes?from=...&to=...
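For completeness, a hedged client-side sketch of calling the new endpoint
(the homeserver URL and access token are placeholders, and the
/_matrix/client/unstable prefix is an assumption inferred from the
servlet's releases=() pattern):

    import requests

    HOMESERVER = "https://matrix.example.com"  # placeholder
    ACCESS_TOKEN = "MDAxabc..."                # placeholder

    def get_key_changes(from_token, to_token):
        """Ask the server which users' device keys may have changed between
        two sync tokens. Per the docstring above, `to` must be supplied but
        is currently ignored server-side."""
        resp = requests.get(
            HOMESERVER + "/_matrix/client/unstable/keys/changes",
            params={"from": from_token, "to": to_token},
            headers={"Authorization": "Bearer " + ACCESS_TOKEN},
        )
        resp.raise_for_status()
        return resp.json()["changed"]  # e.g. ["@foo:example.com"]

Since the response may include spurious extra user IDs, a client should
treat "changed" as a prompt to re-query those users' keys via /keys/query
rather than as an exact delta.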