diff --git a/synapse/types/__init__.py b/synapse/types/__init__.py index 3962ecc996..23ac1842f8 100644 --- a/synapse/types/__init__.py +++ b/synapse/types/__init__.py @@ -1137,6 +1137,43 @@ StreamToken.START = StreamToken( ) +@attr.s(slots=True, frozen=True, auto_attribs=True) +class SlidingSyncStreamToken: + """The same as a `StreamToken`, but includes an extra field at the start for + the sliding sync connection token (separated by a '/'). This is used to + store per-connection state. + + This then looks something like: + 5/s2633508_17_338_6732159_1082514_541479_274711_265584_1_379 + """ + + stream_token: StreamToken + connection_token: int + + @staticmethod + @cancellable + async def from_string(store: "DataStore", string: str) -> "SlidingSyncStreamToken": + """Creates a SlidingSyncStreamToken from its textual representation.""" + try: + connection_token_str, stream_token_str = string.split("/", 1) + connection_token = int(connection_token_str) + stream_token = await StreamToken.from_string(store, stream_token_str) + + return SlidingSyncStreamToken( + stream_token=stream_token, + connection_token=connection_token, + ) + except CancelledError: + raise + except Exception: + raise SynapseError(400, "Invalid stream token") + + async def to_string(self, store: "DataStore") -> str: + """Serializes the token to a string""" + stream_token_str = await self.stream_token.to_string(store) + return f"{self.connection_token}/{stream_token_str}" + + @attr.s(slots=True, frozen=True, auto_attribs=True) class PersistedPosition: """Position of a newly persisted row with instance that persisted it."""