Mirror of https://github.com/element-hq/synapse.git (synced 2024-11-26 11:36:03 +03:00)
Pre-populate membership and membership_event_stream_ordering
See https://github.com/element-hq/synapse/pull/17512#discussion_r1725311745
Parent: f6d7ffd9c5
Commit: e7a3328228
3 changed files with 40 additions and 58 deletions
@@ -160,6 +160,8 @@ class SlidingSyncMembershipInfo:
     user_id: str
     sender: str
     membership_event_id: str
+    membership: str
+    membership_event_stream_ordering: int


 @attr.s(slots=True, auto_attribs=True)
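For orientation, here is a sketch of the attrs class after the hunk above. The field list comes straight from the diff; the standalone module context is illustrative only and does not reflect Synapse's actual file layout.

```python
import attr


# Field list mirrors the hunk above. Defining the class standalone here is
# purely illustrative; in Synapse it sits alongside the event-persistence code.
@attr.s(slots=True, auto_attribs=True)
class SlidingSyncMembershipInfo:
    user_id: str
    sender: str
    membership_event_id: str
    membership: str
    membership_event_stream_ordering: int
```

With the two new fields, callers are expected to resolve the membership value and the event's stream ordering before the row is written, which is what the later hunks in this commit do.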
@@ -401,7 +403,7 @@ class PersistEventsStore:
                 if state_key[0] == EventTypes.Member and self.is_mine_id(state_key[1]):
                     membership_event_id_to_user_id_map[event_id] = state_key[1]

-            event_id_to_sender_map: Dict[str, str] = {}
+            membership_event_map: Dict[str, EventBase] = {}
             # In normal event persist scenarios, we should be able to find the
             # membership events in the `events_and_contexts` given to us but it's
             # possible a state reset happened which added us to the room without a
@@ -410,36 +412,40 @@ class PersistEventsStore:
             for membership_event_id in membership_event_id_to_user_id_map.keys():
                 membership_event = event_map.get(membership_event_id)
                 if membership_event:
-                    event_id_to_sender_map[membership_event_id] = (
-                        membership_event.sender
-                    )
+                    membership_event_map[membership_event_id] = membership_event
                 else:
                     missing_membership_event_ids.add(membership_event_id)

             # Otherwise, we need to find a couple events that we were reset to.
             if missing_membership_event_ids:
-                remaining_event_id_to_sender_map = (
-                    await self.store.get_sender_for_event_ids(
-                        missing_membership_event_ids
-                    )
+                remaining_events = await self.store.get_events(
+                    missing_membership_event_ids
                 )
                 # There shouldn't be any missing events
                 assert (
-                    remaining_event_id_to_sender_map.keys()
-                    == missing_membership_event_ids
-                ), missing_membership_event_ids.difference(
-                    remaining_event_id_to_sender_map.keys()
-                )
-                event_id_to_sender_map.update(remaining_event_id_to_sender_map)
+                    remaining_events.keys() == missing_membership_event_ids
+                ), missing_membership_event_ids.difference(remaining_events.keys())
+                membership_event_map.update(remaining_events)

-            membership_infos_to_insert_membership_snapshots = [
-                SlidingSyncMembershipInfo(
-                    user_id=user_id,
-                    sender=event_id_to_sender_map[membership_event_id],
-                    membership_event_id=membership_event_id,
+            for (
+                membership_event_id,
+                user_id,
+            ) in membership_event_id_to_user_id_map.items():
+                # We should only be seeing events with stream_ordering assigned by this point
+                membership_event_stream_ordering = membership_event_map[
+                    membership_event_id
+                ].internal_metadata.stream_ordering
+                assert membership_event_stream_ordering is not None
+
+                membership_infos_to_insert_membership_snapshots.append(
+                    SlidingSyncMembershipInfo(
+                        user_id=user_id,
+                        sender=membership_event_map[membership_event_id].sender,
+                        membership_event_id=membership_event_id,
+                        membership=membership_event_map[membership_event_id].membership,
+                        membership_event_stream_ordering=membership_event_stream_ordering,
+                    )
                 )
-                for membership_event_id, user_id in membership_event_id_to_user_id_map.items()
-            ]

             if membership_infos_to_insert_membership_snapshots:
                 current_state_ids_map: MutableStateMap[str] = dict(
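A condensed, runnable sketch of what the new loop does for each of our membership events. `SimpleNamespace` stands in for `EventBase`, and all values are invented for illustration; only the attribute names (`sender`, `membership`, `internal_metadata.stream_ordering`) are taken from the hunk.

```python
from types import SimpleNamespace

# Illustrative stand-in for one EventBase held in membership_event_map.
membership_event = SimpleNamespace(
    sender="@alice:example.org",
    membership="join",
    internal_metadata=SimpleNamespace(stream_ordering=1234),
)
user_id = "@alice:example.org"
membership_event_id = "$membership_event"

# Persisted events should have a stream ordering assigned by this point,
# hence the assertion mirrored from the hunk above.
membership_event_stream_ordering = membership_event.internal_metadata.stream_ordering
assert membership_event_stream_ordering is not None

# The snapshot info is now built entirely from the full event, rather than
# from a separate event-ID-to-sender map.
snapshot_info = {
    "user_id": user_id,
    "sender": membership_event.sender,
    "membership_event_id": membership_event_id,
    "membership": membership_event.membership,
    "membership_event_stream_ordering": membership_event_stream_ordering,
}
```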
@@ -1717,9 +1723,7 @@ class PersistEventsStore:
                 (room_id, user_id, membership_event_id, membership, event_stream_ordering
                 {("," + ", ".join(insert_keys)) if insert_keys else ""})
                 VALUES (
-                    ?, ?, ?,
-                    (SELECT membership FROM room_memberships WHERE event_id = ?),
-                    (SELECT stream_ordering FROM events WHERE event_id = ?)
+                    ?, ?, ?, ?, ?
                     {("," + ", ".join("?" for _ in insert_values)) if insert_values else ""}
                 )
                 ON CONFLICT (room_id, user_id)
@@ -1734,8 +1738,8 @@ class PersistEventsStore:
                     room_id,
                     membership_info.user_id,
                     membership_info.membership_event_id,
-                    membership_info.membership_event_id,
-                    membership_info.membership_event_id,
+                    membership_info.membership,
+                    membership_info.membership_event_stream_ordering,
                 ]
                 + list(insert_values)
                 for membership_info in sliding_sync_table_changes.to_insert_membership_snapshots
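Taken together, the two hunks above change how the upsert is parameterised. Before, only three values were bound and the membership and stream ordering were resolved by correlated subqueries, each of which needed the event ID repeated as its parameter; now all five columns are bound directly from the pre-populated SlidingSyncMembershipInfo. A sketch of the two argument shapes, with invented values:

```python
# Illustrative values only; the shapes mirror the two argument lists in the hunks above.
room_id = "!room:example.org"
user_id = "@alice:example.org"
membership_event_id = "$membership_event"
membership = "join"
membership_event_stream_ordering = 1234

# Before: three bound values plus the event ID repeated for each SELECT subquery.
old_args = (
    room_id,
    user_id,
    membership_event_id,
    membership_event_id,  # -> (SELECT membership FROM room_memberships WHERE event_id = ?)
    membership_event_id,  # -> (SELECT stream_ordering FROM events WHERE event_id = ?)
)

# After: five bound values, matching the five plain placeholders in VALUES (?, ?, ?, ?, ?).
new_args = (
    room_id,
    user_id,
    membership_event_id,
    membership,
    membership_event_stream_ordering,
)
```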
@@ -1960,6 +1960,8 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseStore):
                     user_id=user_id,
                     sender=sender,
                     membership_event_id=membership_event_id,
+                    membership=membership,
+                    membership_event_stream_ordering=membership_event_stream_ordering,
                 )

         def _backfill_table_txn(txn: LoggingTransaction) -> None:
@@ -1967,6 +1969,10 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseStore):
                 room_id, user_id = key
                 membership_info = to_insert_membership_infos[key]
                 membership_event_id = membership_info.membership_event_id
+                membership = membership_info.membership
+                membership_event_stream_ordering = (
+                    membership_info.membership_event_stream_ordering
+                )

                 # Pulling keys/values separately is safe and will produce congruent
                 # lists
@@ -1980,9 +1986,7 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseStore):
                 (room_id, user_id, membership_event_id, membership, event_stream_ordering
                 {("," + ", ".join(insert_keys)) if insert_keys else ""})
                 VALUES (
-                    ?, ?, ?,
-                    (SELECT membership FROM room_memberships WHERE event_id = ?),
-                    (SELECT stream_ordering FROM events WHERE event_id = ?)
+                    ?, ?, ?, ?, ?
                     {("," + ", ".join("?" for _ in insert_values)) if insert_values else ""}
                 )
                 ON CONFLICT (room_id, user_id)
@@ -1992,8 +1996,8 @@ class EventsBackgroundUpdatesStore(StreamWorkerStore, StateDeltasStore, SQLBaseStore):
                     room_id,
                     user_id,
                     membership_event_id,
-                    membership_event_id,
-                    membership_event_id,
+                    membership,
+                    membership_event_stream_ordering,
                 ]
                 + list(insert_values),
             )
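The background-update hunks above apply the same rewrite as the persist path earlier. As a self-contained illustration of the difference, here is a toy sqlite3 version: the old statement asks the database to resolve the two extra columns via subqueries at insert time, while the new one binds values that were already looked up in Python. The table, values, and conflict-handling clause are simplified stand-ins invented for the demo, not Synapse's real schema; only the shape of the INSERT mirrors the hunks.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript(
    """
    CREATE TABLE events (event_id TEXT PRIMARY KEY, stream_ordering INTEGER);
    CREATE TABLE room_memberships (event_id TEXT PRIMARY KEY, membership TEXT);
    -- Toy stand-in for the snapshot table; the real table is not shown in the diff.
    CREATE TABLE snapshots_demo (
        room_id TEXT,
        user_id TEXT,
        membership_event_id TEXT,
        membership TEXT,
        event_stream_ordering INTEGER,
        PRIMARY KEY (room_id, user_id)
    );
    INSERT INTO events VALUES ('$m1', 1234);
    INSERT INTO room_memberships VALUES ('$m1', 'join');
    """
)

# Old shape: bind three values and let subqueries resolve the other two columns.
conn.execute(
    """
    INSERT INTO snapshots_demo
    (room_id, user_id, membership_event_id, membership, event_stream_ordering)
    VALUES (
        ?, ?, ?,
        (SELECT membership FROM room_memberships WHERE event_id = ?),
        (SELECT stream_ordering FROM events WHERE event_id = ?)
    )
    ON CONFLICT (room_id, user_id) DO UPDATE SET
        membership_event_id = excluded.membership_event_id,
        membership = excluded.membership,
        event_stream_ordering = excluded.event_stream_ordering
    """,
    ("!room:example.org", "@alice:example.org", "$m1", "$m1", "$m1"),
)

# New shape: all five values are pre-populated in Python and bound directly.
conn.execute(
    """
    INSERT INTO snapshots_demo
    (room_id, user_id, membership_event_id, membership, event_stream_ordering)
    VALUES (?, ?, ?, ?, ?)
    ON CONFLICT (room_id, user_id) DO UPDATE SET
        membership_event_id = excluded.membership_event_id,
        membership = excluded.membership,
        event_stream_ordering = excluded.event_stream_ordering
    """,
    ("!room:example.org", "@alice:example.org", "$m1", "join", 1234),
)

print(conn.execute("SELECT * FROM snapshots_demo").fetchall())
```

The stored row is the same either way; the difference is where the membership and stream ordering get resolved.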
@@ -81,7 +81,7 @@ from synapse.storage.util.id_generators import (
     MultiWriterIdGenerator,
 )
 from synapse.storage.util.sequence import build_sequence_generator
-from synapse.types import JsonDict, StrCollection, get_domain_from_id
+from synapse.types import JsonDict, get_domain_from_id
 from synapse.types.state import StateFilter
 from synapse.util import unwrapFirstError
 from synapse.util.async_helpers import ObservableDeferred, delay_cancellation
@@ -1981,32 +1981,6 @@ class EventsWorkerStore(SQLBaseStore):

         return int(res[0]), int(res[1])

-    async def get_sender_for_event_ids(
-        self, event_ids: StrCollection
-    ) -> Mapping[str, str]:
-        """
-        Get the sender for a list of event IDs.
-        Args:
-            event_ids: The event IDs to look up.
-        Returns:
-            A mapping from event ID to event sender.
-        """
-        rows = cast(
-            List[Tuple[str, str]],
-            await self.db_pool.simple_select_many_batch(
-                table="events",
-                column="event_id",
-                iterable=event_ids,
-                retcols=(
-                    "event_id",
-                    "sender",
-                ),
-                desc="get_sender_for_event_ids",
-            ),
-        )
-
-        return dict(rows)
-
     async def get_next_event_to_expire(self) -> Optional[Tuple[str, int]]:
         """Retrieve the entry with the lowest expiry timestamp in the event_expiry
         table, or None if there's no more event to expire.
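Since the persist path above now fetches the full events with get_events and keeps them in membership_event_map, the dedicated sender lookup deleted here is no longer needed on that path; a sender map falls out of the richer result anyway. A small illustrative sketch, with SimpleNamespace standing in for EventBase and made-up event IDs:

```python
from types import SimpleNamespace

# Stand-in for the mapping of event ID to EventBase that get_events() returns.
events = {
    "$event_a": SimpleNamespace(sender="@alice:example.org"),
    "$event_b": SimpleNamespace(sender="@bob:example.org"),
}

# Equivalent of what get_sender_for_event_ids() used to return.
event_id_to_sender = {event_id: event.sender for event_id, event in events.items()}
```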