Make handlers.sync return a state dictionary, instead of an event list.

Basically this moves the process of flattening the existing dictionary into a
list up to rest.client.*, instead of doing it in handlers.sync. This simplifies
a bit of the code in handlers.sync, but it is also going to be somewhat
beneficial in the next stage of my hacking on SPEC-254.

Merged from PR #371
This commit is contained in:
Richard van der Hoff 2015-11-12 16:34:42 +00:00
parent 5dea4d37d1
commit 5ab4b0afe8
2 changed files with 40 additions and 32 deletions

View file

@ -49,7 +49,7 @@ class TimelineBatch(collections.namedtuple("TimelineBatch", [
class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [
"room_id", # str "room_id", # str
"timeline", # TimelineBatch "timeline", # TimelineBatch
"state", # list[FrozenEvent] "state", # dict[(str, str), FrozenEvent]
"ephemeral", "ephemeral",
"private_user_data", "private_user_data",
])): ])):
@ -70,7 +70,7 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [
class ArchivedSyncResult(collections.namedtuple("JoinedSyncResult", [ class ArchivedSyncResult(collections.namedtuple("JoinedSyncResult", [
"room_id", # str "room_id", # str
"timeline", # TimelineBatch "timeline", # TimelineBatch
"state", # list[FrozenEvent] "state", # dict[(str, str), FrozenEvent]
"private_user_data", "private_user_data",
])): ])):
__slots__ = [] __slots__ = []
@ -257,12 +257,11 @@ class SyncHandler(BaseHandler):
current_state = yield self.state_handler.get_current_state( current_state = yield self.state_handler.get_current_state(
room_id room_id
) )
current_state_events = current_state.values()
defer.returnValue(JoinedSyncResult( defer.returnValue(JoinedSyncResult(
room_id=room_id, room_id=room_id,
timeline=batch, timeline=batch,
state=current_state_events, state=current_state,
ephemeral=ephemeral_by_room.get(room_id, []), ephemeral=ephemeral_by_room.get(room_id, []),
private_user_data=self.private_user_data_for_room( private_user_data=self.private_user_data_for_room(
room_id, tags_by_room room_id, tags_by_room
@ -361,7 +360,7 @@ class SyncHandler(BaseHandler):
defer.returnValue(ArchivedSyncResult( defer.returnValue(ArchivedSyncResult(
room_id=room_id, room_id=room_id,
timeline=batch, timeline=batch,
state=leave_state[leave_event_id].values(), state=leave_state[leave_event_id],
private_user_data=self.private_user_data_for_room( private_user_data=self.private_user_data_for_room(
room_id, tags_by_room room_id, tags_by_room
), ),
@ -440,7 +439,10 @@ class SyncHandler(BaseHandler):
for room_id in joined_room_ids: for room_id in joined_room_ids:
recents = events_by_room_id.get(room_id, []) recents = events_by_room_id.get(room_id, [])
state = [event for event in recents if event.is_state()] state = {
(event.type, event.state_key): event
for event in recents if event.is_state()}
if recents: if recents:
prev_batch = now_token.copy_and_replace( prev_batch = now_token.copy_and_replace(
"room_key", recents[0].internal_metadata.before "room_key", recents[0].internal_metadata.before
@ -575,7 +577,6 @@ class SyncHandler(BaseHandler):
current_state = yield self.state_handler.get_current_state( current_state = yield self.state_handler.get_current_state(
room_id room_id
) )
current_state_events = current_state.values()
state_at_previous_sync = yield self.get_state_at_previous_sync( state_at_previous_sync = yield self.get_state_at_previous_sync(
room_id, since_token=since_token room_id, since_token=since_token
@ -584,7 +585,7 @@ class SyncHandler(BaseHandler):
state_events_delta = yield self.compute_state_delta( state_events_delta = yield self.compute_state_delta(
since_token=since_token, since_token=since_token,
previous_state=state_at_previous_sync, previous_state=state_at_previous_sync,
current_state=current_state_events, current_state=current_state,
) )
state_events_delta, _ = yield self.check_joined_room( state_events_delta, _ = yield self.check_joined_room(
@ -632,7 +633,7 @@ class SyncHandler(BaseHandler):
[leave_event.event_id], None [leave_event.event_id], None
) )
state_events_at_leave = leave_state[leave_event.event_id].values() state_events_at_leave = leave_state[leave_event.event_id]
state_at_previous_sync = yield self.get_state_at_previous_sync( state_at_previous_sync = yield self.get_state_at_previous_sync(
leave_event.room_id, since_token=since_token leave_event.room_id, since_token=since_token
@ -661,7 +662,7 @@ class SyncHandler(BaseHandler):
def get_state_at_previous_sync(self, room_id, since_token): def get_state_at_previous_sync(self, room_id, since_token):
""" Get the room state at the previous sync the client made. """ Get the room state at the previous sync the client made.
Returns: Returns:
A Deferred list of Events. A Deferred map from ((type, state_key)->Event)
""" """
last_events, token = yield self.store.get_recent_events_for_room( last_events, token = yield self.store.get_recent_events_for_room(
room_id, end_token=since_token.room_key, limit=1, room_id, end_token=since_token.room_key, limit=1,
@ -673,11 +674,12 @@ class SyncHandler(BaseHandler):
last_event last_event
) )
if last_event.is_state(): if last_event.is_state():
state = [last_event] + last_context.current_state.values() state = last_context.current_state.copy()
state[(last_event.type, last_event.state_key)] = last_event
else: else:
state = last_context.current_state.values() state = last_context.current_state
else: else:
state = () state = {}
defer.returnValue(state) defer.returnValue(state)
def compute_state_delta(self, since_token, previous_state, current_state): def compute_state_delta(self, since_token, previous_state, current_state):
@ -685,21 +687,23 @@ class SyncHandler(BaseHandler):
state the client got when it last performed a sync. state the client got when it last performed a sync.
:param str since_token: the point we are comparing against :param str since_token: the point we are comparing against
:param list[synapse.events.FrozenEvent] previous_state: the state to :param dict[(str,str), synapse.events.FrozenEvent] previous_state: the
compare to state to compare to
:param list[synapse.events.FrozenEvent] current_state: the new state :param dict[(str,str), synapse.events.FrozenEvent] current_state: the
new state
:returns: A list of events. :returns A new event dictionary
""" """
# TODO(mjark) Check if the state events were received by the server # TODO(mjark) Check if the state events were received by the server
# after the previous sync, since we need to include those state # after the previous sync, since we need to include those state
# updates even if they occured logically before the previous event. # updates even if they occured logically before the previous event.
# TODO(mjark) Check for new redactions in the state events. # TODO(mjark) Check for new redactions in the state events.
previous_dict = {event.event_id: event for event in previous_state}
state_delta = [] state_delta = {}
for event in current_state: for key, event in current_state.iteritems():
if event.event_id not in previous_dict: if (key not in previous_state or
state_delta.append(event) previous_state[key].event_id != event.event_id):
state_delta[key] = event
return state_delta return state_delta
@defer.inlineCallbacks @defer.inlineCallbacks
@ -708,21 +712,25 @@ class SyncHandler(BaseHandler):
Check if the user has just joined the given room. If so, return the Check if the user has just joined the given room. If so, return the
full state for the room, instead of the delta since the last sync. full state for the room, instead of the delta since the last sync.
:param sync_config:
:param room_id:
:param dict[(str,str), synapse.events.FrozenEvent] state_delta: the
difference in state since the last sync
:returns A deferred Tuple (state_delta, limited) :returns A deferred Tuple (state_delta, limited)
""" """
joined = False joined = False
limited = False limited = False
for event in state_delta:
if ( join_event = state_delta.get((
event.type == EventTypes.Member EventTypes.Member, sync_config.user.to_string()), None)
and event.state_key == sync_config.user.to_string() if join_event is not None:
): if join_event.content["membership"] == Membership.JOIN:
if event.content["membership"] == Membership.JOIN: joined = True
joined = True
if joined: if joined:
res = yield self.state_handler.get_current_state(room_id) state_delta = yield self.state_handler.get_current_state(room_id)
state_delta = res.values() # the timeline is inherently limited if we've just joined
limited = True limited = True
defer.returnValue((state_delta, limited)) defer.returnValue((state_delta, limited))

View file

@ -256,7 +256,7 @@ class SyncRestServlet(RestServlet):
:rtype: dict[str, object] :rtype: dict[str, object]
""" """
event_map = {} event_map = {}
state_events = filter.filter_room_state(room.state) state_events = filter.filter_room_state(room.state.values())
state_event_ids = [] state_event_ids = []
for event in state_events: for event in state_events:
# TODO(mjark): Respect formatting requirements in the filter. # TODO(mjark): Respect formatting requirements in the filter.