Handle slow/lossy connections better when sending transactions

Erik Johnston 2018-11-29 15:50:04 +00:00 committed by Brendan Abolivier
parent 76d888cf48
commit 4e0ac33053
4 changed files with 44 additions and 36 deletions

synapse/federation/federation_server.py

@@ -37,7 +37,7 @@ from synapse.api.errors import (
 from synapse.crypto.event_signing import compute_event_signature
 from synapse.federation.federation_base import FederationBase, event_from_pdu_json
 from synapse.federation.persistence import TransactionActions
-from synapse.federation.units import Edu, Transaction
+from synapse.federation.units import Edu, Transaction, _mangle_pdu
 from synapse.http.endpoint import parse_server_name
 from synapse.replication.http.federation import (
     ReplicationFederationSendEduRestServlet,
@@ -49,6 +49,7 @@ from synapse.util.caches.response_cache import ResponseCache
 from synapse.util.logcontext import nested_logging_context
 from synapse.util.logutils import log_function

 # when processing incoming transactions, we try to handle multiple rooms in
 # parallel, up to this limit.
 TRANSACTION_CONCURRENCY_LIMIT = 10
@@ -365,8 +366,8 @@ class FederationServer(FederationBase):
         )

         defer.returnValue({
-            "pdus": [pdu.get_pdu_json() for pdu in pdus],
-            "auth_chain": [pdu.get_pdu_json() for pdu in auth_chain],
+            "pdus": [_mangle_pdu(pdu.get_pdu_json()) for pdu in pdus],
+            "auth_chain": [_mangle_pdu(pdu.get_pdu_json()) for pdu in auth_chain],
         })

     @defer.inlineCallbacks
@@ -411,7 +412,7 @@ class FederationServer(FederationBase):
         yield self.check_server_matches_acl(origin_host, pdu.room_id)
         ret_pdu = yield self.handler.on_invite_request(origin, pdu)
         time_now = self._clock.time_msec()
-        defer.returnValue((200, {"event": ret_pdu.get_pdu_json(time_now)}))
+        defer.returnValue((200, {"event": _mangle_pdu(ret_pdu.get_pdu_json(time_now))}))

     @defer.inlineCallbacks
     def on_send_join_request(self, origin, content):
@@ -425,9 +426,9 @@ class FederationServer(FederationBase):
         res_pdus = yield self.handler.on_send_join_request(origin, pdu)
         time_now = self._clock.time_msec()
         defer.returnValue((200, {
-            "state": [p.get_pdu_json(time_now) for p in res_pdus["state"]],
+            "state": [_mangle_pdu(p.get_pdu_json(time_now)) for p in res_pdus["state"]],
             "auth_chain": [
-                p.get_pdu_json(time_now) for p in res_pdus["auth_chain"]
+                _mangle_pdu(p.get_pdu_json(time_now)) for p in res_pdus["auth_chain"]
             ],
         }))
@@ -460,7 +461,7 @@ class FederationServer(FederationBase):
         time_now = self._clock.time_msec()
         auth_pdus = yield self.handler.on_event_auth(event_id)
         res = {
-            "auth_chain": [a.get_pdu_json(time_now) for a in auth_pdus],
+            "auth_chain": [_mangle_pdu(a.get_pdu_json(time_now)) for a in auth_pdus],
         }
         defer.returnValue((200, res))
@@ -509,7 +510,7 @@ class FederationServer(FederationBase):
         time_now = self._clock.time_msec()
         send_content = {
             "auth_chain": [
-                e.get_pdu_json(time_now)
+                _mangle_pdu(e.get_pdu_json(time_now))
                 for e in ret["auth_chain"]
             ],
             "rejects": ret.get("rejects", []),
@@ -585,7 +586,7 @@ class FederationServer(FederationBase):
         time_now = self._clock.time_msec()
         defer.returnValue({
-            "events": [ev.get_pdu_json(time_now) for ev in missing_events],
+            "events": [_mangle_pdu(ev.get_pdu_json(time_now)) for ev in missing_events],
         })

     @log_function
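Taken together, the hunks above route every PDU that federation_server.py serialises for a response (state/backfill, invites, send_join, event auth, query_auth, get_missing_events) through _mangle_pdu before it is returned. A minimal sketch of the pattern, using a hypothetical helper name (the commit simply inlines the call at each site):

    # Hypothetical helper illustrating the pattern applied at each call site above.
    def _serialise_for_wire(pdus, time_now):
        # Serialise each event, then mangle the JSON so only the slimmed-down
        # wire form leaves the server.
        return [_mangle_pdu(p.get_pdu_json(time_now)) for p in pdus]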

synapse/federation/transaction_queue.py

@@ -509,16 +509,22 @@ class TransactionQueue(object):
             pending_pdus = self.pending_pdus_by_dest.pop(destination, [])

             # We can only include at most 50 PDUs per transactions
-            pending_pdus, leftover_pdus = pending_pdus[:50], pending_pdus[50:]
+            pending_pdus, leftover_pdus = pending_pdus[-5:], pending_pdus[:-5]
             if leftover_pdus:
-                self.pending_pdus_by_dest[destination] = leftover_pdus
+                # self.pending_pdus_by_dest[destination] = leftover_pdus
+                for _, _, p_span in leftover_pdus:
+                    p_span.set_tag("success", False)
+                    p_span.log_kv({"result": "dropped"})
+                    p_span.finish()
+
+            logger.info("TX [%s] Sending PDUs: %s", destination, pending_pdus)

             pending_edus = self.pending_edus_by_dest.pop(destination, [])

             # We can only include at most 100 EDUs per transactions
-            pending_edus, leftover_edus = pending_edus[:100], pending_edus[100:]
-            if leftover_edus:
-                self.pending_edus_by_dest[destination] = leftover_edus
+            pending_edus, leftover_edus = pending_edus[-5:], pending_edus[:-5]
+            # if leftover_edus:
+            #     self.pending_edus_by_dest[destination] = leftover_edus

             pending_presence = self.pending_presence_by_dest.pop(destination, {})
@@ -834,7 +840,7 @@ class TransactionQueue(object):
             yield self._send_pdu(pdu, list(new_destinations), span)

-    @defer.inlineCallbacks
+    # @defer.inlineCallbacks
     def _pdu_send_txn_failed(self, destination, txn_id, pdu, span):
         """Gets called when sending a transaction failed (after retries)
         """
@@ -858,9 +864,9 @@ class TransactionQueue(object):
             },
         )

-        new_destinations = set(pdu.unsigned.get("destinations", []))
-        new_destinations.discard(destination)
+        # new_destinations = set(pdu.unsigned.get("destinations", []))
+        # new_destinations.discard(destination)

-        yield self._send_pdu(pdu, list(new_destinations), span)
+        # yield self._send_pdu(pdu, list(new_destinations), span)

 def _numberToBase(n, b):
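Two behavioural changes stand out in the hunks above. First, the slicing semantics flip: the old code sent the first 50 PDUs (100 EDUs) per transaction and re-queued the remainder, while the new code keeps only the five newest entries per destination and drops everything older, closing the tracing spans of the dropped PDUs. Second, _pdu_send_txn_failed no longer re-routes a failed PDU to its remaining destinations. A small, self-contained illustration of the new slicing, with made-up values:

    # Twelve pending items queued for one destination.
    pending = list(range(12))

    old_send, old_requeue = pending[:50], pending[50:]   # old: send up to 50, requeue the rest
    new_send, new_dropped = pending[-5:], pending[:-5]   # new: send only the newest five

    assert old_send == list(range(12)) and old_requeue == []
    assert new_send == [7, 8, 9, 10, 11]
    assert new_dropped == [0, 1, 2, 3, 4, 5, 6]          # dropped; their spans are marked as dropped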

synapse/federation/units.py

@@ -20,7 +20,7 @@ server protocol.
 import itertools
 import logging

-from synapse.types import get_localpart_from_id
+from synapse.types import get_localpart_from_id, get_domain_from_id
 from synapse.util.jsonobject import JsonEncodedObject

 logger = logging.getLogger(__name__)
@@ -130,6 +130,7 @@ def _mangle_pdu(pdu_json):
     pdu_json["auth_events"] = list(_strip_hashes(pdu_json["auth_events"]))
     pdu_json["prev_events"] = list(_strip_hashes(pdu_json["prev_events"]))

-    pdu_json["event_id"] = get_localpart_from_id(pdu_json["event_id"])
+    if get_domain_from_id(pdu_json["event_id"]) == get_domain_from_id(pdu_json["sender"]):
+        pdu_json["event_id"] = get_localpart_from_id(pdu_json["event_id"])

     destinations = pdu_json["unsigned"].pop("destinations", None)
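Combined with the earlier federation_server.py hunks, the visible parts of _mangle_pdu suggest the shape sketched below. Only the lines shown in the diffs are certain; _strip_hashes and whatever happens to destinations afterwards live elsewhere in units.py, so those parts are assumptions:

    from synapse.types import get_domain_from_id, get_localpart_from_id

    def _strip_hashes(event_refs):
        # Assumed stand-in for the real helper: prev_events/auth_events are
        # (event_id, hashes) pairs on the wire; dropping the hash dicts
        # shrinks the serialised event.
        return ((event_id, {}) for event_id, _hashes in event_refs)

    def _mangle_pdu(pdu_json):
        pdu_json["auth_events"] = list(_strip_hashes(pdu_json["auth_events"]))
        pdu_json["prev_events"] = list(_strip_hashes(pdu_json["prev_events"]))

        # Only shorten the event_id to its localpart when it was minted by the
        # sender's own server, so the receiving side can reconstruct the full ID
        # from the sender's domain.
        if get_domain_from_id(pdu_json["event_id"]) == get_domain_from_id(pdu_json["sender"]):
            pdu_json["event_id"] = get_localpart_from_id(pdu_json["event_id"])

        # Internal routing hints are popped out of the unsigned section; what is
        # done with them afterwards is not visible in this diff.
        destinations = pdu_json["unsigned"].pop("destinations", None)

        return pdu_json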

synapse/handlers/federation.py

@@ -325,20 +325,20 @@ class FederationHandler(BaseHandler):
                 # but there is an interaction with min_depth that I'm not really
                 # following.

-                if sent_to_us_directly:
-                    logger.warn(
-                        "[%s %s] Rejecting: failed to fetch %d prev events: %s",
-                        room_id, event_id, len(prevs - seen), shortstr(prevs - seen)
-                    )
-                    raise FederationError(
-                        "ERROR",
-                        403,
-                        (
-                            "Your server isn't divulging details about prev_events "
-                            "referenced in this event."
-                        ),
-                        affected=pdu.event_id,
-                    )
+                # if sent_to_us_directly:
+                #     logger.warn(
+                #         "[%s %s] Rejecting: failed to fetch %d prev events: %s",
+                #         room_id, event_id, len(prevs - seen), shortstr(prevs - seen)
+                #     )
+                #     raise FederationError(
+                #         "ERROR",
+                #         403,
+                #         (
+                #             "Your server isn't divulging details about prev_events "
+                #             "referenced in this event."
+                #         ),
+                #         affected=pdu.event_id,
+                #     )

                 # Calculate the state after each of the previous events, and
                 # resolve them to find the correct state at the current event.
@@ -563,9 +563,9 @@ class FederationHandler(BaseHandler):
             room_id,
             earliest_events_ids=list(latest),
             latest_events=[pdu],
-            limit=10,
+            limit=5,
             min_depth=min_depth,
-            timeout=60000,
+            timeout=15000,
         )

         logger.info(
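The last hunk tightens how aggressively the handler chases missing prev_events from a slow or lossy peer: at most 5 events per request (down from 10) and a 15 second timeout (down from 60 seconds). Together with the commented-out block in the first hunk, events whose prev_events cannot be fetched are no longer rejected with a 403 when sent to us directly. Roughly, the request now looks like the sketch below; the surrounding self.federation_client.get_missing_events(...) call site is assumed from context, and only the keyword arguments visible in the hunk are certain:

    missing_events = yield self.federation_client.get_missing_events(
        origin,                              # the server that sent us the event
        room_id,
        earliest_events_ids=list(latest),
        latest_events=[pdu],
        limit=5,                             # was 10: fetch fewer missing events per gap
        min_depth=min_depth,
        timeout=15000,                       # was 60000 ms: give up on slow peers sooner
    )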