Add backchatter

This commit is contained in:
Erik Johnston 2018-11-22 17:47:29 +00:00 committed by Andrew Morgan
parent ed43a63fcf
commit c400d9dcca
3 changed files with 72 additions and 7 deletions

View file

@ -226,11 +226,11 @@ class FederationServer(FederationBase):
thread_id, new_thread = pdu_to_thread[pdu.event_id]
logger.info("Assigning thread %d to %s", thread_id, pdu.event_id)
try:
yield self._handle_received_pdu(
ret = yield self._handle_received_pdu(
origin, pdu, thread_id=thread_id,
new_thread=new_thread
)
pdu_results[event_id] = {}
pdu_results[event_id] = ret
except FederationError as e:
logger.warn("Error handling PDU %s: %s", event_id, e)
pdu_results[event_id] = {"error": str(e)}
@ -259,7 +259,7 @@ class FederationServer(FederationBase):
"pdus": pdu_results,
}
logger.debug("Returning: %s", str(response))
logger.info("Returning: %s", str(response))
yield self.transaction_actions.set_response(
origin,
@ -627,11 +627,29 @@ class FederationServer(FederationBase):
affected=pdu.event_id,
)
destinations = pdu.unsigned.get("destinations", {})
costs = yield self.store.get_destination_healths(list(destinations))
logger.info("Destinations: %s", destinations)
logger.info("Costs: %s", costs)
dont_relay = set()
for dest, their_cost in destinations.items():
our_cost = costs.get(dest)
if our_cost and their_cost and their_cost < our_cost:
dont_relay.add(dest)
if destinations:
pdu.unsigned["destinations"] = {d: c for d, c in destinations.items() if d not in dont_relay}
yield self.handler.on_receive_pdu(
origin, pdu, sent_to_us_directly=True,
thread_id=thread_id, new_thread=new_thread,
)
defer.returnValue({"did_not_relay": list(dont_relay)})
def __str__(self):
return "<ReplicationLayer(%s)>" % self.server_name

View file

@ -23,6 +23,7 @@ from twisted.internet import defer
import synapse.metrics
from synapse.api.errors import FederationDeniedError, HttpResponseException
from synapse.events import FrozenEvent
from synapse.handlers.presence import format_user_presence_state, get_interested_remotes
from synapse.metrics import (
LaterGauge,
@ -692,6 +693,10 @@ class TransactionQueue(object):
logger.debug("TX [%s] {%s} Marked as delivered", destination, txn_id)
if code == 200:
logger.info(
"TX [%s] {%s} got response json %s",
destination, txn_id, response
)
pdu_results = response.get("pdus", {})
for p in pdus:
yield self._pdu_send_result(
@ -705,6 +710,7 @@ class TransactionQueue(object):
defer.returnValue(success)
@defer.inlineCallbacks
def _pdu_send_result(self, destination, txn_id, pdu, response):
"""Gets called after sending the event in a transaction, with the
result for the event from the remote server.
@ -715,7 +721,35 @@ class TransactionQueue(object):
"TX [%s] {%s} Remote returned error for %s: %s",
destination, txn_id, pdu.event_id, response,
)
pdu_logger.info(
"SendErrorPDU",
extra={
"event_id": pdu.event_id, "room_id": pdu.room_id,
"destination": destination,
"server": self.server_name,
},
)
new_destinations = set(pdu.unsigned.get("destinations", []))
new_destinations.discard(destination)
yield self._send_pdu(pdu, list(new_destinations))
elif "did_not_relay" in response and response["did_not_relay"]:
new_destinations = set(response["did_not_relay"])
new_destinations.discard(destination)
pdu_logger.info(
"DidNotRelayPDU",
extra={
"event_id": pdu.event_id, "room_id": pdu.room_id,
"destination": destination,
"new_destinations": json.dumps(list(new_destinations)),
"server": self.server_name,
},
)
yield self._send_pdu(pdu, list(new_destinations))
@defer.inlineCallbacks
def _pdu_send_txn_failed(self, destination, txn_id, pdu):
"""Gets called when sending a transaction failed (after retries)
"""
@ -724,3 +758,16 @@ class TransactionQueue(object):
"TX [%s] {%s} Failed to send event %s",
destination, txn_id, pdu.event_id,
)
pdu_logger.info(
"SendFailPDU",
extra={
"event_id": pdu.event_id, "room_id": pdu.room_id,
"destination": destination,
"server": self.server_name,
},
)
new_destinations = set(pdu.unsigned.get("destinations", []))
new_destinations.discard(destination)
yield self._send_pdu(pdu, list(new_destinations))

View file

@ -120,10 +120,10 @@ class Authenticator(object):
):
raise FederationDeniedError(origin)
if not json_request["signatures"]:
raise NoAuthenticationError(
401, "Missing Authorization headers", Codes.UNAUTHORIZED,
)
# if not json_request["signatures"]:
# raise NoAuthenticationError(
# 401, "Missing Authorization headers", Codes.UNAUTHORIZED,
# )
# yield self.keyring.verify_json_for_server(origin, json_request)