Convert synapse.app to async/await. (#7868)

This commit is contained in:
Patrick Cloke 2020-07-17 07:08:56 -04:00 committed by GitHub
parent 6fca1b3506
commit 00e57b755c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 37 additions and 41 deletions

1
changelog.d/7868.misc Normal file
View file

@ -0,0 +1 @@
Convert synapse.app and federation client to async/await.

View file

@ -21,7 +21,7 @@ from typing import Dict, Iterable, Optional, Set
from typing_extensions import ContextManager from typing_extensions import ContextManager
from twisted.internet import address, defer, reactor from twisted.internet import address, reactor
import synapse import synapse
import synapse.events import synapse.events
@ -375,9 +375,8 @@ class GenericWorkerPresence(BasePresenceHandler):
return _user_syncing() return _user_syncing()
@defer.inlineCallbacks async def notify_from_replication(self, states, stream_id):
def notify_from_replication(self, states, stream_id): parties = await get_interested_parties(self.store, states)
parties = yield get_interested_parties(self.store, states)
room_ids_to_states, users_to_states = parties room_ids_to_states, users_to_states = parties
self.notifier.on_new_event( self.notifier.on_new_event(
@ -387,8 +386,7 @@ class GenericWorkerPresence(BasePresenceHandler):
users=users_to_states.keys(), users=users_to_states.keys(),
) )
@defer.inlineCallbacks async def process_replication_rows(self, token, rows):
def process_replication_rows(self, token, rows):
states = [ states = [
UserPresenceState( UserPresenceState(
row.user_id, row.user_id,
@ -406,7 +404,7 @@ class GenericWorkerPresence(BasePresenceHandler):
self.user_to_current_state[state.user_id] = state self.user_to_current_state[state.user_id] = state
stream_id = token stream_id = token
yield self.notify_from_replication(states, stream_id) await self.notify_from_replication(states, stream_id)
def get_currently_syncing_users_for_replication(self) -> Iterable[str]: def get_currently_syncing_users_for_replication(self) -> Iterable[str]:
return [ return [

View file

@ -483,8 +483,7 @@ class SynapseService(service.Service):
_stats_process = [] _stats_process = []
@defer.inlineCallbacks async def phone_stats_home(hs, stats, stats_process=_stats_process):
def phone_stats_home(hs, stats, stats_process=_stats_process):
logger.info("Gathering stats for reporting") logger.info("Gathering stats for reporting")
now = int(hs.get_clock().time()) now = int(hs.get_clock().time())
uptime = int(now - hs.start_time) uptime = int(now - hs.start_time)
@ -522,28 +521,28 @@ def phone_stats_home(hs, stats, stats_process=_stats_process):
stats["python_version"] = "{}.{}.{}".format( stats["python_version"] = "{}.{}.{}".format(
version.major, version.minor, version.micro version.major, version.minor, version.micro
) )
stats["total_users"] = yield hs.get_datastore().count_all_users() stats["total_users"] = await hs.get_datastore().count_all_users()
total_nonbridged_users = yield hs.get_datastore().count_nonbridged_users() total_nonbridged_users = await hs.get_datastore().count_nonbridged_users()
stats["total_nonbridged_users"] = total_nonbridged_users stats["total_nonbridged_users"] = total_nonbridged_users
daily_user_type_results = yield hs.get_datastore().count_daily_user_type() daily_user_type_results = await hs.get_datastore().count_daily_user_type()
for name, count in daily_user_type_results.items(): for name, count in daily_user_type_results.items():
stats["daily_user_type_" + name] = count stats["daily_user_type_" + name] = count
room_count = yield hs.get_datastore().get_room_count() room_count = await hs.get_datastore().get_room_count()
stats["total_room_count"] = room_count stats["total_room_count"] = room_count
stats["daily_active_users"] = yield hs.get_datastore().count_daily_users() stats["daily_active_users"] = await hs.get_datastore().count_daily_users()
stats["monthly_active_users"] = yield hs.get_datastore().count_monthly_users() stats["monthly_active_users"] = await hs.get_datastore().count_monthly_users()
stats["daily_active_rooms"] = yield hs.get_datastore().count_daily_active_rooms() stats["daily_active_rooms"] = await hs.get_datastore().count_daily_active_rooms()
stats["daily_messages"] = yield hs.get_datastore().count_daily_messages() stats["daily_messages"] = await hs.get_datastore().count_daily_messages()
r30_results = yield hs.get_datastore().count_r30_users() r30_results = await hs.get_datastore().count_r30_users()
for name, count in r30_results.items(): for name, count in r30_results.items():
stats["r30_users_" + name] = count stats["r30_users_" + name] = count
daily_sent_messages = yield hs.get_datastore().count_daily_sent_messages() daily_sent_messages = await hs.get_datastore().count_daily_sent_messages()
stats["daily_sent_messages"] = daily_sent_messages stats["daily_sent_messages"] = daily_sent_messages
stats["cache_factor"] = hs.config.caches.global_factor stats["cache_factor"] = hs.config.caches.global_factor
stats["event_cache_size"] = hs.config.caches.event_cache_size stats["event_cache_size"] = hs.config.caches.event_cache_size
@ -558,7 +557,7 @@ def phone_stats_home(hs, stats, stats_process=_stats_process):
logger.info("Reporting stats to %s: %s" % (hs.config.report_stats_endpoint, stats)) logger.info("Reporting stats to %s: %s" % (hs.config.report_stats_endpoint, stats))
try: try:
yield hs.get_proxied_http_client().put_json( await hs.get_proxied_http_client().put_json(
hs.config.report_stats_endpoint, stats hs.config.report_stats_endpoint, stats
) )
except Exception as e: except Exception as e:

View file

@ -374,30 +374,27 @@ class FederationClient(FederationBase):
""" """
deferreds = self._check_sigs_and_hashes(room_version, pdus) deferreds = self._check_sigs_and_hashes(room_version, pdus)
@defer.inlineCallbacks async def handle_check_result(pdu: EventBase, deferred: Deferred):
def handle_check_result(pdu: EventBase, deferred: Deferred):
try: try:
res = yield make_deferred_yieldable(deferred) res = await make_deferred_yieldable(deferred)
except SynapseError: except SynapseError:
res = None res = None
if not res: if not res:
# Check local db. # Check local db.
res = yield self.store.get_event( res = await self.store.get_event(
pdu.event_id, allow_rejected=True, allow_none=True pdu.event_id, allow_rejected=True, allow_none=True
) )
if not res and pdu.origin != origin: if not res and pdu.origin != origin:
try: try:
res = yield defer.ensureDeferred( res = await self.get_pdu(
self.get_pdu(
destinations=[pdu.origin], destinations=[pdu.origin],
event_id=pdu.event_id, event_id=pdu.event_id,
room_version=room_version, room_version=room_version,
outlier=outlier, outlier=outlier,
timeout=10000, timeout=10000,
) )
)
except SynapseError: except SynapseError:
pass pass
@ -995,24 +992,25 @@ class FederationClient(FederationBase):
raise RuntimeError("Failed to send to any server.") raise RuntimeError("Failed to send to any server.")
@defer.inlineCallbacks async def get_room_complexity(
def get_room_complexity(self, destination, room_id): self, destination: str, room_id: str
) -> Optional[dict]:
""" """
Fetch the complexity of a remote room from another server. Fetch the complexity of a remote room from another server.
Args: Args:
destination (str): The remote server destination: The remote server
room_id (str): The room ID to ask about. room_id: The room ID to ask about.
Returns: Returns:
Deferred[dict] or Deferred[None]: Dict contains the complexity Dict contains the complexity metric versions, while None means we
metric versions, while None means we could not fetch the complexity. could not fetch the complexity.
""" """
try: try:
complexity = yield self.transport_layer.get_room_complexity( complexity = await self.transport_layer.get_room_complexity(
destination=destination, room_id=room_id destination=destination, room_id=room_id
) )
defer.returnValue(complexity) return complexity
except CodeMessageException as e: except CodeMessageException as e:
# We didn't manage to get it -- probably a 404. We are okay if other # We didn't manage to get it -- probably a 404. We are okay if other
# servers don't give it to us. # servers don't give it to us.
@ -1029,4 +1027,4 @@ class FederationClient(FederationBase):
# If we don't manage to find it, return None. It's not an error if a # If we don't manage to find it, return None. It's not an error if a
# server doesn't give it to us. # server doesn't give it to us.
defer.returnValue(None) return None