From 1b7fa7b04a895b501072f16ca92c776840dcf090 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Sat, 18 May 2024 12:08:30 +0100 Subject: [PATCH] Add StreamToken.is_before_or_eq func --- synapse/notifier.py | 3 +++ synapse/types/__init__.py | 52 ++++++++++++++++++++++++++++++++++++++- 2 files changed, 54 insertions(+), 1 deletion(-) diff --git a/synapse/notifier.py b/synapse/notifier.py index 7c1cd3b5f2..06ce04c800 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -763,6 +763,9 @@ class Notifier: return result + async def wait_for_stream_position(self, stream_token: StreamToken) -> None: + pass + async def _get_room_ids( self, user: UserID, explicit_room_id: Optional[str] ) -> Tuple[StrCollection, bool]: diff --git a/synapse/types/__init__.py b/synapse/types/__init__.py index 509a2d3a0f..8ba35eef0d 100644 --- a/synapse/types/__init__.py +++ b/synapse/types/__init__.py @@ -48,7 +48,7 @@ import attr from immutabledict import immutabledict from signedjson.key import decode_verify_key_bytes from signedjson.types import VerifyKey -from typing_extensions import TypedDict +from typing_extensions import Self, TypedDict from unpaddedbase64 import decode_base64 from zope.interface import Interface @@ -515,6 +515,27 @@ class AbstractMultiWriterStreamToken(metaclass=abc.ABCMeta): # at `self.stream`. return self.instance_map.get(instance_name, self.stream) + def is_before_or_eq(self, other_token: Self) -> bool: + """Wether this token is before the other token, i.e. every constituent + part is before the other. + + Essentially it is `self <= other`. + + Note: if `self.is_before_or_eq(other_token) is False` then that does not + imply that the reverse is True. + """ + if self.stream > other_token.stream: + return False + + instances = self.instance_map.keys() | other_token.instance_map.keys() + for instance in instances: + if self.instance_map.get( + instance, self.stream + ) > other_token.instance_map.get(instance, other_token.stream): + return False + + return True + @attr.s(frozen=True, slots=True, order=False) class RoomStreamToken(AbstractMultiWriterStreamToken): @@ -1008,6 +1029,35 @@ class StreamToken: """Returns the stream ID for the given key.""" return getattr(self, key.value) + def is_before_or_eq(self, other_token: "StreamToken") -> bool: + """Wether this token is before the other token, i.e. every constituent + part is before the other. + + Essentially it is `self <= other`. + + Note: if `self.is_before_or_eq(other_token) is False` then that does not + imply that the reverse is True. + """ + + for _, key in StreamKeyType.__members__.items(): + self_value = self.get_field(key) + other_value = other_token.get_field(key) + + if isinstance(self_value, RoomStreamToken): + assert isinstance(other_value, RoomStreamToken) + if not self_value.is_before_or_eq(other_value): + return False + elif isinstance(self_value, MultiWriterStreamToken): + assert isinstance(other_value, MultiWriterStreamToken) + if not self_value.is_before_or_eq(other_value): + return False + else: + assert isinstance(other_value, int) + if self_value > other_value: + return False + + return True + StreamToken.START = StreamToken( RoomStreamToken(stream=0), 0, 0, MultiWriterStreamToken(stream=0), 0, 0, 0, 0, 0, 0