mirror of
https://github.com/element-hq/synapse.git
synced 2024-12-22 04:34:28 +03:00
Merge branch 'develop' into rei/rss_target
This commit is contained in:
commit
baeaf00a12
61 changed files with 1300 additions and 503 deletions
1
changelog.d/5771.feature
Normal file
1
changelog.d/5771.feature
Normal file
|
@ -0,0 +1 @@
|
|||
Make Opentracing work in worker mode.
|
1
changelog.d/5776.misc
Normal file
1
changelog.d/5776.misc
Normal file
|
@ -0,0 +1 @@
|
|||
Update opentracing docs to use the unified `trace` method.
|
1
changelog.d/5845.feature
Normal file
1
changelog.d/5845.feature
Normal file
|
@ -0,0 +1 @@
|
|||
Add an admin API to purge old rooms from the database.
|
1
changelog.d/5850.feature
Normal file
1
changelog.d/5850.feature
Normal file
|
@ -0,0 +1 @@
|
|||
Add retry to well-known lookups if we have recently seen a valid well-known record for the server.
|
1
changelog.d/5852.feature
Normal file
1
changelog.d/5852.feature
Normal file
|
@ -0,0 +1 @@
|
|||
Pass opentracing contexts between servers when transmitting EDUs.
|
1
changelog.d/5855.misc
Normal file
1
changelog.d/5855.misc
Normal file
|
@ -0,0 +1 @@
|
|||
Opentracing for room and e2e keys.
|
1
changelog.d/5860.misc
Normal file
1
changelog.d/5860.misc
Normal file
|
@ -0,0 +1 @@
|
|||
Remove log line for debugging issue #5407.
|
1
changelog.d/5877.removal
Normal file
1
changelog.d/5877.removal
Normal file
|
@ -0,0 +1 @@
|
|||
Remove shared secret registration from client/r0/register endpoint. Contributed by Awesome Technologies Innovationslabor GmbH.
|
1
changelog.d/5878.feature
Normal file
1
changelog.d/5878.feature
Normal file
|
@ -0,0 +1 @@
|
|||
Add admin API endpoint for setting whether or not a user is a server administrator.
|
1
changelog.d/5885.bugfix
Normal file
1
changelog.d/5885.bugfix
Normal file
|
@ -0,0 +1 @@
|
|||
Fix stack overflow when recovering an appservice which had an outage.
|
1
changelog.d/5886.misc
Normal file
1
changelog.d/5886.misc
Normal file
|
@ -0,0 +1 @@
|
|||
Refactor the Appservice scheduler code.
|
1
changelog.d/5893.misc
Normal file
1
changelog.d/5893.misc
Normal file
|
@ -0,0 +1 @@
|
|||
Drop some unused tables.
|
1
changelog.d/5894.misc
Normal file
1
changelog.d/5894.misc
Normal file
|
@ -0,0 +1 @@
|
|||
Add missing index on users_in_public_rooms to improve the performance of directory queries.
|
1
changelog.d/5895.feature
Normal file
1
changelog.d/5895.feature
Normal file
|
@ -0,0 +1 @@
|
|||
Add config option to sign remote key query responses with a separate key.
|
1
changelog.d/5896.misc
Normal file
1
changelog.d/5896.misc
Normal file
|
@ -0,0 +1 @@
|
|||
Improve the logging when we have an error when fetching signing keys.
|
1
changelog.d/5906.feature
Normal file
1
changelog.d/5906.feature
Normal file
|
@ -0,0 +1 @@
|
|||
Increase max display name size to 256.
|
1
changelog.d/5909.misc
Normal file
1
changelog.d/5909.misc
Normal file
|
@ -0,0 +1 @@
|
|||
Fix error message which referred to public_base_url instead of public_baseurl. Thanks to @aaronraimist for the fix!
|
1
changelog.d/5911.misc
Normal file
1
changelog.d/5911.misc
Normal file
|
@ -0,0 +1 @@
|
|||
Add support for database engine-specific schema deltas, based on file extension.
|
18
docs/admin_api/purge_room.md
Normal file
18
docs/admin_api/purge_room.md
Normal file
|
@ -0,0 +1,18 @@
|
|||
Purge room API
|
||||
==============
|
||||
|
||||
This API will remove all trace of a room from your database.
|
||||
|
||||
All local users must have left the room before it can be removed.
|
||||
|
||||
The API is:
|
||||
|
||||
```
|
||||
POST /_synapse/admin/v1/purge_room
|
||||
|
||||
{
|
||||
"room_id": "!room:id"
|
||||
}
|
||||
```
|
||||
|
||||
You must authenticate using the access token of an admin user.
|
|
@ -84,3 +84,23 @@ with a body of:
|
|||
}
|
||||
|
||||
including an ``access_token`` of a server admin.
|
||||
|
||||
|
||||
Change whether a user is a server administrator or not
|
||||
======================================================
|
||||
|
||||
Note that you cannot demote yourself.
|
||||
|
||||
The api is::
|
||||
|
||||
PUT /_synapse/admin/v1/users/<user_id>/admin
|
||||
|
||||
with a body of:
|
||||
|
||||
.. code:: json
|
||||
|
||||
{
|
||||
"admin": true
|
||||
}
|
||||
|
||||
including an ``access_token`` of a server admin.
|
||||
|
|
|
@ -92,6 +92,29 @@ two problems, namely:
|
|||
but that doesn't prevent another server sending you baggage which will be logged
|
||||
to OpenTracing's logs.
|
||||
|
||||
==========
|
||||
EDU FORMAT
|
||||
==========
|
||||
|
||||
EDUs can contain tracing data in their content. This is not specced but
|
||||
it could be of interest for other homeservers.
|
||||
|
||||
EDU format (if you're using jaeger):
|
||||
|
||||
.. code-block:: json
|
||||
|
||||
{
|
||||
"edu_type": "type",
|
||||
"content": {
|
||||
"org.matrix.opentracing_context": {
|
||||
"uber-trace-id": "fe57cf3e65083289"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Though you don't have to use jaeger you must inject the span context into
|
||||
`org.matrix.opentracing_context` using the opentracing `Format.TEXT_MAP` inject method.
|
||||
|
||||
==================
|
||||
Configuring Jaeger
|
||||
==================
|
||||
|
|
|
@ -1027,6 +1027,14 @@ signing_key_path: "CONFDIR/SERVERNAME.signing.key"
|
|||
#
|
||||
#trusted_key_servers:
|
||||
# - server_name: "matrix.org"
|
||||
#
|
||||
|
||||
# The signing keys to use when acting as a trusted key server. If not specified
|
||||
# defaults to the server signing key.
|
||||
#
|
||||
# Can contain multiple keys, one per line.
|
||||
#
|
||||
#key_server_signing_keys_path: "key_server_signing_keys.key"
|
||||
|
||||
|
||||
# Enable SAML2 for registration and login. Uses pysaml2.
|
||||
|
|
|
@ -70,35 +70,37 @@ class ApplicationServiceScheduler(object):
|
|||
self.store = hs.get_datastore()
|
||||
self.as_api = hs.get_application_service_api()
|
||||
|
||||
def create_recoverer(service, callback):
|
||||
return _Recoverer(self.clock, self.store, self.as_api, service, callback)
|
||||
|
||||
self.txn_ctrl = _TransactionController(
|
||||
self.clock, self.store, self.as_api, create_recoverer
|
||||
)
|
||||
self.txn_ctrl = _TransactionController(self.clock, self.store, self.as_api)
|
||||
self.queuer = _ServiceQueuer(self.txn_ctrl, self.clock)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def start(self):
|
||||
logger.info("Starting appservice scheduler")
|
||||
|
||||
# check for any DOWN ASes and start recoverers for them.
|
||||
recoverers = yield _Recoverer.start(
|
||||
self.clock, self.store, self.as_api, self.txn_ctrl.on_recovered
|
||||
services = yield self.store.get_appservices_by_state(
|
||||
ApplicationServiceState.DOWN
|
||||
)
|
||||
self.txn_ctrl.add_recoverers(recoverers)
|
||||
|
||||
for service in services:
|
||||
self.txn_ctrl.start_recoverer(service)
|
||||
|
||||
def submit_event_for_as(self, service, event):
|
||||
self.queuer.enqueue(service, event)
|
||||
|
||||
|
||||
class _ServiceQueuer(object):
|
||||
"""Queues events for the same application service together, sending
|
||||
transactions as soon as possible. Once a transaction is sent successfully,
|
||||
this schedules any other events in the queue to run.
|
||||
"""Queue of events waiting to be sent to appservices.
|
||||
|
||||
Groups events into transactions per-appservice, and sends them on to the
|
||||
TransactionController. Makes sure that we only have one transaction in flight per
|
||||
appservice at a given time.
|
||||
"""
|
||||
|
||||
def __init__(self, txn_ctrl, clock):
|
||||
self.queued_events = {} # dict of {service_id: [events]}
|
||||
|
||||
# the appservices which currently have a transaction in flight
|
||||
self.requests_in_flight = set()
|
||||
self.txn_ctrl = txn_ctrl
|
||||
self.clock = clock
|
||||
|
@ -136,13 +138,29 @@ class _ServiceQueuer(object):
|
|||
|
||||
|
||||
class _TransactionController(object):
|
||||
def __init__(self, clock, store, as_api, recoverer_fn):
|
||||
"""Transaction manager.
|
||||
|
||||
Builds AppServiceTransactions and runs their lifecycle. Also starts a Recoverer
|
||||
if a transaction fails.
|
||||
|
||||
(Note we have only have one of these in the homeserver.)
|
||||
|
||||
Args:
|
||||
clock (synapse.util.Clock):
|
||||
store (synapse.storage.DataStore):
|
||||
as_api (synapse.appservice.api.ApplicationServiceApi):
|
||||
"""
|
||||
|
||||
def __init__(self, clock, store, as_api):
|
||||
self.clock = clock
|
||||
self.store = store
|
||||
self.as_api = as_api
|
||||
self.recoverer_fn = recoverer_fn
|
||||
# keep track of how many recoverers there are
|
||||
self.recoverers = []
|
||||
|
||||
# map from service id to recoverer instance
|
||||
self.recoverers = {}
|
||||
|
||||
# for UTs
|
||||
self.RECOVERER_CLASS = _Recoverer
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def send(self, service, events):
|
||||
|
@ -154,42 +172,45 @@ class _TransactionController(object):
|
|||
if sent:
|
||||
yield txn.complete(self.store)
|
||||
else:
|
||||
run_in_background(self._start_recoverer, service)
|
||||
run_in_background(self._on_txn_fail, service)
|
||||
except Exception:
|
||||
logger.exception("Error creating appservice transaction")
|
||||
run_in_background(self._start_recoverer, service)
|
||||
run_in_background(self._on_txn_fail, service)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_recovered(self, recoverer):
|
||||
self.recoverers.remove(recoverer)
|
||||
logger.info(
|
||||
"Successfully recovered application service AS ID %s", recoverer.service.id
|
||||
)
|
||||
self.recoverers.pop(recoverer.service.id)
|
||||
logger.info("Remaining active recoverers: %s", len(self.recoverers))
|
||||
yield self.store.set_appservice_state(
|
||||
recoverer.service, ApplicationServiceState.UP
|
||||
)
|
||||
|
||||
def add_recoverers(self, recoverers):
|
||||
for r in recoverers:
|
||||
self.recoverers.append(r)
|
||||
if len(recoverers) > 0:
|
||||
logger.info("New active recoverers: %s", len(self.recoverers))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _start_recoverer(self, service):
|
||||
def _on_txn_fail(self, service):
|
||||
try:
|
||||
yield self.store.set_appservice_state(service, ApplicationServiceState.DOWN)
|
||||
logger.info(
|
||||
"Application service falling behind. Starting recoverer. AS ID %s",
|
||||
service.id,
|
||||
)
|
||||
recoverer = self.recoverer_fn(service, self.on_recovered)
|
||||
self.add_recoverers([recoverer])
|
||||
recoverer.recover()
|
||||
self.start_recoverer(service)
|
||||
except Exception:
|
||||
logger.exception("Error starting AS recoverer")
|
||||
|
||||
def start_recoverer(self, service):
|
||||
"""Start a Recoverer for the given service
|
||||
|
||||
Args:
|
||||
service (synapse.appservice.ApplicationService):
|
||||
"""
|
||||
logger.info("Starting recoverer for AS ID %s", service.id)
|
||||
assert service.id not in self.recoverers
|
||||
recoverer = self.RECOVERER_CLASS(
|
||||
self.clock, self.store, self.as_api, service, self.on_recovered
|
||||
)
|
||||
self.recoverers[service.id] = recoverer
|
||||
recoverer.recover()
|
||||
logger.info("Now %i active recoverers", len(self.recoverers))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _is_service_up(self, service):
|
||||
state = yield self.store.get_appservice_state(service)
|
||||
|
@ -197,18 +218,17 @@ class _TransactionController(object):
|
|||
|
||||
|
||||
class _Recoverer(object):
|
||||
@staticmethod
|
||||
@defer.inlineCallbacks
|
||||
def start(clock, store, as_api, callback):
|
||||
services = yield store.get_appservices_by_state(ApplicationServiceState.DOWN)
|
||||
recoverers = [_Recoverer(clock, store, as_api, s, callback) for s in services]
|
||||
for r in recoverers:
|
||||
logger.info(
|
||||
"Starting recoverer for AS ID %s which was marked as " "DOWN",
|
||||
r.service.id,
|
||||
)
|
||||
r.recover()
|
||||
return recoverers
|
||||
"""Manages retries and backoff for a DOWN appservice.
|
||||
|
||||
We have one of these for each appservice which is currently considered DOWN.
|
||||
|
||||
Args:
|
||||
clock (synapse.util.Clock):
|
||||
store (synapse.storage.DataStore):
|
||||
as_api (synapse.appservice.api.ApplicationServiceApi):
|
||||
service (synapse.appservice.ApplicationService): the service we are managing
|
||||
callback (callable[_Recoverer]): called once the service recovers.
|
||||
"""
|
||||
|
||||
def __init__(self, clock, store, as_api, service, callback):
|
||||
self.clock = clock
|
||||
|
@ -224,7 +244,9 @@ class _Recoverer(object):
|
|||
"as-recoverer-%s" % (self.service.id,), self.retry
|
||||
)
|
||||
|
||||
self.clock.call_later((2 ** self.backoff_counter), _retry)
|
||||
delay = 2 ** self.backoff_counter
|
||||
logger.info("Scheduling retries on %s in %fs", self.service.id, delay)
|
||||
self.clock.call_later(delay, _retry)
|
||||
|
||||
def _backoff(self):
|
||||
# cap the backoff to be around 8.5min => (2^9) = 512 secs
|
||||
|
@ -234,25 +256,30 @@ class _Recoverer(object):
|
|||
|
||||
@defer.inlineCallbacks
|
||||
def retry(self):
|
||||
logger.info("Starting retries on %s", self.service.id)
|
||||
try:
|
||||
while True:
|
||||
txn = yield self.store.get_oldest_unsent_txn(self.service)
|
||||
if txn:
|
||||
if not txn:
|
||||
# nothing left: we're done!
|
||||
self.callback(self)
|
||||
return
|
||||
|
||||
logger.info(
|
||||
"Retrying transaction %s for AS ID %s", txn.id, txn.service.id
|
||||
)
|
||||
sent = yield txn.send(self.as_api)
|
||||
if sent:
|
||||
yield txn.complete(self.store)
|
||||
# reset the backoff counter and retry immediately
|
||||
self.backoff_counter = 1
|
||||
yield self.retry()
|
||||
else:
|
||||
self._backoff()
|
||||
else:
|
||||
self._set_service_recovered()
|
||||
except Exception as e:
|
||||
logger.exception(e)
|
||||
self._backoff()
|
||||
if not sent:
|
||||
break
|
||||
|
||||
def _set_service_recovered(self):
|
||||
self.callback(self)
|
||||
yield txn.complete(self.store)
|
||||
|
||||
# reset the backoff counter and then process the next transaction
|
||||
self.backoff_counter = 1
|
||||
|
||||
except Exception:
|
||||
logger.exception("Unexpected error running retries")
|
||||
|
||||
# we didn't manage to send all of the transactions before we got an error of
|
||||
# some flavour: reschedule the next retry.
|
||||
self._backoff()
|
||||
|
|
|
@ -115,7 +115,7 @@ class EmailConfig(Config):
|
|||
missing.append("email." + k)
|
||||
|
||||
if config.get("public_baseurl") is None:
|
||||
missing.append("public_base_url")
|
||||
missing.append("public_baseurl")
|
||||
|
||||
if len(missing) > 0:
|
||||
raise RuntimeError(
|
||||
|
|
|
@ -76,7 +76,7 @@ class KeyConfig(Config):
|
|||
config_dir_path, config["server_name"] + ".signing.key"
|
||||
)
|
||||
|
||||
self.signing_key = self.read_signing_key(signing_key_path)
|
||||
self.signing_key = self.read_signing_keys(signing_key_path, "signing_key")
|
||||
|
||||
self.old_signing_keys = self.read_old_signing_keys(
|
||||
config.get("old_signing_keys", {})
|
||||
|
@ -85,6 +85,14 @@ class KeyConfig(Config):
|
|||
config.get("key_refresh_interval", "1d")
|
||||
)
|
||||
|
||||
key_server_signing_keys_path = config.get("key_server_signing_keys_path")
|
||||
if key_server_signing_keys_path:
|
||||
self.key_server_signing_keys = self.read_signing_keys(
|
||||
key_server_signing_keys_path, "key_server_signing_keys_path"
|
||||
)
|
||||
else:
|
||||
self.key_server_signing_keys = list(self.signing_key)
|
||||
|
||||
# if neither trusted_key_servers nor perspectives are given, use the default.
|
||||
if "perspectives" not in config and "trusted_key_servers" not in config:
|
||||
key_servers = [{"server_name": "matrix.org"}]
|
||||
|
@ -210,16 +218,34 @@ class KeyConfig(Config):
|
|||
#
|
||||
#trusted_key_servers:
|
||||
# - server_name: "matrix.org"
|
||||
#
|
||||
|
||||
# The signing keys to use when acting as a trusted key server. If not specified
|
||||
# defaults to the server signing key.
|
||||
#
|
||||
# Can contain multiple keys, one per line.
|
||||
#
|
||||
#key_server_signing_keys_path: "key_server_signing_keys.key"
|
||||
"""
|
||||
% locals()
|
||||
)
|
||||
|
||||
def read_signing_key(self, signing_key_path):
|
||||
signing_keys = self.read_file(signing_key_path, "signing_key")
|
||||
def read_signing_keys(self, signing_key_path, name):
|
||||
"""Read the signing keys in the given path.
|
||||
|
||||
Args:
|
||||
signing_key_path (str)
|
||||
name (str): Associated config key name
|
||||
|
||||
Returns:
|
||||
list[SigningKey]
|
||||
"""
|
||||
|
||||
signing_keys = self.read_file(signing_key_path, name)
|
||||
try:
|
||||
return read_signing_keys(signing_keys.splitlines(True))
|
||||
except Exception as e:
|
||||
raise ConfigError("Error reading signing_key: %s" % (str(e)))
|
||||
raise ConfigError("Error reading %s: %s" % (name, str(e)))
|
||||
|
||||
def read_old_signing_keys(self, old_signing_keys):
|
||||
keys = {}
|
||||
|
|
|
@ -18,7 +18,6 @@ import logging
|
|||
from collections import defaultdict
|
||||
|
||||
import six
|
||||
from six import raise_from
|
||||
from six.moves import urllib
|
||||
|
||||
import attr
|
||||
|
@ -30,7 +29,6 @@ from signedjson.key import (
|
|||
from signedjson.sign import (
|
||||
SignatureVerifyException,
|
||||
encode_canonical_json,
|
||||
sign_json,
|
||||
signature_ids,
|
||||
verify_signed_json,
|
||||
)
|
||||
|
@ -540,13 +538,7 @@ class BaseV2KeyFetcher(object):
|
|||
verify_key=verify_key, valid_until_ts=key_data["expired_ts"]
|
||||
)
|
||||
|
||||
# re-sign the json with our own key, so that it is ready if we are asked to
|
||||
# give it out as a notary server
|
||||
signed_key_json = sign_json(
|
||||
response_json, self.config.server_name, self.config.signing_key[0]
|
||||
)
|
||||
|
||||
signed_key_json_bytes = encode_canonical_json(signed_key_json)
|
||||
key_json_bytes = encode_canonical_json(response_json)
|
||||
|
||||
yield make_deferred_yieldable(
|
||||
defer.gatherResults(
|
||||
|
@ -558,7 +550,7 @@ class BaseV2KeyFetcher(object):
|
|||
from_server=from_server,
|
||||
ts_now_ms=time_added_ms,
|
||||
ts_expires_ms=ts_valid_until_ms,
|
||||
key_json_bytes=signed_key_json_bytes,
|
||||
key_json_bytes=key_json_bytes,
|
||||
)
|
||||
for key_id in verify_keys
|
||||
],
|
||||
|
@ -657,9 +649,10 @@ class PerspectivesKeyFetcher(BaseV2KeyFetcher):
|
|||
},
|
||||
)
|
||||
except (NotRetryingDestination, RequestSendFailed) as e:
|
||||
raise_from(KeyLookupError("Failed to connect to remote server"), e)
|
||||
# these both have str() representations which we can't really improve upon
|
||||
raise KeyLookupError(str(e))
|
||||
except HttpResponseException as e:
|
||||
raise_from(KeyLookupError("Remote server returned an error"), e)
|
||||
raise KeyLookupError("Remote server returned an error: %s" % (e,))
|
||||
|
||||
keys = {}
|
||||
added_keys = []
|
||||
|
@ -821,9 +814,11 @@ class ServerKeyFetcher(BaseV2KeyFetcher):
|
|||
timeout=10000,
|
||||
)
|
||||
except (NotRetryingDestination, RequestSendFailed) as e:
|
||||
raise_from(KeyLookupError("Failed to connect to remote server"), e)
|
||||
# these both have str() representations which we can't really improve
|
||||
# upon
|
||||
raise KeyLookupError(str(e))
|
||||
except HttpResponseException as e:
|
||||
raise_from(KeyLookupError("Remote server returned an error"), e)
|
||||
raise KeyLookupError("Remote server returned an error: %s" % (e,))
|
||||
|
||||
if response["server_name"] != server_name:
|
||||
raise KeyLookupError(
|
||||
|
|
|
@ -43,6 +43,7 @@ from synapse.federation.persistence import TransactionActions
|
|||
from synapse.federation.units import Edu, Transaction
|
||||
from synapse.http.endpoint import parse_server_name
|
||||
from synapse.logging.context import nested_logging_context
|
||||
from synapse.logging.opentracing import log_kv, start_active_span_from_edu, trace
|
||||
from synapse.logging.utils import log_function
|
||||
from synapse.replication.http.federation import (
|
||||
ReplicationFederationSendEduRestServlet,
|
||||
|
@ -507,6 +508,7 @@ class FederationServer(FederationBase):
|
|||
def on_query_user_devices(self, origin, user_id):
|
||||
return self.on_query_request("user_devices", user_id)
|
||||
|
||||
@trace
|
||||
@defer.inlineCallbacks
|
||||
@log_function
|
||||
def on_claim_client_keys(self, origin, content):
|
||||
|
@ -515,6 +517,7 @@ class FederationServer(FederationBase):
|
|||
for device_id, algorithm in device_keys.items():
|
||||
query.append((user_id, device_id, algorithm))
|
||||
|
||||
log_kv({"message": "Claiming one time keys.", "user, device pairs": query})
|
||||
results = yield self.store.claim_e2e_one_time_keys(query)
|
||||
|
||||
json_result = {}
|
||||
|
@ -808,6 +811,7 @@ class FederationHandlerRegistry(object):
|
|||
if not handler:
|
||||
logger.warn("No handler registered for EDU type %s", edu_type)
|
||||
|
||||
with start_active_span_from_edu(content, "handle_edu"):
|
||||
try:
|
||||
yield handler(origin, content)
|
||||
except SynapseError as e:
|
||||
|
|
|
@ -14,11 +14,19 @@
|
|||
# limitations under the License.
|
||||
import logging
|
||||
|
||||
from canonicaljson import json
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.api.errors import HttpResponseException
|
||||
from synapse.federation.persistence import TransactionActions
|
||||
from synapse.federation.units import Transaction
|
||||
from synapse.logging.opentracing import (
|
||||
extract_text_map,
|
||||
set_tag,
|
||||
start_active_span_follows_from,
|
||||
tags,
|
||||
)
|
||||
from synapse.util.metrics import measure_func
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
@ -44,6 +52,19 @@ class TransactionManager(object):
|
|||
@defer.inlineCallbacks
|
||||
def send_new_transaction(self, destination, pending_pdus, pending_edus):
|
||||
|
||||
# Make a transaction-sending opentracing span. This span follows on from
|
||||
# all the edus in that transaction. This needs to be done since there is
|
||||
# no active span here, so if the edus were not received by the remote the
|
||||
# span would have no causality and it would be forgotten.
|
||||
# The span_contexts is a generator so that it won't be evaluated if
|
||||
# opentracing is disabled. (Yay speed!)
|
||||
|
||||
span_contexts = (
|
||||
extract_text_map(json.loads(edu.get_context())) for edu in pending_edus
|
||||
)
|
||||
|
||||
with start_active_span_follows_from("send_transaction", span_contexts):
|
||||
|
||||
# Sort based on the order field
|
||||
pending_pdus.sort(key=lambda t: t[1])
|
||||
pdus = [x[0] for x in pending_pdus]
|
||||
|
@ -108,7 +129,9 @@ class TransactionManager(object):
|
|||
response = e.response
|
||||
|
||||
if e.code in (401, 404, 429) or 500 <= e.code:
|
||||
logger.info("TX [%s] {%s} got %d response", destination, txn_id, code)
|
||||
logger.info(
|
||||
"TX [%s] {%s} got %d response", destination, txn_id, code
|
||||
)
|
||||
raise e
|
||||
|
||||
logger.info("TX [%s] {%s} got %d response", destination, txn_id, code)
|
||||
|
@ -133,4 +156,5 @@ class TransactionManager(object):
|
|||
)
|
||||
success = False
|
||||
|
||||
set_tag(tags.ERROR, not success)
|
||||
return success
|
||||
|
|
|
@ -38,7 +38,12 @@ from synapse.http.servlet import (
|
|||
parse_string_from_args,
|
||||
)
|
||||
from synapse.logging.context import run_in_background
|
||||
from synapse.logging.opentracing import start_active_span_from_context, tags
|
||||
from synapse.logging.opentracing import (
|
||||
start_active_span,
|
||||
start_active_span_from_request,
|
||||
tags,
|
||||
whitelisted_homeserver,
|
||||
)
|
||||
from synapse.types import ThirdPartyInstanceID, get_domain_from_id
|
||||
from synapse.util.ratelimitutils import FederationRateLimiter
|
||||
from synapse.util.versionstring import get_version_string
|
||||
|
@ -288,11 +293,7 @@ class BaseFederationServlet(object):
|
|||
logger.warn("authenticate_request failed: %s", e)
|
||||
raise
|
||||
|
||||
# Start an opentracing span
|
||||
with start_active_span_from_context(
|
||||
request.requestHeaders,
|
||||
"incoming-federation-request",
|
||||
tags={
|
||||
request_tags = {
|
||||
"request_id": request.get_request_id(),
|
||||
tags.SPAN_KIND: tags.SPAN_KIND_RPC_SERVER,
|
||||
tags.HTTP_METHOD: request.get_method(),
|
||||
|
@ -300,8 +301,20 @@ class BaseFederationServlet(object):
|
|||
tags.PEER_HOST_IPV6: request.getClientIP(),
|
||||
"authenticated_entity": origin,
|
||||
"servlet_name": request.request_metrics.name,
|
||||
},
|
||||
):
|
||||
}
|
||||
|
||||
# Only accept the span context if the origin is authenticated
|
||||
# and whitelisted
|
||||
if origin and whitelisted_homeserver(origin):
|
||||
scope = start_active_span_from_request(
|
||||
request, "incoming-federation-request", tags=request_tags
|
||||
)
|
||||
else:
|
||||
scope = start_active_span(
|
||||
"incoming-federation-request", tags=request_tags
|
||||
)
|
||||
|
||||
with scope:
|
||||
if origin:
|
||||
with ratelimiter.ratelimit(origin) as d:
|
||||
await d
|
||||
|
|
|
@ -38,6 +38,9 @@ class Edu(JsonEncodedObject):
|
|||
|
||||
internal_keys = ["origin", "destination"]
|
||||
|
||||
def get_context(self):
|
||||
return getattr(self, "content", {}).get("org.matrix.opentracing_context", "{}")
|
||||
|
||||
|
||||
class Transaction(JsonEncodedObject):
|
||||
""" A transaction is a list of Pdus and Edus to be sent to a remote home
|
||||
|
|
|
@ -94,6 +94,16 @@ class AdminHandler(BaseHandler):
|
|||
|
||||
return ret
|
||||
|
||||
def set_user_server_admin(self, user, admin):
|
||||
"""
|
||||
Set the admin bit on a user.
|
||||
|
||||
Args:
|
||||
user_id (UserID): the (necessarily local) user to manipulate
|
||||
admin (bool): whether or not the user should be an admin of this server
|
||||
"""
|
||||
return self.store.set_server_admin(user, admin)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def export_user_data(self, user_id, writer):
|
||||
"""Write all data we have on the user to the given writer.
|
||||
|
|
|
@ -15,9 +15,17 @@
|
|||
|
||||
import logging
|
||||
|
||||
from canonicaljson import json
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.api.errors import SynapseError
|
||||
from synapse.logging.opentracing import (
|
||||
get_active_span_text_map,
|
||||
set_tag,
|
||||
start_active_span,
|
||||
whitelisted_homeserver,
|
||||
)
|
||||
from synapse.types import UserID, get_domain_from_id
|
||||
from synapse.util.stringutils import random_string
|
||||
|
||||
|
@ -100,13 +108,20 @@ class DeviceMessageHandler(object):
|
|||
|
||||
message_id = random_string(16)
|
||||
|
||||
context = get_active_span_text_map()
|
||||
|
||||
remote_edu_contents = {}
|
||||
for destination, messages in remote_messages.items():
|
||||
with start_active_span("to_device_for_user"):
|
||||
set_tag("destination", destination)
|
||||
remote_edu_contents[destination] = {
|
||||
"messages": messages,
|
||||
"sender": sender_user_id,
|
||||
"type": message_type,
|
||||
"message_id": message_id,
|
||||
"org.matrix.opentracing_context": json.dumps(context)
|
||||
if whitelisted_homeserver(destination)
|
||||
else None,
|
||||
}
|
||||
|
||||
stream_id = yield self.store.add_messages_to_device_inbox(
|
||||
|
|
|
@ -24,6 +24,7 @@ from twisted.internet import defer
|
|||
|
||||
from synapse.api.errors import CodeMessageException, SynapseError
|
||||
from synapse.logging.context import make_deferred_yieldable, run_in_background
|
||||
from synapse.logging.opentracing import log_kv, set_tag, tag_args, trace
|
||||
from synapse.types import UserID, get_domain_from_id
|
||||
from synapse.util import unwrapFirstError
|
||||
from synapse.util.retryutils import NotRetryingDestination
|
||||
|
@ -46,6 +47,7 @@ class E2eKeysHandler(object):
|
|||
"client_keys", self.on_federation_query_client_keys
|
||||
)
|
||||
|
||||
@trace
|
||||
@defer.inlineCallbacks
|
||||
def query_devices(self, query_body, timeout):
|
||||
""" Handle a device key query from a client
|
||||
|
@ -81,6 +83,9 @@ class E2eKeysHandler(object):
|
|||
else:
|
||||
remote_queries[user_id] = device_ids
|
||||
|
||||
set_tag("local_key_query", local_query)
|
||||
set_tag("remote_key_query", remote_queries)
|
||||
|
||||
# First get local devices.
|
||||
failures = {}
|
||||
results = {}
|
||||
|
@ -121,6 +126,7 @@ class E2eKeysHandler(object):
|
|||
r[user_id] = remote_queries[user_id]
|
||||
|
||||
# Now fetch any devices that we don't have in our cache
|
||||
@trace
|
||||
@defer.inlineCallbacks
|
||||
def do_remote_query(destination):
|
||||
"""This is called when we are querying the device list of a user on
|
||||
|
@ -185,6 +191,8 @@ class E2eKeysHandler(object):
|
|||
except Exception as e:
|
||||
failure = _exception_to_failure(e)
|
||||
failures[destination] = failure
|
||||
set_tag("error", True)
|
||||
set_tag("reason", failure)
|
||||
|
||||
yield make_deferred_yieldable(
|
||||
defer.gatherResults(
|
||||
|
@ -198,6 +206,7 @@ class E2eKeysHandler(object):
|
|||
|
||||
return {"device_keys": results, "failures": failures}
|
||||
|
||||
@trace
|
||||
@defer.inlineCallbacks
|
||||
def query_local_devices(self, query):
|
||||
"""Get E2E device keys for local users
|
||||
|
@ -210,6 +219,7 @@ class E2eKeysHandler(object):
|
|||
defer.Deferred: (resolves to dict[string, dict[string, dict]]):
|
||||
map from user_id -> device_id -> device details
|
||||
"""
|
||||
set_tag("local_query", query)
|
||||
local_query = []
|
||||
|
||||
result_dict = {}
|
||||
|
@ -217,6 +227,14 @@ class E2eKeysHandler(object):
|
|||
# we use UserID.from_string to catch invalid user ids
|
||||
if not self.is_mine(UserID.from_string(user_id)):
|
||||
logger.warning("Request for keys for non-local user %s", user_id)
|
||||
log_kv(
|
||||
{
|
||||
"message": "Requested a local key for a user which"
|
||||
" was not local to the homeserver",
|
||||
"user_id": user_id,
|
||||
}
|
||||
)
|
||||
set_tag("error", True)
|
||||
raise SynapseError(400, "Not a user here")
|
||||
|
||||
if not device_ids:
|
||||
|
@ -241,6 +259,7 @@ class E2eKeysHandler(object):
|
|||
r["unsigned"]["device_display_name"] = display_name
|
||||
result_dict[user_id][device_id] = r
|
||||
|
||||
log_kv(results)
|
||||
return result_dict
|
||||
|
||||
@defer.inlineCallbacks
|
||||
|
@ -251,6 +270,7 @@ class E2eKeysHandler(object):
|
|||
res = yield self.query_local_devices(device_keys_query)
|
||||
return {"device_keys": res}
|
||||
|
||||
@trace
|
||||
@defer.inlineCallbacks
|
||||
def claim_one_time_keys(self, query, timeout):
|
||||
local_query = []
|
||||
|
@ -265,6 +285,9 @@ class E2eKeysHandler(object):
|
|||
domain = get_domain_from_id(user_id)
|
||||
remote_queries.setdefault(domain, {})[user_id] = device_keys
|
||||
|
||||
set_tag("local_key_query", local_query)
|
||||
set_tag("remote_key_query", remote_queries)
|
||||
|
||||
results = yield self.store.claim_e2e_one_time_keys(local_query)
|
||||
|
||||
json_result = {}
|
||||
|
@ -276,8 +299,10 @@ class E2eKeysHandler(object):
|
|||
key_id: json.loads(json_bytes)
|
||||
}
|
||||
|
||||
@trace
|
||||
@defer.inlineCallbacks
|
||||
def claim_client_keys(destination):
|
||||
set_tag("destination", destination)
|
||||
device_keys = remote_queries[destination]
|
||||
try:
|
||||
remote_result = yield self.federation.claim_client_keys(
|
||||
|
@ -290,6 +315,8 @@ class E2eKeysHandler(object):
|
|||
except Exception as e:
|
||||
failure = _exception_to_failure(e)
|
||||
failures[destination] = failure
|
||||
set_tag("error", True)
|
||||
set_tag("reason", failure)
|
||||
|
||||
yield make_deferred_yieldable(
|
||||
defer.gatherResults(
|
||||
|
@ -313,9 +340,11 @@ class E2eKeysHandler(object):
|
|||
),
|
||||
)
|
||||
|
||||
log_kv({"one_time_keys": json_result, "failures": failures})
|
||||
return {"one_time_keys": json_result, "failures": failures}
|
||||
|
||||
@defer.inlineCallbacks
|
||||
@tag_args
|
||||
def upload_keys_for_user(self, user_id, device_id, keys):
|
||||
|
||||
time_now = self.clock.time_msec()
|
||||
|
@ -329,6 +358,13 @@ class E2eKeysHandler(object):
|
|||
user_id,
|
||||
time_now,
|
||||
)
|
||||
log_kv(
|
||||
{
|
||||
"message": "Updating device_keys for user.",
|
||||
"user_id": user_id,
|
||||
"device_id": device_id,
|
||||
}
|
||||
)
|
||||
# TODO: Sign the JSON with the server key
|
||||
changed = yield self.store.set_e2e_device_keys(
|
||||
user_id, device_id, time_now, device_keys
|
||||
|
@ -336,12 +372,24 @@ class E2eKeysHandler(object):
|
|||
if changed:
|
||||
# Only notify about device updates *if* the keys actually changed
|
||||
yield self.device_handler.notify_device_update(user_id, [device_id])
|
||||
|
||||
else:
|
||||
log_kv({"message": "Not updating device_keys for user", "user_id": user_id})
|
||||
one_time_keys = keys.get("one_time_keys", None)
|
||||
if one_time_keys:
|
||||
log_kv(
|
||||
{
|
||||
"message": "Updating one_time_keys for device.",
|
||||
"user_id": user_id,
|
||||
"device_id": device_id,
|
||||
}
|
||||
)
|
||||
yield self._upload_one_time_keys_for_user(
|
||||
user_id, device_id, time_now, one_time_keys
|
||||
)
|
||||
else:
|
||||
log_kv(
|
||||
{"message": "Did not update one_time_keys", "reason": "no keys given"}
|
||||
)
|
||||
|
||||
# the device should have been registered already, but it may have been
|
||||
# deleted due to a race with a DELETE request. Or we may be using an
|
||||
|
@ -352,6 +400,7 @@ class E2eKeysHandler(object):
|
|||
|
||||
result = yield self.store.count_e2e_one_time_keys(user_id, device_id)
|
||||
|
||||
set_tag("one_time_key_counts", result)
|
||||
return {"one_time_key_counts": result}
|
||||
|
||||
@defer.inlineCallbacks
|
||||
|
@ -395,6 +444,7 @@ class E2eKeysHandler(object):
|
|||
(algorithm, key_id, encode_canonical_json(key).decode("ascii"))
|
||||
)
|
||||
|
||||
log_kv({"message": "Inserting new one_time_keys.", "keys": new_keys})
|
||||
yield self.store.add_e2e_one_time_keys(user_id, device_id, time_now, new_keys)
|
||||
|
||||
|
||||
|
|
|
@ -26,6 +26,7 @@ from synapse.api.errors import (
|
|||
StoreError,
|
||||
SynapseError,
|
||||
)
|
||||
from synapse.logging.opentracing import log_kv, trace
|
||||
from synapse.util.async_helpers import Linearizer
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
@ -49,6 +50,7 @@ class E2eRoomKeysHandler(object):
|
|||
# changed.
|
||||
self._upload_linearizer = Linearizer("upload_room_keys_lock")
|
||||
|
||||
@trace
|
||||
@defer.inlineCallbacks
|
||||
def get_room_keys(self, user_id, version, room_id=None, session_id=None):
|
||||
"""Bulk get the E2E room keys for a given backup, optionally filtered to a given
|
||||
|
@ -84,8 +86,10 @@ class E2eRoomKeysHandler(object):
|
|||
user_id, version, room_id, session_id
|
||||
)
|
||||
|
||||
log_kv(results)
|
||||
return results
|
||||
|
||||
@trace
|
||||
@defer.inlineCallbacks
|
||||
def delete_room_keys(self, user_id, version, room_id=None, session_id=None):
|
||||
"""Bulk delete the E2E room keys for a given backup, optionally filtered to a given
|
||||
|
@ -107,6 +111,7 @@ class E2eRoomKeysHandler(object):
|
|||
with (yield self._upload_linearizer.queue(user_id)):
|
||||
yield self.store.delete_e2e_room_keys(user_id, version, room_id, session_id)
|
||||
|
||||
@trace
|
||||
@defer.inlineCallbacks
|
||||
def upload_room_keys(self, user_id, version, room_keys):
|
||||
"""Bulk upload a list of room keys into a given backup version, asserting
|
||||
|
@ -186,7 +191,14 @@ class E2eRoomKeysHandler(object):
|
|||
session_id(str): the session whose room_key we're setting
|
||||
room_key(dict): the room_key being set
|
||||
"""
|
||||
|
||||
log_kv(
|
||||
{
|
||||
"message": "Trying to upload room key",
|
||||
"room_id": room_id,
|
||||
"session_id": session_id,
|
||||
"user_id": user_id,
|
||||
}
|
||||
)
|
||||
# get the room_key for this particular row
|
||||
current_room_key = None
|
||||
try:
|
||||
|
@ -195,14 +207,23 @@ class E2eRoomKeysHandler(object):
|
|||
)
|
||||
except StoreError as e:
|
||||
if e.code == 404:
|
||||
pass
|
||||
log_kv(
|
||||
{
|
||||
"message": "Room key not found.",
|
||||
"room_id": room_id,
|
||||
"user_id": user_id,
|
||||
}
|
||||
)
|
||||
else:
|
||||
raise
|
||||
|
||||
if self._should_replace_room_key(current_room_key, room_key):
|
||||
log_kv({"message": "Replacing room key."})
|
||||
yield self.store.set_e2e_room_key(
|
||||
user_id, version, room_id, session_id, room_key
|
||||
)
|
||||
else:
|
||||
log_kv({"message": "Not replacing room_key."})
|
||||
|
||||
@staticmethod
|
||||
def _should_replace_room_key(current_room_key, room_key):
|
||||
|
@ -236,6 +257,7 @@ class E2eRoomKeysHandler(object):
|
|||
return False
|
||||
return True
|
||||
|
||||
@trace
|
||||
@defer.inlineCallbacks
|
||||
def create_version(self, user_id, version_info):
|
||||
"""Create a new backup version. This automatically becomes the new
|
||||
|
@ -294,6 +316,7 @@ class E2eRoomKeysHandler(object):
|
|||
raise
|
||||
return res
|
||||
|
||||
@trace
|
||||
@defer.inlineCallbacks
|
||||
def delete_version(self, user_id, version=None):
|
||||
"""Deletes a given version of the user's e2e_room_keys backup
|
||||
|
@ -314,6 +337,7 @@ class E2eRoomKeysHandler(object):
|
|||
else:
|
||||
raise
|
||||
|
||||
@trace
|
||||
@defer.inlineCallbacks
|
||||
def update_version(self, user_id, version, version_info):
|
||||
"""Update the info about a given version of the user's backup
|
||||
|
|
|
@ -70,6 +70,7 @@ class PaginationHandler(object):
|
|||
self.auth = hs.get_auth()
|
||||
self.store = hs.get_datastore()
|
||||
self.clock = hs.get_clock()
|
||||
self._server_name = hs.hostname
|
||||
|
||||
self.pagination_lock = ReadWriteLock()
|
||||
self._purges_in_progress_by_room = set()
|
||||
|
@ -153,6 +154,22 @@ class PaginationHandler(object):
|
|||
"""
|
||||
return self._purges_by_id.get(purge_id)
|
||||
|
||||
async def purge_room(self, room_id):
|
||||
"""Purge the given room from the database"""
|
||||
with (await self.pagination_lock.write(room_id)):
|
||||
# check we know about the room
|
||||
await self.store.get_room_version(room_id)
|
||||
|
||||
# first check that we have no users in this room
|
||||
joined = await defer.maybeDeferred(
|
||||
self.store.is_host_joined, room_id, self._server_name
|
||||
)
|
||||
|
||||
if joined:
|
||||
raise SynapseError(400, "Users are still joined to this room")
|
||||
|
||||
await self.store.purge_room(room_id)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def get_messages(
|
||||
self,
|
||||
|
|
|
@ -34,7 +34,7 @@ from ._base import BaseHandler
|
|||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
MAX_DISPLAYNAME_LEN = 100
|
||||
MAX_DISPLAYNAME_LEN = 256
|
||||
MAX_AVATAR_URL_LEN = 1000
|
||||
|
||||
|
||||
|
|
|
@ -786,9 +786,8 @@ class SyncHandler(object):
|
|||
batch.events[0].event_id, state_filter=state_filter
|
||||
)
|
||||
else:
|
||||
# Its not clear how we get here, but empirically we do
|
||||
# (#5407). Logging has been added elsewhere to try and
|
||||
# figure out where this state comes from.
|
||||
# We can get here if the user has ignored the senders of all
|
||||
# the recent events.
|
||||
state_at_timeline_start = yield self.get_state_at(
|
||||
room_id, stream_position=now_token, state_filter=state_filter
|
||||
)
|
||||
|
@ -1771,20 +1770,9 @@ class SyncHandler(object):
|
|||
newly_joined_room=newly_joined,
|
||||
)
|
||||
|
||||
if not batch and batch.limited:
|
||||
# This resulted in #5407, which is weird, so lets log! We do it
|
||||
# here as we have the maximum amount of information.
|
||||
user_id = sync_result_builder.sync_config.user.to_string()
|
||||
logger.info(
|
||||
"Issue #5407: Found limited batch with no events. user %s, room %s,"
|
||||
" sync_config %s, newly_joined %s, events %s, batch %s.",
|
||||
user_id,
|
||||
room_id,
|
||||
sync_config,
|
||||
newly_joined,
|
||||
events,
|
||||
batch,
|
||||
)
|
||||
# Note: `batch` can be both empty and limited here in the case where
|
||||
# `_load_filtered_recents` can't find any events the user should see
|
||||
# (e.g. due to having ignored the sender of the last 50 events).
|
||||
|
||||
if newly_joined:
|
||||
# debug for https://github.com/matrix-org/synapse/issues/4422
|
||||
|
|
|
@ -51,9 +51,9 @@ class MatrixFederationAgent(object):
|
|||
SRVResolver impl to use for looking up SRV records. None to use a default
|
||||
implementation.
|
||||
|
||||
_well_known_cache (TTLCache|None):
|
||||
TTLCache impl for storing cached well-known lookups. None to use a default
|
||||
implementation.
|
||||
_well_known_resolver (WellKnownResolver|None):
|
||||
WellKnownResolver to use to perform well-known lookups. None to use a
|
||||
default implementation.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
|
@ -61,7 +61,7 @@ class MatrixFederationAgent(object):
|
|||
reactor,
|
||||
tls_client_options_factory,
|
||||
_srv_resolver=None,
|
||||
_well_known_cache=None,
|
||||
_well_known_resolver=None,
|
||||
):
|
||||
self._reactor = reactor
|
||||
self._clock = Clock(reactor)
|
||||
|
@ -76,16 +76,18 @@ class MatrixFederationAgent(object):
|
|||
self._pool.maxPersistentPerHost = 5
|
||||
self._pool.cachedConnectionTimeout = 2 * 60
|
||||
|
||||
self._well_known_resolver = WellKnownResolver(
|
||||
if _well_known_resolver is None:
|
||||
_well_known_resolver = WellKnownResolver(
|
||||
self._reactor,
|
||||
agent=Agent(
|
||||
self._reactor,
|
||||
pool=self._pool,
|
||||
contextFactory=tls_client_options_factory,
|
||||
),
|
||||
well_known_cache=_well_known_cache,
|
||||
)
|
||||
|
||||
self._well_known_resolver = _well_known_resolver
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def request(self, method, uri, headers=None, bodyProducer=None):
|
||||
"""
|
||||
|
|
|
@ -32,12 +32,19 @@ from synapse.util.metrics import Measure
|
|||
# period to cache .well-known results for by default
|
||||
WELL_KNOWN_DEFAULT_CACHE_PERIOD = 24 * 3600
|
||||
|
||||
# jitter to add to the .well-known default cache ttl
|
||||
WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER = 10 * 60
|
||||
# jitter factor to add to the .well-known default cache ttls
|
||||
WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER = 0.1
|
||||
|
||||
# period to cache failure to fetch .well-known for
|
||||
WELL_KNOWN_INVALID_CACHE_PERIOD = 1 * 3600
|
||||
|
||||
# period to cache failure to fetch .well-known if there has recently been a
|
||||
# valid well-known for that domain.
|
||||
WELL_KNOWN_DOWN_CACHE_PERIOD = 2 * 60
|
||||
|
||||
# period to remember there was a valid well-known after valid record expires
|
||||
WELL_KNOWN_REMEMBER_DOMAIN_HAD_VALID = 2 * 3600
|
||||
|
||||
# cap for .well-known cache period
|
||||
WELL_KNOWN_MAX_CACHE_PERIOD = 48 * 3600
|
||||
|
||||
|
@ -49,11 +56,16 @@ WELL_KNOWN_MIN_CACHE_PERIOD = 5 * 60
|
|||
# we'll start trying to refetch 1 minute before it expires.
|
||||
WELL_KNOWN_GRACE_PERIOD_FACTOR = 0.2
|
||||
|
||||
# Number of times we retry fetching a well-known for a domain we know recently
|
||||
# had a valid entry.
|
||||
WELL_KNOWN_RETRY_ATTEMPTS = 3
|
||||
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
_well_known_cache = TTLCache("well-known")
|
||||
_had_valid_well_known_cache = TTLCache("had-valid-well-known")
|
||||
|
||||
|
||||
@attr.s(slots=True, frozen=True)
|
||||
|
@ -65,14 +77,20 @@ class WellKnownResolver(object):
|
|||
"""Handles well-known lookups for matrix servers.
|
||||
"""
|
||||
|
||||
def __init__(self, reactor, agent, well_known_cache=None):
|
||||
def __init__(
|
||||
self, reactor, agent, well_known_cache=None, had_well_known_cache=None
|
||||
):
|
||||
self._reactor = reactor
|
||||
self._clock = Clock(reactor)
|
||||
|
||||
if well_known_cache is None:
|
||||
well_known_cache = _well_known_cache
|
||||
|
||||
if had_well_known_cache is None:
|
||||
had_well_known_cache = _had_valid_well_known_cache
|
||||
|
||||
self._well_known_cache = well_known_cache
|
||||
self._had_valid_well_known_cache = had_well_known_cache
|
||||
self._well_known_agent = RedirectAgent(agent)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
|
@ -100,7 +118,7 @@ class WellKnownResolver(object):
|
|||
# requests for the same server in parallel?
|
||||
try:
|
||||
with Measure(self._clock, "get_well_known"):
|
||||
result, cache_period = yield self._do_get_well_known(server_name)
|
||||
result, cache_period = yield self._fetch_well_known(server_name)
|
||||
|
||||
except _FetchWellKnownFailure as e:
|
||||
if prev_result and e.temporary:
|
||||
|
@ -111,10 +129,18 @@ class WellKnownResolver(object):
|
|||
|
||||
result = None
|
||||
|
||||
# add some randomness to the TTL to avoid a stampeding herd every hour
|
||||
# after startup
|
||||
if self._had_valid_well_known_cache.get(server_name, False):
|
||||
# We have recently seen a valid well-known record for this
|
||||
# server, so we cache the lack of well-known for a shorter time.
|
||||
cache_period = WELL_KNOWN_DOWN_CACHE_PERIOD
|
||||
else:
|
||||
cache_period = WELL_KNOWN_INVALID_CACHE_PERIOD
|
||||
cache_period += random.uniform(0, WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER)
|
||||
|
||||
# add some randomness to the TTL to avoid a stampeding herd
|
||||
cache_period *= random.uniform(
|
||||
1 - WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER,
|
||||
1 + WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER,
|
||||
)
|
||||
|
||||
if cache_period > 0:
|
||||
self._well_known_cache.set(server_name, result, cache_period)
|
||||
|
@ -122,7 +148,7 @@ class WellKnownResolver(object):
|
|||
return WellKnownLookupResult(delegated_server=result)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _do_get_well_known(self, server_name):
|
||||
def _fetch_well_known(self, server_name):
|
||||
"""Actually fetch and parse a .well-known, without checking the cache
|
||||
|
||||
Args:
|
||||
|
@ -134,24 +160,15 @@ class WellKnownResolver(object):
|
|||
Returns:
|
||||
Deferred[Tuple[bytes,int]]: The lookup result and cache period.
|
||||
"""
|
||||
uri = b"https://%s/.well-known/matrix/server" % (server_name,)
|
||||
uri_str = uri.decode("ascii")
|
||||
logger.info("Fetching %s", uri_str)
|
||||
|
||||
had_valid_well_known = self._had_valid_well_known_cache.get(server_name, False)
|
||||
|
||||
# We do this in two steps to differentiate between possibly transient
|
||||
# errors (e.g. can't connect to host, 503 response) and more permenant
|
||||
# errors (such as getting a 404 response).
|
||||
try:
|
||||
response = yield make_deferred_yieldable(
|
||||
self._well_known_agent.request(b"GET", uri)
|
||||
response, body = yield self._make_well_known_request(
|
||||
server_name, retry=had_valid_well_known
|
||||
)
|
||||
body = yield make_deferred_yieldable(readBody(response))
|
||||
|
||||
if 500 <= response.code < 600:
|
||||
raise Exception("Non-200 response %s" % (response.code,))
|
||||
except Exception as e:
|
||||
logger.info("Error fetching %s: %s", uri_str, e)
|
||||
raise _FetchWellKnownFailure(temporary=True)
|
||||
|
||||
try:
|
||||
if response.code != 200:
|
||||
|
@ -161,8 +178,11 @@ class WellKnownResolver(object):
|
|||
logger.info("Response from .well-known: %s", parsed_body)
|
||||
|
||||
result = parsed_body["m.server"].encode("ascii")
|
||||
except defer.CancelledError:
|
||||
# Bail if we've been cancelled
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.info("Error fetching %s: %s", uri_str, e)
|
||||
logger.info("Error parsing well-known for %s: %s", server_name, e)
|
||||
raise _FetchWellKnownFailure(temporary=False)
|
||||
|
||||
cache_period = _cache_period_from_headers(
|
||||
|
@ -172,13 +192,69 @@ class WellKnownResolver(object):
|
|||
cache_period = WELL_KNOWN_DEFAULT_CACHE_PERIOD
|
||||
# add some randomness to the TTL to avoid a stampeding herd every 24 hours
|
||||
# after startup
|
||||
cache_period += random.uniform(0, WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER)
|
||||
cache_period *= random.uniform(
|
||||
1 - WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER,
|
||||
1 + WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER,
|
||||
)
|
||||
else:
|
||||
cache_period = min(cache_period, WELL_KNOWN_MAX_CACHE_PERIOD)
|
||||
cache_period = max(cache_period, WELL_KNOWN_MIN_CACHE_PERIOD)
|
||||
|
||||
# We got a success, mark as such in the cache
|
||||
self._had_valid_well_known_cache.set(
|
||||
server_name,
|
||||
bool(result),
|
||||
cache_period + WELL_KNOWN_REMEMBER_DOMAIN_HAD_VALID,
|
||||
)
|
||||
|
||||
return (result, cache_period)
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _make_well_known_request(self, server_name, retry):
|
||||
"""Make the well known request.
|
||||
|
||||
This will retry the request if requested and it fails (with unable
|
||||
to connect or receives a 5xx error).
|
||||
|
||||
Args:
|
||||
server_name (bytes)
|
||||
retry (bool): Whether to retry the request if it fails.
|
||||
|
||||
Returns:
|
||||
Deferred[tuple[IResponse, bytes]] Returns the response object and
|
||||
body. Response may be a non-200 response.
|
||||
"""
|
||||
uri = b"https://%s/.well-known/matrix/server" % (server_name,)
|
||||
uri_str = uri.decode("ascii")
|
||||
|
||||
i = 0
|
||||
while True:
|
||||
i += 1
|
||||
|
||||
logger.info("Fetching %s", uri_str)
|
||||
try:
|
||||
response = yield make_deferred_yieldable(
|
||||
self._well_known_agent.request(b"GET", uri)
|
||||
)
|
||||
body = yield make_deferred_yieldable(readBody(response))
|
||||
|
||||
if 500 <= response.code < 600:
|
||||
raise Exception("Non-200 response %s" % (response.code,))
|
||||
|
||||
return response, body
|
||||
except defer.CancelledError:
|
||||
# Bail if we've been cancelled
|
||||
raise
|
||||
except Exception as e:
|
||||
if not retry or i >= WELL_KNOWN_RETRY_ATTEMPTS:
|
||||
logger.info("Error fetching %s: %s", uri_str, e)
|
||||
raise _FetchWellKnownFailure(temporary=True)
|
||||
|
||||
logger.info("Error fetching %s: %s. Retrying", uri_str, e)
|
||||
|
||||
# Sleep briefly in the hopes that they come back up
|
||||
yield self._clock.sleep(0.5)
|
||||
|
||||
|
||||
def _cache_period_from_headers(headers, time_now=time.time):
|
||||
cache_controls = _parse_cache_control(headers)
|
||||
|
|
|
@ -300,7 +300,7 @@ class RestServlet(object):
|
|||
http_server.register_paths(
|
||||
method,
|
||||
patterns,
|
||||
trace_servlet(servlet_classname, method_handler),
|
||||
trace_servlet(servlet_classname)(method_handler),
|
||||
servlet_classname,
|
||||
)
|
||||
|
||||
|
|
|
@ -43,6 +43,9 @@ OpenTracing to be easily disabled in Synapse and thereby have OpenTracing as
|
|||
an optional dependency. This does however limit the number of modifiable spans
|
||||
at any point in the code to one. From here out references to `opentracing`
|
||||
in the code snippets refer to the Synapses module.
|
||||
Most methods provided in the module have a direct correlation to those provided
|
||||
by opentracing. Refer to docs there for a more in-depth documentation on some of
|
||||
the args and methods.
|
||||
|
||||
Tracing
|
||||
-------
|
||||
|
@ -68,52 +71,62 @@ set a tag on the current active span.
|
|||
Tracing functions
|
||||
-----------------
|
||||
|
||||
Functions can be easily traced using decorators. There is a decorator for
|
||||
'normal' function and for functions which are actually deferreds. The name of
|
||||
Functions can be easily traced using decorators. The name of
|
||||
the function becomes the operation name for the span.
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
from synapse.logging.opentracing import trace, trace_deferred
|
||||
from synapse.logging.opentracing import trace
|
||||
|
||||
# Start a span using 'normal_function' as the operation name
|
||||
# Start a span using 'interesting_function' as the operation name
|
||||
@trace
|
||||
def normal_function(*args, **kwargs):
|
||||
def interesting_function(*args, **kwargs):
|
||||
# Does all kinds of cool and expected things
|
||||
return something_usual_and_useful
|
||||
|
||||
# Start a span using 'deferred_function' as the operation name
|
||||
@trace_deferred
|
||||
@defer.inlineCallbacks
|
||||
def deferred_function(*args, **kwargs):
|
||||
# We start
|
||||
yield we_wait
|
||||
# we finish
|
||||
return something_usual_and_useful
|
||||
|
||||
Operation names can be explicitly set for functions by using
|
||||
``trace_using_operation_name`` and
|
||||
``trace_deferred_using_operation_name``
|
||||
``trace_using_operation_name``
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
from synapse.logging.opentracing import (
|
||||
trace_using_operation_name,
|
||||
trace_deferred_using_operation_name
|
||||
)
|
||||
from synapse.logging.opentracing import trace_using_operation_name
|
||||
|
||||
@trace_using_operation_name("A *much* better operation name")
|
||||
def normal_function(*args, **kwargs):
|
||||
def interesting_badly_named_function(*args, **kwargs):
|
||||
# Does all kinds of cool and expected things
|
||||
return something_usual_and_useful
|
||||
|
||||
@trace_deferred_using_operation_name("Another exciting operation name!")
|
||||
@defer.inlineCallbacks
|
||||
def deferred_function(*args, **kwargs):
|
||||
# We start
|
||||
yield we_wait
|
||||
# we finish
|
||||
return something_usual_and_useful
|
||||
Setting Tags
|
||||
------------
|
||||
|
||||
To set a tag on the active span do
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
from synapse.logging.opentracing import set_tag
|
||||
|
||||
set_tag(tag_name, tag_value)
|
||||
|
||||
There's a convenient decorator to tag all the args of the method. It uses
|
||||
inspection in order to use the formal parameter names prefixed with 'ARG_' as
|
||||
tag names. It uses kwarg names as tag names without the prefix.
|
||||
|
||||
.. code-block:: python
|
||||
|
||||
from synapse.logging.opentracing import tag_args
|
||||
|
||||
@tag_args
|
||||
def set_fates(clotho, lachesis, atropos, father="Zues", mother="Themis"):
|
||||
pass
|
||||
|
||||
set_fates("the story", "the end", "the act")
|
||||
# This will have the following tags
|
||||
# - ARG_clotho: "the story"
|
||||
# - ARG_lachesis: "the end"
|
||||
# - ARG_atropos: "the act"
|
||||
# - father: "Zues"
|
||||
# - mother: "Themis"
|
||||
|
||||
Contexts and carriers
|
||||
---------------------
|
||||
|
@ -136,6 +149,9 @@ unchartered waters will require the enforcement of the whitelist.
|
|||
``logging/opentracing.py`` has a ``whitelisted_homeserver`` method which takes
|
||||
in a destination and compares it to the whitelist.
|
||||
|
||||
Most injection methods take a 'destination' arg. The context will only be injected
|
||||
if the destination matches the whitelist or the destination is None.
|
||||
|
||||
=======
|
||||
Gotchas
|
||||
=======
|
||||
|
@ -161,10 +177,48 @@ from twisted.internet import defer
|
|||
|
||||
from synapse.config import ConfigError
|
||||
|
||||
# Helper class
|
||||
|
||||
|
||||
class _DummyTagNames(object):
|
||||
"""wrapper of opentracings tags. We need to have them if we
|
||||
want to reference them without opentracing around. Clearly they
|
||||
should never actually show up in a trace. `set_tags` overwrites
|
||||
these with the correct ones."""
|
||||
|
||||
INVALID_TAG = "invalid-tag"
|
||||
COMPONENT = INVALID_TAG
|
||||
DATABASE_INSTANCE = INVALID_TAG
|
||||
DATABASE_STATEMENT = INVALID_TAG
|
||||
DATABASE_TYPE = INVALID_TAG
|
||||
DATABASE_USER = INVALID_TAG
|
||||
ERROR = INVALID_TAG
|
||||
HTTP_METHOD = INVALID_TAG
|
||||
HTTP_STATUS_CODE = INVALID_TAG
|
||||
HTTP_URL = INVALID_TAG
|
||||
MESSAGE_BUS_DESTINATION = INVALID_TAG
|
||||
PEER_ADDRESS = INVALID_TAG
|
||||
PEER_HOSTNAME = INVALID_TAG
|
||||
PEER_HOST_IPV4 = INVALID_TAG
|
||||
PEER_HOST_IPV6 = INVALID_TAG
|
||||
PEER_PORT = INVALID_TAG
|
||||
PEER_SERVICE = INVALID_TAG
|
||||
SAMPLING_PRIORITY = INVALID_TAG
|
||||
SERVICE = INVALID_TAG
|
||||
SPAN_KIND = INVALID_TAG
|
||||
SPAN_KIND_CONSUMER = INVALID_TAG
|
||||
SPAN_KIND_PRODUCER = INVALID_TAG
|
||||
SPAN_KIND_RPC_CLIENT = INVALID_TAG
|
||||
SPAN_KIND_RPC_SERVER = INVALID_TAG
|
||||
|
||||
|
||||
try:
|
||||
import opentracing
|
||||
|
||||
tags = opentracing.tags
|
||||
except ImportError:
|
||||
opentracing = None
|
||||
tags = _DummyTagNames
|
||||
try:
|
||||
from jaeger_client import Config as JaegerConfig
|
||||
from synapse.logging.scopecontextmanager import LogContextScopeManager
|
||||
|
@ -239,10 +293,6 @@ def init_tracer(config):
|
|||
scope_manager=LogContextScopeManager(config),
|
||||
).initialize_tracer()
|
||||
|
||||
# Set up tags to be opentracing's tags
|
||||
global tags
|
||||
tags = opentracing.tags
|
||||
|
||||
|
||||
# Whitelisting
|
||||
|
||||
|
@ -321,8 +371,8 @@ def start_active_span_follows_from(operation_name, contexts):
|
|||
return scope
|
||||
|
||||
|
||||
def start_active_span_from_context(
|
||||
headers,
|
||||
def start_active_span_from_request(
|
||||
request,
|
||||
operation_name,
|
||||
references=None,
|
||||
tags=None,
|
||||
|
@ -331,9 +381,9 @@ def start_active_span_from_context(
|
|||
finish_on_close=True,
|
||||
):
|
||||
"""
|
||||
Extracts a span context from Twisted Headers.
|
||||
Extracts a span context from a Twisted Request.
|
||||
args:
|
||||
headers (twisted.web.http_headers.Headers)
|
||||
headers (twisted.web.http.Request)
|
||||
|
||||
For the other args see opentracing.tracer
|
||||
|
||||
|
@ -347,7 +397,9 @@ def start_active_span_from_context(
|
|||
if opentracing is None:
|
||||
return _noop_context_manager()
|
||||
|
||||
header_dict = {k.decode(): v[0].decode() for k, v in headers.getAllRawHeaders()}
|
||||
header_dict = {
|
||||
k.decode(): v[0].decode() for k, v in request.requestHeaders.getAllRawHeaders()
|
||||
}
|
||||
context = opentracing.tracer.extract(opentracing.Format.HTTP_HEADERS, header_dict)
|
||||
|
||||
return opentracing.tracer.start_active_span(
|
||||
|
@ -435,7 +487,7 @@ def set_operation_name(operation_name):
|
|||
|
||||
|
||||
@only_if_tracing
|
||||
def inject_active_span_twisted_headers(headers, destination):
|
||||
def inject_active_span_twisted_headers(headers, destination, check_destination=True):
|
||||
"""
|
||||
Injects a span context into twisted headers in-place
|
||||
|
||||
|
@ -454,7 +506,7 @@ def inject_active_span_twisted_headers(headers, destination):
|
|||
https://github.com/jaegertracing/jaeger-client-python/blob/master/jaeger_client/constants.py
|
||||
"""
|
||||
|
||||
if not whitelisted_homeserver(destination):
|
||||
if check_destination and not whitelisted_homeserver(destination):
|
||||
return
|
||||
|
||||
span = opentracing.tracer.active_span
|
||||
|
@ -466,7 +518,7 @@ def inject_active_span_twisted_headers(headers, destination):
|
|||
|
||||
|
||||
@only_if_tracing
|
||||
def inject_active_span_byte_dict(headers, destination):
|
||||
def inject_active_span_byte_dict(headers, destination, check_destination=True):
|
||||
"""
|
||||
Injects a span context into a dict where the headers are encoded as byte
|
||||
strings
|
||||
|
@ -498,7 +550,7 @@ def inject_active_span_byte_dict(headers, destination):
|
|||
|
||||
|
||||
@only_if_tracing
|
||||
def inject_active_span_text_map(carrier, destination=None):
|
||||
def inject_active_span_text_map(carrier, destination, check_destination=True):
|
||||
"""
|
||||
Injects a span context into a dict
|
||||
|
||||
|
@ -519,7 +571,7 @@ def inject_active_span_text_map(carrier, destination=None):
|
|||
https://github.com/jaegertracing/jaeger-client-python/blob/master/jaeger_client/constants.py
|
||||
"""
|
||||
|
||||
if destination and not whitelisted_homeserver(destination):
|
||||
if check_destination and not whitelisted_homeserver(destination):
|
||||
return
|
||||
|
||||
opentracing.tracer.inject(
|
||||
|
@ -527,6 +579,29 @@ def inject_active_span_text_map(carrier, destination=None):
|
|||
)
|
||||
|
||||
|
||||
def get_active_span_text_map(destination=None):
|
||||
"""
|
||||
Gets a span context as a dict. This can be used instead of manually
|
||||
injecting a span into an empty carrier.
|
||||
|
||||
Args:
|
||||
destination (str): the name of the remote server.
|
||||
|
||||
Returns:
|
||||
dict: the active span's context if opentracing is enabled, otherwise empty.
|
||||
"""
|
||||
|
||||
if not opentracing or (destination and not whitelisted_homeserver(destination)):
|
||||
return {}
|
||||
|
||||
carrier = {}
|
||||
opentracing.tracer.inject(
|
||||
opentracing.tracer.active_span, opentracing.Format.TEXT_MAP, carrier
|
||||
)
|
||||
|
||||
return carrier
|
||||
|
||||
|
||||
def active_span_context_as_string():
|
||||
"""
|
||||
Returns:
|
||||
|
@ -676,65 +751,43 @@ def tag_args(func):
|
|||
return _tag_args_inner
|
||||
|
||||
|
||||
def trace_servlet(servlet_name, func):
|
||||
def trace_servlet(servlet_name, extract_context=False):
|
||||
"""Decorator which traces a serlet. It starts a span with some servlet specific
|
||||
tags such as the servlet_name and request information"""
|
||||
tags such as the servlet_name and request information
|
||||
|
||||
Args:
|
||||
servlet_name (str): The name to be used for the span's operation_name
|
||||
extract_context (bool): Whether to attempt to extract the opentracing
|
||||
context from the request the servlet is handling.
|
||||
|
||||
"""
|
||||
|
||||
def _trace_servlet_inner_1(func):
|
||||
if not opentracing:
|
||||
return func
|
||||
|
||||
@wraps(func)
|
||||
@defer.inlineCallbacks
|
||||
def _trace_servlet_inner(request, *args, **kwargs):
|
||||
with start_active_span(
|
||||
"incoming-client-request",
|
||||
tags={
|
||||
request_tags = {
|
||||
"request_id": request.get_request_id(),
|
||||
tags.SPAN_KIND: tags.SPAN_KIND_RPC_SERVER,
|
||||
tags.HTTP_METHOD: request.get_method(),
|
||||
tags.HTTP_URL: request.get_redacted_uri(),
|
||||
tags.PEER_HOST_IPV6: request.getClientIP(),
|
||||
"servlet_name": servlet_name,
|
||||
},
|
||||
):
|
||||
}
|
||||
|
||||
if extract_context:
|
||||
scope = start_active_span_from_request(
|
||||
request, servlet_name, tags=request_tags
|
||||
)
|
||||
else:
|
||||
scope = start_active_span(servlet_name, tags=request_tags)
|
||||
|
||||
with scope:
|
||||
result = yield defer.maybeDeferred(func, request, *args, **kwargs)
|
||||
return result
|
||||
|
||||
return _trace_servlet_inner
|
||||
|
||||
|
||||
# Helper class
|
||||
|
||||
|
||||
class _DummyTagNames(object):
|
||||
"""wrapper of opentracings tags. We need to have them if we
|
||||
want to reference them without opentracing around. Clearly they
|
||||
should never actually show up in a trace. `set_tags` overwrites
|
||||
these with the correct ones."""
|
||||
|
||||
INVALID_TAG = "invalid-tag"
|
||||
COMPONENT = INVALID_TAG
|
||||
DATABASE_INSTANCE = INVALID_TAG
|
||||
DATABASE_STATEMENT = INVALID_TAG
|
||||
DATABASE_TYPE = INVALID_TAG
|
||||
DATABASE_USER = INVALID_TAG
|
||||
ERROR = INVALID_TAG
|
||||
HTTP_METHOD = INVALID_TAG
|
||||
HTTP_STATUS_CODE = INVALID_TAG
|
||||
HTTP_URL = INVALID_TAG
|
||||
MESSAGE_BUS_DESTINATION = INVALID_TAG
|
||||
PEER_ADDRESS = INVALID_TAG
|
||||
PEER_HOSTNAME = INVALID_TAG
|
||||
PEER_HOST_IPV4 = INVALID_TAG
|
||||
PEER_HOST_IPV6 = INVALID_TAG
|
||||
PEER_PORT = INVALID_TAG
|
||||
PEER_SERVICE = INVALID_TAG
|
||||
SAMPLING_PRIORITY = INVALID_TAG
|
||||
SERVICE = INVALID_TAG
|
||||
SPAN_KIND = INVALID_TAG
|
||||
SPAN_KIND_CONSUMER = INVALID_TAG
|
||||
SPAN_KIND_PRODUCER = INVALID_TAG
|
||||
SPAN_KIND_RPC_CLIENT = INVALID_TAG
|
||||
SPAN_KIND_RPC_SERVER = INVALID_TAG
|
||||
|
||||
|
||||
tags = _DummyTagNames
|
||||
return _trace_servlet_inner_1
|
||||
|
|
|
@ -22,6 +22,7 @@ from six.moves import urllib
|
|||
|
||||
from twisted.internet import defer
|
||||
|
||||
import synapse.logging.opentracing as opentracing
|
||||
from synapse.api.errors import (
|
||||
CodeMessageException,
|
||||
HttpResponseException,
|
||||
|
@ -165,8 +166,12 @@ class ReplicationEndpoint(object):
|
|||
# have a good idea that the request has either succeeded or failed on
|
||||
# the master, and so whether we should clean up or not.
|
||||
while True:
|
||||
headers = {}
|
||||
opentracing.inject_active_span_byte_dict(
|
||||
headers, None, check_destination=False
|
||||
)
|
||||
try:
|
||||
result = yield request_func(uri, data)
|
||||
result = yield request_func(uri, data, headers=headers)
|
||||
break
|
||||
except CodeMessageException as e:
|
||||
if e.code != 504 or not cls.RETRY_ON_TIMEOUT:
|
||||
|
@ -205,7 +210,14 @@ class ReplicationEndpoint(object):
|
|||
args = "/".join("(?P<%s>[^/]+)" % (arg,) for arg in url_args)
|
||||
pattern = re.compile("^/_synapse/replication/%s/%s$" % (self.NAME, args))
|
||||
|
||||
http_server.register_paths(method, [pattern], handler, self.__class__.__name__)
|
||||
http_server.register_paths(
|
||||
method,
|
||||
[pattern],
|
||||
opentracing.trace_servlet(self.__class__.__name__, extract_context=True)(
|
||||
handler
|
||||
),
|
||||
self.__class__.__name__,
|
||||
)
|
||||
|
||||
def _cached_handler(self, request, txn_id, **kwargs):
|
||||
"""Called on new incoming requests when caching is enabled. Checks
|
||||
|
|
|
@ -42,7 +42,9 @@ from synapse.rest.admin._base import (
|
|||
historical_admin_path_patterns,
|
||||
)
|
||||
from synapse.rest.admin.media import register_servlets_for_media_repo
|
||||
from synapse.rest.admin.purge_room_servlet import PurgeRoomServlet
|
||||
from synapse.rest.admin.server_notice_servlet import SendServerNoticeServlet
|
||||
from synapse.rest.admin.users import UserAdminServlet
|
||||
from synapse.types import UserID, create_requester
|
||||
from synapse.util.versionstring import get_version_string
|
||||
|
||||
|
@ -738,8 +740,10 @@ def register_servlets(hs, http_server):
|
|||
Register all the admin servlets.
|
||||
"""
|
||||
register_servlets_for_client_rest_resource(hs, http_server)
|
||||
PurgeRoomServlet(hs).register(http_server)
|
||||
SendServerNoticeServlet(hs).register(http_server)
|
||||
VersionServlet(hs).register(http_server)
|
||||
UserAdminServlet(hs).register(http_server)
|
||||
|
||||
|
||||
def register_servlets_for_client_rest_resource(hs, http_server):
|
||||
|
|
57
synapse/rest/admin/purge_room_servlet.py
Normal file
57
synapse/rest/admin/purge_room_servlet.py
Normal file
|
@ -0,0 +1,57 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2019 The Matrix.org Foundation C.I.C.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import re
|
||||
|
||||
from synapse.http.servlet import (
|
||||
RestServlet,
|
||||
assert_params_in_dict,
|
||||
parse_json_object_from_request,
|
||||
)
|
||||
from synapse.rest.admin import assert_requester_is_admin
|
||||
|
||||
|
||||
class PurgeRoomServlet(RestServlet):
|
||||
"""Servlet which will remove all trace of a room from the database
|
||||
|
||||
POST /_synapse/admin/v1/purge_room
|
||||
{
|
||||
"room_id": "!room:id"
|
||||
}
|
||||
|
||||
returns:
|
||||
|
||||
{}
|
||||
"""
|
||||
|
||||
PATTERNS = (re.compile("^/_synapse/admin/v1/purge_room$"),)
|
||||
|
||||
def __init__(self, hs):
|
||||
"""
|
||||
Args:
|
||||
hs (synapse.server.HomeServer): server
|
||||
"""
|
||||
self.hs = hs
|
||||
self.auth = hs.get_auth()
|
||||
self.pagination_handler = hs.get_pagination_handler()
|
||||
|
||||
async def on_POST(self, request):
|
||||
await assert_requester_is_admin(self.auth, request)
|
||||
|
||||
body = parse_json_object_from_request(request)
|
||||
assert_params_in_dict(body, ("room_id",))
|
||||
|
||||
await self.pagination_handler.purge_room(body["room_id"])
|
||||
|
||||
return (200, {})
|
76
synapse/rest/admin/users.py
Normal file
76
synapse/rest/admin/users.py
Normal file
|
@ -0,0 +1,76 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# Copyright 2019 The Matrix.org Foundation C.I.C.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import re
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.api.errors import SynapseError
|
||||
from synapse.http.servlet import (
|
||||
RestServlet,
|
||||
assert_params_in_dict,
|
||||
parse_json_object_from_request,
|
||||
)
|
||||
from synapse.rest.admin import assert_requester_is_admin
|
||||
from synapse.types import UserID
|
||||
|
||||
|
||||
class UserAdminServlet(RestServlet):
|
||||
"""
|
||||
Set whether or not a user is a server administrator.
|
||||
|
||||
Note that only local users can be server administrators, and that an
|
||||
administrator may not demote themselves.
|
||||
|
||||
Only server administrators can use this API.
|
||||
|
||||
Example:
|
||||
PUT /_synapse/admin/v1/users/@reivilibre:librepush.net/admin
|
||||
{
|
||||
"admin": true
|
||||
}
|
||||
"""
|
||||
|
||||
PATTERNS = (re.compile("^/_synapse/admin/v1/users/(?P<user_id>@[^/]*)/admin$"),)
|
||||
|
||||
def __init__(self, hs):
|
||||
self.hs = hs
|
||||
self.auth = hs.get_auth()
|
||||
self.handlers = hs.get_handlers()
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def on_PUT(self, request, user_id):
|
||||
yield assert_requester_is_admin(self.auth, request)
|
||||
requester = yield self.auth.get_user_by_req(request)
|
||||
auth_user = requester.user
|
||||
|
||||
target_user = UserID.from_string(user_id)
|
||||
|
||||
body = parse_json_object_from_request(request)
|
||||
|
||||
assert_params_in_dict(body, ["admin"])
|
||||
|
||||
if not self.hs.is_mine(target_user):
|
||||
raise SynapseError(400, "Only local users can be admins of this homeserver")
|
||||
|
||||
set_admin_to = bool(body["admin"])
|
||||
|
||||
if target_user == auth_user and not set_admin_to:
|
||||
raise SynapseError(400, "You may not demote yourself.")
|
||||
|
||||
yield self.handlers.admin_handler.set_user_server_admin(
|
||||
target_user, set_admin_to
|
||||
)
|
||||
|
||||
return (200, {})
|
|
@ -24,6 +24,7 @@ from synapse.http.servlet import (
|
|||
parse_json_object_from_request,
|
||||
parse_string,
|
||||
)
|
||||
from synapse.logging.opentracing import log_kv, set_tag, trace_using_operation_name
|
||||
from synapse.types import StreamToken
|
||||
|
||||
from ._base import client_patterns
|
||||
|
@ -68,6 +69,7 @@ class KeyUploadServlet(RestServlet):
|
|||
self.auth = hs.get_auth()
|
||||
self.e2e_keys_handler = hs.get_e2e_keys_handler()
|
||||
|
||||
@trace_using_operation_name("upload_keys")
|
||||
@defer.inlineCallbacks
|
||||
def on_POST(self, request, device_id):
|
||||
requester = yield self.auth.get_user_by_req(request, allow_guest=True)
|
||||
|
@ -78,6 +80,14 @@ class KeyUploadServlet(RestServlet):
|
|||
# passing the device_id here is deprecated; however, we allow it
|
||||
# for now for compatibility with older clients.
|
||||
if requester.device_id is not None and device_id != requester.device_id:
|
||||
set_tag("error", True)
|
||||
log_kv(
|
||||
{
|
||||
"message": "Client uploading keys for a different device",
|
||||
"logged_in_id": requester.device_id,
|
||||
"key_being_uploaded": device_id,
|
||||
}
|
||||
)
|
||||
logger.warning(
|
||||
"Client uploading keys for a different device "
|
||||
"(logged in as %s, uploading for %s)",
|
||||
|
@ -178,10 +188,11 @@ class KeyChangesServlet(RestServlet):
|
|||
requester = yield self.auth.get_user_by_req(request, allow_guest=True)
|
||||
|
||||
from_token_string = parse_string(request, "from")
|
||||
set_tag("from", from_token_string)
|
||||
|
||||
# We want to enforce they do pass us one, but we ignore it and return
|
||||
# changes after the "to" as well as before.
|
||||
parse_string(request, "to")
|
||||
set_tag("to", parse_string(request, "to"))
|
||||
|
||||
from_token = StreamToken.from_string(from_token_string)
|
||||
|
||||
|
|
|
@ -16,7 +16,6 @@
|
|||
|
||||
import hmac
|
||||
import logging
|
||||
from hashlib import sha1
|
||||
|
||||
from six import string_types
|
||||
|
||||
|
@ -239,14 +238,12 @@ class RegisterRestServlet(RestServlet):
|
|||
|
||||
# we do basic sanity checks here because the auth layer will store these
|
||||
# in sessions. Pull out the username/password provided to us.
|
||||
desired_password = None
|
||||
if "password" in body:
|
||||
if (
|
||||
not isinstance(body["password"], string_types)
|
||||
or len(body["password"]) > 512
|
||||
):
|
||||
raise SynapseError(400, "Invalid password")
|
||||
desired_password = body["password"]
|
||||
|
||||
desired_username = None
|
||||
if "username" in body:
|
||||
|
@ -261,8 +258,8 @@ class RegisterRestServlet(RestServlet):
|
|||
if self.auth.has_access_token(request):
|
||||
appservice = yield self.auth.get_appservice_by_req(request)
|
||||
|
||||
# fork off as soon as possible for ASes and shared secret auth which
|
||||
# have completely different registration flows to normal users
|
||||
# fork off as soon as possible for ASes which have completely
|
||||
# different registration flows to normal users
|
||||
|
||||
# == Application Service Registration ==
|
||||
if appservice:
|
||||
|
@ -285,8 +282,8 @@ class RegisterRestServlet(RestServlet):
|
|||
return (200, result) # we throw for non 200 responses
|
||||
return
|
||||
|
||||
# for either shared secret or regular registration, downcase the
|
||||
# provided username before attempting to register it. This should mean
|
||||
# for regular registration, downcase the provided username before
|
||||
# attempting to register it. This should mean
|
||||
# that people who try to register with upper-case in their usernames
|
||||
# don't get a nasty surprise. (Note that we treat username
|
||||
# case-insenstively in login, so they are free to carry on imagining
|
||||
|
@ -294,16 +291,6 @@ class RegisterRestServlet(RestServlet):
|
|||
if desired_username is not None:
|
||||
desired_username = desired_username.lower()
|
||||
|
||||
# == Shared Secret Registration == (e.g. create new user scripts)
|
||||
if "mac" in body:
|
||||
# FIXME: Should we really be determining if this is shared secret
|
||||
# auth based purely on the 'mac' key?
|
||||
result = yield self._do_shared_secret_registration(
|
||||
desired_username, desired_password, body
|
||||
)
|
||||
return (200, result) # we throw for non 200 responses
|
||||
return
|
||||
|
||||
# == Normal User Registration == (everyone else)
|
||||
if not self.hs.config.enable_registration:
|
||||
raise SynapseError(403, "Registration has been disabled")
|
||||
|
@ -512,42 +499,6 @@ class RegisterRestServlet(RestServlet):
|
|||
)
|
||||
return (yield self._create_registration_details(user_id, body))
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _do_shared_secret_registration(self, username, password, body):
|
||||
if not self.hs.config.registration_shared_secret:
|
||||
raise SynapseError(400, "Shared secret registration is not enabled")
|
||||
if not username:
|
||||
raise SynapseError(
|
||||
400, "username must be specified", errcode=Codes.BAD_JSON
|
||||
)
|
||||
|
||||
# use the username from the original request rather than the
|
||||
# downcased one in `username` for the mac calculation
|
||||
user = body["username"].encode("utf-8")
|
||||
|
||||
# str() because otherwise hmac complains that 'unicode' does not
|
||||
# have the buffer interface
|
||||
got_mac = str(body["mac"])
|
||||
|
||||
# FIXME this is different to the /v1/register endpoint, which
|
||||
# includes the password and admin flag in the hashed text. Why are
|
||||
# these different?
|
||||
want_mac = hmac.new(
|
||||
key=self.hs.config.registration_shared_secret.encode(),
|
||||
msg=user,
|
||||
digestmod=sha1,
|
||||
).hexdigest()
|
||||
|
||||
if not compare_digest(want_mac, got_mac):
|
||||
raise SynapseError(403, "HMAC incorrect")
|
||||
|
||||
user_id = yield self.registration_handler.register_user(
|
||||
localpart=username, password=password
|
||||
)
|
||||
|
||||
result = yield self._create_registration_details(user_id, body)
|
||||
return result
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def _create_registration_details(self, user_id, params):
|
||||
"""Complete registration of newly-registered user
|
||||
|
|
|
@ -13,7 +13,9 @@
|
|||
# limitations under the License.
|
||||
|
||||
import logging
|
||||
from io import BytesIO
|
||||
|
||||
from canonicaljson import encode_canonical_json, json
|
||||
from signedjson.sign import sign_json
|
||||
|
||||
from twisted.internet import defer
|
||||
|
||||
|
@ -95,6 +97,7 @@ class RemoteKey(DirectServeResource):
|
|||
self.store = hs.get_datastore()
|
||||
self.clock = hs.get_clock()
|
||||
self.federation_domain_whitelist = hs.config.federation_domain_whitelist
|
||||
self.config = hs.config
|
||||
|
||||
@wrap_json_request_handler
|
||||
async def _async_render_GET(self, request):
|
||||
|
@ -214,15 +217,14 @@ class RemoteKey(DirectServeResource):
|
|||
yield self.fetcher.get_keys(cache_misses)
|
||||
yield self.query_keys(request, query, query_remote_on_cache_miss=False)
|
||||
else:
|
||||
result_io = BytesIO()
|
||||
result_io.write(b'{"server_keys":')
|
||||
sep = b"["
|
||||
for json_bytes in json_results:
|
||||
result_io.write(sep)
|
||||
result_io.write(json_bytes)
|
||||
sep = b","
|
||||
if sep == b"[":
|
||||
result_io.write(sep)
|
||||
result_io.write(b"]}")
|
||||
signed_keys = []
|
||||
for key_json in json_results:
|
||||
key_json = json.loads(key_json)
|
||||
for signing_key in self.config.key_server_signing_keys:
|
||||
key_json = sign_json(key_json, self.config.server_name, signing_key)
|
||||
|
||||
respond_with_json_bytes(request, 200, result_io.getvalue())
|
||||
signed_keys.append(key_json)
|
||||
|
||||
results = {"server_keys": signed_keys}
|
||||
|
||||
respond_with_json_bytes(request, 200, encode_canonical_json(results))
|
||||
|
|
|
@ -34,7 +34,7 @@ class WellKnownBuilder(object):
|
|||
self._config = hs.config
|
||||
|
||||
def get_well_known(self):
|
||||
# if we don't have a public_base_url, we can't help much here.
|
||||
# if we don't have a public_baseurl, we can't help much here.
|
||||
if self._config.public_baseurl is None:
|
||||
return None
|
||||
|
||||
|
|
|
@ -21,6 +21,11 @@ from canonicaljson import json
|
|||
from twisted.internet import defer
|
||||
|
||||
from synapse.api.errors import StoreError
|
||||
from synapse.logging.opentracing import (
|
||||
get_active_span_text_map,
|
||||
trace,
|
||||
whitelisted_homeserver,
|
||||
)
|
||||
from synapse.metrics.background_process_metrics import run_as_background_process
|
||||
from synapse.storage._base import Cache, SQLBaseStore, db_to_json
|
||||
from synapse.storage.background_updates import BackgroundUpdateStore
|
||||
|
@ -73,6 +78,7 @@ class DeviceWorkerStore(SQLBaseStore):
|
|||
|
||||
return {d["device_id"]: d for d in devices}
|
||||
|
||||
@trace
|
||||
@defer.inlineCallbacks
|
||||
def get_devices_by_remote(self, destination, from_stream_id, limit):
|
||||
"""Get stream of updates to send to remote servers
|
||||
|
@ -127,8 +133,15 @@ class DeviceWorkerStore(SQLBaseStore):
|
|||
# (user_id, device_id) entries into a map, with the value being
|
||||
# the max stream_id across each set of duplicate entries
|
||||
#
|
||||
# maps (user_id, device_id) -> stream_id
|
||||
# maps (user_id, device_id) -> (stream_id, opentracing_context)
|
||||
# as long as their stream_id does not match that of the last row
|
||||
#
|
||||
# opentracing_context contains the opentracing metadata for the request
|
||||
# that created the poke
|
||||
#
|
||||
# The most recent request's opentracing_context is used as the
|
||||
# context which created the Edu.
|
||||
|
||||
query_map = {}
|
||||
for update in updates:
|
||||
if stream_id_cutoff is not None and update[2] >= stream_id_cutoff:
|
||||
|
@ -136,7 +149,14 @@ class DeviceWorkerStore(SQLBaseStore):
|
|||
break
|
||||
|
||||
key = (update[0], update[1])
|
||||
query_map[key] = max(query_map.get(key, 0), update[2])
|
||||
|
||||
update_context = update[3]
|
||||
update_stream_id = update[2]
|
||||
|
||||
previous_update_stream_id, _ = query_map.get(key, (0, None))
|
||||
|
||||
if update_stream_id > previous_update_stream_id:
|
||||
query_map[key] = (update_stream_id, update_context)
|
||||
|
||||
# If we didn't find any updates with a stream_id lower than the cutoff, it
|
||||
# means that there are more than limit updates all of which have the same
|
||||
|
@ -171,7 +191,7 @@ class DeviceWorkerStore(SQLBaseStore):
|
|||
List: List of device updates
|
||||
"""
|
||||
sql = """
|
||||
SELECT user_id, device_id, stream_id FROM device_lists_outbound_pokes
|
||||
SELECT user_id, device_id, stream_id, opentracing_context FROM device_lists_outbound_pokes
|
||||
WHERE destination = ? AND ? < stream_id AND stream_id <= ? AND sent = ?
|
||||
ORDER BY stream_id
|
||||
LIMIT ?
|
||||
|
@ -187,8 +207,9 @@ class DeviceWorkerStore(SQLBaseStore):
|
|||
Args:
|
||||
destination (str): The host the device updates are intended for
|
||||
from_stream_id (int): The minimum stream_id to filter updates by, exclusive
|
||||
query_map (Dict[(str, str): int]): Dictionary mapping
|
||||
user_id/device_id to update stream_id
|
||||
query_map (Dict[(str, str): (int, str|None)]): Dictionary mapping
|
||||
user_id/device_id to update stream_id and the relevent json-encoded
|
||||
opentracing context
|
||||
|
||||
Returns:
|
||||
List[Dict]: List of objects representing an device update EDU
|
||||
|
@ -210,12 +231,13 @@ class DeviceWorkerStore(SQLBaseStore):
|
|||
destination, user_id, from_stream_id
|
||||
)
|
||||
for device_id, device in iteritems(user_devices):
|
||||
stream_id = query_map[(user_id, device_id)]
|
||||
stream_id, opentracing_context = query_map[(user_id, device_id)]
|
||||
result = {
|
||||
"user_id": user_id,
|
||||
"device_id": device_id,
|
||||
"prev_id": [prev_id] if prev_id else [],
|
||||
"stream_id": stream_id,
|
||||
"org.matrix.opentracing_context": opentracing_context,
|
||||
}
|
||||
|
||||
prev_id = stream_id
|
||||
|
@ -814,6 +836,8 @@ class DeviceStore(DeviceWorkerStore, BackgroundUpdateStore):
|
|||
],
|
||||
)
|
||||
|
||||
context = get_active_span_text_map()
|
||||
|
||||
self._simple_insert_many_txn(
|
||||
txn,
|
||||
table="device_lists_outbound_pokes",
|
||||
|
@ -825,6 +849,9 @@ class DeviceStore(DeviceWorkerStore, BackgroundUpdateStore):
|
|||
"device_id": device_id,
|
||||
"sent": False,
|
||||
"ts": now,
|
||||
"opentracing_context": json.dumps(context)
|
||||
if whitelisted_homeserver(destination)
|
||||
else None,
|
||||
}
|
||||
for destination in hosts
|
||||
for device_id in device_ids
|
||||
|
|
|
@ -18,6 +18,7 @@ import json
|
|||
from twisted.internet import defer
|
||||
|
||||
from synapse.api.errors import StoreError
|
||||
from synapse.logging.opentracing import log_kv, trace
|
||||
|
||||
from ._base import SQLBaseStore
|
||||
|
||||
|
@ -94,7 +95,16 @@ class EndToEndRoomKeyStore(SQLBaseStore):
|
|||
},
|
||||
lock=False,
|
||||
)
|
||||
log_kv(
|
||||
{
|
||||
"message": "Set room key",
|
||||
"room_id": room_id,
|
||||
"session_id": session_id,
|
||||
"room_key": room_key,
|
||||
}
|
||||
)
|
||||
|
||||
@trace
|
||||
@defer.inlineCallbacks
|
||||
def get_e2e_room_keys(self, user_id, version, room_id=None, session_id=None):
|
||||
"""Bulk get the E2E room keys for a given backup, optionally filtered to a given
|
||||
|
@ -153,6 +163,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
|
|||
|
||||
return sessions
|
||||
|
||||
@trace
|
||||
@defer.inlineCallbacks
|
||||
def delete_e2e_room_keys(self, user_id, version, room_id=None, session_id=None):
|
||||
"""Bulk delete the E2E room keys for a given backup, optionally filtered to a given
|
||||
|
@ -236,6 +247,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
|
|||
"get_e2e_room_keys_version_info", _get_e2e_room_keys_version_info_txn
|
||||
)
|
||||
|
||||
@trace
|
||||
def create_e2e_room_keys_version(self, user_id, info):
|
||||
"""Atomically creates a new version of this user's e2e_room_keys store
|
||||
with the given version info.
|
||||
|
@ -276,6 +288,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
|
|||
"create_e2e_room_keys_version_txn", _create_e2e_room_keys_version_txn
|
||||
)
|
||||
|
||||
@trace
|
||||
def update_e2e_room_keys_version(self, user_id, version, info):
|
||||
"""Update a given backup version
|
||||
|
||||
|
@ -292,6 +305,7 @@ class EndToEndRoomKeyStore(SQLBaseStore):
|
|||
desc="update_e2e_room_keys_version",
|
||||
)
|
||||
|
||||
@trace
|
||||
def delete_e2e_room_keys_version(self, user_id, version=None):
|
||||
"""Delete a given backup version of the user's room keys.
|
||||
Doesn't delete their actual key data.
|
||||
|
|
|
@ -18,12 +18,14 @@ from canonicaljson import encode_canonical_json
|
|||
|
||||
from twisted.internet import defer
|
||||
|
||||
from synapse.logging.opentracing import log_kv, set_tag, trace
|
||||
from synapse.util.caches.descriptors import cached
|
||||
|
||||
from ._base import SQLBaseStore, db_to_json
|
||||
|
||||
|
||||
class EndToEndKeyWorkerStore(SQLBaseStore):
|
||||
@trace
|
||||
@defer.inlineCallbacks
|
||||
def get_e2e_device_keys(
|
||||
self, query_list, include_all_devices=False, include_deleted_devices=False
|
||||
|
@ -40,6 +42,7 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
|
|||
Dict mapping from user-id to dict mapping from device_id to
|
||||
dict containing "key_json", "device_display_name".
|
||||
"""
|
||||
set_tag("query_list", query_list)
|
||||
if not query_list:
|
||||
return {}
|
||||
|
||||
|
@ -57,9 +60,13 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
|
|||
|
||||
return results
|
||||
|
||||
@trace
|
||||
def _get_e2e_device_keys_txn(
|
||||
self, txn, query_list, include_all_devices=False, include_deleted_devices=False
|
||||
):
|
||||
set_tag("include_all_devices", include_all_devices)
|
||||
set_tag("include_deleted_devices", include_deleted_devices)
|
||||
|
||||
query_clauses = []
|
||||
query_params = []
|
||||
|
||||
|
@ -104,6 +111,7 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
|
|||
for user_id, device_id in deleted_devices:
|
||||
result.setdefault(user_id, {})[device_id] = None
|
||||
|
||||
log_kv(result)
|
||||
return result
|
||||
|
||||
@defer.inlineCallbacks
|
||||
|
@ -129,8 +137,9 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
|
|||
keyvalues={"user_id": user_id, "device_id": device_id},
|
||||
desc="add_e2e_one_time_keys_check",
|
||||
)
|
||||
|
||||
return {(row["algorithm"], row["key_id"]): row["key_json"] for row in rows}
|
||||
result = {(row["algorithm"], row["key_id"]): row["key_json"] for row in rows}
|
||||
log_kv({"message": "Fetched one time keys for user", "one_time_keys": result})
|
||||
return result
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def add_e2e_one_time_keys(self, user_id, device_id, time_now, new_keys):
|
||||
|
@ -146,6 +155,9 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
|
|||
"""
|
||||
|
||||
def _add_e2e_one_time_keys(txn):
|
||||
set_tag("user_id", user_id)
|
||||
set_tag("device_id", device_id)
|
||||
set_tag("new_keys", new_keys)
|
||||
# We are protected from race between lookup and insertion due to
|
||||
# a unique constraint. If there is a race of two calls to
|
||||
# `add_e2e_one_time_keys` then they'll conflict and we will only
|
||||
|
@ -202,6 +214,11 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
|
|||
"""
|
||||
|
||||
def _set_e2e_device_keys_txn(txn):
|
||||
set_tag("user_id", user_id)
|
||||
set_tag("device_id", device_id)
|
||||
set_tag("time_now", time_now)
|
||||
set_tag("device_keys", device_keys)
|
||||
|
||||
old_key_json = self._simple_select_one_onecol_txn(
|
||||
txn,
|
||||
table="e2e_device_keys_json",
|
||||
|
@ -215,6 +232,7 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
|
|||
new_key_json = encode_canonical_json(device_keys).decode("utf-8")
|
||||
|
||||
if old_key_json == new_key_json:
|
||||
log_kv({"Message": "Device key already stored."})
|
||||
return False
|
||||
|
||||
self._simple_upsert_txn(
|
||||
|
@ -223,7 +241,7 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
|
|||
keyvalues={"user_id": user_id, "device_id": device_id},
|
||||
values={"ts_added_ms": time_now, "key_json": new_key_json},
|
||||
)
|
||||
|
||||
log_kv({"message": "Device keys stored."})
|
||||
return True
|
||||
|
||||
return self.runInteraction("set_e2e_device_keys", _set_e2e_device_keys_txn)
|
||||
|
@ -231,6 +249,7 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
|
|||
def claim_e2e_one_time_keys(self, query_list):
|
||||
"""Take a list of one time keys out of the database"""
|
||||
|
||||
@trace
|
||||
def _claim_e2e_one_time_keys(txn):
|
||||
sql = (
|
||||
"SELECT key_id, key_json FROM e2e_one_time_keys_json"
|
||||
|
@ -252,7 +271,13 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
|
|||
" AND key_id = ?"
|
||||
)
|
||||
for user_id, device_id, algorithm, key_id in delete:
|
||||
log_kv(
|
||||
{
|
||||
"message": "Executing claim e2e_one_time_keys transaction on database."
|
||||
}
|
||||
)
|
||||
txn.execute(sql, (user_id, device_id, algorithm, key_id))
|
||||
log_kv({"message": "finished executing and invalidating cache"})
|
||||
self._invalidate_cache_and_stream(
|
||||
txn, self.count_e2e_one_time_keys, (user_id, device_id)
|
||||
)
|
||||
|
@ -262,6 +287,13 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
|
|||
|
||||
def delete_e2e_keys_by_device(self, user_id, device_id):
|
||||
def delete_e2e_keys_by_device_txn(txn):
|
||||
log_kv(
|
||||
{
|
||||
"message": "Deleting keys for device",
|
||||
"device_id": device_id,
|
||||
"user_id": user_id,
|
||||
}
|
||||
)
|
||||
self._simple_delete_txn(
|
||||
txn,
|
||||
table="e2e_device_keys_json",
|
||||
|
|
|
@ -1302,15 +1302,11 @@ class EventsStore(
|
|||
"event_reference_hashes",
|
||||
"event_search",
|
||||
"event_to_state_groups",
|
||||
"guest_access",
|
||||
"history_visibility",
|
||||
"local_invites",
|
||||
"room_names",
|
||||
"state_events",
|
||||
"rejections",
|
||||
"redactions",
|
||||
"room_memberships",
|
||||
"topics",
|
||||
):
|
||||
txn.executemany(
|
||||
"DELETE FROM %s WHERE event_id = ?" % (table,),
|
||||
|
@ -1454,10 +1450,10 @@ class EventsStore(
|
|||
|
||||
for event, _ in events_and_contexts:
|
||||
if event.type == EventTypes.Name:
|
||||
# Insert into the room_names and event_search tables.
|
||||
# Insert into the event_search table.
|
||||
self._store_room_name_txn(txn, event)
|
||||
elif event.type == EventTypes.Topic:
|
||||
# Insert into the topics table and event_search table.
|
||||
# Insert into the event_search table.
|
||||
self._store_room_topic_txn(txn, event)
|
||||
elif event.type == EventTypes.Message:
|
||||
# Insert into the event_search table.
|
||||
|
@ -1465,12 +1461,6 @@ class EventsStore(
|
|||
elif event.type == EventTypes.Redaction:
|
||||
# Insert into the redactions table.
|
||||
self._store_redaction(txn, event)
|
||||
elif event.type == EventTypes.RoomHistoryVisibility:
|
||||
# Insert into the event_search table.
|
||||
self._store_history_visibility_txn(txn, event)
|
||||
elif event.type == EventTypes.GuestAccess:
|
||||
# Insert into the event_search table.
|
||||
self._store_guest_access_txn(txn, event)
|
||||
|
||||
self._handle_event_relations(txn, event)
|
||||
|
||||
|
@ -2191,6 +2181,143 @@ class EventsStore(
|
|||
|
||||
return to_delete, to_dedelta
|
||||
|
||||
def purge_room(self, room_id):
|
||||
"""Deletes all record of a room
|
||||
|
||||
Args:
|
||||
room_id (str):
|
||||
"""
|
||||
|
||||
return self.runInteraction("purge_room", self._purge_room_txn, room_id)
|
||||
|
||||
def _purge_room_txn(self, txn, room_id):
|
||||
# first we have to delete the state groups states
|
||||
logger.info("[purge] removing %s from state_groups_state", room_id)
|
||||
|
||||
txn.execute(
|
||||
"""
|
||||
DELETE FROM state_groups_state WHERE state_group IN (
|
||||
SELECT state_group FROM events JOIN event_to_state_groups USING(event_id)
|
||||
WHERE events.room_id=?
|
||||
)
|
||||
""",
|
||||
(room_id,),
|
||||
)
|
||||
|
||||
# ... and the state group edges
|
||||
logger.info("[purge] removing %s from state_group_edges", room_id)
|
||||
|
||||
txn.execute(
|
||||
"""
|
||||
DELETE FROM state_group_edges WHERE state_group IN (
|
||||
SELECT state_group FROM events JOIN event_to_state_groups USING(event_id)
|
||||
WHERE events.room_id=?
|
||||
)
|
||||
""",
|
||||
(room_id,),
|
||||
)
|
||||
|
||||
# ... and the state groups
|
||||
logger.info("[purge] removing %s from state_groups", room_id)
|
||||
|
||||
txn.execute(
|
||||
"""
|
||||
DELETE FROM state_groups WHERE id IN (
|
||||
SELECT state_group FROM events JOIN event_to_state_groups USING(event_id)
|
||||
WHERE events.room_id=?
|
||||
)
|
||||
""",
|
||||
(room_id,),
|
||||
)
|
||||
|
||||
# and then tables which lack an index on room_id but have one on event_id
|
||||
for table in (
|
||||
"event_auth",
|
||||
"event_edges",
|
||||
"event_push_actions_staging",
|
||||
"event_reference_hashes",
|
||||
"event_relations",
|
||||
"event_to_state_groups",
|
||||
"redactions",
|
||||
"rejections",
|
||||
"state_events",
|
||||
):
|
||||
logger.info("[purge] removing %s from %s", room_id, table)
|
||||
|
||||
txn.execute(
|
||||
"""
|
||||
DELETE FROM %s WHERE event_id IN (
|
||||
SELECT event_id FROM events WHERE room_id=?
|
||||
)
|
||||
"""
|
||||
% (table,),
|
||||
(room_id,),
|
||||
)
|
||||
|
||||
# and finally, the tables with an index on room_id (or no useful index)
|
||||
for table in (
|
||||
"current_state_events",
|
||||
"event_backward_extremities",
|
||||
"event_forward_extremities",
|
||||
"event_json",
|
||||
"event_push_actions",
|
||||
"event_search",
|
||||
"events",
|
||||
"group_rooms",
|
||||
"public_room_list_stream",
|
||||
"receipts_graph",
|
||||
"receipts_linearized",
|
||||
"room_aliases",
|
||||
"room_depth",
|
||||
"room_memberships",
|
||||
"room_state",
|
||||
"room_stats",
|
||||
"room_stats_earliest_token",
|
||||
"rooms",
|
||||
"stream_ordering_to_exterm",
|
||||
"topics",
|
||||
"users_in_public_rooms",
|
||||
"users_who_share_private_rooms",
|
||||
# no useful index, but let's clear them anyway
|
||||
"appservice_room_list",
|
||||
"e2e_room_keys",
|
||||
"event_push_summary",
|
||||
"pusher_throttle",
|
||||
"group_summary_rooms",
|
||||
"local_invites",
|
||||
"room_account_data",
|
||||
"room_tags",
|
||||
):
|
||||
logger.info("[purge] removing %s from %s", room_id, table)
|
||||
txn.execute("DELETE FROM %s WHERE room_id=?" % (table,), (room_id,))
|
||||
|
||||
# Other tables we do NOT need to clear out:
|
||||
#
|
||||
# - blocked_rooms
|
||||
# This is important, to make sure that we don't accidentally rejoin a blocked
|
||||
# room after it was purged
|
||||
#
|
||||
# - user_directory
|
||||
# This has a room_id column, but it is unused
|
||||
#
|
||||
|
||||
# Other tables that we might want to consider clearing out include:
|
||||
#
|
||||
# - event_reports
|
||||
# Given that these are intended for abuse management my initial
|
||||
# inclination is to leave them in place.
|
||||
#
|
||||
# - current_state_delta_stream
|
||||
# - ex_outlier_stream
|
||||
# - room_tags_revisions
|
||||
# The problem with these is that they are largeish and there is no room_id
|
||||
# index on them. In any case we should be clearing out 'stream' tables
|
||||
# periodically anyway (#5888)
|
||||
|
||||
# TODO: we could probably usefully do a bunch of cache invalidation here
|
||||
|
||||
logger.info("[purge] done")
|
||||
|
||||
@defer.inlineCallbacks
|
||||
def is_event_after(self, event_id1, event_id2):
|
||||
"""Returns True if event_id1 is after event_id2 in the stream
|
||||
|
|
|
@ -238,6 +238,13 @@ def _upgrade_existing_database(
|
|||
|
||||
logger.debug("applied_delta_files: %s", applied_delta_files)
|
||||
|
||||
if isinstance(database_engine, PostgresEngine):
|
||||
specific_engine_extension = ".postgres"
|
||||
else:
|
||||
specific_engine_extension = ".sqlite"
|
||||
|
||||
specific_engine_extensions = (".sqlite", ".postgres")
|
||||
|
||||
for v in range(start_ver, SCHEMA_VERSION + 1):
|
||||
logger.info("Upgrading schema to v%d", v)
|
||||
|
||||
|
@ -274,15 +281,22 @@ def _upgrade_existing_database(
|
|||
# Sometimes .pyc files turn up anyway even though we've
|
||||
# disabled their generation; e.g. from distribution package
|
||||
# installers. Silently skip it
|
||||
pass
|
||||
continue
|
||||
elif ext == ".sql":
|
||||
# A plain old .sql file, just read and execute it
|
||||
logger.info("Applying schema %s", relative_path)
|
||||
executescript(cur, absolute_path)
|
||||
elif ext == specific_engine_extension and root_name.endswith(".sql"):
|
||||
# A .sql file specific to our engine; just read and execute it
|
||||
logger.info("Applying engine-specific schema %s", relative_path)
|
||||
executescript(cur, absolute_path)
|
||||
elif ext in specific_engine_extensions and root_name.endswith(".sql"):
|
||||
# A .sql file for a different engine; skip it.
|
||||
continue
|
||||
else:
|
||||
# Not a valid delta file.
|
||||
logger.warn(
|
||||
"Found directory entry that did not end in .py or" " .sql: %s",
|
||||
logger.warning(
|
||||
"Found directory entry that did not end in .py or .sql: %s",
|
||||
relative_path,
|
||||
)
|
||||
continue
|
||||
|
@ -290,7 +304,7 @@ def _upgrade_existing_database(
|
|||
# Mark as done.
|
||||
cur.execute(
|
||||
database_engine.convert_param_style(
|
||||
"INSERT INTO applied_schema_deltas (version, file)" " VALUES (?,?)"
|
||||
"INSERT INTO applied_schema_deltas (version, file) VALUES (?,?)"
|
||||
),
|
||||
(v, relative_path),
|
||||
)
|
||||
|
@ -298,7 +312,7 @@ def _upgrade_existing_database(
|
|||
cur.execute("DELETE FROM schema_version")
|
||||
cur.execute(
|
||||
database_engine.convert_param_style(
|
||||
"INSERT INTO schema_version (version, upgraded)" " VALUES (?,?)"
|
||||
"INSERT INTO schema_version (version, upgraded) VALUES (?,?)"
|
||||
),
|
||||
(v, True),
|
||||
)
|
||||
|
|
|
@ -272,6 +272,14 @@ class RegistrationWorkerStore(SQLBaseStore):
|
|||
|
||||
@defer.inlineCallbacks
|
||||
def is_server_admin(self, user):
|
||||
"""Determines if a user is an admin of this homeserver.
|
||||
|
||||
Args:
|
||||
user (UserID): user ID of the user to test
|
||||
|
||||
Returns (bool):
|
||||
true iff the user is a server admin, false otherwise.
|
||||
"""
|
||||
res = yield self._simple_select_one_onecol(
|
||||
table="users",
|
||||
keyvalues={"name": user.to_string()},
|
||||
|
@ -282,6 +290,21 @@ class RegistrationWorkerStore(SQLBaseStore):
|
|||
|
||||
return res if res else False
|
||||
|
||||
def set_server_admin(self, user, admin):
|
||||
"""Sets whether a user is an admin of this homeserver.
|
||||
|
||||
Args:
|
||||
user (UserID): user ID of the user to test
|
||||
admin (bool): true iff the user is to be a server admin,
|
||||
false otherwise.
|
||||
"""
|
||||
return self._simple_update_one(
|
||||
table="users",
|
||||
keyvalues={"name": user.to_string()},
|
||||
updatevalues={"admin": 1 if admin else 0},
|
||||
desc="set_server_admin",
|
||||
)
|
||||
|
||||
def _query_for_auth(self, txn, token):
|
||||
sql = (
|
||||
"SELECT users.name, users.is_guest, access_tokens.id as token_id,"
|
||||
|
|
|
@ -386,32 +386,12 @@ class RoomStore(RoomWorkerStore, SearchStore):
|
|||
|
||||
def _store_room_topic_txn(self, txn, event):
|
||||
if hasattr(event, "content") and "topic" in event.content:
|
||||
self._simple_insert_txn(
|
||||
txn,
|
||||
"topics",
|
||||
{
|
||||
"event_id": event.event_id,
|
||||
"room_id": event.room_id,
|
||||
"topic": event.content["topic"],
|
||||
},
|
||||
)
|
||||
|
||||
self.store_event_search_txn(
|
||||
txn, event, "content.topic", event.content["topic"]
|
||||
)
|
||||
|
||||
def _store_room_name_txn(self, txn, event):
|
||||
if hasattr(event, "content") and "name" in event.content:
|
||||
self._simple_insert_txn(
|
||||
txn,
|
||||
"room_names",
|
||||
{
|
||||
"event_id": event.event_id,
|
||||
"room_id": event.room_id,
|
||||
"name": event.content["name"],
|
||||
},
|
||||
)
|
||||
|
||||
self.store_event_search_txn(
|
||||
txn, event, "content.name", event.content["name"]
|
||||
)
|
||||
|
@ -422,21 +402,6 @@ class RoomStore(RoomWorkerStore, SearchStore):
|
|||
txn, event, "content.body", event.content["body"]
|
||||
)
|
||||
|
||||
def _store_history_visibility_txn(self, txn, event):
|
||||
self._store_content_index_txn(txn, event, "history_visibility")
|
||||
|
||||
def _store_guest_access_txn(self, txn, event):
|
||||
self._store_content_index_txn(txn, event, "guest_access")
|
||||
|
||||
def _store_content_index_txn(self, txn, event, key):
|
||||
if hasattr(event, "content") and key in event.content:
|
||||
sql = (
|
||||
"INSERT INTO %(key)s"
|
||||
" (event_id, room_id, %(key)s)"
|
||||
" VALUES (?, ?, ?)" % {"key": key}
|
||||
)
|
||||
txn.execute(sql, (event.event_id, event.room_id, event.content[key]))
|
||||
|
||||
def add_event_report(
|
||||
self, room_id, event_id, user_id, reason, content, received_ts
|
||||
):
|
||||
|
|
|
@ -0,0 +1,20 @@
|
|||
/* Copyright 2019 The Matrix.org Foundation C.I.C
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
/*
|
||||
* Opentracing context data for inclusion in the device_list_update EDUs, as a
|
||||
* json-encoded dictionary. NULL if opentracing is disabled (or not enabled for this destination).
|
||||
*/
|
||||
ALTER TABLE device_lists_outbound_pokes ADD opentracing_context TEXT;
|
20
synapse/storage/schema/delta/56/drop_unused_event_tables.sql
Normal file
20
synapse/storage/schema/delta/56/drop_unused_event_tables.sql
Normal file
|
@ -0,0 +1,20 @@
|
|||
/* Copyright 2019 The Matrix.org Foundation C.I.C.
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
-- these tables are never used.
|
||||
DROP TABLE IF EXISTS room_names;
|
||||
DROP TABLE IF EXISTS topics;
|
||||
DROP TABLE IF EXISTS history_visibility;
|
||||
DROP TABLE IF EXISTS guest_access;
|
|
@ -0,0 +1,17 @@
|
|||
/* Copyright 2019 Matrix.org Foundation CIC
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
-- this was apparently forgotten when the table was created back in delta 53.
|
||||
CREATE INDEX users_in_public_rooms_r_idx ON users_in_public_rooms(room_id);
|
|
@ -37,11 +37,9 @@ class ApplicationServiceSchedulerTransactionCtrlTestCase(unittest.TestCase):
|
|||
self.recoverer = Mock()
|
||||
self.recoverer_fn = Mock(return_value=self.recoverer)
|
||||
self.txnctrl = _TransactionController(
|
||||
clock=self.clock,
|
||||
store=self.store,
|
||||
as_api=self.as_api,
|
||||
recoverer_fn=self.recoverer_fn,
|
||||
clock=self.clock, store=self.store, as_api=self.as_api
|
||||
)
|
||||
self.txnctrl.RECOVERER_CLASS = self.recoverer_fn
|
||||
|
||||
def test_single_service_up_txn_sent(self):
|
||||
# Test: The AS is up and the txn is successfully sent.
|
||||
|
|
|
@ -73,8 +73,6 @@ class MatrixFederationAgentTests(TestCase):
|
|||
|
||||
self.mock_resolver = Mock()
|
||||
|
||||
self.well_known_cache = TTLCache("test_cache", timer=self.reactor.seconds)
|
||||
|
||||
config_dict = default_config("test", parse=False)
|
||||
config_dict["federation_custom_ca_list"] = [get_test_ca_cert_file()]
|
||||
|
||||
|
@ -82,11 +80,21 @@ class MatrixFederationAgentTests(TestCase):
|
|||
config.parse_config_dict(config_dict, "", "")
|
||||
|
||||
self.tls_factory = ClientTLSOptionsFactory(config)
|
||||
|
||||
self.well_known_cache = TTLCache("test_cache", timer=self.reactor.seconds)
|
||||
self.had_well_known_cache = TTLCache("test_cache", timer=self.reactor.seconds)
|
||||
self.well_known_resolver = WellKnownResolver(
|
||||
self.reactor,
|
||||
Agent(self.reactor, contextFactory=self.tls_factory),
|
||||
well_known_cache=self.well_known_cache,
|
||||
had_well_known_cache=self.had_well_known_cache,
|
||||
)
|
||||
|
||||
self.agent = MatrixFederationAgent(
|
||||
reactor=self.reactor,
|
||||
tls_client_options_factory=self.tls_factory,
|
||||
_srv_resolver=self.mock_resolver,
|
||||
_well_known_cache=self.well_known_cache,
|
||||
_well_known_resolver=self.well_known_resolver,
|
||||
)
|
||||
|
||||
def _make_connection(self, client_factory, expected_sni):
|
||||
|
@ -543,7 +551,7 @@ class MatrixFederationAgentTests(TestCase):
|
|||
self.assertEqual(self.well_known_cache[b"testserv"], b"target-server")
|
||||
|
||||
# check the cache expires
|
||||
self.reactor.pump((25 * 3600,))
|
||||
self.reactor.pump((48 * 3600,))
|
||||
self.well_known_cache.expire()
|
||||
self.assertNotIn(b"testserv", self.well_known_cache)
|
||||
|
||||
|
@ -631,7 +639,7 @@ class MatrixFederationAgentTests(TestCase):
|
|||
self.assertEqual(self.well_known_cache[b"testserv"], b"target-server")
|
||||
|
||||
# check the cache expires
|
||||
self.reactor.pump((25 * 3600,))
|
||||
self.reactor.pump((48 * 3600,))
|
||||
self.well_known_cache.expire()
|
||||
self.assertNotIn(b"testserv", self.well_known_cache)
|
||||
|
||||
|
@ -701,11 +709,18 @@ class MatrixFederationAgentTests(TestCase):
|
|||
|
||||
config = default_config("test", parse=True)
|
||||
|
||||
# Build a new agent and WellKnownResolver with a different tls factory
|
||||
tls_factory = ClientTLSOptionsFactory(config)
|
||||
agent = MatrixFederationAgent(
|
||||
reactor=self.reactor,
|
||||
tls_client_options_factory=ClientTLSOptionsFactory(config),
|
||||
tls_client_options_factory=tls_factory,
|
||||
_srv_resolver=self.mock_resolver,
|
||||
_well_known_cache=self.well_known_cache,
|
||||
_well_known_resolver=WellKnownResolver(
|
||||
self.reactor,
|
||||
Agent(self.reactor, contextFactory=tls_factory),
|
||||
well_known_cache=self.well_known_cache,
|
||||
had_well_known_cache=self.had_well_known_cache,
|
||||
),
|
||||
)
|
||||
|
||||
test_d = agent.request(b"GET", b"matrix://testserv/foo/bar")
|
||||
|
@ -932,15 +947,9 @@ class MatrixFederationAgentTests(TestCase):
|
|||
self.successResultOf(test_d)
|
||||
|
||||
def test_well_known_cache(self):
|
||||
well_known_resolver = WellKnownResolver(
|
||||
self.reactor,
|
||||
Agent(self.reactor, contextFactory=self.tls_factory),
|
||||
well_known_cache=self.well_known_cache,
|
||||
)
|
||||
|
||||
self.reactor.lookups["testserv"] = "1.2.3.4"
|
||||
|
||||
fetch_d = well_known_resolver.get_well_known(b"testserv")
|
||||
fetch_d = self.well_known_resolver.get_well_known(b"testserv")
|
||||
|
||||
# there should be an attempt to connect on port 443 for the .well-known
|
||||
clients = self.reactor.tcpClients
|
||||
|
@ -963,7 +972,7 @@ class MatrixFederationAgentTests(TestCase):
|
|||
well_known_server.loseConnection()
|
||||
|
||||
# repeat the request: it should hit the cache
|
||||
fetch_d = well_known_resolver.get_well_known(b"testserv")
|
||||
fetch_d = self.well_known_resolver.get_well_known(b"testserv")
|
||||
r = self.successResultOf(fetch_d)
|
||||
self.assertEqual(r.delegated_server, b"target-server")
|
||||
|
||||
|
@ -971,7 +980,7 @@ class MatrixFederationAgentTests(TestCase):
|
|||
self.reactor.pump((1000.0,))
|
||||
|
||||
# now it should connect again
|
||||
fetch_d = well_known_resolver.get_well_known(b"testserv")
|
||||
fetch_d = self.well_known_resolver.get_well_known(b"testserv")
|
||||
|
||||
self.assertEqual(len(clients), 1)
|
||||
(host, port, client_factory, _timeout, _bindAddress) = clients.pop(0)
|
||||
|
@ -992,15 +1001,9 @@ class MatrixFederationAgentTests(TestCase):
|
|||
it ignores transient errors.
|
||||
"""
|
||||
|
||||
well_known_resolver = WellKnownResolver(
|
||||
self.reactor,
|
||||
Agent(self.reactor, contextFactory=self.tls_factory),
|
||||
well_known_cache=self.well_known_cache,
|
||||
)
|
||||
|
||||
self.reactor.lookups["testserv"] = "1.2.3.4"
|
||||
|
||||
fetch_d = well_known_resolver.get_well_known(b"testserv")
|
||||
fetch_d = self.well_known_resolver.get_well_known(b"testserv")
|
||||
|
||||
# there should be an attempt to connect on port 443 for the .well-known
|
||||
clients = self.reactor.tcpClients
|
||||
|
@ -1026,27 +1029,37 @@ class MatrixFederationAgentTests(TestCase):
|
|||
# another lookup.
|
||||
self.reactor.pump((900.0,))
|
||||
|
||||
fetch_d = well_known_resolver.get_well_known(b"testserv")
|
||||
fetch_d = self.well_known_resolver.get_well_known(b"testserv")
|
||||
|
||||
# The resolver may retry a few times, so fonx all requests that come along
|
||||
attempts = 0
|
||||
while self.reactor.tcpClients:
|
||||
clients = self.reactor.tcpClients
|
||||
(host, port, client_factory, _timeout, _bindAddress) = clients.pop(0)
|
||||
|
||||
attempts += 1
|
||||
|
||||
# fonx the connection attempt, this will be treated as a temporary
|
||||
# failure.
|
||||
client_factory.clientConnectionFailed(None, Exception("nope"))
|
||||
|
||||
# attemptdelay on the hostnameendpoint is 0.3, so takes that long before the
|
||||
# .well-known request fails.
|
||||
self.reactor.pump((0.4,))
|
||||
# There's a few sleeps involved, so we have to pump the reactor a
|
||||
# bit.
|
||||
self.reactor.pump((1.0, 1.0))
|
||||
|
||||
# We expect to see more than one attempt as there was previously a valid
|
||||
# well known.
|
||||
self.assertGreater(attempts, 1)
|
||||
|
||||
# Resolver should return cached value, despite the lookup failing.
|
||||
r = self.successResultOf(fetch_d)
|
||||
self.assertEqual(r.delegated_server, b"target-server")
|
||||
|
||||
# Expire the cache and repeat the request
|
||||
self.reactor.pump((100.0,))
|
||||
# Expire both caches and repeat the request
|
||||
self.reactor.pump((10000.0,))
|
||||
|
||||
# Repated the request, this time it should fail if the lookup fails.
|
||||
fetch_d = well_known_resolver.get_well_known(b"testserv")
|
||||
fetch_d = self.well_known_resolver.get_well_known(b"testserv")
|
||||
|
||||
clients = self.reactor.tcpClients
|
||||
(host, port, client_factory, _timeout, _bindAddress) = clients.pop(0)
|
||||
|
|
Loading…
Reference in a new issue