Add back concurrently_execute

This commit is contained in:
Erik Johnston 2016-05-23 18:21:27 +01:00
parent b5605dfecc
commit c0c79ef444

View file

@ -13,7 +13,6 @@
# See the License for the specific language governing permissions and # See the License for the specific language governing permissions and
# limitations under the License. # limitations under the License.
from synapse.streams.config import PaginationConfig
from synapse.api.constants import Membership, EventTypes from synapse.api.constants import Membership, EventTypes
from synapse.util.async import concurrently_execute from synapse.util.async import concurrently_execute
from synapse.util.logcontext import LoggingContext from synapse.util.logcontext import LoggingContext
@ -478,26 +477,6 @@ class SyncHandler(object):
for e in sync_config.filter_collection.filter_room_state(state.values()) for e in sync_config.filter_collection.filter_room_state(state.values())
}) })
def check_joined_room(self, sync_config, state_delta):
"""
Check if the user has just joined the given room (so should
be given the full state)
Args:
sync_config(synapse.handlers.sync.SyncConfig):
state_delta(dict[(str,str), synapse.events.FrozenEvent]): the
difference in state since the last sync
Returns:
A deferred Tuple (state_delta, limited)
"""
join_event = state_delta.get((
EventTypes.Member, sync_config.user.to_string()), None)
if join_event is not None:
if join_event.content["membership"] == Membership.JOIN:
return True
return False
@defer.inlineCallbacks @defer.inlineCallbacks
def unread_notifs_for_room_id(self, room_id, sync_config): def unread_notifs_for_room_id(self, room_id, sync_config):
with Measure(self.clock, "unread_notifs_for_room_id"): with Measure(self.clock, "unread_notifs_for_room_id"):
@ -664,8 +643,8 @@ class SyncHandler(object):
tags_by_room = yield self.store.get_tags_for_user(user_id) tags_by_room = yield self.store.get_tags_for_user(user_id)
for room_entry in joined: def handle_joined(room_entry):
yield self._generate_room_entry( return self._generate_room_entry(
"joined", "joined",
sync_result_builer, sync_result_builer,
ignored_users, ignored_users,
@ -675,8 +654,11 @@ class SyncHandler(object):
account_data=account_data_by_room.get(room_entry.room_id, {}), account_data=account_data_by_room.get(room_entry.room_id, {}),
always_include=sync_result_builer.full_state, always_include=sync_result_builer.full_state,
) )
for room_entry in archived:
yield self._generate_room_entry( yield concurrently_execute(handle_joined, joined, 10)
def handle_archived(room_entry):
return self._generate_room_entry(
"archived", "archived",
sync_result_builer, sync_result_builer,
ignored_users, ignored_users,
@ -687,6 +669,8 @@ class SyncHandler(object):
always_include=sync_result_builer.full_state, always_include=sync_result_builer.full_state,
) )
yield concurrently_execute(handle_archived, archived, 10)
sync_result_builer.invited.extend(invited) sync_result_builer.invited.extend(invited)
# Now we want to get any newly joined users # Now we want to get any newly joined users