mirror of https://github.com/element-hq/synapse.git
synced 2024-11-21 09:05:42 +03:00

Don't delay new receipts

parent 97284689ea
commit ef27ed01fe

5 changed files with 233 additions and 148 deletions
@@ -140,7 +140,6 @@ from typing import (
     Iterable,
     List,
     Optional,
     Set,
     Tuple,
 )

@@ -170,7 +169,13 @@ from synapse.metrics.background_process_metrics import (
     run_as_background_process,
     wrap_as_background_process,
 )
-from synapse.types import JsonDict, ReadReceipt, RoomStreamToken, StrCollection
+from synapse.types import (
+    JsonDict,
+    ReadReceipt,
+    RoomStreamToken,
+    StrCollection,
+    get_domain_from_id,
+)
 from synapse.util import Clock
 from synapse.util.metrics import Measure
 from synapse.util.retryutils import filter_destinations_by_retry_limiter
@@ -297,12 +302,10 @@ class _DestinationWakeupQueue:
     # being woken up.
     _MAX_TIME_IN_QUEUE = 30.0

-    # The maximum duration in seconds between waking up consecutive destination
-    # queues.
-    _MAX_DELAY = 0.1
-
     sender: "FederationSender" = attr.ib()
     clock: Clock = attr.ib()
+    max_delay_s: int = attr.ib()

     queue: "OrderedDict[str, Literal[None]]" = attr.ib(factory=OrderedDict)
     processing: bool = attr.ib(default=False)
@@ -332,7 +335,7 @@ class _DestinationWakeupQueue:
         # We also add an upper bound to the delay, to gracefully handle the
         # case where the queue only has a few entries in it.
         current_sleep_seconds = min(
-            self._MAX_DELAY, self._MAX_TIME_IN_QUEUE / len(self.queue)
+            self.max_delay_s, self._MAX_TIME_IN_QUEUE / len(self.queue)
         )

         while self.queue:
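For orientation, here is a minimal sketch of the pacing behaviour shown above, assuming only what the hunk itself states (the helper name drain and the wake callback are illustrative, not Synapse APIs): each destination is woken after sleeping the smaller of max_delay_s and an even share of the 30-second _MAX_TIME_IN_QUEUE budget.

    # Illustrative sketch, not the real _DestinationWakeupQueue implementation.
    import asyncio
    from collections import OrderedDict
    from typing import Callable

    MAX_TIME_IN_QUEUE = 30.0  # mirrors _MAX_TIME_IN_QUEUE above

    async def drain(
        queue: "OrderedDict[str, None]",
        max_delay_s: float,
        wake: Callable[[str], None],
    ) -> None:
        while queue:
            # Spread the remaining destinations over the 30s budget, but never
            # wait longer than max_delay_s between consecutive wake-ups.
            sleep_s = min(max_delay_s, MAX_TIME_IN_QUEUE / len(queue))
            await asyncio.sleep(sleep_s)
            destination, _ = queue.popitem(last=False)
            wake(destination)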
@@ -416,19 +419,14 @@ class FederationSender(AbstractFederationSender):
         self._is_processing = False
         self._last_poked_id = -1

-        # map from room_id to a set of PerDestinationQueues which we believe are
-        # awaiting a call to flush_read_receipts_for_room. The presence of an entry
-        # here for a given room means that we are rate-limiting RR flushes to that room,
-        # and that there is a pending call to _flush_rrs_for_room in the system.
-        self._queues_awaiting_rr_flush_by_room: Dict[str, Set[PerDestinationQueue]] = {}
-
-        self._rr_txn_interval_per_room_ms = (
-            1000.0
-            / hs.config.ratelimiting.federation_rr_transactions_per_room_per_second
-        )
-
         self._external_cache = hs.get_external_cache()
-        self._destination_wakeup_queue = _DestinationWakeupQueue(self, self.clock)
+
+        rr_txn_interval_per_room_s = (
+            1.0 / hs.config.ratelimiting.federation_rr_transactions_per_room_per_second
+        )
+        self._destination_wakeup_queue = _DestinationWakeupQueue(
+            self, self.clock, max_delay_s=rr_txn_interval_per_room_s
+        )

         # Regularly wake up destinations that have outstanding PDUs to be caught up
         self.clock.looping_call_now(
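The hard-coded 0.1s _MAX_DELAY is thus replaced by a value derived from the per-room receipt rate limit. As a quick illustration (the rate value below is an example, not necessarily your config's default):

    # federation_rr_transactions_per_room_per_second is taken from the homeserver
    # config; 50 is used here purely as an example value.
    federation_rr_transactions_per_room_per_second = 50
    rr_txn_interval_per_room_s = 1.0 / federation_rr_transactions_per_room_per_second
    print(rr_txn_interval_per_room_s)  # 0.02s between consecutive destination wake-ups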
@@ -745,37 +743,48 @@ class FederationSender(AbstractFederationSender):

         # Some background on the rate-limiting going on here.
         #
-        # It turns out that if we attempt to send out RRs as soon as we get them from
-        # a client, then we end up trying to do several hundred Hz of federation
-        # transactions. (The number of transactions scales as O(N^2) on the size of a
-        # room, since in a large room we have both more RRs coming in, and more servers
-        # to send them to.)
+        # It turns out that if we attempt to send out RRs as soon as we get them
+        # from a client, then we end up trying to do several hundred Hz of
+        # federation transactions. (The number of transactions scales as O(N^2)
+        # on the size of a room, since in a large room we have both more RRs
+        # coming in, and more servers to send them to.)
         #
-        # This leads to a lot of CPU load, and we end up getting behind. The solution
-        # currently adopted is as follows:
+        # This leads to a lot of CPU load, and we end up getting behind. The
+        # solution currently adopted is to differentiate between receipts and
+        # destinations we should immediately send to, and those we can trickle
+        # the receipts to.
         #
-        # The first receipt in a given room is sent out immediately, at time T0. Any
-        # further receipts are, in theory, batched up for N seconds, where N is calculated
-        # based on the number of servers in the room to achieve a transaction frequency
-        # of around 50Hz. So, for example, if there were 100 servers in the room, then
-        # N would be 100 / 50Hz = 2 seconds.
+        # The current logic is to send receipts out immediately if:
+        #   - the room is "small", i.e. there's only N servers to send receipts
+        #     to, and so sending out the receipts immediately doesn't cause too
+        #     much load; or
+        #   - the receipt is for an event that happened recently, as users
+        #     notice if receipts are delayed when they know other users are
+        #     currently reading the room; or
+        #   - the receipt is being sent to the server that sent the event, so
+        #     that users see receipts for their own events quickly.
         #
-        # Then, after T+N, we flush out any receipts that have accumulated, and restart
-        # the timer to flush out more receipts at T+2N, etc. If no receipts accumulate,
-        # we stop the cycle and go back to the start.
+        # For destinations that we should delay sending the receipt to, we queue
+        # the receipts up to be sent in the next transaction, but don't trigger
+        # a new transaction to be sent. We then add the destination to the
+        # `DestinationWakeupQueue`, which will slowly iterate over each
+        # destination and trigger a new transaction to be sent.
         #
-        # However, in practice, it is often possible to flush out receipts earlier: in
-        # particular, if we are sending a transaction to a given server anyway (for
-        # example, because we have a PDU or a RR in another room to send), then we may
-        # as well send out all of the pending RRs for that server. So it may be that
-        # by the time we get to T+N, we don't actually have any RRs left to send out.
-        # Nevertheless we continue to buffer up RRs for the room in question until we
-        # reach the point that no RRs arrive between timer ticks.
+        # However, in practice, it is often possible to send out delayed
+        # receipts earlier: in particular, if we are sending a transaction to a
+        # given server anyway (for example, because we have a PDU or a RR in
+        # another room to send), then we may as well send out all of the pending
+        # RRs for that server. So it may be that by the time we get to waking up
+        # the destination, we don't actually have any RRs left to send out.
         #
-        # For even more background, see https://github.com/matrix-org/synapse/issues/4730.
+        # For even more background, see
+        # https://github.com/matrix-org/synapse/issues/4730.

         room_id = receipt.room_id
+
+        # Local read receipts always have 1 event ID.
+        event_id = receipt.event_ids[0]

         # Work out which remote servers should be poked and poke them.
         domains_set = await self._storage_controllers.state.get_current_hosts_in_room_or_partial_state_approximation(
             room_id
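A rough illustration of the O(N^2) scaling mentioned in the comment above (the numbers are made up for the example; only the shape of the argument comes from the diff): receipts arrive roughly in proportion to the number of active users, and each receipt previously fanned out to every remote server in the room.

    # Illustrative only: why immediate fan-out grows quadratically with room size.
    def transactions_per_second(num_servers: int, receipts_per_user_per_s: float = 0.01) -> float:
        receipts_per_s = num_servers * receipts_per_user_per_s  # more servers, more users, more RRs
        return receipts_per_s * num_servers  # each RR goes to every remote server

    print(transactions_per_second(100))   # 100 tx/s
    print(transactions_per_second(1000))  # 10,000 tx/s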
@@ -797,49 +806,51 @@ class FederationSender(AbstractFederationSender):
         if not domains:
             return

-        queues_pending_flush = self._queues_awaiting_rr_flush_by_room.get(room_id)
+        # We now split which domains we want to wake up immediately vs which we
+        # want to delay waking up.
+        immediate_domains: StrCollection
+        delay_domains: StrCollection

-        # if there is no flush yet scheduled, we will send out these receipts with
-        # immediate flushes, and schedule the next flush for this room.
-        if queues_pending_flush is not None:
-            logger.debug("Queuing receipt for: %r", domains)
+        if len(domains) < 10:
+            # For "small" rooms send to all domains immediately
+            immediate_domains = domains
+            delay_domains = ()
         else:
-            logger.debug("Sending receipt to: %r", domains)
-            self._schedule_rr_flush_for_room(room_id, len(domains))
+            metadata = await self.store.get_metadata_for_event(
+                receipt.room_id, event_id
+            )
+            assert metadata is not None

-        for domain in domains:
+            sender_domain = get_domain_from_id(metadata.sender)
+
+            if self.clock.time_msec() - metadata.received_ts < 60_000:
+                # We always send receipts for recent messages immediately
+                immediate_domains = domains
+                delay_domains = ()
+            else:
+                # Otherwise, we delay waking up all destinations except for the
+                # sender's domain.
+                immediate_domains = []
+                delay_domains = []
+                for domain in domains:
+                    if domain == sender_domain:
+                        immediate_domains.append(domain)
+                    else:
+                        delay_domains.append(domain)
+
+        for domain in immediate_domains:
+            # Add to destination queue and wake the destination up
+            queue = self._get_per_destination_queue(domain)
+            queue.queue_read_receipt(receipt)
+            queue.attempt_new_transaction()
+
+        for domain in delay_domains:
+            # Add to destination queue...
             queue = self._get_per_destination_queue(domain)
             queue.queue_read_receipt(receipt)

-            # if there is already a RR flush pending for this room, then make sure this
-            # destination is registered for the flush
-            if queues_pending_flush is not None:
-                queues_pending_flush.add(queue)
-            else:
-                queue.flush_read_receipts_for_room(room_id)
-
-    def _schedule_rr_flush_for_room(self, room_id: str, n_domains: int) -> None:
-        # that is going to cause approximately len(domains) transactions, so now back
-        # off for that multiplied by RR_TXN_INTERVAL_PER_ROOM
-        backoff_ms = self._rr_txn_interval_per_room_ms * n_domains
-
-        logger.debug("Scheduling RR flush in %s in %d ms", room_id, backoff_ms)
-        self.clock.call_later(backoff_ms, self._flush_rrs_for_room, room_id)
-        self._queues_awaiting_rr_flush_by_room[room_id] = set()
-
-    def _flush_rrs_for_room(self, room_id: str) -> None:
-        queues = self._queues_awaiting_rr_flush_by_room.pop(room_id)
-        logger.debug("Flushing RRs in %s to %s", room_id, queues)
-
-        if not queues:
-            # no more RRs arrived for this room; we are done.
-            return
-
-        # schedule the next flush
-        self._schedule_rr_flush_for_room(room_id, len(queues))
-
-        for queue in queues:
-            queue.flush_read_receipts_for_room(room_id)
+            # ... and schedule the destination to be woken up.
+            self._destination_wakeup_queue.add_to_queue(domain)

     async def send_presence_to_destinations(
         self, states: Iterable[UserPresenceState], destinations: Iterable[str]
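The branch above can be summarised as a single predicate. The following is a distillation for illustration only (the 10-server and 60-second thresholds come from the diff; the function itself is not part of Synapse):

    def should_wake_immediately(
        num_servers: int,
        event_age_ms: int,
        destination: str,
        event_sender_domain: str,
    ) -> bool:
        """The three cases in which a receipt is sent without delay."""
        if num_servers < 10:
            # Small room: fanning out immediately is cheap.
            return True
        if event_age_ms < 60_000:
            # Recent event: other users are likely reading the room right now.
            return True
        # Otherwise only the server that sent the event is woken immediately;
        # everyone else is trickled via the destination wakeup queue.
        return destination == event_sender_domain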
@@ -156,7 +156,6 @@ class PerDestinationQueue:
         # Each receipt can only have a single receipt per
         # (room ID, receipt type, user ID, thread ID) tuple.
         self._pending_receipt_edus: List[Dict[str, Dict[str, Dict[str, dict]]]] = []
-        self._rrs_pending_flush = False

         # stream_id of last successfully sent to-device message.
         # NB: may be a long or an int.
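For reference, the nesting that `_pending_receipt_edus` stores, and that the test assertions later in this diff check, is room ID, then receipt type, then user ID, with the timestamp (and, for threaded receipts, the thread ID) carried in the data dict. A hypothetical payload:

    # Hypothetical example of one pending receipt EDU body.
    pending_receipt_edu = {
        "!room:example.org": {              # room ID
            "m.read": {                     # receipt type
                "@alice:example.org": {     # user ID
                    "event_ids": ["$event:example.org"],
                    "data": {"ts": 1234},
                }
            }
        }
    }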
@@ -258,15 +257,7 @@ class PerDestinationQueue:
             }
         )
+        self.mark_new_data()

-    def flush_read_receipts_for_room(self, room_id: str) -> None:
-        # If there are any pending receipts for this room then force-flush them
-        # in a new transaction.
-        for edu in self._pending_receipt_edus:
-            if room_id in edu:
-                self._rrs_pending_flush = True
-                self.attempt_new_transaction()
-                # No use in checking remaining EDUs if the room was found.
-                break
-
     def send_keyed_edu(self, edu: Edu, key: Hashable) -> None:
         self._pending_edus_keyed[(edu.edu_type, key)] = edu
@@ -603,12 +594,9 @@ class PerDestinationQueue:
             self._destination, last_successful_stream_ordering
         )

-    def _get_receipt_edus(self, force_flush: bool, limit: int) -> Iterable[Edu]:
+    def _get_receipt_edus(self, limit: int) -> Iterable[Edu]:
         if not self._pending_receipt_edus:
             return
-        if not force_flush and not self._rrs_pending_flush:
-            # not yet time for this lot
-            return

         # Send at most limit EDUs for receipts.
         for content in self._pending_receipt_edus[:limit]:
@@ -747,7 +735,7 @@ class _TransactionQueueManager:
         )

         # Add read receipt EDUs.
-        pending_edus.extend(self.queue._get_receipt_edus(force_flush=False, limit=5))
+        pending_edus.extend(self.queue._get_receipt_edus(limit=5))
         edu_limit = MAX_EDUS_PER_TRANSACTION - len(pending_edus)

         # Next, prioritize to-device messages so that existing encryption channels
@@ -795,13 +783,6 @@ class _TransactionQueueManager:
         if not self._pdus and not pending_edus:
             return [], []

-        # if we've decided to send a transaction anyway, and we have room, we
-        # may as well send any pending RRs
-        if edu_limit:
-            pending_edus.extend(
-                self.queue._get_receipt_edus(force_flush=True, limit=edu_limit)
-            )
-
         if self._pdus:
             self._last_stream_ordering = self._pdus[
                 -1
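Taken together, the two _TransactionQueueManager hunks mean pending receipts are simply folded into whatever transaction is being assembled, up to the per-transaction EDU budget, and the late force-flush pass is no longer needed. A simplified sketch of that assembly (illustrative; the real method also builds PDUs and several other EDU types, and the cap value here is an assumption):

    MAX_EDUS_PER_TRANSACTION = 100  # assumed cap for illustration; see the real constant

    def build_pending_edus(queue) -> list:
        pending_edus = []
        # Receipts no longer wait behind a force-flush flag: include up to 5 right away.
        pending_edus.extend(queue._get_receipt_edus(limit=5))
        edu_limit = MAX_EDUS_PER_TRANSACTION - len(pending_edus)
        # ... to-device messages and other EDUs then consume the remaining edu_limit ...
        return pending_edus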
@@ -322,6 +322,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
         self._attempt_to_invalidate_cache(
             "get_unread_event_push_actions_by_room_for_user", (room_id,)
         )
+        self._attempt_to_invalidate_cache("get_metadata_for_event", (room_id, event_id))

         self._attempt_to_invalidate_cache("_get_max_event_pos", (room_id,))

@@ -446,6 +447,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
         self._attempt_to_invalidate_cache("_get_state_group_for_event", None)

         self._attempt_to_invalidate_cache("get_event_ordering", None)
+        self._attempt_to_invalidate_cache("get_metadata_for_event", (room_id,))
         self._attempt_to_invalidate_cache("is_partial_state_event", None)
         self._attempt_to_invalidate_cache("_get_joined_profile_from_event_id", None)

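Because `get_metadata_for_event` is declared with `@cached(tree=True)` (see the events_worker hunk below), it can be invalidated either for a single event or for a whole room by truncating the cache key; the two invalidation calls added above are exactly those two forms:

    # Invalidate a single cached entry for one event:
    self._attempt_to_invalidate_cache("get_metadata_for_event", (room_id, event_id))

    # Invalidate every cached entry for the room (possible because tree=True
    # allows invalidation by key prefix):
    self._attempt_to_invalidate_cache("get_metadata_for_event", (room_id,))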
@@ -193,6 +193,14 @@ class _EventRow:
     outlier: bool


+@attr.s(slots=True, frozen=True, auto_attribs=True)
+class EventMetadata:
+    """Event metadata returned by `get_metadata_for_event(..)`"""
+
+    sender: str
+    received_ts: int
+
+
 class EventRedactBehaviour(Enum):
     """
     What to do when retrieving a redacted event from the database.
@@ -2580,3 +2588,22 @@ class EventsWorkerStore(SQLBaseStore):
                 _BackgroundUpdates.SLIDING_SYNC_MEMBERSHIP_SNAPSHOTS_BG_UPDATE,
             )
         )
+
+    @cached(tree=True)
+    async def get_metadata_for_event(
+        self, room_id: str, event_id: str
+    ) -> Optional[EventMetadata]:
+        row = await self.db_pool.simple_select_one(
+            table="events",
+            keyvalues={"room_id": room_id, "event_id": event_id},
+            retcols=("sender", "received_ts"),
+            allow_none=True,
+            desc="get_metadata_for_event",
+        )
+        if row is None:
+            return None
+
+        return EventMetadata(
+            sender=row[0],
+            received_ts=row[1],
+        )
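Pieced together from the hunks above, this is how the sender consumes the new lookup; the snippet only rearranges calls that already appear in this diff:

    # Inside the FederationSender receipt path (names as in the earlier hunks):
    metadata = await self.store.get_metadata_for_event(receipt.room_id, event_id)
    assert metadata is not None

    sender_domain = get_domain_from_id(metadata.sender)  # "@alice:host1" -> "host1"
    event_age_ms = self.clock.time_msec() - metadata.received_ts
    recently_read = event_age_ms < 60_000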
@@ -34,6 +34,7 @@ from synapse.handlers.device import DeviceHandler
 from synapse.rest import admin
 from synapse.rest.client import login
 from synapse.server import HomeServer
+from synapse.storage.databases.main.events_worker import EventMetadata
 from synapse.types import JsonDict, ReadReceipt
 from synapse.util import Clock

@@ -55,12 +56,15 @@ class FederationSenderReceiptsTestCases(HomeserverTestCase):
             federation_transport_client=self.federation_transport_client,
         )

-        hs.get_storage_controllers().state.get_current_hosts_in_room = AsyncMock(  # type: ignore[method-assign]
+        self.main_store = hs.get_datastores().main
+        self.state_controller = hs.get_storage_controllers().state
+
+        self.state_controller.get_current_hosts_in_room = AsyncMock(  # type: ignore[method-assign]
             return_value={"test", "host2"}
         )

-        hs.get_storage_controllers().state.get_current_hosts_in_room_or_partial_state_approximation = (  # type: ignore[method-assign]
-            hs.get_storage_controllers().state.get_current_hosts_in_room
+        self.state_controller.get_current_hosts_in_room_or_partial_state_approximation = (  # type: ignore[method-assign]
+            self.state_controller.get_current_hosts_in_room
         )

         return hs
@@ -185,12 +189,15 @@ class FederationSenderReceiptsTestCases(HomeserverTestCase):
             ],
         )

-    def test_send_receipts_with_backoff(self) -> None:
-        """Send two receipts in quick succession; the second should be flushed, but
-        only after 20ms"""
+    def test_send_receipts_with_backoff_small_room(self) -> None:
+        """Read receipt in small rooms should not be delayed"""
         mock_send_transaction = self.federation_transport_client.send_transaction
         mock_send_transaction.return_value = {}

+        self.state_controller.get_current_hosts_in_room_or_partial_state_approximation = AsyncMock(  # type: ignore[method-assign]
+            return_value={"test", "host2"}
+        )
+
         sender = self.hs.get_federation_sender()
         receipt = ReadReceipt(
             "room_id",
@@ -206,7 +213,104 @@ class FederationSenderReceiptsTestCases(HomeserverTestCase):

         # expect a call to send_transaction
         mock_send_transaction.assert_called_once()
-        json_cb = mock_send_transaction.call_args[0][1]
+        self._assert_edu_in_call(mock_send_transaction.call_args[0][1])
+
+    def test_send_receipts_with_backoff_recent_event(self) -> None:
+        """Read receipt for a recent message should not be delayed"""
+        mock_send_transaction = self.federation_transport_client.send_transaction
+        mock_send_transaction.return_value = {}
+
+        # Pretend this is a big room
+        self.state_controller.get_current_hosts_in_room_or_partial_state_approximation = AsyncMock(  # type: ignore[method-assign]
+            return_value={"test"} | {f"host{i}" for i in range(20)}
+        )
+
+        self.main_store.get_metadata_for_event = AsyncMock(
+            return_value=EventMetadata(
+                received_ts=self.clock.time_msec(),
+                sender="@test:test",
+            )
+        )
+
+        sender = self.hs.get_federation_sender()
+        receipt = ReadReceipt(
+            "room_id",
+            "m.read",
+            "user_id",
+            ["event_id"],
+            thread_id=None,
+            data={"ts": 1234},
+        )
+        self.get_success(sender.send_read_receipt(receipt))
+
+        self.pump()
+
+        # expect a call to send_transaction for each host
+        self.assertEqual(mock_send_transaction.call_count, 20)
+        self._assert_edu_in_call(mock_send_transaction.call_args[0][1])
+
+        mock_send_transaction.reset_mock()
+
+    def test_send_receipts_with_backoff_sender(self) -> None:
+        """Read receipt for a message should not be delayed to the sender, but
+        is delayed to everyone else"""
+        mock_send_transaction = self.federation_transport_client.send_transaction
+        mock_send_transaction.return_value = {}
+
+        # Pretend this is a big room
+        self.state_controller.get_current_hosts_in_room_or_partial_state_approximation = AsyncMock(  # type: ignore[method-assign]
+            return_value={"test"} | {f"host{i}" for i in range(20)}
+        )
+
+        self.main_store.get_metadata_for_event = AsyncMock(
+            return_value=EventMetadata(
+                received_ts=self.clock.time_msec() - 5 * 60_000,
+                sender="@test:host1",
+            )
+        )
+
+        sender = self.hs.get_federation_sender()
+        receipt = ReadReceipt(
+            "room_id",
+            "m.read",
+            "user_id",
+            ["event_id"],
+            thread_id=None,
+            data={"ts": 1234},
+        )
+        self.get_success(sender.send_read_receipt(receipt))
+
+        self.pump()
+
+        # First, expect a call to send_transaction for the sending host
+        mock_send_transaction.assert_called()
+
+        transaction = mock_send_transaction.call_args_list[0][0][0]
+        self.assertEqual(transaction.destination, "host1")
+        self._assert_edu_in_call(mock_send_transaction.call_args_list[0][0][1])
+
+        # We also expect a call to one of the other hosts, as the first
+        # destination to wake up.
+        self.assertEqual(mock_send_transaction.call_count, 2)
+        self._assert_edu_in_call(mock_send_transaction.call_args[0][1])
+
+        mock_send_transaction.reset_mock()
+
+        # We now expect to see 18 more transactions to the remaining hosts
+        # periodically.
+        for _ in range(18):
+            self.reactor.advance(
+                1.0
+                / self.hs.config.ratelimiting.federation_rr_transactions_per_room_per_second
+            )
+
+            mock_send_transaction.assert_called_once()
+            self._assert_edu_in_call(mock_send_transaction.call_args[0][1])
+            mock_send_transaction.reset_mock()
+
+    def _assert_edu_in_call(self, json_cb: Callable[[], JsonDict]) -> None:
+        """Assert that the given `json_cb` from a `send_transaction` has a
+        receipt in it."""
         data = json_cb()
         self.assertEqual(
             data["edus"],
@@ -226,46 +330,6 @@ class FederationSenderReceiptsTestCases(HomeserverTestCase):
                 }
             ],
         )
-        mock_send_transaction.reset_mock()
-
-        # send the second RR
-        receipt = ReadReceipt(
-            "room_id",
-            "m.read",
-            "user_id",
-            ["other_id"],
-            thread_id=None,
-            data={"ts": 1234},
-        )
-        self.successResultOf(defer.ensureDeferred(sender.send_read_receipt(receipt)))
-        self.pump()
-        mock_send_transaction.assert_not_called()
-
-        self.reactor.advance(19)
-        mock_send_transaction.assert_not_called()
-
-        self.reactor.advance(10)
-        mock_send_transaction.assert_called_once()
-        json_cb = mock_send_transaction.call_args[0][1]
-        data = json_cb()
-        self.assertEqual(
-            data["edus"],
-            [
-                {
-                    "edu_type": EduTypes.RECEIPT,
-                    "content": {
-                        "room_id": {
-                            "m.read": {
-                                "user_id": {
-                                    "event_ids": ["other_id"],
-                                    "data": {"ts": 1234},
-                                }
-                            }
-                        }
-                    },
-                }
-            ],
-        )


 class FederationSenderPresenceTestCases(HomeserverTestCase):