mirror of https://github.com/element-hq/synapse.git
synced 2024-11-26 11:36:03 +03:00
dd71eb0f8a
Currently, federation catch-up sends the last *local* event that we failed to send to the remote. This can cause issues for large rooms where lots of servers have sent events while the remote server was down: when it comes back up again it will be flooded with events from various points in the DAG. Instead, let's make it so that all the servers send the most recent events, even if they're not theirs. The remote should deduplicate the events, so there shouldn't be much overhead in doing this. Alternatively, the servers could send local events only if they were also extremities, and hope that the other server will send the event over, but that is a bit risky.
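The gist of the new behaviour, as a rough standalone sketch (hypothetical helper and types for illustration, not Synapse's actual implementation): among the events a destination missed while it was unreachable, keep only the newest event per room by stream ordering, regardless of which server sent it, and transmit those in stream-ordering order.

# Hypothetical sketch of the catch-up selection described above; illustration only.
from typing import Dict, List, NamedTuple


class MissedEvent(NamedTuple):
    event_id: str
    room_id: str
    origin: str  # the server that sent the event; deliberately ignored below
    stream_ordering: int


def pick_catch_up_events(missed: List[MissedEvent]) -> List[MissedEvent]:
    """Pick the most recent missed event in each room, even if it is not ours."""
    latest: Dict[str, MissedEvent] = {}
    for ev in missed:
        current = latest.get(ev.room_id)
        if current is None or ev.stream_ordering > current.stream_ordering:
            latest[ev.room_id] = ev
    # send least recent first; the remote deduplicates anything it already has
    return sorted(latest.values(), key=lambda ev: ev.stream_ordering)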
472 lines
17 KiB
Python
from typing import List, Tuple

from mock import Mock

from synapse.api.constants import EventTypes
from synapse.events import EventBase
from synapse.federation.sender import PerDestinationQueue, TransactionManager
from synapse.federation.units import Edu
from synapse.rest import admin
from synapse.rest.client.v1 import login, room
from synapse.util.retryutils import NotRetryingDestination

from tests.test_utils import event_injection, make_awaitable
from tests.unittest import FederatingHomeserverTestCase, override_config


class FederationCatchUpTestCases(FederatingHomeserverTestCase):
    servlets = [
        admin.register_servlets,
        room.register_servlets,
        login.register_servlets,
    ]

    def make_homeserver(self, reactor, clock):
        return self.setup_test_homeserver(
            federation_transport_client=Mock(spec=["send_transaction"]),
        )

    def prepare(self, reactor, clock, hs):
        # stub out get_current_hosts_in_room
        state_handler = hs.get_state_handler()

        # This mock is crucial for destination_rooms to be populated.
        state_handler.get_current_hosts_in_room = Mock(
            return_value=make_awaitable(["test", "host2"])
        )

        # whenever send_transaction is called, record the pdu data
        self.pdus = []
        self.failed_pdus = []
        self.is_online = True
        self.hs.get_federation_transport_client().send_transaction.side_effect = (
            self.record_transaction
        )

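    # Stand-in for the federation transport: when "online", record the PDUs we
    # were asked to send; when "offline", record them as failed and raise
    # NotRetryingDestination so the send is treated as a failure.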
    async def record_transaction(self, txn, json_cb):
        if self.is_online:
            data = json_cb()
            self.pdus.extend(data["pdus"])
            return {}
        else:
            data = json_cb()
            self.failed_pdus.extend(data["pdus"])
            raise NotRetryingDestination(0, 24 * 60 * 60 * 1000, txn.destination)

    def get_destination_room(self, room: str, destination: str = "host2") -> dict:
        """
        Gets the destination_rooms entry for a (destination, room_id) pair.

        Args:
            room: room ID
            destination: what destination, default is "host2"

        Returns:
            Dictionary of { event_id: str, stream_ordering: int }
        """
        event_id, stream_ordering = self.get_success(
            self.hs.get_datastore().db_pool.execute(
                "test:get_destination_rooms",
                None,
                """
                SELECT event_id, stream_ordering
                    FROM destination_rooms dr
                    JOIN events USING (stream_ordering)
                    WHERE dr.destination = ? AND dr.room_id = ?
                """,
                destination,
                room,
            )
        )[0]
        return {"event_id": event_id, "stream_ordering": stream_ordering}

    @override_config({"send_federation": True})
    def test_catch_up_destination_rooms_tracking(self):
        """
        Tests that we populate the `destination_rooms` table as needed.
        """
        self.register_user("u1", "you the one")
        u1_token = self.login("u1", "you the one")
        room = self.helper.create_room_as("u1", tok=u1_token)

        self.get_success(
            event_injection.inject_member_event(self.hs, room, "@user:host2", "join")
        )

        event_id_1 = self.helper.send(room, "wombats!", tok=u1_token)["event_id"]

        row_1 = self.get_destination_room(room)

        event_id_2 = self.helper.send(room, "rabbits!", tok=u1_token)["event_id"]

        row_2 = self.get_destination_room(room)

        # check: events correctly registered in order
        self.assertEqual(row_1["event_id"], event_id_1)
        self.assertEqual(row_2["event_id"], event_id_2)
        self.assertEqual(row_1["stream_ordering"], row_2["stream_ordering"] - 1)

    @override_config({"send_federation": True})
    def test_catch_up_last_successful_stream_ordering_tracking(self):
        """
        Tests that we update a destination's `last_successful_stream_ordering`
        once a send succeeds, and leave it unset while the destination is offline.
        """
        self.register_user("u1", "you the one")
        u1_token = self.login("u1", "you the one")
        room = self.helper.create_room_as("u1", tok=u1_token)

        # take the remote offline
        self.is_online = False

        self.get_success(
            event_injection.inject_member_event(self.hs, room, "@user:host2", "join")
        )

        self.helper.send(room, "wombats!", tok=u1_token)
        self.pump()

        lsso_1 = self.get_success(
            self.hs.get_datastore().get_destination_last_successful_stream_ordering(
                "host2"
            )
        )

        self.assertIsNone(
            lsso_1,
            "There should be no last successful stream ordering for an always-offline destination",
        )

        # bring the remote online
        self.is_online = True

        event_id_2 = self.helper.send(room, "rabbits!", tok=u1_token)["event_id"]

        lsso_2 = self.get_success(
            self.hs.get_datastore().get_destination_last_successful_stream_ordering(
                "host2"
            )
        )
        row_2 = self.get_destination_room(room)

        self.assertEqual(
            self.pdus[0]["content"]["body"],
            "rabbits!",
            "Test fault: didn't receive the right PDU",
        )
        self.assertEqual(
            row_2["event_id"],
            event_id_2,
            "Test fault: destination_rooms not updated correctly",
        )
        self.assertEqual(
            lsso_2,
            row_2["stream_ordering"],
            "Send succeeded but not marked as last_successful_stream_ordering",
        )

    @override_config({"send_federation": True})  # critical to federate
    def test_catch_up_from_blank_state(self):
        """
        Runs an overall test of federation catch-up from scratch.
        Further tests will focus on more narrow aspects and edge-cases, but I
        hope to provide an overall view with this test.
        """
        # bring the other server online
        self.is_online = True

        # let's make some events for the other server to receive
        self.register_user("u1", "you the one")
        u1_token = self.login("u1", "you the one")
        room_1 = self.helper.create_room_as("u1", tok=u1_token)
        room_2 = self.helper.create_room_as("u1", tok=u1_token)

        # also critical to federate
        self.get_success(
            event_injection.inject_member_event(self.hs, room_1, "@user:host2", "join")
        )
        self.get_success(
            event_injection.inject_member_event(self.hs, room_2, "@user:host2", "join")
        )

        self.helper.send_state(
            room_1, event_type="m.room.topic", body={"topic": "wombat"}, tok=u1_token
        )

        # check: PDU received for topic event
        self.assertEqual(len(self.pdus), 1)
        self.assertEqual(self.pdus[0]["type"], "m.room.topic")

        # take the remote offline
        self.is_online = False

        # send another event
        self.helper.send(room_1, "hi user!", tok=u1_token)

        # check: things didn't go well since the remote is down
        self.assertEqual(len(self.failed_pdus), 1)
        self.assertEqual(self.failed_pdus[0]["content"]["body"], "hi user!")

        # let's delete the federation transmission queue
        # (this pretends we are starting up fresh.)
        self.assertFalse(
            self.hs.get_federation_sender()
            ._per_destination_queues["host2"]
            .transmission_loop_running
        )
        del self.hs.get_federation_sender()._per_destination_queues["host2"]

        # let's also clear any backoffs
        self.get_success(
            self.hs.get_datastore().set_destination_retry_timings("host2", None, 0, 0)
        )

        # bring the remote online and clear the received pdu list
        self.is_online = True
        self.pdus = []

        # now we need to initiate a federation transaction somehow…
        # to do that, let's send another event (because it's simple to do)
        # (do it to another room otherwise the catch-up logic decides it doesn't
        # need to catch up room_1 — something I overlooked when first writing
        # this test)
        self.helper.send(room_2, "wombats!", tok=u1_token)

        # we should now have received both PDUs
        self.assertEqual(len(self.pdus), 2)
        self.assertEqual(self.pdus[0]["content"]["body"], "hi user!")
        self.assertEqual(self.pdus[1]["content"]["body"], "wombats!")

    def make_fake_destination_queue(
        self, destination: str = "host2"
    ) -> Tuple[PerDestinationQueue, List[EventBase]]:
        """
        Makes a fake per-destination queue.
        """
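        # Use a real TransactionManager and PerDestinationQueue, but replace
        # send_new_transaction below so that any PDUs "sent" are captured in
        # results_list instead of going anywhere.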
        transaction_manager = TransactionManager(self.hs)
        per_dest_queue = PerDestinationQueue(self.hs, transaction_manager, destination)
        results_list = []

        async def fake_send(
            destination_tm: str,
            pending_pdus: List[EventBase],
            _pending_edus: List[Edu],
        ) -> bool:
            assert destination == destination_tm
            results_list.extend(pending_pdus)
            return True  # success!

        transaction_manager.send_new_transaction = fake_send

        return per_dest_queue, results_list

    @override_config({"send_federation": True})
    def test_catch_up_loop(self):
        """
        Tests the behaviour of _catch_up_transmission_loop.
        """

        # ARRANGE:
        #  - a local user (u1)
        #  - 3 rooms which u1 is joined to (and remote user @user:host2 is
        #    joined to)
        #  - some events (1 to 5) in those rooms
        #    we have 'already sent' events 1 and 2 to host2
        per_dest_queue, sent_pdus = self.make_fake_destination_queue()

        self.register_user("u1", "you the one")
        u1_token = self.login("u1", "you the one")
        room_1 = self.helper.create_room_as("u1", tok=u1_token)
        room_2 = self.helper.create_room_as("u1", tok=u1_token)
        room_3 = self.helper.create_room_as("u1", tok=u1_token)
        self.get_success(
            event_injection.inject_member_event(self.hs, room_1, "@user:host2", "join")
        )
        self.get_success(
            event_injection.inject_member_event(self.hs, room_2, "@user:host2", "join")
        )
        self.get_success(
            event_injection.inject_member_event(self.hs, room_3, "@user:host2", "join")
        )

        # create some events
        self.helper.send(room_1, "you hear me!!", tok=u1_token)
        event_id_2 = self.helper.send(room_2, "wombats!", tok=u1_token)["event_id"]
        self.helper.send(room_3, "Matrix!", tok=u1_token)
        event_id_4 = self.helper.send(room_2, "rabbits!", tok=u1_token)["event_id"]
        event_id_5 = self.helper.send(room_3, "Synapse!", tok=u1_token)["event_id"]

        # destination_rooms should already be populated, but let us pretend that we already
        # sent (successfully) up to and including event id 2
        event_2 = self.get_success(self.hs.get_datastore().get_event(event_id_2))

        # also fetch event 5 so we know its last_successful_stream_ordering later
        event_5 = self.get_success(self.hs.get_datastore().get_event(event_id_5))

        self.get_success(
            self.hs.get_datastore().set_destination_last_successful_stream_ordering(
                "host2", event_2.internal_metadata.stream_ordering
            )
        )

        # ACT
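        # run the catch-up loop; for each room with missed events, only the most
        # recent event should be sent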
        self.get_success(per_dest_queue._catch_up_transmission_loop())

        # ASSERT, noticing in particular:
        # - event 3 not sent out, because event 5 replaces it
        # - order is least recent first, so event 5 comes after event 4
        # - catch-up is completed
        self.assertEqual(len(sent_pdus), 2)
        self.assertEqual(sent_pdus[0].event_id, event_id_4)
        self.assertEqual(sent_pdus[1].event_id, event_id_5)
        self.assertFalse(per_dest_queue._catching_up)
        self.assertEqual(
            per_dest_queue._last_successful_stream_ordering,
            event_5.internal_metadata.stream_ordering,
        )

    @override_config({"send_federation": True})
    def test_catch_up_on_synapse_startup(self):
        """
        Tests the behaviour of get_catch_up_outstanding_destinations and
        _wake_destinations_needing_catchup.
        """

        # list of sorted server names (note that there are more servers than the batch
        # size used in get_catch_up_outstanding_destinations).
        server_names = ["server%02d" % number for number in range(42)] + ["zzzerver"]

        # ARRANGE:
        #  - a local user (u1)
        #  - a room which u1 is joined to (and remote users @user:serverXX are
        #    joined to)

        # mark the remotes as online
        self.is_online = True

        self.register_user("u1", "you the one")
        u1_token = self.login("u1", "you the one")
        room_id = self.helper.create_room_as("u1", tok=u1_token)

        for server_name in server_names:
            self.get_success(
                event_injection.inject_member_event(
                    self.hs, room_id, "@user:%s" % server_name, "join"
                )
            )

        # create an event
        self.helper.send(room_id, "deary me!", tok=u1_token)

        # ASSERT:
        # - All servers are up to date so none should have outstanding catch-up
        outstanding_when_successful = self.get_success(
            self.hs.get_datastore().get_catch_up_outstanding_destinations(None)
        )
        self.assertEqual(outstanding_when_successful, [])

        # ACT:
        # - Make the remote servers unreachable
        self.is_online = False

        # - Mark zzzerver as being backed-off from
        now = self.clock.time_msec()
        self.get_success(
            self.hs.get_datastore().set_destination_retry_timings(
                "zzzerver", now, now, 24 * 60 * 60 * 1000  # retry in 1 day
            )
        )

        # - Send an event
        self.helper.send(room_id, "can anyone hear me?", tok=u1_token)

        # ASSERT (get_catch_up_outstanding_destinations):
        # - all remotes are outstanding
        # - they are returned in batches of 25, in order
        outstanding_1 = self.get_success(
            self.hs.get_datastore().get_catch_up_outstanding_destinations(None)
        )

        self.assertEqual(len(outstanding_1), 25)
        self.assertEqual(outstanding_1, server_names[0:25])

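        # fetch the next batch, resuming after the last destination returned above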
        outstanding_2 = self.get_success(
            self.hs.get_datastore().get_catch_up_outstanding_destinations(
                outstanding_1[-1]
            )
        )
        self.assertNotIn("zzzerver", outstanding_2)
        self.assertEqual(len(outstanding_2), 17)
        self.assertEqual(outstanding_2, server_names[25:-1])

        # ACT: call _wake_destinations_needing_catchup

        # patch wake_destination to just count the destinations instead
        woken = []

        def wake_destination_track(destination):
            woken.append(destination)

        self.hs.get_federation_sender().wake_destination = wake_destination_track

        # cancel the pre-existing timer for _wake_destinations_needing_catchup
        # this is because we are calling it manually rather than waiting for it
        # to be called automatically
        self.hs.get_federation_sender()._catchup_after_startup_timer.cancel()

        self.get_success(
            self.hs.get_federation_sender()._wake_destinations_needing_catchup(), by=5.0
        )

        # ASSERT (_wake_destinations_needing_catchup):
        # - all remotes are woken up, save for zzzerver
        self.assertNotIn("zzzerver", woken)
        # - all destinations are woken exactly once; they appear once in woken.
        self.assertCountEqual(woken, server_names[:-1])

    @override_config({"send_federation": True})
    def test_not_latest_event(self):
        """Test that we send the latest event in the room even if it's not ours."""

        per_dest_queue, sent_pdus = self.make_fake_destination_queue()

        # Make a room with a local user, and two servers. One will go offline
        # and one will send some events.
        self.register_user("u1", "you the one")
        u1_token = self.login("u1", "you the one")
        room_1 = self.helper.create_room_as("u1", tok=u1_token)

        self.get_success(
            event_injection.inject_member_event(self.hs, room_1, "@user:host2", "join")
        )
        event_1 = self.get_success(
            event_injection.inject_member_event(self.hs, room_1, "@user:host3", "join")
        )

        # First we send something from the local server, so that we notice the
        # remote is down and go into catchup mode.
        self.helper.send(room_1, "you hear me!!", tok=u1_token)

        # Now simulate us receiving an event from the still online remote.
        event_2 = self.get_success(
            event_injection.inject_event(
                self.hs,
                type=EventTypes.Message,
                sender="@user:host3",
                room_id=room_1,
                content={"msgtype": "m.text", "body": "Hello"},
            )
        )

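        # Pretend we had already sent everything up to and including event_1
        # (host3's join), so only later events are outstanding for host2.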
        self.get_success(
            self.hs.get_datastore().set_destination_last_successful_stream_ordering(
                "host2", event_1.internal_metadata.stream_ordering
            )
        )

        self.get_success(per_dest_queue._catch_up_transmission_loop())

        # We expect only the last message from the remote, event_2, to have been
        # sent, rather than the last *local* event that was sent.
        self.assertEqual(len(sent_pdus), 1)
        self.assertEqual(sent_pdus[0].event_id, event_2.event_id)
        self.assertFalse(per_dest_queue._catching_up)