From f3db863420ff2093359d828839952950c0d15584 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Mon, 16 Oct 2023 17:55:05 +0100 Subject: [PATCH] TEMPORARY Subdivide _resolve_events Measure blocks --- synapse/state/v2.py | 171 ++++++++++++++++++++++++-------------------- 1 file changed, 92 insertions(+), 79 deletions(-) diff --git a/synapse/state/v2.py b/synapse/state/v2.py index b2e63aed1e..e224af8dd8 100644 --- a/synapse/state/v2.py +++ b/synapse/state/v2.py @@ -38,6 +38,7 @@ from synapse.api.errors import AuthError from synapse.api.room_versions import RoomVersion from synapse.events import EventBase from synapse.types import MutableStateMap, StateMap, StrCollection +from synapse.util.metrics import Measure logger = logging.getLogger(__name__) @@ -104,106 +105,118 @@ async def resolve_events_with_store( Returns: A map from (type, state_key) to event_id. """ + with Measure(clock, "rei_state_res:rews2_a"): # TODO temporary (rei) + logger.debug("Computing conflicted state") - logger.debug("Computing conflicted state") + # We use event_map as a cache, so if its None we need to initialize it + if event_map is None: + event_map = {} - # We use event_map as a cache, so if its None we need to initialize it - if event_map is None: - event_map = {} + # First split up the un/conflicted state + unconflicted_state, conflicted_state = _seperate(state_sets) - # First split up the un/conflicted state - unconflicted_state, conflicted_state = _seperate(state_sets) + if not conflicted_state: + return unconflicted_state - if not conflicted_state: - return unconflicted_state + logger.debug("%d conflicted state entries", len(conflicted_state)) + logger.debug("Calculating auth chain difference") - logger.debug("%d conflicted state entries", len(conflicted_state)) - logger.debug("Calculating auth chain difference") - - # Also fetch all auth events that appear in only some of the state sets' - # auth chains. - auth_diff = await _get_auth_chain_difference( - room_id, state_sets, event_map, state_res_store - ) - - full_conflicted_set = set( - itertools.chain( - itertools.chain.from_iterable(conflicted_state.values()), auth_diff + # Also fetch all auth events that appear in only some of the state sets' + # auth chains. + auth_diff = await _get_auth_chain_difference( + room_id, state_sets, event_map, state_res_store ) - ) - events = await state_res_store.get_events( - [eid for eid in full_conflicted_set if eid not in event_map], - allow_rejected=True, - ) - event_map.update(events) - - # everything in the event map should be in the right room - for event in event_map.values(): - if event.room_id != room_id: - raise Exception( - "Attempting to state-resolve for room %s with event %s which is in %s" - % ( - room_id, - event.event_id, - event.room_id, - ) + with Measure(clock, "rei_state_res:rews2_b"): # TODO temporary (rei) + full_conflicted_set = set( + itertools.chain( + itertools.chain.from_iterable(conflicted_state.values()), auth_diff ) + ) - full_conflicted_set = {eid for eid in full_conflicted_set if eid in event_map} + events = await state_res_store.get_events( + [eid for eid in full_conflicted_set if eid not in event_map], + allow_rejected=True, + ) + event_map.update(events) - logger.debug("%d full_conflicted_set entries", len(full_conflicted_set)) + with Measure(clock, "rei_state_res:rews2_c"): # TODO temporary (rei) + # everything in the event map should be in the right room + for event in event_map.values(): + if event.room_id != room_id: + raise Exception( + "Attempting to state-resolve for room %s with event %s which is in %s" + % ( + room_id, + event.event_id, + event.room_id, + ) + ) - # Get and sort all the power events (kicks/bans/etc) - power_events = ( - eid for eid in full_conflicted_set if _is_power_event(event_map[eid]) - ) + full_conflicted_set = {eid for eid in full_conflicted_set if eid in event_map} - sorted_power_events = await _reverse_topological_power_sort( - clock, room_id, power_events, event_map, state_res_store, full_conflicted_set - ) + logger.debug("%d full_conflicted_set entries", len(full_conflicted_set)) - logger.debug("sorted %d power events", len(sorted_power_events)) + # Get and sort all the power events (kicks/bans/etc) + power_events = ( + eid for eid in full_conflicted_set if _is_power_event(event_map[eid]) + ) - # Now sequentially auth each one - resolved_state = await _iterative_auth_checks( - clock, - room_id, - room_version, - sorted_power_events, - unconflicted_state, - event_map, - state_res_store, - ) + with Measure(clock, "rei_state_res:rews2_d"): # TODO temporary (rei) + sorted_power_events = await _reverse_topological_power_sort( + clock, + room_id, + power_events, + event_map, + state_res_store, + full_conflicted_set, + ) - logger.debug("resolved power events") + logger.debug("sorted %d power events", len(sorted_power_events)) - # OK, so we've now resolved the power events. Now sort the remaining - # events using the mainline of the resolved power level. + with Measure(clock, "rei_state_res:rews2_e"): # TODO temporary (rei) + # Now sequentially auth each one + resolved_state = await _iterative_auth_checks( + clock, + room_id, + room_version, + sorted_power_events, + unconflicted_state, + event_map, + state_res_store, + ) - set_power_events = set(sorted_power_events) - leftover_events = [ - ev_id for ev_id in full_conflicted_set if ev_id not in set_power_events - ] + logger.debug("resolved power events") - logger.debug("sorting %d remaining events", len(leftover_events)) + with Measure(clock, "rei_state_res:rews2_f"): # TODO temporary (rei) + # OK, so we've now resolved the power events. Now sort the remaining + # events using the mainline of the resolved power level. - pl = resolved_state.get((EventTypes.PowerLevels, ""), None) - leftover_events = await _mainline_sort( - clock, room_id, leftover_events, pl, event_map, state_res_store - ) + set_power_events = set(sorted_power_events) + leftover_events = [ + ev_id for ev_id in full_conflicted_set if ev_id not in set_power_events + ] - logger.debug("resolving remaining events") + logger.debug("sorting %d remaining events", len(leftover_events)) - resolved_state = await _iterative_auth_checks( - clock, - room_id, - room_version, - leftover_events, - resolved_state, - event_map, - state_res_store, - ) + with Measure(clock, "rei_state_res:rews2_g"): # TODO temporary (rei) + pl = resolved_state.get((EventTypes.PowerLevels, ""), None) + leftover_events = await _mainline_sort( + clock, room_id, leftover_events, pl, event_map, state_res_store + ) + + with Measure(clock, "rei_state_res:rews2_h"): # TODO temporary (rei) + logger.debug("resolving remaining events") + + resolved_state = await _iterative_auth_checks( + clock, + room_id, + room_version, + leftover_events, + resolved_state, + event_map, + state_res_store, + ) logger.debug("resolved")