mirror of
https://github.com/element-hq/synapse.git
synced 2024-12-03 16:33:30 +03:00
Add batching
This commit is contained in:
parent
9a482a61a9
commit
98cb4b8755
1 changed files with 51 additions and 19 deletions
|
@ -23,6 +23,7 @@ from synapse.events import EventBase
|
|||
from synapse.logging.opentracing import log_kv
|
||||
from synapse.storage._base import SQLBaseStore, db_to_json
|
||||
from synapse.storage.database import LoggingTransaction
|
||||
from synapse.storage.engines import PostgresEngine
|
||||
from synapse.types import MultiWriterStreamToken, RoomStreamToken
|
||||
from synapse.types.handlers.sliding_sync import (
|
||||
HaveSentRoom,
|
||||
|
@ -458,27 +459,58 @@ class SlidingSyncStore(SQLBaseStore):
|
|||
def get_visibility_for_events_txn(
|
||||
txn: LoggingTransaction,
|
||||
) -> Mapping[str, Optional[str]]:
|
||||
sql = """
|
||||
SELECT visibility FROM history_visibility_ranges
|
||||
WHERE start_range <= ? AND (? < end_range OR end_range IS NULL)
|
||||
AND room_id = ?
|
||||
"""
|
||||
if isinstance(txn.database_engine, PostgresEngine):
|
||||
sql = """
|
||||
SELECT start_range, end_range, visibility FROM history_visibility_ranges
|
||||
WHERE int8range(start_range, end_range, '[)') @> ANY(?::bigint[])
|
||||
AND room_id = ?
|
||||
"""
|
||||
stream_orderings = [
|
||||
event.internal_metadata.stream_ordering for event in events
|
||||
]
|
||||
txn.execute(sql, (stream_orderings, room_id))
|
||||
|
||||
results = {}
|
||||
for event in events:
|
||||
txn.execute(
|
||||
sql,
|
||||
(
|
||||
event.internal_metadata.stream_ordering,
|
||||
event.internal_metadata.stream_ordering,
|
||||
room_id,
|
||||
),
|
||||
)
|
||||
row = txn.fetchone()
|
||||
if row is not None:
|
||||
results[event.event_id] = row[0]
|
||||
ranges = [
|
||||
((start_range, end_range), visibility)
|
||||
for start_range, end_range, visibility in txn
|
||||
]
|
||||
|
||||
return results
|
||||
results: Dict[str, Optional[str]] = {}
|
||||
for event in events:
|
||||
stream_ordering = event.internal_metadata.stream_ordering
|
||||
for (start_range, end_range), visibility in ranges:
|
||||
if stream_ordering < start_range:
|
||||
continue
|
||||
if end_range is not None and end_range <= stream_ordering:
|
||||
continue
|
||||
|
||||
results[event.event_id] = visibility
|
||||
break
|
||||
|
||||
return results
|
||||
|
||||
else:
|
||||
sql = """
|
||||
SELECT visibility FROM history_visibility_ranges
|
||||
WHERE start_range <= ? AND (? < end_range OR end_range IS NULL)
|
||||
AND room_id = ?
|
||||
"""
|
||||
|
||||
results = {}
|
||||
for event in events:
|
||||
txn.execute(
|
||||
sql,
|
||||
(
|
||||
event.internal_metadata.stream_ordering,
|
||||
event.internal_metadata.stream_ordering,
|
||||
room_id,
|
||||
),
|
||||
)
|
||||
row = txn.fetchone()
|
||||
if row is not None:
|
||||
results[event.event_id] = row[0]
|
||||
|
||||
return results
|
||||
|
||||
return await self.db_pool.runInteraction(
|
||||
"get_visibility_for_events", get_visibility_for_events_txn
|
||||
|
|
Loading…
Reference in a new issue