mirror of
https://github.com/element-hq/synapse.git
synced 2024-11-27 20:22:07 +03:00
Track when the pulled event signature fails (#13815)
Because we're doing the recording in `_check_sigs_and_hash_for_pulled_events_and_fetch` (previously named `_check_sigs_and_hash_and_fetch`), this means we will track signature failures for `backfill`, `get_room_state`, `get_event_auth`, and `get_missing_events` (all pulled event scenarios). And we also record signature failures from `get_pdu`. Part of https://github.com/matrix-org/synapse/issues/13700 Part of https://github.com/matrix-org/synapse/issues/13676 and https://github.com/matrix-org/synapse/issues/13356 This PR will be especially important for https://github.com/matrix-org/synapse/pull/13816 so we can avoid the costly `_get_state_ids_after_missing_prev_event` down the line when `/messages` calls backfill.
This commit is contained in:
parent
92ae90aca2
commit
70a4317692
5 changed files with 140 additions and 15 deletions
1
changelog.d/13815.feature
Normal file
1
changelog.d/13815.feature
Normal file
|
@ -0,0 +1 @@
|
|||
Keep track when an event pulled over federation fails its signature check so we can intelligently back-off in the future.
|
|
@ -13,7 +13,7 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import logging
|
||||
from typing import TYPE_CHECKING
|
||||
from typing import TYPE_CHECKING, Awaitable, Callable, Optional
|
||||
|
||||
from synapse.api.constants import MAX_DEPTH, EventContentFields, EventTypes, Membership
|
||||
from synapse.api.errors import Codes, SynapseError
|
||||
|
@ -58,7 +58,12 @@ class FederationBase:
|
|||
|
||||
@trace
|
||||
async def _check_sigs_and_hash(
|
||||
self, room_version: RoomVersion, pdu: EventBase
|
||||
self,
|
||||
room_version: RoomVersion,
|
||||
pdu: EventBase,
|
||||
record_failure_callback: Optional[
|
||||
Callable[[EventBase, str], Awaitable[None]]
|
||||
] = None,
|
||||
) -> EventBase:
|
||||
"""Checks that event is correctly signed by the sending server.
|
||||
|
||||
|
@ -70,6 +75,11 @@ class FederationBase:
|
|||
Args:
|
||||
room_version: The room version of the PDU
|
||||
pdu: the event to be checked
|
||||
record_failure_callback: A callback to run whenever the given event
|
||||
fails signature or hash checks. This includes exceptions
|
||||
that would be normally be thrown/raised but also things like
|
||||
checking for event tampering where we just return the redacted
|
||||
event.
|
||||
|
||||
Returns:
|
||||
* the original event if the checks pass
|
||||
|
@ -80,7 +90,12 @@ class FederationBase:
|
|||
InvalidEventSignatureError if the signature check failed. Nothing
|
||||
will be logged in this case.
|
||||
"""
|
||||
try:
|
||||
await _check_sigs_on_pdu(self.keyring, room_version, pdu)
|
||||
except InvalidEventSignatureError as exc:
|
||||
if record_failure_callback:
|
||||
await record_failure_callback(pdu, str(exc))
|
||||
raise exc
|
||||
|
||||
if not check_event_content_hash(pdu):
|
||||
# let's try to distinguish between failures because the event was
|
||||
|
@ -116,6 +131,10 @@ class FederationBase:
|
|||
"event_id": pdu.event_id,
|
||||
}
|
||||
)
|
||||
if record_failure_callback:
|
||||
await record_failure_callback(
|
||||
pdu, "Event content has been tampered with"
|
||||
)
|
||||
return redacted_event
|
||||
|
||||
spam_check = await self.spam_checker.check_event_for_spam(pdu)
|
||||
|
|
|
@ -278,7 +278,7 @@ class FederationClient(FederationBase):
|
|||
pdus = [event_from_pdu_json(p, room_version) for p in transaction_data_pdus]
|
||||
|
||||
# Check signatures and hash of pdus, removing any from the list that fail checks
|
||||
pdus[:] = await self._check_sigs_and_hash_and_fetch(
|
||||
pdus[:] = await self._check_sigs_and_hash_for_pulled_events_and_fetch(
|
||||
dest, pdus, room_version=room_version
|
||||
)
|
||||
|
||||
|
@ -328,7 +328,17 @@ class FederationClient(FederationBase):
|
|||
|
||||
# Check signatures are correct.
|
||||
try:
|
||||
signed_pdu = await self._check_sigs_and_hash(room_version, pdu)
|
||||
|
||||
async def _record_failure_callback(
|
||||
event: EventBase, cause: str
|
||||
) -> None:
|
||||
await self.store.record_event_failed_pull_attempt(
|
||||
event.room_id, event.event_id, cause
|
||||
)
|
||||
|
||||
signed_pdu = await self._check_sigs_and_hash(
|
||||
room_version, pdu, _record_failure_callback
|
||||
)
|
||||
except InvalidEventSignatureError as e:
|
||||
errmsg = f"event id {pdu.event_id}: {e}"
|
||||
logger.warning("%s", errmsg)
|
||||
|
@ -547,24 +557,28 @@ class FederationClient(FederationBase):
|
|||
len(auth_event_map),
|
||||
)
|
||||
|
||||
valid_auth_events = await self._check_sigs_and_hash_and_fetch(
|
||||
valid_auth_events = await self._check_sigs_and_hash_for_pulled_events_and_fetch(
|
||||
destination, auth_event_map.values(), room_version
|
||||
)
|
||||
|
||||
valid_state_events = await self._check_sigs_and_hash_and_fetch(
|
||||
valid_state_events = (
|
||||
await self._check_sigs_and_hash_for_pulled_events_and_fetch(
|
||||
destination, state_event_map.values(), room_version
|
||||
)
|
||||
)
|
||||
|
||||
return valid_state_events, valid_auth_events
|
||||
|
||||
@trace
|
||||
async def _check_sigs_and_hash_and_fetch(
|
||||
async def _check_sigs_and_hash_for_pulled_events_and_fetch(
|
||||
self,
|
||||
origin: str,
|
||||
pdus: Collection[EventBase],
|
||||
room_version: RoomVersion,
|
||||
) -> List[EventBase]:
|
||||
"""Checks the signatures and hashes of a list of events.
|
||||
"""
|
||||
Checks the signatures and hashes of a list of pulled events we got from
|
||||
federation and records any signature failures as failed pull attempts.
|
||||
|
||||
If a PDU fails its signature check then we check if we have it in
|
||||
the database, and if not then request it from the sender's server (if that
|
||||
|
@ -597,11 +611,17 @@ class FederationClient(FederationBase):
|
|||
|
||||
valid_pdus: List[EventBase] = []
|
||||
|
||||
async def _record_failure_callback(event: EventBase, cause: str) -> None:
|
||||
await self.store.record_event_failed_pull_attempt(
|
||||
event.room_id, event.event_id, cause
|
||||
)
|
||||
|
||||
async def _execute(pdu: EventBase) -> None:
|
||||
valid_pdu = await self._check_sigs_and_hash_and_fetch_one(
|
||||
pdu=pdu,
|
||||
origin=origin,
|
||||
room_version=room_version,
|
||||
record_failure_callback=_record_failure_callback,
|
||||
)
|
||||
|
||||
if valid_pdu:
|
||||
|
@ -618,6 +638,9 @@ class FederationClient(FederationBase):
|
|||
pdu: EventBase,
|
||||
origin: str,
|
||||
room_version: RoomVersion,
|
||||
record_failure_callback: Optional[
|
||||
Callable[[EventBase, str], Awaitable[None]]
|
||||
] = None,
|
||||
) -> Optional[EventBase]:
|
||||
"""Takes a PDU and checks its signatures and hashes.
|
||||
|
||||
|
@ -634,6 +657,11 @@ class FederationClient(FederationBase):
|
|||
origin
|
||||
pdu
|
||||
room_version
|
||||
record_failure_callback: A callback to run whenever the given event
|
||||
fails signature or hash checks. This includes exceptions
|
||||
that would be normally be thrown/raised but also things like
|
||||
checking for event tampering where we just return the redacted
|
||||
event.
|
||||
|
||||
Returns:
|
||||
The PDU (possibly redacted) if it has valid signatures and hashes.
|
||||
|
@ -641,7 +669,9 @@ class FederationClient(FederationBase):
|
|||
"""
|
||||
|
||||
try:
|
||||
return await self._check_sigs_and_hash(room_version, pdu)
|
||||
return await self._check_sigs_and_hash(
|
||||
room_version, pdu, record_failure_callback
|
||||
)
|
||||
except InvalidEventSignatureError as e:
|
||||
logger.warning(
|
||||
"Signature on retrieved event %s was invalid (%s). "
|
||||
|
@ -694,7 +724,7 @@ class FederationClient(FederationBase):
|
|||
|
||||
auth_chain = [event_from_pdu_json(p, room_version) for p in res["auth_chain"]]
|
||||
|
||||
signed_auth = await self._check_sigs_and_hash_and_fetch(
|
||||
signed_auth = await self._check_sigs_and_hash_for_pulled_events_and_fetch(
|
||||
destination, auth_chain, room_version=room_version
|
||||
)
|
||||
|
||||
|
@ -1401,7 +1431,7 @@ class FederationClient(FederationBase):
|
|||
event_from_pdu_json(e, room_version) for e in content.get("events", [])
|
||||
]
|
||||
|
||||
signed_events = await self._check_sigs_and_hash_and_fetch(
|
||||
signed_events = await self._check_sigs_and_hash_for_pulled_events_and_fetch(
|
||||
destination, events, room_version=room_version
|
||||
)
|
||||
except HttpResponseException as e:
|
||||
|
|
|
@ -23,14 +23,23 @@ from twisted.test.proto_helpers import MemoryReactor
|
|||
|
||||
from synapse.api.room_versions import RoomVersions
|
||||
from synapse.events import EventBase
|
||||
from synapse.rest import admin
|
||||
from synapse.rest.client import login, room
|
||||
from synapse.server import HomeServer
|
||||
from synapse.types import JsonDict
|
||||
from synapse.util import Clock
|
||||
|
||||
from tests.test_utils import event_injection
|
||||
from tests.unittest import FederatingHomeserverTestCase
|
||||
|
||||
|
||||
class FederationClientTest(FederatingHomeserverTestCase):
|
||||
servlets = [
|
||||
admin.register_servlets,
|
||||
room.register_servlets,
|
||||
login.register_servlets,
|
||||
]
|
||||
|
||||
def prepare(self, reactor: MemoryReactor, clock: Clock, homeserver: HomeServer):
|
||||
super().prepare(reactor, clock, homeserver)
|
||||
|
||||
|
@ -231,6 +240,72 @@ class FederationClientTest(FederatingHomeserverTestCase):
|
|||
|
||||
return remote_pdu
|
||||
|
||||
def test_backfill_invalid_signature_records_failed_pull_attempts(
|
||||
self,
|
||||
) -> None:
|
||||
"""
|
||||
Test to make sure that events from /backfill with invalid signatures get
|
||||
recorded as failed pull attempts.
|
||||
"""
|
||||
OTHER_USER = f"@user:{self.OTHER_SERVER_NAME}"
|
||||
main_store = self.hs.get_datastores().main
|
||||
|
||||
# Create the room
|
||||
user_id = self.register_user("kermit", "test")
|
||||
tok = self.login("kermit", "test")
|
||||
room_id = self.helper.create_room_as(room_creator=user_id, tok=tok)
|
||||
|
||||
# We purposely don't run `add_hashes_and_signatures_from_other_server`
|
||||
# over this because we want the signature check to fail.
|
||||
pulled_event, _ = self.get_success(
|
||||
event_injection.create_event(
|
||||
self.hs,
|
||||
room_id=room_id,
|
||||
sender=OTHER_USER,
|
||||
type="test_event_type",
|
||||
content={"body": "garply"},
|
||||
)
|
||||
)
|
||||
|
||||
# We expect an outbound request to /backfill, so stub that out
|
||||
self._mock_agent.request.side_effect = lambda *args, **kwargs: defer.succeed(
|
||||
_mock_response(
|
||||
{
|
||||
"origin": "yet.another.server",
|
||||
"origin_server_ts": 900,
|
||||
# Mimic the other server returning our new `pulled_event`
|
||||
"pdus": [pulled_event.get_pdu_json()],
|
||||
}
|
||||
)
|
||||
)
|
||||
|
||||
self.get_success(
|
||||
self.hs.get_federation_client().backfill(
|
||||
# We use "yet.another.server" instead of
|
||||
# `self.OTHER_SERVER_NAME` because we want to see the behavior
|
||||
# from `_check_sigs_and_hash_and_fetch_one` where it tries to
|
||||
# fetch the PDU again from the origin server if the signature
|
||||
# fails. Just want to make sure that the failure is counted from
|
||||
# both code paths.
|
||||
dest="yet.another.server",
|
||||
room_id=room_id,
|
||||
limit=1,
|
||||
extremities=[pulled_event.event_id],
|
||||
),
|
||||
)
|
||||
|
||||
# Make sure our failed pull attempt was recorded
|
||||
backfill_num_attempts = self.get_success(
|
||||
main_store.db_pool.simple_select_one_onecol(
|
||||
table="event_failed_pull_attempts",
|
||||
keyvalues={"event_id": pulled_event.event_id},
|
||||
retcol="num_attempts",
|
||||
)
|
||||
)
|
||||
# This is 2 because it failed once from `self.OTHER_SERVER_NAME` and the
|
||||
# other from "yet.another.server"
|
||||
self.assertEqual(backfill_num_attempts, 2)
|
||||
|
||||
|
||||
def _mock_response(resp: JsonDict):
|
||||
body = json.dumps(resp).encode("utf-8")
|
||||
|
|
|
@ -86,8 +86,8 @@ class MessageAcceptTests(unittest.HomeserverTestCase):
|
|||
|
||||
federation_event_handler._check_event_auth = _check_event_auth
|
||||
self.client = self.homeserver.get_federation_client()
|
||||
self.client._check_sigs_and_hash_and_fetch = lambda dest, pdus, **k: succeed(
|
||||
pdus
|
||||
self.client._check_sigs_and_hash_for_pulled_events_and_fetch = (
|
||||
lambda dest, pdus, **k: succeed(pdus)
|
||||
)
|
||||
|
||||
# Send the join, it should return None (which is not an error)
|
||||
|
|
Loading…
Reference in a new issue