add basic jaeger support

This commit is contained in:
Erik Johnston 2018-11-26 18:16:02 +00:00 committed by Brendan Abolivier
parent 27ca009b0a
commit ec288b48fd
8 changed files with 179 additions and 67 deletions

View file

@ -73,7 +73,7 @@ log_config: "/compiled/log.config"
## Ratelimiting ##
rc_messages_per_second: 0.2
rc_messages_per_second: 50
rc_message_burst_count: 10.0
federation_rc_window_size: 1000
federation_rc_sleep_limit: 10

View file

@ -5,4 +5,4 @@ nodaemon=true
command=/start.py
[program:proxy]
command=/proxy/proxy --maps-dir /proxy/maps
command=/proxy/proxy --maps-dir /proxy/maps --debug-log

View file

@ -140,6 +140,8 @@ class TransactionQueue(object):
self._processing_pending_presence = False
self.tracer = hs.get_tracer()
def notify_new_events(self, current_id):
"""This gets called when we have some new events we might want to
send out to other servers.
@ -526,26 +528,30 @@ class TransactionQueue(object):
# END CRITICAL SECTION
success = yield self._send_new_transaction(
destination, pending_pdus, pending_edus,
)
if success:
sent_transactions_counter.inc()
# Remove the acknowledged device messages from the database
# Only bother if we actually sent some device messages
if device_message_edus:
yield self.store.delete_device_msgs_for_remote(
destination, device_stream_id
)
logger.info("Marking as sent %r %r", destination, dev_list_id)
yield self.store.mark_as_sent_devices_by_remote(
destination, dev_list_id
)
with self.tracer.start_span('_send_new_transaction') as span:
span.set_tag("destination", destination)
self.last_device_stream_id_by_dest[destination] = device_stream_id
self.last_device_list_stream_id_by_dest[destination] = dev_list_id
else:
break
success = yield self._send_new_transaction(
destination, pending_pdus, pending_edus, span,
)
span.set_tag("success", success)
if success:
sent_transactions_counter.inc()
# Remove the acknowledged device messages from the database
# Only bother if we actually sent some device messages
if device_message_edus:
yield self.store.delete_device_msgs_for_remote(
destination, device_stream_id
)
logger.info("Marking as sent %r %r", destination, dev_list_id)
yield self.store.mark_as_sent_devices_by_remote(
destination, dev_list_id
)
self.last_device_stream_id_by_dest[destination] = device_stream_id
self.last_device_list_stream_id_by_dest[destination] = dev_list_id
else:
break
except NotRetryingDestination as e:
logger.debug(
"TX [%s] not ready for retry yet (next retry at %s) - "
@ -604,7 +610,7 @@ class TransactionQueue(object):
@measure_func("_send_new_transaction")
@defer.inlineCallbacks
def _send_new_transaction(self, destination, pending_pdus, pending_edus):
def _send_new_transaction(self, destination, pending_pdus, pending_edus, span):
# Sort based on the order field
pending_pdus.sort(key=lambda t: t[1])
@ -617,6 +623,8 @@ class TransactionQueue(object):
txn_id = str(self._next_txn_id)
span.set_tag("txn-id", txn_id)
logger.debug(
"TX [%s] {%s} Attempting new transaction"
" (pdus: %d, edus: %d)",
@ -667,7 +675,7 @@ class TransactionQueue(object):
try:
response = yield self.transport_layer.send_transaction(
transaction, json_data_cb
transaction, json_data_cb, span,
)
code = 200
except HttpResponseException as e:
@ -686,6 +694,8 @@ class TransactionQueue(object):
destination, txn_id, code
)
span.set_tag("http.status_code", code)
yield self.transaction_actions.delivered(
transaction, code, response
)

View file

@ -136,7 +136,7 @@ class TransportLayerClient(object):
@defer.inlineCallbacks
@log_function
def send_transaction(self, transaction, json_data_callback=None):
def send_transaction(self, transaction, json_data_callback=None, span=None):
""" Sends the given Transaction to its destination
Args:
@ -176,6 +176,7 @@ class TransportLayerClient(object):
json_data_callback=json_data_callback,
long_retries=False,
backoff_on_404=True, # If we get a 404 the other side has gone
span=span,
)
defer.returnValue(response)

View file

@ -17,6 +17,9 @@
import functools
import logging
import re
import opentracing
import six
from twisted.internet import defer
@ -227,10 +230,11 @@ class BaseFederationServlet(object):
"""
REQUIRE_AUTH = True
def __init__(self, handler, authenticator, ratelimiter, server_name):
def __init__(self, handler, authenticator, ratelimiter, server_name, hs):
self.handler = handler
self.authenticator = authenticator
self.ratelimiter = ratelimiter
self.tracer = hs.get_tracer()
def _wrap(self, func):
authenticator = self.authenticator
@ -251,32 +255,58 @@ class BaseFederationServlet(object):
Deferred[(int, object)|None]: (response code, response object) as returned
by the callback method. None if the request has already been handled.
"""
content = None
if request.method in [b"PUT", b"POST"]:
# TODO: Handle other method types? other content types?
content = parse_json_object_from_request(request)
try:
origin = yield authenticator.authenticate_request(request, content)
except NoAuthenticationError:
origin = None
if self.REQUIRE_AUTH:
logger.warn("authenticate_request failed: missing authentication")
raise
except Exception as e:
logger.warn("authenticate_request failed: %s", e)
raise
carrier = {}
for key, value in six.iteritems(request.headers):
carrier[key] = value
parent_ctx = self.tracer.extract(
format=opentracing.Format.HTTP_HEADERS, carrier=carrier
)
except Exception:
logger.exception("trace extract failed")
parent_ctx = None
if origin:
with ratelimiter.ratelimit(origin) as d:
yield d
tags_dict = {
"http.method": request.method.decode('ascii'),
"http.url": request.uri.decode('ascii'),
}
span = self.tracer.start_span(
operation_name="federation-server",
child_of=parent_ctx,
tags=tags_dict,
)
with span:
content = None
if request.method in [b"PUT", b"POST"]:
# TODO: Handle other method types? other content types?
content = parse_json_object_from_request(request)
try:
origin = yield authenticator.authenticate_request(request, content)
except NoAuthenticationError:
origin = None
if self.REQUIRE_AUTH:
logger.warn("authenticate_request failed: missing authentication")
raise
except Exception as e:
logger.warn("authenticate_request failed: %s", e)
raise
if origin:
span.set_tag("origin", origin)
with ratelimiter.ratelimit(origin) as d:
yield d
response = yield func(
origin, content, request.args, *args, **kwargs
)
else:
response = yield func(
origin, content, request.args, *args, **kwargs
)
else:
response = yield func(
origin, content, request.args, *args, **kwargs
)
span.set_tag("http.status_code", response[0])
defer.returnValue(response)
@ -1322,6 +1352,7 @@ def register_servlets(hs, resource, authenticator, ratelimiter):
authenticator=authenticator,
ratelimiter=ratelimiter,
server_name=hs.hostname,
hs=hs,
).register(resource)
for servletclass in ROOM_LIST_CLASSES:
@ -1330,6 +1361,7 @@ def register_servlets(hs, resource, authenticator, ratelimiter):
authenticator=authenticator,
ratelimiter=ratelimiter,
server_name=hs.hostname,
hs=hs,
).register(resource)
for servletclass in GROUP_SERVER_SERVLET_CLASSES:
@ -1338,6 +1370,7 @@ def register_servlets(hs, resource, authenticator, ratelimiter):
authenticator=authenticator,
ratelimiter=ratelimiter,
server_name=hs.hostname,
hs=hs,
).register(resource)
for servletclass in GROUP_LOCAL_SERVLET_CLASSES:
@ -1346,6 +1379,7 @@ def register_servlets(hs, resource, authenticator, ratelimiter):
authenticator=authenticator,
ratelimiter=ratelimiter,
server_name=hs.hostname,
hs=hs,
).register(resource)
for servletclass in GROUP_ATTESTATION_SERVLET_CLASSES:
@ -1354,4 +1388,5 @@ def register_servlets(hs, resource, authenticator, ratelimiter):
authenticator=authenticator,
ratelimiter=ratelimiter,
server_name=hs.hostname,
hs=hs,
).register(resource)

View file

@ -19,7 +19,9 @@ import random
import sys
from io import BytesIO
from six import PY3, string_types
import opentracing
from six import PY3, string_types, iteritems
from six.moves import urllib
import attr
@ -203,6 +205,8 @@ class MatrixFederationHttpClient(object):
self._cooperator = Cooperator(scheduler=schedule)
self.tracer = hs.get_tracer()
@defer.inlineCallbacks
def _send_request(
self,
@ -211,7 +215,8 @@ class MatrixFederationHttpClient(object):
timeout=None,
long_retries=False,
ignore_backoff=False,
backoff_on_404=False
backoff_on_404=False,
span=None,
):
"""
Sends a request to the given server.
@ -321,25 +326,44 @@ class MatrixFederationHttpClient(object):
url_str,
)
# we don't want all the fancy cookie and redirect handling that
# treq.request gives: just use the raw Agent.
request_deferred = self.agent.request(
method_bytes,
url_bytes,
headers=Headers(headers_dict),
bodyProducer=producer,
)
request_deferred = timeout_deferred(
request_deferred,
timeout=_sec_timeout,
reactor=self.hs.get_reactor(),
)
with Measure(self.clock, "outbound_request"):
response = yield make_deferred_yieldable(
request_deferred,
with self.tracer.start_span('request', child_of=span) as child_span:
carrier = {}
opentracing.tracer.inject(
span_context=child_span.context,
format=opentracing.Format.HTTP_HEADERS,
carrier=carrier,
)
for key, value in iteritems(carrier):
headers_dict[key.encode("ascii")] = [value.encode("ascii")]
# we don't want all the fancy cookie and redirect handling that
# treq.request gives: just use the raw Agent.
request_deferred = self.agent.request(
method_bytes,
url_bytes,
headers=Headers(headers_dict),
bodyProducer=producer,
)
request_deferred = timeout_deferred(
request_deferred,
timeout=_sec_timeout,
reactor=self.hs.get_reactor(),
)
child_span.set_tag("http.method", request.method)
child_span.set_tag("http.url", url_str)
try:
with Measure(self.clock, "outbound_request"):
response = yield make_deferred_yieldable(
request_deferred,
)
except Exception as e:
child_span.set_tag("error", str(e))
raise
child_span.set_tag("http.status_code", response.code)
break
except Exception as e:
@ -455,7 +479,8 @@ class MatrixFederationHttpClient(object):
json_data_callback=None,
long_retries=False, timeout=None,
ignore_backoff=False,
backoff_on_404=False):
backoff_on_404=False,
span=None):
""" Sends the specifed json data using PUT
Args:
@ -506,6 +531,7 @@ class MatrixFederationHttpClient(object):
timeout=timeout,
ignore_backoff=ignore_backoff,
backoff_on_404=backoff_on_404,
span=span,
)
body = yield _handle_json_response(

View file

@ -66,6 +66,9 @@ REQUIREMENTS = {
# we use attr.s(slots), which arrived in 16.0.0
"attrs>=16.0.0": ["attr>=16.0.0"],
"netaddr>=0.7.18": ["netaddr"],
"jaeger_client": ["jaeger_client"],
"opentracing": ["opentracing"],
}
CONDITIONAL_REQUIREMENTS = {

View file

@ -21,6 +21,9 @@
# Imports required for the default HomeServer() implementation
import abc
import logging
import os
from jaeger_client import Config
from twisted.enterprise import adbapi
from twisted.mail.smtp import sendmail
@ -176,6 +179,7 @@ class HomeServer(object):
'pagination_handler',
'room_context_handler',
'sendmail',
'tracer',
]
# This is overridden in derived application classes
@ -472,6 +476,39 @@ class HomeServer(object):
def build_room_context_handler(self):
return RoomContextHandler(self)
def build_tracer(self):
# TODO: Make optional
jaeger_host = os.environ.get("SYNAPSE_JAEGER_HOST", None)
if jaeger_host:
config_dict = {
'sampler': {
'type': 'const',
'param': 1,
},
'logging': True,
'local_agent': {
'reporting_host': '172.18.0.1',
},
}
else:
config_dict = {
'sampler': {
'type': 'const',
'param': 0,
},
'logging': True,
}
config = Config(
config=config_dict,
service_name="synapse-" + self.config.server_name,
validate=True,
)
# this call also sets opentracing.tracer
tracer = config.initialize_tracer()
return tracer
def remove_pusher(self, app_id, push_key, user_id):
return self.get_pusherpool().remove_pusher(app_id, push_key, user_id)