Merge pull request #2224 from matrix-org/erikj/prefill_state

Prefill state caches
This commit is contained in:
Erik Johnston 2017-05-16 15:50:11 +01:00 committed by GitHub
commit b8492b6c2f
3 changed files with 26 additions and 9 deletions

View file

@ -60,12 +60,12 @@ class LoggingTransaction(object):
object.__setattr__(self, "database_engine", database_engine) object.__setattr__(self, "database_engine", database_engine)
object.__setattr__(self, "after_callbacks", after_callbacks) object.__setattr__(self, "after_callbacks", after_callbacks)
def call_after(self, callback, *args): def call_after(self, callback, *args, **kwargs):
"""Call the given callback on the main twisted thread after the """Call the given callback on the main twisted thread after the
transaction has finished. Used to invalidate the caches on the transaction has finished. Used to invalidate the caches on the
correct thread. correct thread.
""" """
self.after_callbacks.append((callback, args)) self.after_callbacks.append((callback, args, kwargs))
def __getattr__(self, name): def __getattr__(self, name):
return getattr(self.txn, name) return getattr(self.txn, name)
@ -319,8 +319,8 @@ class SQLBaseStore(object):
inner_func, *args, **kwargs inner_func, *args, **kwargs
) )
finally: finally:
for after_callback, after_args in after_callbacks: for after_callback, after_args, after_kwargs in after_callbacks:
after_callback(*after_args) after_callback(*after_args, **after_kwargs)
defer.returnValue(result) defer.returnValue(result)
@defer.inlineCallbacks @defer.inlineCallbacks

View file

@ -399,6 +399,11 @@ class EventsStore(SQLBaseStore):
event_counter.inc(event.type, origin_type, origin_entity) event_counter.inc(event.type, origin_type, origin_entity)
for room_id, (_, _, new_state) in current_state_for_room.iteritems():
self.get_current_state_ids.prefill(
(room_id, ), new_state
)
@defer.inlineCallbacks @defer.inlineCallbacks
def _calculate_new_extremeties(self, room_id, event_contexts, latest_event_ids): def _calculate_new_extremeties(self, room_id, event_contexts, latest_event_ids):
"""Calculates the new forward extremeties for a room given events to """Calculates the new forward extremeties for a room given events to
@ -447,10 +452,10 @@ class EventsStore(SQLBaseStore):
Assumes that we are only persisting events for one room at a time. Assumes that we are only persisting events for one room at a time.
Returns: Returns:
2-tuple (to_delete, to_insert) where both are state dicts, i.e. 3-tuple (to_delete, to_insert, new_state) where both are state dicts,
(type, state_key) -> event_id. `to_delete` are the entries to i.e. (type, state_key) -> event_id. `to_delete` are the entries to
first be deleted from current_state_events, `to_insert` are entries first be deleted from current_state_events, `to_insert` are entries
to insert. to insert. `new_state` is the full set of state.
May return None if there are no changes to be applied. May return None if there are no changes to be applied.
""" """
# Now we need to work out the different state sets for # Now we need to work out the different state sets for
@ -557,7 +562,7 @@ class EventsStore(SQLBaseStore):
if ev_id in events_to_insert if ev_id in events_to_insert
} }
defer.returnValue((to_delete, to_insert)) defer.returnValue((to_delete, to_insert, current_state))
@defer.inlineCallbacks @defer.inlineCallbacks
def get_event(self, event_id, check_redacted=True, def get_event(self, event_id, check_redacted=True,
@ -710,7 +715,7 @@ class EventsStore(SQLBaseStore):
def _update_current_state_txn(self, txn, state_delta_by_room): def _update_current_state_txn(self, txn, state_delta_by_room):
for room_id, current_state_tuple in state_delta_by_room.iteritems(): for room_id, current_state_tuple in state_delta_by_room.iteritems():
to_delete, to_insert = current_state_tuple to_delete, to_insert, _ = current_state_tuple
txn.executemany( txn.executemany(
"DELETE FROM current_state_events WHERE event_id = ?", "DELETE FROM current_state_events WHERE event_id = ?",
[(ev_id,) for ev_id in to_delete.itervalues()], [(ev_id,) for ev_id in to_delete.itervalues()],

View file

@ -227,6 +227,18 @@ class StateStore(SQLBaseStore):
], ],
) )
# Prefill the state group cache with this group.
# It's fine to use the sequence like this as the state group map
# is immutable. (If the map wasn't immutable then this prefill could
# race with another update)
txn.call_after(
self._state_group_cache.update,
self._state_group_cache.sequence,
key=context.state_group,
value=dict(context.current_state_ids),
full=True,
)
self._simple_insert_many_txn( self._simple_insert_many_txn(
txn, txn,
table="event_to_state_groups", table="event_to_state_groups",