mirror of
https://github.com/element-hq/synapse.git
synced 2024-11-27 12:08:32 +03:00
Replace the @metrics.counted annotations in federation with specifically-written counters and distributions
This commit is contained in:
parent
c1cdd7954d
commit
2e4f0b2bd7
3 changed files with 29 additions and 41 deletions
|
@ -36,7 +36,15 @@ import random
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
metrics = synapse.metrics.get_metrics_for(__name__)
|
|
||||||
|
# synapse.federation.federation_client is a silly name
|
||||||
|
metrics = synapse.metrics.get_metrics_for("synapse.federation.client")
|
||||||
|
|
||||||
|
sent_pdus_destination_dist = metrics.register_distribution("sent_pdu_destinations")
|
||||||
|
|
||||||
|
sent_edus_counter = metrics.register_counter("sent_edus")
|
||||||
|
|
||||||
|
sent_queries_counter = metrics.register_counter("sent_queries", labels=["type"])
|
||||||
|
|
||||||
|
|
||||||
class FederationClient(FederationBase):
|
class FederationClient(FederationBase):
|
||||||
|
@ -53,7 +61,6 @@ class FederationClient(FederationBase):
|
||||||
self._get_pdu_cache.start()
|
self._get_pdu_cache.start()
|
||||||
|
|
||||||
@log_function
|
@log_function
|
||||||
@metrics.counted
|
|
||||||
def send_pdu(self, pdu, destinations):
|
def send_pdu(self, pdu, destinations):
|
||||||
"""Informs the replication layer about a new PDU generated within the
|
"""Informs the replication layer about a new PDU generated within the
|
||||||
home server that should be transmitted to others.
|
home server that should be transmitted to others.
|
||||||
|
@ -70,6 +77,8 @@ class FederationClient(FederationBase):
|
||||||
order = self._order
|
order = self._order
|
||||||
self._order += 1
|
self._order += 1
|
||||||
|
|
||||||
|
sent_pdus_destination_dist.inc_by(len(destinations))
|
||||||
|
|
||||||
logger.debug("[%s] transaction_layer.enqueue_pdu... ", pdu.event_id)
|
logger.debug("[%s] transaction_layer.enqueue_pdu... ", pdu.event_id)
|
||||||
|
|
||||||
# TODO, add errback, etc.
|
# TODO, add errback, etc.
|
||||||
|
@ -81,7 +90,6 @@ class FederationClient(FederationBase):
|
||||||
)
|
)
|
||||||
|
|
||||||
@log_function
|
@log_function
|
||||||
@metrics.counted
|
|
||||||
def send_edu(self, destination, edu_type, content):
|
def send_edu(self, destination, edu_type, content):
|
||||||
edu = Edu(
|
edu = Edu(
|
||||||
origin=self.server_name,
|
origin=self.server_name,
|
||||||
|
@ -90,18 +98,18 @@ class FederationClient(FederationBase):
|
||||||
content=content,
|
content=content,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
sent_edus_counter.inc()
|
||||||
|
|
||||||
# TODO, add errback, etc.
|
# TODO, add errback, etc.
|
||||||
self._transaction_queue.enqueue_edu(edu)
|
self._transaction_queue.enqueue_edu(edu)
|
||||||
return defer.succeed(None)
|
return defer.succeed(None)
|
||||||
|
|
||||||
@log_function
|
@log_function
|
||||||
@metrics.counted
|
|
||||||
def send_failure(self, failure, destination):
|
def send_failure(self, failure, destination):
|
||||||
self._transaction_queue.enqueue_failure(failure, destination)
|
self._transaction_queue.enqueue_failure(failure, destination)
|
||||||
return defer.succeed(None)
|
return defer.succeed(None)
|
||||||
|
|
||||||
@log_function
|
@log_function
|
||||||
@metrics.counted
|
|
||||||
def make_query(self, destination, query_type, args,
|
def make_query(self, destination, query_type, args,
|
||||||
retry_on_dns_fail=True):
|
retry_on_dns_fail=True):
|
||||||
"""Sends a federation Query to a remote homeserver of the given type
|
"""Sends a federation Query to a remote homeserver of the given type
|
||||||
|
@ -118,6 +126,8 @@ class FederationClient(FederationBase):
|
||||||
a Deferred which will eventually yield a JSON object from the
|
a Deferred which will eventually yield a JSON object from the
|
||||||
response
|
response
|
||||||
"""
|
"""
|
||||||
|
sent_queries_counter.inc(query_type)
|
||||||
|
|
||||||
return self.transport_layer.make_query(
|
return self.transport_layer.make_query(
|
||||||
destination, query_type, args, retry_on_dns_fail=retry_on_dns_fail
|
destination, query_type, args, retry_on_dns_fail=retry_on_dns_fail
|
||||||
)
|
)
|
||||||
|
@ -163,7 +173,6 @@ class FederationClient(FederationBase):
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
@log_function
|
@log_function
|
||||||
@metrics.counted
|
|
||||||
def get_pdu(self, destinations, event_id, outlier=False):
|
def get_pdu(self, destinations, event_id, outlier=False):
|
||||||
"""Requests the PDU with given origin and ID from the remote home
|
"""Requests the PDU with given origin and ID from the remote home
|
||||||
servers.
|
servers.
|
||||||
|
@ -253,7 +262,6 @@ class FederationClient(FederationBase):
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
@log_function
|
@log_function
|
||||||
@metrics.counted
|
|
||||||
def get_state_for_room(self, destination, room_id, event_id):
|
def get_state_for_room(self, destination, room_id, event_id):
|
||||||
"""Requests all of the `current` state PDUs for a given room from
|
"""Requests all of the `current` state PDUs for a given room from
|
||||||
a remote home server.
|
a remote home server.
|
||||||
|
@ -294,7 +302,6 @@ class FederationClient(FederationBase):
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
@log_function
|
@log_function
|
||||||
@metrics.counted
|
|
||||||
def get_event_auth(self, destination, room_id, event_id):
|
def get_event_auth(self, destination, room_id, event_id):
|
||||||
res = yield self.transport_layer.get_event_auth(
|
res = yield self.transport_layer.get_event_auth(
|
||||||
destination, room_id, event_id,
|
destination, room_id, event_id,
|
||||||
|
@ -314,7 +321,6 @@ class FederationClient(FederationBase):
|
||||||
defer.returnValue(signed_auth)
|
defer.returnValue(signed_auth)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
@metrics.counted
|
|
||||||
def make_join(self, destinations, room_id, user_id):
|
def make_join(self, destinations, room_id, user_id):
|
||||||
for destination in destinations:
|
for destination in destinations:
|
||||||
try:
|
try:
|
||||||
|
@ -341,7 +347,6 @@ class FederationClient(FederationBase):
|
||||||
raise RuntimeError("Failed to send to any server.")
|
raise RuntimeError("Failed to send to any server.")
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
@metrics.counted
|
|
||||||
def send_join(self, destinations, pdu):
|
def send_join(self, destinations, pdu):
|
||||||
for destination in destinations:
|
for destination in destinations:
|
||||||
try:
|
try:
|
||||||
|
@ -391,7 +396,6 @@ class FederationClient(FederationBase):
|
||||||
raise RuntimeError("Failed to send to any server.")
|
raise RuntimeError("Failed to send to any server.")
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
@metrics.counted
|
|
||||||
def send_invite(self, destination, room_id, event_id, pdu):
|
def send_invite(self, destination, room_id, event_id, pdu):
|
||||||
time_now = self._clock.time_msec()
|
time_now = self._clock.time_msec()
|
||||||
code, content = yield self.transport_layer.send_invite(
|
code, content = yield self.transport_layer.send_invite(
|
||||||
|
@ -415,7 +419,6 @@ class FederationClient(FederationBase):
|
||||||
defer.returnValue(pdu)
|
defer.returnValue(pdu)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
@metrics.counted
|
|
||||||
def query_auth(self, destination, room_id, event_id, local_auth):
|
def query_auth(self, destination, room_id, event_id, local_auth):
|
||||||
"""
|
"""
|
||||||
Params:
|
Params:
|
||||||
|
|
|
@ -33,7 +33,14 @@ import logging
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
metrics = synapse.metrics.get_metrics_for(__name__)
|
# synapse.federation.federation_server is a silly name
|
||||||
|
metrics = synapse.metrics.get_metrics_for("synapse.federation.server")
|
||||||
|
|
||||||
|
received_pdus_counter = metrics.register_counter("received_pdus")
|
||||||
|
|
||||||
|
received_edus_counter = metrics.register_counter("received_edus")
|
||||||
|
|
||||||
|
received_queries_counter = metrics.register_counter("received_queries", labels=["type"])
|
||||||
|
|
||||||
|
|
||||||
class FederationServer(FederationBase):
|
class FederationServer(FederationBase):
|
||||||
|
@ -75,7 +82,6 @@ class FederationServer(FederationBase):
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
@log_function
|
@log_function
|
||||||
@metrics.counted
|
|
||||||
def on_backfill_request(self, origin, room_id, versions, limit):
|
def on_backfill_request(self, origin, room_id, versions, limit):
|
||||||
pdus = yield self.handler.on_backfill_request(
|
pdus = yield self.handler.on_backfill_request(
|
||||||
origin, room_id, versions, limit
|
origin, room_id, versions, limit
|
||||||
|
@ -85,10 +91,11 @@ class FederationServer(FederationBase):
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
@log_function
|
@log_function
|
||||||
@metrics.counted
|
|
||||||
def on_incoming_transaction(self, transaction_data):
|
def on_incoming_transaction(self, transaction_data):
|
||||||
transaction = Transaction(**transaction_data)
|
transaction = Transaction(**transaction_data)
|
||||||
|
|
||||||
|
received_pdus_counter.inc_by(len(transaction.pdus))
|
||||||
|
|
||||||
for p in transaction.pdus:
|
for p in transaction.pdus:
|
||||||
if "unsigned" in p:
|
if "unsigned" in p:
|
||||||
unsigned = p["unsigned"]
|
unsigned = p["unsigned"]
|
||||||
|
@ -158,6 +165,8 @@ class FederationServer(FederationBase):
|
||||||
defer.returnValue((200, response))
|
defer.returnValue((200, response))
|
||||||
|
|
||||||
def received_edu(self, origin, edu_type, content):
|
def received_edu(self, origin, edu_type, content):
|
||||||
|
received_edus_counter.inc()
|
||||||
|
|
||||||
if edu_type in self.edu_handlers:
|
if edu_type in self.edu_handlers:
|
||||||
self.edu_handlers[edu_type](origin, content)
|
self.edu_handlers[edu_type](origin, content)
|
||||||
else:
|
else:
|
||||||
|
@ -165,7 +174,6 @@ class FederationServer(FederationBase):
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
@log_function
|
@log_function
|
||||||
@metrics.counted
|
|
||||||
def on_context_state_request(self, origin, room_id, event_id):
|
def on_context_state_request(self, origin, room_id, event_id):
|
||||||
if event_id:
|
if event_id:
|
||||||
pdus = yield self.handler.get_state_for_pdu(
|
pdus = yield self.handler.get_state_for_pdu(
|
||||||
|
@ -193,7 +201,6 @@ class FederationServer(FederationBase):
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
@log_function
|
@log_function
|
||||||
@metrics.counted
|
|
||||||
def on_pdu_request(self, origin, event_id):
|
def on_pdu_request(self, origin, event_id):
|
||||||
pdu = yield self._get_persisted_pdu(origin, event_id)
|
pdu = yield self._get_persisted_pdu(origin, event_id)
|
||||||
|
|
||||||
|
@ -206,13 +213,13 @@ class FederationServer(FederationBase):
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
@log_function
|
@log_function
|
||||||
@metrics.counted
|
|
||||||
def on_pull_request(self, origin, versions):
|
def on_pull_request(self, origin, versions):
|
||||||
raise NotImplementedError("Pull transactions not implemented")
|
raise NotImplementedError("Pull transactions not implemented")
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
@metrics.counted
|
|
||||||
def on_query_request(self, query_type, args):
|
def on_query_request(self, query_type, args):
|
||||||
|
received_queries_counter.inc(query_type)
|
||||||
|
|
||||||
if query_type in self.query_handlers:
|
if query_type in self.query_handlers:
|
||||||
response = yield self.query_handlers[query_type](args)
|
response = yield self.query_handlers[query_type](args)
|
||||||
defer.returnValue((200, response))
|
defer.returnValue((200, response))
|
||||||
|
@ -222,14 +229,12 @@ class FederationServer(FederationBase):
|
||||||
)
|
)
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
@metrics.counted
|
|
||||||
def on_make_join_request(self, room_id, user_id):
|
def on_make_join_request(self, room_id, user_id):
|
||||||
pdu = yield self.handler.on_make_join_request(room_id, user_id)
|
pdu = yield self.handler.on_make_join_request(room_id, user_id)
|
||||||
time_now = self._clock.time_msec()
|
time_now = self._clock.time_msec()
|
||||||
defer.returnValue({"event": pdu.get_pdu_json(time_now)})
|
defer.returnValue({"event": pdu.get_pdu_json(time_now)})
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
@metrics.counted
|
|
||||||
def on_invite_request(self, origin, content):
|
def on_invite_request(self, origin, content):
|
||||||
pdu = self.event_from_pdu_json(content)
|
pdu = self.event_from_pdu_json(content)
|
||||||
ret_pdu = yield self.handler.on_invite_request(origin, pdu)
|
ret_pdu = yield self.handler.on_invite_request(origin, pdu)
|
||||||
|
@ -237,7 +242,6 @@ class FederationServer(FederationBase):
|
||||||
defer.returnValue((200, {"event": ret_pdu.get_pdu_json(time_now)}))
|
defer.returnValue((200, {"event": ret_pdu.get_pdu_json(time_now)}))
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
@metrics.counted
|
|
||||||
def on_send_join_request(self, origin, content):
|
def on_send_join_request(self, origin, content):
|
||||||
logger.debug("on_send_join_request: content: %s", content)
|
logger.debug("on_send_join_request: content: %s", content)
|
||||||
pdu = self.event_from_pdu_json(content)
|
pdu = self.event_from_pdu_json(content)
|
||||||
|
@ -252,7 +256,6 @@ class FederationServer(FederationBase):
|
||||||
}))
|
}))
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
@metrics.counted
|
|
||||||
def on_event_auth(self, origin, room_id, event_id):
|
def on_event_auth(self, origin, room_id, event_id):
|
||||||
time_now = self._clock.time_msec()
|
time_now = self._clock.time_msec()
|
||||||
auth_pdus = yield self.handler.on_event_auth(event_id)
|
auth_pdus = yield self.handler.on_event_auth(event_id)
|
||||||
|
@ -261,7 +264,6 @@ class FederationServer(FederationBase):
|
||||||
}))
|
}))
|
||||||
|
|
||||||
@defer.inlineCallbacks
|
@defer.inlineCallbacks
|
||||||
@metrics.counted
|
|
||||||
def on_query_auth_request(self, origin, content, event_id):
|
def on_query_auth_request(self, origin, content, event_id):
|
||||||
"""
|
"""
|
||||||
Content is a dict with keys::
|
Content is a dict with keys::
|
||||||
|
|
|
@ -62,23 +62,6 @@ class Metrics(object):
|
||||||
def register_cache(self, *args, **kwargs):
|
def register_cache(self, *args, **kwargs):
|
||||||
return self._register(CacheMetric, *args, **kwargs)
|
return self._register(CacheMetric, *args, **kwargs)
|
||||||
|
|
||||||
def counted(self, func):
|
|
||||||
""" A method decorator that registers a counter, to count invocations
|
|
||||||
of this method. """
|
|
||||||
if not hasattr(self, "method_counter"):
|
|
||||||
self.method_counter = self.register_counter(
|
|
||||||
"calls",
|
|
||||||
labels=["method"]
|
|
||||||
)
|
|
||||||
|
|
||||||
counter = self.method_counter
|
|
||||||
name = func.__name__
|
|
||||||
|
|
||||||
def wrapped(*args, **kwargs):
|
|
||||||
counter.inc(name)
|
|
||||||
return func(*args, **kwargs)
|
|
||||||
return wrapped
|
|
||||||
|
|
||||||
|
|
||||||
def get_metrics_for(pkg_name):
|
def get_metrics_for(pkg_name):
|
||||||
""" Returns a Metrics instance for conveniently creating metrics
|
""" Returns a Metrics instance for conveniently creating metrics
|
||||||
|
|
Loading…
Reference in a new issue