📝 Walkthrough

Adds a backend-agnostic async task system with Django-Q and optional Celery support, task scheduling persisted in a new TaskSchedule model, settings/UI for selecting/configuring backends, and migration of existing async usages to the new abstraction.
Sequence Diagram(s)

```mermaid
sequenceDiagram
actor Client
participant API as API / Backend
participant TaskQueue as Task Abstraction<br/>(common.task_queue)
participant Backend as Selected Backend<br/>(Django-Q or Celery)
participant Worker as Worker / Broker
participant DB as Database<br/>(TaskSchedule)
Client->>API: request async work (async_task / schedule)
API->>TaskQueue: async_task(func, args, kwargs, ...)
TaskQueue->>TaskQueue: encode payload & choose backend
TaskQueue->>Backend: enqueue or schedule payload
Backend->>DB: create/update TaskSchedule (backend_job_id optional)
Backend->>Worker: push task to queue / broker
Backend-->>TaskQueue: return job_id
TaskQueue-->>API: acknowledge/enqueued
```

```mermaid
sequenceDiagram
actor Worker
participant Broker as Broker/Worker
participant TaskQueue as Task Abstraction<br/>(common.task_queue)
participant DB as Database<br/>(TaskSchedule)
participant Callback as Optional Callback
Broker->>TaskQueue: invoke execute_payload(payload)
TaskQueue->>TaskQueue: decode payload, import callable
TaskQueue->>DB: mark TaskSchedule.status -> RUNNING (if present)
TaskQueue->>TaskQueue: call target callable
alt success
TaskQueue->>DB: mark status -> COMPLETED, record result & timestamps
TaskQueue->>Callback: call hook with TaskResult (if configured)
else failure
TaskQueue->>DB: mark status -> FAILED, record error/traceback
TaskQueue->>Callback: call hook with TaskResult (if configured)
TaskQueue->>TaskQueue: re-raise exception
end
```
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings.
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.
Actionable comments posted: 8
🧹 Nitpick comments (5)
sql/query.py (1)
26-27: Guard logic is sensible; consider a brief docstring.
`isinstance(thread_id, (int, str)) and bool(thread_id)` correctly excludes `None`, `0`, and `""`. Note that `bool` is a subclass of `int`, so `True`/`False` thread IDs will be accepted as schedulable (`True`) or rejected (`False`); this should be fine since engines return integer thread IDs in practice. A one-line docstring would help future readers understand the intent (why `str` is accepted alongside `int`).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@sql/query.py` around lines 26 - 27, Add a one-line docstring to the _has_schedulable_thread_id function explaining its intent: accept integer thread IDs and non-empty string IDs, and reject falsy values (None, 0, empty string), and note that bool is a subclass of int so True/False will be treated accordingly; place the docstring immediately under the def _has_schedulable_thread_id(...).

frontend/tests/e2e/system-settings.spec.ts (1)
58-131: Good coverage of the backend switch lifecycle.

The test validates select → save → reload persistence and the conditional visibility/removal of Celery fields when toggling back to `django_q`, and the `finally` block restores the original configuration. A couple of small polish ideas:
- Consider asserting that the Celery fields are not visible before switching to `celery` (sanity check of the conditional UI when the default backend is `django_q`).
- The computed `brokerUrl`/`resultBackend` use `Date.now().toString().slice(-2)` for the port suffix — deterministic placeholder URLs (or a single fixed pair) would make failure diagnostics easier to read in CI logs.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@frontend/tests/e2e/system-settings.spec.ts` around lines 58 - 131, Add a pre-switch sanity assertion and make the Celery URLs deterministic: before calling taskBackendField.selectOption('celery') assert that the Celery inputs (getByTestId('settings-field-celery_broker_url'), '..._result_backend', '..._task_default_queue', '..._task_soft_time_limit', '..._task_time_limit') are not visible when originalTaskBackend is 'django_q', and replace the dynamic brokerUrl/resultBackend generation (the brokerUrl and resultBackend variables) with fixed predictable test values (e.g., constants) so CI logs are stable; update the cleanup in the finally block to continue using the original variables (originalBrokerUrl/originalResultBackend) as-is.

sql/utils/tests.py (1)
589-620: Tests correctly updated to `TaskSchedule` semantics.

Lifecycle expectations (cancellation via `STATUS_CANCELLED` instead of deletion, and `task_info` returning `None` for missing names) align with the new helpers in `common/task_queue.py` and `sql/utils/tasks.py`. Consider adding a test that verifies `task_info("some_name")` returns a non-`None` record for the existing schedule to complement `test_task_info_not_exists`.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@sql/utils/tests.py` around lines 589 - 620, Add a positive assertion for task_info: update the TestTasks.test_task_info to assert that task_info("some_name") returns a non-None value (and optionally that returned object's name or status matches expectations) so the test complements test_task_info_not_exists; locate the test method test_task_info in class TestTasks and change the current call task_info("some_name") to an assertion like self.assertIsNotNone(task_info("some_name")) (or assert on attributes of the returned object) to validate the existing schedule created in setUp.

sql/models.py (1)
1116-1122: Add a due-task lookup index before this table grows.

Scheduler polling will likely filter scheduled rows by `status` and `run_at`; without a composite index, completed history can turn this into a table scan.
⚙️ Proposed model update

```diff
 class Meta:
     managed = True
     db_table = "task_schedule"
     verbose_name = "Scheduled Task"
     verbose_name_plural = "Scheduled Task"
+    indexes = [
+        models.Index(fields=["status", "run_at"], name="task_schedule_due_idx"),
+    ]
```

Regenerate the migration after updating the model. As per coding guidelines,
`**/migrations/*.py`: Do not hand-write Django migrations for normal model changes; generate migrations using `makemigrations` instead.

Also applies to: 1142-1146
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@sql/models.py` around lines 1116 - 1122, Add a composite DB index on the scheduler table for the fields status and run_at to optimize queries that filter by status and run_at (e.g., add models.Index(fields=['status','run_at'], name='idx_status_run_at') inside the model's Meta.indexes). Update the model where run_at and status are declared (references: run_at, status, STATUS_CHOICES, STATUS_SCHEDULED) and then regenerate the migration with ./manage.py makemigrations (do not hand-write the migration); repeat the same change for the second model occurrence noted around lines 1142-1146.

archery/settings.py (1)
76-82: Consider validating Celery prerequisites when `TASK_BACKEND=celery`.

`CELERY_BROKER_URL`/`CELERY_RESULT_BACKEND` default to empty strings. If `TASK_BACKEND` (or the DB-level `task_backend` SysConfig) is set to `celery` without a broker URL configured, workers and `apply_async` calls will fail with opaque errors far from this config site. Consider raising `ImproperlyConfigured` at startup (similar to the WorkOS block) when the selected backend is Celery but required settings are missing — or at minimum document the env-var requirements.

Also applies to: 275-283
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@archery/settings.py` around lines 76 - 82, If TASK_BACKEND is set to "celery" validate that CELERY_BROKER_URL and CELERY_RESULT_BACKEND are non-empty at startup and raise django.core.exceptions.ImproperlyConfigured if they are missing; update the settings initialization that defines TASK_BACKEND, CELERY_BROKER_URL, and CELERY_RESULT_BACKEND to perform this check (or mirror the WorkOS-style guard used elsewhere) so misconfiguration fails fast and clearly rather than producing downstream worker/apply_async errors.
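The suggested startup guard could be sketched like this in plain Python. The helper name `validate_celery_settings` is illustrative; the real code would raise `django.core.exceptions.ImproperlyConfigured` and use the values already defined in `archery/settings.py`:

```python
import os


class ImproperlyConfigured(Exception):
    """Stand-in for django.core.exceptions.ImproperlyConfigured."""


def validate_celery_settings(task_backend, broker_url, result_backend):
    """Fail fast at startup when the Celery backend is selected but
    its required connection settings are empty."""
    if task_backend != "celery":
        return
    missing = [
        name
        for name, value in (
            ("CELERY_BROKER_URL", broker_url),
            ("CELERY_RESULT_BACKEND", result_backend),
        )
        if not value
    ]
    if missing:
        raise ImproperlyConfigured(
            "TASK_BACKEND=celery requires: " + ", ".join(missing)
        )
```

It would be called once at the end of settings initialization, e.g. `validate_celery_settings(os.environ.get("TASK_BACKEND", "django_q"), CELERY_BROKER_URL, CELERY_RESULT_BACKEND)`, mirroring the WorkOS-style guard the comment references.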
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@archery/celery.py`:
- Around line 15-17: The Celery app in archery.celery currently calls
app.autodiscover_tasks() which only finds conventional tasks.py and misses the
non-standard common/celery_tasks.py, causing common.execute_payload to be
unregistered until async_task() triggers a lazy import; fix by changing the
autodiscovery call to app.autodiscover_tasks(related_name="celery_tasks") so
Celery will discover modules named celery_tasks, or alternatively rename
common/celery_tasks.py to common/tasks.py or add an explicit import of
common.celery_tasks before calling app.autodiscover_tasks() to ensure
common.execute_payload is registered during worker initialization.
In `@common/celery_tasks.py`:
- Around line 3-6: The current try/except around "from archery.celery import
app" catches all Exceptions and can mask real import/configuration errors;
change the broad except to catch ImportError (or ModuleNotFoundError) instead
and set app = None only on import failure, or if you must keep a broader catch
then log the caught exception before falling back. Update the block around the
import of "app" from "archery.celery" to narrow the exception type and/or add a
process/logger call that includes the exception details so real errors are
visible at startup.
In `@common/task_queue.py`:
- Around line 388-417: schedule_payload currently calls task.apply_async (via
_celery_execute_task and result = task.apply_async(...)) before persisting the
TaskSchedule row, which can cause the worker to run the task before the DB row
exists or leave tasks untracked if the DB write fails; fix by reordering to
create/update the TaskSchedule inside a transaction.atomic() first (mirroring
DjangoQTaskBackend.schedule_payload), then call task.apply_async and update the
TaskSchedule with backend_job_id (result.id); ensure error handling updates
last_error/cleanup if apply_async fails so
_mark_schedule_running/_mark_schedule_completed can find the row.
- Around line 467-481: The current implementation of _encode_task_payload and
_decode_task_payload uses pickle (wrapped in base64) which allows arbitrary
object deserialization and creates an RCE vector when workers call
execute_payload; replace this by switching to a JSON-safe serializer for
payloads (e.g., JSON with explicit type tags) and add custom (de)serialization
handlers for ORM model objects passed via async_task so models are serialized as
stable references (model import path + PK or UUID) rather than full objects;
update _encode_task_payload to produce JSON-safe payloads and
_decode_task_payload/execute_payload to only accept that JSON format and
reconstruct models via a safe lookup function (e.g., load_model_reference) or
reject unknown types, or alternatively document and enforce the broker-trust
requirement so that any broker write access is prohibited.
In `@frontend/src/lib/system-settings.ts`:
- Around line 93-106: The Celery broker/result backend fields render secrets as
plain text; update the settings entries for keys 'celery_broker_url' and
'celery_result_backend' to use input: 'password' (instead of 'text') so the
values are masked in the form, preserving their label, placeholder, and showWhen
conditions to remain consistent with other credential fields.
In `@sql_api/api_settings.py`:
- Around line 375-383: The validate method should enforce that when both
celery_task_soft_time_limit and celery_task_time_limit are provided, the soft
limit is strictly less than the hard limit; update validate in api_settings.py
(the validate function shown) to check attrs.get("celery_task_soft_time_limit")
and attrs.get("celery_task_time_limit") and if both are not None and soft >=
hard raise serializers.ValidationError (attach the error to
celery_task_soft_time_limit or both fields) with a clear message like "soft time
limit must be less than hard time limit"; keep the existing celery_broker_url
check and return attrs when valid.
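The cross-field check the prompt asks for amounts to a few lines; here is a plain-Python sketch (the real method would raise `rest_framework.serializers.ValidationError` inside the serializer's `validate`):

```python
class ValidationError(Exception):
    """Stand-in for rest_framework.serializers.ValidationError."""


def validate(attrs):
    """When both limits are supplied, the soft limit must be strictly
    below the hard limit; either may be absent or None."""
    soft = attrs.get("celery_task_soft_time_limit")
    hard = attrs.get("celery_task_time_limit")
    if soft is not None and hard is not None and soft >= hard:
        raise ValidationError(
            {"celery_task_soft_time_limit":
             "soft time limit must be less than hard time limit"}
        )
    return attrs
```

Using `is not None` rather than truthiness keeps `0` as a legal explicit value, which matters for integer settings.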
In `@sql_api/api_workflow.py`:
- Line 12: The queued payloads are sending callables and Django model instances
(e.g., notify_for_audit, WorkflowAudit instances, auditor.audit,
workflow_audit_detail, and request.user) directly via async_task, which leads to
stale/unsecure pickles; change all async_task call sites in
sql_api/api_workflow.py to pass primitive values only: replace callables with
their import path strings (e.g., "myapp.module.notify_for_audit") and replace
model instances with their PKs (ints) or other simple identifiers, then update
the worker/task handler to import the callable by path and re-fetch models from
the DB using the passed IDs before use. Locate usages around the referenced
symbols (notify_for_audit at ~1044–1049, the block around ~1596–1602 using
auditor.audit and workflow_audit_detail, and the ~1686–1696 use of request.user)
and adjust argument construction accordingly so async_task never receives raw
model or function objects.
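The primitives-only call-site shape could be sketched as follows. The helper name, the import path string, and the keyword names are all illustrative, not the project's actual identifiers:

```python
def enqueue_audit_notification(async_task, workflow_audit, user):
    """Hypothetical rewrite of one call site: the callable travels as an
    import-path string and the models travel as primary keys."""
    async_task(
        "sql.notify.notify_for_audit",        # string, not the function object
        workflow_audit_id=workflow_audit.pk,  # PK, not the model instance
        user_id=user.pk,                      # PK, not request.user
    )
```

The worker then imports the callable from the path and re-fetches both rows by PK before calling it, so the payload never carries stale or unpicklable state.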
In `@sql/binlog.py`:
- Line 13: Currently sql/binlog.py enqueues Django model instances (the Instance
object and request.user) and direct callables into the async_task payload (see
the args variable and the async_task(...) calls around my2sql_file), which risks
stale serialization; change the enqueuing to pass primitive identifiers instead:
replace passing the Instance instance with instance.id, replace request.user
with request.user.id, and replace callable objects with their import-path
strings (e.g., "package.module:function_name"); then modify my2sql_file to
accept these IDs and import-paths, reload the Instance and User via the ORM
using the IDs (Instance.objects.get(pk=instance_id),
User.objects.get(pk=user_id)), and dynamically import/resolve callables from
their import-paths before invoking them. Ensure all async_task(...) invocations
in the region previously covering lines 208-218 and 227-252 use only IDs and
import-path strings.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 657d5717-962c-461e-a9a4-25d9891f38dd
📒 Files selected for processing (24)
- archery/celery.py
- archery/settings.py
- common/celery_tasks.py
- common/task_queue.py
- common/tests.py
- frontend/src/lib/api.ts
- frontend/src/lib/system-settings.ts
- frontend/tests/e2e/system-settings.spec.ts
- requirements.txt
- sql/archiver.py
- sql/binlog.py
- sql/migrations/0009_taskschedule.py
- sql/models.py
- sql/query.py
- sql/query_privileges.py
- sql/sql_workflow.py
- sql/utils/tasks.py
- sql/utils/tests.py
- sql_api/api_permission.py
- sql_api/api_query.py
- sql_api/api_settings.py
- sql_api/api_workflow.py
- sql_api/tests.py
- sql_api/views.py
@coderabbitai review

✅ Actions performed: Review triggered.
Actionable comments posted: 4
🧹 Nitpick comments (1)
sql_api/api_settings.py (1)
375-397: LGTM on cross-field validation.

The past feedback on `celery_task_soft_time_limit` vs `celery_task_time_limit` has been addressed, and using explicit `is not None` checks (rather than truthy checks) correctly preserves the intent when either value is `0`.

One optional hardening: consider also rejecting non-positive time limits (e.g., negative or zero) and empty-whitespace `celery_broker_url` values, since `IntegerField` has no `min_value` and `CharField` allows blanks. Not blocking.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@sql_api/api_settings.py` around lines 375 - 397, The validate method should also reject non-positive time limits and blank broker URLs: in validate (method name) add checks that when task_backend == "celery" the celery_broker_url is non-empty after stripping whitespace (use attrs.get("celery_broker_url") and .strip() to detect blank strings) and raise a serializers.ValidationError keyed to "celery_broker_url"; also ensure celery_task_soft_time_limit and celery_task_time_limit (attrs.get("celery_task_soft_time_limit") and attrs.get("celery_task_time_limit")) are positive integers (e.g., > 0) and still enforce soft < hard, raising ValidationError entries for "celery_task_soft_time_limit" and/or "celery_task_time_limit" with clear messages when they are non-positive or violate ordering.
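The optional hardening could look like this in plain Python. `validate_celery_fields` is an illustrative name; the real checks would live in the serializer's `validate` and raise `serializers.ValidationError`:

```python
class ValidationError(Exception):
    """Stand-in for rest_framework.serializers.ValidationError."""


def validate_celery_fields(attrs):
    """Reject blank broker URLs when the Celery backend is selected,
    and non-positive time limits regardless of backend."""
    errors = {}
    if attrs.get("task_backend") == "celery":
        broker = attrs.get("celery_broker_url") or ""
        if not broker.strip():
            errors["celery_broker_url"] = "broker URL is required for Celery"
    for field in ("celery_task_soft_time_limit", "celery_task_time_limit"):
        value = attrs.get(field)
        if value is not None and value <= 0:
            errors[field] = "time limit must be a positive integer"
    if errors:
        raise ValidationError(errors)
    return attrs
```

Collecting all errors into one dict before raising gives the API client every problem in a single response, which is the usual DRF convention.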
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@common/task_queue.py`:
- Around line 110-125: execute_payload currently imports and executes any
callable specified by callable_path from the decoded payload, allowing forged
broker messages to run arbitrary code; before calling _import_from_path and
invoking target in execute_payload (and the similar call sites around lines
587-590), validate the payload origins by either (A) enforcing an
allowlist/registry lookup (e.g., require callable_path to be present in a
REGISTERED_TASKS or TaskRegistry.lookup and only allow corresponding callables
to be executed) or (B) verify a cryptographic signature on the payload returned
by _decode_task_payload (HMAC or similar with a shared secret) and reject
unverified payloads; update execute_payload to perform that check and raise/log
and skip execution when validation fails, and apply the same guard to the other
_import_from_path call sites.
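Combining both guards from option (A) and option (B) could be sketched as below. `TASK_SECRET` and `REGISTERED_TASKS` are assumptions for the sketch (a shared secret distributed via settings, and an explicit registry of callables allowed on workers):

```python
import hashlib
import hmac
import json

TASK_SECRET = b"shared-task-secret"        # assumption: sourced from settings
REGISTERED_TASKS = {"sql.utils.tasks.add"}  # illustrative allowlist


def sign_payload(raw: bytes) -> str:
    """HMAC-SHA256 signature computed by the enqueuing side."""
    return hmac.new(TASK_SECRET, raw, hashlib.sha256).hexdigest()


def execute_payload(raw: bytes, signature: str) -> dict:
    """Guarded variant: verify the HMAC first, then require the callable
    to be registered, before anything is imported or invoked."""
    if not hmac.compare_digest(sign_payload(raw), signature):
        raise ValueError("payload signature mismatch")
    payload = json.loads(raw)
    path = payload["callable_path"]
    if path not in REGISTERED_TASKS:
        raise ValueError(f"unregistered task: {path}")
    # ...only now is it safe to import and invoke `path`...
    return payload
```

Either check alone closes the forged-message hole; applying both means a leaked broker credential still cannot reach arbitrary importable code.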
- Around line 503-560: The serializer treats any dict with only string keys as a
plain mapping, but that collides with internal marker dicts like
{"__task_type__": "custom"} which _deserialize_task_value then misinterprets;
update _serialize_task_value so that in the dict branch it checks for the
internal marker key "__task_type__" and, if present, encodes the dict using the
explicit dict encoding (the {"__task_type__": "dict", "items": [...] } form)
instead of returning it as a plain mapping; modify the condition in
_serialize_task_value (and keep existing behavior for non-string keys) so that
dicts containing "__task_type__" are escaped, leaving _deserialize_task_value
unchanged.
- Around line 139-165: The success callback (_run_callback) is currently invoked
inside the same try block as the target call so a callback exception is treated
as task failure; relocate the success-callback invocation so it runs only after
the target call completes successfully (outside the main try/except) and wrap
that callback invocation in its own try/except to log failures without changing
task success, using TaskResult and calling
_mark_schedule_completed(schedule_name) only on true task success; keep the
existing except branch to build the failure TaskResult, call
_mark_schedule_failed(schedule_name, str(exc)) there, invoke _run_callback for
the failure path (also wrapped in try/except and logged), and do not re-raise
callback exceptions from either callback invocation.
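The restructuring described above can be sketched with a simplified runner (the `run_with_callback` name and keyword-style callback are assumptions; the real code would also update the `TaskSchedule` row at the marked points):

```python
import logging

logger = logging.getLogger(__name__)


def run_with_callback(target, callback=None):
    """Only the target's own exception marks the task failed; the
    success callback runs after the try/except, and a failing callback
    is logged rather than re-raised."""
    try:
        result = target()
    except Exception as exc:
        # _mark_schedule_failed(schedule_name, str(exc)) would go here
        if callback is not None:
            try:
                callback(success=False, error=str(exc))
            except Exception:
                logger.exception("failure callback raised")
        raise
    # _mark_schedule_completed(schedule_name) would go here, on true success
    if callback is not None:
        try:
            callback(success=True, result=result)
        except Exception:
            logger.exception("success callback raised")
    return result
```

A buggy hook can no longer flip a completed task into the failed state, and the original task exception always propagates unmodified.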
- Around line 685-688: The helper _celery_execute_task currently imports
execute_payload_task from common.celery_tasks directly which triggers a generic
ImportError when Celery isn't installed; update _celery_execute_task to first
call _celery_app() to force the explicit RuntimeError defined in _celery_app()
before attempting to import execute_payload_task from common.celery_tasks so
callers see the clear "Celery unavailable" error instead of an ambiguous
ImportError.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 80e11432-0e67-48bb-947b-febceedf22b3
📒 Files selected for processing (13)
- archery/celery.py
- archery/settings.py
- common/celery_tasks.py
- common/task_queue.py
- common/tests.py
- frontend/src/lib/system-settings.ts
- frontend/tests/e2e/system-settings.spec.ts
- sql/migrations/0010_taskschedule_idx_status_run_at.py
- sql/models.py
- sql/query.py
- sql/utils/tests.py
- sql_api/api_settings.py
- sql_api/tests.py
✅ Files skipped from review due to trivial changes (2)
- frontend/src/lib/system-settings.ts
- frontend/tests/e2e/system-settings.spec.ts
🚧 Files skipped from review as they are similar to previous changes (3)
- sql/query.py
- sql_api/tests.py
- archery/settings.py
Summary
Testing
- python manage.py test common.tests sql_api.tests
- python manage.py test sql.utils.tests sql_api.tests sql.tests sql.test_archiver
- black --check --diff .
- npx oxlint .
- npx eslint .
- npm run build

Summary by CodeRabbit
New Features
Database
Tests
Chores