mirror of
https://github.com/element-hq/synapse.git
synced 2024-11-22 01:25:44 +03:00
Ensure that incoming to-device messages are not dropped (#17127)
... when workers are unreachable, etc. Fixes https://github.com/element-hq/synapse/issues/17117. The general principle is just to make sure that we propagate any exceptions to the JsonResource, so that we return an error code to the sending server. That means that the sending server no longer considers the message safely sent, so it will retry later. In the issue, Erik mentions that an alternative solution would be to persist the to-device messages into a table so that they can be retried. This might be an improvement for performance, but even if we did that, we still need this mechanism, since we might be unable to reach the database. So, if we want to do that, it can be a later follow-up. --------- Co-authored-by: Erik Johnston <erik@matrix.org>
This commit is contained in:
parent
38bc7a009d
commit
c897ac63e9
5 changed files with 55 additions and 19 deletions
1
changelog.d/17127.bugfix
Normal file
1
changelog.d/17127.bugfix
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Fix a bug which meant that to-device messages received over federation could be dropped when the server was under load or networking problems caused problems between Synapse processes or the database.
|
|
@ -546,7 +546,25 @@ class FederationServer(FederationBase):
|
||||||
edu_type=edu_dict["edu_type"],
|
edu_type=edu_dict["edu_type"],
|
||||||
content=edu_dict["content"],
|
content=edu_dict["content"],
|
||||||
)
|
)
|
||||||
|
try:
|
||||||
await self.registry.on_edu(edu.edu_type, origin, edu.content)
|
await self.registry.on_edu(edu.edu_type, origin, edu.content)
|
||||||
|
except Exception:
|
||||||
|
# If there was an error handling the EDU, we must reject the
|
||||||
|
# transaction.
|
||||||
|
#
|
||||||
|
# Some EDU types (notably, to-device messages) are, despite their name,
|
||||||
|
# expected to be reliable; if we weren't able to do something with it,
|
||||||
|
# we have to tell the sender that, and the only way the protocol gives
|
||||||
|
# us to do so is by sending an HTTP error back on the transaction.
|
||||||
|
#
|
||||||
|
# We log the exception now, and then raise a new SynapseError to cause
|
||||||
|
# the transaction to be failed.
|
||||||
|
logger.exception("Error handling EDU of type %s", edu.edu_type)
|
||||||
|
raise SynapseError(500, f"Error handing EDU of type {edu.edu_type}")
|
||||||
|
|
||||||
|
# TODO: if the first EDU fails, we should probably abort the whole
|
||||||
|
# thing rather than carrying on with the rest of them. That would
|
||||||
|
# probably be best done inside `concurrently_execute`.
|
||||||
|
|
||||||
await concurrently_execute(
|
await concurrently_execute(
|
||||||
_process_edu,
|
_process_edu,
|
||||||
|
@ -1414,12 +1432,7 @@ class FederationHandlerRegistry:
|
||||||
handler = self.edu_handlers.get(edu_type)
|
handler = self.edu_handlers.get(edu_type)
|
||||||
if handler:
|
if handler:
|
||||||
with start_active_span_from_edu(content, "handle_edu"):
|
with start_active_span_from_edu(content, "handle_edu"):
|
||||||
try:
|
|
||||||
await handler(origin, content)
|
await handler(origin, content)
|
||||||
except SynapseError as e:
|
|
||||||
logger.info("Failed to handle edu %r: %r", edu_type, e)
|
|
||||||
except Exception:
|
|
||||||
logger.exception("Failed to handle edu %r", edu_type)
|
|
||||||
return
|
return
|
||||||
|
|
||||||
# Check if we can route it somewhere else that isn't us
|
# Check if we can route it somewhere else that isn't us
|
||||||
|
@ -1428,17 +1441,12 @@ class FederationHandlerRegistry:
|
||||||
# Pick an instance randomly so that we don't overload one.
|
# Pick an instance randomly so that we don't overload one.
|
||||||
route_to = random.choice(instances)
|
route_to = random.choice(instances)
|
||||||
|
|
||||||
try:
|
|
||||||
await self._send_edu(
|
await self._send_edu(
|
||||||
instance_name=route_to,
|
instance_name=route_to,
|
||||||
edu_type=edu_type,
|
edu_type=edu_type,
|
||||||
origin=origin,
|
origin=origin,
|
||||||
content=content,
|
content=content,
|
||||||
)
|
)
|
||||||
except SynapseError as e:
|
|
||||||
logger.info("Failed to handle edu %r: %r", edu_type, e)
|
|
||||||
except Exception:
|
|
||||||
logger.exception("Failed to handle edu %r", edu_type)
|
|
||||||
return
|
return
|
||||||
|
|
||||||
# Oh well, let's just log and move on.
|
# Oh well, let's just log and move on.
|
||||||
|
|
|
@ -104,6 +104,9 @@ class DeviceMessageHandler:
|
||||||
"""
|
"""
|
||||||
Handle receiving to-device messages from remote homeservers.
|
Handle receiving to-device messages from remote homeservers.
|
||||||
|
|
||||||
|
Note that any errors thrown from this method will cause the federation /send
|
||||||
|
request to receive an error response.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
origin: The remote homeserver.
|
origin: The remote homeserver.
|
||||||
content: The JSON dictionary containing the to-device messages.
|
content: The JSON dictionary containing the to-device messages.
|
||||||
|
|
|
@ -67,6 +67,23 @@ class FederationServerTests(unittest.FederatingHomeserverTestCase):
|
||||||
self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, channel.result)
|
self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, channel.result)
|
||||||
self.assertEqual(channel.json_body["errcode"], "M_NOT_JSON")
|
self.assertEqual(channel.json_body["errcode"], "M_NOT_JSON")
|
||||||
|
|
||||||
|
def test_failed_edu_causes_500(self) -> None:
|
||||||
|
"""If the EDU handler fails, /send should return a 500."""
|
||||||
|
|
||||||
|
async def failing_handler(_origin: str, _content: JsonDict) -> None:
|
||||||
|
raise Exception("bleh")
|
||||||
|
|
||||||
|
self.hs.get_federation_registry().register_edu_handler(
|
||||||
|
"FAIL_EDU_TYPE", failing_handler
|
||||||
|
)
|
||||||
|
|
||||||
|
channel = self.make_signed_federation_request(
|
||||||
|
"PUT",
|
||||||
|
"/_matrix/federation/v1/send/txn",
|
||||||
|
{"edus": [{"edu_type": "FAIL_EDU_TYPE", "content": {}}]},
|
||||||
|
)
|
||||||
|
self.assertEqual(500, channel.code, channel.result)
|
||||||
|
|
||||||
|
|
||||||
class ServerACLsTestCase(unittest.TestCase):
|
class ServerACLsTestCase(unittest.TestCase):
|
||||||
def test_blocked_server(self) -> None:
|
def test_blocked_server(self) -> None:
|
||||||
|
|
|
@ -59,7 +59,14 @@ class RoomDirectoryFederationTests(unittest.FederatingHomeserverTestCase):
|
||||||
"/_matrix/federation/v1/send/txn_id_1234/",
|
"/_matrix/federation/v1/send/txn_id_1234/",
|
||||||
content={
|
content={
|
||||||
"edus": [
|
"edus": [
|
||||||
{"edu_type": EduTypes.DEVICE_LIST_UPDATE, "content": {"foo": "bar"}}
|
{
|
||||||
|
"edu_type": EduTypes.DEVICE_LIST_UPDATE,
|
||||||
|
"content": {
|
||||||
|
"device_id": "QBUAZIFURK",
|
||||||
|
"stream_id": 0,
|
||||||
|
"user_id": "@user:id",
|
||||||
|
},
|
||||||
|
},
|
||||||
],
|
],
|
||||||
"pdus": [],
|
"pdus": [],
|
||||||
},
|
},
|
||||||
|
|
Loading…
Reference in a new issue