Fix up logic for delaying sending read receipts over federation. (#17933)

For context of why we delay read receipts, see
https://github.com/matrix-org/synapse/issues/4730.

Element Web often sends read receipts in quick succession: if it reloads
the timeline, it'll send one for the last message in the old timeline and
another for the last message in the new timeline. This caused remote users
to see a read receipt for an older message come through quickly, but the
second read receipt, for the most recent message, would then take a while
to arrive.

There are two things going on in this PR:
1. There was a mismatch between seconds and milliseconds, and so we
ended up delaying for far longer than intended.
2. Changing the logic to reuse the `DestinationWakeupQueue` (used for
presence)

The changes in logic are:
- Treat the first receipt and subsequent receipts in a room in the same
way
- Whitelist certain classes of receipts to never delay being sent, i.e.
receipts in small rooms, receipts for events that were sent within the
last 60s, and sending receipts to the event sender's server.
- The maximum delay a receipt can have before being sent to a server is
30s, and we'll send out receipts to remotes at least at 50Hz (by
default)

The upshot is that this should make receipts feel more snappy over
federation.

This new logic should send roughly 10%–20% of transactions
immediately on matrix.org.
This commit is contained in:
Erik Johnston 2024-11-25 18:12:33 +00:00 committed by GitHub
parent 93cc955051
commit 3943d2fde7
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 234 additions and 148 deletions

1
changelog.d/17933.bugfix Normal file
View file

@ -0,0 +1 @@
Fix long-standing bug where read receipts could get overly delayed being sent over federation.

View file

@ -140,7 +140,6 @@ from typing import (
Iterable, Iterable,
List, List,
Optional, Optional,
Set,
Tuple, Tuple,
) )
@ -170,7 +169,13 @@ from synapse.metrics.background_process_metrics import (
run_as_background_process, run_as_background_process,
wrap_as_background_process, wrap_as_background_process,
) )
from synapse.types import JsonDict, ReadReceipt, RoomStreamToken, StrCollection from synapse.types import (
JsonDict,
ReadReceipt,
RoomStreamToken,
StrCollection,
get_domain_from_id,
)
from synapse.util import Clock from synapse.util import Clock
from synapse.util.metrics import Measure from synapse.util.metrics import Measure
from synapse.util.retryutils import filter_destinations_by_retry_limiter from synapse.util.retryutils import filter_destinations_by_retry_limiter
@ -297,12 +302,10 @@ class _DestinationWakeupQueue:
# being woken up. # being woken up.
_MAX_TIME_IN_QUEUE = 30.0 _MAX_TIME_IN_QUEUE = 30.0
# The maximum duration in seconds between waking up consecutive destination
# queues.
_MAX_DELAY = 0.1
sender: "FederationSender" = attr.ib() sender: "FederationSender" = attr.ib()
clock: Clock = attr.ib() clock: Clock = attr.ib()
max_delay_s: int = attr.ib()
queue: "OrderedDict[str, Literal[None]]" = attr.ib(factory=OrderedDict) queue: "OrderedDict[str, Literal[None]]" = attr.ib(factory=OrderedDict)
processing: bool = attr.ib(default=False) processing: bool = attr.ib(default=False)
@ -332,7 +335,7 @@ class _DestinationWakeupQueue:
# We also add an upper bound to the delay, to gracefully handle the # We also add an upper bound to the delay, to gracefully handle the
# case where the queue only has a few entries in it. # case where the queue only has a few entries in it.
current_sleep_seconds = min( current_sleep_seconds = min(
self._MAX_DELAY, self._MAX_TIME_IN_QUEUE / len(self.queue) self.max_delay_s, self._MAX_TIME_IN_QUEUE / len(self.queue)
) )
while self.queue: while self.queue:
@ -416,19 +419,14 @@ class FederationSender(AbstractFederationSender):
self._is_processing = False self._is_processing = False
self._last_poked_id = -1 self._last_poked_id = -1
# map from room_id to a set of PerDestinationQueues which we believe are
# awaiting a call to flush_read_receipts_for_room. The presence of an entry
# here for a given room means that we are rate-limiting RR flushes to that room,
# and that there is a pending call to _flush_rrs_for_room in the system.
self._queues_awaiting_rr_flush_by_room: Dict[str, Set[PerDestinationQueue]] = {}
self._rr_txn_interval_per_room_ms = (
1000.0
/ hs.config.ratelimiting.federation_rr_transactions_per_room_per_second
)
self._external_cache = hs.get_external_cache() self._external_cache = hs.get_external_cache()
self._destination_wakeup_queue = _DestinationWakeupQueue(self, self.clock)
rr_txn_interval_per_room_s = (
1.0 / hs.config.ratelimiting.federation_rr_transactions_per_room_per_second
)
self._destination_wakeup_queue = _DestinationWakeupQueue(
self, self.clock, max_delay_s=rr_txn_interval_per_room_s
)
# Regularly wake up destinations that have outstanding PDUs to be caught up # Regularly wake up destinations that have outstanding PDUs to be caught up
self.clock.looping_call_now( self.clock.looping_call_now(
@ -745,37 +743,48 @@ class FederationSender(AbstractFederationSender):
# Some background on the rate-limiting going on here. # Some background on the rate-limiting going on here.
# #
# It turns out that if we attempt to send out RRs as soon as we get them from # It turns out that if we attempt to send out RRs as soon as we get them
# a client, then we end up trying to do several hundred Hz of federation # from a client, then we end up trying to do several hundred Hz of
# transactions. (The number of transactions scales as O(N^2) on the size of a # federation transactions. (The number of transactions scales as O(N^2)
# room, since in a large room we have both more RRs coming in, and more servers # on the size of a room, since in a large room we have both more RRs
# to send them to.) # coming in, and more servers to send them to.)
# #
# This leads to a lot of CPU load, and we end up getting behind. The solution # This leads to a lot of CPU load, and we end up getting behind. The
# currently adopted is as follows: # solution currently adopted is to differentiate between receipts and
# destinations we should immediately send to, and those we can trickle
# the receipts to.
# #
# The first receipt in a given room is sent out immediately, at time T0. Any # The current logic is to send receipts out immediately if:
# further receipts are, in theory, batched up for N seconds, where N is calculated # - the room is "small", i.e. there's only N servers to send receipts
# based on the number of servers in the room to achieve a transaction frequency # to, and so sending out the receipts immediately doesn't cause too
# of around 50Hz. So, for example, if there were 100 servers in the room, then # much load; or
# N would be 100 / 50Hz = 2 seconds. # - the receipt is for an event that happened recently, as users
# notice if receipts are delayed when they know other users are
# currently reading the room; or
# - the receipt is being sent to the server that sent the event, so
# that users see receipts for their own events quickly.
# #
# Then, after T+N, we flush out any receipts that have accumulated, and restart # For destinations that we should delay sending the receipt to, we queue
# the timer to flush out more receipts at T+2N, etc. If no receipts accumulate, # the receipts up to be sent in the next transaction, but don't trigger
# we stop the cycle and go back to the start. # a new transaction to be sent. We then add the destination to the
# `DestinationWakeupQueue`, which will slowly iterate over each
# destination and trigger a new transaction to be sent.
# #
# However, in practice, it is often possible to flush out receipts earlier: in # However, in practice, it is often possible to send out delayed
# particular, if we are sending a transaction to a given server anyway (for # receipts earlier: in particular, if we are sending a transaction to a
# example, because we have a PDU or a RR in another room to send), then we may # given server anyway (for example, because we have a PDU or a RR in
# as well send out all of the pending RRs for that server. So it may be that # another room to send), then we may as well send out all of the pending
# by the time we get to T+N, we don't actually have any RRs left to send out. # RRs for that server. So it may be that by the time we get to waking up
# Nevertheless we continue to buffer up RRs for the room in question until we # the destination, we don't actually have any RRs left to send out.
# reach the point that no RRs arrive between timer ticks.
# #
# For even more background, see https://github.com/matrix-org/synapse/issues/4730. # For even more background, see
# https://github.com/matrix-org/synapse/issues/4730.
room_id = receipt.room_id room_id = receipt.room_id
# Local read receipts always have 1 event ID.
event_id = receipt.event_ids[0]
# Work out which remote servers should be poked and poke them. # Work out which remote servers should be poked and poke them.
domains_set = await self._storage_controllers.state.get_current_hosts_in_room_or_partial_state_approximation( domains_set = await self._storage_controllers.state.get_current_hosts_in_room_or_partial_state_approximation(
room_id room_id
@ -797,49 +806,51 @@ class FederationSender(AbstractFederationSender):
if not domains: if not domains:
return return
queues_pending_flush = self._queues_awaiting_rr_flush_by_room.get(room_id) # We now split which domains we want to wake up immediately vs which we
# want to delay waking up.
immediate_domains: StrCollection
delay_domains: StrCollection
# if there is no flush yet scheduled, we will send out these receipts with if len(domains) < 10:
# immediate flushes, and schedule the next flush for this room. # For "small" rooms send to all domains immediately
if queues_pending_flush is not None: immediate_domains = domains
logger.debug("Queuing receipt for: %r", domains) delay_domains = ()
else: else:
logger.debug("Sending receipt to: %r", domains) metadata = await self.store.get_metadata_for_event(
self._schedule_rr_flush_for_room(room_id, len(domains)) receipt.room_id, event_id
)
assert metadata is not None
for domain in domains: sender_domain = get_domain_from_id(metadata.sender)
if self.clock.time_msec() - metadata.received_ts < 60_000:
# We always send receipts for recent messages immediately
immediate_domains = domains
delay_domains = ()
else:
# Otherwise, we delay waking up all destinations except for the
# sender's domain.
immediate_domains = []
delay_domains = []
for domain in domains:
if domain == sender_domain:
immediate_domains.append(domain)
else:
delay_domains.append(domain)
for domain in immediate_domains:
# Add to destination queue and wake the destination up
queue = self._get_per_destination_queue(domain)
queue.queue_read_receipt(receipt)
queue.attempt_new_transaction()
for domain in delay_domains:
# Add to destination queue...
queue = self._get_per_destination_queue(domain) queue = self._get_per_destination_queue(domain)
queue.queue_read_receipt(receipt) queue.queue_read_receipt(receipt)
# if there is already a RR flush pending for this room, then make sure this # ... and schedule the destination to be woken up.
# destination is registered for the flush self._destination_wakeup_queue.add_to_queue(domain)
if queues_pending_flush is not None:
queues_pending_flush.add(queue)
else:
queue.flush_read_receipts_for_room(room_id)
def _schedule_rr_flush_for_room(self, room_id: str, n_domains: int) -> None:
# that is going to cause approximately len(domains) transactions, so now back
# off for that multiplied by RR_TXN_INTERVAL_PER_ROOM
backoff_ms = self._rr_txn_interval_per_room_ms * n_domains
logger.debug("Scheduling RR flush in %s in %d ms", room_id, backoff_ms)
self.clock.call_later(backoff_ms, self._flush_rrs_for_room, room_id)
self._queues_awaiting_rr_flush_by_room[room_id] = set()
def _flush_rrs_for_room(self, room_id: str) -> None:
queues = self._queues_awaiting_rr_flush_by_room.pop(room_id)
logger.debug("Flushing RRs in %s to %s", room_id, queues)
if not queues:
# no more RRs arrived for this room; we are done.
return
# schedule the next flush
self._schedule_rr_flush_for_room(room_id, len(queues))
for queue in queues:
queue.flush_read_receipts_for_room(room_id)
async def send_presence_to_destinations( async def send_presence_to_destinations(
self, states: Iterable[UserPresenceState], destinations: Iterable[str] self, states: Iterable[UserPresenceState], destinations: Iterable[str]

View file

@ -156,7 +156,6 @@ class PerDestinationQueue:
# Each receipt can only have a single receipt per # Each receipt can only have a single receipt per
# (room ID, receipt type, user ID, thread ID) tuple. # (room ID, receipt type, user ID, thread ID) tuple.
self._pending_receipt_edus: List[Dict[str, Dict[str, Dict[str, dict]]]] = [] self._pending_receipt_edus: List[Dict[str, Dict[str, Dict[str, dict]]]] = []
self._rrs_pending_flush = False
# stream_id of last successfully sent to-device message. # stream_id of last successfully sent to-device message.
# NB: may be a long or an int. # NB: may be a long or an int.
@ -258,15 +257,7 @@ class PerDestinationQueue:
} }
) )
def flush_read_receipts_for_room(self, room_id: str) -> None: self.mark_new_data()
# If there are any pending receipts for this room then force-flush them
# in a new transaction.
for edu in self._pending_receipt_edus:
if room_id in edu:
self._rrs_pending_flush = True
self.attempt_new_transaction()
# No use in checking remaining EDUs if the room was found.
break
def send_keyed_edu(self, edu: Edu, key: Hashable) -> None: def send_keyed_edu(self, edu: Edu, key: Hashable) -> None:
self._pending_edus_keyed[(edu.edu_type, key)] = edu self._pending_edus_keyed[(edu.edu_type, key)] = edu
@ -603,12 +594,9 @@ class PerDestinationQueue:
self._destination, last_successful_stream_ordering self._destination, last_successful_stream_ordering
) )
def _get_receipt_edus(self, force_flush: bool, limit: int) -> Iterable[Edu]: def _get_receipt_edus(self, limit: int) -> Iterable[Edu]:
if not self._pending_receipt_edus: if not self._pending_receipt_edus:
return return
if not force_flush and not self._rrs_pending_flush:
# not yet time for this lot
return
# Send at most limit EDUs for receipts. # Send at most limit EDUs for receipts.
for content in self._pending_receipt_edus[:limit]: for content in self._pending_receipt_edus[:limit]:
@ -747,7 +735,7 @@ class _TransactionQueueManager:
) )
# Add read receipt EDUs. # Add read receipt EDUs.
pending_edus.extend(self.queue._get_receipt_edus(force_flush=False, limit=5)) pending_edus.extend(self.queue._get_receipt_edus(limit=5))
edu_limit = MAX_EDUS_PER_TRANSACTION - len(pending_edus) edu_limit = MAX_EDUS_PER_TRANSACTION - len(pending_edus)
# Next, prioritize to-device messages so that existing encryption channels # Next, prioritize to-device messages so that existing encryption channels
@ -795,13 +783,6 @@ class _TransactionQueueManager:
if not self._pdus and not pending_edus: if not self._pdus and not pending_edus:
return [], [] return [], []
# if we've decided to send a transaction anyway, and we have room, we
# may as well send any pending RRs
if edu_limit:
pending_edus.extend(
self.queue._get_receipt_edus(force_flush=True, limit=edu_limit)
)
if self._pdus: if self._pdus:
self._last_stream_ordering = self._pdus[ self._last_stream_ordering = self._pdus[
-1 -1

View file

@ -322,6 +322,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
self._attempt_to_invalidate_cache( self._attempt_to_invalidate_cache(
"get_unread_event_push_actions_by_room_for_user", (room_id,) "get_unread_event_push_actions_by_room_for_user", (room_id,)
) )
self._attempt_to_invalidate_cache("get_metadata_for_event", (room_id, event_id))
self._attempt_to_invalidate_cache("_get_max_event_pos", (room_id,)) self._attempt_to_invalidate_cache("_get_max_event_pos", (room_id,))
@ -446,6 +447,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
self._attempt_to_invalidate_cache("_get_state_group_for_event", None) self._attempt_to_invalidate_cache("_get_state_group_for_event", None)
self._attempt_to_invalidate_cache("get_event_ordering", None) self._attempt_to_invalidate_cache("get_event_ordering", None)
self._attempt_to_invalidate_cache("get_metadata_for_event", (room_id,))
self._attempt_to_invalidate_cache("is_partial_state_event", None) self._attempt_to_invalidate_cache("is_partial_state_event", None)
self._attempt_to_invalidate_cache("_get_joined_profile_from_event_id", None) self._attempt_to_invalidate_cache("_get_joined_profile_from_event_id", None)

View file

@ -193,6 +193,14 @@ class _EventRow:
outlier: bool outlier: bool
@attr.s(auto_attribs=True, frozen=True, slots=True)
class EventMetadata:
    """Immutable record of per-event metadata.

    This is the value type returned by `get_metadata_for_event(..)`.
    """

    # Value of the `sender` column for the event.
    sender: str
    # Value of the `received_ts` column for the event. The federation sender
    # compares this against `clock.time_msec()`, i.e. it is treated as a
    # millisecond timestamp.
    received_ts: int
class EventRedactBehaviour(Enum): class EventRedactBehaviour(Enum):
""" """
What to do when retrieving a redacted event from the database. What to do when retrieving a redacted event from the database.
@ -2580,3 +2588,22 @@ class EventsWorkerStore(SQLBaseStore):
_BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_BG_UPDATE, _BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_BG_UPDATE,
) )
) )
@cached(tree=True)
async def get_metadata_for_event(
    self, room_id: str, event_id: str
) -> Optional[EventMetadata]:
    """Fetch the sender and received timestamp recorded for an event.

    Args:
        room_id: The room ID the event row must match.
        event_id: The event ID the event row must match.

    Returns:
        An `EventMetadata` built from the `sender` and `received_ts` columns
        of the matching `events` row, or None if no row matches both the
        room ID and the event ID.
    """
    # Match on both columns so that an event ID from a different room is
    # treated as "not found".
    lookup = {"room_id": room_id, "event_id": event_id}

    row = await self.db_pool.simple_select_one(
        table="events",
        keyvalues=lookup,
        retcols=("sender", "received_ts"),
        allow_none=True,
        desc="get_metadata_for_event",
    )

    if row is not None:
        # Columns come back in the order requested via `retcols`.
        return EventMetadata(sender=row[0], received_ts=row[1])

    return None

View file

@ -34,6 +34,7 @@ from synapse.handlers.device import DeviceHandler
from synapse.rest import admin from synapse.rest import admin
from synapse.rest.client import login from synapse.rest.client import login
from synapse.server import HomeServer from synapse.server import HomeServer
from synapse.storage.databases.main.events_worker import EventMetadata
from synapse.types import JsonDict, ReadReceipt from synapse.types import JsonDict, ReadReceipt
from synapse.util import Clock from synapse.util import Clock
@ -55,12 +56,15 @@ class FederationSenderReceiptsTestCases(HomeserverTestCase):
federation_transport_client=self.federation_transport_client, federation_transport_client=self.federation_transport_client,
) )
hs.get_storage_controllers().state.get_current_hosts_in_room = AsyncMock( # type: ignore[method-assign] self.main_store = hs.get_datastores().main
self.state_controller = hs.get_storage_controllers().state
self.state_controller.get_current_hosts_in_room = AsyncMock( # type: ignore[method-assign]
return_value={"test", "host2"} return_value={"test", "host2"}
) )
hs.get_storage_controllers().state.get_current_hosts_in_room_or_partial_state_approximation = ( # type: ignore[method-assign] self.state_controller.get_current_hosts_in_room_or_partial_state_approximation = ( # type: ignore[method-assign]
hs.get_storage_controllers().state.get_current_hosts_in_room self.state_controller.get_current_hosts_in_room
) )
return hs return hs
@ -185,12 +189,15 @@ class FederationSenderReceiptsTestCases(HomeserverTestCase):
], ],
) )
def test_send_receipts_with_backoff(self) -> None: def test_send_receipts_with_backoff_small_room(self) -> None:
"""Send two receipts in quick succession; the second should be flushed, but """Read receipt in small rooms should not be delayed"""
only after 20ms"""
mock_send_transaction = self.federation_transport_client.send_transaction mock_send_transaction = self.federation_transport_client.send_transaction
mock_send_transaction.return_value = {} mock_send_transaction.return_value = {}
self.state_controller.get_current_hosts_in_room_or_partial_state_approximation = AsyncMock( # type: ignore[method-assign]
return_value={"test", "host2"}
)
sender = self.hs.get_federation_sender() sender = self.hs.get_federation_sender()
receipt = ReadReceipt( receipt = ReadReceipt(
"room_id", "room_id",
@ -206,7 +213,104 @@ class FederationSenderReceiptsTestCases(HomeserverTestCase):
# expect a call to send_transaction # expect a call to send_transaction
mock_send_transaction.assert_called_once() mock_send_transaction.assert_called_once()
json_cb = mock_send_transaction.call_args[0][1] self._assert_edu_in_call(mock_send_transaction.call_args[0][1])
def test_send_receipts_with_backoff_recent_event(self) -> None:
    """A receipt for a just-received event goes out to every host at once,
    even when the room is large."""
    send_txn = self.federation_transport_client.send_transaction
    send_txn.return_value = {}

    # Pretend this is a big room: "test" plus host0..host19.
    hosts_in_room = {"test"} | {f"host{i}" for i in range(20)}
    self.state_controller.get_current_hosts_in_room_or_partial_state_approximation = AsyncMock(  # type: ignore[method-assign]
        return_value=hosts_in_room
    )

    # The event was received "now", so it counts as recent.
    recent_metadata = EventMetadata(
        received_ts=self.clock.time_msec(),
        sender="@test:test",
    )
    self.main_store.get_metadata_for_event = AsyncMock(
        return_value=recent_metadata
    )

    federation_sender = self.hs.get_federation_sender()
    read_receipt = ReadReceipt(
        "room_id",
        "m.read",
        "user_id",
        ["event_id"],
        thread_id=None,
        data={"ts": 1234},
    )
    self.get_success(federation_sender.send_read_receipt(read_receipt))

    self.pump()

    # expect a call to send_transaction for each host
    self.assertEqual(send_txn.call_count, 20)
    last_json_cb = send_txn.call_args.args[1]
    self._assert_edu_in_call(last_json_cb)

    send_txn.reset_mock()
def test_send_receipts_with_backoff_sender(self) -> None:
    """A receipt for an old event in a large room is sent straight away to
    the event sender's host, and trickled out to every other host."""
    send_txn = self.federation_transport_client.send_transaction
    send_txn.return_value = {}

    # Pretend this is a big room: "test" plus host0..host19.
    hosts_in_room = {"test"} | {f"host{i}" for i in range(20)}
    self.state_controller.get_current_hosts_in_room_or_partial_state_approximation = AsyncMock(  # type: ignore[method-assign]
        return_value=hosts_in_room
    )

    # The event was received five minutes ago and was sent from host1.
    old_metadata = EventMetadata(
        received_ts=self.clock.time_msec() - 5 * 60_000,
        sender="@test:host1",
    )
    self.main_store.get_metadata_for_event = AsyncMock(return_value=old_metadata)

    federation_sender = self.hs.get_federation_sender()
    read_receipt = ReadReceipt(
        "room_id",
        "m.read",
        "user_id",
        ["event_id"],
        thread_id=None,
        data={"ts": 1234},
    )
    self.get_success(federation_sender.send_read_receipt(read_receipt))

    self.pump()

    # First, expect a call to send_transaction for the sending host
    send_txn.assert_called()

    first_call = send_txn.call_args_list[0]
    self.assertEqual(first_call.args[0].destination, "host1")
    self._assert_edu_in_call(first_call.args[1])

    # We also expect a call to one of the other hosts, as the first
    # destination to wake up.
    self.assertEqual(send_txn.call_count, 2)
    self._assert_edu_in_call(send_txn.call_args.args[1])

    send_txn.reset_mock()

    # We now expect to see 18 more transactions to the remaining hosts
    # periodically, exactly one per wakeup interval.
    wakeup_interval_s = (
        1.0
        / self.hs.config.ratelimiting.federation_rr_transactions_per_room_per_second
    )
    for _ in range(18):
        self.reactor.advance(wakeup_interval_s)

        send_txn.assert_called_once()
        self._assert_edu_in_call(send_txn.call_args.args[1])
        send_txn.reset_mock()
def _assert_edu_in_call(self, json_cb: Callable[[], JsonDict]) -> None:
"""Assert that the given `json_cb` from a `send_transaction` has a
receipt in it."""
data = json_cb() data = json_cb()
self.assertEqual( self.assertEqual(
data["edus"], data["edus"],
@ -226,46 +330,6 @@ class FederationSenderReceiptsTestCases(HomeserverTestCase):
} }
], ],
) )
mock_send_transaction.reset_mock()
# send the second RR
receipt = ReadReceipt(
"room_id",
"m.read",
"user_id",
["other_id"],
thread_id=None,
data={"ts": 1234},
)
self.successResultOf(defer.ensureDeferred(sender.send_read_receipt(receipt)))
self.pump()
mock_send_transaction.assert_not_called()
self.reactor.advance(19)
mock_send_transaction.assert_not_called()
self.reactor.advance(10)
mock_send_transaction.assert_called_once()
json_cb = mock_send_transaction.call_args[0][1]
data = json_cb()
self.assertEqual(
data["edus"],
[
{
"edu_type": EduTypes.RECEIPT,
"content": {
"room_id": {
"m.read": {
"user_id": {
"event_ids": ["other_id"],
"data": {"ts": 1234},
}
}
}
},
}
],
)
class FederationSenderPresenceTestCases(HomeserverTestCase): class FederationSenderPresenceTestCases(HomeserverTestCase):