mirror of
https://github.com/element-hq/synapse.git
synced 2024-11-28 23:20:09 +03:00
Instrument /messages
for understandable traces in Jaeger (#13368)
In Jaeger: - Before: huge list of uncategorized database calls - After: nice and collapsible into units of work
This commit is contained in:
parent
78a3111c41
commit
92d21faf12
11 changed files with 32 additions and 1 deletions
1
changelog.d/13368.misc
Normal file
1
changelog.d/13368.misc
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Instrument `/messages` for understandable traces in Jaeger.
|
|
@ -31,7 +31,12 @@ from synapse.api.errors import (
|
||||||
from synapse.appservice import ApplicationService
|
from synapse.appservice import ApplicationService
|
||||||
from synapse.http import get_request_user_agent
|
from synapse.http import get_request_user_agent
|
||||||
from synapse.http.site import SynapseRequest
|
from synapse.http.site import SynapseRequest
|
||||||
from synapse.logging.opentracing import active_span, force_tracing, start_active_span
|
from synapse.logging.opentracing import (
|
||||||
|
active_span,
|
||||||
|
force_tracing,
|
||||||
|
start_active_span,
|
||||||
|
trace,
|
||||||
|
)
|
||||||
from synapse.storage.databases.main.registration import TokenLookupResult
|
from synapse.storage.databases.main.registration import TokenLookupResult
|
||||||
from synapse.types import Requester, UserID, create_requester
|
from synapse.types import Requester, UserID, create_requester
|
||||||
|
|
||||||
|
@ -567,6 +572,7 @@ class Auth:
|
||||||
|
|
||||||
return query_params[0].decode("ascii")
|
return query_params[0].decode("ascii")
|
||||||
|
|
||||||
|
@trace
|
||||||
async def check_user_in_room_or_world_readable(
|
async def check_user_in_room_or_world_readable(
|
||||||
self, room_id: str, user_id: str, allow_departed_users: bool = False
|
self, room_id: str, user_id: str, allow_departed_users: bool = False
|
||||||
) -> Tuple[str, Optional[str]]:
|
) -> Tuple[str, Optional[str]]:
|
||||||
|
|
|
@ -61,6 +61,7 @@ from synapse.federation.federation_base import (
|
||||||
)
|
)
|
||||||
from synapse.federation.transport.client import SendJoinResponse
|
from synapse.federation.transport.client import SendJoinResponse
|
||||||
from synapse.http.types import QueryParams
|
from synapse.http.types import QueryParams
|
||||||
|
from synapse.logging.opentracing import trace
|
||||||
from synapse.types import JsonDict, UserID, get_domain_from_id
|
from synapse.types import JsonDict, UserID, get_domain_from_id
|
||||||
from synapse.util.async_helpers import concurrently_execute
|
from synapse.util.async_helpers import concurrently_execute
|
||||||
from synapse.util.caches.expiringcache import ExpiringCache
|
from synapse.util.caches.expiringcache import ExpiringCache
|
||||||
|
@ -233,6 +234,7 @@ class FederationClient(FederationBase):
|
||||||
destination, content, timeout
|
destination, content, timeout
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@trace
|
||||||
async def backfill(
|
async def backfill(
|
||||||
self, dest: str, room_id: str, limit: int, extremities: Collection[str]
|
self, dest: str, room_id: str, limit: int, extremities: Collection[str]
|
||||||
) -> Optional[List[EventBase]]:
|
) -> Optional[List[EventBase]]:
|
||||||
|
|
|
@ -59,6 +59,7 @@ from synapse.events.validator import EventValidator
|
||||||
from synapse.federation.federation_client import InvalidResponseError
|
from synapse.federation.federation_client import InvalidResponseError
|
||||||
from synapse.http.servlet import assert_params_in_dict
|
from synapse.http.servlet import assert_params_in_dict
|
||||||
from synapse.logging.context import nested_logging_context
|
from synapse.logging.context import nested_logging_context
|
||||||
|
from synapse.logging.opentracing import trace
|
||||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||||
from synapse.module_api import NOT_SPAM
|
from synapse.module_api import NOT_SPAM
|
||||||
from synapse.replication.http.federation import (
|
from synapse.replication.http.federation import (
|
||||||
|
@ -180,6 +181,7 @@ class FederationHandler:
|
||||||
"resume_sync_partial_state_room", self._resume_sync_partial_state_room
|
"resume_sync_partial_state_room", self._resume_sync_partial_state_room
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@trace
|
||||||
async def maybe_backfill(
|
async def maybe_backfill(
|
||||||
self, room_id: str, current_depth: int, limit: int
|
self, room_id: str, current_depth: int, limit: int
|
||||||
) -> bool:
|
) -> bool:
|
||||||
|
|
|
@ -59,6 +59,7 @@ from synapse.events import EventBase
|
||||||
from synapse.events.snapshot import EventContext
|
from synapse.events.snapshot import EventContext
|
||||||
from synapse.federation.federation_client import InvalidResponseError
|
from synapse.federation.federation_client import InvalidResponseError
|
||||||
from synapse.logging.context import nested_logging_context
|
from synapse.logging.context import nested_logging_context
|
||||||
|
from synapse.logging.opentracing import trace
|
||||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||||
from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet
|
from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet
|
||||||
from synapse.replication.http.federation import (
|
from synapse.replication.http.federation import (
|
||||||
|
@ -566,6 +567,7 @@ class FederationEventHandler:
|
||||||
event.event_id
|
event.event_id
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@trace
|
||||||
async def backfill(
|
async def backfill(
|
||||||
self, dest: str, room_id: str, limit: int, extremities: Collection[str]
|
self, dest: str, room_id: str, limit: int, extremities: Collection[str]
|
||||||
) -> None:
|
) -> None:
|
||||||
|
@ -610,6 +612,7 @@ class FederationEventHandler:
|
||||||
backfilled=True,
|
backfilled=True,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@trace
|
||||||
async def _get_missing_events_for_pdu(
|
async def _get_missing_events_for_pdu(
|
||||||
self, origin: str, pdu: EventBase, prevs: Set[str], min_depth: int
|
self, origin: str, pdu: EventBase, prevs: Set[str], min_depth: int
|
||||||
) -> None:
|
) -> None:
|
||||||
|
@ -710,6 +713,7 @@ class FederationEventHandler:
|
||||||
logger.info("Got %d prev_events", len(missing_events))
|
logger.info("Got %d prev_events", len(missing_events))
|
||||||
await self._process_pulled_events(origin, missing_events, backfilled=False)
|
await self._process_pulled_events(origin, missing_events, backfilled=False)
|
||||||
|
|
||||||
|
@trace
|
||||||
async def _process_pulled_events(
|
async def _process_pulled_events(
|
||||||
self, origin: str, events: Iterable[EventBase], backfilled: bool
|
self, origin: str, events: Iterable[EventBase], backfilled: bool
|
||||||
) -> None:
|
) -> None:
|
||||||
|
@ -748,6 +752,7 @@ class FederationEventHandler:
|
||||||
with nested_logging_context(ev.event_id):
|
with nested_logging_context(ev.event_id):
|
||||||
await self._process_pulled_event(origin, ev, backfilled=backfilled)
|
await self._process_pulled_event(origin, ev, backfilled=backfilled)
|
||||||
|
|
||||||
|
@trace
|
||||||
async def _process_pulled_event(
|
async def _process_pulled_event(
|
||||||
self, origin: str, event: EventBase, backfilled: bool
|
self, origin: str, event: EventBase, backfilled: bool
|
||||||
) -> None:
|
) -> None:
|
||||||
|
|
|
@ -24,6 +24,7 @@ from synapse.api.errors import SynapseError
|
||||||
from synapse.api.filtering import Filter
|
from synapse.api.filtering import Filter
|
||||||
from synapse.events.utils import SerializeEventConfig
|
from synapse.events.utils import SerializeEventConfig
|
||||||
from synapse.handlers.room import ShutdownRoomResponse
|
from synapse.handlers.room import ShutdownRoomResponse
|
||||||
|
from synapse.logging.opentracing import trace
|
||||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||||
from synapse.storage.state import StateFilter
|
from synapse.storage.state import StateFilter
|
||||||
from synapse.streams.config import PaginationConfig
|
from synapse.streams.config import PaginationConfig
|
||||||
|
@ -416,6 +417,7 @@ class PaginationHandler:
|
||||||
|
|
||||||
await self._storage_controllers.purge_events.purge_room(room_id)
|
await self._storage_controllers.purge_events.purge_room(room_id)
|
||||||
|
|
||||||
|
@trace
|
||||||
async def get_messages(
|
async def get_messages(
|
||||||
self,
|
self,
|
||||||
requester: Requester,
|
requester: Requester,
|
||||||
|
|
|
@ -19,6 +19,7 @@ import attr
|
||||||
from synapse.api.constants import RelationTypes
|
from synapse.api.constants import RelationTypes
|
||||||
from synapse.api.errors import SynapseError
|
from synapse.api.errors import SynapseError
|
||||||
from synapse.events import EventBase, relation_from_event
|
from synapse.events import EventBase, relation_from_event
|
||||||
|
from synapse.logging.opentracing import trace
|
||||||
from synapse.storage.databases.main.relations import _RelatedEvent
|
from synapse.storage.databases.main.relations import _RelatedEvent
|
||||||
from synapse.types import JsonDict, Requester, StreamToken, UserID
|
from synapse.types import JsonDict, Requester, StreamToken, UserID
|
||||||
from synapse.visibility import filter_events_for_client
|
from synapse.visibility import filter_events_for_client
|
||||||
|
@ -361,6 +362,7 @@ class RelationsHandler:
|
||||||
|
|
||||||
return results
|
return results
|
||||||
|
|
||||||
|
@trace
|
||||||
async def get_bundled_aggregations(
|
async def get_bundled_aggregations(
|
||||||
self, events: Iterable[EventBase], user_id: str
|
self, events: Iterable[EventBase], user_id: str
|
||||||
) -> Dict[str, BundledAggregations]:
|
) -> Dict[str, BundledAggregations]:
|
||||||
|
|
|
@ -29,6 +29,7 @@ from typing import (
|
||||||
|
|
||||||
from synapse.api.constants import EventTypes
|
from synapse.api.constants import EventTypes
|
||||||
from synapse.events import EventBase
|
from synapse.events import EventBase
|
||||||
|
from synapse.logging.opentracing import trace
|
||||||
from synapse.storage.state import StateFilter
|
from synapse.storage.state import StateFilter
|
||||||
from synapse.storage.util.partial_state_events_tracker import (
|
from synapse.storage.util.partial_state_events_tracker import (
|
||||||
PartialCurrentStateTracker,
|
PartialCurrentStateTracker,
|
||||||
|
@ -179,6 +180,7 @@ class StateStorageController:
|
||||||
|
|
||||||
return self.stores.state._get_state_groups_from_groups(groups, state_filter)
|
return self.stores.state._get_state_groups_from_groups(groups, state_filter)
|
||||||
|
|
||||||
|
@trace
|
||||||
async def get_state_for_events(
|
async def get_state_for_events(
|
||||||
self, event_ids: Collection[str], state_filter: Optional[StateFilter] = None
|
self, event_ids: Collection[str], state_filter: Optional[StateFilter] = None
|
||||||
) -> Dict[str, StateMap[EventBase]]:
|
) -> Dict[str, StateMap[EventBase]]:
|
||||||
|
@ -225,6 +227,7 @@ class StateStorageController:
|
||||||
|
|
||||||
return {event: event_to_state[event] for event in event_ids}
|
return {event: event_to_state[event] for event in event_ids}
|
||||||
|
|
||||||
|
@trace
|
||||||
async def get_state_ids_for_events(
|
async def get_state_ids_for_events(
|
||||||
self,
|
self,
|
||||||
event_ids: Collection[str],
|
event_ids: Collection[str],
|
||||||
|
@ -287,6 +290,7 @@ class StateStorageController:
|
||||||
)
|
)
|
||||||
return state_map[event_id]
|
return state_map[event_id]
|
||||||
|
|
||||||
|
@trace
|
||||||
async def get_state_ids_for_event(
|
async def get_state_ids_for_event(
|
||||||
self, event_id: str, state_filter: Optional[StateFilter] = None
|
self, event_id: str, state_filter: Optional[StateFilter] = None
|
||||||
) -> StateMap[str]:
|
) -> StateMap[str]:
|
||||||
|
@ -327,6 +331,7 @@ class StateStorageController:
|
||||||
groups, state_filter or StateFilter.all()
|
groups, state_filter or StateFilter.all()
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@trace
|
||||||
async def get_state_group_for_events(
|
async def get_state_group_for_events(
|
||||||
self,
|
self,
|
||||||
event_ids: Collection[str],
|
event_ids: Collection[str],
|
||||||
|
|
|
@ -58,6 +58,7 @@ from twisted.internet import defer
|
||||||
from synapse.api.filtering import Filter
|
from synapse.api.filtering import Filter
|
||||||
from synapse.events import EventBase
|
from synapse.events import EventBase
|
||||||
from synapse.logging.context import make_deferred_yieldable, run_in_background
|
from synapse.logging.context import make_deferred_yieldable, run_in_background
|
||||||
|
from synapse.logging.opentracing import trace
|
||||||
from synapse.storage._base import SQLBaseStore
|
from synapse.storage._base import SQLBaseStore
|
||||||
from synapse.storage.database import (
|
from synapse.storage.database import (
|
||||||
DatabasePool,
|
DatabasePool,
|
||||||
|
@ -1346,6 +1347,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
|
||||||
|
|
||||||
return rows, next_token
|
return rows, next_token
|
||||||
|
|
||||||
|
@trace
|
||||||
async def paginate_room_events(
|
async def paginate_room_events(
|
||||||
self,
|
self,
|
||||||
room_id: str,
|
room_id: str,
|
||||||
|
|
|
@ -21,6 +21,7 @@ from synapse.handlers.presence import PresenceEventSource
|
||||||
from synapse.handlers.receipts import ReceiptEventSource
|
from synapse.handlers.receipts import ReceiptEventSource
|
||||||
from synapse.handlers.room import RoomEventSource
|
from synapse.handlers.room import RoomEventSource
|
||||||
from synapse.handlers.typing import TypingNotificationEventSource
|
from synapse.handlers.typing import TypingNotificationEventSource
|
||||||
|
from synapse.logging.opentracing import trace
|
||||||
from synapse.streams import EventSource
|
from synapse.streams import EventSource
|
||||||
from synapse.types import StreamToken
|
from synapse.types import StreamToken
|
||||||
|
|
||||||
|
@ -69,6 +70,7 @@ class EventSources:
|
||||||
)
|
)
|
||||||
return token
|
return token
|
||||||
|
|
||||||
|
@trace
|
||||||
async def get_current_token_for_pagination(self, room_id: str) -> StreamToken:
|
async def get_current_token_for_pagination(self, room_id: str) -> StreamToken:
|
||||||
"""Get the current token for a given room to be used to paginate
|
"""Get the current token for a given room to be used to paginate
|
||||||
events.
|
events.
|
||||||
|
|
|
@ -23,6 +23,7 @@ from synapse.api.constants import EventTypes, HistoryVisibility, Membership
|
||||||
from synapse.events import EventBase
|
from synapse.events import EventBase
|
||||||
from synapse.events.snapshot import EventContext
|
from synapse.events.snapshot import EventContext
|
||||||
from synapse.events.utils import prune_event
|
from synapse.events.utils import prune_event
|
||||||
|
from synapse.logging.opentracing import trace
|
||||||
from synapse.storage.controllers import StorageControllers
|
from synapse.storage.controllers import StorageControllers
|
||||||
from synapse.storage.databases.main import DataStore
|
from synapse.storage.databases.main import DataStore
|
||||||
from synapse.storage.state import StateFilter
|
from synapse.storage.state import StateFilter
|
||||||
|
@ -51,6 +52,7 @@ MEMBERSHIP_PRIORITY = (
|
||||||
_HISTORY_VIS_KEY: Final[Tuple[str, str]] = (EventTypes.RoomHistoryVisibility, "")
|
_HISTORY_VIS_KEY: Final[Tuple[str, str]] = (EventTypes.RoomHistoryVisibility, "")
|
||||||
|
|
||||||
|
|
||||||
|
@trace
|
||||||
async def filter_events_for_client(
|
async def filter_events_for_client(
|
||||||
storage: StorageControllers,
|
storage: StorageControllers,
|
||||||
user_id: str,
|
user_id: str,
|
||||||
|
|
Loading…
Reference in a new issue