Merge branch 'develop' into bwindels/registerasregularuser

This commit is contained in:
Bruno Windels 2018-09-19 18:18:40 +02:00
commit 77f1de141d
112 changed files with 1704 additions and 643 deletions

View file

@ -9,6 +9,8 @@ jobs:
- store_artifacts:
path: ~/project/logs
destination: logs
- store_test_results:
path: logs
sytestpy2postgres:
machine: true
steps:
@ -18,15 +20,45 @@ jobs:
- store_artifacts:
path: ~/project/logs
destination: logs
- store_test_results:
path: logs
sytestpy2merged:
machine: true
steps:
- checkout
- run: bash .circleci/merge_base_branch.sh
- run: docker pull matrixdotorg/sytest-synapsepy2
- run: docker run --rm -it -v $(pwd)\:/src -v $(pwd)/logs\:/logs matrixdotorg/sytest-synapsepy2
- store_artifacts:
path: ~/project/logs
destination: logs
- store_test_results:
path: logs
sytestpy2postgresmerged:
machine: true
steps:
- checkout
- run: bash .circleci/merge_base_branch.sh
- run: docker pull matrixdotorg/sytest-synapsepy2
- run: docker run --rm -it -v $(pwd)\:/src -v $(pwd)/logs\:/logs -e POSTGRES=1 matrixdotorg/sytest-synapsepy2
- store_artifacts:
path: ~/project/logs
destination: logs
- store_test_results:
path: logs
sytestpy3:
machine: true
steps:
- checkout
- run: docker pull matrixdotorg/sytest-synapsepy3
- run: docker run --rm -it -v $(pwd)\:/src -v $(pwd)/logs\:/logs hawkowl/sytestpy3
- run: docker run --rm -it -v $(pwd)\:/src -v $(pwd)/logs\:/logs matrixdotorg/sytest-synapsepy3
- store_artifacts:
path: ~/project/logs
destination: logs
- store_test_results:
path: logs
sytestpy3postgres:
machine: true
steps:
@ -36,6 +68,32 @@ jobs:
- store_artifacts:
path: ~/project/logs
destination: logs
- store_test_results:
path: logs
sytestpy3merged:
machine: true
steps:
- checkout
- run: bash .circleci/merge_base_branch.sh
- run: docker pull matrixdotorg/sytest-synapsepy3
- run: docker run --rm -it -v $(pwd)\:/src -v $(pwd)/logs\:/logs matrixdotorg/sytest-synapsepy3
- store_artifacts:
path: ~/project/logs
destination: logs
- store_test_results:
path: logs
sytestpy3postgresmerged:
machine: true
steps:
- checkout
- run: bash .circleci/merge_base_branch.sh
- run: docker pull matrixdotorg/sytest-synapsepy3
- run: docker run --rm -it -v $(pwd)\:/src -v $(pwd)/logs\:/logs -e POSTGRES=1 matrixdotorg/sytest-synapsepy3
- store_artifacts:
path: ~/project/logs
destination: logs
- store_test_results:
path: logs
workflows:
version: 2
@ -43,6 +101,21 @@ workflows:
jobs:
- sytestpy2
- sytestpy2postgres
# Currently broken while the Python 3 port is incomplete
# - sytestpy3
# - sytestpy3postgres
- sytestpy3
- sytestpy3postgres
- sytestpy2merged:
filters:
branches:
ignore: /develop|master/
- sytestpy2postgresmerged:
filters:
branches:
ignore: /develop|master/
- sytestpy3merged:
filters:
branches:
ignore: /develop|master/
- sytestpy3postgresmerged:
filters:
branches:
ignore: /develop|master/

31
.circleci/merge_base_branch.sh Executable file
View file

@ -0,0 +1,31 @@
#!/usr/bin/env bash
set -e
# CircleCI doesn't give CIRCLE_PR_NUMBER in the environment for non-forked PRs. Wonderful.
# In this case, we just need to do some ~shell magic~ to strip it out of the PULL_REQUEST URL.
echo 'export CIRCLE_PR_NUMBER="${CIRCLE_PR_NUMBER:-${CIRCLE_PULL_REQUEST##*/}}"' >> $BASH_ENV
source $BASH_ENV
if [[ -z "${CIRCLE_PR_NUMBER}" ]]
then
echo "Can't figure out what the PR number is!"
exit 1
fi
# Get the reference, using the GitHub API
GITBASE=`curl -q https://api.github.com/repos/matrix-org/synapse/pulls/${CIRCLE_PR_NUMBER} | jq -r '.base.ref'`
# Show what we are before
git show -s
# Set up username so it can do a merge
git config --global user.email bot@matrix.org
git config --global user.name "A robot"
# Fetch and merge. If it doesn't work, it will raise due to set -e.
git fetch -u origin $GITBASE
git merge --no-edit origin/$GITBASE
# Show what we are after.
git show -s

1
.gitignore vendored
View file

@ -4,6 +4,7 @@
.DS_Store
_trial_temp/
_trial_temp*/
logs/
dbs/
*.egg

View file

@ -25,6 +25,9 @@ matrix:
services:
- postgresql
- python: 3.5
env: TOX_ENV=py35
- python: 3.6
env: TOX_ENV=py36

View file

@ -459,37 +459,13 @@ https://github.com/NixOS/nixpkgs/blob/master/nixos/modules/services/misc/matrix-
Windows Install
---------------
Synapse can be installed on Cygwin. It requires the following Cygwin packages:
- gcc
- git
- libffi-devel
- openssl (and openssl-devel, python-openssl)
- python
- python-setuptools
The content repository requires additional packages and will be unable to process
uploads without them:
- libjpeg8
- libjpeg8-devel
- zlib
If you choose to install Synapse without these packages, you will need to reinstall
``pillow`` for changes to be applied, e.g. ``pip uninstall pillow`` ``pip install
pillow --user``
Troubleshooting:
- You may need to upgrade ``setuptools`` to get this to work correctly:
``pip install setuptools --upgrade``.
- You may encounter errors indicating that ``ffi.h`` is missing, even with
``libffi-devel`` installed. If you do, copy the ``.h`` files:
``cp /usr/lib/libffi-3.0.13/include/*.h /usr/include``
- You may need to install libsodium from source in order to install PyNacl. If
you do, you may need to create a symlink to ``libsodium.a`` so ``ld`` can find
it: ``ln -s /usr/local/lib/libsodium.a /usr/lib/libsodium.a``
If you wish to run or develop Synapse on Windows, the Windows Subsystem For
Linux provides a Linux environment on Windows 10 which is capable of using the
Debian, Fedora, or source installation methods. More information about WSL can
be found at https://docs.microsoft.com/en-us/windows/wsl/install-win10 for
Windows 10 and https://docs.microsoft.com/en-us/windows/wsl/install-on-server
for Windows Server.
Troubleshooting
===============
@ -908,7 +884,7 @@ to install using pip and a virtualenv::
virtualenv -p python2.7 env
source env/bin/activate
python synapse/python_dependencies.py | xargs pip install
python -m synapse.python_dependencies | xargs pip install
pip install lxml mock
This will run a process of downloading and installing all the needed
@ -963,5 +939,13 @@ variable. The default is 0.5, which can be decreased to reduce RAM usage
in memory constrained enviroments, or increased if performance starts to
degrade.
Using `libjemalloc <http://jemalloc.net/>`_ can also yield a significant
improvement in overall amount, and especially in terms of giving back RAM
to the OS. To use it, the library must simply be put in the LD_PRELOAD
environment variable when launching Synapse. On Debian, this can be done
by installing the ``libjemalloc1`` package and adding this line to
``/etc/default/matrix-synapse``::
LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libjemalloc.so.1
.. _`key_management`: https://matrix.org/docs/spec/server_server/unstable.html#retrieving-server-keys

1
changelog.d/3576.feature Normal file
View file

@ -0,0 +1 @@
Python 3.5+ is now supported.

1
changelog.d/3704.misc Normal file
View file

@ -0,0 +1 @@
CircleCI tests now run on the potential merge of a PR.

1
changelog.d/3822.misc Normal file
View file

@ -0,0 +1 @@
crypto/ is now ported to Python 3.

1
changelog.d/3823.misc Normal file
View file

@ -0,0 +1 @@
rest/ is now ported to Python 3.

1
changelog.d/3824.bugfix Normal file
View file

@ -0,0 +1 @@
Fix jwt import check

1
changelog.d/3826.misc Normal file
View file

@ -0,0 +1 @@
add some logging for the keyring queue

1
changelog.d/3827.misc Normal file
View file

@ -0,0 +1 @@
speed up lazy loading by 2-3x

1
changelog.d/3835.bugfix Normal file
View file

@ -0,0 +1 @@
fix VOIP crashes under Python 3 (#3821)

1
changelog.d/3840.misc Normal file
View file

@ -0,0 +1 @@
Disable lazy loading for incremental syncs for now

1
changelog.d/3841.bugfix Normal file
View file

@ -0,0 +1 @@
Fix manhole so that it works with latest openssh clients

1
changelog.d/3845.bugfix Normal file
View file

@ -0,0 +1 @@
Fix outbound requests occasionally wedging, which can result in federation breaking between servers.

1
changelog.d/3846.feature Normal file
View file

@ -0,0 +1 @@
Add synapse_admin_mau:registered_reserved_users metric to expose number of real reaserved users

1
changelog.d/3847.misc Normal file
View file

@ -0,0 +1 @@
federation/ is now ported to Python 3.

1
changelog.d/3851.bugfix Normal file
View file

@ -0,0 +1 @@
Show heroes if room name/canonical alias has been deleted

1
changelog.d/3853.misc Normal file
View file

@ -0,0 +1 @@
Log when we retry outbound requests

1
changelog.d/3855.misc Normal file
View file

@ -0,0 +1 @@
Removed some excess logging messages.

1
changelog.d/3856.misc Normal file
View file

@ -0,0 +1 @@
Speed up purge history for rooms that have been previously purged

1
changelog.d/3857.misc Normal file
View file

@ -0,0 +1 @@
Refactor some HTTP timeout code.

1
changelog.d/3858.misc Normal file
View file

@ -0,0 +1 @@
Fix running merged builds on CircleCI

1
changelog.d/3859.bugfix Normal file
View file

@ -0,0 +1 @@
Fix handling of redacted events from federation

1
changelog.d/3860.misc Normal file
View file

@ -0,0 +1 @@
Fix typo in replication stream exception.

1
changelog.d/3871.misc Normal file
View file

@ -0,0 +1 @@
Add in flight real time metrics for Measure blocks

1
changelog.d/3872.misc Normal file
View file

@ -0,0 +1 @@
Disable buffering and automatic retrying in treq requests to prevent timeouts.

2
changelog.d/3873.misc Normal file
View file

@ -0,0 +1,2 @@
Remove documentation regarding installation on Cygwin, the use of WSL is
recommended instead.

0
changelog.d/3874.bugfix Normal file
View file

1
changelog.d/3875.bugfix Normal file
View file

@ -0,0 +1 @@
Mitigate outbound federation randomly becoming wedged

1
changelog.d/3877.misc Normal file
View file

@ -0,0 +1 @@
mention jemalloc in the README

1
changelog.d/3879.bugfix Normal file
View file

@ -0,0 +1 @@
Don't ratelimit autojoins

1
changelog.d/3883.feature Normal file
View file

@ -0,0 +1 @@
Adding the ability to change MAX_UPLOAD_SIZE for the docker container variables.

1
changelog.d/3888.misc Normal file
View file

@ -0,0 +1 @@
Remove unmaintained "nuke-room-from-db.sh" script

1
changelog.d/3889.bugfix Normal file
View file

@ -0,0 +1 @@
Fix 500 error when deleting unknown room alias

1
changelog.d/3892.bugfix Normal file
View file

@ -0,0 +1 @@
Fix some b'abcd' noise in logs and metrics

1
changelog.d/3894.feature Normal file
View file

@ -0,0 +1 @@
Report "python_version" in the phone home stats

1
changelog.d/3895.bugfix Normal file
View file

@ -0,0 +1 @@
Fix some b'abcd' noise in logs and metrics

1
changelog.d/3897.misc Normal file
View file

@ -0,0 +1 @@
Fix typo in README, synaspse -> synapse

1
changelog.d/3899.bugfix Normal file
View file

@ -0,0 +1 @@
When we join a room, always try the server we used for the alias lookup first, to avoid unresponsive and out-of-date servers.

1
changelog.d/3903.misc Normal file
View file

@ -0,0 +1 @@
Increase the timeout when filling missing events in federation requests

1
changelog.d/3906.misc Normal file
View file

@ -0,0 +1 @@
Improve logging of outbound federation requests

1
changelog.d/3907.bugfix Normal file
View file

@ -0,0 +1 @@
Fix incorrect server-name indication for outgoing federation requests

1
changelog.d/3909.misc Normal file
View file

@ -0,0 +1 @@
Improve logging of outbound federation requests

1
changelog.d/3910.bugfix Normal file
View file

@ -0,0 +1 @@
Fix bug where things occaisonally were not being timed out correctly.

View file

@ -88,6 +88,7 @@ variables are available for configuration:
* ``SYNAPSE_TURN_URIS``, set this variable to the coma-separated list of TURN
uris to enable TURN for this homeserver.
* ``SYNAPSE_TURN_SECRET``, set this to the TURN shared secret if required.
* ``SYNAPSE_MAX_UPLOAD_SIZE``, set this variable to change the max upload size [default `10M`].
Shared secrets, that will be initialized to random values if not set:

View file

@ -85,7 +85,7 @@ federation_rc_concurrent: 3
media_store_path: "/data/media"
uploads_path: "/data/uploads"
max_upload_size: "10M"
max_upload_size: "{{ SYNAPSE_MAX_UPLOAD_SIZE or "10M" }}"
max_image_pixels: "32M"
dynamic_thumbnails: false

View file

@ -1,57 +0,0 @@
#!/bin/bash
## CAUTION:
## This script will remove (hopefully) all trace of the given room ID from
## your homeserver.db
## Do not run it lightly.
set -e
if [ "$1" == "-h" ] || [ "$1" == "" ]; then
echo "Call with ROOM_ID as first option and then pipe it into the database. So for instance you might run"
echo " nuke-room-from-db.sh <room_id> | sqlite3 homeserver.db"
echo "or"
echo " nuke-room-from-db.sh <room_id> | psql --dbname=synapse"
exit
fi
ROOMID="$1"
cat <<EOF
DELETE FROM event_forward_extremities WHERE room_id = '$ROOMID';
DELETE FROM event_backward_extremities WHERE room_id = '$ROOMID';
DELETE FROM event_edges WHERE room_id = '$ROOMID';
DELETE FROM room_depth WHERE room_id = '$ROOMID';
DELETE FROM state_forward_extremities WHERE room_id = '$ROOMID';
DELETE FROM events WHERE room_id = '$ROOMID';
DELETE FROM event_json WHERE room_id = '$ROOMID';
DELETE FROM state_events WHERE room_id = '$ROOMID';
DELETE FROM current_state_events WHERE room_id = '$ROOMID';
DELETE FROM room_memberships WHERE room_id = '$ROOMID';
DELETE FROM feedback WHERE room_id = '$ROOMID';
DELETE FROM topics WHERE room_id = '$ROOMID';
DELETE FROM room_names WHERE room_id = '$ROOMID';
DELETE FROM rooms WHERE room_id = '$ROOMID';
DELETE FROM room_hosts WHERE room_id = '$ROOMID';
DELETE FROM room_aliases WHERE room_id = '$ROOMID';
DELETE FROM state_groups WHERE room_id = '$ROOMID';
DELETE FROM state_groups_state WHERE room_id = '$ROOMID';
DELETE FROM receipts_graph WHERE room_id = '$ROOMID';
DELETE FROM receipts_linearized WHERE room_id = '$ROOMID';
DELETE FROM event_search WHERE room_id = '$ROOMID';
DELETE FROM guest_access WHERE room_id = '$ROOMID';
DELETE FROM history_visibility WHERE room_id = '$ROOMID';
DELETE FROM room_tags WHERE room_id = '$ROOMID';
DELETE FROM room_tags_revisions WHERE room_id = '$ROOMID';
DELETE FROM room_account_data WHERE room_id = '$ROOMID';
DELETE FROM event_push_actions WHERE room_id = '$ROOMID';
DELETE FROM local_invites WHERE room_id = '$ROOMID';
DELETE FROM pusher_throttle WHERE room_id = '$ROOMID';
DELETE FROM event_reports WHERE room_id = '$ROOMID';
DELETE FROM public_room_list_stream WHERE room_id = '$ROOMID';
DELETE FROM stream_ordering_to_exterm WHERE room_id = '$ROOMID';
DELETE FROM event_auth WHERE room_id = '$ROOMID';
DELETE FROM appservice_room_list WHERE room_id = '$ROOMID';
VACUUM;
EOF

View file

@ -17,4 +17,14 @@
""" This is a reference implementation of a Matrix home server.
"""
try:
from twisted.internet import protocol
from twisted.internet.protocol import Factory
from twisted.names.dns import DNSDatagramProtocol
protocol.Factory.noisy = False
Factory.noisy = False
DNSDatagramProtocol.noisy = False
except ImportError:
pass
__version__ = "0.33.4"

View file

@ -307,6 +307,10 @@ class SynapseHomeServer(HomeServer):
# Gauges to expose monthly active user control metrics
current_mau_gauge = Gauge("synapse_admin_mau:current", "Current MAU")
max_mau_gauge = Gauge("synapse_admin_mau:max", "MAU Limit")
registered_reserved_users_mau_gauge = Gauge(
"synapse_admin_mau:registered_reserved_users",
"Registered users with reserved threepids"
)
def setup(config_options):
@ -453,6 +457,10 @@ def run(hs):
stats["homeserver"] = hs.config.server_name
stats["timestamp"] = now
stats["uptime_seconds"] = uptime
version = sys.version_info
stats["python_version"] = "{}.{}.{}".format(
version.major, version.minor, version.micro
)
stats["total_users"] = yield hs.get_datastore().count_all_users()
total_nonbridged_users = yield hs.get_datastore().count_nonbridged_users()
@ -531,10 +539,14 @@ def run(hs):
@defer.inlineCallbacks
def generate_monthly_active_users():
count = 0
current_mau_count = 0
reserved_count = 0
store = hs.get_datastore()
if hs.config.limit_usage_by_mau:
count = yield hs.get_datastore().get_monthly_active_count()
current_mau_gauge.set(float(count))
current_mau_count = yield store.get_monthly_active_count()
reserved_count = yield store.get_registered_reserved_users_count()
current_mau_gauge.set(float(current_mau_count))
registered_reserved_users_mau_gauge.set(float(reserved_count))
max_mau_gauge.set(float(hs.config.max_mau_value))
hs.get_datastore().initialise_reserved_users(

View file

@ -21,7 +21,7 @@ from .consent_config import ConsentConfig
from .database import DatabaseConfig
from .emailconfig import EmailConfig
from .groups import GroupsConfig
from .jwt import JWTConfig
from .jwt_config import JWTConfig
from .key import KeyConfig
from .logger import LoggingConfig
from .metrics import MetricsConfig

View file

@ -227,7 +227,22 @@ def setup_logging(config, use_worker_options=False):
#
# However this may not be too much of a problem if we are just writing to a file.
observer = STDLibLogObserver()
def _log(event):
if "log_text" in event:
if event["log_text"].startswith("DNSDatagramProtocol starting on "):
return
if event["log_text"].startswith("(UDP Port "):
return
if event["log_text"].startswith("Timing out client"):
return
return observer(event)
globalLogBeginner.beginLoggingTo(
[observer],
[_log],
redirectStandardIO=not config.no_redirect_stdio,
)

View file

@ -123,6 +123,6 @@ class ClientTLSOptionsFactory(object):
def get_options(self, host):
return ClientTLSOptions(
host.decode('utf-8'),
host,
CertificateOptions(verify=False).getContext()
)

View file

@ -50,7 +50,7 @@ def fetch_server_key(server_name, tls_client_options_factory, path=KEY_API_V1):
defer.returnValue((server_response, server_certificate))
except SynapseKeyClientError as e:
logger.warn("Error getting key for %r: %s", server_name, e)
if e.status.startswith("4"):
if e.status.startswith(b"4"):
# Don't retry for 4xx responses.
raise IOError("Cannot get key for %r" % server_name)
except (ConnectError, DomainError) as e:
@ -82,6 +82,12 @@ class SynapseKeyClientProtocol(HTTPClient):
self._peer = self.transport.getPeer()
logger.debug("Connected to %s", self._peer)
if not isinstance(self.path, bytes):
self.path = self.path.encode('ascii')
if not isinstance(self.host, bytes):
self.host = self.host.encode('ascii')
self.sendCommand(b"GET", self.path)
if self.host:
self.sendHeader(b"Host", self.host)

View file

@ -16,9 +16,10 @@
import hashlib
import logging
import urllib
from collections import namedtuple
from six.moves import urllib
from signedjson.key import (
decode_verify_key_bytes,
encode_verify_key_base64,
@ -40,6 +41,7 @@ from synapse.api.errors import Codes, SynapseError
from synapse.crypto.keyclient import fetch_server_key
from synapse.util import logcontext, unwrapFirstError
from synapse.util.logcontext import (
LoggingContext,
PreserveLoggingContext,
preserve_fn,
run_in_background,
@ -216,23 +218,34 @@ class Keyring(object):
servers have completed. Follows the synapse rules of logcontext
preservation.
"""
loop_count = 1
while True:
wait_on = [
self.key_downloads[server_name]
(server_name, self.key_downloads[server_name])
for server_name in server_names
if server_name in self.key_downloads
]
if wait_on:
with PreserveLoggingContext():
yield defer.DeferredList(wait_on)
else:
if not wait_on:
break
logger.info(
"Waiting for existing lookups for %s to complete [loop %i]",
[w[0] for w in wait_on], loop_count,
)
with PreserveLoggingContext():
yield defer.DeferredList((w[1] for w in wait_on))
loop_count += 1
ctx = LoggingContext.current_context()
def rm(r, server_name_):
self.key_downloads.pop(server_name_, None)
with PreserveLoggingContext(ctx):
logger.debug("Releasing key lookup lock on %s", server_name_)
self.key_downloads.pop(server_name_, None)
return r
for server_name, deferred in server_to_deferred.items():
logger.debug("Got key lookup lock on %s", server_name)
self.key_downloads[server_name] = deferred
deferred.addBoth(rm, server_name)
@ -432,7 +445,7 @@ class Keyring(object):
# an incoming request.
query_response = yield self.client.post_json(
destination=perspective_name,
path=b"/_matrix/key/v2/query",
path="/_matrix/key/v2/query",
data={
u"server_keys": {
server_name: {
@ -513,8 +526,8 @@ class Keyring(object):
(response, tls_certificate) = yield fetch_server_key(
server_name, self.hs.tls_client_options_factory,
path=(b"/_matrix/key/v2/server/%s" % (
urllib.quote(requested_key_id),
path=("/_matrix/key/v2/server/%s" % (
urllib.parse.quote(requested_key_id),
)).encode("ascii"),
)

View file

@ -13,6 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import six
from synapse.util.caches import intern_dict
from synapse.util.frozenutils import freeze
@ -147,6 +149,9 @@ class EventBase(object):
def items(self):
return list(self._event_dict.items())
def keys(self):
return six.iterkeys(self._event_dict)
class FrozenEvent(EventBase):
def __init__(self, event_dict, internal_metadata_dict={}, rejected_reason=None):

View file

@ -143,11 +143,31 @@ class FederationBase(object):
def callback(_, pdu):
with logcontext.PreserveLoggingContext(ctx):
if not check_event_content_hash(pdu):
logger.warn(
"Event content has been tampered, redacting %s: %s",
pdu.event_id, pdu.get_pdu_json()
)
return prune_event(pdu)
# let's try to distinguish between failures because the event was
# redacted (which are somewhat expected) vs actual ball-tampering
# incidents.
#
# This is just a heuristic, so we just assume that if the keys are
# about the same between the redacted and received events, then the
# received event was probably a redacted copy (but we then use our
# *actual* redacted copy to be on the safe side.)
redacted_event = prune_event(pdu)
if (
set(redacted_event.keys()) == set(pdu.keys()) and
set(six.iterkeys(redacted_event.content))
== set(six.iterkeys(pdu.content))
):
logger.info(
"Event %s seems to have been redacted; using our redacted "
"copy",
pdu.event_id,
)
else:
logger.warning(
"Event %s content has been tampered, redacting",
pdu.event_id, pdu.get_pdu_json(),
)
return redacted_event
if self.spam_checker.check_event_for_spam(pdu):
logger.warn(
@ -162,8 +182,8 @@ class FederationBase(object):
failure.trap(SynapseError)
with logcontext.PreserveLoggingContext(ctx):
logger.warn(
"Signature check failed for %s",
pdu.event_id,
"Signature check failed for %s: %s",
pdu.event_id, failure.getErrorMessage(),
)
return failure

View file

@ -271,10 +271,10 @@ class FederationClient(FederationBase):
event_id, destination, e,
)
except NotRetryingDestination as e:
logger.info(e.message)
logger.info(str(e))
continue
except FederationDeniedError as e:
logger.info(e.message)
logger.info(str(e))
continue
except Exception as e:
pdu_attempts[destination] = now
@ -510,7 +510,7 @@ class FederationClient(FederationBase):
else:
logger.warn(
"Failed to %s via %s: %i %s",
description, destination, e.code, e.message,
description, destination, e.code, e.args[0],
)
except Exception:
logger.warn(
@ -875,7 +875,7 @@ class FederationClient(FederationBase):
except Exception as e:
logger.exception(
"Failed to send_third_party_invite via %s: %s",
destination, e.message
destination, str(e)
)
raise RuntimeError("Failed to send to any server.")

View file

@ -15,7 +15,8 @@
# limitations under the License.
import logging
import urllib
from six.moves import urllib
from twisted.internet import defer
@ -951,4 +952,4 @@ def _create_path(prefix, path, *args):
Returns:
str
"""
return prefix + path % tuple(urllib.quote(arg, "") for arg in args)
return prefix + path % tuple(urllib.parse.quote(arg, "") for arg in args)

View file

@ -90,8 +90,8 @@ class Authenticator(object):
@defer.inlineCallbacks
def authenticate_request(self, request, content):
json_request = {
"method": request.method,
"uri": request.uri,
"method": request.method.decode('ascii'),
"uri": request.uri.decode('ascii'),
"destination": self.server_name,
"signatures": {},
}
@ -252,7 +252,7 @@ class BaseFederationServlet(object):
by the callback method. None if the request has already been handled.
"""
content = None
if request.method in ["PUT", "POST"]:
if request.method in [b"PUT", b"POST"]:
# TODO: Handle other method types? other content types?
content = parse_json_object_from_request(request)
@ -386,7 +386,7 @@ class FederationStateServlet(BaseFederationServlet):
return self.handler.on_context_state_request(
origin,
context,
query.get("event_id", [None])[0],
parse_string_from_args(query, "event_id", None),
)
@ -397,7 +397,7 @@ class FederationStateIdsServlet(BaseFederationServlet):
return self.handler.on_state_ids_request(
origin,
room_id,
query.get("event_id", [None])[0],
parse_string_from_args(query, "event_id", None),
)
@ -405,14 +405,12 @@ class FederationBackfillServlet(BaseFederationServlet):
PATH = "/backfill/(?P<context>[^/]*)/"
def on_GET(self, origin, content, query, context):
versions = query["v"]
limits = query["limit"]
versions = [x.decode('ascii') for x in query[b"v"]]
limit = parse_integer_from_args(query, "limit", None)
if not limits:
if not limit:
return defer.succeed((400, {"error": "Did not include limit param"}))
limit = int(limits[-1])
return self.handler.on_backfill_request(origin, context, versions, limit)
@ -423,7 +421,7 @@ class FederationQueryServlet(BaseFederationServlet):
def on_GET(self, origin, content, query, query_type):
return self.handler.on_query_request(
query_type,
{k: v[0].decode("utf-8") for k, v in query.items()}
{k.decode('utf8'): v[0].decode("utf-8") for k, v in query.items()}
)
@ -630,14 +628,14 @@ class OpenIdUserInfo(BaseFederationServlet):
@defer.inlineCallbacks
def on_GET(self, origin, content, query):
token = query.get("access_token", [None])[0]
token = query.get(b"access_token", [None])[0]
if token is None:
defer.returnValue((401, {
"errcode": "M_MISSING_TOKEN", "error": "Access Token required"
}))
return
user_id = yield self.handler.on_openid_userinfo(token)
user_id = yield self.handler.on_openid_userinfo(token.decode('ascii'))
if user_id is None:
defer.returnValue((401, {

View file

@ -20,7 +20,14 @@ import string
from twisted.internet import defer
from synapse.api.constants import EventTypes
from synapse.api.errors import AuthError, CodeMessageException, Codes, SynapseError
from synapse.api.errors import (
AuthError,
CodeMessageException,
Codes,
NotFoundError,
StoreError,
SynapseError,
)
from synapse.types import RoomAlias, UserID, get_domain_from_id
from ._base import BaseHandler
@ -109,7 +116,13 @@ class DirectoryHandler(BaseHandler):
def delete_association(self, requester, user_id, room_alias):
# association deletion for human users
can_delete = yield self._user_can_delete_alias(room_alias, user_id)
try:
can_delete = yield self._user_can_delete_alias(room_alias, user_id)
except StoreError as e:
if e.code == 404:
raise NotFoundError("Unknown room alias")
raise
if not can_delete:
raise AuthError(
403, "You don't have permission to delete the alias.",
@ -320,7 +333,7 @@ class DirectoryHandler(BaseHandler):
def _user_can_delete_alias(self, alias, user_id):
creator = yield self.store.get_room_alias_creator(alias.to_string())
if creator and creator == user_id:
if creator is not None and creator == user_id:
defer.returnValue(True)
is_admin = yield self.auth.is_server_admin(UserID.from_string(user_id))

View file

@ -360,6 +360,35 @@ class FederationHandler(BaseHandler):
# apparently.
#
# see https://github.com/matrix-org/synapse/pull/1744
#
# ----
#
# Update richvdh 2018/09/18: There are a number of problems with timing this
# request out agressively on the client side:
#
# - it plays badly with the server-side rate-limiter, which starts tarpitting you
# if you send too many requests at once, so you end up with the server carefully
# working through the backlog of your requests, which you have already timed
# out.
#
# - for this request in particular, we now (as of
# https://github.com/matrix-org/synapse/pull/3456) reject any PDUs where the
# server can't produce a plausible-looking set of prev_events - so we becone
# much more likely to reject the event.
#
# - contrary to what it says above, we do *not* fall back to fetching fresh state
# for the room if get_missing_events times out. Rather, we give up processing
# the PDU whose prevs we are missing, which then makes it much more likely that
# we'll end up back here for the *next* PDU in the list, which exacerbates the
# problem.
#
# - the agressive 10s timeout was introduced to deal with incoming federation
# requests taking 8 hours to process. It's not entirely clear why that was going
# on; certainly there were other issues causing traffic storms which are now
# resolved, and I think in any case we may be more sensible about our locking
# now. We're *certainly* more sensible about our logging.
#
# All that said: Let's try increasing the timout to 60s and see what happens.
missing_events = yield self.federation_client.get_missing_events(
origin,
@ -368,7 +397,7 @@ class FederationHandler(BaseHandler):
latest_events=[pdu],
limit=10,
min_depth=min_depth,
timeout=10000,
timeout=60000,
)
logger.info(

View file

@ -269,14 +269,7 @@ class PaginationHandler(object):
if state_ids:
state = yield self.store.get_events(list(state_ids.values()))
if state:
state = yield filter_events_for_client(
self.store,
user_id,
state.values(),
is_peeking=(member_event_id is None),
)
state = state.values()
time_now = self.clock.time_msec()

View file

@ -534,4 +534,5 @@ class RegistrationHandler(BaseHandler):
room_id=room_id,
remote_room_hosts=remote_room_hosts,
action="join",
ratelimit=False,
)

View file

@ -583,6 +583,11 @@ class RoomMemberHandler(object):
room_id = mapping["room_id"]
servers = mapping["servers"]
# put the server which owns the alias at the front of the server list.
if room_alias.domain in servers:
servers.remove(room_alias.domain)
servers.insert(0, room_alias.domain)
defer.returnValue((RoomID.from_string(room_id), servers))
@defer.inlineCallbacks

View file

@ -24,6 +24,7 @@ from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
from synapse.push.clientformat import format_push_rules_for_user
from synapse.storage.roommember import MemberSummary
from synapse.types import RoomStreamToken
from synapse.util.async_helpers import concurrently_execute
from synapse.util.caches.expiringcache import ExpiringCache
@ -525,6 +526,8 @@ class SyncHandler(object):
A deferred dict describing the room summary
"""
# FIXME: we could/should get this from room_stats when matthew/stats lands
# FIXME: this promulgates https://github.com/matrix-org/synapse/issues/3305
last_events, _ = yield self.store.get_recent_event_ids_for_room(
room_id, end_token=now_token.room_key, limit=1,
@ -537,44 +540,67 @@ class SyncHandler(object):
last_event = last_events[-1]
state_ids = yield self.store.get_state_ids_for_event(
last_event.event_id, [
(EventTypes.Member, None),
(EventTypes.Name, ''),
(EventTypes.CanonicalAlias, ''),
]
)
member_ids = {
state_key: event_id
for (t, state_key), event_id in iteritems(state_ids)
if t == EventTypes.Member
}
# this is heavily cached, thus: fast.
details = yield self.store.get_room_summary(room_id)
name_id = state_ids.get((EventTypes.Name, ''))
canonical_alias_id = state_ids.get((EventTypes.CanonicalAlias, ''))
summary = {}
# FIXME: it feels very heavy to load up every single membership event
# just to calculate the counts.
member_events = yield self.store.get_events(member_ids.values())
joined_user_ids = []
invited_user_ids = []
for ev in member_events.values():
if ev.content.get("membership") == Membership.JOIN:
joined_user_ids.append(ev.state_key)
elif ev.content.get("membership") == Membership.INVITE:
invited_user_ids.append(ev.state_key)
empty_ms = MemberSummary([], 0)
# TODO: only send these when they change.
summary["m.joined_member_count"] = len(joined_user_ids)
summary["m.invited_member_count"] = len(invited_user_ids)
summary["m.joined_member_count"] = (
details.get(Membership.JOIN, empty_ms).count
)
summary["m.invited_member_count"] = (
details.get(Membership.INVITE, empty_ms).count
)
if name_id or canonical_alias_id:
defer.returnValue(summary)
# if the room has a name or canonical_alias set, we can skip
# calculating heroes. we assume that if the event has contents, it'll
# be a valid name or canonical_alias - i.e. we're checking that they
# haven't been "deleted" by blatting {} over the top.
if name_id:
name = yield self.store.get_event(name_id, allow_none=False)
if name and name.content:
defer.returnValue(summary)
# FIXME: order by stream ordering, not alphabetic
if canonical_alias_id:
canonical_alias = yield self.store.get_event(
canonical_alias_id, allow_none=False,
)
if canonical_alias and canonical_alias.content:
defer.returnValue(summary)
joined_user_ids = [
r[0] for r in details.get(Membership.JOIN, empty_ms).members
]
invited_user_ids = [
r[0] for r in details.get(Membership.INVITE, empty_ms).members
]
gone_user_ids = (
[r[0] for r in details.get(Membership.LEAVE, empty_ms).members] +
[r[0] for r in details.get(Membership.BAN, empty_ms).members]
)
# FIXME: only build up a member_ids list for our heroes
member_ids = {}
for membership in (
Membership.JOIN,
Membership.INVITE,
Membership.LEAVE,
Membership.BAN
):
for user_id, event_id in details.get(membership, empty_ms).members:
member_ids[user_id] = event_id
# FIXME: order by stream ordering rather than as returned by SQL
me = sync_config.user.to_string()
if (joined_user_ids or invited_user_ids):
summary['m.heroes'] = sorted(
@ -586,7 +612,11 @@ class SyncHandler(object):
)[0:5]
else:
summary['m.heroes'] = sorted(
[user_id for user_id in member_ids.keys() if user_id != me]
[
user_id
for user_id in gone_user_ids
if user_id != me
]
)[0:5]
if not sync_config.filter_collection.lazy_load_members():
@ -719,6 +749,26 @@ class SyncHandler(object):
lazy_load_members=lazy_load_members,
)
elif batch.limited:
state_at_timeline_start = yield self.store.get_state_ids_for_event(
batch.events[0].event_id, types=types,
filtered_types=filtered_types,
)
# for now, we disable LL for gappy syncs - see
# https://github.com/vector-im/riot-web/issues/7211#issuecomment-419976346
# N.B. this slows down incr syncs as we are now processing way
# more state in the server than if we were LLing.
#
# We still have to filter timeline_start to LL entries (above) in order
# for _calculate_state's LL logic to work, as we have to include LL
# members for timeline senders in case they weren't loaded in the initial
# sync. We do this by (counterintuitively) by filtering timeline_start
# members to just be ones which were timeline senders, which then ensures
# all of the rest get included in the state block (if we need to know
# about them).
types = None
filtered_types = None
state_at_previous_sync = yield self.get_state_at(
room_id, stream_position=since_token, types=types,
filtered_types=filtered_types,
@ -729,24 +779,21 @@ class SyncHandler(object):
filtered_types=filtered_types,
)
state_at_timeline_start = yield self.store.get_state_ids_for_event(
batch.events[0].event_id, types=types,
filtered_types=filtered_types,
)
state_ids = _calculate_state(
timeline_contains=timeline_state,
timeline_start=state_at_timeline_start,
previous=state_at_previous_sync,
current=current_state_ids,
# we have to include LL members in case LL initial sync missed them
lazy_load_members=lazy_load_members,
)
else:
state_ids = {}
if lazy_load_members:
if types:
# We're returning an incremental sync, with no "gap" since
# the previous sync, so normally there would be no state to return
# We're returning an incremental sync, with no
# "gap" since the previous sync, so normally there would be
# no state to return.
# But we're lazy-loading, so the client might need some more
# member events to understand the events in this timeline.
# So we fish out all the member events corresponding to the
@ -1616,10 +1663,24 @@ class SyncHandler(object):
)
summary = {}
# we include a summary in room responses when we're lazy loading
# members (as the client otherwise doesn't have enough info to form
# the name itself).
if (
sync_config.filter_collection.lazy_load_members() and
(
# we recalulate the summary:
# if there are membership changes in the timeline, or
# if membership has changed during a gappy sync, or
# if this is an initial sync.
any(ev.type == EventTypes.Member for ev in batch.events) or
(
# XXX: this may include false positives in the form of LL
# members which have snuck into state
batch.limited and
any(t == EventTypes.Member for (t, k) in state)
) or
since_token is None
)
):
@ -1649,6 +1710,16 @@ class SyncHandler(object):
unread_notifications["highlight_count"] = notifs["highlight_count"]
sync_result_builder.joined.append(room_sync)
if batch.limited and since_token:
user_id = sync_result_builder.sync_config.user.to_string()
logger.info(
"Incremental gappy sync of %s for user %s with %d state events" % (
room_id,
user_id,
len(state),
)
)
elif room_builder.rtype == "archived":
room_sync = ArchivedSyncResult(
room_id=room_id,

View file

@ -38,12 +38,12 @@ def cancelled_to_request_timed_out_error(value, timeout):
return value
ACCESS_TOKEN_RE = re.compile(br'(\?.*access(_|%5[Ff])token=)[^&]*(.*)$')
ACCESS_TOKEN_RE = re.compile(r'(\?.*access(_|%5[Ff])token=)[^&]*(.*)$')
def redact_uri(uri):
"""Strips access tokens from the uri replaces with <redacted>"""
return ACCESS_TOKEN_RE.sub(
br'\1<redacted>\3',
r'\1<redacted>\3',
uri
)

View file

@ -43,7 +43,7 @@ from twisted.web.http_headers import Headers
from synapse.api.errors import Codes, HttpResponseException, SynapseError
from synapse.http import cancelled_to_request_timed_out_error, redact_uri
from synapse.http.endpoint import SpiderEndpoint
from synapse.util.async_helpers import add_timeout_to_deferred
from synapse.util.async_helpers import timeout_deferred
from synapse.util.caches import CACHE_SIZE_FACTOR
from synapse.util.logcontext import make_deferred_yieldable
@ -93,13 +93,13 @@ class SimpleHttpClient(object):
outgoing_requests_counter.labels(method).inc()
# log request but strip `access_token` (AS requests for example include this)
logger.info("Sending request %s %s", method, redact_uri(uri.encode('ascii')))
logger.info("Sending request %s %s", method, redact_uri(uri))
try:
request_deferred = treq.request(
method, uri, agent=self.agent, data=data, headers=headers
)
add_timeout_to_deferred(
request_deferred = timeout_deferred(
request_deferred, 60, self.hs.get_reactor(),
cancelled_to_request_timed_out_error,
)
@ -108,14 +108,14 @@ class SimpleHttpClient(object):
incoming_responses_counter.labels(method, response.code).inc()
logger.info(
"Received response to %s %s: %s",
method, redact_uri(uri.encode('ascii')), response.code
method, redact_uri(uri), response.code
)
defer.returnValue(response)
except Exception as e:
incoming_responses_counter.labels(method, "ERR").inc()
logger.info(
"Error sending request to %s %s: %s %s",
method, redact_uri(uri.encode('ascii')), type(e).__name__, e.args[0]
method, redact_uri(uri), type(e).__name__, e.args[0]
)
raise
@ -348,7 +348,8 @@ class SimpleHttpClient(object):
resp_headers = dict(response.headers.getAllRawHeaders())
if 'Content-Length' in resp_headers and resp_headers['Content-Length'] > max_size:
if (b'Content-Length' in resp_headers and
int(resp_headers[b'Content-Length']) > max_size):
logger.warn("Requested URL is too large > %r bytes" % (self.max_size,))
raise SynapseError(
502,
@ -381,7 +382,12 @@ class SimpleHttpClient(object):
)
defer.returnValue(
(length, resp_headers, response.request.absoluteURI, response.code),
(
length,
resp_headers,
response.request.absoluteURI.decode('ascii'),
response.code,
),
)
@ -466,9 +472,9 @@ class SpiderEndpointFactory(object):
def endpointForURI(self, uri):
logger.info("Getting endpoint for %s", uri.toBytes())
if uri.scheme == "http":
if uri.scheme == b"http":
endpoint_factory = HostnameEndpoint
elif uri.scheme == "https":
elif uri.scheme == b"https":
tlsCreator = self.policyForHTTPS.creatorForNetloc(uri.host, uri.port)
def endpoint_factory(reactor, host, port, **kw):

View file

@ -108,7 +108,7 @@ def matrix_federation_endpoint(reactor, destination, tls_client_options_factory=
Args:
reactor: Twisted reactor.
destination (bytes): The name of the server to connect to.
destination (unicode): The name of the server to connect to.
tls_client_options_factory
(synapse.crypto.context_factory.ClientTLSOptionsFactory):
Factory which generates TLS options for client connections.
@ -126,10 +126,17 @@ def matrix_federation_endpoint(reactor, destination, tls_client_options_factory=
transport_endpoint = HostnameEndpoint
default_port = 8008
else:
# the SNI string should be the same as the Host header, minus the port.
# as per https://github.com/matrix-org/synapse/issues/2525#issuecomment-336896777,
# the Host header and SNI should therefore be the server_name of the remote
# server.
tls_options = tls_client_options_factory.get_options(domain)
def transport_endpoint(reactor, host, port, timeout):
return wrapClientTLS(
tls_client_options_factory.get_options(host),
HostnameEndpoint(reactor, host, port, timeout=timeout))
tls_options,
HostnameEndpoint(reactor, host, port, timeout=timeout),
)
default_port = 8448
if port is None:

View file

@ -17,19 +17,22 @@ import cgi
import logging
import random
import sys
from io import BytesIO
from six import PY3, string_types
from six.moves import urllib
import attr
import treq
from canonicaljson import encode_canonical_json
from prometheus_client import Counter
from signedjson.sign import sign_json
from twisted.internet import defer, protocol, reactor
from twisted.internet import defer, protocol
from twisted.internet.error import DNSLookupError
from twisted.internet.task import _EPSILON, Cooperator
from twisted.web._newclient import ResponseDone
from twisted.web.client import Agent, HTTPConnectionPool
from twisted.web.client import Agent, FileBodyProducer, HTTPConnectionPool
from twisted.web.http_headers import Headers
import synapse.metrics
@ -40,14 +43,12 @@ from synapse.api.errors import (
HttpResponseException,
SynapseError,
)
from synapse.http import cancelled_to_request_timed_out_error
from synapse.http.endpoint import matrix_federation_endpoint
from synapse.util import logcontext
from synapse.util.async_helpers import add_timeout_to_deferred
from synapse.util.async_helpers import timeout_deferred
from synapse.util.logcontext import make_deferred_yieldable
from synapse.util.metrics import Measure
logger = logging.getLogger(__name__)
outbound_logger = logging.getLogger("synapse.http.outbound")
outgoing_requests_counter = Counter("synapse_http_matrixfederationclient_requests",
"", ["method"])
@ -66,17 +67,111 @@ else:
class MatrixFederationEndpointFactory(object):
def __init__(self, hs):
self.reactor = hs.get_reactor()
self.tls_client_options_factory = hs.tls_client_options_factory
def endpointForURI(self, uri):
destination = uri.netloc.decode('ascii')
return matrix_federation_endpoint(
reactor, destination, timeout=10,
self.reactor, destination, timeout=10,
tls_client_options_factory=self.tls_client_options_factory
)
_next_id = 1
@attr.s
class MatrixFederationRequest(object):
method = attr.ib()
"""HTTP method
:type: str
"""
path = attr.ib()
"""HTTP path
:type: str
"""
destination = attr.ib()
"""The remote server to send the HTTP request to.
:type: str"""
json = attr.ib(default=None)
"""JSON to send in the body.
:type: dict|None
"""
json_callback = attr.ib(default=None)
"""A callback to generate the JSON.
:type: func|None
"""
query = attr.ib(default=None)
"""Query arguments.
:type: dict|None
"""
txn_id = attr.ib(default=None)
"""Unique ID for this request (for logging)
:type: str|None
"""
def __attrs_post_init__(self):
global _next_id
self.txn_id = "%s-O-%s" % (self.method, _next_id)
_next_id = (_next_id + 1) % (MAXINT - 1)
def get_json(self):
if self.json_callback:
return self.json_callback()
return self.json
@defer.inlineCallbacks
def _handle_json_response(reactor, timeout_sec, request, response):
"""
Reads the JSON body of a response, with a timeout
Args:
reactor (IReactor): twisted reactor, for the timeout
timeout_sec (float): number of seconds to wait for response to complete
request (MatrixFederationRequest): the request that triggered the response
response (IResponse): response to the request
Returns:
dict: parsed JSON response
"""
try:
check_content_type_is_json(response.headers)
d = treq.json_content(response)
d = timeout_deferred(
d,
timeout=timeout_sec,
reactor=reactor,
)
body = yield make_deferred_yieldable(d)
except Exception as e:
logger.warn(
"{%s} [%s] Error reading response: %s",
request.txn_id,
request.destination,
e,
)
raise
logger.info(
"{%s} [%s] Completed: %d %s",
request.txn_id,
request.destination,
response.code,
response.phrase.decode('ascii', errors='replace'),
)
defer.returnValue(body)
class MatrixFederationHttpClient(object):
"""HTTP client used to talk to other homeservers over the federation
protocol. Send client certificates and signs requests.
@ -90,7 +185,9 @@ class MatrixFederationHttpClient(object):
self.hs = hs
self.signing_key = hs.config.signing_key[0]
self.server_name = hs.hostname
reactor = hs.get_reactor()
pool = HTTPConnectionPool(reactor)
pool.retryAutomatically = False
pool.maxPersistentPerHost = 5
pool.cachedConnectionTimeout = 2 * 60
self.agent = Agent.usingEndpointFactory(
@ -99,33 +196,35 @@ class MatrixFederationHttpClient(object):
self.clock = hs.get_clock()
self._store = hs.get_datastore()
self.version_string = hs.version_string.encode('ascii')
self._next_id = 1
self.default_timeout = 60
def _create_url(self, destination, path_bytes, param_bytes, query_bytes):
return urllib.parse.urlunparse(
(b"matrix", destination, path_bytes, param_bytes, query_bytes, b"")
)
def schedule(x):
reactor.callLater(_EPSILON, x)
self._cooperator = Cooperator(scheduler=schedule)
@defer.inlineCallbacks
def _request(self, destination, method, path,
json=None, json_callback=None,
param_bytes=b"",
query=None, retry_on_dns_fail=True,
timeout=None, long_retries=False,
ignore_backoff=False,
backoff_on_404=False):
def _send_request(
self,
request,
retry_on_dns_fail=True,
timeout=None,
long_retries=False,
ignore_backoff=False,
backoff_on_404=False
):
"""
Creates and sends a request to the given server.
Sends a request to the given server.
Args:
destination (str): The remote server to send the HTTP request to.
method (str): HTTP method
path (str): The HTTP path
json (dict or None): JSON to send in the body.
json_callback (func or None): A callback to generate the JSON.
query (dict or None): Query arguments.
request (MatrixFederationRequest): details of request to be sent
timeout (int|None): number of milliseconds to wait for the response headers
(including connecting to the server). 60s by default.
ignore_backoff (bool): true to ignore the historical backoff data
and try the request anyway.
backoff_on_404 (bool): Back off if we get a 404
Returns:
@ -143,45 +242,39 @@ class MatrixFederationHttpClient(object):
(May also fail with plenty of other Exceptions for things like DNS
failures, connection failures, SSL failures.)
"""
if timeout:
_sec_timeout = timeout / 1000
else:
_sec_timeout = self.default_timeout
if (
self.hs.config.federation_domain_whitelist is not None and
destination not in self.hs.config.federation_domain_whitelist
request.destination not in self.hs.config.federation_domain_whitelist
):
raise FederationDeniedError(destination)
raise FederationDeniedError(request.destination)
limiter = yield synapse.util.retryutils.get_retry_limiter(
destination,
request.destination,
self.clock,
self._store,
backoff_on_404=backoff_on_404,
ignore_backoff=ignore_backoff,
)
headers_dict = {}
path_bytes = path.encode("ascii")
if query:
query_bytes = encode_query_args(query)
method = request.method
destination = request.destination
path_bytes = request.path.encode("ascii")
if request.query:
query_bytes = encode_query_args(request.query)
else:
query_bytes = b""
headers_dict = {
"User-Agent": [self.version_string],
"Host": [destination],
"Host": [request.destination],
}
with limiter:
url = self._create_url(
destination.encode("ascii"), path_bytes, param_bytes, query_bytes
).decode('ascii')
txn_id = "%s-O-%s" % (method, self._next_id)
self._next_id = (self._next_id + 1) % (MAXINT - 1)
outbound_logger.info(
"{%s} [%s] Sending request: %s %s",
txn_id, destination, method, url
)
# XXX: Would be much nicer to retry only at the transaction-layer
# (once we have reliable transactions in place)
if long_retries:
@ -189,98 +282,119 @@ class MatrixFederationHttpClient(object):
else:
retries_left = MAX_SHORT_RETRIES
http_url = urllib.parse.urlunparse(
(b"", b"", path_bytes, param_bytes, query_bytes, b"")
).decode('ascii')
url = urllib.parse.urlunparse((
b"matrix", destination.encode("ascii"),
path_bytes, None, query_bytes, b"",
)).decode('ascii')
log_result = None
try:
while True:
try:
if json_callback:
json = json_callback()
http_url = urllib.parse.urlunparse((
b"", b"",
path_bytes, None, query_bytes, b"",
)).decode('ascii')
if json:
data = encode_canonical_json(json)
headers_dict["Content-Type"] = ["application/json"]
self.sign_request(
destination, method, http_url, headers_dict, json
)
else:
data = None
self.sign_request(destination, method, http_url, headers_dict)
request_deferred = treq.request(
method,
url,
headers=Headers(headers_dict),
data=data,
agent=self.agent,
while True:
try:
json = request.get_json()
if json:
data = encode_canonical_json(json)
headers_dict["Content-Type"] = ["application/json"]
self.sign_request(
destination, method, http_url, headers_dict, json
)
add_timeout_to_deferred(
request_deferred,
timeout / 1000. if timeout else 60,
self.hs.get_reactor(),
cancelled_to_request_timed_out_error,
else:
data = None
self.sign_request(destination, method, http_url, headers_dict)
logger.info(
"{%s} [%s] Sending request: %s %s",
request.txn_id, destination, method, url
)
if data:
producer = FileBodyProducer(
BytesIO(data),
cooperator=self._cooperator
)
else:
producer = None
request_deferred = treq.request(
method,
url,
headers=Headers(headers_dict),
data=producer,
agent=self.agent,
reactor=self.hs.get_reactor(),
unbuffered=True
)
request_deferred = timeout_deferred(
request_deferred,
timeout=_sec_timeout,
reactor=self.hs.get_reactor(),
)
with Measure(self.clock, "outbound_request"):
response = yield make_deferred_yieldable(
request_deferred,
)
log_result = "%d %s" % (response.code, response.phrase,)
break
except Exception as e:
if not retry_on_dns_fail and isinstance(e, DNSLookupError):
logger.warn(
"DNS Lookup failed to %s with %s",
destination,
e
)
log_result = "DNS Lookup failed to %s with %s" % (
destination, e
)
raise
break
except Exception as e:
logger.warn(
"{%s} [%s] Request failed: %s %s: %s",
request.txn_id,
destination,
method,
url,
_flatten_response_never_received(e),
)
logger.warn(
"{%s} Sending request failed to %s: %s %s: %s",
txn_id,
if not retry_on_dns_fail and isinstance(e, DNSLookupError):
raise
if retries_left and not timeout:
if long_retries:
delay = 4 ** (MAX_LONG_RETRIES + 1 - retries_left)
delay = min(delay, 60)
delay *= random.uniform(0.8, 1.4)
else:
delay = 0.5 * 2 ** (MAX_SHORT_RETRIES - retries_left)
delay = min(delay, 2)
delay *= random.uniform(0.8, 1.4)
logger.debug(
"{%s} [%s] Waiting %ss before re-sending...",
request.txn_id,
destination,
method,
url,
_flatten_response_never_received(e),
delay,
)
log_result = _flatten_response_never_received(e)
yield self.clock.sleep(delay)
retries_left -= 1
else:
raise
if retries_left and not timeout:
if long_retries:
delay = 4 ** (MAX_LONG_RETRIES + 1 - retries_left)
delay = min(delay, 60)
delay *= random.uniform(0.8, 1.4)
else:
delay = 0.5 * 2 ** (MAX_SHORT_RETRIES - retries_left)
delay = min(delay, 2)
delay *= random.uniform(0.8, 1.4)
yield self.clock.sleep(delay)
retries_left -= 1
else:
raise
finally:
outbound_logger.info(
"{%s} [%s] Result: %s",
txn_id,
destination,
log_result,
)
logger.info(
"{%s} [%s] Got response headers: %d %s",
request.txn_id,
destination,
response.code,
response.phrase.decode('ascii', errors='replace'),
)
if 200 <= response.code < 300:
pass
else:
# :'(
# Update transactions table?
with logcontext.PreserveLoggingContext():
body = yield treq.content(response)
d = treq.content(response)
d = timeout_deferred(
d,
timeout=_sec_timeout,
reactor=self.hs.get_reactor(),
)
body = yield make_deferred_yieldable(d)
raise HttpResponseException(
response.code, response.phrase, body
)
@ -374,27 +488,26 @@ class MatrixFederationHttpClient(object):
is not on our federation whitelist
"""
if not json_data_callback:
json_data_callback = lambda: data
response = yield self._request(
destination,
"PUT",
path,
json_callback=json_data_callback,
request = MatrixFederationRequest(
method="PUT",
destination=destination,
path=path,
query=args,
json_callback=json_data_callback,
json=data,
)
response = yield self._send_request(
request,
long_retries=long_retries,
timeout=timeout,
ignore_backoff=ignore_backoff,
backoff_on_404=backoff_on_404,
)
if 200 <= response.code < 300:
# We need to update the transactions table to say it was sent?
check_content_type_is_json(response.headers)
with logcontext.PreserveLoggingContext():
body = yield treq.json_content(response)
body = yield _handle_json_response(
self.hs.get_reactor(), self.default_timeout, request, response,
)
defer.returnValue(body)
@defer.inlineCallbacks
@ -428,24 +541,30 @@ class MatrixFederationHttpClient(object):
Fails with ``FederationDeniedError`` if this destination
is not on our federation whitelist
"""
response = yield self._request(
destination,
"POST",
path,
request = MatrixFederationRequest(
method="POST",
destination=destination,
path=path,
query=args,
json=data,
)
response = yield self._send_request(
request,
long_retries=long_retries,
timeout=timeout,
ignore_backoff=ignore_backoff,
)
if 200 <= response.code < 300:
# We need to update the transactions table to say it was sent?
check_content_type_is_json(response.headers)
with logcontext.PreserveLoggingContext():
body = yield treq.json_content(response)
if timeout:
_sec_timeout = timeout / 1000
else:
_sec_timeout = self.default_timeout
body = yield _handle_json_response(
self.hs.get_reactor(), _sec_timeout, request, response,
)
defer.returnValue(body)
@defer.inlineCallbacks
@ -481,23 +600,23 @@ class MatrixFederationHttpClient(object):
logger.debug("Query bytes: %s Retry DNS: %s", args, retry_on_dns_fail)
response = yield self._request(
destination,
"GET",
path,
request = MatrixFederationRequest(
method="GET",
destination=destination,
path=path,
query=args,
)
response = yield self._send_request(
request,
retry_on_dns_fail=retry_on_dns_fail,
timeout=timeout,
ignore_backoff=ignore_backoff,
)
if 200 <= response.code < 300:
# We need to update the transactions table to say it was sent?
check_content_type_is_json(response.headers)
with logcontext.PreserveLoggingContext():
body = yield treq.json_content(response)
body = yield _handle_json_response(
self.hs.get_reactor(), self.default_timeout, request, response,
)
defer.returnValue(body)
@defer.inlineCallbacks
@ -528,23 +647,23 @@ class MatrixFederationHttpClient(object):
Fails with ``FederationDeniedError`` if this destination
is not on our federation whitelist
"""
response = yield self._request(
destination,
"DELETE",
path,
request = MatrixFederationRequest(
method="DELETE",
destination=destination,
path=path,
query=args,
)
response = yield self._send_request(
request,
long_retries=long_retries,
timeout=timeout,
ignore_backoff=ignore_backoff,
)
if 200 <= response.code < 300:
# We need to update the transactions table to say it was sent?
check_content_type_is_json(response.headers)
with logcontext.PreserveLoggingContext():
body = yield treq.json_content(response)
body = yield _handle_json_response(
self.hs.get_reactor(), self.default_timeout, request, response,
)
defer.returnValue(body)
@defer.inlineCallbacks
@ -572,11 +691,15 @@ class MatrixFederationHttpClient(object):
Fails with ``FederationDeniedError`` if this destination
is not on our federation whitelist
"""
response = yield self._request(
destination,
"GET",
path,
request = MatrixFederationRequest(
method="GET",
destination=destination,
path=path,
query=args,
)
response = yield self._send_request(
request,
retry_on_dns_fail=retry_on_dns_fail,
ignore_backoff=ignore_backoff,
)
@ -584,14 +707,25 @@ class MatrixFederationHttpClient(object):
headers = dict(response.headers.getAllRawHeaders())
try:
with logcontext.PreserveLoggingContext():
length = yield _readBodyToFile(
response, output_stream, max_size
)
except Exception:
logger.exception("Failed to download body")
d = _readBodyToFile(response, output_stream, max_size)
d.addTimeout(self.default_timeout, self.hs.get_reactor())
length = yield make_deferred_yieldable(d)
except Exception as e:
logger.warn(
"{%s} [%s] Error reading response: %s",
request.txn_id,
request.destination,
e,
)
raise
logger.info(
"{%s} [%s] Completed: %d %s [%d bytes]",
request.txn_id,
request.destination,
response.code,
response.phrase.decode('ascii', errors='replace'),
length,
)
defer.returnValue((length, headers))

View file

@ -162,7 +162,7 @@ class RequestMetrics(object):
with _in_flight_requests_lock:
_in_flight_requests.add(self)
def stop(self, time_sec, request):
def stop(self, time_sec, response_code, sent_bytes):
with _in_flight_requests_lock:
_in_flight_requests.discard(self)
@ -179,35 +179,35 @@ class RequestMetrics(object):
)
return
response_code = str(request.code)
response_code = str(response_code)
outgoing_responses_counter.labels(request.method, response_code).inc()
outgoing_responses_counter.labels(self.method, response_code).inc()
response_count.labels(request.method, self.name, tag).inc()
response_count.labels(self.method, self.name, tag).inc()
response_timer.labels(request.method, self.name, tag, response_code).observe(
response_timer.labels(self.method, self.name, tag, response_code).observe(
time_sec - self.start
)
resource_usage = context.get_resource_usage()
response_ru_utime.labels(request.method, self.name, tag).inc(
response_ru_utime.labels(self.method, self.name, tag).inc(
resource_usage.ru_utime,
)
response_ru_stime.labels(request.method, self.name, tag).inc(
response_ru_stime.labels(self.method, self.name, tag).inc(
resource_usage.ru_stime,
)
response_db_txn_count.labels(request.method, self.name, tag).inc(
response_db_txn_count.labels(self.method, self.name, tag).inc(
resource_usage.db_txn_count
)
response_db_txn_duration.labels(request.method, self.name, tag).inc(
response_db_txn_duration.labels(self.method, self.name, tag).inc(
resource_usage.db_txn_duration_sec
)
response_db_sched_duration.labels(request.method, self.name, tag).inc(
response_db_sched_duration.labels(self.method, self.name, tag).inc(
resource_usage.db_sched_duration_sec
)
response_size.labels(request.method, self.name, tag).inc(request.sentLength)
response_size.labels(self.method, self.name, tag).inc(sent_bytes)
# We always call this at the end to ensure that we update the metrics
# regardless of whether a call to /metrics while the request was in

View file

@ -82,10 +82,13 @@ class SynapseRequest(Request):
)
def get_request_id(self):
return "%s-%i" % (self.method, self.request_seq)
return "%s-%i" % (self.method.decode('ascii'), self.request_seq)
def get_redacted_uri(self):
return redact_uri(self.uri)
uri = self.uri
if isinstance(uri, bytes):
uri = self.uri.decode('ascii')
return redact_uri(uri)
def get_user_agent(self):
return self.requestHeaders.getRawHeaders(b"User-Agent", [None])[-1]
@ -116,7 +119,7 @@ class SynapseRequest(Request):
# dispatching to the handler, so that the handler
# can update the servlet name in the request
# metrics
requests_counter.labels(self.method,
requests_counter.labels(self.method.decode('ascii'),
self.request_metrics.name).inc()
@contextlib.contextmanager
@ -277,15 +280,15 @@ class SynapseRequest(Request):
int(usage.db_txn_count),
self.sentLength,
code,
self.method,
self.method.decode('ascii'),
self.get_redacted_uri(),
self.clientproto,
self.clientproto.decode('ascii', errors='replace'),
user_agent,
usage.evt_db_fetch_count,
)
try:
self.request_metrics.stop(self.finish_time, self)
self.request_metrics.stop(self.finish_time, self.code, self.sentLength)
except Exception as e:
logger.warn("Failed to stop metrics: %r", e)

View file

@ -18,8 +18,11 @@ import gc
import logging
import os
import platform
import threading
import time
import six
import attr
from prometheus_client import Counter, Gauge, Histogram
from prometheus_client.core import REGISTRY, GaugeMetricFamily
@ -68,7 +71,7 @@ class LaterGauge(object):
return
if isinstance(calls, dict):
for k, v in calls.items():
for k, v in six.iteritems(calls):
g.add_metric(k, v)
else:
g.add_metric([], calls)
@ -87,6 +90,109 @@ class LaterGauge(object):
all_gauges[self.name] = self
class InFlightGauge(object):
"""Tracks number of things (e.g. requests, Measure blocks, etc) in flight
at any given time.
Each InFlightGauge will create a metric called `<name>_total` that counts
the number of in flight blocks, as well as a metrics for each item in the
given `sub_metrics` as `<name>_<sub_metric>` which will get updated by the
callbacks.
Args:
name (str)
desc (str)
labels (list[str])
sub_metrics (list[str]): A list of sub metrics that the callbacks
will update.
"""
def __init__(self, name, desc, labels, sub_metrics):
self.name = name
self.desc = desc
self.labels = labels
self.sub_metrics = sub_metrics
# Create a class which have the sub_metrics values as attributes, which
# default to 0 on initialization. Used to pass to registered callbacks.
self._metrics_class = attr.make_class(
"_MetricsEntry",
attrs={x: attr.ib(0) for x in sub_metrics},
slots=True,
)
# Counts number of in flight blocks for a given set of label values
self._registrations = {}
# Protects access to _registrations
self._lock = threading.Lock()
self._register_with_collector()
def register(self, key, callback):
"""Registers that we've entered a new block with labels `key`.
`callback` gets called each time the metrics are collected. The same
value must also be given to `unregister`.
`callback` gets called with an object that has an attribute per
sub_metric, which should be updated with the necessary values. Note that
the metrics object is shared between all callbacks registered with the
same key.
Note that `callback` may be called on a separate thread.
"""
with self._lock:
self._registrations.setdefault(key, set()).add(callback)
def unregister(self, key, callback):
"""Registers that we've exited a block with labels `key`.
"""
with self._lock:
self._registrations.setdefault(key, set()).discard(callback)
def collect(self):
"""Called by prometheus client when it reads metrics.
Note: may be called by a separate thread.
"""
in_flight = GaugeMetricFamily(self.name + "_total", self.desc, labels=self.labels)
metrics_by_key = {}
# We copy so that we don't mutate the list while iterating
with self._lock:
keys = list(self._registrations)
for key in keys:
with self._lock:
callbacks = set(self._registrations[key])
in_flight.add_metric(key, len(callbacks))
metrics = self._metrics_class()
metrics_by_key[key] = metrics
for callback in callbacks:
callback(metrics)
yield in_flight
for name in self.sub_metrics:
gauge = GaugeMetricFamily("_".join([self.name, name]), "", labels=self.labels)
for key, metrics in six.iteritems(metrics_by_key):
gauge.add_metric(key, getattr(metrics, name))
yield gauge
def _register_with_collector(self):
if self.name in all_gauges.keys():
logger.warning("%s already registered, reregistering" % (self.name,))
REGISTRY.unregister(all_gauges.pop(self.name))
REGISTRY.register(self)
all_gauges[self.name] = self
#
# Detailed CPU metrics
#

View file

@ -25,11 +25,7 @@ from synapse.api.errors import AuthError
from synapse.handlers.presence import format_user_presence_state
from synapse.metrics import LaterGauge
from synapse.types import StreamToken
from synapse.util.async_helpers import (
DeferredTimeoutError,
ObservableDeferred,
add_timeout_to_deferred,
)
from synapse.util.async_helpers import ObservableDeferred, timeout_deferred
from synapse.util.logcontext import PreserveLoggingContext, run_in_background
from synapse.util.logutils import log_function
from synapse.util.metrics import Measure
@ -337,7 +333,7 @@ class Notifier(object):
# Now we wait for the _NotifierUserStream to be told there
# is a new token.
listener = user_stream.new_listener(prev_token)
add_timeout_to_deferred(
listener.deferred = timeout_deferred(
listener.deferred,
(end_time - now) / 1000.,
self.hs.get_reactor(),
@ -354,7 +350,7 @@ class Notifier(object):
# Update the prev_token to the current_token since nothing
# has happened between the old prev_token and the current_token
prev_token = current_token
except DeferredTimeoutError:
except defer.TimeoutError:
break
except defer.CancelledError:
break
@ -559,15 +555,16 @@ class Notifier(object):
if end_time <= now:
break
add_timeout_to_deferred(
listener.deferred.addTimeout,
(end_time - now) / 1000.,
self.hs.get_reactor(),
listener.deferred = timeout_deferred(
listener.deferred,
timeout=(end_time - now) / 1000.,
reactor=self.hs.get_reactor(),
)
try:
with PreserveLoggingContext():
yield listener.deferred
except DeferredTimeoutError:
except defer.TimeoutError:
break
except defer.CancelledError:
break

View file

@ -15,6 +15,8 @@
# limitations under the License.
import logging
import six
from prometheus_client import Counter
from twisted.internet import defer
@ -26,6 +28,9 @@ from synapse.util.metrics import Measure
from . import push_rule_evaluator, push_tools
if six.PY3:
long = int
logger = logging.getLogger(__name__)
http_push_processed_counter = Counter("synapse_http_httppusher_http_pushes_processed", "")
@ -96,7 +101,7 @@ class HttpPusher(object):
@defer.inlineCallbacks
def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering)
self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering or 0)
yield self._process()
@defer.inlineCallbacks

View file

@ -17,10 +17,11 @@ import email.mime.multipart
import email.utils
import logging
import time
import urllib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from six.moves import urllib
import bleach
import jinja2
@ -474,7 +475,7 @@ class Mailer(object):
# XXX: make r0 once API is stable
return "%s_matrix/client/unstable/pushers/remove?%s" % (
self.hs.config.public_baseurl,
urllib.urlencode(params),
urllib.parse.urlencode(params),
)
@ -561,7 +562,7 @@ def _create_mxc_to_http_filter(config):
return "%s_matrix/media/v1/thumbnail/%s?%s%s" % (
config.public_baseurl,
serverAndMediaId,
urllib.urlencode(params),
urllib.parse.urlencode(params),
fragment or "",
)

View file

@ -13,6 +13,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import six
from synapse.storage import DataStore
from synapse.storage.end_to_end_keys import EndToEndKeyStore
from synapse.util.caches.stream_change_cache import StreamChangeCache
@ -21,6 +23,13 @@ from ._base import BaseSlavedStore
from ._slaved_id_tracker import SlavedIdTracker
def __func__(inp):
if six.PY3:
return inp
else:
return inp.__func__
class SlavedDeviceStore(BaseSlavedStore):
def __init__(self, db_conn, hs):
super(SlavedDeviceStore, self).__init__(db_conn, hs)
@ -38,14 +47,14 @@ class SlavedDeviceStore(BaseSlavedStore):
"DeviceListFederationStreamChangeCache", device_list_max,
)
get_device_stream_token = DataStore.get_device_stream_token.__func__
get_user_whose_devices_changed = DataStore.get_user_whose_devices_changed.__func__
get_devices_by_remote = DataStore.get_devices_by_remote.__func__
_get_devices_by_remote_txn = DataStore._get_devices_by_remote_txn.__func__
_get_e2e_device_keys_txn = DataStore._get_e2e_device_keys_txn.__func__
mark_as_sent_devices_by_remote = DataStore.mark_as_sent_devices_by_remote.__func__
get_device_stream_token = __func__(DataStore.get_device_stream_token)
get_user_whose_devices_changed = __func__(DataStore.get_user_whose_devices_changed)
get_devices_by_remote = __func__(DataStore.get_devices_by_remote)
_get_devices_by_remote_txn = __func__(DataStore._get_devices_by_remote_txn)
_get_e2e_device_keys_txn = __func__(DataStore._get_e2e_device_keys_txn)
mark_as_sent_devices_by_remote = __func__(DataStore.mark_as_sent_devices_by_remote)
_mark_as_sent_devices_by_remote_txn = (
DataStore._mark_as_sent_devices_by_remote_txn.__func__
__func__(DataStore._mark_as_sent_devices_by_remote_txn)
)
count_e2e_one_time_keys = EndToEndKeyStore.__dict__["count_e2e_one_time_keys"]

View file

@ -196,7 +196,7 @@ class Stream(object):
)
if len(rows) >= MAX_EVENTS_BEHIND:
raise Exception("stream %s has fallen behined" % (self.NAME))
raise Exception("stream %s has fallen behind" % (self.NAME))
else:
rows = yield self.update_function(
from_token, current_token,

View file

@ -101,7 +101,7 @@ class UserRegisterServlet(ClientV1RestServlet):
nonce = self.hs.get_secrets().token_hex(64)
self.nonces[nonce] = int(self.reactor.seconds())
return (200, {"nonce": nonce.encode('ascii')})
return (200, {"nonce": nonce})
@defer.inlineCallbacks
def on_POST(self, request):
@ -164,7 +164,7 @@ class UserRegisterServlet(ClientV1RestServlet):
key=self.hs.config.registration_shared_secret.encode(),
digestmod=hashlib.sha1,
)
want_mac.update(nonce)
want_mac.update(nonce.encode('utf8'))
want_mac.update(b"\x00")
want_mac.update(username)
want_mac.update(b"\x00")
@ -173,7 +173,10 @@ class UserRegisterServlet(ClientV1RestServlet):
want_mac.update(b"admin" if admin else b"notadmin")
want_mac = want_mac.hexdigest()
if not hmac.compare_digest(want_mac, got_mac.encode('ascii')):
if not hmac.compare_digest(
want_mac.encode('ascii'),
got_mac.encode('ascii')
):
raise SynapseError(403, "HMAC incorrect")
# Reuse the parts of RegisterRestServlet to reduce code duplication

View file

@ -45,20 +45,20 @@ class EventStreamRestServlet(ClientV1RestServlet):
is_guest = requester.is_guest
room_id = None
if is_guest:
if "room_id" not in request.args:
if b"room_id" not in request.args:
raise SynapseError(400, "Guest users must specify room_id param")
if "room_id" in request.args:
room_id = request.args["room_id"][0]
if b"room_id" in request.args:
room_id = request.args[b"room_id"][0].decode('ascii')
pagin_config = PaginationConfig.from_request(request)
timeout = EventStreamRestServlet.DEFAULT_LONGPOLL_TIME_MS
if "timeout" in request.args:
if b"timeout" in request.args:
try:
timeout = int(request.args["timeout"][0])
timeout = int(request.args[b"timeout"][0])
except ValueError:
raise SynapseError(400, "timeout must be in milliseconds.")
as_client_event = "raw" not in request.args
as_client_event = b"raw" not in request.args
chunk = yield self.event_stream_handler.get_stream(
requester.user.to_string(),

View file

@ -32,7 +32,7 @@ class InitialSyncRestServlet(ClientV1RestServlet):
@defer.inlineCallbacks
def on_GET(self, request):
requester = yield self.auth.get_user_by_req(request)
as_client_event = "raw" not in request.args
as_client_event = b"raw" not in request.args
pagination_config = PaginationConfig.from_request(request)
include_archived = parse_boolean(request, "archived", default=False)
content = yield self.initial_sync_handler.snapshot_all_rooms(

View file

@ -14,10 +14,9 @@
# limitations under the License.
import logging
import urllib
import xml.etree.ElementTree as ET
from six.moves.urllib import parse as urlparse
from six.moves import urllib
from canonicaljson import json
from saml2 import BINDING_HTTP_POST, config
@ -134,7 +133,7 @@ class LoginRestServlet(ClientV1RestServlet):
LoginRestServlet.SAML2_TYPE):
relay_state = ""
if "relay_state" in login_submission:
relay_state = "&RelayState=" + urllib.quote(
relay_state = "&RelayState=" + urllib.parse.quote(
login_submission["relay_state"])
result = {
"uri": "%s%s" % (self.idp_redirect_url, relay_state)
@ -366,7 +365,7 @@ class SAML2RestServlet(ClientV1RestServlet):
(user_id, token) = yield handler.register_saml2(username)
# Forward to the RelayState callback along with ava
if 'RelayState' in request.args:
request.redirect(urllib.unquote(
request.redirect(urllib.parse.unquote(
request.args['RelayState'][0]) +
'?status=authenticated&access_token=' +
token + '&user_id=' + user_id + '&ava=' +
@ -377,7 +376,7 @@ class SAML2RestServlet(ClientV1RestServlet):
"user_id": user_id, "token": token,
"ava": saml2_auth.ava}))
elif 'RelayState' in request.args:
request.redirect(urllib.unquote(
request.redirect(urllib.parse.unquote(
request.args['RelayState'][0]) +
'?status=not_authenticated')
finish_request(request)
@ -390,21 +389,22 @@ class CasRedirectServlet(ClientV1RestServlet):
def __init__(self, hs):
super(CasRedirectServlet, self).__init__(hs)
self.cas_server_url = hs.config.cas_server_url
self.cas_service_url = hs.config.cas_service_url
self.cas_server_url = hs.config.cas_server_url.encode('ascii')
self.cas_service_url = hs.config.cas_service_url.encode('ascii')
def on_GET(self, request):
args = request.args
if "redirectUrl" not in args:
if b"redirectUrl" not in args:
return (400, "Redirect URL not specified for CAS auth")
client_redirect_url_param = urllib.urlencode({
"redirectUrl": args["redirectUrl"][0]
})
hs_redirect_url = self.cas_service_url + "/_matrix/client/api/v1/login/cas/ticket"
service_param = urllib.urlencode({
"service": "%s?%s" % (hs_redirect_url, client_redirect_url_param)
})
request.redirect("%s/login?%s" % (self.cas_server_url, service_param))
client_redirect_url_param = urllib.parse.urlencode({
b"redirectUrl": args[b"redirectUrl"][0]
}).encode('ascii')
hs_redirect_url = (self.cas_service_url +
b"/_matrix/client/api/v1/login/cas/ticket")
service_param = urllib.parse.urlencode({
b"service": b"%s?%s" % (hs_redirect_url, client_redirect_url_param)
}).encode('ascii')
request.redirect(b"%s/login?%s" % (self.cas_server_url, service_param))
finish_request(request)
@ -422,11 +422,11 @@ class CasTicketServlet(ClientV1RestServlet):
@defer.inlineCallbacks
def on_GET(self, request):
client_redirect_url = request.args["redirectUrl"][0]
client_redirect_url = request.args[b"redirectUrl"][0]
http_client = self.hs.get_simple_http_client()
uri = self.cas_server_url + "/proxyValidate"
args = {
"ticket": request.args["ticket"],
"ticket": request.args[b"ticket"][0].decode('ascii'),
"service": self.cas_service_url
}
try:
@ -471,11 +471,11 @@ class CasTicketServlet(ClientV1RestServlet):
finish_request(request)
def add_login_token_to_redirect_url(self, url, token):
url_parts = list(urlparse.urlparse(url))
query = dict(urlparse.parse_qsl(url_parts[4]))
url_parts = list(urllib.parse.urlparse(url))
query = dict(urllib.parse.parse_qsl(url_parts[4]))
query.update({"loginToken": token})
url_parts[4] = urllib.urlencode(query)
return urlparse.urlunparse(url_parts)
url_parts[4] = urllib.parse.urlencode(query).encode('ascii')
return urllib.parse.urlunparse(url_parts)
def parse_cas_response(self, cas_response_body):
user = None

View file

@ -46,7 +46,7 @@ class PushRuleRestServlet(ClientV1RestServlet):
try:
priority_class = _priority_class_from_spec(spec)
except InvalidRuleException as e:
raise SynapseError(400, e.message)
raise SynapseError(400, str(e))
requester = yield self.auth.get_user_by_req(request)
@ -73,7 +73,7 @@ class PushRuleRestServlet(ClientV1RestServlet):
content,
)
except InvalidRuleException as e:
raise SynapseError(400, e.message)
raise SynapseError(400, str(e))
before = parse_string(request, "before")
if before:
@ -95,9 +95,9 @@ class PushRuleRestServlet(ClientV1RestServlet):
)
self.notify_user(user_id)
except InconsistentRuleException as e:
raise SynapseError(400, e.message)
raise SynapseError(400, str(e))
except RuleNotFoundException as e:
raise SynapseError(400, e.message)
raise SynapseError(400, str(e))
defer.returnValue((200, {}))
@ -142,10 +142,10 @@ class PushRuleRestServlet(ClientV1RestServlet):
PushRuleRestServlet.SLIGHTLY_PEDANTIC_TRAILING_SLASH_ERROR
)
if path[0] == '':
if path[0] == b'':
defer.returnValue((200, rules))
elif path[0] == 'global':
path = path[1:]
elif path[0] == b'global':
path = [x.decode('ascii') for x in path[1:]]
result = _filter_ruleset_with_path(rules['global'], path)
defer.returnValue((200, result))
else:
@ -192,10 +192,10 @@ class PushRuleRestServlet(ClientV1RestServlet):
def _rule_spec_from_path(path):
if len(path) < 2:
raise UnrecognizedRequestError()
if path[0] != 'pushrules':
if path[0] != b'pushrules':
raise UnrecognizedRequestError()
scope = path[1]
scope = path[1].decode('ascii')
path = path[2:]
if scope != 'global':
raise UnrecognizedRequestError()
@ -203,13 +203,13 @@ def _rule_spec_from_path(path):
if len(path) == 0:
raise UnrecognizedRequestError()
template = path[0]
template = path[0].decode('ascii')
path = path[1:]
if len(path) == 0 or len(path[0]) == 0:
raise UnrecognizedRequestError()
rule_id = path[0]
rule_id = path[0].decode('ascii')
spec = {
'scope': scope,
@ -220,7 +220,7 @@ def _rule_spec_from_path(path):
path = path[1:]
if len(path) > 0 and len(path[0]) > 0:
spec['attr'] = path[0]
spec['attr'] = path[0].decode('ascii')
return spec

View file

@ -59,7 +59,7 @@ class PushersRestServlet(ClientV1RestServlet):
]
for p in pushers:
for k, v in p.items():
for k, v in list(p.items()):
if k not in allowed_keys:
del p[k]
@ -126,7 +126,7 @@ class PushersSetRestServlet(ClientV1RestServlet):
profile_tag=content.get('profile_tag', ""),
)
except PusherConfigException as pce:
raise SynapseError(400, "Config Error: " + pce.message,
raise SynapseError(400, "Config Error: " + str(pce),
errcode=Codes.MISSING_PARAM)
self.notifier.on_new_replication_data()

View file

@ -207,7 +207,7 @@ class RoomSendEventRestServlet(ClientV1RestServlet):
"sender": requester.user.to_string(),
}
if 'ts' in request.args and requester.app_service:
if b'ts' in request.args and requester.app_service:
event_dict['origin_server_ts'] = parse_integer(request, "ts", 0)
event = yield self.event_creation_hander.create_and_send_nonmember_event(
@ -255,7 +255,9 @@ class JoinRoomAliasServlet(ClientV1RestServlet):
if RoomID.is_valid(room_identifier):
room_id = room_identifier
try:
remote_room_hosts = request.args["server_name"]
remote_room_hosts = [
x.decode('ascii') for x in request.args[b"server_name"]
]
except Exception:
remote_room_hosts = None
elif RoomAlias.is_valid(room_identifier):
@ -461,10 +463,10 @@ class RoomMessageListRestServlet(ClientV1RestServlet):
pagination_config = PaginationConfig.from_request(
request, default_limit=10,
)
as_client_event = "raw" not in request.args
filter_bytes = parse_string(request, "filter")
as_client_event = b"raw" not in request.args
filter_bytes = parse_string(request, b"filter", encoding=None)
if filter_bytes:
filter_json = urlparse.unquote(filter_bytes).decode("UTF-8")
filter_json = urlparse.unquote(filter_bytes.decode("UTF-8"))
event_filter = Filter(json.loads(filter_json))
else:
event_filter = None
@ -560,7 +562,7 @@ class RoomEventContextServlet(ClientV1RestServlet):
# picking the API shape for symmetry with /messages
filter_bytes = parse_string(request, "filter")
if filter_bytes:
filter_json = urlparse.unquote(filter_bytes).decode("UTF-8")
filter_json = urlparse.unquote(filter_bytes)
event_filter = Filter(json.loads(filter_json))
else:
event_filter = None

View file

@ -42,7 +42,11 @@ class VoipRestServlet(ClientV1RestServlet):
expiry = (self.hs.get_clock().time_msec() + userLifetime) / 1000
username = "%d:%s" % (expiry, requester.user.to_string())
mac = hmac.new(turnSecret, msg=username, digestmod=hashlib.sha1)
mac = hmac.new(
turnSecret.encode(),
msg=username.encode(),
digestmod=hashlib.sha1
)
# We need to use standard padded base64 encoding here
# encode_base64 because we need to add the standard padding to get the
# same result as the TURN server.

View file

@ -89,7 +89,7 @@ class SyncRestServlet(RestServlet):
@defer.inlineCallbacks
def on_GET(self, request):
if "from" in request.args:
if b"from" in request.args:
# /events used to use 'from', but /sync uses 'since'.
# Lets be helpful and whine if we see a 'from'.
raise SynapseError(

View file

@ -79,7 +79,7 @@ class ThirdPartyUserServlet(RestServlet):
yield self.auth.get_user_by_req(request, allow_guest=True)
fields = request.args
fields.pop("access_token", None)
fields.pop(b"access_token", None)
results = yield self.appservice_handler.query_3pe(
ThirdPartyEntityKind.USER, protocol, fields
@ -102,7 +102,7 @@ class ThirdPartyLocationServlet(RestServlet):
yield self.auth.get_user_by_req(request, allow_guest=True)
fields = request.args
fields.pop("access_token", None)
fields.pop(b"access_token", None)
results = yield self.appservice_handler.query_3pe(
ThirdPartyEntityKind.LOCATION, protocol, fields

View file

@ -88,5 +88,5 @@ class LocalKey(Resource):
)
def getChild(self, name, request):
if name == '':
if name == b'':
return self

View file

@ -22,5 +22,5 @@ from .remote_key_resource import RemoteKey
class KeyApiV2Resource(Resource):
def __init__(self, hs):
Resource.__init__(self)
self.putChild("server", LocalKey(hs))
self.putChild("query", RemoteKey(hs))
self.putChild(b"server", LocalKey(hs))
self.putChild(b"query", RemoteKey(hs))

View file

@ -103,7 +103,7 @@ class RemoteKey(Resource):
def async_render_GET(self, request):
if len(request.postpath) == 1:
server, = request.postpath
query = {server: {}}
query = {server.decode('ascii'): {}}
elif len(request.postpath) == 2:
server, key_id = request.postpath
minimum_valid_until_ts = parse_integer(
@ -112,11 +112,12 @@ class RemoteKey(Resource):
arguments = {}
if minimum_valid_until_ts is not None:
arguments["minimum_valid_until_ts"] = minimum_valid_until_ts
query = {server: {key_id: arguments}}
query = {server.decode('ascii'): {key_id.decode('ascii'): arguments}}
else:
raise SynapseError(
404, "Not found %r" % request.postpath, Codes.NOT_FOUND
)
yield self.query_keys(request, query, query_remote_on_cache_miss=True)
def render_POST(self, request):
@ -135,6 +136,7 @@ class RemoteKey(Resource):
@defer.inlineCallbacks
def query_keys(self, request, query, query_remote_on_cache_miss=False):
logger.info("Handling query for keys %r", query)
store_queries = []
for server_name, key_ids in query.items():
if (

View file

@ -56,7 +56,7 @@ class ContentRepoResource(resource.Resource):
# servers.
# TODO: A little crude here, we could do this better.
filename = request.path.split('/')[-1]
filename = request.path.decode('ascii').split('/')[-1]
# be paranoid
filename = re.sub("[^0-9A-z.-_]", "", filename)
@ -78,7 +78,7 @@ class ContentRepoResource(resource.Resource):
# select private. don't bother setting Expires as all our matrix
# clients are smart enough to be happy with Cache-Control (right?)
request.setHeader(
"Cache-Control", "public,max-age=86400,s-maxage=86400"
b"Cache-Control", b"public,max-age=86400,s-maxage=86400"
)
d = FileSender().beginFileTransfer(f, request)

View file

@ -15,9 +15,8 @@
import logging
import os
import urllib
from six.moves.urllib import parse as urlparse
from six.moves import urllib
from twisted.internet import defer
from twisted.protocols.basic import FileSender
@ -35,10 +34,15 @@ def parse_media_id(request):
# This allows users to append e.g. /test.png to the URL. Useful for
# clients that parse the URL to see content type.
server_name, media_id = request.postpath[:2]
if isinstance(server_name, bytes):
server_name = server_name.decode('utf-8')
media_id = media_id.decode('utf8')
file_name = None
if len(request.postpath) > 2:
try:
file_name = urlparse.unquote(request.postpath[-1]).decode("utf-8")
file_name = urllib.parse.unquote(request.postpath[-1].decode("utf-8"))
except UnicodeDecodeError:
pass
return server_name, media_id, file_name
@ -93,22 +97,18 @@ def add_file_headers(request, media_type, file_size, upload_name):
file_size (int): Size in bytes of the media, if known.
upload_name (str): The name of the requested file, if any.
"""
def _quote(x):
return urllib.parse.quote(x.encode("utf-8"))
request.setHeader(b"Content-Type", media_type.encode("UTF-8"))
if upload_name:
if is_ascii(upload_name):
request.setHeader(
b"Content-Disposition",
b"inline; filename=%s" % (
urllib.quote(upload_name.encode("utf-8")),
),
)
disposition = ("inline; filename=%s" % (_quote(upload_name),)).encode("ascii")
else:
request.setHeader(
b"Content-Disposition",
b"inline; filename*=utf-8''%s" % (
urllib.quote(upload_name.encode("utf-8")),
),
)
disposition = (
"inline; filename*=utf-8''%s" % (_quote(upload_name),)).encode("ascii")
request.setHeader(b"Content-Disposition", disposition)
# cache for at least a day.
# XXX: we might want to turn this off for data we don't want to

View file

@ -47,12 +47,12 @@ class DownloadResource(Resource):
def _async_render_GET(self, request):
set_cors_headers(request)
request.setHeader(
"Content-Security-Policy",
"default-src 'none';"
" script-src 'none';"
" plugin-types application/pdf;"
" style-src 'unsafe-inline';"
" object-src 'self';"
b"Content-Security-Policy",
b"default-src 'none';"
b" script-src 'none';"
b" plugin-types application/pdf;"
b" style-src 'unsafe-inline';"
b" object-src 'self';"
)
server_name, media_id, name = parse_media_id(request)
if server_name == self.server_name:

View file

@ -20,7 +20,7 @@ import logging
import os
import shutil
from six import iteritems
from six import PY3, iteritems
from six.moves.urllib import parse as urlparse
import twisted.internet.error
@ -397,13 +397,13 @@ class MediaRepository(object):
yield finish()
media_type = headers["Content-Type"][0]
media_type = headers[b"Content-Type"][0].decode('ascii')
time_now_ms = self.clock.time_msec()
content_disposition = headers.get("Content-Disposition", None)
content_disposition = headers.get(b"Content-Disposition", None)
if content_disposition:
_, params = cgi.parse_header(content_disposition[0],)
_, params = cgi.parse_header(content_disposition[0].decode('ascii'),)
upload_name = None
# First check if there is a valid UTF-8 filename
@ -419,9 +419,13 @@ class MediaRepository(object):
upload_name = upload_name_ascii
if upload_name:
upload_name = urlparse.unquote(upload_name)
if PY3:
upload_name = urlparse.unquote(upload_name)
else:
upload_name = urlparse.unquote(upload_name.encode('ascii'))
try:
upload_name = upload_name.decode("utf-8")
if isinstance(upload_name, bytes):
upload_name = upload_name.decode("utf-8")
except UnicodeDecodeError:
upload_name = None
else:
@ -755,14 +759,15 @@ class MediaRepositoryResource(Resource):
Resource.__init__(self)
media_repo = hs.get_media_repository()
self.putChild("upload", UploadResource(hs, media_repo))
self.putChild("download", DownloadResource(hs, media_repo))
self.putChild("thumbnail", ThumbnailResource(
self.putChild(b"upload", UploadResource(hs, media_repo))
self.putChild(b"download", DownloadResource(hs, media_repo))
self.putChild(b"thumbnail", ThumbnailResource(
hs, media_repo, media_repo.media_storage,
))
self.putChild("identicon", IdenticonResource())
self.putChild(b"identicon", IdenticonResource())
if hs.config.url_preview_enabled:
self.putChild("preview_url", PreviewUrlResource(
self.putChild(b"preview_url", PreviewUrlResource(
hs, media_repo, media_repo.media_storage,
))
self.putChild("config", MediaConfigResource(hs))
self.putChild(b"config", MediaConfigResource(hs))

View file

@ -261,7 +261,7 @@ class PreviewUrlResource(Resource):
logger.debug("Calculated OG for %s as %s" % (url, og))
jsonog = json.dumps(og)
jsonog = json.dumps(og).encode('utf8')
# store OG in history-aware DB cache
yield self.store.store_url_cache(
@ -301,20 +301,20 @@ class PreviewUrlResource(Resource):
logger.warn("Error downloading %s: %r", url, e)
raise SynapseError(
500, "Failed to download content: %s" % (
traceback.format_exception_only(sys.exc_type, e),
traceback.format_exception_only(sys.exc_info()[0], e),
),
Codes.UNKNOWN,
)
yield finish()
try:
if "Content-Type" in headers:
media_type = headers["Content-Type"][0]
if b"Content-Type" in headers:
media_type = headers[b"Content-Type"][0].decode('ascii')
else:
media_type = "application/octet-stream"
time_now_ms = self.clock.time_msec()
content_disposition = headers.get("Content-Disposition", None)
content_disposition = headers.get(b"Content-Disposition", None)
if content_disposition:
_, params = cgi.parse_header(content_disposition[0],)
download_name = None

View file

@ -75,7 +75,6 @@ class DirectoryWorkerStore(SQLBaseStore):
},
retcol="creator",
desc="get_room_alias_creator",
allow_none=True
)
@cached(max_entries=5000)

View file

@ -929,6 +929,10 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
txn, self.get_users_in_room, (room_id,)
)
self._invalidate_cache_and_stream(
txn, self.get_room_summary, (room_id,)
)
self._invalidate_cache_and_stream(
txn, self.get_current_state_ids, (room_id,)
)
@ -1886,20 +1890,6 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
")"
)
# create an index on should_delete because later we'll be looking for
# the should_delete / shouldn't_delete subsets
txn.execute(
"CREATE INDEX events_to_purge_should_delete"
" ON events_to_purge(should_delete)",
)
# We do joins against events_to_purge for e.g. calculating state
# groups to purge, etc., so lets make an index.
txn.execute(
"CREATE INDEX events_to_purge_id"
" ON events_to_purge(event_id)",
)
# First ensure that we're not about to delete all the forward extremeties
txn.execute(
"SELECT e.event_id, e.depth FROM events as e "
@ -1926,19 +1916,45 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
should_delete_params = ()
if not delete_local_events:
should_delete_expr += " AND event_id NOT LIKE ?"
should_delete_params += ("%:" + self.hs.hostname, )
# We include the parameter twice since we use the expression twice
should_delete_params += (
"%:" + self.hs.hostname,
"%:" + self.hs.hostname,
)
should_delete_params += (room_id, token.topological)
# Note that we insert events that are outliers and aren't going to be
# deleted, as nothing will happen to them.
txn.execute(
"INSERT INTO events_to_purge"
" SELECT event_id, %s"
" FROM events AS e LEFT JOIN state_events USING (event_id)"
" WHERE e.room_id = ? AND topological_ordering < ?" % (
" WHERE (NOT outlier OR (%s)) AND e.room_id = ? AND topological_ordering < ?"
% (
should_delete_expr,
should_delete_expr,
),
should_delete_params,
)
# We create the indices *after* insertion as that's a lot faster.
# create an index on should_delete because later we'll be looking for
# the should_delete / shouldn't_delete subsets
txn.execute(
"CREATE INDEX events_to_purge_should_delete"
" ON events_to_purge(should_delete)",
)
# We do joins against events_to_purge for e.g. calculating state
# groups to purge, etc., so lets make an index.
txn.execute(
"CREATE INDEX events_to_purge_id"
" ON events_to_purge(event_id)",
)
txn.execute(
"SELECT event_id, should_delete FROM events_to_purge"
)

Some files were not shown because too many files have changed in this diff Show more