A backport of django.tasks - Django's built-in Tasks framework.
python -m pip install django-tasks
The first step is to add django_tasks to your INSTALLED_APPS.
INSTALLED_APPS = [
# ...
"django_tasks",
]Secondly, you'll need to configure a backend. This connects the tasks to whatever is going to execute them.
If omitted, the following configuration is used:
TASKS = {
"default": {
"BACKEND": "django_tasks.backends.immediate.ImmediateBackend"
}
}A few backends are included by default:
django_tasks.backends.dummy.DummyBackend: Don't execute the tasks, just store them. This is especially useful for testing.django_tasks.backends.immediate.ImmediateBackend: Execute the task immediately in the current thread
Prior to 0.12.0, django-tasks-db and django-tasks-rq were also included to provide database and RQ based backends.
A task is created with the task decorator.
from django_tasks import task
@task()
def calculate_meaning_of_life() -> int:
return 42The task decorator accepts a few arguments to customize the task:
priority: The priority of the task (between -100 and 100. Larger numbers are higher priority. 0 by default)queue_name: Whether to run the task on a specific queuebackend: Name of the backend for this task to use (as defined inTASKS)
modified_task = calculate_meaning_of_life.using(priority=10)In addition to the above attributes, run_after can be passed to specify a specific time the task should run.
Sometimes the running task may need to know context about how it was enqueued. To receive the task context as an argument to your task function, pass takes_context to the decorator and ensure the task takes a context as the first argument.
from django_tasks import task, TaskContext
@task(takes_context=True)
def calculate_meaning_of_life(context: TaskContext) -> int:
return 42The task context has the following attributes:
task_result: The running task resultattempt: The current attempt number for the task
This API will be extended with additional features in future.
To execute a task, call the enqueue method on it:
result = calculate_meaning_of_life.enqueue()The returned TaskResult can be interrogated to query the current state of the running task, as well as its return value.
If the task takes arguments, these can be passed as-is to enqueue.
By default, tasks are enqueued onto the "default" queue. When using multiple queues, it can be useful to constrain the allowed names, so tasks aren't missed.
TASKS = {
"default": {
"BACKEND": "django_tasks.backends.immediate.ImmediateBackend",
"QUEUES": ["default", "special"]
}
}Enqueueing tasks to an unknown queue name raises InvalidTaskError.
To disable queue name validation, set QUEUES to [].
When enqueueing a task, you get a TaskResult, however it may be useful to retrieve said result from somewhere else (another request, another task etc). This can be done with get_result (or aget_result):
result_id = result.id
# Later, somewhere else...
calculate_meaning_of_life.get_result(result_id)A result id should be considered an opaque string, whose length could be up to 64 characters. ID generation is backend-specific.
Only tasks of the same type can be retrieved this way. To retrieve the result of any task, you can call get_result on the backend:
from django_tasks import default_task_backend
default_task_backend.get_result(result_id)If your task returns something, it can be retrieved from the .return_value attribute on a TaskResult. Accessing this property on an unsuccessful task (ie not SUCCESSFUL) will raise a ValueError.
assert result.status == TaskResultStatus.SUCCESSFUL
assert result.return_value == 42If a result has been updated in the background, you can call refresh on it to update its values. Results obtained using get_result will always be up-to-date.
assert result.status == TaskResultStatus.READY
result.refresh()
assert result.status == TaskResultStatus.SUCCESSFULIf a task raised an exception, its .errors contains information about the error:
assert result.errors[0].exception_class == ValueErrorNote that this is just the type of exception, and contains no other values. The traceback information is reduced to a string that you can print to help debugging:
assert isinstance(result.errors[0].traceback, str)Note that currently, whilst .errors is a list, it will only ever contain a single element.
The number of times a task has been run is stored as the .attempts attribute. This will currently only ever be 0 or 1.
The date of the last attempt is stored as .last_attempted_at.
Because django-tasks enables support for multiple different backends, those backends may not support all features, and it can be useful to determine this at runtime to ensure the chosen task queue meets the requirements, or to gracefully degrade functionality if it doesn't.
supports_defer: Can tasks be enqueued with therun_afterattribute?supports_async_task: Can coroutines be enqueued?supports_get_result: Can results be retrieved after the fact (from any thread / process)?supports_priority: Can tasks be executed in a given priority order?
from django_tasks import default_task_backend
assert default_task_backend.supports_get_resultThis is particularly useful in combination with Django's system check framework.
A few Signals are provided to more easily respond to certain task events.
Whilst signals are available, they may not be the most maintainable approach.
django_tasks.signals.task_enqueued: Called when a task is enqueued. The sender is the backend class. Also called with the enqueuedtask_result.django_tasks.signals.task_finished: Called when a task finishes (SUCCESSFULorFAILED). The sender is the backend class. Also called with the finishedtask_result.django_tasks.signals.task_started: Called immediately before a task starts executing. The sender is the backend class. Also called with the startedtask_result.
See CONTRIBUTING.md for information on how to contribute.