Add SlidingSyncStreamToken

This commit is contained in:
Erik Johnston 2024-07-17 11:20:23 +01:00
parent 79924aebef
commit 30263b43c2

View file

@ -1137,6 +1137,43 @@ StreamToken.START = StreamToken(
)
@attr.s(slots=True, frozen=True, auto_attribs=True)
class SlidingSyncStreamToken:
"""The same as a `StreamToken`, but includes an extra field at the start for
the sliding sync connection token (separated by a '/'). This is used to
store per-connection state.
This then looks something like:
5/s2633508_17_338_6732159_1082514_541479_274711_265584_1_379
"""
stream_token: StreamToken
connection_token: int
@staticmethod
@cancellable
async def from_string(store: "DataStore", string: str) -> "SlidingSyncStreamToken":
"""Creates a SlidingSyncStreamToken from its textual representation."""
try:
connection_token_str, stream_token_str = string.split("/", 1)
connection_token = int(connection_token_str)
stream_token = await StreamToken.from_string(store, stream_token_str)
return SlidingSyncStreamToken(
stream_token=stream_token,
connection_token=connection_token,
)
except CancelledError:
raise
except Exception:
raise SynapseError(400, "Invalid stream token")
async def to_string(self, store: "DataStore") -> str:
"""Serializes the token to a string"""
stream_token_str = await self.stream_token.to_string(store)
return f"{self.connection_token}/{stream_token_str}"
@attr.s(slots=True, frozen=True, auto_attribs=True)
class PersistedPosition:
"""Position of a newly persisted row with instance that persisted it."""