mirror of
https://github.com/element-hq/synapse.git
synced 2024-11-29 15:39:00 +03:00
Merge pull request #2649 from matrix-org/rav/fix_delta_on_state_res
Fix bug in state group storage
This commit is contained in:
commit
02a9a93bde
3 changed files with 58 additions and 33 deletions
|
@ -1706,6 +1706,17 @@ class FederationHandler(BaseHandler):
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
@log_function
|
@log_function
|
||||||
def do_auth(self, origin, event, context, auth_events):
|
def do_auth(self, origin, event, context, auth_events):
|
||||||
|
"""
|
||||||
|
|
||||||
|
Args:
|
||||||
|
origin (str):
|
||||||
|
event (synapse.events.FrozenEvent):
|
||||||
|
context (synapse.events.snapshot.EventContext):
|
||||||
|
auth_events (dict[(str, str)->str]):
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
defer.Deferred[None]
|
||||||
|
"""
|
||||||
# Check if we have all the auth events.
|
# Check if we have all the auth events.
|
||||||
current_state = set(e.event_id for e in auth_events.values())
|
current_state = set(e.event_id for e in auth_events.values())
|
||||||
event_auth_events = set(e_id for e_id, _ in event.auth_events)
|
event_auth_events = set(e_id for e_id, _ in event.auth_events)
|
||||||
|
@ -1817,16 +1828,9 @@ class FederationHandler(BaseHandler):
|
||||||
current_state = set(e.event_id for e in auth_events.values())
|
current_state = set(e.event_id for e in auth_events.values())
|
||||||
different_auth = event_auth_events - current_state
|
different_auth = event_auth_events - current_state
|
||||||
|
|
||||||
context.current_state_ids = dict(context.current_state_ids)
|
self._update_context_for_auth_events(
|
||||||
context.current_state_ids.update({
|
context, auth_events, event_key,
|
||||||
k: a.event_id for k, a in auth_events.items()
|
)
|
||||||
if k != event_key
|
|
||||||
})
|
|
||||||
context.prev_state_ids = dict(context.prev_state_ids)
|
|
||||||
context.prev_state_ids.update({
|
|
||||||
k: a.event_id for k, a in auth_events.items()
|
|
||||||
})
|
|
||||||
context.state_group = self.store.get_next_state_group()
|
|
||||||
|
|
||||||
if different_auth and not event.internal_metadata.is_outlier():
|
if different_auth and not event.internal_metadata.is_outlier():
|
||||||
logger.info("Different auth after resolution: %s", different_auth)
|
logger.info("Different auth after resolution: %s", different_auth)
|
||||||
|
@ -1906,16 +1910,9 @@ class FederationHandler(BaseHandler):
|
||||||
# 4. Look at rejects and their proofs.
|
# 4. Look at rejects and their proofs.
|
||||||
# TODO.
|
# TODO.
|
||||||
|
|
||||||
context.current_state_ids = dict(context.current_state_ids)
|
self._update_context_for_auth_events(
|
||||||
context.current_state_ids.update({
|
context, auth_events, event_key,
|
||||||
k: a.event_id for k, a in auth_events.items()
|
)
|
||||||
if k != event_key
|
|
||||||
})
|
|
||||||
context.prev_state_ids = dict(context.prev_state_ids)
|
|
||||||
context.prev_state_ids.update({
|
|
||||||
k: a.event_id for k, a in auth_events.items()
|
|
||||||
})
|
|
||||||
context.state_group = self.store.get_next_state_group()
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self.auth.check(event, auth_events=auth_events)
|
self.auth.check(event, auth_events=auth_events)
|
||||||
|
@ -1923,6 +1920,35 @@ class FederationHandler(BaseHandler):
|
||||||
logger.warn("Failed auth resolution for %r because %s", event, e)
|
logger.warn("Failed auth resolution for %r because %s", event, e)
|
||||||
raise e
|
raise e
|
||||||
|
|
||||||
|
def _update_context_for_auth_events(self, context, auth_events,
|
||||||
|
event_key):
|
||||||
|
"""Update the state_ids in an event context after auth event resolution
|
||||||
|
|
||||||
|
Args:
|
||||||
|
context (synapse.events.snapshot.EventContext): event context
|
||||||
|
to be updated
|
||||||
|
|
||||||
|
auth_events (dict[(str, str)->str]): Events to update in the event
|
||||||
|
context.
|
||||||
|
|
||||||
|
event_key ((str, str)): (type, state_key) for the current event.
|
||||||
|
this will not be included in the current_state in the context.
|
||||||
|
"""
|
||||||
|
state_updates = {
|
||||||
|
k: a.event_id for k, a in auth_events.iteritems()
|
||||||
|
if k != event_key
|
||||||
|
}
|
||||||
|
context.current_state_ids = dict(context.current_state_ids)
|
||||||
|
context.current_state_ids.update(state_updates)
|
||||||
|
if context.delta_ids is not None:
|
||||||
|
context.delta_ids = dict(context.delta_ids)
|
||||||
|
context.delta_ids.update(state_updates)
|
||||||
|
context.prev_state_ids = dict(context.prev_state_ids)
|
||||||
|
context.prev_state_ids.update({
|
||||||
|
k: a.event_id for k, a in auth_events.iteritems()
|
||||||
|
})
|
||||||
|
context.state_group = self.store.get_next_state_group()
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def construct_auth_difference(self, local_auth, remote_auth):
|
def construct_auth_difference(self, local_auth, remote_auth):
|
||||||
""" Given a local and remote auth chain, find the differences. This
|
""" Given a local and remote auth chain, find the differences. This
|
||||||
|
|
|
@ -16,8 +16,6 @@ import logging
|
||||||
|
|
||||||
from synapse.api.errors import StoreError
|
from synapse.api.errors import StoreError
|
||||||
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
|
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
|
||||||
from synapse.util.caches import CACHE_SIZE_FACTOR
|
|
||||||
from synapse.util.caches.dictionary_cache import DictionaryCache
|
|
||||||
from synapse.util.caches.descriptors import Cache
|
from synapse.util.caches.descriptors import Cache
|
||||||
from synapse.storage.engines import PostgresEngine
|
from synapse.storage.engines import PostgresEngine
|
||||||
import synapse.metrics
|
import synapse.metrics
|
||||||
|
@ -180,10 +178,6 @@ class SQLBaseStore(object):
|
||||||
self._get_event_cache = Cache("*getEvent*", keylen=3,
|
self._get_event_cache = Cache("*getEvent*", keylen=3,
|
||||||
max_entries=hs.config.event_cache_size)
|
max_entries=hs.config.event_cache_size)
|
||||||
|
|
||||||
self._state_group_cache = DictionaryCache(
|
|
||||||
"*stateGroupCache*", 100000 * CACHE_SIZE_FACTOR
|
|
||||||
)
|
|
||||||
|
|
||||||
self._event_fetch_lock = threading.Condition()
|
self._event_fetch_lock = threading.Condition()
|
||||||
self._event_fetch_list = []
|
self._event_fetch_list = []
|
||||||
self._event_fetch_ongoing = 0
|
self._event_fetch_ongoing = 0
|
||||||
|
|
|
@ -13,16 +13,17 @@
|
||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
from ._base import SQLBaseStore
|
from collections import namedtuple
|
||||||
from synapse.util.caches.descriptors import cached, cachedList
|
import logging
|
||||||
from synapse.util.caches import intern_string
|
|
||||||
from synapse.util.stringutils import to_ascii
|
|
||||||
from synapse.storage.engines import PostgresEngine
|
|
||||||
|
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer
|
||||||
from collections import namedtuple
|
|
||||||
|
|
||||||
import logging
|
from synapse.storage.engines import PostgresEngine
|
||||||
|
from synapse.util.caches import intern_string, CACHE_SIZE_FACTOR
|
||||||
|
from synapse.util.caches.descriptors import cached, cachedList
|
||||||
|
from synapse.util.caches.dictionary_cache import DictionaryCache
|
||||||
|
from synapse.util.stringutils import to_ascii
|
||||||
|
from ._base import SQLBaseStore
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
@ -81,6 +82,10 @@ class StateStore(SQLBaseStore):
|
||||||
where_clause="type='m.room.member'",
|
where_clause="type='m.room.member'",
|
||||||
)
|
)
|
||||||
|
|
||||||
|
self._state_group_cache = DictionaryCache(
|
||||||
|
"*stateGroupCache*", 100000 * CACHE_SIZE_FACTOR
|
||||||
|
)
|
||||||
|
|
||||||
@cached(max_entries=100000, iterable=True)
|
@cached(max_entries=100000, iterable=True)
|
||||||
def get_current_state_ids(self, room_id):
|
def get_current_state_ids(self, room_id):
|
||||||
"""Get the current state event ids for a room based on the
|
"""Get the current state event ids for a room based on the
|
||||||
|
|
Loading…
Reference in a new issue