mirror of
https://github.com/element-hq/synapse.git
synced 2024-12-19 17:56:19 +03:00
Merge remote-tracking branch 'origin/develop' into matrix-org-hotfixes
This commit is contained in:
commit
abd516304e
8 changed files with 87 additions and 21 deletions
1
changelog.d/17548.misc
Normal file
1
changelog.d/17548.misc
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Fix performance of device lists in `/key/changes` and sliding sync.
|
1
changelog.d/17566.misc
Normal file
1
changelog.d/17566.misc
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Speed up responding to media requests.
|
1
changelog.d/17568.bugfix
Normal file
1
changelog.d/17568.bugfix
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Fix fetching federation signing keys from servers that omit `old_verify_keys`. Contributed by @tulir @ Beeper.
|
1
changelog.d/17571.misc
Normal file
1
changelog.d/17571.misc
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Add a flag to `/versions`, `org.matrix.simplified_msc3575`, to indicate whether experimental sliding sync support has been enabled.
|
|
@ -589,7 +589,7 @@ class BaseV2KeyFetcher(KeyFetcher):
|
||||||
% (server_name,)
|
% (server_name,)
|
||||||
)
|
)
|
||||||
|
|
||||||
for key_id, key_data in response_json["old_verify_keys"].items():
|
for key_id, key_data in response_json.get("old_verify_keys", {}).items():
|
||||||
if is_signing_algorithm_supported(key_id):
|
if is_signing_algorithm_supported(key_id):
|
||||||
key_base64 = key_data["key"]
|
key_base64 = key_data["key"]
|
||||||
key_bytes = decode_base64(key_base64)
|
key_bytes = decode_base64(key_base64)
|
||||||
|
|
|
@ -267,27 +267,23 @@ class DeviceWorkerHandler:
|
||||||
newly_left_rooms.add(change.room_id)
|
newly_left_rooms.add(change.room_id)
|
||||||
|
|
||||||
# We now work out if any other users have since joined or left the rooms
|
# We now work out if any other users have since joined or left the rooms
|
||||||
# the user is currently in. First we filter out rooms that we know
|
# the user is currently in.
|
||||||
# haven't changed recently.
|
|
||||||
rooms_changed = self.store.get_rooms_that_changed(
|
|
||||||
joined_room_ids, from_token.room_key
|
|
||||||
)
|
|
||||||
|
|
||||||
# List of membership changes per room
|
# List of membership changes per room
|
||||||
room_to_deltas: Dict[str, List[StateDelta]] = {}
|
room_to_deltas: Dict[str, List[StateDelta]] = {}
|
||||||
# The set of event IDs of membership events (so we can fetch their
|
# The set of event IDs of membership events (so we can fetch their
|
||||||
# associated membership).
|
# associated membership).
|
||||||
memberships_to_fetch: Set[str] = set()
|
memberships_to_fetch: Set[str] = set()
|
||||||
for room_id in rooms_changed:
|
|
||||||
# TODO: Only pull out membership events?
|
# TODO: Only pull out membership events?
|
||||||
state_changes = await self.store.get_current_state_deltas_for_room(
|
state_changes = await self.store.get_current_state_deltas_for_rooms(
|
||||||
room_id, from_token=from_token.room_key, to_token=now_token.room_key
|
joined_room_ids, from_token=from_token.room_key, to_token=now_token.room_key
|
||||||
)
|
)
|
||||||
for delta in state_changes:
|
for delta in state_changes:
|
||||||
if delta.event_type != EventTypes.Member:
|
if delta.event_type != EventTypes.Member:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
room_to_deltas.setdefault(room_id, []).append(delta)
|
room_to_deltas.setdefault(delta.room_id, []).append(delta)
|
||||||
if delta.event_id:
|
if delta.event_id:
|
||||||
memberships_to_fetch.add(delta.event_id)
|
memberships_to_fetch.add(delta.event_id)
|
||||||
if delta.prev_event_id:
|
if delta.prev_event_id:
|
||||||
|
|
|
@ -64,6 +64,7 @@ class VersionsRestServlet(RestServlet):
|
||||||
|
|
||||||
async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
|
async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
|
||||||
msc3881_enabled = self.config.experimental.msc3881_enabled
|
msc3881_enabled = self.config.experimental.msc3881_enabled
|
||||||
|
msc3575_enabled = self.config.experimental.msc3575_enabled
|
||||||
|
|
||||||
if self.auth.has_access_token(request):
|
if self.auth.has_access_token(request):
|
||||||
requester = await self.auth.get_user_by_req(
|
requester = await self.auth.get_user_by_req(
|
||||||
|
@ -77,6 +78,9 @@ class VersionsRestServlet(RestServlet):
|
||||||
msc3881_enabled = await self.store.is_feature_enabled(
|
msc3881_enabled = await self.store.is_feature_enabled(
|
||||||
user_id, ExperimentalFeature.MSC3881
|
user_id, ExperimentalFeature.MSC3881
|
||||||
)
|
)
|
||||||
|
msc3575_enabled = await self.store.is_feature_enabled(
|
||||||
|
user_id, ExperimentalFeature.MSC3575
|
||||||
|
)
|
||||||
|
|
||||||
return (
|
return (
|
||||||
200,
|
200,
|
||||||
|
@ -169,6 +173,8 @@ class VersionsRestServlet(RestServlet):
|
||||||
),
|
),
|
||||||
# MSC4151: Report room API (Client-Server API)
|
# MSC4151: Report room API (Client-Server API)
|
||||||
"org.matrix.msc4151": self.config.experimental.msc4151_enabled,
|
"org.matrix.msc4151": self.config.experimental.msc4151_enabled,
|
||||||
|
# Simplified sliding sync
|
||||||
|
"org.matrix.simplified_msc3575": msc3575_enabled,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
|
@ -26,10 +26,11 @@ import attr
|
||||||
|
|
||||||
from synapse.logging.opentracing import trace
|
from synapse.logging.opentracing import trace
|
||||||
from synapse.storage._base import SQLBaseStore
|
from synapse.storage._base import SQLBaseStore
|
||||||
from synapse.storage.database import LoggingTransaction
|
from synapse.storage.database import LoggingTransaction, make_in_list_sql_clause
|
||||||
from synapse.storage.databases.main.stream import _filter_results_by_stream
|
from synapse.storage.databases.main.stream import _filter_results_by_stream
|
||||||
from synapse.types import RoomStreamToken
|
from synapse.types import RoomStreamToken, StrCollection
|
||||||
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
from synapse.util.caches.stream_change_cache import StreamChangeCache
|
||||||
|
from synapse.util.iterutils import batch_iter
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
@ -200,3 +201,62 @@ class StateDeltasStore(SQLBaseStore):
|
||||||
return await self.db_pool.runInteraction(
|
return await self.db_pool.runInteraction(
|
||||||
"get_current_state_deltas_for_room", get_current_state_deltas_for_room_txn
|
"get_current_state_deltas_for_room", get_current_state_deltas_for_room_txn
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@trace
|
||||||
|
async def get_current_state_deltas_for_rooms(
|
||||||
|
self,
|
||||||
|
room_ids: StrCollection,
|
||||||
|
from_token: RoomStreamToken,
|
||||||
|
to_token: RoomStreamToken,
|
||||||
|
) -> List[StateDelta]:
|
||||||
|
"""Get the state deltas between two tokens for the set of rooms."""
|
||||||
|
|
||||||
|
room_ids = self._curr_state_delta_stream_cache.get_entities_changed(
|
||||||
|
room_ids, from_token.stream
|
||||||
|
)
|
||||||
|
if not room_ids:
|
||||||
|
return []
|
||||||
|
|
||||||
|
def get_current_state_deltas_for_rooms_txn(
|
||||||
|
txn: LoggingTransaction,
|
||||||
|
room_ids: StrCollection,
|
||||||
|
) -> List[StateDelta]:
|
||||||
|
clause, args = make_in_list_sql_clause(
|
||||||
|
self.database_engine, "room_id", room_ids
|
||||||
|
)
|
||||||
|
|
||||||
|
sql = f"""
|
||||||
|
SELECT instance_name, stream_id, room_id, type, state_key, event_id, prev_event_id
|
||||||
|
FROM current_state_delta_stream
|
||||||
|
WHERE {clause} AND ? < stream_id AND stream_id <= ?
|
||||||
|
ORDER BY stream_id ASC
|
||||||
|
"""
|
||||||
|
args.append(from_token.stream)
|
||||||
|
args.append(to_token.get_max_stream_pos())
|
||||||
|
|
||||||
|
txn.execute(sql, args)
|
||||||
|
|
||||||
|
return [
|
||||||
|
StateDelta(
|
||||||
|
stream_id=row[1],
|
||||||
|
room_id=row[2],
|
||||||
|
event_type=row[3],
|
||||||
|
state_key=row[4],
|
||||||
|
event_id=row[5],
|
||||||
|
prev_event_id=row[6],
|
||||||
|
)
|
||||||
|
for row in txn
|
||||||
|
if _filter_results_by_stream(from_token, to_token, row[0], row[1])
|
||||||
|
]
|
||||||
|
|
||||||
|
results = []
|
||||||
|
for batch in batch_iter(room_ids, 1000):
|
||||||
|
deltas = await self.db_pool.runInteraction(
|
||||||
|
"get_current_state_deltas_for_rooms",
|
||||||
|
get_current_state_deltas_for_rooms_txn,
|
||||||
|
batch,
|
||||||
|
)
|
||||||
|
|
||||||
|
results.extend(deltas)
|
||||||
|
|
||||||
|
return results
|
||||||
|
|
Loading…
Reference in a new issue