Fix events from rooms we're not joined to affecting the joined room stream ordering

This commit is contained in:
Eric Eastwood 2024-08-12 19:40:53 -05:00
parent 53232e6df5
commit ab074f5335
2 changed files with 305 additions and 69 deletions

View file

@ -609,6 +609,10 @@ class PersistEventsStore:
txn, room_id, state_delta_for_room, min_stream_order
)
self._update_sliding_sync_tables_with_new_persisted_events_txn(
txn, events_and_contexts
)
def _persist_event_auth_chain_txn(
self,
txn: LoggingTransaction,
@ -1631,6 +1635,84 @@ class PersistEventsStore:
txn, {m for m in members_to_cache_bust if not self.hs.is_mine_id(m)}
)
def _update_sliding_sync_tables_with_new_persisted_events_txn(
self,
txn: LoggingTransaction,
events_and_contexts: List[Tuple[EventBase, EventContext]],
) -> None:
"""
Update the latest `event_stream_ordering`/`bump_stamp` columns in the
`sliding_sync_joined_rooms` table for the room with new events.
This function assumes that `_store_event_txn()` (to persist the event) and
`_update_current_state_txn(...)` (so that `sliding_sync_joined_rooms` table has
been updated with rooms that were joined) have already been run.
Args:
txn
events_and_contexts: The events being persisted
"""
# Handle updating `sliding_sync_joined_rooms`
room_id_to_stream_ordering_map: Dict[str, int] = {}
room_id_to_bump_stamp_map: Dict[str, int] = {}
for event, _ in events_and_contexts:
existing_stream_ordering = room_id_to_stream_ordering_map.get(event.room_id)
# This should exist for persisted events
assert event.internal_metadata.stream_ordering is not None
# Ignore backfilled events which will have a negative stream ordering
if event.internal_metadata.stream_ordering < 0:
continue
if (
existing_stream_ordering is None
or existing_stream_ordering < event.internal_metadata.stream_ordering
):
room_id_to_stream_ordering_map[event.room_id] = (
event.internal_metadata.stream_ordering
)
if event.type in SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES:
existing_bump_stamp = room_id_to_bump_stamp_map.get(event.room_id)
# This should exist at this point because we're inserting events here which require it
assert event.internal_metadata.stream_ordering is not None
if (
existing_bump_stamp is None
or existing_bump_stamp < event.internal_metadata.stream_ordering
):
room_id_to_bump_stamp_map[event.room_id] = (
event.internal_metadata.stream_ordering
)
txn.execute_batch(
"""
UPDATE sliding_sync_joined_rooms
SET
event_stream_ordering = CASE
WHEN event_stream_ordering IS NULL OR event_stream_ordering < ?
THEN ?
ELSE event_stream_ordering
END,
bump_stamp = CASE
WHEN bump_stamp IS NULL OR bump_stamp < ?
THEN ?
ELSE bump_stamp
END
WHERE room_id = ?
""",
[
[
room_id_to_stream_ordering_map[room_id],
room_id_to_stream_ordering_map[room_id],
room_id_to_bump_stamp_map.get(room_id),
room_id_to_bump_stamp_map.get(room_id),
room_id,
]
for room_id in room_id_to_stream_ordering_map.keys()
],
)
def _upsert_room_version_txn(self, txn: LoggingTransaction, room_id: str) -> None:
"""Update the room version in the database based off current state
events.
@ -1976,67 +2058,6 @@ class PersistEventsStore:
],
)
# Handle updating `sliding_sync_joined_rooms`
room_id_to_stream_ordering_map: Dict[str, int] = {}
room_id_to_bump_stamp_map: Dict[str, int] = {}
for event, _ in events_and_contexts:
existing_stream_ordering = room_id_to_stream_ordering_map.get(event.room_id)
# This should exist at this point because we're inserting events here which require it
assert event.internal_metadata.stream_ordering is not None
if (
existing_stream_ordering is None
or existing_stream_ordering < event.internal_metadata.stream_ordering
):
room_id_to_stream_ordering_map[event.room_id] = (
event.internal_metadata.stream_ordering
)
if event.type in SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES:
existing_bump_stamp = room_id_to_bump_stamp_map.get(event.room_id)
# This should exist at this point because we're inserting events here which require it
assert event.internal_metadata.stream_ordering is not None
if (
existing_bump_stamp is None
or existing_bump_stamp < event.internal_metadata.stream_ordering
):
room_id_to_bump_stamp_map[event.room_id] = (
event.internal_metadata.stream_ordering
)
# This function (`_store_event_txn(...)`) is run before
# `_update_current_state_txn(...)` which handles deleting the rows if we are no
# longer in the room so we don't need to worry about inserting something that
# will be orphaned.
txn.execute_batch(
"""
INSERT INTO sliding_sync_joined_rooms
(room_id, event_stream_ordering, bump_stamp)
VALUES (
?, ?, ?
)
ON CONFLICT (room_id)
DO UPDATE SET
event_stream_ordering = CASE
WHEN event_stream_ordering IS NULL OR event_stream_ordering < EXCLUDED.event_stream_ordering
THEN EXCLUDED.event_stream_ordering
ELSE event_stream_ordering
END,
bump_stamp = CASE
WHEN bump_stamp IS NULL OR bump_stamp < EXCLUDED.bump_stamp
THEN EXCLUDED.bump_stamp
ELSE bump_stamp
END
""",
[
[
room_id,
room_id_to_stream_ordering_map[room_id],
room_id_to_bump_stamp_map.get(room_id),
]
for room_id in room_id_to_stream_ordering_map.keys()
],
)
def _store_rejected_events_txn(
self,
txn: LoggingTransaction,
@ -2385,6 +2406,8 @@ class PersistEventsStore:
)
# Sanity check that we at-least have the create event
if create_stripped_event is not None:
insert_values["has_known_state"] = True
# Find the room_type
insert_values["room_type"] = (
create_stripped_event.content.get(

View file

@ -28,7 +28,8 @@ from twisted.test.proto_helpers import MemoryReactor
from synapse.api.constants import EventContentFields, EventTypes, Membership, RoomTypes
from synapse.api.room_versions import RoomVersions
from synapse.events import EventBase
from synapse.events import EventBase, StrippedStateEvent, make_event_from_dict
from synapse.events.snapshot import EventContext
from synapse.federation.federation_base import event_from_pdu_json
from synapse.rest import admin
from synapse.rest.client import login, room
@ -500,7 +501,7 @@ class _SlidingSyncJoinedRoomResult:
bump_stamp: Optional[int]
room_type: Optional[str]
room_name: Optional[str]
is_encrypted: Optional[bool]
is_encrypted: bool
@attr.s(slots=True, frozen=True, auto_attribs=True)
@ -515,9 +516,10 @@ class _SlidingSyncMembershipSnapshotResult:
# exists for persisted events but in the context of these tests, we're only working
# with persisted events and we're making comparisons so we will find any mismatch.
event_stream_ordering: Optional[int]
has_known_state: bool
room_type: Optional[str]
room_name: Optional[str]
is_encrypted: Optional[bool]
is_encrypted: bool
class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase):
@ -569,7 +571,7 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase):
bump_stamp=row[2],
room_type=row[3],
room_name=row[4],
is_encrypted=row[5],
is_encrypted=bool(row[5]),
)
for row in rows
}
@ -584,7 +586,7 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase):
Mapping from the (room_id, user_id) to _SlidingSyncMembershipSnapshotResult.
"""
rows = cast(
List[Tuple[str, str, str, str, int, str, str, bool]],
List[Tuple[str, str, str, str, int, bool, str, str, bool]],
self.get_success(
self.store.db_pool.simple_select_list(
"sliding_sync_membership_snapshots",
@ -595,6 +597,7 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase):
"membership_event_id",
"membership",
"event_stream_ordering",
"has_known_state",
"room_type",
"room_name",
"is_encrypted",
@ -610,13 +613,88 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase):
membership_event_id=row[2],
membership=row[3],
event_stream_ordering=row[4],
room_type=row[5],
room_name=row[6],
is_encrypted=row[7],
has_known_state=bool(row[5]),
room_type=row[6],
room_name=row[7],
is_encrypted=bool(row[8]),
)
for row in rows
}
_remote_invite_count: int = 0
def _create_remote_invite_room_for_user(
self,
invitee_user_id: str,
unsigned_invite_room_state: Optional[List[StrippedStateEvent]],
) -> Tuple[str, EventBase]:
"""
Create a fake invite for a remote room and persist it.
We don't have any state for these kind of rooms and can only rely on the
stripped state included in the unsigned portion of the invite event to identify
the room.
Args:
invitee_user_id: The person being invited
unsigned_invite_room_state: List of stripped state events to assist the
receiver in identifying the room.
Returns:
The room ID of the remote invite room and the persisted remote invite event.
"""
invite_room_id = f"!test_room{self._remote_invite_count}:remote_server"
invite_event_dict = {
"room_id": invite_room_id,
"sender": "@inviter:remote_server",
"state_key": invitee_user_id,
"depth": 1,
"origin_server_ts": 1,
"type": EventTypes.Member,
"content": {"membership": Membership.INVITE},
"auth_events": [],
"prev_events": [],
}
if unsigned_invite_room_state is not None:
serialized_stripped_state_events = []
for stripped_event in unsigned_invite_room_state:
serialized_stripped_state_events.append(
{
"type": stripped_event.type,
"state_key": stripped_event.state_key,
"sender": stripped_event.sender,
"content": stripped_event.content,
}
)
invite_event_dict["unsigned"] = {
"invite_room_state": serialized_stripped_state_events
}
invite_event = make_event_from_dict(
invite_event_dict,
room_version=RoomVersions.V10,
)
invite_event.internal_metadata.outlier = True
invite_event.internal_metadata.out_of_band_membership = True
self.get_success(
self.store.maybe_store_room_on_outlier_membership(
room_id=invite_room_id, room_version=invite_event.room_version
)
)
context = EventContext.for_outlier(self.hs.get_storage_controllers())
persist_controller = self.hs.get_storage_controllers().persistence
assert persist_controller is not None
persisted_event, _, _ = self.get_success(
persist_controller.persist_event(invite_event, context)
)
self._remote_invite_count += 1
return invite_room_id, persisted_event
def test_joined_room_with_no_info(self) -> None:
"""
Test joined room that doesn't have a room type, encryption, or name shows up in
@ -675,6 +753,7 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase):
event_stream_ordering=state_map[
(EventTypes.Member, user1_id)
].internal_metadata.stream_ordering,
has_known_state=True,
room_type=None,
room_name=None,
is_encrypted=False,
@ -758,6 +837,7 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase):
event_stream_ordering=state_map[
(EventTypes.Member, user1_id)
].internal_metadata.stream_ordering,
has_known_state=True,
room_type=None,
room_name="my super duper room",
is_encrypted=True,
@ -774,6 +854,7 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase):
event_stream_ordering=state_map[
(EventTypes.Member, user2_id)
].internal_metadata.stream_ordering,
has_known_state=True,
room_type=None,
# Even though this room does have a name and is encrypted, user2 is the
# room creator and joined at the room creation time which didn't have
@ -859,6 +940,7 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase):
event_stream_ordering=state_map[
(EventTypes.Member, user1_id)
].internal_metadata.stream_ordering,
has_known_state=True,
room_type=RoomTypes.SPACE,
room_name="my super duper space",
is_encrypted=False,
@ -875,6 +957,7 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase):
event_stream_ordering=state_map[
(EventTypes.Member, user2_id)
].internal_metadata.stream_ordering,
has_known_state=True,
room_type=RoomTypes.SPACE,
# Even though this room does have a name, user2 is the room creator and
# joined at the room creation time which didn't have this state set yet.
@ -1005,6 +1088,7 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase):
event_stream_ordering=state_map[
(EventTypes.Member, user1_id)
].internal_metadata.stream_ordering,
has_known_state=True,
room_type=None,
room_name="my super duper room",
is_encrypted=False,
@ -1021,6 +1105,7 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase):
event_stream_ordering=state_map[
(EventTypes.Member, user2_id)
].internal_metadata.stream_ordering,
has_known_state=True,
room_type=None,
room_name=None,
is_encrypted=False,
@ -1096,6 +1181,7 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase):
event_stream_ordering=state_map[
(EventTypes.Member, user1_id)
].internal_metadata.stream_ordering,
has_known_state=True,
room_type=None,
room_name="my super duper room",
is_encrypted=False,
@ -1113,6 +1199,7 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase):
event_stream_ordering=state_map[
(EventTypes.Member, user2_id)
].internal_metadata.stream_ordering,
has_known_state=True,
room_type=None,
room_name=None,
is_encrypted=False,
@ -1269,6 +1356,7 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase):
membership_event_id=user1_invited_response["event_id"],
membership=Membership.INVITE,
event_stream_ordering=user1_invited_event_pos.stream,
has_known_state=True,
room_type=RoomTypes.SPACE,
room_name="my super duper space",
is_encrypted=True,
@ -1285,6 +1373,7 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase):
event_stream_ordering=state_map[
(EventTypes.Member, user2_id)
].internal_metadata.stream_ordering,
has_known_state=True,
room_type=RoomTypes.SPACE,
room_name=None,
is_encrypted=False,
@ -1370,6 +1459,7 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase):
membership_event_id=user1_invited_response["event_id"],
membership=Membership.INVITE,
event_stream_ordering=user1_invited_event_pos.stream,
has_known_state=True,
room_type=None,
room_name=None,
is_encrypted=False,
@ -1386,6 +1476,7 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase):
event_stream_ordering=state_map[
(EventTypes.Member, user2_id)
].internal_metadata.stream_ordering,
has_known_state=True,
room_type=None,
room_name=None,
is_encrypted=False,
@ -1400,12 +1491,132 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase):
membership_event_id=user3_ban_response["event_id"],
membership=Membership.BAN,
event_stream_ordering=user3_ban_event_pos.stream,
has_known_state=True,
room_type=None,
room_name=None,
is_encrypted=False,
),
)
def test_non_join_remote_invite_no_stripped_state(self) -> None:
"""
Test remote invite with no stripped state provided shows up in
`sliding_sync_membership_snapshots` with `has_known_state=False`.
"""
user1_id = self.register_user("user1", "pass")
_user1_tok = self.login(user1_id, "pass")
# Create a remote invite room without any `unsigned.invite_room_state`
remote_invite_room_id, remote_invite_event = (
self._create_remote_invite_room_for_user(user1_id, None)
)
# No one local is joined to the remote room
sliding_sync_joined_rooms_results = self._get_sliding_sync_joined_rooms()
self.assertIncludes(
set(sliding_sync_joined_rooms_results.keys()),
set(),
exact=True,
)
sliding_sync_membership_snapshots_results = (
self._get_sliding_sync_membership_snapshots()
)
self.assertIncludes(
set(sliding_sync_membership_snapshots_results.keys()),
{
(remote_invite_room_id, user1_id),
},
exact=True,
)
self.assertEqual(
sliding_sync_membership_snapshots_results.get(
(remote_invite_room_id, user1_id)
),
_SlidingSyncMembershipSnapshotResult(
room_id=remote_invite_room_id,
user_id=user1_id,
membership_event_id=remote_invite_event.event_id,
membership=Membership.INVITE,
event_stream_ordering=remote_invite_event.internal_metadata.stream_ordering,
# No stripped state provided
has_known_state=False,
room_type=None,
room_name=None,
is_encrypted=False,
),
)
def test_non_join_remote_invite_encrypted_room(self) -> None:
"""
Test remote invite with stripped state (encrypted room) shows up in
`sliding_sync_membership_snapshots`.
"""
user1_id = self.register_user("user1", "pass")
_user1_tok = self.login(user1_id, "pass")
# Create a remote invite room with some `unsigned.invite_room_state`
# indicating that the room is encrypted.
remote_invite_room_id, remote_invite_event = (
self._create_remote_invite_room_for_user(
user1_id,
[
StrippedStateEvent(
type=EventTypes.Create,
state_key="",
sender="@inviter:remote_server",
content={
EventContentFields.ROOM_CREATOR: "@inviter:remote_server",
EventContentFields.ROOM_VERSION: RoomVersions.V10.identifier,
},
),
StrippedStateEvent(
type=EventTypes.RoomEncryption,
state_key="",
sender="@inviter:remote_server",
content={
EventContentFields.ENCRYPTION_ALGORITHM: "m.megolm.v1.aes-sha2",
},
),
],
)
)
# No one local is joined to the remote room
sliding_sync_joined_rooms_results = self._get_sliding_sync_joined_rooms()
self.assertIncludes(
set(sliding_sync_joined_rooms_results.keys()),
set(),
exact=True,
)
sliding_sync_membership_snapshots_results = (
self._get_sliding_sync_membership_snapshots()
)
self.assertIncludes(
set(sliding_sync_membership_snapshots_results.keys()),
{
(remote_invite_room_id, user1_id),
},
exact=True,
)
self.assertEqual(
sliding_sync_membership_snapshots_results.get(
(remote_invite_room_id, user1_id)
),
_SlidingSyncMembershipSnapshotResult(
room_id=remote_invite_room_id,
user_id=user1_id,
membership_event_id=remote_invite_event.event_id,
membership=Membership.INVITE,
event_stream_ordering=remote_invite_event.internal_metadata.stream_ordering,
has_known_state=True,
room_type=None,
room_name=None,
is_encrypted=True,
),
)
# TODO: Test remote invite
# TODO: Test rejection of a remote invite
@ -1467,6 +1678,7 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase):
membership_event_id=user1_leave_response["event_id"],
membership=Membership.LEAVE,
event_stream_ordering=user1_leave_event_pos.stream,
has_known_state=True,
room_type=None,
room_name=None,
is_encrypted=False,
@ -1480,6 +1692,7 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase):
membership_event_id=user2_leave_response["event_id"],
membership=Membership.LEAVE,
event_stream_ordering=user2_leave_event_pos.stream,
has_known_state=True,
room_type=None,
room_name=None,
is_encrypted=False,