Merge branch 'develop' into py3-xrange-1

This commit is contained in:
Richard van der Hoff 2018-04-30 01:02:25 +01:00 committed by GitHub
commit db75c86e84
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
42 changed files with 161 additions and 121 deletions

View file

@ -38,7 +38,7 @@ from synapse.server import HomeServer
from synapse.storage.engines import create_engine
from synapse.util.async import Linearizer
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext, preserve_fn
from synapse.util.logcontext import LoggingContext, run_in_background
from synapse.util.manhole import manhole
from synapse.util.versionstring import get_version_string
from twisted.internet import defer, reactor
@ -229,7 +229,7 @@ class FederationSenderHandler(object):
# presence, typing, etc.
if stream_name == "federation":
send_queue.process_rows_for_federation(self.federation_sender, rows)
preserve_fn(self.update_token)(token)
run_in_background(self.update_token, token)
# We also need to poke the federation sender when new events happen
elif stream_name == "events":

View file

@ -33,7 +33,7 @@ from synapse.server import HomeServer
from synapse.storage import DataStore
from synapse.storage.engines import create_engine
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext, preserve_fn
from synapse.util.logcontext import LoggingContext, run_in_background
from synapse.util.manhole import manhole
from synapse.util.versionstring import get_version_string
from twisted.internet import defer, reactor
@ -140,7 +140,7 @@ class PusherReplicationHandler(ReplicationClientHandler):
def on_rdata(self, stream_name, token, rows):
super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows)
preserve_fn(self.poke_pushers)(stream_name, token, rows)
run_in_background(self.poke_pushers, stream_name, token, rows)
@defer.inlineCallbacks
def poke_pushers(self, stream_name, token, rows):

View file

@ -51,7 +51,7 @@ from synapse.storage.engines import create_engine
from synapse.storage.presence import UserPresenceState
from synapse.storage.roommember import RoomMemberStore
from synapse.util.httpresourcetree import create_resource_tree
from synapse.util.logcontext import LoggingContext, preserve_fn
from synapse.util.logcontext import LoggingContext, run_in_background
from synapse.util.manhole import manhole
from synapse.util.stringutils import random_string
from synapse.util.versionstring import get_version_string
@ -327,8 +327,7 @@ class SyncReplicationHandler(ReplicationClientHandler):
def on_rdata(self, stream_name, token, rows):
super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows)
preserve_fn(self.process_and_notify)(stream_name, token, rows)
run_in_background(self.process_and_notify, stream_name, token, rows)
def get_streams_to_replicate(self):
args = super(SyncReplicationHandler, self).get_streams_to_replicate()

View file

@ -51,7 +51,7 @@ components.
from twisted.internet import defer
from synapse.appservice import ApplicationServiceState
from synapse.util.logcontext import preserve_fn
from synapse.util.logcontext import run_in_background
from synapse.util.metrics import Measure
import logging
@ -106,7 +106,7 @@ class _ServiceQueuer(object):
def enqueue(self, service, event):
# if this service isn't being sent something
self.queued_events.setdefault(service.id, []).append(event)
preserve_fn(self._send_request)(service)
run_in_background(self._send_request, service)
@defer.inlineCallbacks
def _send_request(self, service):
@ -152,10 +152,10 @@ class _TransactionController(object):
if sent:
yield txn.complete(self.store)
else:
preserve_fn(self._start_recoverer)(service)
except Exception as e:
logger.exception(e)
preserve_fn(self._start_recoverer)(service)
run_in_background(self._start_recoverer, service)
except Exception:
logger.exception("Error creating appservice transaction")
run_in_background(self._start_recoverer, service)
@defer.inlineCallbacks
def on_recovered(self, recoverer):

View file

@ -281,15 +281,15 @@ class Config(object):
)
if not cls.path_exists(config_dir_path):
os.makedirs(config_dir_path)
with open(config_path, "wb") as config_file:
config_bytes, config = obj.generate_config(
with open(config_path, "w") as config_file:
config_str, config = obj.generate_config(
config_dir_path=config_dir_path,
server_name=server_name,
report_stats=(config_args.report_stats == "yes"),
is_generating_file=True
)
obj.invoke_all("generate_files", config)
config_file.write(config_bytes)
config_file.write(config_str)
print((
"A config file has been generated in %r for server name"
" %r with corresponding SSL keys and self-signed"

View file

@ -17,11 +17,11 @@ from ._base import Config, ConfigError
from synapse.appservice import ApplicationService
from synapse.types import UserID
import urllib
import yaml
import logging
from six import string_types
from six.moves.urllib import parse as urlparse
logger = logging.getLogger(__name__)
@ -105,7 +105,7 @@ def _load_appservice(hostname, as_info, config_filename):
)
localpart = as_info["sender_localpart"]
if urllib.quote(localpart) != localpart:
if urlparse.quote(localpart) != localpart:
raise ValueError(
"sender_localpart needs characters which are not URL encoded."
)

View file

@ -117,7 +117,7 @@ class LoggingConfig(Config):
log_config = config.get("log_config")
if log_config and not os.path.exists(log_config):
log_file = self.abspath("homeserver.log")
with open(log_config, "wb") as log_config_file:
with open(log_config, "w") as log_config_file:
log_config_file.write(
DEFAULT_LOG_CONFIG.substitute(log_file=log_file)
)

View file

@ -133,7 +133,7 @@ class TlsConfig(Config):
tls_dh_params_path = config["tls_dh_params_path"]
if not self.path_exists(tls_private_key_path):
with open(tls_private_key_path, "w") as private_key_file:
with open(tls_private_key_path, "wb") as private_key_file:
tls_private_key = crypto.PKey()
tls_private_key.generate_key(crypto.TYPE_RSA, 2048)
private_key_pem = crypto.dump_privatekey(
@ -148,7 +148,7 @@ class TlsConfig(Config):
)
if not self.path_exists(tls_certificate_path):
with open(tls_certificate_path, "w") as certificate_file:
with open(tls_certificate_path, "wb") as certificate_file:
cert = crypto.X509()
subject = cert.get_subject()
subject.CN = config["server_name"]

View file

@ -19,7 +19,8 @@ from synapse.api.errors import SynapseError, Codes
from synapse.util import unwrapFirstError, logcontext
from synapse.util.logcontext import (
PreserveLoggingContext,
preserve_fn
preserve_fn,
run_in_background,
)
from synapse.util.metrics import Measure
@ -127,7 +128,7 @@ class Keyring(object):
verify_requests.append(verify_request)
preserve_fn(self._start_key_lookups)(verify_requests)
run_in_background(self._start_key_lookups, verify_requests)
# Pass those keys to handle_key_deferred so that the json object
# signatures can be verified
@ -316,7 +317,7 @@ class Keyring(object):
if not verify_request.deferred.called:
verify_request.deferred.errback(err)
preserve_fn(do_iterations)().addErrback(on_err)
run_in_background(do_iterations).addErrback(on_err)
@defer.inlineCallbacks
def get_keys_from_store(self, server_name_and_key_ids):
@ -332,8 +333,9 @@ class Keyring(object):
"""
res = yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
preserve_fn(self.store.get_server_verify_keys)(
server_name, key_ids
run_in_background(
self.store.get_server_verify_keys,
server_name, key_ids,
).addCallback(lambda ks, server: (server, ks), server_name)
for server_name, key_ids in server_name_and_key_ids
],
@ -361,7 +363,7 @@ class Keyring(object):
results = yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
preserve_fn(get_key)(p_name, p_keys)
run_in_background(get_key, p_name, p_keys)
for p_name, p_keys in self.perspective_servers.items()
],
consumeErrors=True,
@ -401,7 +403,7 @@ class Keyring(object):
results = yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
preserve_fn(get_key)(server_name, key_ids)
run_in_background(get_key, server_name, key_ids)
for server_name, key_ids in server_name_and_key_ids
],
consumeErrors=True,
@ -484,7 +486,8 @@ class Keyring(object):
yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
preserve_fn(self.store_keys)(
run_in_background(
self.store_keys,
server_name=server_name,
from_server=perspective_name,
verify_keys=response_keys,
@ -542,7 +545,8 @@ class Keyring(object):
yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
preserve_fn(self.store_keys)(
run_in_background(
self.store_keys,
server_name=key_server_name,
from_server=server_name,
verify_keys=verify_keys,
@ -618,7 +622,8 @@ class Keyring(object):
yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
preserve_fn(self.store.store_server_keys_json)(
run_in_background(
self.store.store_server_keys_json,
server_name=server_name,
key_id=key_id,
from_server=server_name,
@ -719,7 +724,8 @@ class Keyring(object):
# TODO(markjh): Store whether the keys have expired.
return logcontext.make_deferred_yieldable(defer.gatherResults(
[
preserve_fn(self.store.store_server_verify_key)(
run_in_background(
self.store.store_server_verify_key,
server_name, server_name, key.time_added, key
)
for key_id, key in verify_keys.items()

View file

@ -35,7 +35,7 @@ from synapse.federation.federation_base import (
import synapse.metrics
from synapse.util import logcontext, unwrapFirstError
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
from synapse.util.logutils import log_function
from synapse.util.retryutils import NotRetryingDestination
@ -419,7 +419,8 @@ class FederationClient(FederationBase):
batch = set(missing_events[i:i + batch_size])
deferreds = [
preserve_fn(self.get_pdu)(
run_in_background(
self.get_pdu,
destinations=random_server_list(),
event_id=e_id,
)

View file

@ -42,7 +42,7 @@ from twisted.internet import defer
from synapse.api.errors import SynapseError
from synapse.types import get_domain_from_id
from synapse.util.logcontext import preserve_fn
from synapse.util.logcontext import run_in_background
from signedjson.sign import sign_json
@ -196,4 +196,4 @@ class GroupAttestionRenewer(object):
group_id = row["group_id"]
user_id = row["user_id"]
preserve_fn(_renew_attestation)(group_id, user_id)
run_in_background(_renew_attestation, group_id, user_id)

View file

@ -198,7 +198,10 @@ class ApplicationServicesHandler(object):
services = yield self._get_services_for_3pn(protocol)
results = yield make_deferred_yieldable(defer.DeferredList([
preserve_fn(self.appservice_api.query_3pe)(service, kind, protocol, fields)
run_in_background(
self.appservice_api.query_3pe,
service, kind, protocol, fields,
)
for service in services
], consumeErrors=True))

View file

@ -24,7 +24,7 @@ from synapse.api.errors import (
SynapseError, CodeMessageException, FederationDeniedError,
)
from synapse.types import get_domain_from_id, UserID
from synapse.util.logcontext import preserve_fn, make_deferred_yieldable
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
from synapse.util.retryutils import NotRetryingDestination
logger = logging.getLogger(__name__)
@ -139,7 +139,7 @@ class E2eKeysHandler(object):
failures[destination] = _exception_to_failure(e)
yield make_deferred_yieldable(defer.gatherResults([
preserve_fn(do_remote_query)(destination)
run_in_background(do_remote_query, destination)
for destination in remote_queries_not_in_cache
], consumeErrors=True))
@ -242,7 +242,7 @@ class E2eKeysHandler(object):
failures[destination] = _exception_to_failure(e)
yield make_deferred_yieldable(defer.gatherResults([
preserve_fn(claim_client_keys)(destination)
run_in_background(claim_client_keys, destination)
for destination in remote_queries
], consumeErrors=True))

View file

@ -639,7 +639,8 @@ class FederationHandler(BaseHandler):
results = yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
logcontext.preserve_fn(self.replication_layer.get_pdu)(
logcontext.run_in_background(
self.replication_layer.get_pdu,
[dest],
event_id,
outlier=True,
@ -1025,7 +1026,7 @@ class FederationHandler(BaseHandler):
# lots of requests for missing prev_events which we do actually
# have. Hence we fire off the deferred, but don't wait for it.
logcontext.preserve_fn(self._handle_queued_pdus)(room_queue)
logcontext.run_in_background(self._handle_queued_pdus, room_queue)
defer.returnValue(True)
@ -1527,8 +1528,9 @@ class FederationHandler(BaseHandler):
if not backfilled:
# this intentionally does not yield: we don't care about the result
# and don't need to wait for it.
logcontext.preserve_fn(self.pusher_pool.on_new_notifications)(
event_stream_id, max_stream_id
logcontext.run_in_background(
self.pusher_pool.on_new_notifications,
event_stream_id, max_stream_id,
)
defer.returnValue((context, event_stream_id, max_stream_id))
@ -1542,7 +1544,8 @@ class FederationHandler(BaseHandler):
"""
contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
logcontext.preserve_fn(self._prep_event)(
logcontext.run_in_background(
self._prep_event,
origin,
ev_info["event"],
state=ev_info.get("state"),
@ -1871,7 +1874,8 @@ class FederationHandler(BaseHandler):
different_events = yield logcontext.make_deferred_yieldable(
defer.gatherResults([
logcontext.preserve_fn(self.store.get_event)(
logcontext.run_in_background(
self.store.get_event,
d,
allow_none=True,
allow_rejected=False,

View file

@ -27,7 +27,7 @@ from synapse.types import (
from synapse.util import unwrapFirstError
from synapse.util.async import concurrently_execute
from synapse.util.caches.snapshot_cache import SnapshotCache
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
from synapse.visibility import filter_events_for_client
from ._base import BaseHandler
@ -166,7 +166,8 @@ class InitialSyncHandler(BaseHandler):
(messages, token), current_state = yield make_deferred_yieldable(
defer.gatherResults(
[
preserve_fn(self.store.get_recent_events_for_room)(
run_in_background(
self.store.get_recent_events_for_room,
event.room_id,
limit=limit,
end_token=room_end_token,
@ -391,9 +392,10 @@ class InitialSyncHandler(BaseHandler):
presence, receipts, (messages, token) = yield defer.gatherResults(
[
preserve_fn(get_presence)(),
preserve_fn(get_receipts)(),
preserve_fn(self.store.get_recent_events_for_room)(
run_in_background(get_presence),
run_in_background(get_receipts),
run_in_background(
self.store.get_recent_events_for_room,
room_id,
limit=limit,
end_token=now_token.room_key,

View file

@ -31,7 +31,7 @@ from synapse.types import (
UserID, RoomAlias, RoomStreamToken,
)
from synapse.util.async import run_on_reactor, ReadWriteLock, Limiter
from synapse.util.logcontext import preserve_fn, run_in_background
from synapse.util.logcontext import run_in_background
from synapse.util.metrics import measure_func
from synapse.util.frozenutils import frozendict_json_encoder
from synapse.util.stringutils import random_string
@ -857,7 +857,8 @@ class EventCreationHandler(object):
# this intentionally does not yield: we don't care about the result
# and don't need to wait for it.
preserve_fn(self.pusher_pool.on_new_notifications)(
run_in_background(
self.pusher_pool.on_new_notifications,
event_stream_id, max_stream_id
)
@ -872,7 +873,7 @@ class EventCreationHandler(object):
except Exception:
logger.exception("Error notifying about new room event")
preserve_fn(_notify)()
run_in_background(_notify)
if event.type == EventTypes.Message:
# We don't want to block sending messages on any presence code. This

View file

@ -16,7 +16,7 @@
from twisted.internet import defer
from synapse.api.errors import SynapseError, AuthError
from synapse.util.logcontext import preserve_fn
from synapse.util.logcontext import run_in_background
from synapse.util.metrics import Measure
from synapse.util.wheel_timer import WheelTimer
from synapse.types import UserID, get_domain_from_id
@ -97,7 +97,8 @@ class TypingHandler(object):
if self.hs.is_mine_id(member.user_id):
last_fed_poke = self._member_last_federation_poke.get(member, None)
if not last_fed_poke or last_fed_poke + FEDERATION_PING_INTERVAL <= now:
preserve_fn(self._push_remote)(
run_in_background(
self._push_remote,
member=member,
typing=True
)
@ -196,7 +197,7 @@ class TypingHandler(object):
def _push_update(self, member, typing):
if self.hs.is_mine_id(member.user_id):
# Only send updates for changes to our own users.
preserve_fn(self._push_remote)(member, typing)
run_in_background(self._push_remote, member, typing)
self._push_update_local(
member=member,

View file

@ -40,7 +40,7 @@ from twisted.web.http import PotentialDataLoss
from twisted.web.http_headers import Headers
from twisted.web._newclient import ResponseDone
from StringIO import StringIO
from six import StringIO
import simplejson as json
import logging
@ -507,7 +507,7 @@ class SpiderHttpClient(SimpleHttpClient):
reactor,
SpiderEndpointFactory(hs)
)
), [('gzip', GzipDecoder)]
), [(b'gzip', GzipDecoder)]
)
# We could look like Chrome:
# self.user_agent = ("Mozilla/5.0 (%s) (KHTML, like Gecko)

View file

@ -286,7 +286,7 @@ def resolve_service(service_name, dns_client=client, cache=SERVER_CACHE, clock=t
if (len(answers) == 1
and answers[0].type == dns.SRV
and answers[0].payload
and answers[0].payload.target == dns.Name('.')):
and answers[0].payload.target == dns.Name(b'.')):
raise ConnectError("Service %s unavailable" % service_name)
for answer in answers:

View file

@ -41,8 +41,7 @@ import logging
import random
import sys
import urllib
import urlparse
from six.moves.urllib import parse as urlparse
logger = logging.getLogger(__name__)
outbound_logger = logging.getLogger("synapse.http.outbound")

View file

@ -546,6 +546,6 @@ def _request_user_agent_is_curl(request):
b"User-Agent", default=[]
)
for user_agent in user_agents:
if "curl" in user_agent:
if b"curl" in user_agent:
return True
return False

View file

@ -14,13 +14,13 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
from twisted.internet import defer
from .pusher import PusherFactory
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
from synapse.push.pusher import PusherFactory
from synapse.util.async import run_on_reactor
import logging
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
logger = logging.getLogger(__name__)
@ -137,8 +137,9 @@ class PusherPool:
if u in self.pushers:
for p in self.pushers[u].values():
deferreds.append(
preserve_fn(p.on_new_notifications)(
min_stream_id, max_stream_id
run_in_background(
p.on_new_notifications,
min_stream_id, max_stream_id,
)
)
@ -166,7 +167,10 @@ class PusherPool:
if u in self.pushers:
for p in self.pushers[u].values():
deferreds.append(
preserve_fn(p.on_new_receipts)(min_stream_id, max_stream_id)
run_in_background(
p.on_new_receipts,
min_stream_id, max_stream_id,
)
)
yield make_deferred_yieldable(
@ -211,7 +215,7 @@ class PusherPool:
if appid_pushkey in byuser:
byuser[appid_pushkey].on_stop()
byuser[appid_pushkey] = p
preserve_fn(p.on_started)()
run_in_background(p.on_started)
logger.info("Started pushers")

View file

@ -25,7 +25,7 @@ from .base import ClientV1RestServlet, client_path_patterns
import simplejson as json
import urllib
import urlparse
from six.moves.urllib import parse as urlparse
import logging
from saml2 import BINDING_HTTP_POST

View file

@ -30,6 +30,8 @@ from hashlib import sha1
import hmac
import logging
from six import string_types
logger = logging.getLogger(__name__)
@ -333,11 +335,11 @@ class RegisterRestServlet(ClientV1RestServlet):
def _do_shared_secret(self, request, register_json, session):
yield run_on_reactor()
if not isinstance(register_json.get("mac", None), basestring):
if not isinstance(register_json.get("mac", None), string_types):
raise SynapseError(400, "Expected mac.")
if not isinstance(register_json.get("user", None), basestring):
if not isinstance(register_json.get("user", None), string_types):
raise SynapseError(400, "Expected 'user' key.")
if not isinstance(register_json.get("password", None), basestring):
if not isinstance(register_json.get("password", None), string_types):
raise SynapseError(400, "Expected 'password' key.")
if not self.hs.config.registration_shared_secret:
@ -358,14 +360,14 @@ class RegisterRestServlet(ClientV1RestServlet):
got_mac = str(register_json["mac"])
want_mac = hmac.new(
key=self.hs.config.registration_shared_secret,
key=self.hs.config.registration_shared_secret.encode(),
digestmod=sha1,
)
want_mac.update(user)
want_mac.update("\x00")
want_mac.update(b"\x00")
want_mac.update(password)
want_mac.update("\x00")
want_mac.update("admin" if admin else "notadmin")
want_mac.update(b"\x00")
want_mac.update(b"admin" if admin else b"notadmin")
want_mac = want_mac.hexdigest()
if compare_digest(want_mac, got_mac):

View file

@ -28,8 +28,9 @@ from synapse.http.servlet import (
parse_json_object_from_request, parse_string, parse_integer
)
from six.moves.urllib import parse as urlparse
import logging
import urllib
import simplejson as json
logger = logging.getLogger(__name__)
@ -433,7 +434,7 @@ class RoomMessageListRestServlet(ClientV1RestServlet):
as_client_event = "raw" not in request.args
filter_bytes = request.args.get("filter", None)
if filter_bytes:
filter_json = urllib.unquote(filter_bytes[-1]).decode("UTF-8")
filter_json = urlparse.unquote(filter_bytes[-1]).decode("UTF-8")
event_filter = Filter(json.loads(filter_json))
else:
event_filter = None
@ -718,8 +719,8 @@ class RoomTypingRestServlet(ClientV1RestServlet):
def on_PUT(self, request, room_id, user_id):
requester = yield self.auth.get_user_by_req(request)
room_id = urllib.unquote(room_id)
target_user = UserID.from_string(urllib.unquote(user_id))
room_id = urlparse.unquote(room_id)
target_user = UserID.from_string(urlparse.unquote(user_id))
content = parse_json_object_from_request(request)

View file

@ -35,6 +35,8 @@ from hashlib import sha1
from synapse.util.async import run_on_reactor
from synapse.util.ratelimitutils import FederationRateLimiter
from six import string_types
# We ought to be using hmac.compare_digest() but on older pythons it doesn't
# exist. It's a _really minor_ security flaw to use plain string comparison
@ -210,14 +212,14 @@ class RegisterRestServlet(RestServlet):
# in sessions. Pull out the username/password provided to us.
desired_password = None
if 'password' in body:
if (not isinstance(body['password'], basestring) or
if (not isinstance(body['password'], string_types) or
len(body['password']) > 512):
raise SynapseError(400, "Invalid password")
desired_password = body["password"]
desired_username = None
if 'username' in body:
if (not isinstance(body['username'], basestring) or
if (not isinstance(body['username'], string_types) or
len(body['username']) > 512):
raise SynapseError(400, "Invalid username")
desired_username = body['username']
@ -243,7 +245,7 @@ class RegisterRestServlet(RestServlet):
access_token = get_access_token_from_request(request)
if isinstance(desired_username, basestring):
if isinstance(desired_username, string_types):
result = yield self._do_appservice_registration(
desired_username, access_token, body
)
@ -464,7 +466,7 @@ class RegisterRestServlet(RestServlet):
# includes the password and admin flag in the hashed text. Why are
# these different?
want_mac = hmac.new(
key=self.hs.config.registration_shared_secret,
key=self.hs.config.registration_shared_secret.encode(),
msg=user,
digestmod=sha1,
).hexdigest()

View file

@ -28,7 +28,7 @@ import os
import logging
import urllib
import urlparse
from six.moves.urllib import parse as urlparse
logger = logging.getLogger(__name__)

View file

@ -47,7 +47,7 @@ import shutil
import cgi
import logging
import urlparse
from six.moves.urllib import parse as urlparse
logger = logging.getLogger(__name__)

View file

@ -35,7 +35,7 @@ from ._base import FileInfo
from synapse.api.errors import (
SynapseError, Codes,
)
from synapse.util.logcontext import preserve_fn, make_deferred_yieldable
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
from synapse.util.stringutils import random_string
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.http.client import SpiderHttpClient
@ -144,7 +144,8 @@ class PreviewUrlResource(Resource):
observable = self._cache.get(url)
if not observable:
download = preserve_fn(self._do_preview)(
download = run_in_background(
self._do_preview,
url, requester.user, ts,
)
observable = ObservableDeferred(

View file

@ -81,15 +81,15 @@ class UploadResource(Resource):
headers = request.requestHeaders
if headers.hasHeader("Content-Type"):
media_type = headers.getRawHeaders("Content-Type")[0]
media_type = headers.getRawHeaders(b"Content-Type")[0]
else:
raise SynapseError(
msg="Upload request missing 'Content-Type'",
code=400,
)
# if headers.hasHeader("Content-Disposition"):
# disposition = headers.getRawHeaders("Content-Disposition")[0]
# if headers.hasHeader(b"Content-Disposition"):
# disposition = headers.getRawHeaders(b"Content-Disposition")[0]
# TODO(markjh): parse content-dispostion
content_uri = yield self.media_repo.create_content(

View file

@ -20,7 +20,7 @@ from synapse.events import FrozenEvent
from synapse.events.utils import prune_event
from synapse.util.logcontext import (
preserve_fn, PreserveLoggingContext, make_deferred_yieldable
PreserveLoggingContext, make_deferred_yieldable, run_in_background,
)
from synapse.util.metrics import Measure
from synapse.api.errors import SynapseError
@ -319,7 +319,8 @@ class EventsWorkerStore(SQLBaseStore):
res = yield make_deferred_yieldable(defer.gatherResults(
[
preserve_fn(self._get_event_from_row)(
run_in_background(
self._get_event_from_row,
row["internal_metadata"], row["json"], row["redacts"],
rejected_reason=row["rejects"],
)

View file

@ -41,7 +41,7 @@ from synapse.storage.events import EventsWorkerStore
from synapse.util.caches.descriptors import cached
from synapse.types import RoomStreamToken
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
import abc
@ -200,7 +200,8 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
room_ids = list(room_ids)
for rm_ids in (room_ids[i:i + 20] for i in range(0, len(room_ids), 20)):
res = yield make_deferred_yieldable(defer.gatherResults([
preserve_fn(self.get_room_events_stream_for_room)(
run_in_background(
self.get_room_events_stream_for_room,
room_id, from_key, to_key, limit, order=order,
)
for room_id in rm_ids

View file

@ -19,7 +19,7 @@ from twisted.internet.defer import CancelledError
from twisted.python import failure
from .logcontext import (
PreserveLoggingContext, make_deferred_yieldable, preserve_fn
PreserveLoggingContext, make_deferred_yieldable, run_in_background
)
from synapse.util import logcontext, unwrapFirstError
@ -165,7 +165,7 @@ def concurrently_execute(func, args, limit):
pass
return logcontext.make_deferred_yieldable(defer.gatherResults([
preserve_fn(_concurrently_execute_inner)()
run_in_background(_concurrently_execute_inner)
for _ in range(limit)
], consumeErrors=True)).addErrback(unwrapFirstError)

View file

@ -15,7 +15,7 @@
from twisted.internet import threads, reactor
from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
from six.moves import queue
@ -70,7 +70,9 @@ class BackgroundFileConsumer(object):
self._producer = producer
self.streaming = streaming
self._finished_deferred = preserve_fn(threads.deferToThread)(self._writer)
self._finished_deferred = run_in_background(
threads.deferToThread, self._writer
)
if not streaming:
self._producer.resumeProducing()

View file

@ -40,9 +40,12 @@ def create_resource_tree(desired_tree, root_resource):
# extra resources to existing nodes. See self._resource_id for the key.
resource_mappings = {}
for full_path, res in desired_tree.items():
# twisted requires all resources to be bytes
full_path = full_path.encode("utf-8")
logger.info("Attaching %s to path %s", res, full_path)
last_resource = root_resource
for path_seg in full_path.split('/')[1:-1]:
for path_seg in full_path.split(b'/')[1:-1]:
if path_seg not in last_resource.listNames():
# resource doesn't exist, so make a "dummy resource"
child_resource = NoResource()
@ -57,7 +60,7 @@ def create_resource_tree(desired_tree, root_resource):
# ===========================
# now attach the actual desired resource
last_path_seg = full_path.split('/')[-1]
last_path_seg = full_path.split(b'/')[-1]
# if there is already a resource here, thieve its children and
# replace it

View file

@ -346,7 +346,7 @@ def make_deferred_yieldable(deferred):
returning a deferred. Then, when the deferred completes, restores the
current logcontext before running callbacks/errbacks.
(This is more-or-less the opposite operation to preserve_fn.)
(This is more-or-less the opposite operation to run_in_background.)
"""
if isinstance(deferred, defer.Deferred) and not deferred.called:
prev_context = LoggingContext.set_current_context(LoggingContext.sentinel)

View file

@ -14,7 +14,7 @@
# limitations under the License.
import StringIO
from six import StringIO
import logging
import traceback

View file

@ -18,7 +18,7 @@ from twisted.internet import defer
from synapse.api.errors import LimitExceededError
from synapse.util.async import sleep
from synapse.util.logcontext import preserve_fn
from synapse.util.logcontext import run_in_background
import collections
import contextlib
@ -150,7 +150,7 @@ class _PerHostRatelimiter(object):
"Ratelimit [%s]: sleeping req",
id(request_id),
)
ret_defer = preserve_fn(sleep)(self.sleep_msec / 1000.0)
ret_defer = run_in_background(sleep, self.sleep_msec / 1000.0)
self.sleeping_requests.add(request_id)

View file

@ -203,8 +203,8 @@ class RetryDestinationLimiter(object):
)
except Exception:
logger.exception(
"Failed to store set_destination_retry_timings",
"Failed to store destination_retry_timings",
)
# we deliberately do this in the background.
synapse.util.logcontext.preserve_fn(store_retry_timings)()
synapse.util.logcontext.run_in_background(store_retry_timings)

View file

@ -24,7 +24,7 @@ from synapse.api.constants import Membership
from synapse.types import UserID
import json
import urllib
from six.moves.urllib import parse as urlparse
from ....utils import MockHttpResource, setup_test_homeserver
from .utils import RestTestCase
@ -766,7 +766,7 @@ class RoomMemberStateTestCase(RestTestCase):
@defer.inlineCallbacks
def test_rooms_members_self(self):
path = "/rooms/%s/state/m.room.member/%s" % (
urllib.quote(self.room_id), self.user_id
urlparse.quote(self.room_id), self.user_id
)
# valid join message (NOOP since we made the room)
@ -786,7 +786,7 @@ class RoomMemberStateTestCase(RestTestCase):
def test_rooms_members_other(self):
self.other_id = "@zzsid1:red"
path = "/rooms/%s/state/m.room.member/%s" % (
urllib.quote(self.room_id), self.other_id
urlparse.quote(self.room_id), self.other_id
)
# valid invite message
@ -802,7 +802,7 @@ class RoomMemberStateTestCase(RestTestCase):
def test_rooms_members_other_custom_keys(self):
self.other_id = "@zzsid1:red"
path = "/rooms/%s/state/m.room.member/%s" % (
urllib.quote(self.room_id), self.other_id
urlparse.quote(self.room_id), self.other_id
)
# valid invite message with custom key
@ -859,7 +859,7 @@ class RoomMessagesTestCase(RestTestCase):
@defer.inlineCallbacks
def test_invalid_puts(self):
path = "/rooms/%s/send/m.room.message/mid1" % (
urllib.quote(self.room_id))
urlparse.quote(self.room_id))
# missing keys or invalid json
(code, response) = yield self.mock_resource.trigger(
"PUT", path, '{}'
@ -894,7 +894,7 @@ class RoomMessagesTestCase(RestTestCase):
@defer.inlineCallbacks
def test_rooms_messages_sent(self):
path = "/rooms/%s/send/m.room.message/mid1" % (
urllib.quote(self.room_id))
urlparse.quote(self.room_id))
content = '{"body":"test","msgtype":{"type":"a"}}'
(code, response) = yield self.mock_resource.trigger("PUT", path, content)
@ -911,7 +911,7 @@ class RoomMessagesTestCase(RestTestCase):
# m.text message type
path = "/rooms/%s/send/m.room.message/mid2" % (
urllib.quote(self.room_id))
urlparse.quote(self.room_id))
content = '{"body":"test2","msgtype":"m.text"}'
(code, response) = yield self.mock_resource.trigger("PUT", path, content)
self.assertEquals(200, code, msg=str(response))

View file

@ -15,8 +15,7 @@
import hashlib
from inspect import getcallargs
import urllib
import urlparse
from six.moves.urllib import parse as urlparse
from mock import Mock, patch
from twisted.internet import defer, reactor
@ -238,7 +237,7 @@ class MockHttpResource(HttpServer):
if matcher:
try:
args = [
urllib.unquote(u).decode("UTF-8")
urlparse.unquote(u).decode("UTF-8")
for u in matcher.groups()
]

10
tox.ini
View file

@ -1,5 +1,5 @@
[tox]
envlist = packaging, py27, pep8
envlist = packaging, py27, py36, pep8
[testenv]
deps =
@ -46,6 +46,14 @@ commands =
# )
usedevelop=true
[testenv:py36]
usedevelop=true
commands =
/usr/bin/find "{toxinidir}" -name '*.pyc' -delete
coverage run {env:COVERAGE_OPTS:} --source="{toxinidir}/synapse" \
"{envbindir}/trial" {env:TRIAL_FLAGS:} {posargs:tests/metrics} {env:TOXSUFFIX:}
{env:DUMP_COVERAGE_COMMAND:coverage report -m}
[testenv:packaging]
deps =
check-manifest