Fix overflows in /messages backfill calculation (#13936)
* Reproduce bug
* Compute `least_function` first
* Substitute `least_function` with an f-string
* Bugfix: avoid overflow

Co-authored-by: Eric Eastwood <erice@element.io>
This commit is contained in:
parent 1cc2ca81ba · commit e8f30a76ca
3 changed files with 103 additions and 41 deletions
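Why this overflows: the per-event backoff period is computed inside the database as (1 << num_attempts) * step_milliseconds, and Postgres evaluates that expression as a 32-bit integer, so ten failed attempts already exceed the maximum. A minimal sketch of the arithmetic (plain Python; the constant names are illustrative, mirroring the diff below):

    INT32_MAX = 2**31 - 1            # largest value a Postgres `integer` can hold
    STEP_MS = 3_600_000              # one hour in milliseconds, as in the new constant

    uncapped = (1 << 10) * STEP_MS        # 10 failed attempts, old formula: 3,686,400,000
    capped = (1 << min(10, 8)) * STEP_MS  # new formula with 8 doubling steps: 921,600,000

    assert uncapped > INT32_MAX      # the old expression overflows inside Postgres
    assert capped <= INT32_MAX       # the capped expression always fits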
changelog.d/13936.feature (new file, 1 line)
@@ -0,0 +1 @@
+Exponentially backoff from backfilling the same event over and over.
synapse/storage/databases/main/event_federation.py

@@ -73,13 +73,30 @@ pdus_pruned_from_federation_queue = Counter(

 logger = logging.getLogger(__name__)

-BACKFILL_EVENT_BACKOFF_UPPER_BOUND_SECONDS: int = int(
-    datetime.timedelta(days=7).total_seconds()
-)
-BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_SECONDS: int = int(
-    datetime.timedelta(hours=1).total_seconds()
-)
+# Parameters controlling exponential backoff between backfill failures.
+# After the first failure to backfill, we wait 2 hours before trying again. If the
+# second attempt fails, we wait 4 hours before trying again. If the third attempt fails,
+# we wait 8 hours before trying again, ... and so on.
+#
+# Each successive backoff period is twice as long as the last. However we cap this
+# period at a maximum of 2^8 = 256 hours: a little over 10 days. (This is the smallest
+# power of 2 which yields a maximum backoff period of at least 7 days---which was the
+# original maximum backoff period.) Even when we hit this cap, we will continue to
+# make backfill attempts once every 10 days.
+BACKFILL_EVENT_EXPONENTIAL_BACKOFF_MAXIMUM_DOUBLING_STEPS = 8
+BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_MILLISECONDS = int(
+    datetime.timedelta(hours=1).total_seconds() * 1000
+)
+
+# We need a cap on the power of 2 or else the backoff period
+#     2^N * (milliseconds per hour)
+# will overflow when calculated within the database. We ensure overflow does not occur
+# by checking that the largest backoff period fits in a 32-bit signed integer.
+_LONGEST_BACKOFF_PERIOD_MILLISECONDS = (
+    2**BACKFILL_EVENT_EXPONENTIAL_BACKOFF_MAXIMUM_DOUBLING_STEPS
+) * BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_MILLISECONDS
+assert 0 < _LONGEST_BACKOFF_PERIOD_MILLISECONDS <= ((2**31) - 1)


 # All the info we need while iterating the DAG while backfilling
 @attr.s(frozen=True, slots=True, auto_attribs=True)
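As a sanity check, the schedule these constants produce can be tabulated outside the database. A small sketch (hypothetical helper, same shape as the SQL predicate further down):

    import datetime

    MAX_DOUBLING_STEPS = 8
    STEP_MS = int(datetime.timedelta(hours=1).total_seconds() * 1000)

    def backoff_period_ms(num_attempts: int) -> int:
        # Mirrors the SQL: (1 << LEAST(num_attempts, max_steps)) * step
        return (1 << min(num_attempts, MAX_DOUBLING_STEPS)) * STEP_MS

    for n in (1, 2, 3, 8, 10, 20):
        print(n, backoff_period_ms(n) // STEP_MS, "hours")
    # 2, 4, 8 hours for the first attempts, then a flat 256 hours from the
    # eighth attempt onwards -- which always fits in a 32-bit signed integer.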
@@ -767,7 +784,15 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBaseStore):
             # persisted in our database yet (meaning we don't know their depth
             # specifically). So we need to look for the approximate depth from
             # the events connected to the current backwards extremities.
-            sql = """
+
+            if isinstance(self.database_engine, PostgresEngine):
+                least_function = "LEAST"
+            elif isinstance(self.database_engine, Sqlite3Engine):
+                least_function = "MIN"
+            else:
+                raise RuntimeError("Unknown database engine")
+
+            sql = f"""
                 SELECT backward_extrem.event_id, event.depth FROM events AS event
                 /**
                  * Get the edge connections from the event_edges table
@@ -825,7 +850,10 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBaseStore):
                  */
                 AND (
                     failed_backfill_attempt_info.event_id IS NULL
-                    OR ? /* current_time */ >= failed_backfill_attempt_info.last_attempt_ts + /*least*/%s((1 << failed_backfill_attempt_info.num_attempts) * ? /* step */, ? /* upper bound */)
+                    OR ? /* current_time */ >= failed_backfill_attempt_info.last_attempt_ts + (
+                        (1 << {least_function}(failed_backfill_attempt_info.num_attempts, ? /* max doubling steps */))
+                        * ? /* step */
+                    )
                 )
                 /**
                  * Sort from highest (closest to the `current_depth`) to the lowest depth
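The {least_function} substitution is needed because the two engines spell the two-argument minimum differently: Postgres has LEAST(x, y), while in SQLite the scalar form of MIN(x, y) does the same job. A quick illustration against an in-memory SQLite database (illustrative values, not Synapse code):

    import sqlite3

    conn = sqlite3.connect(":memory:")
    num_attempts, max_doubling_steps, step_ms = 10, 8, 3_600_000
    # SQLite's scalar MIN(x, y) caps the shift exactly like Postgres' LEAST(x, y).
    (period,) = conn.execute(
        "SELECT (1 << MIN(?, ?)) * ?", (num_attempts, max_doubling_steps, step_ms)
    ).fetchone()
    print(period)  # 921600000 -- comfortably inside a 32-bit signed integer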
@@ -837,22 +865,15 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBaseStore):
                 LIMIT ?
             """
-
-            if isinstance(self.database_engine, PostgresEngine):
-                least_function = "least"
-            elif isinstance(self.database_engine, Sqlite3Engine):
-                least_function = "min"
-            else:
-                raise RuntimeError("Unknown database engine")
-
             txn.execute(
-                sql % (least_function,),
+                sql,
                 (
                     room_id,
                     False,
                     current_depth,
                     self._clock.time_msec(),
-                    1000 * BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_SECONDS,
-                    1000 * BACKFILL_EVENT_BACKOFF_UPPER_BOUND_SECONDS,
+                    BACKFILL_EVENT_EXPONENTIAL_BACKOFF_MAXIMUM_DOUBLING_STEPS,
+                    BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_MILLISECONDS,
                     limit,
                 ),
             )
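A side effect of the restructuring is that the engine-specific function name is now baked into the query with an f-string when the text is built, rather than being spliced in with % at execute time. A simplified sketch of the before/after (hypothetical query, not the real one):

    least_function = "MIN"  # or "LEAST" on Postgres

    sql_old = "SELECT %s(a, b) FROM t"                 # old: substituted at execute time
    sql_new = f"SELECT {least_function}(a, b) FROM t"  # new: fixed up front

    assert sql_old % (least_function,) == sql_new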
@@ -902,7 +923,14 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBaseStore):
         def get_insertion_event_backward_extremities_in_room_txn(
             txn: LoggingTransaction, room_id: str
         ) -> List[Tuple[str, int]]:
-            sql = """
+            if isinstance(self.database_engine, PostgresEngine):
+                least_function = "LEAST"
+            elif isinstance(self.database_engine, Sqlite3Engine):
+                least_function = "MIN"
+            else:
+                raise RuntimeError("Unknown database engine")
+
+            sql = f"""
                 SELECT
                     insertion_event_extremity.event_id, event.depth
                 /* We only want insertion events that are also marked as backwards extremities */
@@ -942,7 +970,10 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBaseStore):
                  */
                 AND (
                     failed_backfill_attempt_info.event_id IS NULL
-                    OR ? /* current_time */ >= failed_backfill_attempt_info.last_attempt_ts + /*least*/%s((1 << failed_backfill_attempt_info.num_attempts) * ? /* step */, ? /* upper bound */)
+                    OR ? /* current_time */ >= failed_backfill_attempt_info.last_attempt_ts + (
+                        (1 << {least_function}(failed_backfill_attempt_info.num_attempts, ? /* max doubling steps */))
+                        * ? /* step */
+                    )
                 )
                 /**
                  * Sort from highest (closest to the `current_depth`) to the lowest depth
@@ -954,21 +985,14 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBaseStore):
                 LIMIT ?
             """
-
-            if isinstance(self.database_engine, PostgresEngine):
-                least_function = "least"
-            elif isinstance(self.database_engine, Sqlite3Engine):
-                least_function = "min"
-            else:
-                raise RuntimeError("Unknown database engine")
-
             txn.execute(
-                sql % (least_function,),
+                sql,
                 (
                     room_id,
                     current_depth,
                     self._clock.time_msec(),
-                    1000 * BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_SECONDS,
-                    1000 * BACKFILL_EVENT_BACKOFF_UPPER_BOUND_SECONDS,
+                    BACKFILL_EVENT_EXPONENTIAL_BACKOFF_MAXIMUM_DOUBLING_STEPS,
+                    BACKFILL_EVENT_EXPONENTIAL_BACKOFF_STEP_MILLISECONDS,
                     limit,
                 ),
             )
tests/storage/test_event_federation.py
@@ -766,9 +766,7 @@ class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase):
             self.store.get_backfill_points_in_room(room_id, depth_map["B"], limit=100)
         )
         backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points]
-        self.assertListEqual(
-            backfill_event_ids, ["b6", "b5", "b4", "2", "b3", "b2", "b1"]
-        )
+        self.assertEqual(backfill_event_ids, ["b6", "b5", "b4", "2", "b3", "b2", "b1"])

         # Try at "A"
         backfill_points = self.get_success(
@@ -814,7 +812,7 @@ class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase):
         )
         backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points]
         # Only the backfill points that we didn't record earlier exist here.
-        self.assertListEqual(backfill_event_ids, ["b6", "2", "b1"])
+        self.assertEqual(backfill_event_ids, ["b6", "2", "b1"])

     def test_get_backfill_points_in_room_attempted_event_retry_after_backoff_duration(
         self,
@@ -860,7 +858,7 @@ class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase):
             self.store.get_backfill_points_in_room(room_id, depth_map["A"], limit=100)
         )
         backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points]
-        self.assertListEqual(backfill_event_ids, ["b3", "b2"])
+        self.assertEqual(backfill_event_ids, ["b3", "b2"])

         # Now advance time by 20 hours (above 2^4 because we made 4 attempts) and
         # see if we can now backfill it
@@ -871,7 +869,48 @@ class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase):
             self.store.get_backfill_points_in_room(room_id, depth_map["A"], limit=100)
         )
         backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points]
-        self.assertListEqual(backfill_event_ids, ["b3", "b2", "b1"])
+        self.assertEqual(backfill_event_ids, ["b3", "b2", "b1"])
+
+    def test_get_backfill_points_in_room_works_after_many_failed_pull_attempts_that_could_naively_overflow(
+        self,
+    ) -> None:
+        """
+        A test that reproduces #13929 (Postgres only).
+
+        Test to make sure we can still get backfill points after many failed pull
+        attempts that cause us to back off to the limit, even if the backoff formula
+        would tell us to wait for more seconds than can be expressed in a 32-bit
+        signed int.
+        """
+        setup_info = self._setup_room_for_backfill_tests()
+        room_id = setup_info.room_id
+        depth_map = setup_info.depth_map
+
+        # Pretend that we have tried and failed 10 times to backfill event b1.
+        for _ in range(10):
+            self.get_success(
+                self.store.record_event_failed_pull_attempt(room_id, "b1", "fake cause")
+            )
+
+        # If the backoff periods grew without limit:
+        # after the first failed attempt, we would have backed off for 1 << 1 = 2 hours;
+        # after the second failed attempt, we would have backed off for 1 << 2 = 4 hours;
+        # so after the 10th failed attempt we would back off for 1 << 10 == 1024 hours.
+        # Wait 1100 hours just so we have a nice round number.
+        self.reactor.advance(datetime.timedelta(hours=1100).total_seconds())
+
+        # 1024 hours in milliseconds is 1024 * 3600000, which exceeds the largest
+        # 32-bit signed integer. The bug we're reproducing is that this overflow caused
+        # an error in Postgres preventing us from fetching a set of backwards
+        # extremities to retry fetching.
+        backfill_points = self.get_success(
+            self.store.get_backfill_points_in_room(room_id, depth_map["A"], limit=100)
+        )
+
+        # We should aim to fetch all backfill points: b1's latest backoff period has
+        # expired, and we haven't tried the rest.
+        backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points]
+        self.assertEqual(backfill_event_ids, ["b3", "b2", "b1"])

     def _setup_room_for_insertion_backfill_tests(self) -> _BackfillSetupInfo:
         """
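The arithmetic in the test's comments can be checked independently (plain Python, separate from the test):

    import datetime

    INT32_MAX = 2**31 - 1
    step_ms = int(datetime.timedelta(hours=1).total_seconds() * 1000)  # 3,600,000

    naive_backoff_ms = (1 << 10) * step_ms  # 10 failed attempts, uncapped
    assert naive_backoff_ms == 1024 * 3_600_000 == 3_686_400_000
    assert naive_backoff_ms > INT32_MAX     # hence the Postgres error in #13929

    # The capped maximum is 256 hours, so advancing 1100 hours guarantees
    # b1's backoff period has expired by the time the test queries again.
    assert 1100 > (1 << 8)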
@@ -965,9 +1004,7 @@ class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase):
             )
         )
         backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points]
-        self.assertListEqual(
-            backfill_event_ids, ["insertion_eventB", "insertion_eventA"]
-        )
+        self.assertEqual(backfill_event_ids, ["insertion_eventB", "insertion_eventA"])

         # Try at "insertion_eventA"
         backfill_points = self.get_success(
@@ -1011,7 +1048,7 @@ class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase):
         )
         backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points]
         # Only the backfill points that we didn't record earlier exist here.
-        self.assertListEqual(backfill_event_ids, ["insertion_eventB"])
+        self.assertEqual(backfill_event_ids, ["insertion_eventB"])

     def test_get_insertion_event_backward_extremities_in_room_attempted_event_retry_after_backoff_duration(
         self,
@@ -1069,7 +1106,7 @@ class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase):
             )
         )
         backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points]
-        self.assertListEqual(backfill_event_ids, [])
+        self.assertEqual(backfill_event_ids, [])

         # Now advance time by 20 hours (above 2^4 because we made 4 attempts) and
         # see if we can now backfill it
@@ -1083,7 +1120,7 @@ class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase):
             )
         )
         backfill_event_ids = [backfill_point[0] for backfill_point in backfill_points]
-        self.assertListEqual(backfill_event_ids, ["insertion_eventA"])
+        self.assertEqual(backfill_event_ids, ["insertion_eventA"])


 @attr.s