Mirror of https://github.com/element-hq/synapse.git
WIP experiment in lazyloading room members

commit 9b334b3f97 (parent 735fd8719a)
2 changed files with 73 additions and 17 deletions
@@ -399,7 +399,7 @@ class SyncHandler(object):
         ))

     @defer.inlineCallbacks
-    def get_state_after_event(self, event):
+    def get_state_after_event(self, event, types=None):
         """
         Get the room state after the given event

@@ -409,14 +409,14 @@ class SyncHandler(object):
         Returns:
             A Deferred map from ((type, state_key)->Event)
         """
-        state_ids = yield self.store.get_state_ids_for_event(event.event_id)
+        state_ids = yield self.store.get_state_ids_for_event(event.event_id, types)
         if event.is_state():
             state_ids = state_ids.copy()
             state_ids[(event.type, event.state_key)] = event.event_id
         defer.returnValue(state_ids)

     @defer.inlineCallbacks
-    def get_state_at(self, room_id, stream_position):
+    def get_state_at(self, room_id, stream_position, types=None):
         """ Get the room state at a particular stream position

         Args:
@@ -432,7 +432,7 @@ class SyncHandler(object):

         if last_events:
             last_event = last_events[-1]
-            state = yield self.get_state_after_event(last_event)
+            state = yield self.get_state_after_event(last_event, types)

         else:
             # no events in this room - so presumably no state
@@ -441,7 +441,7 @@ class SyncHandler(object):

     @defer.inlineCallbacks
     def compute_state_delta(self, room_id, batch, sync_config, since_token, now_token,
-                            full_state):
+                            full_state, filter_members):
         """ Works out the differnce in state between the start of the timeline
         and the previous sync.

@@ -454,6 +454,8 @@ class SyncHandler(object):
                 be None.
             now_token(str): Token of the end of the current batch.
             full_state(bool): Whether to force returning the full state.
+            filter_members(bool): Whether to only return state for members
+                referenced in this timeline segment

         Returns:
             A deferred new event dictionary
@@ -464,18 +466,35 @@ class SyncHandler(object):
         # TODO(mjark) Check for new redactions in the state events.

         with Measure(self.clock, "compute_state_delta"):
+
+            types = None
+            if filter_members:
+                # We only request state for the members needed to display the
+                # timeline:
+                types = (
+                    (EventTypes.Member, state_key)
+                    for state_key in set(
+                        event.sender  # FIXME: we also care about targets etc.
+                        for event in batch.events
+                    )
+                )
+                types.append((None, None))  # don't just filter to room members
+
+            # TODO: we should opportunistically deduplicate these members too
+            # within the same sync series (based on an in-memory cache)
+
             if full_state:
                 if batch:
                     current_state_ids = yield self.store.get_state_ids_for_event(
-                        batch.events[-1].event_id
+                        batch.events[-1].event_id, types=types
                     )

                     state_ids = yield self.store.get_state_ids_for_event(
-                        batch.events[0].event_id
+                        batch.events[0].event_id, types=types
                     )
                 else:
                     current_state_ids = yield self.get_state_at(
-                        room_id, stream_position=now_token
+                        room_id, stream_position=now_token, types=types
                     )

                     state_ids = current_state_ids
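The hunk above is the core of the lazy-loading experiment: instead of returning a room's full state, the sync handler asks the state store only for the m.room.member events of users who actually sent something in the timeline, plus a (None, None) wildcard entry so that non-member state (name, topic, power levels, ...) is not filtered out. The following is a minimal, self-contained sketch of that filter construction; TimelineEvent and member_filter_types are illustrative stand-ins rather than Synapse APIs, and the sketch builds a plain list so that the wildcard can be appended (the generator built in the WIP hunk above has no .append).

# Sketch: build the (type, state_key) filter for a lazily-loaded sync response.
from collections import namedtuple

TimelineEvent = namedtuple("TimelineEvent", ["sender", "type", "content"])

MEMBER_EVENT_TYPE = "m.room.member"  # EventTypes.Member in Synapse


def member_filter_types(timeline_events):
    """Return state filters covering only the members who appear in the timeline."""
    senders = set(ev.sender for ev in timeline_events)
    types = [(MEMBER_EVENT_TYPE, sender) for sender in senders]
    # The (None, None) wildcard tells the storage layer not to drop state
    # whose type is outside the filter (room name, topic, power levels, ...).
    types.append((None, None))
    return types


if __name__ == "__main__":
    batch = [
        TimelineEvent("@alice:example.org", "m.room.message", {"body": "hi"}),
        TimelineEvent("@bob:example.org", "m.room.message", {"body": "hello"}),
        TimelineEvent("@alice:example.org", "m.room.message", {"body": "bye"}),
    ]
    print(member_filter_types(batch))
    # -> member filters for @alice and @bob, plus the (None, None) wildcard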
@@ -493,15 +512,15 @@ class SyncHandler(object):
                 )
             elif batch.limited:
                 state_at_previous_sync = yield self.get_state_at(
-                    room_id, stream_position=since_token
+                    room_id, stream_position=since_token, types=types
                 )

                 current_state_ids = yield self.store.get_state_ids_for_event(
-                    batch.events[-1].event_id
+                    batch.events[-1].event_id, types=types
                 )

                 state_at_timeline_start = yield self.store.get_state_ids_for_event(
-                    batch.events[0].event_id
+                    batch.events[0].event_id, types=types
                 )

                 timeline_state = {
@@ -1325,7 +1344,7 @@ class SyncHandler(object):

             state = yield self.compute_state_delta(
                 room_id, batch, sync_config, since_token, now_token,
-                full_state=full_state
+                full_state=full_state, filter_members=True
             )

             if room_builder.rtype == "joined":
@@ -198,8 +198,15 @@ class StateGroupWorkerStore(SQLBaseStore):
     def _get_state_groups_from_groups_txn(self, txn, groups, types=None):
         results = {group: {} for group in groups}

+        include_other_types = False
+
         if types is not None:
-            types = list(set(types))  # deduplicate types list
+            type_set = set(types)
+            if (None, None) in type_set:
+                include_other_types = True
+                type_set.remove((None, None))
+            types = list(type_set)  # deduplicate types list

         if isinstance(self.database_engine, PostgresEngine):
             # Temporarily disable sequential scans in this transaction. This is
@@ -238,11 +245,21 @@ class StateGroupWorkerStore(SQLBaseStore):
             if types:
                 clause_to_args = [
                     (
-                        "AND type = ? AND state_key = ?",
-                        (etype, state_key)
+                        "AND type = ? AND state_key = ?" if state_key is not None else "AND type = ?",
+                        (etype, state_key) if state_key is not None else (etype)
                     )
                     for etype, state_key in types
                 ]
+
+                if include_other_types:
+                    # XXX: check whether this slows postgres down like a list of
+                    # ORs does too?
+                    clause_to_args.append(
+                        (
+                            "AND type <> ? " * len(types),
+                            [t for (t, _) in types]
+                        )
+                    )
             else:
                 # If types is None we fetch all the state, and so just use an
                 # empty where clause with no extra args.
@@ -263,6 +280,10 @@ class StateGroupWorkerStore(SQLBaseStore):
                 where_clause = "AND (%s)" % (
                     " OR ".join(["(type = ? AND state_key = ?)"] * len(types)),
                 )
+                if include_other_types:
+                    where_clause += " AND (%s)" % (
+                        " AND ".join(["type <> ?"] * len(types)),
+                    )
             else:
                 where_clause = ""

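The two hunks above teach the storage layer (StateGroupWorkerStore) about the (None, None) wildcard: on the PostgreSQL path it becomes an extra "AND type <> ?" clause per filtered type, and on the SQLite path it is appended to the single WHERE clause. Below is a standalone sketch of that clause construction under the same (type, state_key) convention; the function name and return shape are illustrative, not the actual Synapse helper.

# Sketch: turn a (type, state_key) filter, possibly containing the (None, None)
# wildcard, into a SQL WHERE fragment plus bind parameters.
def build_state_where_clause(types):
    """Return (where_clause, args) for filtering state rows by (type, state_key)."""
    if types is None:
        return "", []  # no filtering: fetch all state

    include_other_types = (None, None) in types
    types = [t for t in types if t != (None, None)]

    clauses = []
    args = []
    for etype, state_key in types:
        if state_key is None:
            # A state_key of None matches every state_key of that type.
            clauses.append("(type = ?)")
            args.append(etype)
        else:
            clauses.append("(type = ? AND state_key = ?)")
            args.extend([etype, state_key])

    if include_other_types and types:
        # Also keep rows whose type is not mentioned in the filter at all.
        clauses.append("(" + " AND ".join(["type <> ?"] * len(types)) + ")")
        args.extend(etype for etype, _ in types)

    where_clause = "AND (%s)" % (" OR ".join(clauses),) if clauses else ""
    return where_clause, args


# Example:
#   build_state_where_clause([("m.room.member", "@alice:example.org"), (None, None)])
#   -> ('AND ((type = ? AND state_key = ?) OR (type <> ?))',
#       ['m.room.member', '@alice:example.org', 'm.room.member'])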
@@ -449,17 +470,27 @@ class StateGroupWorkerStore(SQLBaseStore):
             group: The state group to lookup
             types (list): List of 2-tuples of the form (`type`, `state_key`),
                 where a `state_key` of `None` matches all state_keys for the
-                `type`.
+                `type`. Presence of type of `None` indicates that types not
+                in the list should not be filtered out.
         """
         is_all, known_absent, state_dict_ids = self._state_group_cache.get(group)

         type_to_key = {}
         missing_types = set()

+        include_other_types = False
+
         for typ, state_key in types:
             key = (typ, state_key)
+
+            if typ is None:
+                include_other_types = True
+                next
+
             if state_key is None:
                 type_to_key[typ] = None
+                # XXX: why do we mark the type as missing from our cache just
+                # because we weren't filtering on a specific value of state_key?
                 missing_types.add(key)
             else:
                 if type_to_key.get(typ, object()) is not None:
@@ -478,7 +509,7 @@ class StateGroupWorkerStore(SQLBaseStore):
                 return True
             if state_key in valid_state_keys:
                 return True
-            return False
+            return include_other_types

         got_all = is_all or not missing_types

@@ -507,6 +538,12 @@ class StateGroupWorkerStore(SQLBaseStore):
         with matching types. `types` is a list of `(type, state_key)`, where
         a `state_key` of None matches all state_keys. If `types` is None then
         all events are returned.
+
+        XXX: is it really true that `state_key` of None in `types` matches all
+        state_keys? it looks like _get-some_state_from_cache does the right thing,
+        but _get_state_groups_from_groups_txn treats ths None is turned into
+        'AND state_key = NULL' or similar (at least until i just fixed it) --Matthew
+        I've filed this as https://github.com/matrix-org/synapse/issues/2969
         """
         if types:
             types = frozenset(types)
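On the cache read path (_get_some_state_from_cache, in the first hunk of this file), the same wildcard makes the filtering predicate fall back to include_other_types for any type not named in the filter, instead of dropping it. A self-contained sketch of that filtering idea, applied to a plain {(type, state_key): event_id} dict rather than Synapse's state-group cache; names here are illustrative.

# Sketch: filter a cached state map by a (type, state_key) list that may
# contain the (None, None) wildcard.
def filter_cached_state(state_dict, types):
    """Filter a {(type, state_key): event_id} dict by a (type, state_key) list."""
    include_other_types = any(typ is None for typ, _ in types)

    # Map each filtered type to the set of wanted state_keys (None = all keys).
    type_to_keys = {}
    for typ, state_key in types:
        if typ is None:
            continue
        if state_key is None or type_to_keys.get(typ, set()) is None:
            type_to_keys[typ] = None
        else:
            type_to_keys.setdefault(typ, set()).add(state_key)

    def include(typ, state_key):
        if typ not in type_to_keys:
            # Types outside the filter are only kept if (None, None) was given.
            return include_other_types
        wanted = type_to_keys[typ]
        return wanted is None or state_key in wanted

    return {
        key: event_id
        for key, event_id in state_dict.items()
        if include(*key)
    }


# Example: keep Alice's membership plus all non-member state.
#   filter_cached_state(
#       {("m.room.member", "@alice:x"): "$a",
#        ("m.room.member", "@bob:x"): "$b",
#        ("m.room.name", ""): "$n"},
#       [("m.room.member", "@alice:x"), (None, None)],
#   )
#   -> {("m.room.member", "@alice:x"): "$a", ("m.room.name", ""): "$n"}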