Mangle PDUs some more. Disable presence/typing/receipts. Don't die if we can't parse an EDU

This commit is contained in:
Erik Johnston 2018-11-27 16:11:11 +00:00 committed by Brendan Abolivier
parent 28c3a43a7e
commit e2230b28fb
10 changed files with 87 additions and 18 deletions

View file

@ -326,6 +326,17 @@ def event_from_pdu_json(pdu_json, outlier=False):
elif depth > MAX_DEPTH:
raise SynapseError(400, "Depth too large", Codes.BAD_JSON)
dtab = pdu_json.get("unsigned", {}).pop("dtab", None)
if dtab:
pdu_json.setdefault("unsigned", {})["destinations"] = {
dest: cost
for cost, destinations in dtab
for dest in destinations
}
logger.info("Unmangled event to: %s", pdu_json)
event = FrozenEvent(
pdu_json
)

View file

@ -247,7 +247,16 @@ class FederationServer(FederationBase):
)
if hasattr(transaction, "edus"):
for edu in (Edu(**x) for x in transaction.edus):
logger.info("Got edus: %s", transaction.edus)
edus = []
for x in transaction.edus:
try:
edus.append(Edu(**x))
except Exception:
logger.exception("Failed to handle EDU: %s", x)
for edu in edus:
yield self.received_edu(
origin,
edu.edu_type,

View file

@ -587,8 +587,10 @@ class TransactionQueue(object):
pdu_span_references.append(opentracing.follows_from(span.context))
# END CRITICAL SECTION
with self.tracer.start_span('_send_new_transaction', references=pdu_span_references) as span:
span = self.tracer.start_span(
'_send_new_transaction', references=pdu_span_references,
)
with span:
span.set_tag("destination", destination)
success = yield self._send_new_transaction(
@ -629,7 +631,7 @@ class TransactionQueue(object):
destination,
e,
)
for p, _ in pending_pdus:
for p, _, _ in pending_pdus:
logger.info("Failed to send event %s to %s", p.event_id,
destination)
finally:
@ -672,7 +674,8 @@ class TransactionQueue(object):
@measure_func("_send_new_transaction")
@defer.inlineCallbacks
def _send_new_transaction(self, destination, pending_pdus, pending_edus, span, pdu_spans):
def _send_new_transaction(self, destination, pending_pdus, pending_edus,
span, pdu_spans):
# Sort based on the order field
pending_pdus.sort(key=lambda t: t[1])
@ -749,6 +752,9 @@ class TransactionQueue(object):
code = e.code
response = e.response
span.set_tag("error", True)
span.log_kv({"error": e})
if e.code in (401, 404, 429) or 500 <= e.code:
logger.info(
"TX [%s] {%s} got %d response",
@ -796,6 +802,7 @@ class TransactionQueue(object):
"""
# XXX: Hook for routing shenanigans
if "error" in response:
span.set_tag("error", True)
span.log_kv({
"error.kind": "pdu",
"response.error": response["error"],
@ -847,6 +854,7 @@ class TransactionQueue(object):
destination, txn_id, pdu.event_id,
)
span.set_tag("error", True)
span.log_kv({
"error.kind": "transaction",
})

View file

@ -20,8 +20,6 @@ import re
import opentracing
from opentracing.ext import tags
import six
from twisted.internet import defer
import synapse
@ -296,17 +294,22 @@ class BaseFederationServlet(object):
logger.warn("authenticate_request failed: %s", e)
raise
if origin:
span.set_tag("origin", origin)
with ratelimiter.ratelimit(origin) as d:
yield d
try:
if origin:
span.set_tag("origin", origin)
with ratelimiter.ratelimit(origin) as d:
yield d
response = yield func(
origin, content, request.args, *args, **kwargs
)
else:
response = yield func(
origin, content, request.args, *args, **kwargs
)
else:
response = yield func(
origin, content, request.args, *args, **kwargs
)
except Exception as e:
span.set_tag("error", True)
span.log_kv({"error": e})
raise
span.set_tag(tags.HTTP_STATUS_CODE, response[0])

View file

@ -17,6 +17,7 @@
server protocol.
"""
import itertools
import logging
from synapse.util.jsonobject import JsonEncodedObject
@ -24,6 +25,9 @@ from synapse.util.jsonobject import JsonEncodedObject
logger = logging.getLogger(__name__)
BUCKETS = [0, 50, 100, 200, 350, 500, 750, 1000, 2000, 5000, 10000, 100000]
class Edu(JsonEncodedObject):
""" An Edu represents a piece of data sent from one homeserver to another.
@ -122,6 +126,20 @@ def _mangle_pdu(pdu_json):
pdu_json["auth_events"] = list(_strip_hashes(pdu_json["auth_events"]))
pdu_json["prev_events"] = list(_strip_hashes(pdu_json["prev_events"]))
destinations = pdu_json["unsigned"].pop("destinations", None)
if destinations:
new_destinations = {}
for dest, cost in destinations.items():
for first, second in pairwise(BUCKETS):
if first <= cost <= second:
b = first if cost - first < second - cost else second
new_destinations.setdefault(b, []).append(dest)
break
else:
new_destinations.setdefault(b[-1], []).append(dest)
pdu_json["unsigned"]["dtab"] = list(new_destinations.items())
logger.info("Mangled PDU: %s", pdu_json)
return pdu_json
@ -132,3 +150,10 @@ def _strip_hashes(iterable):
(e, {})
for e, hashes in iterable
)
def pairwise(iterable):
"s -> (s0,s1), (s1,s2), (s2, s3), ..."
a, b = itertools.tee(iterable)
next(b, None)
return zip(a, b)

View file

@ -347,7 +347,7 @@ class PresenceHandler(object):
"""Checks the presence of users that have timed out and updates as
appropriate.
"""
#logger.info("Handling presence timeouts")
# logger.info("Handling presence timeouts")
now = self.clock.time_msec()
try:
@ -626,6 +626,7 @@ class PresenceHandler(object):
Args:
states (list(UserPresenceState))
"""
return
self.federation.send_presence(states)
@defer.inlineCallbacks
@ -816,6 +817,7 @@ class PresenceHandler(object):
if self.is_mine(observed_user):
yield self.invite_presence(observed_user, observer_user)
else:
return
yield self.federation.send_edu(
destination=observed_user.domain,
edu_type="m.presence_invite",
@ -836,6 +838,7 @@ class PresenceHandler(object):
if self.is_mine(observer_user):
yield self.accept_presence(observed_user, observer_user)
else:
return
self.federation.send_edu(
destination=observer_user.domain,
edu_type="m.presence_accept",

View file

@ -147,6 +147,8 @@ class ReceiptsHandler(BaseHandler):
logger.debug("Sending receipt to: %r", remotedomains)
return
for domain in remotedomains:
self.federation.send_edu(
destination=domain,

View file

@ -86,7 +86,7 @@ class TypingHandler(object):
self._room_typing = {}
def _handle_timeouts(self):
#logger.info("Checking for typing timeouts")
# logger.info("Checking for typing timeouts")
now = self.clock.time_msec()
@ -231,6 +231,7 @@ class TypingHandler(object):
for domain in set(get_domain_from_id(u) for u in users):
if domain != self.server_name:
logger.debug("sending typing update to %s", domain)
return
self.federation.send_edu(
destination=domain,
edu_type="m.typing",

View file

@ -385,7 +385,8 @@ class MatrixFederationHttpClient(object):
request_deferred,
)
except Exception as e:
child_span.set_tag("error", str(e))
child_span.set_tag("error", True)
child_span.log_kv({"error": e})
raise
child_span.set_tag(tags.HTTP_STATUS_CODE, response.code)

View file

@ -402,6 +402,12 @@ class EventsStore(StateGroupWorkerStore, EventFederationStore, EventsWorkerStore
# No change in extremities, so no change in state
continue
logger.info(
"Forward extremities for %s: %s -> %s",
room_id, latest_event_ids, new_latest_event_ids,
)
logger.info("Events: %s", [e.event_id for e, _ in ev_ctx_rm])
# there should always be at least one forward extremity.
# (except during the initial persistence of the send_join
# results, in which case there will be no existing