Merge branch 'develop' into uhoreg/e2e_backup_hash

Hubert Chathi 2019-08-15 20:00:37 -07:00
commit 0355a75285
87 changed files with 1022 additions and 593 deletions


@@ -3,10 +3,6 @@
Message history can be paginated
m.room.history_visibility == "world_readable" allows/forbids appropriately for Guest users
m.room.history_visibility == "world_readable" allows/forbids appropriately for Real users
Can re-join room if re-invited
/upgrade creates a new room

.gitignore vendored

@@ -16,6 +16,7 @@ _trial_temp*/
/*.log
/*.log.config
/*.pid
/.python-version
/*.signing.key
/env/
/homeserver*.yaml


@@ -1,3 +1,87 @@
Synapse 1.3.0 (2019-08-15)
==========================
Bugfixes
--------
- Fix 500 Internal Server Error on `publicRooms` when the public room list was
cached. ([\#5851](https://github.com/matrix-org/synapse/issues/5851))
Synapse 1.3.0rc1 (2019-08-13)
=============================
Features
--------
- Use `M_USER_DEACTIVATED` instead of `M_UNKNOWN` for the errcode when a deactivated user attempts to log in. ([\#5686](https://github.com/matrix-org/synapse/issues/5686))
- Add sd_notify hooks to ease systemd integration and allow usage of Type=Notify. ([\#5732](https://github.com/matrix-org/synapse/issues/5732))
- Synapse will no longer serve any media repo admin endpoints when `enable_media_repo` is set to False in the configuration. If a media repo worker is used, the admin APIs relating to the media repo will be served from it instead. ([\#5754](https://github.com/matrix-org/synapse/issues/5754), [\#5848](https://github.com/matrix-org/synapse/issues/5848))
- Synapse can now be configured to not join remote rooms of a given "complexity" (currently, state events) over federation. This option can be used to prevent adverse performance on resource-constrained homeservers. ([\#5783](https://github.com/matrix-org/synapse/issues/5783))
- Allow defining HTML templates to serve the user on account renewal attempt when using the account validity feature. ([\#5807](https://github.com/matrix-org/synapse/issues/5807))
Bugfixes
--------
- Fix UISIs during homeserver outage. ([\#5693](https://github.com/matrix-org/synapse/issues/5693), [\#5789](https://github.com/matrix-org/synapse/issues/5789))
- Fix stack overflow in server key lookup code. ([\#5724](https://github.com/matrix-org/synapse/issues/5724))
- start.sh no longer uses a deprecated CLI option. ([\#5725](https://github.com/matrix-org/synapse/issues/5725))
- Log when we receive an event receipt from an unexpected origin. ([\#5743](https://github.com/matrix-org/synapse/issues/5743))
- Fix debian packaging scripts to correctly build sid packages. ([\#5775](https://github.com/matrix-org/synapse/issues/5775))
- Correctly handle redactions of redactions. ([\#5788](https://github.com/matrix-org/synapse/issues/5788))
- Return 404 instead of 403 when accessing /rooms/{roomId}/event/{eventId} for an event without the appropriate permissions. ([\#5798](https://github.com/matrix-org/synapse/issues/5798))
- Fix check that tombstone is a state event in push rules. ([\#5804](https://github.com/matrix-org/synapse/issues/5804))
- Fix error when trying to log in as a deactivated user when using a worker to handle login. ([\#5806](https://github.com/matrix-org/synapse/issues/5806))
- Fix bug where user `/sync` stream could get wedged in rare circumstances. ([\#5825](https://github.com/matrix-org/synapse/issues/5825))
- Fix the purge_remote_media.sh script, whose `curl` invocation was missing the `-X` flag. ([\#5839](https://github.com/matrix-org/synapse/issues/5839))
Deprecations and Removals
-------------------------
- Synapse now no longer accepts the `-v`/`--verbose`, `-f`/`--log-file`, or `--log-config` command line flags, and removes the deprecated `verbose` and `log_file` configuration file options. Users of these options should migrate their options into the dedicated log configuration. ([\#5678](https://github.com/matrix-org/synapse/issues/5678), [\#5729](https://github.com/matrix-org/synapse/issues/5729))
- Remove non-functional 'expire_access_token' setting. ([\#5782](https://github.com/matrix-org/synapse/issues/5782))
Internal Changes
----------------
- Make Jaeger fully configurable. ([\#5694](https://github.com/matrix-org/synapse/issues/5694))
- Add precautionary measures to prevent future abuse of `window.opener` in default welcome page. ([\#5695](https://github.com/matrix-org/synapse/issues/5695))
- Reduce database IO usage by optimising queries for current membership. ([\#5706](https://github.com/matrix-org/synapse/issues/5706), [\#5738](https://github.com/matrix-org/synapse/issues/5738), [\#5746](https://github.com/matrix-org/synapse/issues/5746), [\#5752](https://github.com/matrix-org/synapse/issues/5752), [\#5770](https://github.com/matrix-org/synapse/issues/5770), [\#5774](https://github.com/matrix-org/synapse/issues/5774), [\#5792](https://github.com/matrix-org/synapse/issues/5792), [\#5793](https://github.com/matrix-org/synapse/issues/5793))
- Improve caching when fetching `get_filtered_current_state_ids`. ([\#5713](https://github.com/matrix-org/synapse/issues/5713))
- Don't accept opentracing data from clients. ([\#5715](https://github.com/matrix-org/synapse/issues/5715))
- Speed up PostgreSQL unit tests in CI. ([\#5717](https://github.com/matrix-org/synapse/issues/5717))
- Update the coding style document. ([\#5719](https://github.com/matrix-org/synapse/issues/5719))
- Improve database query performance when recording retry intervals for remote hosts. ([\#5720](https://github.com/matrix-org/synapse/issues/5720))
- Add a set of opentracing utils. ([\#5722](https://github.com/matrix-org/synapse/issues/5722))
- Cache result of get_version_string to reduce overhead of `/version` federation requests. ([\#5730](https://github.com/matrix-org/synapse/issues/5730))
- Return 'user_type' in admin API user endpoints results. ([\#5731](https://github.com/matrix-org/synapse/issues/5731))
- Don't package the sytest test blacklist file. ([\#5733](https://github.com/matrix-org/synapse/issues/5733))
- Replace uses of returnValue with plain return, as returnValue is not needed on Python 3. ([\#5736](https://github.com/matrix-org/synapse/issues/5736))
- Blacklist some flaky tests in worker mode. ([\#5740](https://github.com/matrix-org/synapse/issues/5740))
- Fix some error cases in the caching layer. ([\#5749](https://github.com/matrix-org/synapse/issues/5749))
- Add a prometheus metric for pending cache lookups. ([\#5750](https://github.com/matrix-org/synapse/issues/5750))
- Stop trying to fetch events with event_id=None. ([\#5753](https://github.com/matrix-org/synapse/issues/5753))
- Convert RedactionTestCase to modern test style. ([\#5768](https://github.com/matrix-org/synapse/issues/5768))
- Allow looping calls to be given arguments. ([\#5780](https://github.com/matrix-org/synapse/issues/5780))
- Set the logs emitted when checking typing and presence timeouts to DEBUG level, not INFO. ([\#5785](https://github.com/matrix-org/synapse/issues/5785))
- Remove DelayedCall debugging from the test suite, as it is no longer required in the vast majority of Synapse's tests. ([\#5787](https://github.com/matrix-org/synapse/issues/5787))
- Remove some spurious exceptions from the logs where we failed to talk to a remote server. ([\#5790](https://github.com/matrix-org/synapse/issues/5790))
- Improve performance when making `.well-known` requests by sharing the SSL options between requests. ([\#5794](https://github.com/matrix-org/synapse/issues/5794))
- Disable codecov GitHub comments on PRs. ([\#5796](https://github.com/matrix-org/synapse/issues/5796))
- Don't allow clients to send tombstone events that reference the room they are sent in. ([\#5801](https://github.com/matrix-org/synapse/issues/5801))
- Deny redactions of events sent in a different room. ([\#5802](https://github.com/matrix-org/synapse/issues/5802))
- Deny sending well known state types as non-state events. ([\#5805](https://github.com/matrix-org/synapse/issues/5805))
- Handle incorrectly encoded query params correctly by returning a 400. ([\#5808](https://github.com/matrix-org/synapse/issues/5808))
- Handle pusher being deleted during processing rather than logging an exception. ([\#5809](https://github.com/matrix-org/synapse/issues/5809))
- Return 502 not 500 when failing to reach any remote server. ([\#5810](https://github.com/matrix-org/synapse/issues/5810))
- Reduce global pauses in the events stream caused by expensive state resolution during persistence. ([\#5826](https://github.com/matrix-org/synapse/issues/5826))
- Add a lower bound to well-known lookup cache time to avoid repeated lookups. ([\#5836](https://github.com/matrix-org/synapse/issues/5836))
- Whitelist history visibility sytests in worker mode tests. ([\#5843](https://github.com/matrix-org/synapse/issues/5843))
Synapse 1.2.1 (2019-07-26)
==========================


@@ -419,12 +419,11 @@ If Synapse is not configured with an SMTP server, password reset via email will
## Registering a user
You will need at least one user on your server in order to use a Matrix
client. Users can be registered either via a Matrix client, or via a
command-line script.
The easiest way to create a new user is to do so from a client like [Riot](https://riot.im).
To get started, it is easiest to use the command line to register new
users. This can be done as follows:
Alternatively you can do so from the command line if you have installed via pip.
This can be done as follows:
```
$ source ~/synapse/env/bin/activate

changelog.d/5633.bugfix Normal file

@@ -0,0 +1 @@
Don't create broken room when power_level_content_override.users does not contain creator_id.


changelog.d/5844.misc Normal file

@@ -0,0 +1 @@
Retry well-known lookup before the cache expires, giving a grace period where the remote well-known can be down but we still use the old result.

changelog.d/5863.bugfix Normal file

@@ -0,0 +1 @@
Fix Synapse looking for config options `password_reset_failure_template` and `password_reset_success_template`, when they are actually `password_reset_template_failure_html` and `password_reset_template_success_html`.


@@ -51,4 +51,4 @@ TOKEN=$(sql "SELECT token FROM access_tokens WHERE user_id='$ADMIN' ORDER BY id
# finally start pruning media:
###############################################################################
set -x # for debugging the generated string
curl --header "Authorization: Bearer $TOKEN" -v POST "$API_URL/admin/purge_media_cache/?before_ts=$UNIX_TIMESTAMP"
curl --header "Authorization: Bearer $TOKEN" -X POST "$API_URL/admin/purge_media_cache/?before_ts=$UNIX_TIMESTAMP"
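For reference, a rough Python equivalent of the fixed call. This is a sketch only: the base URL and token are placeholders, and it targets the current `/_synapse/admin/v1` prefix rather than the legacy `$API_URL/admin` path the script uses.

```python
# Sketch: purge remote media older than "now" via the admin API.
# BASE_URL and TOKEN are placeholders for your homeserver and an
# admin user's access token.
import time

import requests

BASE_URL = "http://localhost:8008"
TOKEN = "YOUR_ADMIN_ACCESS_TOKEN"

before_ts = int(time.time() * 1000)  # the API expects milliseconds

resp = requests.post(
    f"{BASE_URL}/_synapse/admin/v1/purge_media_cache",
    params={"before_ts": before_ts},
    headers={"Authorization": f"Bearer {TOKEN}"},
)
resp.raise_for_status()
print(resp.json())
```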

debian/changelog vendored

@@ -1,8 +1,7 @@
matrix-synapse-py3 (1.2.1) stable; urgency=medium
matrix-synapse-py3 (1.3.0) stable; urgency=medium
* New synapse release 1.2.1.
-- Synapse Packaging team <packages@matrix.org> Fri, 26 Jul 2019 11:32:47 +0100
[ Andrew Morgan ]
* Remove libsqlite3-dev from required build dependencies.
matrix-synapse-py3 (1.2.0) stable; urgency=medium
@@ -14,8 +13,9 @@ matrix-synapse-py3 (1.2.0) stable; urgency=medium
[ Synapse Packaging team ]
* New synapse release 1.2.0.
* New synapse release 1.3.0.
-- Synapse Packaging team <packages@matrix.org> Thu, 25 Jul 2019 14:10:07 +0100
-- Synapse Packaging team <packages@matrix.org> Thu, 15 Aug 2019 12:04:23 +0100
matrix-synapse-py3 (1.1.0) stable; urgency=medium

debian/control vendored

@@ -15,7 +15,6 @@ Build-Depends:
python3-setuptools,
python3-pip,
python3-venv,
libsqlite3-dev,
tar,
Standards-Version: 3.9.8
Homepage: https://github.com/matrix-org/synapse


@@ -565,6 +565,13 @@ log_config: "CONFDIR/SERVERNAME.log.config"
## Media Store ##
# Enable the media store service in the Synapse master. Uncomment the
# following if you are using a separate media store worker.
#
#enable_media_repo: false
# Directory where uploaded images and attachments are stored.
#
media_store_path: "DATADIR/media_store"


@@ -206,6 +206,13 @@ Handles the media repository. It can handle all endpoints starting with::
/_matrix/media/
And the following regular expressions matching media-specific administration
APIs::
^/_synapse/admin/v1/purge_media_cache$
^/_synapse/admin/v1/room/.*/media$
^/_synapse/admin/v1/quarantine_media/.*$
You should also set ``enable_media_repo: False`` in the shared configuration
file to stop the main synapse running background jobs related to managing the
media repository.
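As a quick illustration (toy code, not part of Synapse) of which request paths those three patterns hand to the media worker:

```python
import re

# The three admin patterns listed above.
MEDIA_ADMIN_PATTERNS = [
    re.compile(r"^/_synapse/admin/v1/purge_media_cache$"),
    re.compile(r"^/_synapse/admin/v1/room/.*/media$"),
    re.compile(r"^/_synapse/admin/v1/quarantine_media/.*$"),
]

def goes_to_media_worker(path: str) -> bool:
    return any(p.match(path) for p in MEDIA_ADMIN_PATTERNS)

assert goes_to_media_worker("/_synapse/admin/v1/purge_media_cache")
assert goes_to_media_worker("/_synapse/admin/v1/room/!abc:example.com/media")
assert not goes_to_media_worker("/_synapse/admin/v1/users/@alice:example.com")
```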


@@ -35,4 +35,4 @@ try:
except ImportError:
pass
__version__ = "1.2.1"
__version__ = "1.3.0"


@@ -26,6 +26,7 @@ from synapse.app import _base
from synapse.config._base import ConfigError
from synapse.config.homeserver import HomeServerConfig
from synapse.config.logger import setup_logging
from synapse.http.server import JsonResource
from synapse.http.site import SynapseSite
from synapse.logging.context import LoggingContext
from synapse.metrics import METRICS_PREFIX, MetricsResource, RegistryProxy
@@ -35,6 +36,7 @@ from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
from synapse.replication.slave.storage.registration import SlavedRegistrationStore
from synapse.replication.slave.storage.transactions import SlavedTransactionStore
from synapse.replication.tcp.client import ReplicationClientHandler
from synapse.rest.admin import register_servlets_for_media_repo
from synapse.rest.media.v0.content_repository import ContentRepoResource
from synapse.server import HomeServer
from synapse.storage.engines import create_engine
@@ -71,6 +73,12 @@ class MediaRepositoryServer(HomeServer):
resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
elif name == "media":
media_repo = self.get_media_repository_resource()
# We need to serve the admin servlets for media on the
# worker.
admin_resource = JsonResource(self, canonical_json=False)
register_servlets_for_media_repo(self, admin_resource)
resources.update(
{
MEDIA_PREFIX: media_repo,
@@ -78,6 +86,7 @@ class MediaRepositoryServer(HomeServer):
CONTENT_REPO_PREFIX: ContentRepoResource(
self, self.config.uploads_path
),
"/_synapse/admin": admin_resource,
}
)


@@ -132,21 +132,21 @@ class EmailConfig(Config):
self.email_password_reset_template_text = email_config.get(
"password_reset_template_text", "password_reset.txt"
)
self.email_password_reset_failure_template = email_config.get(
"password_reset_failure_template", "password_reset_failure.html"
self.email_password_reset_template_failure_html = email_config.get(
"password_reset_template_failure_html", "password_reset_failure.html"
)
# This template does not support any replaceable variables, so we will
# read it from the disk once during setup
email_password_reset_success_template = email_config.get(
"password_reset_success_template", "password_reset_success.html"
email_password_reset_template_success_html = email_config.get(
"password_reset_template_success_html", "password_reset_success.html"
)
# Check templates exist
for f in [
self.email_password_reset_template_html,
self.email_password_reset_template_text,
self.email_password_reset_failure_template,
email_password_reset_success_template,
self.email_password_reset_template_failure_html,
email_password_reset_template_success_html,
]:
p = os.path.join(self.email_template_dir, f)
if not os.path.isfile(p):
@@ -154,9 +154,9 @@ class EmailConfig(Config):
# Retrieve content of web templates
filepath = os.path.join(
self.email_template_dir, email_password_reset_success_template
self.email_template_dir, email_password_reset_template_success_html
)
self.email_password_reset_success_html_content = self.read_file(
self.email_password_reset_template_success_html_content = self.read_file(
filepath, "email.password_reset_template_success_html"
)


@@ -12,6 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
from collections import namedtuple
@@ -87,6 +88,18 @@ def parse_thumbnail_requirements(thumbnail_sizes):
class ContentRepositoryConfig(Config):
def read_config(self, config, **kwargs):
# Only enable the media repo if either the media repo is enabled or the
# current worker app is the media repo.
if (
self.enable_media_repo is False
and config.get("worker_app") != "synapse.app.media_repository"
):
self.can_load_media_repo = False
return
else:
self.can_load_media_repo = True
self.max_upload_size = self.parse_size(config.get("max_upload_size", "10M"))
self.max_image_pixels = self.parse_size(config.get("max_image_pixels", "32M"))
self.max_spider_size = self.parse_size(config.get("max_spider_size", "10M"))
@@ -202,6 +215,13 @@ class ContentRepositoryConfig(Config):
return (
r"""
## Media Store ##
# Enable the media store service in the Synapse master. Uncomment the
# following if you are using a separate media store worker.
#
#enable_media_repo: false
# Directory where uploaded images and attachments are stored.
#
media_store_path: "%(media_store)s"


@@ -19,6 +19,8 @@ import functools
import logging
import re
from twisted.internet.defer import maybeDeferred
import synapse
import synapse.logging.opentracing as opentracing
from synapse.api.errors import Codes, FederationDeniedError, SynapseError
@@ -745,8 +747,12 @@ class PublicRoomList(BaseFederationServlet):
else:
network_tuple = ThirdPartyInstanceID(None, None)
data = await self.handler.get_local_public_room_list(
limit, since_token, network_tuple=network_tuple, from_federation=True
data = await maybeDeferred(
self.handler.get_local_public_room_list,
limit,
since_token,
network_tuple=network_tuple,
from_federation=True,
)
return 200, data
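The `maybeDeferred` wrapper above is what fixes the cached `publicRooms` 500 (#5851): a cached code path can return a plain value where the uncached path returns a `Deferred`, and only the latter can be awaited directly. A toy sketch (not Synapse's actual caching code) of that failure mode:

```python
from twisted.internet import defer
from twisted.internet.defer import ensureDeferred, maybeDeferred

_cache = {}

def get_room_list(key):
    """Returns a plain list when cached, a Deferred otherwise (toy code)."""
    if key in _cache:
        return _cache[key]              # synchronous fast path
    rooms = ["#room:example.com"]
    _cache[key] = rooms
    return defer.succeed(rooms)         # asynchronous slow path

async def handler():
    # `await get_room_list(...)` would raise TypeError on the cached path;
    # maybeDeferred normalises both paths to a Deferred.
    rooms = await maybeDeferred(get_room_list, "public")
    rooms = await maybeDeferred(get_room_list, "public")  # cached this time
    return rooms

ensureDeferred(handler()).addCallback(print)  # ['#room:example.com']
```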


@@ -560,6 +560,18 @@ class RoomCreationHandler(BaseHandler):
yield self.event_creation_handler.assert_accepted_privacy_policy(requester)
power_level_content_override = config.get("power_level_content_override")
if (
power_level_content_override
and "users" in power_level_content_override
and user_id not in power_level_content_override["users"]
):
raise SynapseError(
400,
"Not a valid power_level_content_override: 'users' did not contain %s"
% (user_id,),
)
invite_3pid_list = config.get("invite_3pid", [])
visibility = config.get("visibility", None)
@@ -604,7 +616,7 @@
initial_state=initial_state,
creation_content=creation_content,
room_alias=room_alias,
power_level_content_override=config.get("power_level_content_override"),
power_level_content_override=power_level_content_override,
creator_join_profile=creator_join_profile,
)
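A toy illustration of what the new check rejects and accepts (the user IDs are made up):

```python
creator_id = "@alice:example.com"

# Rejected with a 400: "users" is overridden but omits the creator, which
# would previously have produced a room whose creator holds no power.
rejected = {
    "power_level_content_override": {"users": {"@bob:example.com": 100}}
}

# Accepted: the creator appears in the overridden "users" map.
accepted = {
    "power_level_content_override": {
        "users": {creator_id: 100, "@bob:example.com": 50}
    }
}

def is_valid(config, user_id):
    override = config.get("power_level_content_override")
    return not (
        override and "users" in override and user_id not in override["users"]
    )

assert not is_valid(rejected, creator_id)
assert is_valid(accepted, creator_id)
```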


@@ -12,10 +12,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import logging
import random
import time
import attr
from netaddr import IPAddress
@@ -24,31 +22,16 @@ from zope.interface import implementer
from twisted.internet import defer
from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
from twisted.internet.interfaces import IStreamClientEndpoint
from twisted.web.client import URI, Agent, HTTPConnectionPool, RedirectAgent, readBody
from twisted.web.http import stringToDatetime
from twisted.web.client import URI, Agent, HTTPConnectionPool
from twisted.web.http_headers import Headers
from twisted.web.iweb import IAgent
from synapse.http.federation.srv_resolver import SrvResolver, pick_server_from_list
from synapse.http.federation.well_known_resolver import WellKnownResolver
from synapse.logging.context import make_deferred_yieldable
from synapse.util import Clock
from synapse.util.caches.ttlcache import TTLCache
from synapse.util.metrics import Measure
# period to cache .well-known results for by default
WELL_KNOWN_DEFAULT_CACHE_PERIOD = 24 * 3600
# jitter to add to the .well-known default cache ttl
WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER = 10 * 60
# period to cache failure to fetch .well-known for
WELL_KNOWN_INVALID_CACHE_PERIOD = 1 * 3600
# cap for .well-known cache period
WELL_KNOWN_MAX_CACHE_PERIOD = 48 * 3600
logger = logging.getLogger(__name__)
well_known_cache = TTLCache("well-known")
@implementer(IAgent)
@@ -78,7 +61,7 @@
reactor,
tls_client_options_factory,
_srv_resolver=None,
_well_known_cache=well_known_cache,
_well_known_cache=None,
):
self._reactor = reactor
self._clock = Clock(reactor)
@@ -93,20 +76,15 @@
self._pool.maxPersistentPerHost = 5
self._pool.cachedConnectionTimeout = 2 * 60
_well_known_agent = RedirectAgent(
Agent(
self._well_known_resolver = WellKnownResolver(
self._reactor,
agent=Agent(
self._reactor,
pool=self._pool,
contextFactory=tls_client_options_factory,
)
),
well_known_cache=_well_known_cache,
)
self._well_known_agent = _well_known_agent
# our cache of .well-known lookup results, mapping from server name
# to delegated name. The values can be:
# `bytes`: a valid server-name
# `None`: there is no (valid) .well-known here
self._well_known_cache = _well_known_cache
@defer.inlineCallbacks
def request(self, method, uri, headers=None, bodyProducer=None):
@@ -217,7 +195,10 @@
if lookup_well_known:
# try a .well-known lookup
well_known_server = yield self._get_well_known(parsed_uri.host)
well_known_result = yield self._well_known_resolver.get_well_known(
parsed_uri.host
)
well_known_server = well_known_result.delegated_server
if well_known_server:
# if we found a .well-known, start again, but don't do another
@@ -280,85 +261,6 @@
target_port=port,
)
@defer.inlineCallbacks
def _get_well_known(self, server_name):
"""Attempt to fetch and parse a .well-known file for the given server
Args:
server_name (bytes): name of the server, from the requested url
Returns:
Deferred[bytes|None]: either the new server name, from the .well-known, or
None if there was no .well-known file.
"""
try:
result = self._well_known_cache[server_name]
except KeyError:
# TODO: should we linearise so that we don't end up doing two .well-known
# requests for the same server in parallel?
with Measure(self._clock, "get_well_known"):
result, cache_period = yield self._do_get_well_known(server_name)
if cache_period > 0:
self._well_known_cache.set(server_name, result, cache_period)
return result
@defer.inlineCallbacks
def _do_get_well_known(self, server_name):
"""Actually fetch and parse a .well-known, without checking the cache
Args:
server_name (bytes): name of the server, from the requested url
Returns:
Deferred[Tuple[bytes|None|object],int]:
result, cache period, where result is one of:
- the new server name from the .well-known (as a `bytes`)
- None if there was no .well-known file.
- INVALID_WELL_KNOWN if the .well-known was invalid
"""
uri = b"https://%s/.well-known/matrix/server" % (server_name,)
uri_str = uri.decode("ascii")
logger.info("Fetching %s", uri_str)
try:
response = yield make_deferred_yieldable(
self._well_known_agent.request(b"GET", uri)
)
body = yield make_deferred_yieldable(readBody(response))
if response.code != 200:
raise Exception("Non-200 response %s" % (response.code,))
parsed_body = json.loads(body.decode("utf-8"))
logger.info("Response from .well-known: %s", parsed_body)
if not isinstance(parsed_body, dict):
raise Exception("not a dict")
if "m.server" not in parsed_body:
raise Exception("Missing key 'm.server'")
except Exception as e:
logger.info("Error fetching %s: %s", uri_str, e)
# add some randomness to the TTL to avoid a stampeding herd every hour
# after startup
cache_period = WELL_KNOWN_INVALID_CACHE_PERIOD
cache_period += random.uniform(0, WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER)
return (None, cache_period)
result = parsed_body["m.server"].encode("ascii")
cache_period = _cache_period_from_headers(
response.headers, time_now=self._reactor.seconds
)
if cache_period is None:
cache_period = WELL_KNOWN_DEFAULT_CACHE_PERIOD
# add some randomness to the TTL to avoid a stampeding herd every 24 hours
# after startup
cache_period += random.uniform(0, WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER)
else:
cache_period = min(cache_period, WELL_KNOWN_MAX_CACHE_PERIOD)
return (result, cache_period)
@implementer(IStreamClientEndpoint)
class LoggingHostnameEndpoint(object):
@@ -374,44 +276,6 @@
return self.ep.connect(protocol_factory)
def _cache_period_from_headers(headers, time_now=time.time):
cache_controls = _parse_cache_control(headers)
if b"no-store" in cache_controls:
return 0
if b"max-age" in cache_controls:
try:
max_age = int(cache_controls[b"max-age"])
return max_age
except ValueError:
pass
expires = headers.getRawHeaders(b"expires")
if expires is not None:
try:
expires_date = stringToDatetime(expires[-1])
return expires_date - time_now()
except ValueError:
# RFC7234 says 'A cache recipient MUST interpret invalid date formats,
# especially the value "0", as representing a time in the past (i.e.,
# "already expired").
return 0
return None
def _parse_cache_control(headers):
cache_controls = {}
for hdr in headers.getRawHeaders(b"cache-control", []):
for directive in hdr.split(b","):
splits = [x.strip() for x in directive.split(b"=", 1)]
k = splits[0].lower()
v = splits[1] if len(splits) > 1 else None
cache_controls[k] = v
return cache_controls
@attr.s
class _RoutingResult(object):
"""The result returned by `_route_matrix_uri`.


@@ -0,0 +1,225 @@
# -*- coding: utf-8 -*-
# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import logging
import random
import time
import attr
from twisted.internet import defer
from twisted.web.client import RedirectAgent, readBody
from twisted.web.http import stringToDatetime
from synapse.logging.context import make_deferred_yieldable
from synapse.util import Clock
from synapse.util.caches.ttlcache import TTLCache
from synapse.util.metrics import Measure
# period to cache .well-known results for by default
WELL_KNOWN_DEFAULT_CACHE_PERIOD = 24 * 3600
# jitter to add to the .well-known default cache ttl
WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER = 10 * 60
# period to cache failure to fetch .well-known for
WELL_KNOWN_INVALID_CACHE_PERIOD = 1 * 3600
# cap for .well-known cache period
WELL_KNOWN_MAX_CACHE_PERIOD = 48 * 3600
# lower bound for .well-known cache period
WELL_KNOWN_MIN_CACHE_PERIOD = 5 * 60
# Attempt to refetch a cached well-known N% of the TTL before it expires.
# e.g. if set to 0.2 and we have a cached entry with a TTL of 5mins, then
# we'll start trying to refetch 1 minute before it expires.
WELL_KNOWN_GRACE_PERIOD_FACTOR = 0.2
logger = logging.getLogger(__name__)
_well_known_cache = TTLCache("well-known")
@attr.s(slots=True, frozen=True)
class WellKnownLookupResult(object):
delegated_server = attr.ib()
class WellKnownResolver(object):
"""Handles well-known lookups for matrix servers.
"""
def __init__(self, reactor, agent, well_known_cache=None):
self._reactor = reactor
self._clock = Clock(reactor)
if well_known_cache is None:
well_known_cache = _well_known_cache
self._well_known_cache = well_known_cache
self._well_known_agent = RedirectAgent(agent)
@defer.inlineCallbacks
def get_well_known(self, server_name):
"""Attempt to fetch and parse a .well-known file for the given server
Args:
server_name (bytes): name of the server, from the requested url
Returns:
Deferred[WellKnownLookupResult]: The result of the lookup
"""
try:
prev_result, expiry, ttl = self._well_known_cache.get_with_expiry(
server_name
)
now = self._clock.time()
if now < expiry - WELL_KNOWN_GRACE_PERIOD_FACTOR * ttl:
return WellKnownLookupResult(delegated_server=prev_result)
except KeyError:
prev_result = None
# TODO: should we linearise so that we don't end up doing two .well-known
# requests for the same server in parallel?
try:
with Measure(self._clock, "get_well_known"):
result, cache_period = yield self._do_get_well_known(server_name)
except _FetchWellKnownFailure as e:
if prev_result and e.temporary:
# This is a temporary failure and we have a still valid cached
# result, so lets return that. Hopefully the next time we ask
# the remote will be back up again.
return WellKnownLookupResult(delegated_server=prev_result)
result = None
# add some randomness to the TTL to avoid a stampeding herd every hour
# after startup
cache_period = WELL_KNOWN_INVALID_CACHE_PERIOD
cache_period += random.uniform(0, WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER)
if cache_period > 0:
self._well_known_cache.set(server_name, result, cache_period)
return WellKnownLookupResult(delegated_server=result)
@defer.inlineCallbacks
def _do_get_well_known(self, server_name):
"""Actually fetch and parse a .well-known, without checking the cache
Args:
server_name (bytes): name of the server, from the requested url
Raises:
_FetchWellKnownFailure if we fail to lookup a result
Returns:
Deferred[Tuple[bytes,int]]: The lookup result and cache period.
"""
uri = b"https://%s/.well-known/matrix/server" % (server_name,)
uri_str = uri.decode("ascii")
logger.info("Fetching %s", uri_str)
# We do this in two steps to differentiate between possibly transient
# errors (e.g. can't connect to host, 503 response) and more permanent
# errors (such as getting a 404 response).
try:
response = yield make_deferred_yieldable(
self._well_known_agent.request(b"GET", uri)
)
body = yield make_deferred_yieldable(readBody(response))
if 500 <= response.code < 600:
raise Exception("Non-200 response %s" % (response.code,))
except Exception as e:
logger.info("Error fetching %s: %s", uri_str, e)
raise _FetchWellKnownFailure(temporary=True)
try:
if response.code != 200:
raise Exception("Non-200 response %s" % (response.code,))
parsed_body = json.loads(body.decode("utf-8"))
logger.info("Response from .well-known: %s", parsed_body)
result = parsed_body["m.server"].encode("ascii")
except Exception as e:
logger.info("Error fetching %s: %s", uri_str, e)
raise _FetchWellKnownFailure(temporary=False)
cache_period = _cache_period_from_headers(
response.headers, time_now=self._reactor.seconds
)
if cache_period is None:
cache_period = WELL_KNOWN_DEFAULT_CACHE_PERIOD
# add some randomness to the TTL to avoid a stampeding herd every 24 hours
# after startup
cache_period += random.uniform(0, WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER)
else:
cache_period = min(cache_period, WELL_KNOWN_MAX_CACHE_PERIOD)
cache_period = max(cache_period, WELL_KNOWN_MIN_CACHE_PERIOD)
return (result, cache_period)
def _cache_period_from_headers(headers, time_now=time.time):
cache_controls = _parse_cache_control(headers)
if b"no-store" in cache_controls:
return 0
if b"max-age" in cache_controls:
try:
max_age = int(cache_controls[b"max-age"])
return max_age
except ValueError:
pass
expires = headers.getRawHeaders(b"expires")
if expires is not None:
try:
expires_date = stringToDatetime(expires[-1])
return expires_date - time_now()
except ValueError:
# RFC7234 says 'A cache recipient MUST interpret invalid date formats,
# especially the value "0", as representing a time in the past (i.e.,
# "already expired").
return 0
return None
def _parse_cache_control(headers):
cache_controls = {}
for hdr in headers.getRawHeaders(b"cache-control", []):
for directive in hdr.split(b","):
splits = [x.strip() for x in directive.split(b"=", 1)]
k = splits[0].lower()
v = splits[1] if len(splits) > 1 else None
cache_controls[k] = v
return cache_controls
@attr.s()
class _FetchWellKnownFailure(Exception):
# True if we didn't get a non-5xx HTTP response, i.e. this may or may not be
# a temporary failure.
temporary = attr.ib()
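To make the grace-period behaviour concrete, here is a standalone sketch of the arithmetic `get_well_known` applies: with `WELL_KNOWN_GRACE_PERIOD_FACTOR = 0.2`, a cached entry is trusted outright until 20% of its TTL remains, after which a refetch is attempted (falling back to the cached value on a temporary failure):

```python
WELL_KNOWN_GRACE_PERIOD_FACTOR = 0.2

def should_refetch(now, expiry, ttl):
    """True once `now` is inside the grace window before `expiry`."""
    return now >= expiry - WELL_KNOWN_GRACE_PERIOD_FACTOR * ttl

ttl = 300             # entry cached for 5 minutes...
expiry = 1000 + ttl   # ...at time t=1000

assert not should_refetch(1100, expiry, ttl)  # well before the window
assert should_refetch(1250, expiry, ttl)      # inside the final 60 seconds
```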


@@ -234,13 +234,19 @@ class EmailPusher(object):
return
self.last_stream_ordering = last_stream_ordering
yield self.store.update_pusher_last_stream_ordering_and_success(
self.app_id,
self.email,
self.user_id,
last_stream_ordering,
self.clock.time_msec(),
pusher_still_exists = (
yield self.store.update_pusher_last_stream_ordering_and_success(
self.app_id,
self.email,
self.user_id,
last_stream_ordering,
self.clock.time_msec(),
)
)
if not pusher_still_exists:
# The pusher has been deleted while we were processing, so
# lets just stop and return.
self.on_stop()
def seconds_until(self, ts_msec):
secs = (ts_msec - self.clock.time_msec()) / 1000


@@ -199,13 +199,21 @@ class HttpPusher(object):
http_push_processed_counter.inc()
self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
self.last_stream_ordering = push_action["stream_ordering"]
yield self.store.update_pusher_last_stream_ordering_and_success(
self.app_id,
self.pushkey,
self.user_id,
self.last_stream_ordering,
self.clock.time_msec(),
pusher_still_exists = (
yield self.store.update_pusher_last_stream_ordering_and_success(
self.app_id,
self.pushkey,
self.user_id,
self.last_stream_ordering,
self.clock.time_msec(),
)
)
if not pusher_still_exists:
# The pusher has been deleted while we were processing, so
# lets just stop and return.
self.on_stop()
return
if self.failing_since:
self.failing_since = None
yield self.store.update_pusher_failing_since(
@@ -234,12 +242,17 @@
)
self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
self.last_stream_ordering = push_action["stream_ordering"]
yield self.store.update_pusher_last_stream_ordering(
pusher_still_exists = yield self.store.update_pusher_last_stream_ordering(
self.app_id,
self.pushkey,
self.user_id,
self.last_stream_ordering,
)
if not pusher_still_exists:
# The pusher has been deleted while we were processing, so
# lets just stop and return.
self.on_stop()
return
self.failing_since = None
yield self.store.update_pusher_failing_since(


@@ -27,7 +27,7 @@ from twisted.internet import defer
import synapse
from synapse.api.constants import Membership, UserTypes
from synapse.api.errors import AuthError, Codes, NotFoundError, SynapseError
from synapse.api.errors import Codes, NotFoundError, SynapseError
from synapse.http.server import JsonResource
from synapse.http.servlet import (
RestServlet,
@@ -36,7 +36,12 @@ from synapse.http.servlet import (
parse_json_object_from_request,
parse_string,
)
from synapse.rest.admin._base import assert_requester_is_admin, assert_user_is_admin
from synapse.rest.admin._base import (
assert_requester_is_admin,
assert_user_is_admin,
historical_admin_path_patterns,
)
from synapse.rest.admin.media import register_servlets_for_media_repo
from synapse.rest.admin.server_notice_servlet import SendServerNoticeServlet
from synapse.types import UserID, create_requester
from synapse.util.versionstring import get_version_string
@@ -44,28 +49,6 @@ from synapse.util.versionstring import get_version_string
logger = logging.getLogger(__name__)
def historical_admin_path_patterns(path_regex):
"""Returns the list of patterns for an admin endpoint, including historical ones
This is a backwards-compatibility hack. Previously, the Admin API was exposed at
various paths under /_matrix/client. This function returns a list of patterns
matching those paths (as well as the new one), so that existing scripts which rely
on the endpoints being available there are not broken.
Note that this should only be used for existing endpoints: new ones should just
register for the /_synapse/admin path.
"""
return list(
re.compile(prefix + path_regex)
for prefix in (
"^/_synapse/admin/v1",
"^/_matrix/client/api/v1/admin",
"^/_matrix/client/unstable/admin",
"^/_matrix/client/r0/admin",
)
)
class UsersRestServlet(RestServlet):
PATTERNS = historical_admin_path_patterns("/users/(?P<user_id>[^/]*)")
@@ -255,25 +238,6 @@ class WhoisRestServlet(RestServlet):
return (200, ret)
class PurgeMediaCacheRestServlet(RestServlet):
PATTERNS = historical_admin_path_patterns("/purge_media_cache")
def __init__(self, hs):
self.media_repository = hs.get_media_repository()
self.auth = hs.get_auth()
@defer.inlineCallbacks
def on_POST(self, request):
yield assert_requester_is_admin(self.auth, request)
before_ts = parse_integer(request, "before_ts", required=True)
logger.info("before_ts: %r", before_ts)
ret = yield self.media_repository.delete_old_remote_media(before_ts)
return (200, ret)
class PurgeHistoryRestServlet(RestServlet):
PATTERNS = historical_admin_path_patterns(
"/purge_history/(?P<room_id>[^/]*)(/(?P<event_id>[^/]+))?"
@@ -542,50 +506,6 @@
)
class QuarantineMediaInRoom(RestServlet):
"""Quarantines all media in a room so that no one can download it via
this server.
"""
PATTERNS = historical_admin_path_patterns("/quarantine_media/(?P<room_id>[^/]+)")
def __init__(self, hs):
self.store = hs.get_datastore()
self.auth = hs.get_auth()
@defer.inlineCallbacks
def on_POST(self, request, room_id):
requester = yield self.auth.get_user_by_req(request)
yield assert_user_is_admin(self.auth, requester.user)
num_quarantined = yield self.store.quarantine_media_ids_in_room(
room_id, requester.user.to_string()
)
return (200, {"num_quarantined": num_quarantined})
class ListMediaInRoom(RestServlet):
"""Lists all of the media in a given room.
"""
PATTERNS = historical_admin_path_patterns("/room/(?P<room_id>[^/]+)/media")
def __init__(self, hs):
self.store = hs.get_datastore()
@defer.inlineCallbacks
def on_GET(self, request, room_id):
requester = yield self.auth.get_user_by_req(request)
is_admin = yield self.auth.is_server_admin(requester.user)
if not is_admin:
raise AuthError(403, "You are not a server admin")
local_mxcs, remote_mxcs = yield self.store.get_media_mxcs_in_room(room_id)
return (200, {"local": local_mxcs, "remote": remote_mxcs})
class ResetPasswordRestServlet(RestServlet):
"""Post request to allow an administrator reset password for a user.
This needs user to have administrator access in Synapse.
@@ -825,7 +745,6 @@ def register_servlets(hs, http_server):
def register_servlets_for_client_rest_resource(hs, http_server):
"""Register only the servlets which need to be exposed on /_matrix/client/xxx"""
WhoisRestServlet(hs).register(http_server)
PurgeMediaCacheRestServlet(hs).register(http_server)
PurgeHistoryStatusRestServlet(hs).register(http_server)
DeactivateAccountRestServlet(hs).register(http_server)
PurgeHistoryRestServlet(hs).register(http_server)
@@ -834,10 +753,13 @@ def register_servlets_for_client_rest_resource(hs, http_server):
GetUsersPaginatedRestServlet(hs).register(http_server)
SearchUsersRestServlet(hs).register(http_server)
ShutdownRoomRestServlet(hs).register(http_server)
QuarantineMediaInRoom(hs).register(http_server)
ListMediaInRoom(hs).register(http_server)
UserRegisterServlet(hs).register(http_server)
DeleteGroupAdminRestServlet(hs).register(http_server)
AccountValidityRenewServlet(hs).register(http_server)
# Load the media repo ones if we're using them.
if hs.config.can_load_media_repo:
register_servlets_for_media_repo(hs, http_server)
# don't add more things here: new servlets should only be exposed on
# /_synapse/admin so should not go here. Instead register them in AdminRestResource.


@@ -12,11 +12,36 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import re
from twisted.internet import defer
from synapse.api.errors import AuthError
def historical_admin_path_patterns(path_regex):
"""Returns the list of patterns for an admin endpoint, including historical ones
This is a backwards-compatibility hack. Previously, the Admin API was exposed at
various paths under /_matrix/client. This function returns a list of patterns
matching those paths (as well as the new one), so that existing scripts which rely
on the endpoints being available there are not broken.
Note that this should only be used for existing endpoints: new ones should just
register for the /_synapse/admin path.
"""
return list(
re.compile(prefix + path_regex)
for prefix in (
"^/_synapse/admin/v1",
"^/_matrix/client/api/v1/admin",
"^/_matrix/client/unstable/admin",
"^/_matrix/client/r0/admin",
)
)
@defer.inlineCallbacks
def assert_requester_is_admin(auth, request):
"""Verify that the requester is an admin user

synapse/rest/admin/media.py Normal file

@@ -0,0 +1,101 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2018-2019 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from twisted.internet import defer
from synapse.api.errors import AuthError
from synapse.http.servlet import RestServlet, parse_integer
from synapse.rest.admin._base import (
assert_requester_is_admin,
assert_user_is_admin,
historical_admin_path_patterns,
)
logger = logging.getLogger(__name__)
class QuarantineMediaInRoom(RestServlet):
"""Quarantines all media in a room so that no one can download it via
this server.
"""
PATTERNS = historical_admin_path_patterns("/quarantine_media/(?P<room_id>[^/]+)")
def __init__(self, hs):
self.store = hs.get_datastore()
self.auth = hs.get_auth()
@defer.inlineCallbacks
def on_POST(self, request, room_id):
requester = yield self.auth.get_user_by_req(request)
yield assert_user_is_admin(self.auth, requester.user)
num_quarantined = yield self.store.quarantine_media_ids_in_room(
room_id, requester.user.to_string()
)
return (200, {"num_quarantined": num_quarantined})
class ListMediaInRoom(RestServlet):
"""Lists all of the media in a given room.
"""
PATTERNS = historical_admin_path_patterns("/room/(?P<room_id>[^/]+)/media")
def __init__(self, hs):
self.store = hs.get_datastore()
@defer.inlineCallbacks
def on_GET(self, request, room_id):
requester = yield self.auth.get_user_by_req(request)
is_admin = yield self.auth.is_server_admin(requester.user)
if not is_admin:
raise AuthError(403, "You are not a server admin")
local_mxcs, remote_mxcs = yield self.store.get_media_mxcs_in_room(room_id)
return (200, {"local": local_mxcs, "remote": remote_mxcs})
class PurgeMediaCacheRestServlet(RestServlet):
PATTERNS = historical_admin_path_patterns("/purge_media_cache")
def __init__(self, hs):
self.media_repository = hs.get_media_repository()
self.auth = hs.get_auth()
@defer.inlineCallbacks
def on_POST(self, request):
yield assert_requester_is_admin(self.auth, request)
before_ts = parse_integer(request, "before_ts", required=True)
logger.info("before_ts: %r", before_ts)
ret = yield self.media_repository.delete_old_remote_media(before_ts)
return (200, ret)
def register_servlets_for_media_repo(hs, http_server):
"""
Media repo specific APIs.
"""
PurgeMediaCacheRestServlet(hs).register(http_server)
QuarantineMediaInRoom(hs).register(http_server)
ListMediaInRoom(hs).register(http_server)
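For illustration, `historical_admin_path_patterns` (now in `synapse/rest/admin/_base.py`, shown earlier) expands each endpoint regex used here into four compiled patterns, one per historical prefix:

```python
import re

def historical_admin_path_patterns(path_regex):
    # Mirrors the helper shown in _base.py above.
    return [
        re.compile(prefix + path_regex)
        for prefix in (
            "^/_synapse/admin/v1",
            "^/_matrix/client/api/v1/admin",
            "^/_matrix/client/unstable/admin",
            "^/_matrix/client/r0/admin",
        )
    ]

patterns = historical_admin_path_patterns("/purge_media_cache")
assert any(p.match("/_synapse/admin/v1/purge_media_cache") for p in patterns)
assert any(
    p.match("/_matrix/client/r0/admin/purge_media_cache") for p in patterns
)
```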


@@ -282,13 +282,13 @@ class PasswordResetSubmitTokenServlet(RestServlet):
return None
# Otherwise show the success template
html = self.config.email_password_reset_success_html_content
html = self.config.email_password_reset_template_success_html_content
request.setResponseCode(200)
except ThreepidValidationError as e:
# Show a failure page with a reason
html = self.load_jinja2_template(
self.config.email_template_dir,
self.config.email_password_reset_failure_template,
self.config.email_password_reset_template_failure_html,
template_vars={"failure_reason": e.msg},
)
request.setResponseCode(e.code)


@@ -33,6 +33,7 @@ from synapse.api.errors import (
RequestSendFailed,
SynapseError,
)
from synapse.config._base import ConfigError
from synapse.logging.context import defer_to_thread
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.async_helpers import Linearizer
@@ -753,8 +754,11 @@
"""
def __init__(self, hs):
Resource.__init__(self)
# If we're not configured to use it, raise if we somehow got here.
if not hs.config.can_load_media_repo:
raise ConfigError("Synapse is not configured to use a media repo.")
super().__init__()
media_repo = hs.get_media_repository()
self.putChild(b"upload", UploadResource(hs, media_repo))


@@ -364,147 +364,161 @@ class EventsStore(
if not events_and_contexts:
return
if backfilled:
stream_ordering_manager = self._backfill_id_gen.get_next_mult(
len(events_and_contexts)
)
else:
stream_ordering_manager = self._stream_id_gen.get_next_mult(
len(events_and_contexts)
)
chunks = [
events_and_contexts[x : x + 100]
for x in range(0, len(events_and_contexts), 100)
]
with stream_ordering_manager as stream_orderings:
for (event, context), stream in zip(events_and_contexts, stream_orderings):
event.internal_metadata.stream_ordering = stream
for chunk in chunks:
# We can't easily parallelize these since different chunks
# might contain the same event. :(
chunks = [
events_and_contexts[x : x + 100]
for x in range(0, len(events_and_contexts), 100)
]
# NB: Assumes that we are only persisting events for one room
# at a time.
for chunk in chunks:
# We can't easily parallelize these since different chunks
# might contain the same event. :(
# map room_id->list[event_ids] giving the new forward
# extremities in each room
new_forward_extremeties = {}
# NB: Assumes that we are only persisting events for one room
# at a time.
# map room_id->(type,state_key)->event_id tracking the full
# state in each room after adding these events.
# This is simply used to prefill the get_current_state_ids
# cache
current_state_for_room = {}
# map room_id->list[event_ids] giving the new forward
# extremities in each room
new_forward_extremeties = {}
# map room_id->(to_delete, to_insert) where to_delete is a list
# of type/state keys to remove from current state, and to_insert
# is a map (type,key)->event_id giving the state delta in each
# room
state_delta_for_room = {}
# map room_id->(type,state_key)->event_id tracking the full
# state in each room after adding these events.
# This is simply used to prefill the get_current_state_ids
# cache
current_state_for_room = {}
if not backfilled:
with Measure(self._clock, "_calculate_state_and_extrem"):
# Work out the new "current state" for each room.
# We do this by working out what the new extremities are and then
# calculating the state from that.
events_by_room = {}
for event, context in chunk:
events_by_room.setdefault(event.room_id, []).append(
(event, context)
)
# map room_id->(to_delete, to_insert) where to_delete is a list
# of type/state keys to remove from current state, and to_insert
# is a map (type,key)->event_id giving the state delta in each
# room
state_delta_for_room = {}
for room_id, ev_ctx_rm in iteritems(events_by_room):
latest_event_ids = yield self.get_latest_event_ids_in_room(
room_id
)
new_latest_event_ids = yield self._calculate_new_extremities(
room_id, ev_ctx_rm, latest_event_ids
)
if not backfilled:
with Measure(self._clock, "_calculate_state_and_extrem"):
# Work out the new "current state" for each room.
# We do this by working out what the new extremities are and then
# calculating the state from that.
events_by_room = {}
for event, context in chunk:
events_by_room.setdefault(event.room_id, []).append(
(event, context)
latest_event_ids = set(latest_event_ids)
if new_latest_event_ids == latest_event_ids:
# No change in extremities, so no change in state
continue
for room_id, ev_ctx_rm in iteritems(events_by_room):
    latest_event_ids = yield self.get_latest_event_ids_in_room(
        room_id
    )
    new_latest_event_ids = yield self._calculate_new_extremities(
        room_id, ev_ctx_rm, latest_event_ids
    )
    latest_event_ids = set(latest_event_ids)
    if new_latest_event_ids == latest_event_ids:
        # No change in extremities, so no change in state
        continue
    # there should always be at least one forward extremity.
    # (except during the initial persistence of the send_join
    # results, in which case there will be no existing
    # extremities, so we'll `continue` above and skip this bit.)
    assert new_latest_event_ids, "No forward extremities left!"
    new_forward_extremeties[room_id] = new_latest_event_ids
    len_1 = (
        len(latest_event_ids) == 1
        and len(new_latest_event_ids) == 1
    )
    if len_1:
        all_single_prev_not_state = all(
            len(event.prev_event_ids()) == 1
            and not event.is_state()
            for event, ctx in ev_ctx_rm
        )
        # Don't bother calculating state if they're just
        # a long chain of single ancestor non-state events.
        if all_single_prev_not_state:
            continue
    state_delta_counter.inc()
    if len(new_latest_event_ids) == 1:
        state_delta_single_event_counter.inc()
        # This is a fairly handwavey check to see if we could
        # have guessed what the delta would have been when
        # processing one of these events.
        # What we're interested in is if the latest extremities
        # were the same when we created the event as they are
        # now. When this server creates a new event (as opposed
        # to receiving it over federation) it will use the
        # forward extremities as the prev_events, so we can
        # guess this by looking at the prev_events and checking
        # if they match the current forward extremities.
        for ev, _ in ev_ctx_rm:
            prev_event_ids = set(ev.prev_event_ids())
            if latest_event_ids == prev_event_ids:
                state_delta_reuse_delta_counter.inc()
                break
    logger.info("Calculating state delta for room %s", room_id)
    with Measure(
        self._clock, "persist_events.get_new_state_after_events"
    ):
        res = yield self._get_new_state_after_events(
            room_id,
            ev_ctx_rm,
            latest_event_ids,
            new_latest_event_ids,
        )
    current_state, delta_ids = res
    # If either are not None then there has been a change,
    # and we need to work out the delta (or use that
    # given)
    if delta_ids is not None:
        # If there is a delta we know that we've
        # only added or replaced state, never
        # removed keys entirely.
        state_delta_for_room[room_id] = ([], delta_ids)
    elif current_state is not None:
        with Measure(
            self._clock, "persist_events.calculate_state_delta"
        ):
            delta = yield self._calculate_state_delta(
                room_id, current_state
            )
        state_delta_for_room[room_id] = delta
    # If we have the current_state then let's prefill
    # the cache with it.
    if current_state is not None:
        current_state_for_room[room_id] = current_state
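The len_1 shortcut above is worth spelling out. A minimal, self-contained sketch of the skip condition (illustrative only, not Synapse code; the event tuples are an invented shape for the example):

# Sketch: a chunk of events can only leave the room state untouched if it
# is a straight-line chain: one old extremity, one new extremity, and every
# event having exactly one prev_event and carrying no state.
def can_skip_state_calculation(old_extremities, new_extremities, events):
    # events: iterable of (prev_event_ids, is_state) pairs (invented shape)
    if len(old_extremities) != 1 or len(new_extremities) != 1:
        return False
    return all(
        len(prev_event_ids) == 1 and not is_state
        for prev_event_ids, is_state in events
    )

# e.g. a single non-state message extending the sole extremity:
assert can_skip_state_calculation({"$a"}, {"$b"}, [(["$a"], False)])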
# We want to calculate the stream orderings as late as possible, as
# we only notify after all events with a lesser stream ordering have
# been persisted. I.e. if we spend 10s inside the with block then
# that will delay all subsequent events from being notified about.
# Hence why we do it down here rather than wrapping the entire
# function.
#
# It's safe to do this after calculating the state deltas etc. as we
# only need to protect the *persistence* of the events. This is to
# ensure that queries of the form "fetch events since X" don't
# return events and stream positions after events that are still in
# flight, as otherwise subsequent requests "fetch events since Y"
# will not return those events.
#
# Note: Multiple instances of this function cannot be in flight at
# the same time for the same room.
if backfilled:
stream_ordering_manager = self._backfill_id_gen.get_next_mult(
len(chunk)
)
else:
stream_ordering_manager = self._stream_id_gen.get_next_mult(len(chunk))
with stream_ordering_manager as stream_orderings:
for (event, context), stream in zip(chunk, stream_orderings):
event.internal_metadata.stream_ordering = stream
yield self.runInteraction(
"persist_events",

View file

@ -29,12 +29,7 @@ from synapse.api.room_versions import EventFormatVersions
from synapse.events import FrozenEvent, event_type_from_format_version # noqa: F401
from synapse.events.snapshot import EventContext # noqa: F401
from synapse.events.utils import prune_event
from synapse.logging.context import (
LoggingContext,
PreserveLoggingContext,
make_deferred_yieldable,
run_in_background,
)
from synapse.logging.context import LoggingContext, PreserveLoggingContext
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import get_domain_from_id
from synapse.util import batch_iter
@ -342,13 +337,12 @@ class EventsWorkerStore(SQLBaseStore):
log_ctx = LoggingContext.current_context()
log_ctx.record_event_fetch(len(missing_events_ids))
# Note that _enqueue_events is also responsible for turning db rows
# Note that _get_events_from_db is also responsible for turning db rows
# into FrozenEvents (via _get_event_from_row), which involves seeing if
# the events have been redacted, and if so pulling the redaction event out
# of the database to check it.
#
# _enqueue_events is a bit of a rubbish name but naming is hard.
missing_events = yield self._enqueue_events(
missing_events = yield self._get_events_from_db(
missing_events_ids, allow_rejected=allow_rejected
)
@ -421,28 +415,28 @@ class EventsWorkerStore(SQLBaseStore):
The fetch requests. Each entry consists of a list of event
ids to be fetched, and a deferred to be completed once the
events have been fetched.
The deferreds are callbacked with a dictionary mapping from event id
to event row. Note that it may well contain additional events that
were not part of this request.
"""
with Measure(self._clock, "_fetch_event_list"):
try:
event_id_lists = list(zip(*event_list))[0]
event_ids = [item for sublist in event_id_lists for item in sublist]
events_to_fetch = set(
event_id for events, _ in event_list for event_id in events
)
row_dict = self._new_transaction(
conn, "do_fetch", [], [], self._fetch_event_rows, event_ids
conn, "do_fetch", [], [], self._fetch_event_rows, events_to_fetch
)
# We only want to resolve deferreds from the main thread
def fire(lst, res):
for ids, d in lst:
if not d.called:
try:
with PreserveLoggingContext():
d.callback([res[i] for i in ids if i in res])
except Exception:
logger.exception("Failed to callback")
def fire():
for _, d in event_list:
d.callback(row_dict)
with PreserveLoggingContext():
self.hs.get_reactor().callFromThread(fire, event_list, row_dict)
self.hs.get_reactor().callFromThread(fire)
except Exception as e:
logger.exception("do_fetch")
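fire is bounced through callFromThread because the fetch itself runs on a database thread, while Twisted Deferreds must only be fired from the reactor thread. The generic shape of that pattern (plain Twisted, with an invented do_query callback):

from twisted.internet import defer, reactor

def run_query_in_thread(do_query):
    """Run do_query on a worker thread; fire the Deferred on the reactor."""
    d = defer.Deferred()

    def worker():
        try:
            result = do_query()
        except Exception as e:
            # Deferreds may only be resolved from the reactor thread.
            reactor.callFromThread(d.errback, e)
        else:
            reactor.callFromThread(d.callback, result)

    reactor.callInThread(worker)
    return d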
@ -457,13 +451,98 @@ class EventsWorkerStore(SQLBaseStore):
self.hs.get_reactor().callFromThread(fire, event_list, e)
@defer.inlineCallbacks
def _enqueue_events(self, events, allow_rejected=False):
def _get_events_from_db(self, event_ids, allow_rejected=False):
"""Fetch a bunch of events from the database.
Returned events will be added to the cache for future lookups.
Args:
event_ids (Iterable[str]): The event_ids of the events to fetch
allow_rejected (bool): Whether to include rejected events
Returns:
Deferred[Dict[str, _EventCacheEntry]]:
map from event id to result. May return extra events which
weren't asked for.
"""
fetched_events = {}
events_to_fetch = event_ids
while events_to_fetch:
row_map = yield self._enqueue_events(events_to_fetch)
# we need to recursively fetch any redactions of those events
redaction_ids = set()
for event_id in events_to_fetch:
row = row_map.get(event_id)
fetched_events[event_id] = row
if row:
redaction_ids.update(row["redactions"])
events_to_fetch = redaction_ids.difference(fetched_events.keys())
if events_to_fetch:
logger.debug("Also fetching redaction events %s", events_to_fetch)
# build a map from event_id to EventBase
event_map = {}
for event_id, row in fetched_events.items():
if not row:
continue
assert row["event_id"] == event_id
rejected_reason = row["rejected_reason"]
if not allow_rejected and rejected_reason:
continue
d = json.loads(row["json"])
internal_metadata = json.loads(row["internal_metadata"])
format_version = row["format_version"]
if format_version is None:
# This means that we stored the event before we had the concept
# of an event format version, so it must be a V1 event.
format_version = EventFormatVersions.V1
original_ev = event_type_from_format_version(format_version)(
event_dict=d,
internal_metadata_dict=internal_metadata,
rejected_reason=rejected_reason,
)
event_map[event_id] = original_ev
# finally, we can decide whether each one needs redacting, and build
# the cache entries.
result_map = {}
for event_id, original_ev in event_map.items():
redactions = fetched_events[event_id]["redactions"]
redacted_event = self._maybe_redact_event_row(
original_ev, redactions, event_map
)
cache_entry = _EventCacheEntry(
event=original_ev, redacted_event=redacted_event
)
self._get_event_cache.prefill((event_id,), cache_entry)
result_map[event_id] = cache_entry
return result_map
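The while loop at the top of _get_events_from_db is a fixed-point computation: it keeps fetching until no referenced redaction is missing, and the difference() against already-fetched ids is what guarantees termination even when redactions form a cycle (see test_circular_redaction later in this diff). The shape in isolation, with a hypothetical fetch_rows callback:

def fetch_with_redaction_closure(initial_ids, fetch_rows):
    # fetch_rows(ids) -> {event_id: row or None}; each row's "redactions"
    # key lists the ids of events claiming to redact it (invented shape).
    fetched = {}
    to_fetch = set(initial_ids)
    while to_fetch:
        rows = fetch_rows(to_fetch)
        pending = set()
        for event_id in to_fetch:
            row = rows.get(event_id)
            fetched[event_id] = row
            if row:
                pending.update(row["redactions"])
        # Never re-request an id we already have: this is what makes a
        # redaction cycle terminate instead of recursing forever.
        to_fetch = pending - fetched.keys()
    return fetched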
@defer.inlineCallbacks
def _enqueue_events(self, events):
"""Fetches events from the database using the _event_fetch_list. This
allows batch and bulk fetching of events - it allows us to fetch events
without having to create a new transaction for each request for events.
Args:
events (Iterable[str]): events to be fetched.
Returns:
Deferred[Dict[str, Dict]]: map from event id to row data from the database.
May contain events that weren't requested.
"""
if not events:
return {}
events_d = defer.Deferred()
with self._event_fetch_lock:
@ -482,32 +561,12 @@ class EventsWorkerStore(SQLBaseStore):
"fetch_events", self.runWithConnection, self._do_fetch
)
logger.debug("Loading %d events", len(events))
logger.debug("Loading %d events: %s", len(events), events)
with PreserveLoggingContext():
rows = yield events_d
logger.debug("Loaded %d events (%d rows)", len(events), len(rows))
row_map = yield events_d
logger.debug("Loaded %d events (%d rows)", len(events), len(row_map))
if not allow_rejected:
rows[:] = [r for r in rows if r["rejected_reason"] is None]
res = yield make_deferred_yieldable(
defer.gatherResults(
[
run_in_background(
self._get_event_from_row,
row["internal_metadata"],
row["json"],
row["redactions"],
rejected_reason=row["rejected_reason"],
format_version=row["format_version"],
)
for row in rows
],
consumeErrors=True,
)
)
return {e.event.event_id: e for e in res if e}
return row_map
def _fetch_event_rows(self, txn, event_ids):
"""Fetch event rows from the database
@ -580,50 +639,7 @@ class EventsWorkerStore(SQLBaseStore):
return event_dict
@defer.inlineCallbacks
def _get_event_from_row(
self, internal_metadata, js, redactions, format_version, rejected_reason=None
):
"""Parse an event row which has been read from the database
Args:
internal_metadata (str): json-encoded internal_metadata column
js (str): json-encoded event body from event_json
redactions (list[str]): a list of the events which claim to have redacted
this event, from the redactions table
format_version: (str): the 'format_version' column
rejected_reason (str|None): the reason this event was rejected, if any
Returns:
_EventCacheEntry
"""
with Measure(self._clock, "_get_event_from_row"):
d = json.loads(js)
internal_metadata = json.loads(internal_metadata)
if format_version is None:
# This means that we stored the event before we had the concept
# of an event format version, so it must be a V1 event.
format_version = EventFormatVersions.V1
original_ev = event_type_from_format_version(format_version)(
event_dict=d,
internal_metadata_dict=internal_metadata,
rejected_reason=rejected_reason,
)
redacted_event = yield self._maybe_redact_event_row(original_ev, redactions)
cache_entry = _EventCacheEntry(
event=original_ev, redacted_event=redacted_event
)
self._get_event_cache.prefill((original_ev.event_id,), cache_entry)
return cache_entry
@defer.inlineCallbacks
def _maybe_redact_event_row(self, original_ev, redactions):
def _maybe_redact_event_row(self, original_ev, redactions, event_map):
"""Given an event object and a list of possible redacting event ids,
determine whether to honour any of those redactions and if so return a redacted
event.
@ -631,6 +647,8 @@ class EventsWorkerStore(SQLBaseStore):
Args:
original_ev (EventBase):
redactions (iterable[str]): list of event ids of potential redaction events
event_map (dict[str, EventBase]): other events which have been fetched, in
which we can look up the redaction events. Map from event id to event.
Returns:
Deferred[EventBase|None]: if the event should be redacted, a pruned
@ -640,15 +658,9 @@ class EventsWorkerStore(SQLBaseStore):
# we choose to ignore redactions of m.room.create events.
return None
if original_ev.type == "m.room.redaction":
# ... and redaction events
return None
redaction_map = yield self._get_events_from_cache_or_db(redactions)
for redaction_id in redactions:
redaction_entry = redaction_map.get(redaction_id)
if not redaction_entry:
redaction_event = event_map.get(redaction_id)
if not redaction_event or redaction_event.rejected_reason:
# we don't have the redaction event, or the redaction event was not
# authorized.
logger.debug(
@ -658,7 +670,6 @@ class EventsWorkerStore(SQLBaseStore):
)
continue
redaction_event = redaction_entry.event
if redaction_event.room_id != original_ev.room_id:
logger.debug(
"%s was redacted by %s but redaction was in a different room!",

View file

@ -308,22 +308,36 @@ class PusherStore(PusherWorkerStore):
def update_pusher_last_stream_ordering_and_success(
self, app_id, pushkey, user_id, last_stream_ordering, last_success
):
yield self._simple_update_one(
"pushers",
{"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
{
"""Update the last stream ordering position we've processed up to for
the given pusher.
Args:
app_id (str)
pushkey (str)
user_id (str)
last_stream_ordering (int)
last_success (int)
Returns:
Deferred[bool]: True if the pusher still exists; False if it has been deleted.
"""
updated = yield self._simple_update(
table="pushers",
keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
updatevalues={
"last_stream_ordering": last_stream_ordering,
"last_success": last_success,
},
desc="update_pusher_last_stream_ordering_and_success",
)
return bool(updated)
@defer.inlineCallbacks
def update_pusher_failing_since(self, app_id, pushkey, user_id, failing_since):
yield self._simple_update_one(
"pushers",
{"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
{"failing_since": failing_since},
yield self._simple_update(
table="pushers",
keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
updatevalues={"failing_since": failing_since},
desc="update_pusher_failing_since",
)
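The substance of this change: _simple_update_one raises if no matching row exists, while _simple_update updates zero or more rows and reports how many changed, so the caller can now detect that a pusher was deleted concurrently instead of crashing. In raw DB-API terms (a sketch, not the Synapse helper):

def update_pusher_last_stream_ordering(conn, app_id, pushkey, user_id,
                                       last_stream_ordering, last_success):
    """Returns False if the pusher row was deleted concurrently."""
    cur = conn.cursor()
    cur.execute(
        "UPDATE pushers"
        " SET last_stream_ordering = ?, last_success = ?"
        " WHERE app_id = ? AND pushkey = ? AND user_name = ?",
        (last_stream_ordering, last_success, app_id, pushkey, user_id),
    )
    return cur.rowcount > 0  # rowcount == 0 means the pusher is gone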

View file

@ -55,7 +55,7 @@ class TTLCache(object):
if e != SENTINEL:
self._expiry_list.remove(e)
entry = _CacheEntry(expiry_time=expiry, key=key, value=value)
entry = _CacheEntry(expiry_time=expiry, ttl=ttl, key=key, value=value)
self._data[key] = entry
self._expiry_list.add(entry)
@ -87,7 +87,8 @@ class TTLCache(object):
key: key to look up
Returns:
Tuple[Any, float]: the value from the cache, and the expiry time
Tuple[Any, float, float]: the value from the cache, the expiry time
and the TTL
Raises:
KeyError if the entry is not found
@ -99,7 +100,7 @@ class TTLCache(object):
self._metrics.inc_misses()
raise
self._metrics.inc_hits()
return e.value, e.expiry_time
return e.value, e.expiry_time, e.ttl
def pop(self, key, default=SENTINEL):
"""Remove a value from the cache
@ -158,5 +159,6 @@ class _CacheEntry(object):
# expiry_time is the first attribute, so that entries are sorted by expiry.
expiry_time = attr.ib()
ttl = attr.ib()
key = attr.ib()
value = attr.ib()
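Storing the original TTL alongside the expiry lets callers work out how far through an entry's lifetime they are, which is what the well-known refresh logic below relies on. Example usage of the new return shape (assuming TTLCache's name-and-timer constructor as exercised in the tests later in this diff):

from synapse.util.caches.ttlcache import TTLCache

cache = TTLCache("example", timer=lambda: 100.0)  # clock pinned at t=100.0
cache.set("one", "1", ttl=10)
value, expiry_time, ttl = cache.get_with_expiry("one")  # ("1", 110.0, 10)

# A caller can now tell how much of the entry's lifetime remains, e.g. to
# refresh shortly before expiry rather than after it:
now = 105.0
fraction_remaining = (expiry_time - now) / ttl  # 0.5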

View file

@ -25,17 +25,19 @@ from twisted.internet._sslverify import ClientTLSOptions, OpenSSLCertificateOpti
from twisted.internet.protocol import Factory
from twisted.protocols.tls import TLSMemoryBIOFactory
from twisted.web._newclient import ResponseNeverReceived
from twisted.web.client import Agent
from twisted.web.http import HTTPChannel
from twisted.web.http_headers import Headers
from twisted.web.iweb import IPolicyForHTTPS
from synapse.config.homeserver import HomeServerConfig
from synapse.crypto.context_factory import ClientTLSOptionsFactory
from synapse.http.federation.matrix_federation_agent import (
MatrixFederationAgent,
from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent
from synapse.http.federation.srv_resolver import Server
from synapse.http.federation.well_known_resolver import (
WellKnownResolver,
_cache_period_from_headers,
)
from synapse.http.federation.srv_resolver import Server
from synapse.logging.context import LoggingContext
from synapse.util.caches.ttlcache import TTLCache
@ -79,9 +81,10 @@ class MatrixFederationAgentTests(TestCase):
self._config = config = HomeServerConfig()
config.parse_config_dict(config_dict, "", "")
self.tls_factory = ClientTLSOptionsFactory(config)
self.agent = MatrixFederationAgent(
reactor=self.reactor,
tls_client_options_factory=ClientTLSOptionsFactory(config),
tls_client_options_factory=self.tls_factory,
_srv_resolver=self.mock_resolver,
_well_known_cache=self.well_known_cache,
)
@ -928,20 +931,16 @@ class MatrixFederationAgentTests(TestCase):
self.reactor.pump((0.1,))
self.successResultOf(test_d)
@defer.inlineCallbacks
def do_get_well_known(self, serv):
try:
result = yield self.agent._get_well_known(serv)
logger.info("Result from well-known fetch: %s", result)
except Exception as e:
logger.warning("Error fetching well-known: %s", e)
raise
return result
def test_well_known_cache(self):
well_known_resolver = WellKnownResolver(
self.reactor,
Agent(self.reactor, contextFactory=self.tls_factory),
well_known_cache=self.well_known_cache,
)
self.reactor.lookups["testserv"] = "1.2.3.4"
fetch_d = self.do_get_well_known(b"testserv")
fetch_d = well_known_resolver.get_well_known(b"testserv")
# there should be an attempt to connect on port 443 for the .well-known
clients = self.reactor.tcpClients
@ -953,26 +952,26 @@ class MatrixFederationAgentTests(TestCase):
well_known_server = self._handle_well_known_connection(
client_factory,
expected_sni=b"testserv",
response_headers={b"Cache-Control": b"max-age=10"},
response_headers={b"Cache-Control": b"max-age=1000"},
content=b'{ "m.server": "target-server" }',
)
r = self.successResultOf(fetch_d)
self.assertEqual(r, b"target-server")
self.assertEqual(r.delegated_server, b"target-server")
# close the tcp connection
well_known_server.loseConnection()
# repeat the request: it should hit the cache
fetch_d = self.do_get_well_known(b"testserv")
fetch_d = well_known_resolver.get_well_known(b"testserv")
r = self.successResultOf(fetch_d)
self.assertEqual(r, b"target-server")
self.assertEqual(r.delegated_server, b"target-server")
# expire the cache
self.reactor.pump((10.0,))
self.reactor.pump((1000.0,))
# now it should connect again
fetch_d = self.do_get_well_known(b"testserv")
fetch_d = well_known_resolver.get_well_known(b"testserv")
self.assertEqual(len(clients), 1)
(host, port, client_factory, _timeout, _bindAddress) = clients.pop(0)
@ -986,7 +985,76 @@ class MatrixFederationAgentTests(TestCase):
)
r = self.successResultOf(fetch_d)
self.assertEqual(r, b"other-server")
self.assertEqual(r.delegated_server, b"other-server")
def test_well_known_cache_with_temp_failure(self):
"""Test that we refetch well-known before the cache expires, and that
it ignores transient errors.
"""
well_known_resolver = WellKnownResolver(
self.reactor,
Agent(self.reactor, contextFactory=self.tls_factory),
well_known_cache=self.well_known_cache,
)
self.reactor.lookups["testserv"] = "1.2.3.4"
fetch_d = well_known_resolver.get_well_known(b"testserv")
# there should be an attempt to connect on port 443 for the .well-known
clients = self.reactor.tcpClients
self.assertEqual(len(clients), 1)
(host, port, client_factory, _timeout, _bindAddress) = clients.pop(0)
self.assertEqual(host, "1.2.3.4")
self.assertEqual(port, 443)
well_known_server = self._handle_well_known_connection(
client_factory,
expected_sni=b"testserv",
response_headers={b"Cache-Control": b"max-age=1000"},
content=b'{ "m.server": "target-server" }',
)
r = self.successResultOf(fetch_d)
self.assertEqual(r.delegated_server, b"target-server")
# close the tcp connection
well_known_server.loseConnection()
# Get close to the cache expiry; this will cause the resolver to do
# another lookup.
self.reactor.pump((900.0,))
fetch_d = well_known_resolver.get_well_known(b"testserv")
clients = self.reactor.tcpClients
(host, port, client_factory, _timeout, _bindAddress) = clients.pop(0)
# Fail the connection attempt; this will be treated as a temporary
# failure.
client_factory.clientConnectionFailed(None, Exception("nope"))
# attemptDelay on the HostnameEndpoint is 0.3, so it takes that long before the
# .well-known request fails.
self.reactor.pump((0.4,))
# Resolver should return cached value, despite the lookup failing.
r = self.successResultOf(fetch_d)
self.assertEqual(r.delegated_server, b"target-server")
# Expire the cache and repeat the request
self.reactor.pump((100.0,))
# Repeat the request; this time it should fail if the lookup fails.
fetch_d = well_known_resolver.get_well_known(b"testserv")
clients = self.reactor.tcpClients
(host, port, client_factory, _timeout, _bindAddress) = clients.pop(0)
client_factory.clientConnectionFailed(None, Exception("nope"))
self.reactor.pump((0.4,))
r = self.successResultOf(fetch_d)
self.assertEqual(r.delegated_server, None)
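The policy these two tests pin down: refresh as the cached entry nears expiry, fall back to the stale value on a transient failure while the entry is still live, but let failures surface once the entry has expired outright. A condensed sketch (names and the 0.1 refresh margin are invented; this is not the WellKnownResolver API):

import time

def get_well_known_cached(cache, server_name, fetch):
    try:
        value, expiry_time, ttl = cache.get_with_expiry(server_name)
    except KeyError:
        # Nothing cached (or the entry fully expired): a failed fetch is
        # a real failure, mirroring the second half of the test above.
        return fetch(server_name)

    if time.time() < expiry_time - 0.1 * ttl:  # invented refresh margin
        return value  # comfortably fresh: no network needed

    # Close to expiry: try to refresh, but treat a transient failure as
    # "keep using the previous answer" while it remains valid.
    try:
        return fetch(server_name)
    except Exception:
        return value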
class TestCachePeriodFromHeaders(TestCase):

View file

@ -17,6 +17,8 @@
from mock import Mock
from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
from synapse.api.room_versions import RoomVersions
from synapse.types import RoomID, UserID
@ -216,3 +218,71 @@ class RedactionTestCase(unittest.HomeserverTestCase):
},
event.unsigned["redacted_because"],
)
def test_circular_redaction(self):
redaction_event_id1 = "$redaction1_id:test"
redaction_event_id2 = "$redaction2_id:test"
class EventIdManglingBuilder:
def __init__(self, base_builder, event_id):
self._base_builder = base_builder
self._event_id = event_id
@defer.inlineCallbacks
def build(self, prev_event_ids):
built_event = yield self._base_builder.build(prev_event_ids)
built_event.event_id = self._event_id
built_event._event_dict["event_id"] = self._event_id
return built_event
@property
def room_id(self):
return self._base_builder.room_id
event_1, context_1 = self.get_success(
self.event_creation_handler.create_new_client_event(
EventIdManglingBuilder(
self.event_builder_factory.for_room_version(
RoomVersions.V1,
{
"type": EventTypes.Redaction,
"sender": self.u_alice.to_string(),
"room_id": self.room1.to_string(),
"content": {"reason": "test"},
"redacts": redaction_event_id2,
},
),
redaction_event_id1,
)
)
)
self.get_success(self.store.persist_event(event_1, context_1))
event_2, context_2 = self.get_success(
self.event_creation_handler.create_new_client_event(
EventIdManglingBuilder(
self.event_builder_factory.for_room_version(
RoomVersions.V1,
{
"type": EventTypes.Redaction,
"sender": self.u_alice.to_string(),
"room_id": self.room1.to_string(),
"content": {"reason": "test"},
"redacts": redaction_event_id1,
},
),
redaction_event_id2,
)
)
)
self.get_success(self.store.persist_event(event_2, context_2))
# fetch one of the redactions
fetched = self.get_success(self.store.get_event(redaction_event_id1))
# it should have been redacted
self.assertEqual(fetched.unsigned["redacted_by"], redaction_event_id2)
self.assertEqual(
fetched.unsigned["redacted_because"].event_id, redaction_event_id2
)

View file

@ -36,7 +36,7 @@ class CacheTestCase(unittest.TestCase):
self.assertTrue("one" in self.cache)
self.assertEqual(self.cache.get("one"), "1")
self.assertEqual(self.cache["one"], "1")
self.assertEqual(self.cache.get_with_expiry("one"), ("1", 110))
self.assertEqual(self.cache.get_with_expiry("one"), ("1", 110, 10))
self.assertEqual(self.cache._metrics.hits, 3)
self.assertEqual(self.cache._metrics.misses, 0)
@ -77,7 +77,7 @@ class CacheTestCase(unittest.TestCase):
self.assertEqual(self.cache["two"], "2")
self.assertEqual(self.cache["three"], "3")
self.assertEqual(self.cache.get_with_expiry("two"), ("2", 120))
self.assertEqual(self.cache.get_with_expiry("two"), ("2", 120, 20))
self.assertEqual(self.cache._metrics.hits, 5)
self.assertEqual(self.cache._metrics.misses, 0)