mirror of
https://github.com/element-hq/synapse.git
synced 2024-11-27 03:58:06 +03:00
Merge pull request #3497 from matrix-org/rav/measure_fetch_event_loop
Add CPU metrics for _fetch_event_list
This commit is contained in:
commit
b1fe697b3c
2 changed files with 33 additions and 19 deletions
1
changelog.d/3497.feature
Normal file
1
changelog.d/3497.feature
Normal file
|
@ -0,0 +1 @@
|
|||
Add CPU metrics for _fetch_event_list
|
|
@ -222,25 +222,39 @@ class EventsWorkerStore(SQLBaseStore):
|
|||
"""Takes a database connection and waits for requests for events from
|
||||
the _event_fetch_list queue.
|
||||
"""
|
||||
event_list = []
|
||||
i = 0
|
||||
while True:
|
||||
with self._event_fetch_lock:
|
||||
event_list = self._event_fetch_list
|
||||
self._event_fetch_list = []
|
||||
|
||||
if not event_list:
|
||||
single_threaded = self.database_engine.single_threaded
|
||||
if single_threaded or i > EVENT_QUEUE_ITERATIONS:
|
||||
self._event_fetch_ongoing -= 1
|
||||
return
|
||||
else:
|
||||
self._event_fetch_lock.wait(EVENT_QUEUE_TIMEOUT_S)
|
||||
i += 1
|
||||
continue
|
||||
i = 0
|
||||
|
||||
self._fetch_event_list(conn, event_list)
|
||||
|
||||
def _fetch_event_list(self, conn, event_list):
|
||||
"""Handle a load of requests from the _event_fetch_list queue
|
||||
|
||||
Args:
|
||||
conn (twisted.enterprise.adbapi.Connection): database connection
|
||||
|
||||
event_list (list[Tuple[list[str], Deferred]]):
|
||||
The fetch requests. Each entry consists of a list of event
|
||||
ids to be fetched, and a deferred to be completed once the
|
||||
events have been fetched.
|
||||
|
||||
"""
|
||||
with Measure(self._clock, "_fetch_event_list"):
|
||||
try:
|
||||
with self._event_fetch_lock:
|
||||
event_list = self._event_fetch_list
|
||||
self._event_fetch_list = []
|
||||
|
||||
if not event_list:
|
||||
single_threaded = self.database_engine.single_threaded
|
||||
if single_threaded or i > EVENT_QUEUE_ITERATIONS:
|
||||
self._event_fetch_ongoing -= 1
|
||||
return
|
||||
else:
|
||||
self._event_fetch_lock.wait(EVENT_QUEUE_TIMEOUT_S)
|
||||
i += 1
|
||||
continue
|
||||
i = 0
|
||||
|
||||
event_id_lists = zip(*event_list)[0]
|
||||
event_ids = [
|
||||
item for sublist in event_id_lists for item in sublist
|
||||
|
@ -280,9 +294,8 @@ class EventsWorkerStore(SQLBaseStore):
|
|||
with PreserveLoggingContext():
|
||||
d.errback(e)
|
||||
|
||||
if event_list:
|
||||
with PreserveLoggingContext():
|
||||
self.hs.get_reactor().callFromThread(fire, event_list)
|
||||
with PreserveLoggingContext():
|
||||
self.hs.get_reactor().callFromThread(fire, event_list)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _enqueue_events(self, events, check_redacted=True, allow_rejected=False):
|
||||
|
|
Loading…
Reference in a new issue