Bundle aggregations outside of the serialization method. (#11612)

This makes the serialization of events synchronous (and it no
longer access the database), but we must manually calculate and
provide the bundled aggregations.

Overall this should cause no change in behavior, but is prep work
for other improvements.
This commit is contained in:
Patrick Cloke 2022-01-07 09:10:46 -05:00 committed by GitHub
parent 6c68e874b1
commit 6bf81a7a61
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
17 changed files with 248 additions and 155 deletions

1
changelog.d/11612.misc Normal file
View file

@ -0,0 +1 @@
Avoid database access in the JSON serialization process.

View file

@ -14,17 +14,7 @@
# limitations under the License.
import collections.abc
import re
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
Iterable,
List,
Mapping,
Optional,
Union,
)
from typing import Any, Callable, Dict, Iterable, List, Mapping, Optional, Union
from frozendict import frozendict
@ -32,14 +22,10 @@ from synapse.api.constants import EventContentFields, EventTypes, RelationTypes
from synapse.api.errors import Codes, SynapseError
from synapse.api.room_versions import RoomVersion
from synapse.types import JsonDict
from synapse.util.async_helpers import yieldable_gather_results
from synapse.util.frozenutils import unfreeze
from . import EventBase
if TYPE_CHECKING:
from synapse.server import HomeServer
# Split strings on "." but not "\." This uses a negative lookbehind assertion for '\'
# (?<!stuff) matches if the current position in the string is not preceded
# by a match for 'stuff'.
@ -385,17 +371,12 @@ class EventClientSerializer:
clients.
"""
def __init__(self, hs: "HomeServer"):
self.store = hs.get_datastore()
self._msc1849_enabled = hs.config.experimental.msc1849_enabled
self._msc3440_enabled = hs.config.experimental.msc3440_enabled
async def serialize_event(
def serialize_event(
self,
event: Union[JsonDict, EventBase],
time_now: int,
*,
bundle_aggregations: bool = False,
bundle_aggregations: Optional[Dict[str, JsonDict]] = None,
**kwargs: Any,
) -> JsonDict:
"""Serializes a single event.
@ -418,66 +399,41 @@ class EventClientSerializer:
serialized_event = serialize_event(event, time_now, **kwargs)
# Check if there are any bundled aggregations to include with the event.
#
# Do not bundle aggregations if any of the following at true:
#
# * Support is disabled via the configuration or the caller.
# * The event is a state event.
# * The event has been redacted.
if (
self._msc1849_enabled
and bundle_aggregations
and not event.is_state()
and not event.internal_metadata.is_redacted()
):
await self._injected_bundled_aggregations(event, time_now, serialized_event)
if bundle_aggregations:
event_aggregations = bundle_aggregations.get(event.event_id)
if event_aggregations:
self._injected_bundled_aggregations(
event,
time_now,
bundle_aggregations[event.event_id],
serialized_event,
)
return serialized_event
async def _injected_bundled_aggregations(
self, event: EventBase, time_now: int, serialized_event: JsonDict
def _injected_bundled_aggregations(
self,
event: EventBase,
time_now: int,
aggregations: JsonDict,
serialized_event: JsonDict,
) -> None:
"""Potentially injects bundled aggregations into the unsigned portion of the serialized event.
Args:
event: The event being serialized.
time_now: The current time in milliseconds
aggregations: The bundled aggregation to serialize.
serialized_event: The serialized event which may be modified.
"""
# Do not bundle aggregations for an event which represents an edit or an
# annotation. It does not make sense for them to have related events.
relates_to = event.content.get("m.relates_to")
if isinstance(relates_to, (dict, frozendict)):
relation_type = relates_to.get("rel_type")
if relation_type in (RelationTypes.ANNOTATION, RelationTypes.REPLACE):
return
# Make a copy in-case the object is cached.
aggregations = aggregations.copy()
event_id = event.event_id
room_id = event.room_id
# The bundled aggregations to include.
aggregations = {}
annotations = await self.store.get_aggregation_groups_for_event(
event_id, room_id
)
if annotations.chunk:
aggregations[RelationTypes.ANNOTATION] = annotations.to_dict()
references = await self.store.get_relations_for_event(
event_id, room_id, RelationTypes.REFERENCE, direction="f"
)
if references.chunk:
aggregations[RelationTypes.REFERENCE] = references.to_dict()
edit = None
if event.type == EventTypes.Message:
edit = await self.store.get_applicable_edit(event_id, room_id)
if edit:
if RelationTypes.REPLACE in aggregations:
# If there is an edit replace the content, preserving existing
# relations.
edit = aggregations[RelationTypes.REPLACE]
# Ensure we take copies of the edit content, otherwise we risk modifying
# the original event.
@ -502,27 +458,19 @@ class EventClientSerializer:
}
# If this event is the start of a thread, include a summary of the replies.
if self._msc3440_enabled:
(
thread_count,
latest_thread_event,
) = await self.store.get_thread_summary(event_id, room_id)
if latest_thread_event:
aggregations[RelationTypes.THREAD] = {
# Don't bundle aggregations as this could recurse forever.
"latest_event": await self.serialize_event(
latest_thread_event, time_now, bundle_aggregations=False
),
"count": thread_count,
}
if RelationTypes.THREAD in aggregations:
# Serialize the latest thread event.
latest_thread_event = aggregations[RelationTypes.THREAD]["latest_event"]
# If any bundled aggregations were found, include them.
if aggregations:
serialized_event["unsigned"].setdefault("m.relations", {}).update(
aggregations
# Don't bundle aggregations as this could recurse forever.
aggregations[RelationTypes.THREAD]["latest_event"] = self.serialize_event(
latest_thread_event, time_now, bundle_aggregations=None
)
async def serialize_events(
# Include the bundled aggregations in the event.
serialized_event["unsigned"].setdefault("m.relations", {}).update(aggregations)
def serialize_events(
self, events: Iterable[Union[JsonDict, EventBase]], time_now: int, **kwargs: Any
) -> List[JsonDict]:
"""Serializes multiple events.
@ -535,9 +483,9 @@ class EventClientSerializer:
Returns:
The list of serialized events
"""
return await yieldable_gather_results(
self.serialize_event, events, time_now=time_now, **kwargs
)
return [
self.serialize_event(event, time_now=time_now, **kwargs) for event in events
]
def copy_power_levels_contents(

View file

@ -119,7 +119,7 @@ class EventStreamHandler:
events.extend(to_add)
chunks = await self._event_serializer.serialize_events(
chunks = self._event_serializer.serialize_events(
events,
time_now,
as_client_event=as_client_event,

View file

@ -170,7 +170,7 @@ class InitialSyncHandler:
d["inviter"] = event.sender
invite_event = await self.store.get_event(event.event_id)
d["invite"] = await self._event_serializer.serialize_event(
d["invite"] = self._event_serializer.serialize_event(
invite_event,
time_now,
as_client_event=as_client_event,
@ -222,7 +222,7 @@ class InitialSyncHandler:
d["messages"] = {
"chunk": (
await self._event_serializer.serialize_events(
self._event_serializer.serialize_events(
messages,
time_now=time_now,
as_client_event=as_client_event,
@ -232,7 +232,7 @@ class InitialSyncHandler:
"end": await end_token.to_string(self.store),
}
d["state"] = await self._event_serializer.serialize_events(
d["state"] = self._event_serializer.serialize_events(
current_state.values(),
time_now=time_now,
as_client_event=as_client_event,
@ -376,16 +376,14 @@ class InitialSyncHandler:
"messages": {
"chunk": (
# Don't bundle aggregations as this is a deprecated API.
await self._event_serializer.serialize_events(messages, time_now)
self._event_serializer.serialize_events(messages, time_now)
),
"start": await start_token.to_string(self.store),
"end": await end_token.to_string(self.store),
},
"state": (
# Don't bundle aggregations as this is a deprecated API.
await self._event_serializer.serialize_events(
room_state.values(), time_now
)
self._event_serializer.serialize_events(room_state.values(), time_now)
),
"presence": [],
"receipts": [],
@ -404,7 +402,7 @@ class InitialSyncHandler:
# TODO: These concurrently
time_now = self.clock.time_msec()
# Don't bundle aggregations as this is a deprecated API.
state = await self._event_serializer.serialize_events(
state = self._event_serializer.serialize_events(
current_state.values(), time_now
)
@ -480,7 +478,7 @@ class InitialSyncHandler:
"messages": {
"chunk": (
# Don't bundle aggregations as this is a deprecated API.
await self._event_serializer.serialize_events(messages, time_now)
self._event_serializer.serialize_events(messages, time_now)
),
"start": await start_token.to_string(self.store),
"end": await end_token.to_string(self.store),

View file

@ -246,7 +246,7 @@ class MessageHandler:
room_state = room_state_events[membership_event_id]
now = self.clock.time_msec()
events = await self._event_serializer.serialize_events(room_state.values(), now)
events = self._event_serializer.serialize_events(room_state.values(), now)
return events
async def get_joined_members(self, requester: Requester, room_id: str) -> dict:

View file

@ -537,14 +537,16 @@ class PaginationHandler:
state_dict = await self.store.get_events(list(state_ids.values()))
state = state_dict.values()
aggregations = await self.store.get_bundled_aggregations(events)
time_now = self.clock.time_msec()
chunk = {
"chunk": (
await self._event_serializer.serialize_events(
self._event_serializer.serialize_events(
events,
time_now,
bundle_aggregations=True,
bundle_aggregations=aggregations,
as_client_event=as_client_event,
)
),
@ -553,7 +555,7 @@ class PaginationHandler:
}
if state:
chunk["state"] = await self._event_serializer.serialize_events(
chunk["state"] = self._event_serializer.serialize_events(
state, time_now, as_client_event=as_client_event
)

View file

@ -1181,6 +1181,16 @@ class RoomContextHandler:
# `filtered` rather than the event we retrieved from the datastore.
results["event"] = filtered[0]
# Fetch the aggregations.
aggregations = await self.store.get_bundled_aggregations([results["event"]])
aggregations.update(
await self.store.get_bundled_aggregations(results["events_before"])
)
aggregations.update(
await self.store.get_bundled_aggregations(results["events_after"])
)
results["aggregations"] = aggregations
if results["events_after"]:
last_event_id = results["events_after"][-1].event_id
else:

View file

@ -420,10 +420,10 @@ class SearchHandler:
time_now = self.clock.time_msec()
for context in contexts.values():
context["events_before"] = await self._event_serializer.serialize_events(
context["events_before"] = self._event_serializer.serialize_events(
context["events_before"], time_now
)
context["events_after"] = await self._event_serializer.serialize_events(
context["events_after"] = self._event_serializer.serialize_events(
context["events_after"], time_now
)
@ -441,9 +441,7 @@ class SearchHandler:
results.append(
{
"rank": rank_map[e.event_id],
"result": (
await self._event_serializer.serialize_event(e, time_now)
),
"result": self._event_serializer.serialize_event(e, time_now),
"context": contexts.get(e.event_id, {}),
}
)
@ -457,7 +455,7 @@ class SearchHandler:
if state_results:
s = {}
for room_id, state_events in state_results.items():
s[room_id] = await self._event_serializer.serialize_events(
s[room_id] = self._event_serializer.serialize_events(
state_events, time_now
)

View file

@ -424,7 +424,7 @@ class RoomStateRestServlet(RestServlet):
event_ids = await self.store.get_current_state_ids(room_id)
events = await self.store.get_events(event_ids.values())
now = self.clock.time_msec()
room_state = await self._event_serializer.serialize_events(events.values(), now)
room_state = self._event_serializer.serialize_events(events.values(), now)
ret = {"state": room_state}
return HTTPStatus.OK, ret
@ -744,22 +744,22 @@ class RoomEventContextServlet(RestServlet):
)
time_now = self.clock.time_msec()
results["events_before"] = await self._event_serializer.serialize_events(
results["events_before"] = self._event_serializer.serialize_events(
results["events_before"],
time_now,
bundle_aggregations=True,
bundle_aggregations=results["aggregations"],
)
results["event"] = await self._event_serializer.serialize_event(
results["event"] = self._event_serializer.serialize_event(
results["event"],
time_now,
bundle_aggregations=True,
bundle_aggregations=results["aggregations"],
)
results["events_after"] = await self._event_serializer.serialize_events(
results["events_after"] = self._event_serializer.serialize_events(
results["events_after"],
time_now,
bundle_aggregations=True,
bundle_aggregations=results["aggregations"],
)
results["state"] = await self._event_serializer.serialize_events(
results["state"] = self._event_serializer.serialize_events(
results["state"], time_now
)

View file

@ -91,7 +91,7 @@ class EventRestServlet(RestServlet):
time_now = self.clock.time_msec()
if event:
result = await self._event_serializer.serialize_event(event, time_now)
result = self._event_serializer.serialize_event(event, time_now)
return 200, result
else:
return 404, "Event not found."

View file

@ -72,7 +72,7 @@ class NotificationsServlet(RestServlet):
"actions": pa.actions,
"ts": pa.received_ts,
"event": (
await self._event_serializer.serialize_event(
self._event_serializer.serialize_event(
notif_events[pa.event_id],
self.clock.time_msec(),
event_format=format_event_for_client_v2_without_room_id,

View file

@ -113,13 +113,14 @@ class RelationPaginationServlet(RestServlet):
now = self.clock.time_msec()
# Do not bundle aggregations when retrieving the original event because
# we want the content before relations are applied to it.
original_event = await self._event_serializer.serialize_event(
event, now, bundle_aggregations=False
original_event = self._event_serializer.serialize_event(
event, now, bundle_aggregations=None
)
# The relations returned for the requested event do include their
# bundled aggregations.
serialized_events = await self._event_serializer.serialize_events(
events, now, bundle_aggregations=True
aggregations = await self.store.get_bundled_aggregations(events)
serialized_events = self._event_serializer.serialize_events(
events, now, bundle_aggregations=aggregations
)
return_value = pagination_chunk.to_dict()
@ -308,7 +309,7 @@ class RelationAggregationGroupPaginationServlet(RestServlet):
)
now = self.clock.time_msec()
serialized_events = await self._event_serializer.serialize_events(events, now)
serialized_events = self._event_serializer.serialize_events(events, now)
return_value = result.to_dict()
return_value["chunk"] = serialized_events

View file

@ -642,6 +642,7 @@ class RoomEventServlet(RestServlet):
def __init__(self, hs: "HomeServer"):
super().__init__()
self.clock = hs.get_clock()
self._store = hs.get_datastore()
self.event_handler = hs.get_event_handler()
self._event_serializer = hs.get_event_client_serializer()
self.auth = hs.get_auth()
@ -660,10 +661,13 @@ class RoomEventServlet(RestServlet):
# https://matrix.org/docs/spec/client_server/r0.5.0#get-matrix-client-r0-rooms-roomid-event-eventid
raise SynapseError(404, "Event not found.", errcode=Codes.NOT_FOUND)
time_now = self.clock.time_msec()
if event:
event_dict = await self._event_serializer.serialize_event(
event, time_now, bundle_aggregations=True
# Ensure there are bundled aggregations available.
aggregations = await self._store.get_bundled_aggregations([event])
time_now = self.clock.time_msec()
event_dict = self._event_serializer.serialize_event(
event, time_now, bundle_aggregations=aggregations
)
return 200, event_dict
@ -708,16 +712,20 @@ class RoomEventContextServlet(RestServlet):
raise SynapseError(404, "Event not found.", errcode=Codes.NOT_FOUND)
time_now = self.clock.time_msec()
results["events_before"] = await self._event_serializer.serialize_events(
results["events_before"], time_now, bundle_aggregations=True
results["events_before"] = self._event_serializer.serialize_events(
results["events_before"],
time_now,
bundle_aggregations=results["aggregations"],
)
results["event"] = await self._event_serializer.serialize_event(
results["event"], time_now, bundle_aggregations=True
results["event"] = self._event_serializer.serialize_event(
results["event"], time_now, bundle_aggregations=results["aggregations"]
)
results["events_after"] = await self._event_serializer.serialize_events(
results["events_after"], time_now, bundle_aggregations=True
results["events_after"] = self._event_serializer.serialize_events(
results["events_after"],
time_now,
bundle_aggregations=results["aggregations"],
)
results["state"] = await self._event_serializer.serialize_events(
results["state"] = self._event_serializer.serialize_events(
results["state"], time_now
)

View file

@ -17,7 +17,6 @@ from collections import defaultdict
from typing import (
TYPE_CHECKING,
Any,
Awaitable,
Callable,
Dict,
Iterable,
@ -395,7 +394,7 @@ class SyncRestServlet(RestServlet):
"""
invited = {}
for room in rooms:
invite = await self._event_serializer.serialize_event(
invite = self._event_serializer.serialize_event(
room.invite,
time_now,
token_id=token_id,
@ -432,7 +431,7 @@ class SyncRestServlet(RestServlet):
"""
knocked = {}
for room in rooms:
knock = await self._event_serializer.serialize_event(
knock = self._event_serializer.serialize_event(
room.knock,
time_now,
token_id=token_id,
@ -525,21 +524,14 @@ class SyncRestServlet(RestServlet):
The room, encoded in our response format
"""
def serialize(events: Iterable[EventBase]) -> Awaitable[List[JsonDict]]:
def serialize(
events: Iterable[EventBase],
aggregations: Optional[Dict[str, Dict[str, Any]]] = None,
) -> List[JsonDict]:
return self._event_serializer.serialize_events(
events,
time_now=time_now,
# Don't bother to bundle aggregations if the timeline is unlimited,
# as clients will have all the necessary information.
# bundle_aggregations=room.timeline.limited,
#
# richvdh 2021-12-15: disable this temporarily as it has too high an
# overhead for initialsyncs. We need to figure out a way that the
# bundling can be done *before* the events are stored in the
# SyncResponseCache so that this part can be synchronous.
#
# Ensure to re-enable the test at tests/rest/client/test_relations.py::RelationsTestCase.test_bundled_aggregations.
bundle_aggregations=False,
bundle_aggregations=aggregations,
token_id=token_id,
event_format=event_formatter,
only_event_fields=only_fields,
@ -561,8 +553,21 @@ class SyncRestServlet(RestServlet):
event.room_id,
)
serialized_state = await serialize(state_events)
serialized_timeline = await serialize(timeline_events)
serialized_state = serialize(state_events)
# Don't bother to bundle aggregations if the timeline is unlimited,
# as clients will have all the necessary information.
# bundle_aggregations=room.timeline.limited,
#
# richvdh 2021-12-15: disable this temporarily as it has too high an
# overhead for initialsyncs. We need to figure out a way that the
# bundling can be done *before* the events are stored in the
# SyncResponseCache so that this part can be synchronous.
#
# Ensure to re-enable the test at tests/rest/client/test_relations.py::RelationsTestCase.test_bundled_aggregations.
# if room.timeline.limited:
# aggregations = await self.store.get_bundled_aggregations(timeline_events)
aggregations = None
serialized_timeline = serialize(timeline_events, aggregations)
account_data = room.account_data

View file

@ -759,7 +759,7 @@ class HomeServer(metaclass=abc.ABCMeta):
@cache_in_self
def get_event_client_serializer(self) -> EventClientSerializer:
return EventClientSerializer(self)
return EventClientSerializer()
@cache_in_self
def get_password_policy_handler(self) -> PasswordPolicyHandler:

View file

@ -13,14 +13,30 @@
# limitations under the License.
import logging
from typing import List, Optional, Tuple, Union, cast
from typing import (
TYPE_CHECKING,
Any,
Dict,
Iterable,
List,
Optional,
Tuple,
Union,
cast,
)
import attr
from frozendict import frozendict
from synapse.api.constants import RelationTypes
from synapse.api.constants import EventTypes, RelationTypes
from synapse.events import EventBase
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import LoggingTransaction, make_in_list_sql_clause
from synapse.storage.database import (
DatabasePool,
LoggingDatabaseConnection,
LoggingTransaction,
make_in_list_sql_clause,
)
from synapse.storage.databases.main.stream import generate_pagination_where_clause
from synapse.storage.relations import (
AggregationPaginationToken,
@ -29,10 +45,24 @@ from synapse.storage.relations import (
)
from synapse.util.caches.descriptors import cached
if TYPE_CHECKING:
from synapse.server import HomeServer
logger = logging.getLogger(__name__)
class RelationsWorkerStore(SQLBaseStore):
def __init__(
self,
database: DatabasePool,
db_conn: LoggingDatabaseConnection,
hs: "HomeServer",
):
super().__init__(database, db_conn, hs)
self._msc1849_enabled = hs.config.experimental.msc1849_enabled
self._msc3440_enabled = hs.config.experimental.msc3440_enabled
@cached(tree=True)
async def get_relations_for_event(
self,
@ -515,6 +545,98 @@ class RelationsWorkerStore(SQLBaseStore):
"get_if_user_has_annotated_event", _get_if_user_has_annotated_event
)
async def _get_bundled_aggregation_for_event(
self, event: EventBase
) -> Optional[Dict[str, Any]]:
"""Generate bundled aggregations for an event.
Note that this does not use a cache, but depends on cached methods.
Args:
event: The event to calculate bundled aggregations for.
Returns:
The bundled aggregations for an event, if bundled aggregations are
enabled and the event can have bundled aggregations.
"""
# State events and redacted events do not get bundled aggregations.
if event.is_state() or event.internal_metadata.is_redacted():
return None
# Do not bundle aggregations for an event which represents an edit or an
# annotation. It does not make sense for them to have related events.
relates_to = event.content.get("m.relates_to")
if isinstance(relates_to, (dict, frozendict)):
relation_type = relates_to.get("rel_type")
if relation_type in (RelationTypes.ANNOTATION, RelationTypes.REPLACE):
return None
event_id = event.event_id
room_id = event.room_id
# The bundled aggregations to include, a mapping of relation type to a
# type-specific value. Some types include the direct return type here
# while others need more processing during serialization.
aggregations: Dict[str, Any] = {}
annotations = await self.get_aggregation_groups_for_event(event_id, room_id)
if annotations.chunk:
aggregations[RelationTypes.ANNOTATION] = annotations.to_dict()
references = await self.get_relations_for_event(
event_id, room_id, RelationTypes.REFERENCE, direction="f"
)
if references.chunk:
aggregations[RelationTypes.REFERENCE] = references.to_dict()
edit = None
if event.type == EventTypes.Message:
edit = await self.get_applicable_edit(event_id, room_id)
if edit:
aggregations[RelationTypes.REPLACE] = edit
# If this event is the start of a thread, include a summary of the replies.
if self._msc3440_enabled:
(
thread_count,
latest_thread_event,
) = await self.get_thread_summary(event_id, room_id)
if latest_thread_event:
aggregations[RelationTypes.THREAD] = {
# Don't bundle aggregations as this could recurse forever.
"latest_event": latest_thread_event,
"count": thread_count,
}
# Store the bundled aggregations in the event metadata for later use.
return aggregations
async def get_bundled_aggregations(
self, events: Iterable[EventBase]
) -> Dict[str, Dict[str, Any]]:
"""Generate bundled aggregations for events.
Args:
events: The iterable of events to calculate bundled aggregations for.
Returns:
A map of event ID to the bundled aggregation for the event. Not all
events may have bundled aggregations in the results.
"""
# If bundled aggregations are disabled, nothing to do.
if not self._msc1849_enabled:
return {}
# TODO Parallelize.
results = {}
for event in events:
event_result = await self._get_bundled_aggregation_for_event(event)
if event_result is not None:
results[event.event_id] = event_result
return results
class RelationsStore(RelationsWorkerStore):
pass

View file

@ -228,7 +228,7 @@ class RetentionTestCase(unittest.HomeserverTestCase):
self.assertIsNotNone(event)
time_now = self.clock.time_msec()
serialized = self.get_success(self.serializer.serialize_event(event, time_now))
serialized = self.serializer.serialize_event(event, time_now)
return serialized