mirror of
https://github.com/element-hq/synapse.git
synced 2024-12-19 01:21:09 +03:00
Merge branch 'master' into dinsic
This commit is contained in:
commit
50d5a97c1b
39 changed files with 649 additions and 230 deletions
88
CHANGES.rst
88
CHANGES.rst
|
@ -1,5 +1,92 @@
|
||||||
|
Changes in synapse v0.28.1 (2018-05-01)
|
||||||
|
=======================================
|
||||||
|
|
||||||
|
SECURITY UPDATE
|
||||||
|
|
||||||
|
* Clamp the allowed values of event depth received over federation to be
|
||||||
|
[0, 2^63 - 1]. This mitigates an attack where malicious events
|
||||||
|
injected with depth = 2^63 - 1 render rooms unusable. Depth is used to
|
||||||
|
determine the cosmetic ordering of events within a room, and so the ordering
|
||||||
|
of events in such a room will default to using stream_ordering rather than depth
|
||||||
|
(topological_ordering).
|
||||||
|
|
||||||
|
This is a temporary solution to mitigate abuse in the wild, whilst a long term solution
|
||||||
|
is being implemented to improve how the depth parameter is used.
|
||||||
|
|
||||||
|
Full details at
|
||||||
|
https://docs.google.com/document/d/1I3fi2S-XnpO45qrpCsowZv8P8dHcNZ4fsBsbOW7KABI
|
||||||
|
|
||||||
|
* Pin Twisted to <18.4 until we stop using the private _OpenSSLECCurve API.
|
||||||
|
|
||||||
|
|
||||||
|
Changes in synapse v0.28.0 (2018-04-26)
|
||||||
|
=======================================
|
||||||
|
|
||||||
|
Bug Fixes:
|
||||||
|
|
||||||
|
* Fix quarantine media admin API and search reindex (PR #3130)
|
||||||
|
* Fix media admin APIs (PR #3134)
|
||||||
|
|
||||||
|
|
||||||
|
Changes in synapse v0.28.0-rc1 (2018-04-24)
|
||||||
|
===========================================
|
||||||
|
|
||||||
|
Minor performance improvement to federation sending and bug fixes.
|
||||||
|
|
||||||
|
(Note: This release does not include the delta state resolution implementation discussed in matrix live)
|
||||||
|
|
||||||
|
|
||||||
|
Features:
|
||||||
|
|
||||||
|
* Add metrics for event processing lag (PR #3090)
|
||||||
|
* Add metrics for ResponseCache (PR #3092)
|
||||||
|
|
||||||
|
Changes:
|
||||||
|
|
||||||
|
* Synapse on PyPy (PR #2760) Thanks to @Valodim!
|
||||||
|
* move handling of auto_join_rooms to RegisterHandler (PR #2996) Thanks to @krombel!
|
||||||
|
* Improve handling of SRV records for federation connections (PR #3016) Thanks to @silkeh!
|
||||||
|
* Document the behaviour of ResponseCache (PR #3059)
|
||||||
|
* Preparation for py3 (PR #3061, #3073, #3074, #3075, #3103, #3104, #3106, #3107, #3109, #3110) Thanks to @NotAFile!
|
||||||
|
* update prometheus dashboard to use new metric names (PR #3069) Thanks to @krombel!
|
||||||
|
* use python3-compatible prints (PR #3074) Thanks to @NotAFile!
|
||||||
|
* Send federation events concurrently (PR #3078)
|
||||||
|
* Limit concurrent event sends for a room (PR #3079)
|
||||||
|
* Improve R30 stat definition (PR #3086)
|
||||||
|
* Send events to ASes concurrently (PR #3088)
|
||||||
|
* Refactor ResponseCache usage (PR #3093)
|
||||||
|
* Clarify that SRV may not point to a CNAME (PR #3100) Thanks to @silkeh!
|
||||||
|
* Use str(e) instead of e.message (PR #3103) Thanks to @NotAFile!
|
||||||
|
* Use six.itervalues in some places (PR #3106) Thanks to @NotAFile!
|
||||||
|
* Refactor store.have_events (PR #3117)
|
||||||
|
|
||||||
|
Bug Fixes:
|
||||||
|
|
||||||
|
* Return 401 for invalid access_token on logout (PR #2938) Thanks to @dklug!
|
||||||
|
* Return a 404 rather than a 500 on rejoining empty rooms (PR #3080)
|
||||||
|
* fix federation_domain_whitelist (PR #3099)
|
||||||
|
* Avoid creating events with huge numbers of prev_events (PR #3113)
|
||||||
|
* Reject events which have lots of prev_events (PR #3118)
|
||||||
|
|
||||||
|
|
||||||
|
Changes in synapse v0.27.4 (2018-04-13)
|
||||||
|
======================================
|
||||||
|
|
||||||
|
Changes:
|
||||||
|
|
||||||
|
* Update canonicaljson dependency (#3095)
|
||||||
|
|
||||||
|
|
||||||
|
Changes in synapse v0.27.3 (2018-04-11)
|
||||||
|
======================================
|
||||||
|
|
||||||
|
Bug fixes:
|
||||||
|
|
||||||
|
* URL quote path segments over federation (#3082)
|
||||||
|
|
||||||
Changes in synapse v0.27.3-rc2 (2018-04-09)
|
Changes in synapse v0.27.3-rc2 (2018-04-09)
|
||||||
==========================================
|
==========================================
|
||||||
|
|
||||||
v0.27.3-rc1 used a stale version of the develop branch so the changelog overstates
|
v0.27.3-rc1 used a stale version of the develop branch so the changelog overstates
|
||||||
the functionality. v0.27.3-rc2 is up to date, rc1 should be ignored.
|
the functionality. v0.27.3-rc2 is up to date, rc1 should be ignored.
|
||||||
|
|
||||||
|
@ -45,7 +132,6 @@ Bug fixes:
|
||||||
|
|
||||||
* Add room_id to the response of `rooms/{roomId}/join` (PR #2986) Thanks to @jplatte!
|
* Add room_id to the response of `rooms/{roomId}/join` (PR #2986) Thanks to @jplatte!
|
||||||
* Fix replication after switch to simplejson (PR #3015)
|
* Fix replication after switch to simplejson (PR #3015)
|
||||||
* Fix replication after switch to simplejson (PR #3015)
|
|
||||||
* 404 correctly on missing paths via NoResource (PR #3022)
|
* 404 correctly on missing paths via NoResource (PR #3022)
|
||||||
* Fix error when claiming e2e keys from offline servers (PR #3034)
|
* Fix error when claiming e2e keys from offline servers (PR #3034)
|
||||||
* fix tests/storage/test_user_directory.py (PR #3042)
|
* fix tests/storage/test_user_directory.py (PR #3042)
|
||||||
|
|
|
@ -157,8 +157,8 @@ if you prefer.
|
||||||
|
|
||||||
In case of problems, please see the _`Troubleshooting` section below.
|
In case of problems, please see the _`Troubleshooting` section below.
|
||||||
|
|
||||||
Alternatively, Silvio Fricke has contributed a Dockerfile to automate the
|
Alternatively, Andreas Peters (previously Silvio Fricke) has contributed a Dockerfile to automate the
|
||||||
above in Docker at https://registry.hub.docker.com/u/silviof/docker-matrix/.
|
above in Docker at https://hub.docker.com/r/avhost/docker-matrix/tags/
|
||||||
|
|
||||||
Also, Martin Giess has created an auto-deployment process with vagrant/ansible,
|
Also, Martin Giess has created an auto-deployment process with vagrant/ansible,
|
||||||
tested with VirtualBox/AWS/DigitalOcean - see https://github.com/EMnify/matrix-synapse-auto-deploy
|
tested with VirtualBox/AWS/DigitalOcean - see https://github.com/EMnify/matrix-synapse-auto-deploy
|
||||||
|
@ -614,6 +614,9 @@ should have the format ``_matrix._tcp.<yourdomain.com> <ttl> IN SRV 10 0 <port>
|
||||||
$ dig -t srv _matrix._tcp.example.com
|
$ dig -t srv _matrix._tcp.example.com
|
||||||
_matrix._tcp.example.com. 3600 IN SRV 10 0 8448 synapse.example.com.
|
_matrix._tcp.example.com. 3600 IN SRV 10 0 8448 synapse.example.com.
|
||||||
|
|
||||||
|
Note that the server hostname cannot be an alias (CNAME record): it has to point
|
||||||
|
directly to the server hosting the synapse instance.
|
||||||
|
|
||||||
You can then configure your homeserver to use ``<yourdomain.com>`` as the domain in
|
You can then configure your homeserver to use ``<yourdomain.com>`` as the domain in
|
||||||
its user-ids, by setting ``server_name``::
|
its user-ids, by setting ``server_name``::
|
||||||
|
|
||||||
|
|
10
contrib/README.rst
Normal file
10
contrib/README.rst
Normal file
|
@ -0,0 +1,10 @@
|
||||||
|
Community Contributions
|
||||||
|
=======================
|
||||||
|
|
||||||
|
Everything in this directory are projects submitted by the community that may be useful
|
||||||
|
to others. As such, the project maintainers cannot guarantee support, stability
|
||||||
|
or backwards compatibility of these projects.
|
||||||
|
|
||||||
|
Files in this directory should *not* be relied on directly, as they may not
|
||||||
|
continue to work or exist in future. If you wish to use any of these files then
|
||||||
|
they should be copied to avoid them breaking from underneath you.
|
|
@ -2,6 +2,9 @@
|
||||||
# (e.g. https://www.archlinux.org/packages/community/any/matrix-synapse/ for ArchLinux)
|
# (e.g. https://www.archlinux.org/packages/community/any/matrix-synapse/ for ArchLinux)
|
||||||
# rather than in a user home directory or similar under virtualenv.
|
# rather than in a user home directory or similar under virtualenv.
|
||||||
|
|
||||||
|
# **NOTE:** This is an example service file that may change in the future. If you
|
||||||
|
# wish to use this please copy rather than symlink it.
|
||||||
|
|
||||||
[Unit]
|
[Unit]
|
||||||
Description=Synapse Matrix homeserver
|
Description=Synapse Matrix homeserver
|
||||||
|
|
||||||
|
@ -12,6 +15,7 @@ Group=synapse
|
||||||
WorkingDirectory=/var/lib/synapse
|
WorkingDirectory=/var/lib/synapse
|
||||||
ExecStart=/usr/bin/python2.7 -m synapse.app.homeserver --config-path=/etc/synapse/homeserver.yaml
|
ExecStart=/usr/bin/python2.7 -m synapse.app.homeserver --config-path=/etc/synapse/homeserver.yaml
|
||||||
ExecStop=/usr/bin/synctl stop /etc/synapse/homeserver.yaml
|
ExecStop=/usr/bin/synctl stop /etc/synapse/homeserver.yaml
|
||||||
|
# EnvironmentFile=-/etc/sysconfig/synapse # Can be used to e.g. set SYNAPSE_CACHE_FACTOR
|
||||||
|
|
||||||
[Install]
|
[Install]
|
||||||
WantedBy=multi-user.target
|
WantedBy=multi-user.target
|
||||||
|
|
|
@ -16,4 +16,4 @@
|
||||||
""" This is a reference implementation of a Matrix home server.
|
""" This is a reference implementation of a Matrix home server.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
__version__ = "0.27.3-rc2"
|
__version__ = "0.28.1"
|
||||||
|
|
|
@ -16,6 +16,9 @@
|
||||||
|
|
||||||
"""Contains constants from the specification."""
|
"""Contains constants from the specification."""
|
||||||
|
|
||||||
|
# the "depth" field on events is limited to 2**63 - 1
|
||||||
|
MAX_DEPTH = 2**63 - 1
|
||||||
|
|
||||||
|
|
||||||
class Membership(object):
|
class Membership(object):
|
||||||
|
|
||||||
|
|
|
@ -18,6 +18,7 @@
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
import simplejson as json
|
import simplejson as json
|
||||||
|
from six import iteritems
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
@ -297,7 +298,7 @@ def cs_error(msg, code=Codes.UNKNOWN, **kwargs):
|
||||||
A dict representing the error response JSON.
|
A dict representing the error response JSON.
|
||||||
"""
|
"""
|
||||||
err = {"error": msg, "errcode": code}
|
err = {"error": msg, "errcode": code}
|
||||||
for key, value in kwargs.iteritems():
|
for key, value in iteritems(kwargs):
|
||||||
err[key] = value
|
err[key] = value
|
||||||
return err
|
return err
|
||||||
|
|
||||||
|
|
|
@ -58,6 +58,8 @@ from synapse.util.versionstring import get_version_string
|
||||||
from twisted.internet import defer, reactor
|
from twisted.internet import defer, reactor
|
||||||
from twisted.web.resource import NoResource
|
from twisted.web.resource import NoResource
|
||||||
|
|
||||||
|
from six import iteritems
|
||||||
|
|
||||||
logger = logging.getLogger("synapse.app.synchrotron")
|
logger = logging.getLogger("synapse.app.synchrotron")
|
||||||
|
|
||||||
|
|
||||||
|
@ -211,7 +213,7 @@ class SynchrotronPresence(object):
|
||||||
|
|
||||||
def get_currently_syncing_users(self):
|
def get_currently_syncing_users(self):
|
||||||
return [
|
return [
|
||||||
user_id for user_id, count in self.user_to_num_current_syncs.iteritems()
|
user_id for user_id, count in iteritems(self.user_to_num_current_syncs)
|
||||||
if count > 0
|
if count > 0
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
|
@ -18,7 +18,6 @@ from synapse.api.constants import ThirdPartyEntityKind
|
||||||
from synapse.api.errors import CodeMessageException
|
from synapse.api.errors import CodeMessageException
|
||||||
from synapse.http.client import SimpleHttpClient
|
from synapse.http.client import SimpleHttpClient
|
||||||
from synapse.events.utils import serialize_event
|
from synapse.events.utils import serialize_event
|
||||||
from synapse.util.logcontext import preserve_fn, make_deferred_yieldable
|
|
||||||
from synapse.util.caches.response_cache import ResponseCache
|
from synapse.util.caches.response_cache import ResponseCache
|
||||||
from synapse.types import ThirdPartyInstanceID
|
from synapse.types import ThirdPartyInstanceID
|
||||||
|
|
||||||
|
@ -194,12 +193,7 @@ class ApplicationServiceApi(SimpleHttpClient):
|
||||||
defer.returnValue(None)
|
defer.returnValue(None)
|
||||||
|
|
||||||
key = (service.id, protocol)
|
key = (service.id, protocol)
|
||||||
result = self.protocol_meta_cache.get(key)
|
return self.protocol_meta_cache.wrap(key, _get)
|
||||||
if not result:
|
|
||||||
result = self.protocol_meta_cache.set(
|
|
||||||
key, preserve_fn(_get)()
|
|
||||||
)
|
|
||||||
return make_deferred_yieldable(result)
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def push_bulk(self, service, events, txn_id=None):
|
def push_bulk(self, service, events, txn_id=None):
|
||||||
|
|
|
@ -352,7 +352,7 @@ class Keyring(object):
|
||||||
logger.exception(
|
logger.exception(
|
||||||
"Unable to get key from %r: %s %s",
|
"Unable to get key from %r: %s %s",
|
||||||
perspective_name,
|
perspective_name,
|
||||||
type(e).__name__, str(e.message),
|
type(e).__name__, str(e),
|
||||||
)
|
)
|
||||||
defer.returnValue({})
|
defer.returnValue({})
|
||||||
|
|
||||||
|
@ -384,7 +384,7 @@ class Keyring(object):
|
||||||
logger.info(
|
logger.info(
|
||||||
"Unable to get key %r for %r directly: %s %s",
|
"Unable to get key %r for %r directly: %s %s",
|
||||||
key_ids, server_name,
|
key_ids, server_name,
|
||||||
type(e).__name__, str(e.message),
|
type(e).__name__, str(e),
|
||||||
)
|
)
|
||||||
|
|
||||||
if not keys:
|
if not keys:
|
||||||
|
@ -734,7 +734,7 @@ def _handle_key_deferred(verify_request):
|
||||||
except IOError as e:
|
except IOError as e:
|
||||||
logger.warn(
|
logger.warn(
|
||||||
"Got IOError when downloading keys for %s: %s %s",
|
"Got IOError when downloading keys for %s: %s %s",
|
||||||
server_name, type(e).__name__, str(e.message),
|
server_name, type(e).__name__, str(e),
|
||||||
)
|
)
|
||||||
raise SynapseError(
|
raise SynapseError(
|
||||||
502,
|
502,
|
||||||
|
@ -744,7 +744,7 @@ def _handle_key_deferred(verify_request):
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.exception(
|
logger.exception(
|
||||||
"Got Exception when downloading keys for %s: %s %s",
|
"Got Exception when downloading keys for %s: %s %s",
|
||||||
server_name, type(e).__name__, str(e.message),
|
server_name, type(e).__name__, str(e),
|
||||||
)
|
)
|
||||||
raise SynapseError(
|
raise SynapseError(
|
||||||
401,
|
401,
|
||||||
|
|
|
@ -14,7 +14,10 @@
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
from synapse.api.errors import SynapseError
|
import six
|
||||||
|
|
||||||
|
from synapse.api.constants import MAX_DEPTH
|
||||||
|
from synapse.api.errors import SynapseError, Codes
|
||||||
from synapse.crypto.event_signing import check_event_content_hash
|
from synapse.crypto.event_signing import check_event_content_hash
|
||||||
from synapse.events import FrozenEvent
|
from synapse.events import FrozenEvent
|
||||||
from synapse.events.utils import prune_event
|
from synapse.events.utils import prune_event
|
||||||
|
@ -190,11 +193,23 @@ def event_from_pdu_json(pdu_json, outlier=False):
|
||||||
FrozenEvent
|
FrozenEvent
|
||||||
|
|
||||||
Raises:
|
Raises:
|
||||||
SynapseError: if the pdu is missing required fields
|
SynapseError: if the pdu is missing required fields or is otherwise
|
||||||
|
not a valid matrix event
|
||||||
"""
|
"""
|
||||||
# we could probably enforce a bunch of other fields here (room_id, sender,
|
# we could probably enforce a bunch of other fields here (room_id, sender,
|
||||||
# origin, etc etc)
|
# origin, etc etc)
|
||||||
assert_params_in_request(pdu_json, ('event_id', 'type'))
|
assert_params_in_request(pdu_json, ('event_id', 'type', 'depth'))
|
||||||
|
|
||||||
|
depth = pdu_json['depth']
|
||||||
|
if not isinstance(depth, six.integer_types):
|
||||||
|
raise SynapseError(400, "Depth %r not an intger" % (depth, ),
|
||||||
|
Codes.BAD_JSON)
|
||||||
|
|
||||||
|
if depth < 0:
|
||||||
|
raise SynapseError(400, "Depth too small", Codes.BAD_JSON)
|
||||||
|
elif depth > MAX_DEPTH:
|
||||||
|
raise SynapseError(400, "Depth too large", Codes.BAD_JSON)
|
||||||
|
|
||||||
event = FrozenEvent(
|
event = FrozenEvent(
|
||||||
pdu_json
|
pdu_json
|
||||||
)
|
)
|
||||||
|
|
|
@ -394,7 +394,7 @@ class FederationClient(FederationBase):
|
||||||
seen_events = yield self.store.get_events(event_ids, allow_rejected=True)
|
seen_events = yield self.store.get_events(event_ids, allow_rejected=True)
|
||||||
signed_events = seen_events.values()
|
signed_events = seen_events.values()
|
||||||
else:
|
else:
|
||||||
seen_events = yield self.store.have_events(event_ids)
|
seen_events = yield self.store.have_seen_events(event_ids)
|
||||||
signed_events = []
|
signed_events = []
|
||||||
|
|
||||||
failed_to_fetch = set()
|
failed_to_fetch = set()
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
# -*- coding: utf-8 -*-
|
# -*- coding: utf-8 -*-
|
||||||
# Copyright 2015, 2016 OpenMarket Ltd
|
# Copyright 2015, 2016 OpenMarket Ltd
|
||||||
|
# Copyright 2018 New Vector Ltd
|
||||||
#
|
#
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
# you may not use this file except in compliance with the License.
|
# you may not use this file except in compliance with the License.
|
||||||
|
@ -30,9 +31,10 @@ import synapse.metrics
|
||||||
from synapse.types import get_domain_from_id
|
from synapse.types import get_domain_from_id
|
||||||
from synapse.util import async
|
from synapse.util import async
|
||||||
from synapse.util.caches.response_cache import ResponseCache
|
from synapse.util.caches.response_cache import ResponseCache
|
||||||
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
|
|
||||||
from synapse.util.logutils import log_function
|
from synapse.util.logutils import log_function
|
||||||
|
|
||||||
|
from six import iteritems
|
||||||
|
|
||||||
# when processing incoming transactions, we try to handle multiple rooms in
|
# when processing incoming transactions, we try to handle multiple rooms in
|
||||||
# parallel, up to this limit.
|
# parallel, up to this limit.
|
||||||
TRANSACTION_CONCURRENCY_LIMIT = 10
|
TRANSACTION_CONCURRENCY_LIMIT = 10
|
||||||
|
@ -212,16 +214,17 @@ class FederationServer(FederationBase):
|
||||||
if not in_room:
|
if not in_room:
|
||||||
raise AuthError(403, "Host not in room.")
|
raise AuthError(403, "Host not in room.")
|
||||||
|
|
||||||
result = self._state_resp_cache.get((room_id, event_id))
|
# we grab the linearizer to protect ourselves from servers which hammer
|
||||||
if not result:
|
# us. In theory we might already have the response to this query
|
||||||
|
# in the cache so we could return it without waiting for the linearizer
|
||||||
|
# - but that's non-trivial to get right, and anyway somewhat defeats
|
||||||
|
# the point of the linearizer.
|
||||||
with (yield self._server_linearizer.queue((origin, room_id))):
|
with (yield self._server_linearizer.queue((origin, room_id))):
|
||||||
d = self._state_resp_cache.set(
|
resp = yield self._state_resp_cache.wrap(
|
||||||
(room_id, event_id),
|
(room_id, event_id),
|
||||||
preserve_fn(self._on_context_state_request_compute)(room_id, event_id)
|
self._on_context_state_request_compute,
|
||||||
|
room_id, event_id,
|
||||||
)
|
)
|
||||||
resp = yield make_deferred_yieldable(d)
|
|
||||||
else:
|
|
||||||
resp = yield make_deferred_yieldable(result)
|
|
||||||
|
|
||||||
defer.returnValue((200, resp))
|
defer.returnValue((200, resp))
|
||||||
|
|
||||||
|
@ -425,9 +428,9 @@ class FederationServer(FederationBase):
|
||||||
"Claimed one-time-keys: %s",
|
"Claimed one-time-keys: %s",
|
||||||
",".join((
|
",".join((
|
||||||
"%s for %s:%s" % (key_id, user_id, device_id)
|
"%s for %s:%s" % (key_id, user_id, device_id)
|
||||||
for user_id, user_keys in json_result.iteritems()
|
for user_id, user_keys in iteritems(json_result)
|
||||||
for device_id, device_keys in user_keys.iteritems()
|
for device_id, device_keys in iteritems(user_keys)
|
||||||
for key_id, _ in device_keys.iteritems()
|
for key_id, _ in iteritems(device_keys)
|
||||||
)),
|
)),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -494,12 +497,32 @@ class FederationServer(FederationBase):
|
||||||
def _handle_received_pdu(self, origin, pdu):
|
def _handle_received_pdu(self, origin, pdu):
|
||||||
""" Process a PDU received in a federation /send/ transaction.
|
""" Process a PDU received in a federation /send/ transaction.
|
||||||
|
|
||||||
|
If the event is invalid, then this method throws a FederationError.
|
||||||
|
(The error will then be logged and sent back to the sender (which
|
||||||
|
probably won't do anything with it), and other events in the
|
||||||
|
transaction will be processed as normal).
|
||||||
|
|
||||||
|
It is likely that we'll then receive other events which refer to
|
||||||
|
this rejected_event in their prev_events, etc. When that happens,
|
||||||
|
we'll attempt to fetch the rejected event again, which will presumably
|
||||||
|
fail, so those second-generation events will also get rejected.
|
||||||
|
|
||||||
|
Eventually, we get to the point where there are more than 10 events
|
||||||
|
between any new events and the original rejected event. Since we
|
||||||
|
only try to backfill 10 events deep on received pdu, we then accept the
|
||||||
|
new event, possibly introducing a discontinuity in the DAG, with new
|
||||||
|
forward extremities, so normal service is approximately returned,
|
||||||
|
until we try to backfill across the discontinuity.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
origin (str): server which sent the pdu
|
origin (str): server which sent the pdu
|
||||||
pdu (FrozenEvent): received pdu
|
pdu (FrozenEvent): received pdu
|
||||||
|
|
||||||
Returns (Deferred): completes with None
|
Returns (Deferred): completes with None
|
||||||
Raises: FederationError if the signatures / hash do not match
|
|
||||||
|
Raises: FederationError if the signatures / hash do not match, or
|
||||||
|
if the event was unacceptable for any other reason (eg, too large,
|
||||||
|
too many prev_events, couldn't find the prev_events)
|
||||||
"""
|
"""
|
||||||
# check that it's actually being sent from a valid destination to
|
# check that it's actually being sent from a valid destination to
|
||||||
# workaround bug #1753 in 0.18.5 and 0.18.6
|
# workaround bug #1753 in 0.18.5 and 0.18.6
|
||||||
|
|
|
@ -35,11 +35,13 @@ from synapse.storage.presence import UserPresenceState
|
||||||
from synapse.util.metrics import Measure
|
from synapse.util.metrics import Measure
|
||||||
import synapse.metrics
|
import synapse.metrics
|
||||||
|
|
||||||
from sortedcontainers import SortedDict
|
from blist import sorteddict
|
||||||
from collections import namedtuple
|
from collections import namedtuple
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
|
from six import itervalues, iteritems
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
|
@ -56,19 +58,19 @@ class FederationRemoteSendQueue(object):
|
||||||
self.is_mine_id = hs.is_mine_id
|
self.is_mine_id = hs.is_mine_id
|
||||||
|
|
||||||
self.presence_map = {} # Pending presence map user_id -> UserPresenceState
|
self.presence_map = {} # Pending presence map user_id -> UserPresenceState
|
||||||
self.presence_changed = SortedDict() # Stream position -> user_id
|
self.presence_changed = sorteddict() # Stream position -> user_id
|
||||||
|
|
||||||
self.keyed_edu = {} # (destination, key) -> EDU
|
self.keyed_edu = {} # (destination, key) -> EDU
|
||||||
self.keyed_edu_changed = SortedDict() # stream position -> (destination, key)
|
self.keyed_edu_changed = sorteddict() # stream position -> (destination, key)
|
||||||
|
|
||||||
self.edus = SortedDict() # stream position -> Edu
|
self.edus = sorteddict() # stream position -> Edu
|
||||||
|
|
||||||
self.failures = SortedDict() # stream position -> (destination, Failure)
|
self.failures = sorteddict() # stream position -> (destination, Failure)
|
||||||
|
|
||||||
self.device_messages = SortedDict() # stream position -> destination
|
self.device_messages = sorteddict() # stream position -> destination
|
||||||
|
|
||||||
self.pos = 1
|
self.pos = 1
|
||||||
self.pos_time = SortedDict()
|
self.pos_time = sorteddict()
|
||||||
|
|
||||||
# EVERYTHING IS SAD. In particular, python only makes new scopes when
|
# EVERYTHING IS SAD. In particular, python only makes new scopes when
|
||||||
# we make a new function, so we need to make a new function so the inner
|
# we make a new function, so we need to make a new function so the inner
|
||||||
|
@ -122,7 +124,7 @@ class FederationRemoteSendQueue(object):
|
||||||
|
|
||||||
user_ids = set(
|
user_ids = set(
|
||||||
user_id
|
user_id
|
||||||
for uids in self.presence_changed.itervalues()
|
for uids in itervalues(self.presence_changed)
|
||||||
for user_id in uids
|
for user_id in uids
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -276,7 +278,7 @@ class FederationRemoteSendQueue(object):
|
||||||
# stream position.
|
# stream position.
|
||||||
keyed_edus = {self.keyed_edu_changed[k]: k for k in keys[i:j]}
|
keyed_edus = {self.keyed_edu_changed[k]: k for k in keys[i:j]}
|
||||||
|
|
||||||
for ((destination, edu_key), pos) in keyed_edus.iteritems():
|
for ((destination, edu_key), pos) in iteritems(keyed_edus):
|
||||||
rows.append((pos, KeyedEduRow(
|
rows.append((pos, KeyedEduRow(
|
||||||
key=edu_key,
|
key=edu_key,
|
||||||
edu=self.keyed_edu[(destination, edu_key)],
|
edu=self.keyed_edu[(destination, edu_key)],
|
||||||
|
@ -309,7 +311,7 @@ class FederationRemoteSendQueue(object):
|
||||||
j = keys.bisect_right(to_token) + 1
|
j = keys.bisect_right(to_token) + 1
|
||||||
device_messages = {self.device_messages[k]: k for k in keys[i:j]}
|
device_messages = {self.device_messages[k]: k for k in keys[i:j]}
|
||||||
|
|
||||||
for (destination, pos) in device_messages.iteritems():
|
for (destination, pos) in iteritems(device_messages):
|
||||||
rows.append((pos, DeviceRow(
|
rows.append((pos, DeviceRow(
|
||||||
destination=destination,
|
destination=destination,
|
||||||
)))
|
)))
|
||||||
|
@ -528,19 +530,19 @@ def process_rows_for_federation(transaction_queue, rows):
|
||||||
if buff.presence:
|
if buff.presence:
|
||||||
transaction_queue.send_presence(buff.presence)
|
transaction_queue.send_presence(buff.presence)
|
||||||
|
|
||||||
for destination, edu_map in buff.keyed_edus.iteritems():
|
for destination, edu_map in iteritems(buff.keyed_edus):
|
||||||
for key, edu in edu_map.items():
|
for key, edu in edu_map.items():
|
||||||
transaction_queue.send_edu(
|
transaction_queue.send_edu(
|
||||||
edu.destination, edu.edu_type, edu.content, key=key,
|
edu.destination, edu.edu_type, edu.content, key=key,
|
||||||
)
|
)
|
||||||
|
|
||||||
for destination, edu_list in buff.edus.iteritems():
|
for destination, edu_list in iteritems(buff.edus):
|
||||||
for edu in edu_list:
|
for edu in edu_list:
|
||||||
transaction_queue.send_edu(
|
transaction_queue.send_edu(
|
||||||
edu.destination, edu.edu_type, edu.content, key=None,
|
edu.destination, edu.edu_type, edu.content, key=None,
|
||||||
)
|
)
|
||||||
|
|
||||||
for destination, failure_list in buff.failures.iteritems():
|
for destination, failure_list in iteritems(buff.failures):
|
||||||
for failure in failure_list:
|
for failure in failure_list:
|
||||||
transaction_queue.send_failure(destination, failure)
|
transaction_queue.send_failure(destination, failure)
|
||||||
|
|
||||||
|
|
|
@ -94,12 +94,6 @@ class Authenticator(object):
|
||||||
"signatures": {},
|
"signatures": {},
|
||||||
}
|
}
|
||||||
|
|
||||||
if (
|
|
||||||
self.federation_domain_whitelist is not None and
|
|
||||||
self.server_name not in self.federation_domain_whitelist
|
|
||||||
):
|
|
||||||
raise FederationDeniedError(self.server_name)
|
|
||||||
|
|
||||||
if content is not None:
|
if content is not None:
|
||||||
json_request["content"] = content
|
json_request["content"] = content
|
||||||
|
|
||||||
|
@ -138,6 +132,12 @@ class Authenticator(object):
|
||||||
json_request["origin"] = origin
|
json_request["origin"] = origin
|
||||||
json_request["signatures"].setdefault(origin, {})[key] = sig
|
json_request["signatures"].setdefault(origin, {})[key] = sig
|
||||||
|
|
||||||
|
if (
|
||||||
|
self.federation_domain_whitelist is not None and
|
||||||
|
origin not in self.federation_domain_whitelist
|
||||||
|
):
|
||||||
|
raise FederationDeniedError(origin)
|
||||||
|
|
||||||
if not json_request["signatures"]:
|
if not json_request["signatures"]:
|
||||||
raise NoAuthenticationError(
|
raise NoAuthenticationError(
|
||||||
401, "Missing Authorization headers", Codes.UNAUTHORIZED,
|
401, "Missing Authorization headers", Codes.UNAUTHORIZED,
|
||||||
|
|
|
@ -15,8 +15,14 @@
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
"""Contains handlers for federation events."""
|
"""Contains handlers for federation events."""
|
||||||
|
|
||||||
|
import httplib
|
||||||
|
import itertools
|
||||||
|
import logging
|
||||||
|
|
||||||
from signedjson.key import decode_verify_key_bytes
|
from signedjson.key import decode_verify_key_bytes
|
||||||
from signedjson.sign import verify_signed_json
|
from signedjson.sign import verify_signed_json
|
||||||
|
from twisted.internet import defer
|
||||||
from unpaddedbase64 import decode_base64
|
from unpaddedbase64 import decode_base64
|
||||||
|
|
||||||
from ._base import BaseHandler
|
from ._base import BaseHandler
|
||||||
|
@ -43,10 +49,6 @@ from synapse.util.retryutils import NotRetryingDestination
|
||||||
|
|
||||||
from synapse.util.distributor import user_joined_room
|
from synapse.util.distributor import user_joined_room
|
||||||
|
|
||||||
from twisted.internet import defer
|
|
||||||
|
|
||||||
import itertools
|
|
||||||
import logging
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
@ -115,6 +117,19 @@ class FederationHandler(BaseHandler):
|
||||||
logger.debug("Already seen pdu %s", pdu.event_id)
|
logger.debug("Already seen pdu %s", pdu.event_id)
|
||||||
return
|
return
|
||||||
|
|
||||||
|
# do some initial sanity-checking of the event. In particular, make
|
||||||
|
# sure it doesn't have hundreds of prev_events or auth_events, which
|
||||||
|
# could cause a huge state resolution or cascade of event fetches.
|
||||||
|
try:
|
||||||
|
self._sanity_check_event(pdu)
|
||||||
|
except SynapseError as err:
|
||||||
|
raise FederationError(
|
||||||
|
"ERROR",
|
||||||
|
err.code,
|
||||||
|
err.msg,
|
||||||
|
affected=pdu.event_id,
|
||||||
|
)
|
||||||
|
|
||||||
# If we are currently in the process of joining this room, then we
|
# If we are currently in the process of joining this room, then we
|
||||||
# queue up events for later processing.
|
# queue up events for later processing.
|
||||||
if pdu.room_id in self.room_queues:
|
if pdu.room_id in self.room_queues:
|
||||||
|
@ -149,10 +164,6 @@ class FederationHandler(BaseHandler):
|
||||||
|
|
||||||
auth_chain = []
|
auth_chain = []
|
||||||
|
|
||||||
have_seen = yield self.store.have_events(
|
|
||||||
[ev for ev, _ in pdu.prev_events]
|
|
||||||
)
|
|
||||||
|
|
||||||
fetch_state = False
|
fetch_state = False
|
||||||
|
|
||||||
# Get missing pdus if necessary.
|
# Get missing pdus if necessary.
|
||||||
|
@ -168,7 +179,7 @@ class FederationHandler(BaseHandler):
|
||||||
)
|
)
|
||||||
|
|
||||||
prevs = {e_id for e_id, _ in pdu.prev_events}
|
prevs = {e_id for e_id, _ in pdu.prev_events}
|
||||||
seen = set(have_seen.keys())
|
seen = yield self.store.have_seen_events(prevs)
|
||||||
|
|
||||||
if min_depth and pdu.depth < min_depth:
|
if min_depth and pdu.depth < min_depth:
|
||||||
# This is so that we don't notify the user about this
|
# This is so that we don't notify the user about this
|
||||||
|
@ -196,8 +207,7 @@ class FederationHandler(BaseHandler):
|
||||||
|
|
||||||
# Update the set of things we've seen after trying to
|
# Update the set of things we've seen after trying to
|
||||||
# fetch the missing stuff
|
# fetch the missing stuff
|
||||||
have_seen = yield self.store.have_events(prevs)
|
seen = yield self.store.have_seen_events(prevs)
|
||||||
seen = set(have_seen.iterkeys())
|
|
||||||
|
|
||||||
if not prevs - seen:
|
if not prevs - seen:
|
||||||
logger.info(
|
logger.info(
|
||||||
|
@ -248,8 +258,7 @@ class FederationHandler(BaseHandler):
|
||||||
min_depth (int): Minimum depth of events to return.
|
min_depth (int): Minimum depth of events to return.
|
||||||
"""
|
"""
|
||||||
# We recalculate seen, since it may have changed.
|
# We recalculate seen, since it may have changed.
|
||||||
have_seen = yield self.store.have_events(prevs)
|
seen = yield self.store.have_seen_events(prevs)
|
||||||
seen = set(have_seen.keys())
|
|
||||||
|
|
||||||
if not prevs - seen:
|
if not prevs - seen:
|
||||||
return
|
return
|
||||||
|
@ -361,9 +370,7 @@ class FederationHandler(BaseHandler):
|
||||||
if auth_chain:
|
if auth_chain:
|
||||||
event_ids |= {e.event_id for e in auth_chain}
|
event_ids |= {e.event_id for e in auth_chain}
|
||||||
|
|
||||||
seen_ids = set(
|
seen_ids = yield self.store.have_seen_events(event_ids)
|
||||||
(yield self.store.have_events(event_ids)).keys()
|
|
||||||
)
|
|
||||||
|
|
||||||
if state and auth_chain is not None:
|
if state and auth_chain is not None:
|
||||||
# If we have any state or auth_chain given to us by the replication
|
# If we have any state or auth_chain given to us by the replication
|
||||||
|
@ -527,9 +534,16 @@ class FederationHandler(BaseHandler):
|
||||||
def backfill(self, dest, room_id, limit, extremities):
|
def backfill(self, dest, room_id, limit, extremities):
|
||||||
""" Trigger a backfill request to `dest` for the given `room_id`
|
""" Trigger a backfill request to `dest` for the given `room_id`
|
||||||
|
|
||||||
This will attempt to get more events from the remote. This may return
|
This will attempt to get more events from the remote. If the other side
|
||||||
be successfull and still return no events if the other side has no new
|
has no new events to offer, this will return an empty list.
|
||||||
events to offer.
|
|
||||||
|
As the events are received, we check their signatures, and also do some
|
||||||
|
sanity-checking on them. If any of the backfilled events are invalid,
|
||||||
|
this method throws a SynapseError.
|
||||||
|
|
||||||
|
TODO: make this more useful to distinguish failures of the remote
|
||||||
|
server from invalid events (there is probably no point in trying to
|
||||||
|
re-fetch invalid events from every other HS in the room.)
|
||||||
"""
|
"""
|
||||||
if dest == self.server_name:
|
if dest == self.server_name:
|
||||||
raise SynapseError(400, "Can't backfill from self.")
|
raise SynapseError(400, "Can't backfill from self.")
|
||||||
|
@ -541,6 +555,16 @@ class FederationHandler(BaseHandler):
|
||||||
extremities=extremities,
|
extremities=extremities,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# ideally we'd sanity check the events here for excess prev_events etc,
|
||||||
|
# but it's hard to reject events at this point without completely
|
||||||
|
# breaking backfill in the same way that it is currently broken by
|
||||||
|
# events whose signature we cannot verify (#3121).
|
||||||
|
#
|
||||||
|
# So for now we accept the events anyway. #3124 tracks this.
|
||||||
|
#
|
||||||
|
# for ev in events:
|
||||||
|
# self._sanity_check_event(ev)
|
||||||
|
|
||||||
# Don't bother processing events we already have.
|
# Don't bother processing events we already have.
|
||||||
seen_events = yield self.store.have_events_in_timeline(
|
seen_events = yield self.store.have_events_in_timeline(
|
||||||
set(e.event_id for e in events)
|
set(e.event_id for e in events)
|
||||||
|
@ -633,7 +657,7 @@ class FederationHandler(BaseHandler):
|
||||||
|
|
||||||
failed_to_fetch = missing_auth - set(auth_events)
|
failed_to_fetch = missing_auth - set(auth_events)
|
||||||
|
|
||||||
seen_events = yield self.store.have_events(
|
seen_events = yield self.store.have_seen_events(
|
||||||
set(auth_events.keys()) | set(state_events.keys())
|
set(auth_events.keys()) | set(state_events.keys())
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -843,6 +867,38 @@ class FederationHandler(BaseHandler):
|
||||||
|
|
||||||
defer.returnValue(False)
|
defer.returnValue(False)
|
||||||
|
|
||||||
|
def _sanity_check_event(self, ev):
|
||||||
|
"""
|
||||||
|
Do some early sanity checks of a received event
|
||||||
|
|
||||||
|
In particular, checks it doesn't have an excessive number of
|
||||||
|
prev_events or auth_events, which could cause a huge state resolution
|
||||||
|
or cascade of event fetches.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
ev (synapse.events.EventBase): event to be checked
|
||||||
|
|
||||||
|
Returns: None
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
SynapseError if the event does not pass muster
|
||||||
|
"""
|
||||||
|
if len(ev.prev_events) > 20:
|
||||||
|
logger.warn("Rejecting event %s which has %i prev_events",
|
||||||
|
ev.event_id, len(ev.prev_events))
|
||||||
|
raise SynapseError(
|
||||||
|
httplib.BAD_REQUEST,
|
||||||
|
"Too many prev_events",
|
||||||
|
)
|
||||||
|
|
||||||
|
if len(ev.auth_events) > 10:
|
||||||
|
logger.warn("Rejecting event %s which has %i auth_events",
|
||||||
|
ev.event_id, len(ev.auth_events))
|
||||||
|
raise SynapseError(
|
||||||
|
httplib.BAD_REQUEST,
|
||||||
|
"Too many auth_events",
|
||||||
|
)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def send_invite(self, target_host, event):
|
def send_invite(self, target_host, event):
|
||||||
""" Sends the invite to the remote server for signing.
|
""" Sends the invite to the remote server for signing.
|
||||||
|
@ -1736,7 +1792,8 @@ class FederationHandler(BaseHandler):
|
||||||
event_key = None
|
event_key = None
|
||||||
|
|
||||||
if event_auth_events - current_state:
|
if event_auth_events - current_state:
|
||||||
have_events = yield self.store.have_events(
|
# TODO: can we use store.have_seen_events here instead?
|
||||||
|
have_events = yield self.store.get_seen_events_with_rejections(
|
||||||
event_auth_events - current_state
|
event_auth_events - current_state
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
|
@ -1759,12 +1816,12 @@ class FederationHandler(BaseHandler):
|
||||||
origin, event.room_id, event.event_id
|
origin, event.room_id, event.event_id
|
||||||
)
|
)
|
||||||
|
|
||||||
seen_remotes = yield self.store.have_events(
|
seen_remotes = yield self.store.have_seen_events(
|
||||||
[e.event_id for e in remote_auth_chain]
|
[e.event_id for e in remote_auth_chain]
|
||||||
)
|
)
|
||||||
|
|
||||||
for e in remote_auth_chain:
|
for e in remote_auth_chain:
|
||||||
if e.event_id in seen_remotes.keys():
|
if e.event_id in seen_remotes:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
if e.event_id == event.event_id:
|
if e.event_id == event.event_id:
|
||||||
|
@ -1791,7 +1848,7 @@ class FederationHandler(BaseHandler):
|
||||||
except AuthError:
|
except AuthError:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
have_events = yield self.store.have_events(
|
have_events = yield self.store.get_seen_events_with_rejections(
|
||||||
[e_id for e_id, _ in event.auth_events]
|
[e_id for e_id, _ in event.auth_events]
|
||||||
)
|
)
|
||||||
seen_events = set(have_events.keys())
|
seen_events = set(have_events.keys())
|
||||||
|
@ -1876,13 +1933,13 @@ class FederationHandler(BaseHandler):
|
||||||
local_auth_chain,
|
local_auth_chain,
|
||||||
)
|
)
|
||||||
|
|
||||||
seen_remotes = yield self.store.have_events(
|
seen_remotes = yield self.store.have_seen_events(
|
||||||
[e.event_id for e in result["auth_chain"]]
|
[e.event_id for e in result["auth_chain"]]
|
||||||
)
|
)
|
||||||
|
|
||||||
# 3. Process any remote auth chain events we haven't seen.
|
# 3. Process any remote auth chain events we haven't seen.
|
||||||
for ev in result["auth_chain"]:
|
for ev in result["auth_chain"]:
|
||||||
if ev.event_id in seen_remotes.keys():
|
if ev.event_id in seen_remotes:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
if ev.event_id == event.event_id:
|
if ev.event_id == event.event_id:
|
||||||
|
|
|
@ -16,7 +16,7 @@
|
||||||
from twisted.internet import defer, reactor
|
from twisted.internet import defer, reactor
|
||||||
from twisted.python.failure import Failure
|
from twisted.python.failure import Failure
|
||||||
|
|
||||||
from synapse.api.constants import EventTypes, Membership
|
from synapse.api.constants import EventTypes, Membership, MAX_DEPTH
|
||||||
from synapse.api.errors import AuthError, Codes, SynapseError
|
from synapse.api.errors import AuthError, Codes, SynapseError
|
||||||
from synapse.crypto.event_signing import add_hashes_and_signatures
|
from synapse.crypto.event_signing import add_hashes_and_signatures
|
||||||
from synapse.events.utils import serialize_event
|
from synapse.events.utils import serialize_event
|
||||||
|
@ -37,7 +37,6 @@ from ._base import BaseHandler
|
||||||
from canonicaljson import encode_canonical_json
|
from canonicaljson import encode_canonical_json
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
import random
|
|
||||||
import simplejson
|
import simplejson
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
@ -433,7 +432,7 @@ class EventCreationHandler(object):
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def create_event(self, requester, event_dict, token_id=None, txn_id=None,
|
def create_event(self, requester, event_dict, token_id=None, txn_id=None,
|
||||||
prev_event_ids=None):
|
prev_events_and_hashes=None):
|
||||||
"""
|
"""
|
||||||
Given a dict from a client, create a new event.
|
Given a dict from a client, create a new event.
|
||||||
|
|
||||||
|
@ -447,7 +446,13 @@ class EventCreationHandler(object):
|
||||||
event_dict (dict): An entire event
|
event_dict (dict): An entire event
|
||||||
token_id (str)
|
token_id (str)
|
||||||
txn_id (str)
|
txn_id (str)
|
||||||
prev_event_ids (list): The prev event ids to use when creating the event
|
|
||||||
|
prev_events_and_hashes (list[(str, dict[str, str], int)]|None):
|
||||||
|
the forward extremities to use as the prev_events for the
|
||||||
|
new event. For each event, a tuple of (event_id, hashes, depth)
|
||||||
|
where *hashes* is a map from algorithm to hash.
|
||||||
|
|
||||||
|
If None, they will be requested from the database.
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
Tuple of created event (FrozenEvent), Context
|
Tuple of created event (FrozenEvent), Context
|
||||||
|
@ -485,7 +490,7 @@ class EventCreationHandler(object):
|
||||||
event, context = yield self.create_new_client_event(
|
event, context = yield self.create_new_client_event(
|
||||||
builder=builder,
|
builder=builder,
|
||||||
requester=requester,
|
requester=requester,
|
||||||
prev_event_ids=prev_event_ids,
|
prev_events_and_hashes=prev_events_and_hashes,
|
||||||
)
|
)
|
||||||
|
|
||||||
defer.returnValue((event, context))
|
defer.returnValue((event, context))
|
||||||
|
@ -588,38 +593,47 @@ class EventCreationHandler(object):
|
||||||
|
|
||||||
@measure_func("create_new_client_event")
|
@measure_func("create_new_client_event")
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def create_new_client_event(self, builder, requester=None, prev_event_ids=None):
|
def create_new_client_event(self, builder, requester=None,
|
||||||
if prev_event_ids:
|
prev_events_and_hashes=None):
|
||||||
prev_events = yield self.store.add_event_hashes(prev_event_ids)
|
"""Create a new event for a local client
|
||||||
prev_max_depth = yield self.store.get_max_depth_of_events(prev_event_ids)
|
|
||||||
depth = prev_max_depth + 1
|
Args:
|
||||||
else:
|
builder (EventBuilder):
|
||||||
latest_ret = yield self.store.get_latest_event_ids_and_hashes_in_room(
|
|
||||||
builder.room_id,
|
requester (synapse.types.Requester|None):
|
||||||
|
|
||||||
|
prev_events_and_hashes (list[(str, dict[str, str], int)]|None):
|
||||||
|
the forward extremities to use as the prev_events for the
|
||||||
|
new event. For each event, a tuple of (event_id, hashes, depth)
|
||||||
|
where *hashes* is a map from algorithm to hash.
|
||||||
|
|
||||||
|
If None, they will be requested from the database.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Deferred[(synapse.events.EventBase, synapse.events.snapshot.EventContext)]
|
||||||
|
"""
|
||||||
|
|
||||||
|
if prev_events_and_hashes is not None:
|
||||||
|
assert len(prev_events_and_hashes) <= 10, \
|
||||||
|
"Attempting to create an event with %i prev_events" % (
|
||||||
|
len(prev_events_and_hashes),
|
||||||
)
|
)
|
||||||
|
else:
|
||||||
|
prev_events_and_hashes = \
|
||||||
|
yield self.store.get_prev_events_for_room(builder.room_id)
|
||||||
|
|
||||||
# We want to limit the max number of prev events we point to in our
|
if prev_events_and_hashes:
|
||||||
# new event
|
depth = max([d for _, _, d in prev_events_and_hashes]) + 1
|
||||||
if len(latest_ret) > 10:
|
# we cap depth of generated events, to ensure that they are not
|
||||||
# Sort by reverse depth, so we point to the most recent.
|
# rejected by other servers (and so that they can be persisted in
|
||||||
latest_ret.sort(key=lambda a: -a[2])
|
# the db)
|
||||||
new_latest_ret = latest_ret[:5]
|
depth = min(depth, MAX_DEPTH)
|
||||||
|
|
||||||
# We also randomly point to some of the older events, to make
|
|
||||||
# sure that we don't completely ignore the older events.
|
|
||||||
if latest_ret[5:]:
|
|
||||||
sample_size = min(5, len(latest_ret[5:]))
|
|
||||||
new_latest_ret.extend(random.sample(latest_ret[5:], sample_size))
|
|
||||||
latest_ret = new_latest_ret
|
|
||||||
|
|
||||||
if latest_ret:
|
|
||||||
depth = max([d for _, _, d in latest_ret]) + 1
|
|
||||||
else:
|
else:
|
||||||
depth = 1
|
depth = 1
|
||||||
|
|
||||||
prev_events = [
|
prev_events = [
|
||||||
(event_id, prev_hashes)
|
(event_id, prev_hashes)
|
||||||
for event_id, prev_hashes, _ in latest_ret
|
for event_id, prev_hashes, _ in prev_events_and_hashes
|
||||||
]
|
]
|
||||||
|
|
||||||
builder.prev_events = prev_events
|
builder.prev_events = prev_events
|
||||||
|
|
|
@ -20,7 +20,6 @@ from ._base import BaseHandler
|
||||||
from synapse.api.constants import (
|
from synapse.api.constants import (
|
||||||
EventTypes, JoinRules,
|
EventTypes, JoinRules,
|
||||||
)
|
)
|
||||||
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
|
|
||||||
from synapse.util.async import concurrently_execute
|
from synapse.util.async import concurrently_execute
|
||||||
from synapse.util.caches.descriptors import cachedInlineCallbacks
|
from synapse.util.caches.descriptors import cachedInlineCallbacks
|
||||||
from synapse.util.caches.response_cache import ResponseCache
|
from synapse.util.caches.response_cache import ResponseCache
|
||||||
|
@ -78,18 +77,11 @@ class RoomListHandler(BaseHandler):
|
||||||
)
|
)
|
||||||
|
|
||||||
key = (limit, since_token, network_tuple)
|
key = (limit, since_token, network_tuple)
|
||||||
result = self.response_cache.get(key)
|
return self.response_cache.wrap(
|
||||||
if not result:
|
|
||||||
logger.info("No cached result, calculating one.")
|
|
||||||
result = self.response_cache.set(
|
|
||||||
key,
|
key,
|
||||||
preserve_fn(self._get_public_room_list)(
|
self._get_public_room_list,
|
||||||
limit, since_token, network_tuple=network_tuple
|
limit, since_token, network_tuple=network_tuple,
|
||||||
)
|
)
|
||||||
)
|
|
||||||
else:
|
|
||||||
logger.info("Using cached deferred result.")
|
|
||||||
return make_deferred_yieldable(result)
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _get_public_room_list(self, limit=None, since_token=None,
|
def _get_public_room_list(self, limit=None, since_token=None,
|
||||||
|
@ -423,18 +415,14 @@ class RoomListHandler(BaseHandler):
|
||||||
server_name, limit, since_token, include_all_networks,
|
server_name, limit, since_token, include_all_networks,
|
||||||
third_party_instance_id,
|
third_party_instance_id,
|
||||||
)
|
)
|
||||||
result = self.remote_response_cache.get(key)
|
return self.remote_response_cache.wrap(
|
||||||
if not result:
|
|
||||||
result = self.remote_response_cache.set(
|
|
||||||
key,
|
key,
|
||||||
repl_layer.get_public_rooms(
|
repl_layer.get_public_rooms,
|
||||||
server_name, limit=limit, since_token=since_token,
|
server_name, limit=limit, since_token=since_token,
|
||||||
search_filter=search_filter,
|
search_filter=search_filter,
|
||||||
include_all_networks=include_all_networks,
|
include_all_networks=include_all_networks,
|
||||||
third_party_instance_id=third_party_instance_id,
|
third_party_instance_id=third_party_instance_id,
|
||||||
)
|
)
|
||||||
)
|
|
||||||
return result
|
|
||||||
|
|
||||||
|
|
||||||
class RoomListNextBatch(namedtuple("RoomListNextBatch", (
|
class RoomListNextBatch(namedtuple("RoomListNextBatch", (
|
||||||
|
|
|
@ -149,7 +149,7 @@ class RoomMemberHandler(object):
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _local_membership_update(
|
def _local_membership_update(
|
||||||
self, requester, target, room_id, membership,
|
self, requester, target, room_id, membership,
|
||||||
prev_event_ids,
|
prev_events_and_hashes,
|
||||||
txn_id=None,
|
txn_id=None,
|
||||||
ratelimit=True,
|
ratelimit=True,
|
||||||
content=None,
|
content=None,
|
||||||
|
@ -175,7 +175,7 @@ class RoomMemberHandler(object):
|
||||||
},
|
},
|
||||||
token_id=requester.access_token_id,
|
token_id=requester.access_token_id,
|
||||||
txn_id=txn_id,
|
txn_id=txn_id,
|
||||||
prev_event_ids=prev_event_ids,
|
prev_events_and_hashes=prev_events_and_hashes,
|
||||||
)
|
)
|
||||||
|
|
||||||
# Check if this event matches the previous membership event for the user.
|
# Check if this event matches the previous membership event for the user.
|
||||||
|
@ -314,7 +314,12 @@ class RoomMemberHandler(object):
|
||||||
403, "Invites have been disabled on this server",
|
403, "Invites have been disabled on this server",
|
||||||
)
|
)
|
||||||
|
|
||||||
latest_event_ids = yield self.store.get_latest_event_ids_in_room(room_id)
|
prev_events_and_hashes = yield self.store.get_prev_events_for_room(
|
||||||
|
room_id,
|
||||||
|
)
|
||||||
|
latest_event_ids = (
|
||||||
|
event_id for (event_id, _, _) in prev_events_and_hashes
|
||||||
|
)
|
||||||
current_state_ids = yield self.state_handler.get_current_state_ids(
|
current_state_ids = yield self.state_handler.get_current_state_ids(
|
||||||
room_id, latest_event_ids=latest_event_ids,
|
room_id, latest_event_ids=latest_event_ids,
|
||||||
)
|
)
|
||||||
|
@ -403,7 +408,7 @@ class RoomMemberHandler(object):
|
||||||
membership=effective_membership_state,
|
membership=effective_membership_state,
|
||||||
txn_id=txn_id,
|
txn_id=txn_id,
|
||||||
ratelimit=ratelimit,
|
ratelimit=ratelimit,
|
||||||
prev_event_ids=latest_event_ids,
|
prev_events_and_hashes=prev_events_and_hashes,
|
||||||
content=content,
|
content=content,
|
||||||
)
|
)
|
||||||
defer.returnValue(res)
|
defer.returnValue(res)
|
||||||
|
|
|
@ -15,7 +15,7 @@
|
||||||
|
|
||||||
from synapse.api.constants import Membership, EventTypes
|
from synapse.api.constants import Membership, EventTypes
|
||||||
from synapse.util.async import concurrently_execute
|
from synapse.util.async import concurrently_execute
|
||||||
from synapse.util.logcontext import LoggingContext, make_deferred_yieldable, preserve_fn
|
from synapse.util.logcontext import LoggingContext
|
||||||
from synapse.util.metrics import Measure, measure_func
|
from synapse.util.metrics import Measure, measure_func
|
||||||
from synapse.util.caches.response_cache import ResponseCache
|
from synapse.util.caches.response_cache import ResponseCache
|
||||||
from synapse.push.clientformat import format_push_rules_for_user
|
from synapse.push.clientformat import format_push_rules_for_user
|
||||||
|
@ -52,6 +52,7 @@ class TimelineBatch(collections.namedtuple("TimelineBatch", [
|
||||||
to tell if room needs to be part of the sync result.
|
to tell if room needs to be part of the sync result.
|
||||||
"""
|
"""
|
||||||
return bool(self.events)
|
return bool(self.events)
|
||||||
|
__bool__ = __nonzero__ # python3
|
||||||
|
|
||||||
|
|
||||||
class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [
|
class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [
|
||||||
|
@ -76,6 +77,7 @@ class JoinedSyncResult(collections.namedtuple("JoinedSyncResult", [
|
||||||
# nb the notification count does not, er, count: if there's nothing
|
# nb the notification count does not, er, count: if there's nothing
|
||||||
# else in the result, we don't need to send it.
|
# else in the result, we don't need to send it.
|
||||||
)
|
)
|
||||||
|
__bool__ = __nonzero__ # python3
|
||||||
|
|
||||||
|
|
||||||
class ArchivedSyncResult(collections.namedtuple("ArchivedSyncResult", [
|
class ArchivedSyncResult(collections.namedtuple("ArchivedSyncResult", [
|
||||||
|
@ -95,6 +97,7 @@ class ArchivedSyncResult(collections.namedtuple("ArchivedSyncResult", [
|
||||||
or self.state
|
or self.state
|
||||||
or self.account_data
|
or self.account_data
|
||||||
)
|
)
|
||||||
|
__bool__ = __nonzero__ # python3
|
||||||
|
|
||||||
|
|
||||||
class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [
|
class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [
|
||||||
|
@ -106,6 +109,7 @@ class InvitedSyncResult(collections.namedtuple("InvitedSyncResult", [
|
||||||
def __nonzero__(self):
|
def __nonzero__(self):
|
||||||
"""Invited rooms should always be reported to the client"""
|
"""Invited rooms should always be reported to the client"""
|
||||||
return True
|
return True
|
||||||
|
__bool__ = __nonzero__ # python3
|
||||||
|
|
||||||
|
|
||||||
class GroupsSyncResult(collections.namedtuple("GroupsSyncResult", [
|
class GroupsSyncResult(collections.namedtuple("GroupsSyncResult", [
|
||||||
|
@ -117,6 +121,7 @@ class GroupsSyncResult(collections.namedtuple("GroupsSyncResult", [
|
||||||
|
|
||||||
def __nonzero__(self):
|
def __nonzero__(self):
|
||||||
return bool(self.join or self.invite or self.leave)
|
return bool(self.join or self.invite or self.leave)
|
||||||
|
__bool__ = __nonzero__ # python3
|
||||||
|
|
||||||
|
|
||||||
class DeviceLists(collections.namedtuple("DeviceLists", [
|
class DeviceLists(collections.namedtuple("DeviceLists", [
|
||||||
|
@ -127,6 +132,7 @@ class DeviceLists(collections.namedtuple("DeviceLists", [
|
||||||
|
|
||||||
def __nonzero__(self):
|
def __nonzero__(self):
|
||||||
return bool(self.changed or self.left)
|
return bool(self.changed or self.left)
|
||||||
|
__bool__ = __nonzero__ # python3
|
||||||
|
|
||||||
|
|
||||||
class SyncResult(collections.namedtuple("SyncResult", [
|
class SyncResult(collections.namedtuple("SyncResult", [
|
||||||
|
@ -159,6 +165,7 @@ class SyncResult(collections.namedtuple("SyncResult", [
|
||||||
self.device_lists or
|
self.device_lists or
|
||||||
self.groups
|
self.groups
|
||||||
)
|
)
|
||||||
|
__bool__ = __nonzero__ # python3
|
||||||
|
|
||||||
|
|
||||||
class SyncHandler(object):
|
class SyncHandler(object):
|
||||||
|
@ -180,15 +187,11 @@ class SyncHandler(object):
|
||||||
Returns:
|
Returns:
|
||||||
A Deferred SyncResult.
|
A Deferred SyncResult.
|
||||||
"""
|
"""
|
||||||
result = self.response_cache.get(sync_config.request_key)
|
return self.response_cache.wrap(
|
||||||
if not result:
|
|
||||||
result = self.response_cache.set(
|
|
||||||
sync_config.request_key,
|
sync_config.request_key,
|
||||||
preserve_fn(self._wait_for_sync_for_user)(
|
self._wait_for_sync_for_user,
|
||||||
sync_config, since_token, timeout, full_state
|
sync_config, since_token, timeout, full_state,
|
||||||
)
|
)
|
||||||
)
|
|
||||||
return make_deferred_yieldable(result)
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _wait_for_sync_for_user(self, sync_config, since_token, timeout,
|
def _wait_for_sync_for_user(self, sync_config, since_token, timeout,
|
||||||
|
|
|
@ -144,6 +144,7 @@ class _NotifierUserStream(object):
|
||||||
class EventStreamResult(namedtuple("EventStreamResult", ("events", "tokens"))):
|
class EventStreamResult(namedtuple("EventStreamResult", ("events", "tokens"))):
|
||||||
def __nonzero__(self):
|
def __nonzero__(self):
|
||||||
return bool(self.events)
|
return bool(self.events)
|
||||||
|
__bool__ = __nonzero__ # python3
|
||||||
|
|
||||||
|
|
||||||
class Notifier(object):
|
class Notifier(object):
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
# Copyright 2015, 2016 OpenMarket Ltd
|
# Copyright 2015, 2016 OpenMarket Ltd
|
||||||
# Copyright 2017 Vector Creations Ltd
|
# Copyright 2017 Vector Creations Ltd
|
||||||
|
# Copyright 2018 New Vector Ltd
|
||||||
#
|
#
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||||
# you may not use this file except in compliance with the License.
|
# you may not use this file except in compliance with the License.
|
||||||
|
@ -18,15 +19,31 @@ from distutils.version import LooseVersion
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
# this dict maps from python package name to a list of modules we expect it to
|
||||||
|
# provide.
|
||||||
|
#
|
||||||
|
# the key is a "requirement specifier", as used as a parameter to `pip
|
||||||
|
# install`[1], or an `install_requires` argument to `setuptools.setup` [2].
|
||||||
|
#
|
||||||
|
# the value is a sequence of strings; each entry should be the name of the
|
||||||
|
# python module, optionally followed by a version assertion which can be either
|
||||||
|
# ">=<ver>" or "==<ver>".
|
||||||
|
#
|
||||||
|
# [1] https://pip.pypa.io/en/stable/reference/pip_install/#requirement-specifiers.
|
||||||
|
# [2] https://setuptools.readthedocs.io/en/latest/setuptools.html#declaring-dependencies
|
||||||
REQUIREMENTS = {
|
REQUIREMENTS = {
|
||||||
"jsonschema>=2.5.1": ["jsonschema>=2.5.1"],
|
"jsonschema>=2.5.1": ["jsonschema>=2.5.1"],
|
||||||
"frozendict>=0.4": ["frozendict"],
|
"frozendict>=0.4": ["frozendict"],
|
||||||
"unpaddedbase64>=1.1.0": ["unpaddedbase64>=1.1.0"],
|
"unpaddedbase64>=1.1.0": ["unpaddedbase64>=1.1.0"],
|
||||||
"canonicaljson>=1.0.0": ["canonicaljson>=1.0.0"],
|
"canonicaljson>=1.1.3": ["canonicaljson>=1.1.3"],
|
||||||
"signedjson>=1.0.0": ["signedjson>=1.0.0"],
|
"signedjson>=1.0.0": ["signedjson>=1.0.0"],
|
||||||
"pynacl>=1.2.1": ["nacl>=1.2.1", "nacl.bindings"],
|
"pynacl>=1.2.1": ["nacl>=1.2.1", "nacl.bindings"],
|
||||||
"service_identity>=1.0.0": ["service_identity>=1.0.0"],
|
"service_identity>=1.0.0": ["service_identity>=1.0.0"],
|
||||||
"Twisted>=16.0.0": ["twisted>=16.0.0"],
|
|
||||||
|
# we break under Twisted 18.4
|
||||||
|
# (https://github.com/matrix-org/synapse/issues/3135)
|
||||||
|
"Twisted>=16.0.0,<18.4": ["twisted>=16.0.0"],
|
||||||
|
|
||||||
"pyopenssl>=0.14": ["OpenSSL>=0.14"],
|
"pyopenssl>=0.14": ["OpenSSL>=0.14"],
|
||||||
"pyyaml": ["yaml"],
|
"pyyaml": ["yaml"],
|
||||||
"pyasn1": ["pyasn1"],
|
"pyasn1": ["pyasn1"],
|
||||||
|
@ -34,11 +51,12 @@ REQUIREMENTS = {
|
||||||
"bcrypt": ["bcrypt>=3.1.0"],
|
"bcrypt": ["bcrypt>=3.1.0"],
|
||||||
"pillow": ["PIL"],
|
"pillow": ["PIL"],
|
||||||
"pydenticon": ["pydenticon"],
|
"pydenticon": ["pydenticon"],
|
||||||
|
"blist": ["blist"],
|
||||||
"pysaml2>=3.0.0": ["saml2>=3.0.0"],
|
"pysaml2>=3.0.0": ["saml2>=3.0.0"],
|
||||||
"sortedcontainers": ["sortedcontainers"],
|
|
||||||
"pymacaroons-pynacl": ["pymacaroons"],
|
"pymacaroons-pynacl": ["pymacaroons"],
|
||||||
"msgpack-python>=0.3.0": ["msgpack"],
|
"msgpack-python>=0.3.0": ["msgpack"],
|
||||||
"phonenumbers>=8.2.0": ["phonenumbers"],
|
"phonenumbers>=8.2.0": ["phonenumbers"],
|
||||||
|
"six": ["six"],
|
||||||
}
|
}
|
||||||
CONDITIONAL_REQUIREMENTS = {
|
CONDITIONAL_REQUIREMENTS = {
|
||||||
"web_client": {
|
"web_client": {
|
||||||
|
|
|
@ -23,7 +23,6 @@ from synapse.events.snapshot import EventContext
|
||||||
from synapse.http.servlet import RestServlet, parse_json_object_from_request
|
from synapse.http.servlet import RestServlet, parse_json_object_from_request
|
||||||
from synapse.util.async import sleep
|
from synapse.util.async import sleep
|
||||||
from synapse.util.caches.response_cache import ResponseCache
|
from synapse.util.caches.response_cache import ResponseCache
|
||||||
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
|
|
||||||
from synapse.util.metrics import Measure
|
from synapse.util.metrics import Measure
|
||||||
from synapse.types import Requester, UserID
|
from synapse.types import Requester, UserID
|
||||||
|
|
||||||
|
@ -118,17 +117,12 @@ class ReplicationSendEventRestServlet(RestServlet):
|
||||||
self.response_cache = ResponseCache(hs, "send_event", timeout_ms=30 * 60 * 1000)
|
self.response_cache = ResponseCache(hs, "send_event", timeout_ms=30 * 60 * 1000)
|
||||||
|
|
||||||
def on_PUT(self, request, event_id):
|
def on_PUT(self, request, event_id):
|
||||||
result = self.response_cache.get(event_id)
|
return self.response_cache.wrap(
|
||||||
if not result:
|
|
||||||
result = self.response_cache.set(
|
|
||||||
event_id,
|
event_id,
|
||||||
self._handle_request(request)
|
self._handle_request,
|
||||||
|
request
|
||||||
)
|
)
|
||||||
else:
|
|
||||||
logger.warn("Returning cached response")
|
|
||||||
return make_deferred_yieldable(result)
|
|
||||||
|
|
||||||
@preserve_fn
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def _handle_request(self, request):
|
def _handle_request(self, request):
|
||||||
with Measure(self.clock, "repl_send_event_parse"):
|
with Measure(self.clock, "repl_send_event_parse"):
|
||||||
|
|
|
@ -12,6 +12,7 @@
|
||||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
import random
|
||||||
|
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer
|
||||||
|
|
||||||
|
@ -24,7 +25,9 @@ from synapse.util.caches.descriptors import cached
|
||||||
from unpaddedbase64 import encode_base64
|
from unpaddedbase64 import encode_base64
|
||||||
|
|
||||||
import logging
|
import logging
|
||||||
from Queue import PriorityQueue, Empty
|
from six.moves.queue import PriorityQueue, Empty
|
||||||
|
|
||||||
|
from six.moves import range
|
||||||
|
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
@ -78,7 +81,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
|
||||||
front_list = list(front)
|
front_list = list(front)
|
||||||
chunks = [
|
chunks = [
|
||||||
front_list[x:x + 100]
|
front_list[x:x + 100]
|
||||||
for x in xrange(0, len(front), 100)
|
for x in range(0, len(front), 100)
|
||||||
]
|
]
|
||||||
for chunk in chunks:
|
for chunk in chunks:
|
||||||
txn.execute(
|
txn.execute(
|
||||||
|
@ -133,7 +136,47 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
|
||||||
retcol="event_id",
|
retcol="event_id",
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def get_prev_events_for_room(self, room_id):
|
||||||
|
"""
|
||||||
|
Gets a subset of the current forward extremities in the given room.
|
||||||
|
|
||||||
|
Limits the result to 10 extremities, so that we can avoid creating
|
||||||
|
events which refer to hundreds of prev_events.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
room_id (str): room_id
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Deferred[list[(str, dict[str, str], int)]]
|
||||||
|
for each event, a tuple of (event_id, hashes, depth)
|
||||||
|
where *hashes* is a map from algorithm to hash.
|
||||||
|
"""
|
||||||
|
res = yield self.get_latest_event_ids_and_hashes_in_room(room_id)
|
||||||
|
if len(res) > 10:
|
||||||
|
# Sort by reverse depth, so we point to the most recent.
|
||||||
|
res.sort(key=lambda a: -a[2])
|
||||||
|
|
||||||
|
# we use half of the limit for the actual most recent events, and
|
||||||
|
# the other half to randomly point to some of the older events, to
|
||||||
|
# make sure that we don't completely ignore the older events.
|
||||||
|
res = res[0:5] + random.sample(res[5:], 5)
|
||||||
|
|
||||||
|
defer.returnValue(res)
|
||||||
|
|
||||||
def get_latest_event_ids_and_hashes_in_room(self, room_id):
|
def get_latest_event_ids_and_hashes_in_room(self, room_id):
|
||||||
|
"""
|
||||||
|
Gets the current forward extremities in the given room
|
||||||
|
|
||||||
|
Args:
|
||||||
|
room_id (str): room_id
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Deferred[list[(str, dict[str, str], int)]]
|
||||||
|
for each event, a tuple of (event_id, hashes, depth)
|
||||||
|
where *hashes* is a map from algorithm to hash.
|
||||||
|
"""
|
||||||
|
|
||||||
return self.runInteraction(
|
return self.runInteraction(
|
||||||
"get_latest_event_ids_and_hashes_in_room",
|
"get_latest_event_ids_and_hashes_in_room",
|
||||||
self._get_latest_event_ids_and_hashes_in_room,
|
self._get_latest_event_ids_and_hashes_in_room,
|
||||||
|
@ -182,22 +225,6 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore,
|
||||||
room_id,
|
room_id,
|
||||||
)
|
)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
|
||||||
def get_max_depth_of_events(self, event_ids):
|
|
||||||
sql = (
|
|
||||||
"SELECT MAX(depth) FROM events WHERE event_id IN (%s)"
|
|
||||||
) % (",".join(["?"] * len(event_ids)),)
|
|
||||||
|
|
||||||
rows = yield self._execute(
|
|
||||||
"get_max_depth_of_events", None,
|
|
||||||
sql, *event_ids
|
|
||||||
)
|
|
||||||
|
|
||||||
if rows:
|
|
||||||
defer.returnValue(rows[0][0])
|
|
||||||
else:
|
|
||||||
defer.returnValue(1)
|
|
||||||
|
|
||||||
def _get_min_depth_interaction(self, txn, room_id):
|
def _get_min_depth_interaction(self, txn, room_id):
|
||||||
min_depth = self._simple_select_one_onecol_txn(
|
min_depth = self._simple_select_one_onecol_txn(
|
||||||
txn,
|
txn,
|
||||||
|
|
|
@ -16,6 +16,7 @@
|
||||||
|
|
||||||
from collections import OrderedDict, deque, namedtuple
|
from collections import OrderedDict, deque, namedtuple
|
||||||
from functools import wraps
|
from functools import wraps
|
||||||
|
import itertools
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
import simplejson as json
|
import simplejson as json
|
||||||
|
@ -1320,13 +1321,49 @@ class EventsStore(EventsWorkerStore):
|
||||||
|
|
||||||
defer.returnValue(set(r["event_id"] for r in rows))
|
defer.returnValue(set(r["event_id"] for r in rows))
|
||||||
|
|
||||||
def have_events(self, event_ids):
|
@defer.inlineCallbacks
|
||||||
|
def have_seen_events(self, event_ids):
|
||||||
"""Given a list of event ids, check if we have already processed them.
|
"""Given a list of event ids, check if we have already processed them.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
event_ids (iterable[str]):
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
dict: Has an entry for each event id we already have seen. Maps to
|
Deferred[set[str]]: The events we have already seen.
|
||||||
the rejected reason string if we rejected the event, else maps to
|
"""
|
||||||
None.
|
results = set()
|
||||||
|
|
||||||
|
def have_seen_events_txn(txn, chunk):
|
||||||
|
sql = (
|
||||||
|
"SELECT event_id FROM events as e WHERE e.event_id IN (%s)"
|
||||||
|
% (",".join("?" * len(chunk)), )
|
||||||
|
)
|
||||||
|
txn.execute(sql, chunk)
|
||||||
|
for (event_id, ) in txn:
|
||||||
|
results.add(event_id)
|
||||||
|
|
||||||
|
# break the input up into chunks of 100
|
||||||
|
input_iterator = iter(event_ids)
|
||||||
|
for chunk in iter(lambda: list(itertools.islice(input_iterator, 100)),
|
||||||
|
[]):
|
||||||
|
yield self.runInteraction(
|
||||||
|
"have_seen_events",
|
||||||
|
have_seen_events_txn,
|
||||||
|
chunk,
|
||||||
|
)
|
||||||
|
defer.returnValue(results)
|
||||||
|
|
||||||
|
def get_seen_events_with_rejections(self, event_ids):
|
||||||
|
"""Given a list of event ids, check if we rejected them.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
event_ids (list[str])
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Deferred[dict[str, str|None):
|
||||||
|
Has an entry for each event id we already have seen. Maps to
|
||||||
|
the rejected reason string if we rejected the event, else maps
|
||||||
|
to None.
|
||||||
"""
|
"""
|
||||||
if not event_ids:
|
if not event_ids:
|
||||||
return defer.succeed({})
|
return defer.succeed({})
|
||||||
|
@ -1348,9 +1385,7 @@ class EventsStore(EventsWorkerStore):
|
||||||
|
|
||||||
return res
|
return res
|
||||||
|
|
||||||
return self.runInteraction(
|
return self.runInteraction("get_rejection_reasons", f)
|
||||||
"have_events", f,
|
|
||||||
)
|
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def count_daily_messages(self):
|
def count_daily_messages(self):
|
||||||
|
|
|
@ -530,7 +530,7 @@ class RoomStore(RoomWorkerStore, SearchStore):
|
||||||
|
|
||||||
# Convert the IDs to MXC URIs
|
# Convert the IDs to MXC URIs
|
||||||
for media_id in local_mxcs:
|
for media_id in local_mxcs:
|
||||||
local_media_mxcs.append("mxc://%s/%s" % (self.hostname, media_id))
|
local_media_mxcs.append("mxc://%s/%s" % (self.hs.hostname, media_id))
|
||||||
for hostname, media_id in remote_mxcs:
|
for hostname, media_id in remote_mxcs:
|
||||||
remote_media_mxcs.append("mxc://%s/%s" % (hostname, media_id))
|
remote_media_mxcs.append("mxc://%s/%s" % (hostname, media_id))
|
||||||
|
|
||||||
|
@ -595,7 +595,7 @@ class RoomStore(RoomWorkerStore, SearchStore):
|
||||||
while next_token:
|
while next_token:
|
||||||
sql = """
|
sql = """
|
||||||
SELECT stream_ordering, json FROM events
|
SELECT stream_ordering, json FROM events
|
||||||
JOIN event_json USING (event_id)
|
JOIN event_json USING (room_id, event_id)
|
||||||
WHERE room_id = ?
|
WHERE room_id = ?
|
||||||
AND stream_ordering < ?
|
AND stream_ordering < ?
|
||||||
AND contains_url = ? AND outlier = ?
|
AND contains_url = ? AND outlier = ?
|
||||||
|
@ -619,7 +619,7 @@ class RoomStore(RoomWorkerStore, SearchStore):
|
||||||
if matches:
|
if matches:
|
||||||
hostname = matches.group(1)
|
hostname = matches.group(1)
|
||||||
media_id = matches.group(2)
|
media_id = matches.group(2)
|
||||||
if hostname == self.hostname:
|
if hostname == self.hs.hostname:
|
||||||
local_media_mxcs.append(media_id)
|
local_media_mxcs.append(media_id)
|
||||||
else:
|
else:
|
||||||
remote_media_mxcs.append((hostname, media_id))
|
remote_media_mxcs.append((hostname, media_id))
|
||||||
|
|
|
@ -77,7 +77,7 @@ class SearchStore(BackgroundUpdateStore):
|
||||||
sql = (
|
sql = (
|
||||||
"SELECT stream_ordering, event_id, room_id, type, json, "
|
"SELECT stream_ordering, event_id, room_id, type, json, "
|
||||||
" origin_server_ts FROM events"
|
" origin_server_ts FROM events"
|
||||||
" JOIN event_json USING (event_id)"
|
" JOIN event_json USING (room_id, event_id)"
|
||||||
" WHERE ? <= stream_ordering AND stream_ordering < ?"
|
" WHERE ? <= stream_ordering AND stream_ordering < ?"
|
||||||
" AND (%s)"
|
" AND (%s)"
|
||||||
" ORDER BY stream_ordering DESC"
|
" ORDER BY stream_ordering DESC"
|
||||||
|
|
|
@ -12,9 +12,15 @@
|
||||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
import logging
|
||||||
|
|
||||||
|
from twisted.internet import defer
|
||||||
|
|
||||||
from synapse.util.async import ObservableDeferred
|
from synapse.util.async import ObservableDeferred
|
||||||
from synapse.util.caches import metrics as cache_metrics
|
from synapse.util.caches import metrics as cache_metrics
|
||||||
|
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
|
||||||
|
|
||||||
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class ResponseCache(object):
|
class ResponseCache(object):
|
||||||
|
@ -31,6 +37,7 @@ class ResponseCache(object):
|
||||||
self.clock = hs.get_clock()
|
self.clock = hs.get_clock()
|
||||||
self.timeout_sec = timeout_ms / 1000.
|
self.timeout_sec = timeout_ms / 1000.
|
||||||
|
|
||||||
|
self._name = name
|
||||||
self._metrics = cache_metrics.register_cache(
|
self._metrics = cache_metrics.register_cache(
|
||||||
"response_cache",
|
"response_cache",
|
||||||
size_callback=lambda: self.size(),
|
size_callback=lambda: self.size(),
|
||||||
|
@ -43,15 +50,21 @@ class ResponseCache(object):
|
||||||
def get(self, key):
|
def get(self, key):
|
||||||
"""Look up the given key.
|
"""Look up the given key.
|
||||||
|
|
||||||
Returns a deferred which doesn't follow the synapse logcontext rules,
|
Can return either a new Deferred (which also doesn't follow the synapse
|
||||||
so you'll probably want to make_deferred_yieldable it.
|
logcontext rules), or, if the request has completed, the actual
|
||||||
|
result. You will probably want to make_deferred_yieldable the result.
|
||||||
|
|
||||||
|
If there is no entry for the key, returns None. It is worth noting that
|
||||||
|
this means there is no way to distinguish a completed result of None
|
||||||
|
from an absent cache entry.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
key (str):
|
key (hashable):
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
twisted.internet.defer.Deferred|None: None if there is no entry
|
twisted.internet.defer.Deferred|None|E: None if there is no entry
|
||||||
for this key; otherwise a deferred result.
|
for this key; otherwise either a deferred result or the result
|
||||||
|
itself.
|
||||||
"""
|
"""
|
||||||
result = self.pending_result_cache.get(key)
|
result = self.pending_result_cache.get(key)
|
||||||
if result is not None:
|
if result is not None:
|
||||||
|
@ -68,19 +81,17 @@ class ResponseCache(object):
|
||||||
you should wrap normal synapse deferreds with
|
you should wrap normal synapse deferreds with
|
||||||
logcontext.run_in_background).
|
logcontext.run_in_background).
|
||||||
|
|
||||||
Returns a new Deferred which also doesn't follow the synapse logcontext
|
Can return either a new Deferred (which also doesn't follow the synapse
|
||||||
rules, so you will want to make_deferred_yieldable it
|
logcontext rules), or, if *deferred* was already complete, the actual
|
||||||
|
result. You will probably want to make_deferred_yieldable the result.
|
||||||
(TODO: before using this more widely, it might make sense to refactor
|
|
||||||
it and get() so that they do the necessary wrapping rather than having
|
|
||||||
to do it everywhere ResponseCache is used.)
|
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
key (str):
|
key (hashable):
|
||||||
deferred (twisted.internet.defer.Deferred):
|
deferred (twisted.internet.defer.Deferred[T):
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
twisted.internet.defer.Deferred
|
twisted.internet.defer.Deferred[T]|T: a new deferred, or the actual
|
||||||
|
result.
|
||||||
"""
|
"""
|
||||||
result = ObservableDeferred(deferred, consumeErrors=True)
|
result = ObservableDeferred(deferred, consumeErrors=True)
|
||||||
self.pending_result_cache[key] = result
|
self.pending_result_cache[key] = result
|
||||||
|
@ -97,3 +108,52 @@ class ResponseCache(object):
|
||||||
|
|
||||||
result.addBoth(remove)
|
result.addBoth(remove)
|
||||||
return result.observe()
|
return result.observe()
|
||||||
|
|
||||||
|
def wrap(self, key, callback, *args, **kwargs):
|
||||||
|
"""Wrap together a *get* and *set* call, taking care of logcontexts
|
||||||
|
|
||||||
|
First looks up the key in the cache, and if it is present makes it
|
||||||
|
follow the synapse logcontext rules and returns it.
|
||||||
|
|
||||||
|
Otherwise, makes a call to *callback(*args, **kwargs)*, which should
|
||||||
|
follow the synapse logcontext rules, and adds the result to the cache.
|
||||||
|
|
||||||
|
Example usage:
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def handle_request(request):
|
||||||
|
# etc
|
||||||
|
defer.returnValue(result)
|
||||||
|
|
||||||
|
result = yield response_cache.wrap(
|
||||||
|
key,
|
||||||
|
handle_request,
|
||||||
|
request,
|
||||||
|
)
|
||||||
|
|
||||||
|
Args:
|
||||||
|
key (hashable): key to get/set in the cache
|
||||||
|
|
||||||
|
callback (callable): function to call if the key is not found in
|
||||||
|
the cache
|
||||||
|
|
||||||
|
*args: positional parameters to pass to the callback, if it is used
|
||||||
|
|
||||||
|
**kwargs: named paramters to pass to the callback, if it is used
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
twisted.internet.defer.Deferred: yieldable result
|
||||||
|
"""
|
||||||
|
result = self.get(key)
|
||||||
|
if not result:
|
||||||
|
logger.info("[%s]: no cached result for [%s], calculating new one",
|
||||||
|
self._name, key)
|
||||||
|
d = run_in_background(callback, *args, **kwargs)
|
||||||
|
result = self.set(key, d)
|
||||||
|
elif not isinstance(result, defer.Deferred) or result.called:
|
||||||
|
logger.info("[%s]: using completed cached result for [%s]",
|
||||||
|
self._name, key)
|
||||||
|
else:
|
||||||
|
logger.info("[%s]: using incomplete cached result for [%s]",
|
||||||
|
self._name, key)
|
||||||
|
return make_deferred_yieldable(result)
|
||||||
|
|
|
@ -16,7 +16,7 @@
|
||||||
from synapse.util.caches import register_cache, CACHE_SIZE_FACTOR
|
from synapse.util.caches import register_cache, CACHE_SIZE_FACTOR
|
||||||
|
|
||||||
|
|
||||||
from sortedcontainers import SortedDict
|
from blist import sorteddict
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
|
|
||||||
|
@ -35,7 +35,7 @@ class StreamChangeCache(object):
|
||||||
def __init__(self, name, current_stream_pos, max_size=10000, prefilled_cache={}):
|
def __init__(self, name, current_stream_pos, max_size=10000, prefilled_cache={}):
|
||||||
self._max_size = int(max_size * CACHE_SIZE_FACTOR)
|
self._max_size = int(max_size * CACHE_SIZE_FACTOR)
|
||||||
self._entity_to_key = {}
|
self._entity_to_key = {}
|
||||||
self._cache = SortedDict()
|
self._cache = sorteddict()
|
||||||
self._earliest_known_stream_pos = current_stream_pos
|
self._earliest_known_stream_pos = current_stream_pos
|
||||||
self.name = name
|
self.name = name
|
||||||
self.metrics = register_cache(self.name, self._cache)
|
self.metrics = register_cache(self.name, self._cache)
|
||||||
|
|
|
@ -17,7 +17,7 @@ from twisted.internet import threads, reactor
|
||||||
|
|
||||||
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
|
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
|
||||||
|
|
||||||
import Queue
|
from six.moves import queue
|
||||||
|
|
||||||
|
|
||||||
class BackgroundFileConsumer(object):
|
class BackgroundFileConsumer(object):
|
||||||
|
@ -49,7 +49,7 @@ class BackgroundFileConsumer(object):
|
||||||
|
|
||||||
# Queue of slices of bytes to be written. When producer calls
|
# Queue of slices of bytes to be written. When producer calls
|
||||||
# unregister a final None is sent.
|
# unregister a final None is sent.
|
||||||
self._bytes_queue = Queue.Queue()
|
self._bytes_queue = queue.Queue()
|
||||||
|
|
||||||
# Deferred that is resolved when finished writing
|
# Deferred that is resolved when finished writing
|
||||||
self._finished_deferred = None
|
self._finished_deferred = None
|
||||||
|
|
|
@ -92,6 +92,7 @@ class LoggingContext(object):
|
||||||
|
|
||||||
def __nonzero__(self):
|
def __nonzero__(self):
|
||||||
return False
|
return False
|
||||||
|
__bool__ = __nonzero__ # python3
|
||||||
|
|
||||||
sentinel = Sentinel()
|
sentinel = Sentinel()
|
||||||
|
|
||||||
|
|
|
@ -24,7 +24,7 @@ class ConfigLoadingTestCase(unittest.TestCase):
|
||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
self.dir = tempfile.mkdtemp()
|
self.dir = tempfile.mkdtemp()
|
||||||
print self.dir
|
print(self.dir)
|
||||||
self.file = os.path.join(self.dir, "homeserver.yaml")
|
self.file = os.path.join(self.dir, "homeserver.yaml")
|
||||||
|
|
||||||
def tearDown(self):
|
def tearDown(self):
|
||||||
|
|
|
@ -183,7 +183,7 @@ class KeyringTestCase(unittest.TestCase):
|
||||||
res_deferreds_2 = kr.verify_json_objects_for_server(
|
res_deferreds_2 = kr.verify_json_objects_for_server(
|
||||||
[("server10", json1)],
|
[("server10", json1)],
|
||||||
)
|
)
|
||||||
yield async.sleep(01)
|
yield async.sleep(1)
|
||||||
self.http_client.post_json.assert_not_called()
|
self.http_client.post_json.assert_not_called()
|
||||||
res_deferreds_2[0].addBoth(self.check_context, None)
|
res_deferreds_2[0].addBoth(self.check_context, None)
|
||||||
|
|
||||||
|
|
|
@ -480,9 +480,9 @@ class ApplicationServiceStoreConfigTestCase(unittest.TestCase):
|
||||||
ApplicationServiceStore(None, hs)
|
ApplicationServiceStore(None, hs)
|
||||||
|
|
||||||
e = cm.exception
|
e = cm.exception
|
||||||
self.assertIn(f1, e.message)
|
self.assertIn(f1, str(e))
|
||||||
self.assertIn(f2, e.message)
|
self.assertIn(f2, str(e))
|
||||||
self.assertIn("id", e.message)
|
self.assertIn("id", str(e))
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
def test_duplicate_as_tokens(self):
|
def test_duplicate_as_tokens(self):
|
||||||
|
@ -504,6 +504,6 @@ class ApplicationServiceStoreConfigTestCase(unittest.TestCase):
|
||||||
ApplicationServiceStore(None, hs)
|
ApplicationServiceStore(None, hs)
|
||||||
|
|
||||||
e = cm.exception
|
e = cm.exception
|
||||||
self.assertIn(f1, e.message)
|
self.assertIn(f1, str(e))
|
||||||
self.assertIn(f2, e.message)
|
self.assertIn(f2, str(e))
|
||||||
self.assertIn("as_token", e.message)
|
self.assertIn("as_token", str(e))
|
||||||
|
|
68
tests/storage/test_event_federation.py
Normal file
68
tests/storage/test_event_federation.py
Normal file
|
@ -0,0 +1,68 @@
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
# Copyright 2018 New Vector Ltd
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the 'License');
|
||||||
|
# you may not use this file except in compliance with the License.
|
||||||
|
# You may obtain a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an 'AS IS' BASIS,
|
||||||
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
# See the License for the specific language governing permissions and
|
||||||
|
# limitations under the License.
|
||||||
|
|
||||||
|
from twisted.internet import defer
|
||||||
|
|
||||||
|
import tests.unittest
|
||||||
|
import tests.utils
|
||||||
|
|
||||||
|
|
||||||
|
class EventFederationWorkerStoreTestCase(tests.unittest.TestCase):
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def setUp(self):
|
||||||
|
hs = yield tests.utils.setup_test_homeserver()
|
||||||
|
self.store = hs.get_datastore()
|
||||||
|
|
||||||
|
@defer.inlineCallbacks
|
||||||
|
def test_get_prev_events_for_room(self):
|
||||||
|
room_id = '@ROOM:local'
|
||||||
|
|
||||||
|
# add a bunch of events and hashes to act as forward extremities
|
||||||
|
def insert_event(txn, i):
|
||||||
|
event_id = '$event_%i:local' % i
|
||||||
|
|
||||||
|
txn.execute((
|
||||||
|
"INSERT INTO events ("
|
||||||
|
" room_id, event_id, type, depth, topological_ordering,"
|
||||||
|
" content, processed, outlier) "
|
||||||
|
"VALUES (?, ?, 'm.test', ?, ?, 'test', ?, ?)"
|
||||||
|
), (room_id, event_id, i, i, True, False))
|
||||||
|
|
||||||
|
txn.execute((
|
||||||
|
'INSERT INTO event_forward_extremities (room_id, event_id) '
|
||||||
|
'VALUES (?, ?)'
|
||||||
|
), (room_id, event_id))
|
||||||
|
|
||||||
|
txn.execute((
|
||||||
|
'INSERT INTO event_reference_hashes '
|
||||||
|
'(event_id, algorithm, hash) '
|
||||||
|
"VALUES (?, 'sha256', ?)"
|
||||||
|
), (event_id, 'ffff'))
|
||||||
|
|
||||||
|
for i in range(0, 11):
|
||||||
|
yield self.store.runInteraction("insert", insert_event, i)
|
||||||
|
|
||||||
|
# this should get the last five and five others
|
||||||
|
r = yield self.store.get_prev_events_for_room(room_id)
|
||||||
|
self.assertEqual(10, len(r))
|
||||||
|
for i in range(0, 5):
|
||||||
|
el = r[i]
|
||||||
|
depth = el[2]
|
||||||
|
self.assertEqual(10 - i, depth)
|
||||||
|
|
||||||
|
for i in range(5, 5):
|
||||||
|
el = r[i]
|
||||||
|
depth = el[2]
|
||||||
|
self.assertLessEqual(5, depth)
|
|
@ -62,7 +62,7 @@ class DistributorTestCase(unittest.TestCase):
|
||||||
def test_signal_catch(self):
|
def test_signal_catch(self):
|
||||||
self.dist.declare("alarm")
|
self.dist.declare("alarm")
|
||||||
|
|
||||||
observers = [Mock() for i in 1, 2]
|
observers = [Mock() for i in (1, 2)]
|
||||||
for o in observers:
|
for o in observers:
|
||||||
self.dist.observe("alarm", o)
|
self.dist.observe("alarm", o)
|
||||||
|
|
||||||
|
|
|
@ -20,7 +20,7 @@ from mock import NonCallableMock
|
||||||
from synapse.util.file_consumer import BackgroundFileConsumer
|
from synapse.util.file_consumer import BackgroundFileConsumer
|
||||||
|
|
||||||
from tests import unittest
|
from tests import unittest
|
||||||
from StringIO import StringIO
|
from six import StringIO
|
||||||
|
|
||||||
import threading
|
import threading
|
||||||
|
|
||||||
|
|
|
@ -18,6 +18,7 @@ from tests import unittest
|
||||||
from twisted.internet import defer
|
from twisted.internet import defer
|
||||||
|
|
||||||
from synapse.util.async import Linearizer
|
from synapse.util.async import Linearizer
|
||||||
|
from six.moves import range
|
||||||
|
|
||||||
|
|
||||||
class LinearizerTestCase(unittest.TestCase):
|
class LinearizerTestCase(unittest.TestCase):
|
||||||
|
@ -58,7 +59,7 @@ class LinearizerTestCase(unittest.TestCase):
|
||||||
logcontext.LoggingContext.current_context(), lc)
|
logcontext.LoggingContext.current_context(), lc)
|
||||||
|
|
||||||
func(0, sleep=True)
|
func(0, sleep=True)
|
||||||
for i in xrange(1, 100):
|
for i in range(1, 100):
|
||||||
func(i)
|
func(i)
|
||||||
|
|
||||||
return func(1000)
|
return func(1000)
|
||||||
|
|
|
@ -59,6 +59,10 @@ def setup_test_homeserver(name="test", datastore=None, config=None, **kargs):
|
||||||
config.email_enable_notifs = False
|
config.email_enable_notifs = False
|
||||||
config.block_non_admin_invites = False
|
config.block_non_admin_invites = False
|
||||||
config.federation_domain_whitelist = None
|
config.federation_domain_whitelist = None
|
||||||
|
config.federation_rc_reject_limit = 10
|
||||||
|
config.federation_rc_sleep_limit = 10
|
||||||
|
config.federation_rc_concurrent = 10
|
||||||
|
config.filter_timeline_limit = 5000
|
||||||
config.user_directory_search_all_users = False
|
config.user_directory_search_all_users = False
|
||||||
config.replicate_user_profiles_to = []
|
config.replicate_user_profiles_to = []
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue