Catch-up after Federation Outage (split, 4): catch-up loop (#8272)

Author: reivilibre, 2020-09-15 09:07:19 +01:00 (committed by GitHub)
parent aec294ee0d
commit 576bc37d31
5 changed files with 338 additions and 5 deletions

changelog.d/8272.bugfix (new file)

@@ -0,0 +1 @@
Fix messages over federation being lost until an event is sent into the same room.
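For context, the failure mode being fixed looks roughly like this (my illustration of the mechanism, not text from the commit):

# Illustrative timeline of the bug (hypothetical rooms/servers):
#   t0  remote server host2 goes offline
#   t1  a local user sends "hi!" into room A -> the transaction to host2 fails
#       and, after enough failures, the pending PDUs are dropped
#   t2  host2 comes back online
#   t3  nothing happens: "hi!" is only retransmitted once some *new* event in
#       room A triggers another transaction to host2
# This commit adds a per-destination catch-up loop that replays the newest
# missed event per room as soon as the destination is reachable again.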

synapse/federation/sender/per_destination_queue.py

@@ -15,7 +15,7 @@
# limitations under the License.
import datetime
import logging
- from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Tuple
+ from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Tuple, cast
from prometheus_client import Counter
@@ -92,6 +92,21 @@ class PerDestinationQueue:
self._destination = destination
self.transmission_loop_running = False
# True whilst we are sending events that the remote homeserver missed
# because it was unreachable. We start in this state so we can perform
# catch-up at startup.
# New events will only be sent once this is finished, at which point
# _catching_up is flipped to False.
self._catching_up = True # type: bool
# The stream_ordering of the most recent PDU that was discarded due to
# being in catch-up mode.
self._catchup_last_skipped = 0 # type: int
# Cache of the last successfully-transmitted stream ordering for this
# destination (we are the only updater so this is safe)
self._last_successful_stream_ordering = None # type: Optional[int]
# a list of pending PDUs
self._pending_pdus = [] # type: List[EventBase]
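Taken together, these three fields drive a small per-destination state machine: start in catch-up mode, leave it once the backlog is cleared, and re-enter it whenever a transaction fails hard. A minimal self-contained sketch of those transitions (a hypothetical model for illustration, not Synapse's real PerDestinationQueue):

from typing import List, Optional

class CatchUpState:
    """Toy model of the catch-up fields above (hypothetical class)."""

    def __init__(self) -> None:
        self.catching_up = True  # pessimistic start, as in __init__ above
        self.catchup_last_skipped = 0
        self.last_successful_stream_ordering = None  # type: Optional[int]
        self.pending = []  # type: List[int]  # stand-in for _pending_pdus

    def on_new_pdu(self, stream_ordering: int) -> None:
        # mirrors send_pdu() below: enqueue unless we know we are behind
        if not self.catching_up or self.last_successful_stream_ordering is None:
            self.pending.append(stream_ordering)
        else:
            self.catchup_last_skipped = stream_ordering

    def on_transaction_failure(self) -> None:
        # mirrors _start_catching_up(): re-enter catch-up, drop the queue
        self.catching_up = True
        self.pending = []

    def on_caught_up(self) -> None:
        self.catching_up = False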
@@ -138,7 +153,13 @@
Args:
pdu: pdu to send
"""
- self._pending_pdus.append(pdu)
+ if not self._catching_up or self._last_successful_stream_ordering is None:
+ # only enqueue the PDU if we are not catching up (False) or do not
+ # yet know if we have anything to catch up (None)
+ self._pending_pdus.append(pdu)
+ else:
+ self._catchup_last_skipped = pdu.internal_metadata.stream_ordering
self.attempt_new_transaction()
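Continuing the hypothetical CatchUpState sketch from above, the gating in send_pdu plays out like this:

state = CatchUpState()
state.last_successful_stream_ordering = 100  # bookmark known from the database
state.on_new_pdu(101)  # arrives mid catch-up: skipped, only recorded
assert state.pending == [] and state.catchup_last_skipped == 101
state.on_caught_up()
state.on_new_pdu(102)  # normal operation resumes: enqueued as usual
assert state.pending == [102]

A skipped PDU is not lost: catch-up re-reads it from the database, and recording its stream_ordering lets the loop detect events that arrived while it was running.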
def send_presence(self, states: Iterable[UserPresenceState]) -> None:
@@ -218,6 +239,13 @@
# hence why we throw the result away.
await get_retry_limiter(self._destination, self._clock, self._store)
if self._catching_up:
# we potentially need to catch up first
await self._catch_up_transmission_loop()
if self._catching_up:
# not caught up yet
return
pending_pdus = []
while True:
# We have to keep 2 free slots for presence and rr_edus
@@ -351,8 +379,9 @@
if e.retry_interval > 60 * 60 * 1000:
# we won't retry for another hour!
# (this suggests a significant outage)
- # We drop pending PDUs and EDUs because otherwise they will
+ # We drop pending EDUs because otherwise they will
# rack up indefinitely.
+ # (Dropping PDUs is already performed by `_start_catching_up`.)
# Note that:
# - the EDUs that are being dropped here are those that we can
# afford to drop (specifically, only typing notifications,
@@ -364,11 +393,12 @@
# dropping read receipts is a bit sad but should be solved
# through another mechanism, because this is all volatile!
- self._pending_pdus = []
self._pending_edus = []
self._pending_edus_keyed = {}
self._pending_presence = {}
self._pending_rrs = {}
+ self._start_catching_up()
except FederationDeniedError as e:
logger.info(e)
except HttpResponseException as e:
@@ -378,6 +408,8 @@
e.code,
e,
)
self._start_catching_up()
except RequestSendFailed as e:
logger.warning(
"TX [%s] Failed to send transaction: %s", self._destination, e
@@ -387,16 +419,96 @@
logger.info(
"Failed to send event %s to %s", p.event_id, self._destination
)
self._start_catching_up()
except Exception:
logger.exception("TX [%s] Failed to send transaction", self._destination)
for p in pending_pdus:
logger.info(
"Failed to send event %s to %s", p.event_id, self._destination
)
self._start_catching_up()
finally:
# We want to be *very* sure we clear this after we stop processing
self.transmission_loop_running = False
async def _catch_up_transmission_loop(self) -> None:
first_catch_up_check = self._last_successful_stream_ordering is None
if first_catch_up_check:
# first catch-up, so get last_successful_stream_ordering from the database
self._last_successful_stream_ordering = await self._store.get_destination_last_successful_stream_ordering(
self._destination
)
if self._last_successful_stream_ordering is None:
# if it's still None, then this means we don't have the information
# in our database - we haven't successfully sent a PDU to this server
# (at least since the introduction of the feature tracking
# last_successful_stream_ordering).
# Sadly, this means we can't do anything here as we don't know what
# needs catching up — so catching up is futile; let's stop.
self._catching_up = False
return
# get at most 50 catchup room/PDUs
while True:
event_ids = await self._store.get_catch_up_room_event_ids(
self._destination, self._last_successful_stream_ordering,
)
if not event_ids:
# No more events to catch up on, but we can't ignore the chance
# of a race condition, so we check that no new events have been
# skipped due to us being in catch-up mode
if self._catchup_last_skipped > self._last_successful_stream_ordering:
# another event has been skipped because we were in catch-up mode
continue
# we are done catching up!
self._catching_up = False
break
if first_catch_up_check:
# as this is our check for needing catch-up, we may have PDUs in
# the queue from before we *knew* we had to do catch-up, so
# clear those out now.
self._start_catching_up()
# fetch the relevant events from the event store
# - redacted behaviour of REDACT is fine, since we only send metadata
# of redacted events to the destination.
# - don't need to worry about rejected events as we do not actively
# forward received events over federation.
catchup_pdus = await self._store.get_events_as_list(event_ids)
if not catchup_pdus:
raise AssertionError(
"No events retrieved when we asked for %r. "
"This should not happen." % event_ids
)
if logger.isEnabledFor(logging.INFO):
# a list rather than a generator: %r on a generator would only log its repr
rooms = [p.room_id for p in catchup_pdus]
logger.info("Catching up rooms to %s: %r", self._destination, rooms)
success = await self._transaction_manager.send_new_transaction(
self._destination, catchup_pdus, []
)
if not success:
return
sent_transactions_counter.inc()
final_pdu = catchup_pdus[-1]
self._last_successful_stream_ordering = cast(
int, final_pdu.internal_metadata.stream_ordering
)
await self._store.set_destination_last_successful_stream_ordering(
self._destination, self._last_successful_stream_ordering
)
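In pseudocode, the loop above reduces to the following (my condensed paraphrase, not the actual method):

# bookmark = last_successful_stream_ordering (loaded from the DB on first run;
#            if the DB has no value, give up: there is nothing to resume from)
# while True:
#     event_ids = up to 50 of the oldest "newest unsent event per room" with
#                 stream_ordering > bookmark
#     if not event_ids:
#         if catchup_last_skipped > bookmark:
#             continue              # a PDU was skipped mid-loop; re-check
#         catching_up = False       # genuinely caught up
#         break
#     send those events in one transaction
#     if the send failed:
#         return                    # stay in catch-up; retry on the next wake-up
#     bookmark = stream_ordering of the last event sent (persisted to the DB)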
def _get_rr_edus(self, force_flush: bool) -> Iterable[Edu]:
if not self._pending_rrs:
return
@@ -457,3 +569,12 @@ class PerDestinationQueue:
]
return (edus, stream_id)
def _start_catching_up(self) -> None:
"""
Marks this destination as being in catch-up mode.
This throws away the PDU queue.
"""
self._catching_up = True
self._pending_pdus = []

synapse/storage/databases/main/transactions.py

@@ -15,7 +15,7 @@
import logging
from collections import namedtuple
- from typing import Iterable, Optional, Tuple
+ from typing import Iterable, List, Optional, Tuple
from canonicaljson import encode_canonical_json
@@ -371,3 +371,44 @@ class TransactionStore(SQLBaseStore):
values={"last_successful_stream_ordering": last_successful_stream_ordering},
desc="set_last_successful_stream_ordering",
)
async def get_catch_up_room_event_ids(
self, destination: str, last_successful_stream_ordering: int,
) -> List[str]:
"""
Returns at most 50 event IDs corresponding to the oldest events that
have not yet been sent to the destination.
Args:
destination: the destination in question
last_successful_stream_ordering: the stream_ordering of the
most-recently successfully-transmitted event to the destination
Returns:
list of event_ids
"""
return await self.db_pool.runInteraction(
"get_catch_up_room_event_ids",
self._get_catch_up_room_event_ids_txn,
destination,
last_successful_stream_ordering,
)
@staticmethod
def _get_catch_up_room_event_ids_txn(
txn, destination: str, last_successful_stream_ordering: int,
) -> List[str]:
q = """
SELECT event_id FROM destination_rooms
JOIN events USING (stream_ordering)
WHERE destination = ?
AND stream_ordering > ?
ORDER BY stream_ordering
LIMIT 50
"""
txn.execute(
q, (destination, last_successful_stream_ordering),
)
event_ids = [row[0] for row in txn]
return event_ids
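The query leans on destination_rooms (added earlier in this series) holding exactly one row per (destination, room) pair, pointing at the most recent event in that room; joining through events maps those stream orderings back to event IDs. A toy demonstration of that shape, using assumed and heavily simplified schemas (the real tables have more columns and constraints):

import sqlite3

con = sqlite3.connect(":memory:")
con.executescript(
    """
    CREATE TABLE destination_rooms (
        destination TEXT, room_id TEXT, stream_ordering INTEGER,
        PRIMARY KEY (destination, room_id)
    );
    CREATE TABLE events (event_id TEXT, stream_ordering INTEGER);
    INSERT INTO events VALUES ('$ev3', 3), ('$ev4', 4), ('$ev5', 5);
    -- one row per room: room_3's row points at its latest event (5, not 3)
    INSERT INTO destination_rooms VALUES
        ('host2', 'room_2', 4), ('host2', 'room_3', 5);
    """
)
rows = con.execute(
    """
    SELECT event_id FROM destination_rooms
    JOIN events USING (stream_ordering)
    WHERE destination = ? AND stream_ordering > ?
    ORDER BY stream_ordering LIMIT 50
    """,
    ("host2", 2),
).fetchall()
print(rows)  # [('$ev4',), ('$ev5',)] -- at most one event per room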

tests/federation/test_federation_catch_up.py

@@ -1,5 +1,10 @@
from typing import List, Tuple
from mock import Mock
from synapse.events import EventBase
from synapse.federation.sender import PerDestinationQueue, TransactionManager
from synapse.federation.units import Edu
from synapse.rest import admin
from synapse.rest.client.v1 import login, room
@@ -156,3 +161,163 @@ class FederationCatchUpTestCases(FederatingHomeserverTestCase):
row_2["stream_ordering"],
"Send succeeded but not marked as last_successful_stream_ordering",
)
@override_config({"send_federation": True}) # critical to federate
def test_catch_up_from_blank_state(self):
"""
Runs an overall test of federation catch-up from scratch.
Further tests will focus on narrower aspects and edge cases, but I
hope to provide an overall view with this test.
"""
# bring the other server online
self.is_online = True
# let's make some events for the other server to receive
self.register_user("u1", "you the one")
u1_token = self.login("u1", "you the one")
room_1 = self.helper.create_room_as("u1", tok=u1_token)
room_2 = self.helper.create_room_as("u1", tok=u1_token)
# also critical to federate
self.get_success(
event_injection.inject_member_event(self.hs, room_1, "@user:host2", "join")
)
self.get_success(
event_injection.inject_member_event(self.hs, room_2, "@user:host2", "join")
)
self.helper.send_state(
room_1, event_type="m.room.topic", body={"topic": "wombat"}, tok=u1_token
)
# check: PDU received for topic event
self.assertEqual(len(self.pdus), 1)
self.assertEqual(self.pdus[0]["type"], "m.room.topic")
# take the remote offline
self.is_online = False
# send another event
self.helper.send(room_1, "hi user!", tok=u1_token)
# check: things didn't go well since the remote is down
self.assertEqual(len(self.failed_pdus), 1)
self.assertEqual(self.failed_pdus[0]["content"]["body"], "hi user!")
# let's delete the federation transmission queue
# (this pretends we are starting up fresh.)
self.assertFalse(
self.hs.get_federation_sender()
._per_destination_queues["host2"]
.transmission_loop_running
)
del self.hs.get_federation_sender()._per_destination_queues["host2"]
# let's also clear any backoffs
self.get_success(
self.hs.get_datastore().set_destination_retry_timings("host2", None, 0, 0)
)
# bring the remote online and clear the received pdu list
self.is_online = True
self.pdus = []
# now we need to initiate a federation transaction somehow…
# to do that, let's send another event (because it's simple to do)
# (do it to another room otherwise the catch-up logic decides it doesn't
# need to catch up room_1 — something I overlooked when first writing
# this test)
self.helper.send(room_2, "wombats!", tok=u1_token)
# we should now have received both PDUs
self.assertEqual(len(self.pdus), 2)
self.assertEqual(self.pdus[0]["content"]["body"], "hi user!")
self.assertEqual(self.pdus[1]["content"]["body"], "wombats!")
def make_fake_destination_queue(
self, destination: str = "host2"
) -> Tuple[PerDestinationQueue, List[EventBase]]:
"""
Makes a fake per-destination queue.
"""
transaction_manager = TransactionManager(self.hs)
per_dest_queue = PerDestinationQueue(self.hs, transaction_manager, destination)
results_list = []
async def fake_send(
destination_tm: str,
pending_pdus: List[EventBase],
_pending_edus: List[Edu],
) -> bool:
assert destination == destination_tm
results_list.extend(pending_pdus)
return True # success!
transaction_manager.send_new_transaction = fake_send
return per_dest_queue, results_list
@override_config({"send_federation": True})
def test_catch_up_loop(self):
"""
Tests the behaviour of _catch_up_transmission_loop.
"""
# ARRANGE:
# - a local user (u1)
# - 3 rooms which u1 is joined to (and remote user @user:host2 is
# joined to)
# - some events (1 to 5) in those rooms
# we have 'already sent' events 1 and 2 to host2
per_dest_queue, sent_pdus = self.make_fake_destination_queue()
self.register_user("u1", "you the one")
u1_token = self.login("u1", "you the one")
room_1 = self.helper.create_room_as("u1", tok=u1_token)
room_2 = self.helper.create_room_as("u1", tok=u1_token)
room_3 = self.helper.create_room_as("u1", tok=u1_token)
self.get_success(
event_injection.inject_member_event(self.hs, room_1, "@user:host2", "join")
)
self.get_success(
event_injection.inject_member_event(self.hs, room_2, "@user:host2", "join")
)
self.get_success(
event_injection.inject_member_event(self.hs, room_3, "@user:host2", "join")
)
# create some events
self.helper.send(room_1, "you hear me!!", tok=u1_token)
event_id_2 = self.helper.send(room_2, "wombats!", tok=u1_token)["event_id"]
self.helper.send(room_3, "Matrix!", tok=u1_token)
event_id_4 = self.helper.send(room_2, "rabbits!", tok=u1_token)["event_id"]
event_id_5 = self.helper.send(room_3, "Synapse!", tok=u1_token)["event_id"]
# destination_rooms should already be populated, but let us pretend that we already
# sent (successfully) up to and including event id 2
event_2 = self.get_success(self.hs.get_datastore().get_event(event_id_2))
# also fetch event 5 so we know its last_successful_stream_ordering later
event_5 = self.get_success(self.hs.get_datastore().get_event(event_id_5))
self.get_success(
self.hs.get_datastore().set_destination_last_successful_stream_ordering(
"host2", event_2.internal_metadata.stream_ordering
)
)
# ACT
self.get_success(per_dest_queue._catch_up_transmission_loop())
# ASSERT, noticing in particular:
# - event 3 not sent out, because event 5 replaces it
# - order is least recent first, so event 5 comes after event 4
# - catch-up is completed
self.assertEqual(len(sent_pdus), 2)
self.assertEqual(sent_pdus[0].event_id, event_id_4)
self.assertEqual(sent_pdus[1].event_id, event_id_5)
self.assertFalse(per_dest_queue._catching_up)
self.assertEqual(
per_dest_queue._last_successful_stream_ordering,
event_5.internal_metadata.stream_ordering,
)
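Why event 3 ("Matrix!") never goes out is worth spelling out; this is my reading of the destination_rooms design from earlier in the series, not text from the commit:

# destination_rooms keeps one row per (destination, room) pointing at the
# newest event, so by the time catch-up runs it reads roughly:
#
#   destination  room    points at
#   host2        room_1  event 1 ("you hear me!!")  <= bookmark: already sent
#   host2        room_2  event 4 ("rabbits!")        > bookmark: fetched
#   host2        room_3  event 5 ("Synapse!")        > bookmark: fetched
#
# Event 3 was room_3's older event, so its row was overwritten by event 5 and
# it is never transmitted directly; the remote is expected to fill such gaps
# itself via normal federation (e.g. /get_missing_events or backfill).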

tests/handlers/test_typing.py

@@ -73,6 +73,7 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase):
"delivered_txn",
"get_received_txn_response",
"set_received_txn_response",
"get_destination_last_successful_stream_ordering",
"get_destination_retry_timings",
"get_devices_by_remote",
"maybe_store_room_on_invite",
@@ -121,6 +122,10 @@
(0, [])
)
self.datastore.get_destination_last_successful_stream_ordering.return_value = make_awaitable(
None
)
def get_received_txn_response(*args):
return defer.succeed(None)