diff --git a/changelog.d/17992.doc b/changelog.d/17992.doc new file mode 100644 index 0000000000..74afabe40f --- /dev/null +++ b/changelog.d/17992.doc @@ -0,0 +1 @@ +Improve documentation for the `TaskScheduler` class. \ No newline at end of file diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py index 3ed457bd30..4683d09cd7 100644 --- a/synapse/util/task_scheduler.py +++ b/synapse/util/task_scheduler.py @@ -46,33 +46,43 @@ logger = logging.getLogger(__name__) class TaskScheduler: """ - This is a simple task sheduler aimed at resumable tasks: usually we use `run_in_background` - to launch a background task, or Twisted `deferLater` if we want to do so later on. + This is a simple task scheduler designed for resumable tasks. Normally, + you'd use `run_in_background` to start a background task or Twisted's + `deferLater` if you want to run it later. - The problem with that is that the tasks will just stop and never be resumed if synapse - is stopped for whatever reason. + The issue is that these tasks stop completely and won't resume if Synapse is + shut down for any reason. - How this works: - - A function mapped to a named action should first be registered with `register_action`. - This function will be called when trying to resuming tasks after a synapse shutdown, - so this registration should happen when synapse is initialised, NOT right before scheduling - a task. - - A task can then be launched using this named action with `schedule_task`. A `params` dict - can be passed, and it will be available to the registered function when launched. This task - can be launch either now-ish, or later on by giving a `timestamp` parameter. + Here's how it works: - The function may call `update_task` at any time to update the `result` of the task, - and this can be used to resume the task at a specific point and/or to convey a result to - the code launching the task. - You can also specify the `result` (and/or an `error`) when returning from the function. + - Register an Action: First, you need to register a function to a named + action using `register_action`. This function will be called to resume tasks + after a Synapse shutdown. Make sure to register it when Synapse initializes, + not right before scheduling the task. - The reconciliation loop runs every minute, so this is not a precise scheduler. - There is a limit of 10 concurrent tasks, so tasks may be delayed if the pool is already - full. In this regard, please take great care that scheduled tasks can actually finished. - For now there is no mechanism to stop a running task if it is stuck. + - Schedule a Task: You can launch a task linked to the named action + using `schedule_task`. You can pass a `params` dictionary, which will be + passed to the registered function when it's executed. Tasks can be scheduled + to run either immediately or later by specifying a `timestamp`. - Tasks will be run on the worker specified with `run_background_tasks_on` config, - or the main one by default. + - Update Task: The function handling the task can call `update_task` at + any point to update the task's `result`. This lets you resume the task from + a specific point or pass results back to the code that scheduled it. When + the function completes, you can also return a `result` or an `error`. + + Things to keep in mind: + + - The reconciliation loop runs every minute, so this is not a high-precision + scheduler. + + - Only 10 tasks can run at the same time. If the pool is full, tasks may be + delayed. Make sure your scheduled tasks can actually finish. + + - Currently, there's no way to stop a task if it gets stuck. + + - Tasks will run on the worker defined by the `run_background_tasks_on` + setting in your configuration. If no worker is specified, they'll run on + the main one by default. """ # Precision of the scheduler, evaluation of tasks to run will only happen @@ -157,7 +167,7 @@ class TaskScheduler: params: Optional[JsonMapping] = None, ) -> str: """Schedule a new potentially resumable task. A function matching the specified - `action` should have be registered with `register_action` before the task is run. + `action` should've been registered with `register_action` before the task is run. Args: action: the name of a previously registered action @@ -210,15 +220,15 @@ class TaskScheduler: result: Optional[JsonMapping] = None, error: Optional[str] = None, ) -> bool: - """Update some task associated values. This is exposed publicly so it can - be used inside task functions, mainly to update the result and be able to - resume a task at a specific step after a restart of synapse. + """Update some task-associated values. This is exposed publicly so it can + be used inside task functions, mainly to update the result or resume + a task at a specific step after a restart of synapse. It can also be used to stage a task, by setting the `status` to `SCHEDULED` with a new timestamp. - The `status` can only be set to `ACTIVE` or `SCHEDULED`, `COMPLETE` and `FAILED` - are terminal status and can only be set by returning it in the function. + The `status` can only be set to `ACTIVE` or `SCHEDULED`. `COMPLETE` and `FAILED` + are terminal statuses and can only be set by returning them from the function. Args: id: the id of the task to update @@ -226,6 +236,12 @@ class TaskScheduler: status: the new `TaskStatus` of the task result: the new result of the task error: the new error of the task + + Returns: + True if the update was successful, False otherwise. + + Raises: + Exception: If a status other than `ACTIVE` or `SCHEDULED` was passed. """ if status == TaskStatus.COMPLETE or status == TaskStatus.FAILED: raise Exception( @@ -263,9 +279,9 @@ class TaskScheduler: max_timestamp: Optional[int] = None, limit: Optional[int] = None, ) -> List[ScheduledTask]: - """Get a list of tasks. Returns all the tasks if no args is provided. + """Get a list of tasks. Returns all the tasks if no args are provided. - If an arg is `None` all tasks matching the other args will be selected. + If an arg is `None`, all tasks matching the other args will be selected. If an arg is an empty list, the corresponding value of the task needs to be `None` to be selected. @@ -277,8 +293,8 @@ class TaskScheduler: a timestamp inferior to the specified one limit: Only return `limit` number of rows if set. - Returns - A list of `ScheduledTask`, ordered by increasing timestamps + Returns: + A list of `ScheduledTask`, ordered by increasing timestamps. """ return await self._store.get_scheduled_tasks( actions=actions,