Merge remote-tracking branch 'origin/develop' into matrix-org-hotfixes

This commit is contained in:
Erik Johnston 2024-11-29 10:11:51 +00:00
commit 445b83bdc7
5 changed files with 65 additions and 56 deletions

1
changelog.d/17962.misc Normal file
View file

@ -0,0 +1 @@
Fix new scheduled tasks jumping the queue.

View file

@ -495,7 +495,7 @@ class LockReleasedCommand(Command):
class NewActiveTaskCommand(_SimpleCommand): class NewActiveTaskCommand(_SimpleCommand):
"""Sent to inform instance handling background tasks that a new active task is available to run. """Sent to inform instance handling background tasks that a new task is ready to run.
Format:: Format::

View file

@ -727,7 +727,7 @@ class ReplicationCommandHandler:
) -> None: ) -> None:
"""Called when get a new NEW_ACTIVE_TASK command.""" """Called when get a new NEW_ACTIVE_TASK command."""
if self._task_scheduler: if self._task_scheduler:
self._task_scheduler.launch_task_by_id(cmd.data) self._task_scheduler.on_new_task(cmd.data)
def new_connection(self, connection: IReplicationConnection) -> None: def new_connection(self, connection: IReplicationConnection) -> None:
"""Called when we have a new connection.""" """Called when we have a new connection."""

View file

@ -174,9 +174,10 @@ class TaskScheduler:
The id of the scheduled task The id of the scheduled task
""" """
status = TaskStatus.SCHEDULED status = TaskStatus.SCHEDULED
start_now = False
if timestamp is None or timestamp < self._clock.time_msec(): if timestamp is None or timestamp < self._clock.time_msec():
timestamp = self._clock.time_msec() timestamp = self._clock.time_msec()
status = TaskStatus.ACTIVE start_now = True
task = ScheduledTask( task = ScheduledTask(
random_string(16), random_string(16),
@ -190,9 +191,11 @@ class TaskScheduler:
) )
await self._store.insert_scheduled_task(task) await self._store.insert_scheduled_task(task)
if status == TaskStatus.ACTIVE: # If the task is ready to run immediately, run the scheduling algorithm now
# rather than waiting
if start_now:
if self._run_background_tasks: if self._run_background_tasks:
await self._launch_task(task) self._launch_scheduled_tasks()
else: else:
self._hs.get_replication_command_handler().send_new_active_task(task.id) self._hs.get_replication_command_handler().send_new_active_task(task.id)
@ -300,23 +303,13 @@ class TaskScheduler:
raise Exception(f"Task {id} is currently ACTIVE and can't be deleted") raise Exception(f"Task {id} is currently ACTIVE and can't be deleted")
await self._store.delete_scheduled_task(id) await self._store.delete_scheduled_task(id)
def launch_task_by_id(self, id: str) -> None: def on_new_task(self, task_id: str) -> None:
"""Try launching the task with the given ID.""" """Handle a notification that a new ready-to-run task has been added to the queue"""
# Don't bother trying to launch new tasks if we're already at capacity. # Just run the scheduler
if len(self._running_tasks) >= TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS: self._launch_scheduled_tasks()
return
run_as_background_process("launch_task_by_id", self._launch_task_by_id, id) def _launch_scheduled_tasks(self) -> None:
"""Retrieve and launch scheduled tasks that should be running at this time."""
async def _launch_task_by_id(self, id: str) -> None:
"""Helper async function for `launch_task_by_id`."""
task = await self.get_task(id)
if task:
await self._launch_task(task)
@wrap_as_background_process("launch_scheduled_tasks")
async def _launch_scheduled_tasks(self) -> None:
"""Retrieve and launch scheduled tasks that should be running at that time."""
# Don't bother trying to launch new tasks if we're already at capacity. # Don't bother trying to launch new tasks if we're already at capacity.
if len(self._running_tasks) >= TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS: if len(self._running_tasks) >= TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS:
return return
@ -326,20 +319,26 @@ class TaskScheduler:
self._launching_new_tasks = True self._launching_new_tasks = True
try: async def inner() -> None:
for task in await self.get_tasks( try:
statuses=[TaskStatus.ACTIVE], limit=self.MAX_CONCURRENT_RUNNING_TASKS for task in await self.get_tasks(
): statuses=[TaskStatus.ACTIVE],
await self._launch_task(task) limit=self.MAX_CONCURRENT_RUNNING_TASKS,
for task in await self.get_tasks( ):
statuses=[TaskStatus.SCHEDULED], # _launch_task will ignore tasks that we're already running, and
max_timestamp=self._clock.time_msec(), # will also do nothing if we're already at the maximum capacity.
limit=self.MAX_CONCURRENT_RUNNING_TASKS, await self._launch_task(task)
): for task in await self.get_tasks(
await self._launch_task(task) statuses=[TaskStatus.SCHEDULED],
max_timestamp=self._clock.time_msec(),
limit=self.MAX_CONCURRENT_RUNNING_TASKS,
):
await self._launch_task(task)
finally: finally:
self._launching_new_tasks = False self._launching_new_tasks = False
run_as_background_process("launch_scheduled_tasks", inner)
@wrap_as_background_process("clean_scheduled_tasks") @wrap_as_background_process("clean_scheduled_tasks")
async def _clean_scheduled_tasks(self) -> None: async def _clean_scheduled_tasks(self) -> None:

View file

@ -18,8 +18,7 @@
# [This file includes modifications made by New Vector Limited] # [This file includes modifications made by New Vector Limited]
# #
# #
from typing import List, Optional, Tuple
from typing import Optional, Tuple
from twisted.internet.task import deferLater from twisted.internet.task import deferLater
from twisted.test.proto_helpers import MemoryReactor from twisted.test.proto_helpers import MemoryReactor
@ -104,33 +103,43 @@ class TestTaskScheduler(HomeserverTestCase):
) )
) )
# This is to give the time to the active tasks to finish def get_tasks_of_status(status: TaskStatus) -> List[ScheduledTask]:
self.reactor.advance(1) tasks = (
self.get_success(self.task_scheduler.get_task(task_id))
# Check that only MAX_CONCURRENT_RUNNING_TASKS tasks has run and that one for task_id in task_ids
# is still scheduled. )
tasks = [ return [t for t in tasks if t is not None and t.status == status]
self.get_success(self.task_scheduler.get_task(task_id))
for task_id in task_ids
]
# At this point, there should be MAX_CONCURRENT_RUNNING_TASKS active tasks and
# one scheduled task.
self.assertEquals( self.assertEquals(
len( len(get_tasks_of_status(TaskStatus.ACTIVE)),
[t for t in tasks if t is not None and t.status == TaskStatus.COMPLETE]
),
TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS, TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS,
) )
self.assertEquals(
len(get_tasks_of_status(TaskStatus.SCHEDULED)),
1,
)
scheduled_tasks = [ # Give the time to the active tasks to finish
t for t in tasks if t is not None and t.status == TaskStatus.ACTIVE
]
self.assertEquals(len(scheduled_tasks), 1)
# We need to wait for the next run of the scheduler loop
self.reactor.advance((TaskScheduler.SCHEDULE_INTERVAL_MS / 1000))
self.reactor.advance(1) self.reactor.advance(1)
# Check that the last task has been properly executed after the next scheduler loop run # Check that MAX_CONCURRENT_RUNNING_TASKS tasks have run and that one
# is still scheduled.
self.assertEquals(
len(get_tasks_of_status(TaskStatus.COMPLETE)),
TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS,
)
scheduled_tasks = get_tasks_of_status(TaskStatus.SCHEDULED)
self.assertEquals(len(scheduled_tasks), 1)
# The scheduled task should start 0.1s after the first of the active tasks
# finishes
self.reactor.advance(0.1)
self.assertEquals(len(get_tasks_of_status(TaskStatus.ACTIVE)), 1)
# ... and should finally complete after another second
self.reactor.advance(1)
prev_scheduled_task = self.get_success( prev_scheduled_task = self.get_success(
self.task_scheduler.get_task(scheduled_tasks[0].id) self.task_scheduler.get_task(scheduled_tasks[0].id)
) )