WIP/PoC of storing whether we have sent rooms down to clients

This commit is contained in:
Erik Johnston 2024-07-15 14:12:01 +01:00
parent f3a4cfb8b4
commit d44f7e12b1

View file

@ -18,6 +18,7 @@
# #
# #
import logging import logging
from enum import Enum
from itertools import chain from itertools import chain
from typing import TYPE_CHECKING, Any, Dict, Final, List, Mapping, Optional, Set, Tuple from typing import TYPE_CHECKING, Any, Dict, Final, List, Mapping, Optional, Set, Tuple
@ -38,6 +39,7 @@ from synapse.types import (
RoomStreamToken, RoomStreamToken,
SlidingSyncStreamToken, SlidingSyncStreamToken,
StateMap, StateMap,
StrCollection,
StreamKeyType, StreamKeyType,
StreamToken, StreamToken,
UserID, UserID,
@ -1861,3 +1863,198 @@ class SlidingSyncHandler:
next_batch=f"{stream_id}", next_batch=f"{stream_id}",
events=messages, events=messages,
) )
class HaveSentRoomFlag(Enum):
"""Flag for whether we have sent the room down a sliding sync connection.
The valid state changes here are:
NEVER -> LIVE
LIVE -> PREVIOUSLY
PREVIOUSLY -> LIVE
"""
# The room has never been sent down (or we have forgotten we have sent it
# down).
NEVER = 1
# We have previously sent the room down, but there are updates that we
# haven't sent down.
PREVIOUSLY = 2
# We have sent the room down and the client has received all updates.
LIVE = 3
@attr.s(auto_attribs=True, slots=True, frozen=True)
class HaveSentRoom:
"""Whether we have sent the room down a sliding sync connection.
Attributes:
status: Flag of if we have or haven't sent down the room
last_token: If the flag is `PREVIOUSLY` then this is non-null and
contains the last stream token of the last updates we sent down
the room, i.e. we still need to send everything since then to the
client.
"""
status: HaveSentRoomFlag
last_token: Optional[RoomStreamToken]
@staticmethod
def previously(last_token: RoomStreamToken) -> "HaveSentRoom":
"""Constructor for `PREVIOUSLY` flag."""
return HaveSentRoom(HaveSentRoomFlag.PREVIOUSLY, last_token)
HAVE_SENT_ROOM_NEVER = HaveSentRoom(HaveSentRoomFlag.NEVER, None)
HAVE_SENT_ROOM_LIVE = HaveSentRoom(HaveSentRoomFlag.LIVE, None)
@attr.s(auto_attribs=True)
class SlidingSyncConnectionStore:
"""In-memory store of per-connection state, including what rooms we have
previously sent down a sliding sync connection.
Note: This is NOT safe to run in a worker setup.
The complication here is that we need to handle requests being resent, i.e.
if we sent down a room in a response that the client received, we must
consider the room *not* sent when we get the request again.
This is handled by using an integer "token", which is returned to the client
as part of the sync token. For each connection we store a mapping from
tokens to the room states, and create a new entry when we send down new
rooms.
Note that for any given sliding sync connection we will only store a maximum
of two different tokens: the previous token from the request and a new token
sent in the response. When we receive a request with a given token, we then
clear out all other entries with a different token.
Attributes:
_connections: Mapping from `(user_id, conn_id)` to mapping of `token`
to mapping of room ID to `HaveSentRoom`.
"""
# `(user_id, conn_id)` -> `token` -> `room_id` -> `HaveSentRoom`
_connections: Dict[Tuple[str, str], Dict[int, Dict[str, HaveSentRoom]]] = (
attr.Factory(dict)
)
async def have_sent_room(
self, user_id: str, conn_id: str, connection_token: int, room_id: str
) -> HaveSentRoom:
"""Whether for the given user_id/conn_id/token, return whether we have
previously sent the room down
"""
sync_statuses = self._connections.setdefault((user_id, conn_id), {})
room_status = sync_statuses.get(connection_token, {}).get(
room_id, HAVE_SENT_ROOM_NEVER
)
return room_status
async def record_rooms(
self,
user_id: str,
conn_id: str,
from_token: Optional[SlidingSyncStreamToken],
*,
sent_room_ids: StrCollection,
unsent_room_ids: StrCollection,
) -> int:
"""Record which rooms we have/haven't sent down in a new response
Attributes:
user_id
conn_id
from_token: The since token from the request, if any
sent_room_ids: The set of room IDs that we have sent down as
part of this request (only needs to be ones we didn't
previously sent down).
unsent_room_ids: The set of room IDs that have had updates
since the `last_room_token`, but which were not included in
this request
"""
prev_connection_token = 0
if from_token is not None:
prev_connection_token = from_token.connection_token
# If there are no changes then this is a noop.
if not sent_room_ids and not unsent_room_ids:
return prev_connection_token
sync_statuses = self._connections.setdefault((user_id, conn_id), {})
# Generate a new token, removing any existing entries in that token
# (which can happen if requests get resent).
new_store_token = prev_connection_token + 1
sync_statuses.pop(new_store_token, None)
# Copy over and update the room mappings.
new_room_statuses = dict(sync_statuses.get(prev_connection_token, {}))
# Whether we have updated the `new_room_statuses`, if we don't by the
# end we can treat this as a noop.
have_updated = False
for room_id in sent_room_ids:
new_room_statuses[room_id] = HAVE_SENT_ROOM_LIVE
have_updated = True
# Whether we add/update the entries for unsent rooms depends on the
# existing entry:
# - LIVE: We have previously sent down everything up to
# `last_room_token, so we update the entry to be `PREVIOUSLY` with
# `last_room_token`.
# - PREVIOUSLY: We have previously sent down everything up to *a*
# given token, so we don't need to update the entry.
# - NEVER: We have never previously sent down the room, and we haven't
# sent anything down this time either so we leave it as NEVER.
# Work out the new state for unsent rooms that were `LIVE`.
if from_token:
new_unsent_state = HaveSentRoom.previously(from_token.stream_token.room_key)
else:
new_unsent_state = HAVE_SENT_ROOM_NEVER
for room_id in unsent_room_ids:
prev_state = new_room_statuses.get(room_id)
if prev_state is not None and prev_state.status == HaveSentRoomFlag.LIVE:
new_room_statuses[room_id] = new_unsent_state
have_updated = True
if not have_updated:
return prev_connection_token
sync_statuses[new_store_token] = new_room_statuses
return new_store_token
async def mark_token_seen(
self,
user_id: str,
conn_id: str,
from_token: Optional[SlidingSyncStreamToken],
) -> None:
"""We have received a request with the given token, so we can clear out
any other tokens associated with the connection.
If there is no from token then we have started afresh, and so we delete
all tokens associated with the device.
"""
# Clear out any tokens for the connection that doesn't match the one
# from the request.
sync_statuses = self._connections.pop((user_id, conn_id), {})
if from_token is None:
return
sync_statuses = {
i: room_statuses
for i, room_statuses in sync_statuses.items()
if i == from_token.connection_token
}
if sync_statuses:
self._connections[(user_id, conn_id)] = sync_statuses