mirror of
https://github.com/element-hq/synapse.git
synced 2024-11-24 10:35:46 +03:00
Sliding Sync: Retrieve fewer events from DB in sync (#17688)
When using timeline limit of 1 we end up fetching 2 events from the DB purely to tell if the response was "limited" or not. Lets not do that.
This commit is contained in:
parent
515c1cc0a1
commit
588e5b521d
9 changed files with 89 additions and 91 deletions
1
changelog.d/17688.misc
Normal file
1
changelog.d/17688.misc
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Speed up sync by pulling out fewer events from the database.
|
|
@ -200,6 +200,7 @@ class AdminHandler:
|
||||||
(
|
(
|
||||||
events,
|
events,
|
||||||
_,
|
_,
|
||||||
|
_,
|
||||||
) = await self._store.paginate_room_events_by_topological_ordering(
|
) = await self._store.paginate_room_events_by_topological_ordering(
|
||||||
room_id=room_id,
|
room_id=room_id,
|
||||||
from_key=from_key,
|
from_key=from_key,
|
||||||
|
|
|
@ -510,6 +510,7 @@ class PaginationHandler:
|
||||||
(
|
(
|
||||||
events,
|
events,
|
||||||
next_key,
|
next_key,
|
||||||
|
_,
|
||||||
) = await self.store.paginate_room_events_by_topological_ordering(
|
) = await self.store.paginate_room_events_by_topological_ordering(
|
||||||
room_id=room_id,
|
room_id=room_id,
|
||||||
from_key=from_token.room_key,
|
from_key=from_token.room_key,
|
||||||
|
@ -588,6 +589,7 @@ class PaginationHandler:
|
||||||
(
|
(
|
||||||
events,
|
events,
|
||||||
next_key,
|
next_key,
|
||||||
|
_,
|
||||||
) = await self.store.paginate_room_events_by_topological_ordering(
|
) = await self.store.paginate_room_events_by_topological_ordering(
|
||||||
room_id=room_id,
|
room_id=room_id,
|
||||||
from_key=from_token.room_key,
|
from_key=from_token.room_key,
|
||||||
|
|
|
@ -1753,7 +1753,7 @@ class RoomEventSource(EventSource[RoomStreamToken, EventBase]):
|
||||||
)
|
)
|
||||||
|
|
||||||
events = list(room_events)
|
events = list(room_events)
|
||||||
events.extend(e for evs, _ in room_to_events.values() for e in evs)
|
events.extend(e for evs, _, _ in room_to_events.values() for e in evs)
|
||||||
|
|
||||||
# We know stream_ordering must be not None here, as its been
|
# We know stream_ordering must be not None here, as its been
|
||||||
# persisted, but mypy doesn't know that
|
# persisted, but mypy doesn't know that
|
||||||
|
|
|
@ -47,7 +47,6 @@ from synapse.types import (
|
||||||
MutableStateMap,
|
MutableStateMap,
|
||||||
PersistedEventPosition,
|
PersistedEventPosition,
|
||||||
Requester,
|
Requester,
|
||||||
RoomStreamToken,
|
|
||||||
SlidingSyncStreamToken,
|
SlidingSyncStreamToken,
|
||||||
StateMap,
|
StateMap,
|
||||||
StreamKeyType,
|
StreamKeyType,
|
||||||
|
@ -632,7 +631,7 @@ class SlidingSyncHandler:
|
||||||
# Use `stream_ordering` for updates
|
# Use `stream_ordering` for updates
|
||||||
else paginate_room_events_by_stream_ordering
|
else paginate_room_events_by_stream_ordering
|
||||||
)
|
)
|
||||||
timeline_events, new_room_key = await pagination_method(
|
timeline_events, new_room_key, limited = await pagination_method(
|
||||||
room_id=room_id,
|
room_id=room_id,
|
||||||
# The bounds are reversed so we can paginate backwards
|
# The bounds are reversed so we can paginate backwards
|
||||||
# (from newer to older events) starting at to_bound.
|
# (from newer to older events) starting at to_bound.
|
||||||
|
@ -640,28 +639,13 @@ class SlidingSyncHandler:
|
||||||
from_key=to_bound,
|
from_key=to_bound,
|
||||||
to_key=timeline_from_bound,
|
to_key=timeline_from_bound,
|
||||||
direction=Direction.BACKWARDS,
|
direction=Direction.BACKWARDS,
|
||||||
# We add one so we can determine if there are enough events to saturate
|
limit=room_sync_config.timeline_limit,
|
||||||
# the limit or not (see `limited`)
|
|
||||||
limit=room_sync_config.timeline_limit + 1,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
# We want to return the events in ascending order (the last event is the
|
# We want to return the events in ascending order (the last event is the
|
||||||
# most recent).
|
# most recent).
|
||||||
timeline_events.reverse()
|
timeline_events.reverse()
|
||||||
|
|
||||||
# Determine our `limited` status based on the timeline. We do this before
|
|
||||||
# filtering the events so we can accurately determine if there is more to
|
|
||||||
# paginate even if we filter out some/all events.
|
|
||||||
if len(timeline_events) > room_sync_config.timeline_limit:
|
|
||||||
limited = True
|
|
||||||
# Get rid of that extra "+ 1" event because we only used it to determine
|
|
||||||
# if we hit the limit or not
|
|
||||||
timeline_events = timeline_events[-room_sync_config.timeline_limit :]
|
|
||||||
assert timeline_events[0].internal_metadata.stream_ordering
|
|
||||||
new_room_key = RoomStreamToken(
|
|
||||||
stream=timeline_events[0].internal_metadata.stream_ordering - 1
|
|
||||||
)
|
|
||||||
|
|
||||||
# Make sure we don't expose any events that the client shouldn't see
|
# Make sure we don't expose any events that the client shouldn't see
|
||||||
timeline_events = await filter_events_for_client(
|
timeline_events = await filter_events_for_client(
|
||||||
self.storage_controllers,
|
self.storage_controllers,
|
||||||
|
|
|
@ -906,7 +906,7 @@ class SyncHandler:
|
||||||
# Use `stream_ordering` for updates
|
# Use `stream_ordering` for updates
|
||||||
else paginate_room_events_by_stream_ordering
|
else paginate_room_events_by_stream_ordering
|
||||||
)
|
)
|
||||||
events, end_key = await pagination_method(
|
events, end_key, limited = await pagination_method(
|
||||||
room_id=room_id,
|
room_id=room_id,
|
||||||
# The bounds are reversed so we can paginate backwards
|
# The bounds are reversed so we can paginate backwards
|
||||||
# (from newer to older events) starting at to_bound.
|
# (from newer to older events) starting at to_bound.
|
||||||
|
@ -914,9 +914,7 @@ class SyncHandler:
|
||||||
from_key=end_key,
|
from_key=end_key,
|
||||||
to_key=since_key,
|
to_key=since_key,
|
||||||
direction=Direction.BACKWARDS,
|
direction=Direction.BACKWARDS,
|
||||||
# We add one so we can determine if there are enough events to saturate
|
limit=load_limit,
|
||||||
# the limit or not (see `limited`)
|
|
||||||
limit=load_limit + 1,
|
|
||||||
)
|
)
|
||||||
# We want to return the events in ascending order (the last event is the
|
# We want to return the events in ascending order (the last event is the
|
||||||
# most recent).
|
# most recent).
|
||||||
|
@ -971,9 +969,6 @@ class SyncHandler:
|
||||||
loaded_recents.extend(recents)
|
loaded_recents.extend(recents)
|
||||||
recents = loaded_recents
|
recents = loaded_recents
|
||||||
|
|
||||||
if len(events) <= load_limit:
|
|
||||||
limited = False
|
|
||||||
break
|
|
||||||
max_repeat -= 1
|
max_repeat -= 1
|
||||||
|
|
||||||
if len(recents) > timeline_limit:
|
if len(recents) > timeline_limit:
|
||||||
|
@ -2608,7 +2603,7 @@ class SyncHandler:
|
||||||
|
|
||||||
newly_joined = room_id in newly_joined_rooms
|
newly_joined = room_id in newly_joined_rooms
|
||||||
if room_entry:
|
if room_entry:
|
||||||
events, start_key = room_entry
|
events, start_key, _ = room_entry
|
||||||
# We want to return the events in ascending order (the last event is the
|
# We want to return the events in ascending order (the last event is the
|
||||||
# most recent).
|
# most recent).
|
||||||
events.reverse()
|
events.reverse()
|
||||||
|
|
|
@ -108,7 +108,7 @@ class PaginateFunction(Protocol):
|
||||||
to_key: Optional[RoomStreamToken] = None,
|
to_key: Optional[RoomStreamToken] = None,
|
||||||
direction: Direction = Direction.BACKWARDS,
|
direction: Direction = Direction.BACKWARDS,
|
||||||
limit: int = 0,
|
limit: int = 0,
|
||||||
) -> Tuple[List[EventBase], RoomStreamToken]: ...
|
) -> Tuple[List[EventBase], RoomStreamToken, bool]: ...
|
||||||
|
|
||||||
|
|
||||||
# Used as return values for pagination APIs
|
# Used as return values for pagination APIs
|
||||||
|
@ -679,7 +679,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||||
to_key: Optional[RoomStreamToken] = None,
|
to_key: Optional[RoomStreamToken] = None,
|
||||||
direction: Direction = Direction.BACKWARDS,
|
direction: Direction = Direction.BACKWARDS,
|
||||||
limit: int = 0,
|
limit: int = 0,
|
||||||
) -> Dict[str, Tuple[List[EventBase], RoomStreamToken]]:
|
) -> Dict[str, Tuple[List[EventBase], RoomStreamToken, bool]]:
|
||||||
"""Get new room events in stream ordering since `from_key`.
|
"""Get new room events in stream ordering since `from_key`.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
|
@ -695,6 +695,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||||
A map from room id to a tuple containing:
|
A map from room id to a tuple containing:
|
||||||
- list of recent events in the room
|
- list of recent events in the room
|
||||||
- stream ordering key for the start of the chunk of events returned.
|
- stream ordering key for the start of the chunk of events returned.
|
||||||
|
- a boolean to indicate if there were more events but we hit the limit
|
||||||
|
|
||||||
When Direction.FORWARDS: from_key < x <= to_key, (ascending order)
|
When Direction.FORWARDS: from_key < x <= to_key, (ascending order)
|
||||||
When Direction.BACKWARDS: from_key >= x > to_key, (descending order)
|
When Direction.BACKWARDS: from_key >= x > to_key, (descending order)
|
||||||
|
@ -758,7 +759,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||||
to_key: Optional[RoomStreamToken] = None,
|
to_key: Optional[RoomStreamToken] = None,
|
||||||
direction: Direction = Direction.BACKWARDS,
|
direction: Direction = Direction.BACKWARDS,
|
||||||
limit: int = 0,
|
limit: int = 0,
|
||||||
) -> Tuple[List[EventBase], RoomStreamToken]:
|
) -> Tuple[List[EventBase], RoomStreamToken, bool]:
|
||||||
"""
|
"""
|
||||||
Paginate events by `stream_ordering` in the room from the `from_key` in the
|
Paginate events by `stream_ordering` in the room from the `from_key` in the
|
||||||
given `direction` to the `to_key` or `limit`.
|
given `direction` to the `to_key` or `limit`.
|
||||||
|
@ -773,8 +774,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||||
limit: Maximum number of events to return
|
limit: Maximum number of events to return
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
The results as a list of events and a token that points to the end
|
The results as a list of events, a token that points to the end of
|
||||||
of the result set. If no events are returned then the end of the
|
the result set, and a boolean to indicate if there were more events
|
||||||
|
but we hit the limit. If no events are returned then the end of the
|
||||||
stream has been reached (i.e. there are no events between `from_key`
|
stream has been reached (i.e. there are no events between `from_key`
|
||||||
and `to_key`).
|
and `to_key`).
|
||||||
|
|
||||||
|
@ -798,7 +800,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||||
and to_key.is_before_or_eq(from_key)
|
and to_key.is_before_or_eq(from_key)
|
||||||
):
|
):
|
||||||
# Token selection matches what we do below if there are no rows
|
# Token selection matches what we do below if there are no rows
|
||||||
return [], to_key if to_key else from_key
|
return [], to_key if to_key else from_key, False
|
||||||
# Or vice-versa, if we're looking backwards and our `from_key` is already before
|
# Or vice-versa, if we're looking backwards and our `from_key` is already before
|
||||||
# our `to_key`.
|
# our `to_key`.
|
||||||
elif (
|
elif (
|
||||||
|
@ -807,7 +809,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||||
and from_key.is_before_or_eq(to_key)
|
and from_key.is_before_or_eq(to_key)
|
||||||
):
|
):
|
||||||
# Token selection matches what we do below if there are no rows
|
# Token selection matches what we do below if there are no rows
|
||||||
return [], to_key if to_key else from_key
|
return [], to_key if to_key else from_key, False
|
||||||
|
|
||||||
# We can do a quick sanity check to see if any events have been sent in the room
|
# We can do a quick sanity check to see if any events have been sent in the room
|
||||||
# since the earlier token.
|
# since the earlier token.
|
||||||
|
@ -826,7 +828,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||||
|
|
||||||
if not has_changed:
|
if not has_changed:
|
||||||
# Token selection matches what we do below if there are no rows
|
# Token selection matches what we do below if there are no rows
|
||||||
return [], to_key if to_key else from_key
|
return [], to_key if to_key else from_key, False
|
||||||
|
|
||||||
order, from_bound, to_bound = generate_pagination_bounds(
|
order, from_bound, to_bound = generate_pagination_bounds(
|
||||||
direction, from_key, to_key
|
direction, from_key, to_key
|
||||||
|
@ -842,7 +844,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||||
engine=self.database_engine,
|
engine=self.database_engine,
|
||||||
)
|
)
|
||||||
|
|
||||||
def f(txn: LoggingTransaction) -> List[_EventDictReturn]:
|
def f(txn: LoggingTransaction) -> Tuple[List[_EventDictReturn], bool]:
|
||||||
sql = f"""
|
sql = f"""
|
||||||
SELECT event_id, instance_name, stream_ordering
|
SELECT event_id, instance_name, stream_ordering
|
||||||
FROM events
|
FROM events
|
||||||
|
@ -854,9 +856,13 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||||
"""
|
"""
|
||||||
txn.execute(sql, (room_id, 2 * limit))
|
txn.execute(sql, (room_id, 2 * limit))
|
||||||
|
|
||||||
|
# Get all the rows and check if we hit the limit.
|
||||||
|
fetched_rows = txn.fetchall()
|
||||||
|
limited = len(fetched_rows) >= 2 * limit
|
||||||
|
|
||||||
rows = [
|
rows = [
|
||||||
_EventDictReturn(event_id, None, stream_ordering)
|
_EventDictReturn(event_id, None, stream_ordering)
|
||||||
for event_id, instance_name, stream_ordering in txn
|
for event_id, instance_name, stream_ordering in fetched_rows
|
||||||
if _filter_results_by_stream(
|
if _filter_results_by_stream(
|
||||||
lower_token=(
|
lower_token=(
|
||||||
to_key if direction == Direction.BACKWARDS else from_key
|
to_key if direction == Direction.BACKWARDS else from_key
|
||||||
|
@ -867,10 +873,17 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||||
instance_name=instance_name,
|
instance_name=instance_name,
|
||||||
stream_ordering=stream_ordering,
|
stream_ordering=stream_ordering,
|
||||||
)
|
)
|
||||||
][:limit]
|
]
|
||||||
return rows
|
|
||||||
|
|
||||||
rows = await self.db_pool.runInteraction("get_room_events_stream_for_room", f)
|
if len(rows) > limit:
|
||||||
|
limited = True
|
||||||
|
|
||||||
|
rows = rows[:limit]
|
||||||
|
return rows, limited
|
||||||
|
|
||||||
|
rows, limited = await self.db_pool.runInteraction(
|
||||||
|
"get_room_events_stream_for_room", f
|
||||||
|
)
|
||||||
|
|
||||||
ret = await self.get_events_as_list(
|
ret = await self.get_events_as_list(
|
||||||
[r.event_id for r in rows], get_prev_content=True
|
[r.event_id for r in rows], get_prev_content=True
|
||||||
|
@ -887,7 +900,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||||
# `_paginate_room_events_by_topological_ordering_txn(...)`)
|
# `_paginate_room_events_by_topological_ordering_txn(...)`)
|
||||||
next_key = to_key if to_key else from_key
|
next_key = to_key if to_key else from_key
|
||||||
|
|
||||||
return ret, next_key
|
return ret, next_key, limited
|
||||||
|
|
||||||
@trace
|
@trace
|
||||||
async def get_current_state_delta_membership_changes_for_user(
|
async def get_current_state_delta_membership_changes_for_user(
|
||||||
|
@ -1191,7 +1204,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||||
if limit == 0:
|
if limit == 0:
|
||||||
return [], end_token
|
return [], end_token
|
||||||
|
|
||||||
rows, token = await self.db_pool.runInteraction(
|
rows, token, _ = await self.db_pool.runInteraction(
|
||||||
"get_recent_event_ids_for_room",
|
"get_recent_event_ids_for_room",
|
||||||
self._paginate_room_events_by_topological_ordering_txn,
|
self._paginate_room_events_by_topological_ordering_txn,
|
||||||
room_id,
|
room_id,
|
||||||
|
@ -1765,7 +1778,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||||
topological=topological_ordering, stream=stream_ordering
|
topological=topological_ordering, stream=stream_ordering
|
||||||
)
|
)
|
||||||
|
|
||||||
rows, start_token = self._paginate_room_events_by_topological_ordering_txn(
|
rows, start_token, _ = self._paginate_room_events_by_topological_ordering_txn(
|
||||||
txn,
|
txn,
|
||||||
room_id,
|
room_id,
|
||||||
before_token,
|
before_token,
|
||||||
|
@ -1775,7 +1788,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||||
)
|
)
|
||||||
events_before = [r.event_id for r in rows]
|
events_before = [r.event_id for r in rows]
|
||||||
|
|
||||||
rows, end_token = self._paginate_room_events_by_topological_ordering_txn(
|
rows, end_token, _ = self._paginate_room_events_by_topological_ordering_txn(
|
||||||
txn,
|
txn,
|
||||||
room_id,
|
room_id,
|
||||||
after_token,
|
after_token,
|
||||||
|
@ -1947,7 +1960,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||||
direction: Direction = Direction.BACKWARDS,
|
direction: Direction = Direction.BACKWARDS,
|
||||||
limit: int = 0,
|
limit: int = 0,
|
||||||
event_filter: Optional[Filter] = None,
|
event_filter: Optional[Filter] = None,
|
||||||
) -> Tuple[List[_EventDictReturn], RoomStreamToken]:
|
) -> Tuple[List[_EventDictReturn], RoomStreamToken, bool]:
|
||||||
"""Returns list of events before or after a given token.
|
"""Returns list of events before or after a given token.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
|
@ -1962,10 +1975,11 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||||
those that match the filter.
|
those that match the filter.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
A list of _EventDictReturn and a token that points to the end of the
|
A list of _EventDictReturn, a token that points to the end of the
|
||||||
result set. If no events are returned then the end of the stream has
|
result set, and a boolean to indicate if there were more events but
|
||||||
been reached (i.e. there are no events between `from_token` and
|
we hit the limit. If no events are returned then the end of the
|
||||||
`to_token`), or `limit` is zero.
|
stream has been reached (i.e. there are no events between
|
||||||
|
`from_token` and `to_token`), or `limit` is zero.
|
||||||
"""
|
"""
|
||||||
# We can bail early if we're looking forwards, and our `to_key` is already
|
# We can bail early if we're looking forwards, and our `to_key` is already
|
||||||
# before our `from_token`.
|
# before our `from_token`.
|
||||||
|
@ -1975,7 +1989,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||||
and to_token.is_before_or_eq(from_token)
|
and to_token.is_before_or_eq(from_token)
|
||||||
):
|
):
|
||||||
# Token selection matches what we do below if there are no rows
|
# Token selection matches what we do below if there are no rows
|
||||||
return [], to_token if to_token else from_token
|
return [], to_token if to_token else from_token, False
|
||||||
# Or vice-versa, if we're looking backwards and our `from_token` is already before
|
# Or vice-versa, if we're looking backwards and our `from_token` is already before
|
||||||
# our `to_token`.
|
# our `to_token`.
|
||||||
elif (
|
elif (
|
||||||
|
@ -1984,7 +1998,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||||
and from_token.is_before_or_eq(to_token)
|
and from_token.is_before_or_eq(to_token)
|
||||||
):
|
):
|
||||||
# Token selection matches what we do below if there are no rows
|
# Token selection matches what we do below if there are no rows
|
||||||
return [], to_token if to_token else from_token
|
return [], to_token if to_token else from_token, False
|
||||||
|
|
||||||
args: List[Any] = [room_id]
|
args: List[Any] = [room_id]
|
||||||
|
|
||||||
|
@ -2007,6 +2021,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||||
args.extend(filter_args)
|
args.extend(filter_args)
|
||||||
|
|
||||||
# We fetch more events as we'll filter the result set
|
# We fetch more events as we'll filter the result set
|
||||||
|
requested_limit = int(limit) * 2
|
||||||
args.append(int(limit) * 2)
|
args.append(int(limit) * 2)
|
||||||
|
|
||||||
select_keywords = "SELECT"
|
select_keywords = "SELECT"
|
||||||
|
@ -2071,10 +2086,14 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||||
}
|
}
|
||||||
txn.execute(sql, args)
|
txn.execute(sql, args)
|
||||||
|
|
||||||
|
# Get all the rows and check if we hit the limit.
|
||||||
|
fetched_rows = txn.fetchall()
|
||||||
|
limited = len(fetched_rows) >= requested_limit
|
||||||
|
|
||||||
# Filter the result set.
|
# Filter the result set.
|
||||||
rows = [
|
rows = [
|
||||||
_EventDictReturn(event_id, topological_ordering, stream_ordering)
|
_EventDictReturn(event_id, topological_ordering, stream_ordering)
|
||||||
for event_id, instance_name, topological_ordering, stream_ordering in txn
|
for event_id, instance_name, topological_ordering, stream_ordering in fetched_rows
|
||||||
if _filter_results(
|
if _filter_results(
|
||||||
lower_token=(
|
lower_token=(
|
||||||
to_token if direction == Direction.BACKWARDS else from_token
|
to_token if direction == Direction.BACKWARDS else from_token
|
||||||
|
@ -2086,7 +2105,12 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||||
topological_ordering=topological_ordering,
|
topological_ordering=topological_ordering,
|
||||||
stream_ordering=stream_ordering,
|
stream_ordering=stream_ordering,
|
||||||
)
|
)
|
||||||
][:limit]
|
]
|
||||||
|
|
||||||
|
if len(rows) > limit:
|
||||||
|
limited = True
|
||||||
|
|
||||||
|
rows = rows[:limit]
|
||||||
|
|
||||||
if rows:
|
if rows:
|
||||||
assert rows[-1].topological_ordering is not None
|
assert rows[-1].topological_ordering is not None
|
||||||
|
@ -2097,7 +2121,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||||
# TODO (erikj): We should work out what to do here instead.
|
# TODO (erikj): We should work out what to do here instead.
|
||||||
next_token = to_token if to_token else from_token
|
next_token = to_token if to_token else from_token
|
||||||
|
|
||||||
return rows, next_token
|
return rows, next_token, limited
|
||||||
|
|
||||||
@trace
|
@trace
|
||||||
@tag_args
|
@tag_args
|
||||||
|
@ -2110,7 +2134,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||||
direction: Direction = Direction.BACKWARDS,
|
direction: Direction = Direction.BACKWARDS,
|
||||||
limit: int = 0,
|
limit: int = 0,
|
||||||
event_filter: Optional[Filter] = None,
|
event_filter: Optional[Filter] = None,
|
||||||
) -> Tuple[List[EventBase], RoomStreamToken]:
|
) -> Tuple[List[EventBase], RoomStreamToken, bool]:
|
||||||
"""
|
"""
|
||||||
Paginate events by `topological_ordering` (tie-break with `stream_ordering`) in
|
Paginate events by `topological_ordering` (tie-break with `stream_ordering`) in
|
||||||
the room from the `from_key` in the given `direction` to the `to_key` or
|
the room from the `from_key` in the given `direction` to the `to_key` or
|
||||||
|
@ -2127,8 +2151,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||||
event_filter: If provided filters the events to those that match the filter.
|
event_filter: If provided filters the events to those that match the filter.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
The results as a list of events and a token that points to the end
|
The results as a list of events, a token that points to the end of
|
||||||
of the result set. If no events are returned then the end of the
|
the result set, and a boolean to indicate if there were more events
|
||||||
|
but we hit the limit. If no events are returned then the end of the
|
||||||
stream has been reached (i.e. there are no events between `from_key`
|
stream has been reached (i.e. there are no events between `from_key`
|
||||||
and `to_key`).
|
and `to_key`).
|
||||||
|
|
||||||
|
@ -2152,7 +2177,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||||
):
|
):
|
||||||
# Token selection matches what we do in `_paginate_room_events_txn` if there
|
# Token selection matches what we do in `_paginate_room_events_txn` if there
|
||||||
# are no rows
|
# are no rows
|
||||||
return [], to_key if to_key else from_key
|
return [], to_key if to_key else from_key, False
|
||||||
# Or vice-versa, if we're looking backwards and our `from_key` is already before
|
# Or vice-versa, if we're looking backwards and our `from_key` is already before
|
||||||
# our `to_key`.
|
# our `to_key`.
|
||||||
elif (
|
elif (
|
||||||
|
@ -2162,9 +2187,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||||
):
|
):
|
||||||
# Token selection matches what we do in `_paginate_room_events_txn` if there
|
# Token selection matches what we do in `_paginate_room_events_txn` if there
|
||||||
# are no rows
|
# are no rows
|
||||||
return [], to_key if to_key else from_key
|
return [], to_key if to_key else from_key, False
|
||||||
|
|
||||||
rows, token = await self.db_pool.runInteraction(
|
rows, token, limited = await self.db_pool.runInteraction(
|
||||||
"paginate_room_events_by_topological_ordering",
|
"paginate_room_events_by_topological_ordering",
|
||||||
self._paginate_room_events_by_topological_ordering_txn,
|
self._paginate_room_events_by_topological_ordering_txn,
|
||||||
room_id,
|
room_id,
|
||||||
|
@ -2179,7 +2204,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||||
[r.event_id for r in rows], get_prev_content=True
|
[r.event_id for r in rows], get_prev_content=True
|
||||||
)
|
)
|
||||||
|
|
||||||
return events, token
|
return events, token, limited
|
||||||
|
|
||||||
@cached()
|
@cached()
|
||||||
async def get_id_for_instance(self, instance_name: str) -> int:
|
async def get_id_for_instance(self, instance_name: str) -> int:
|
||||||
|
|
|
@ -22,7 +22,7 @@ import synapse.rest.admin
|
||||||
from synapse.api.constants import EventTypes
|
from synapse.api.constants import EventTypes
|
||||||
from synapse.rest.client import login, room, sync
|
from synapse.rest.client import login, room, sync
|
||||||
from synapse.server import HomeServer
|
from synapse.server import HomeServer
|
||||||
from synapse.types import StreamToken, StrSequence
|
from synapse.types import StrSequence
|
||||||
from synapse.util import Clock
|
from synapse.util import Clock
|
||||||
|
|
||||||
from tests.rest.client.sliding_sync.test_sliding_sync import SlidingSyncBase
|
from tests.rest.client.sliding_sync.test_sliding_sync import SlidingSyncBase
|
||||||
|
@ -149,16 +149,10 @@ class SlidingSyncRoomsTimelineTestCase(SlidingSyncBase):
|
||||||
user2_tok = self.login(user2_id, "pass")
|
user2_tok = self.login(user2_id, "pass")
|
||||||
|
|
||||||
room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
|
room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
|
||||||
self.helper.send(room_id1, "activity1", tok=user2_tok)
|
event_response1 = self.helper.send(room_id1, "activity1", tok=user2_tok)
|
||||||
self.helper.send(room_id1, "activity2", tok=user2_tok)
|
event_response2 = self.helper.send(room_id1, "activity2", tok=user2_tok)
|
||||||
event_response3 = self.helper.send(room_id1, "activity3", tok=user2_tok)
|
event_response3 = self.helper.send(room_id1, "activity3", tok=user2_tok)
|
||||||
event_pos3 = self.get_success(
|
|
||||||
self.store.get_position_for_event(event_response3["event_id"])
|
|
||||||
)
|
|
||||||
event_response4 = self.helper.send(room_id1, "activity4", tok=user2_tok)
|
event_response4 = self.helper.send(room_id1, "activity4", tok=user2_tok)
|
||||||
event_pos4 = self.get_success(
|
|
||||||
self.store.get_position_for_event(event_response4["event_id"])
|
|
||||||
)
|
|
||||||
event_response5 = self.helper.send(room_id1, "activity5", tok=user2_tok)
|
event_response5 = self.helper.send(room_id1, "activity5", tok=user2_tok)
|
||||||
user1_join_response = self.helper.join(room_id1, user1_id, tok=user1_tok)
|
user1_join_response = self.helper.join(room_id1, user1_id, tok=user1_tok)
|
||||||
|
|
||||||
|
@ -196,27 +190,23 @@ class SlidingSyncRoomsTimelineTestCase(SlidingSyncBase):
|
||||||
)
|
)
|
||||||
|
|
||||||
# Check to make sure the `prev_batch` points at the right place
|
# Check to make sure the `prev_batch` points at the right place
|
||||||
prev_batch_token = self.get_success(
|
prev_batch_token = response_body["rooms"][room_id1]["prev_batch"]
|
||||||
StreamToken.from_string(
|
|
||||||
self.store, response_body["rooms"][room_id1]["prev_batch"]
|
# If we use the `prev_batch` token to look backwards we should see
|
||||||
)
|
# `event3` and older next.
|
||||||
|
channel = self.make_request(
|
||||||
|
"GET",
|
||||||
|
f"/rooms/{room_id1}/messages?from={prev_batch_token}&dir=b&limit=3",
|
||||||
|
access_token=user1_tok,
|
||||||
)
|
)
|
||||||
prev_batch_room_stream_token_serialized = self.get_success(
|
self.assertEqual(channel.code, 200, channel.json_body)
|
||||||
prev_batch_token.room_key.to_string(self.store)
|
self.assertListEqual(
|
||||||
)
|
[
|
||||||
# If we use the `prev_batch` token to look backwards, we should see `event3`
|
event_response3["event_id"],
|
||||||
# next so make sure the token encompasses it
|
event_response2["event_id"],
|
||||||
self.assertEqual(
|
event_response1["event_id"],
|
||||||
event_pos3.persisted_after(prev_batch_token.room_key),
|
],
|
||||||
False,
|
[ev["event_id"] for ev in channel.json_body["chunk"]],
|
||||||
f"`prev_batch` token {prev_batch_room_stream_token_serialized} should be >= event_pos3={self.get_success(event_pos3.to_room_stream_token().to_string(self.store))}",
|
|
||||||
)
|
|
||||||
# If we use the `prev_batch` token to look backwards, we shouldn't see `event4`
|
|
||||||
# anymore since it was just returned in this response.
|
|
||||||
self.assertEqual(
|
|
||||||
event_pos4.persisted_after(prev_batch_token.room_key),
|
|
||||||
True,
|
|
||||||
f"`prev_batch` token {prev_batch_room_stream_token_serialized} should be < event_pos4={self.get_success(event_pos4.to_room_stream_token().to_string(self.store))}",
|
|
||||||
)
|
)
|
||||||
|
|
||||||
# With no `from_token` (initial sync), it's all historical since there is no
|
# With no `from_token` (initial sync), it's all historical since there is no
|
||||||
|
|
|
@ -147,7 +147,7 @@ class PaginationTestCase(HomeserverTestCase):
|
||||||
def _filter_messages(self, filter: JsonDict) -> List[str]:
|
def _filter_messages(self, filter: JsonDict) -> List[str]:
|
||||||
"""Make a request to /messages with a filter, returns the chunk of events."""
|
"""Make a request to /messages with a filter, returns the chunk of events."""
|
||||||
|
|
||||||
events, next_key = self.get_success(
|
events, next_key, _ = self.get_success(
|
||||||
self.hs.get_datastores().main.paginate_room_events_by_topological_ordering(
|
self.hs.get_datastores().main.paginate_room_events_by_topological_ordering(
|
||||||
room_id=self.room_id,
|
room_id=self.room_id,
|
||||||
from_key=self.from_token.room_key,
|
from_key=self.from_token.room_key,
|
||||||
|
|
Loading…
Reference in a new issue