From 92e09f7ca68e1b081c72c0b5a0690dc937f25a28 Mon Sep 17 00:00:00 2001
From: Florent Jaby
Date: Tue, 17 Feb 2026 20:29:02 +0100
Subject: [PATCH 1/6] Add _debounce parameter to enqueue_job with job_id validation

---
 arq/connections.py | 8 ++++++++
 tests/test_main.py | 6 ++++++
 2 files changed, 14 insertions(+)

diff --git a/arq/connections.py b/arq/connections.py
index c5dd0fc4..b718e88f 100644
--- a/arq/connections.py
+++ b/arq/connections.py
@@ -126,6 +126,8 @@ async def enqueue_job(
         _defer_by: Union[None, int, float, timedelta] = None,
         _expires: Union[None, int, float, timedelta] = None,
         _job_try: Optional[int] = None,
+        _debounce: bool = False,
+        _debounce_max: Union[None, int, float, timedelta] = None,
         **kwargs: Any,
     ) -> Optional[Job]:
         """
@@ -140,9 +142,15 @@ async def enqueue_job(
         :param _expires: do not start or retry a job after this duration;
             defaults to 24 hours plus deferring time, if any
         :param _job_try: useful when re-enqueueing jobs within a job
+        :param _debounce: if True and a queued job with the same ID exists, update its defer time
+            instead of returning None
+        :param _debounce_max: maximum total time from the original enqueue time before debouncing
+            stops and the job is allowed to run
        :param kwargs: any keyword arguments to pass to the function
         :return: :class:`arq.jobs.Job` instance or ``None`` if a job with this ID already exists
         """
+        if _debounce and not _job_id:
+            raise RuntimeError("'_debounce' requires '_job_id' to be set")
         if _queue_name is None:
             _queue_name = self.default_queue_name
         job_id = _job_id or uuid4().hex
diff --git a/tests/test_main.py b/tests/test_main.py
index 96f6beac..10b7aa0c 100644
--- a/tests/test_main.py
+++ b/tests/test_main.py
@@ -283,6 +283,12 @@ async def test_get_jobs(arq_redis: ArqRedis):
     assert isinstance(jobs[2], JobDef)
 
 
+async def test_debounce_requires_job_id(arq_redis: ArqRedis):
+    # when
+    with pytest.raises(RuntimeError, match="'_debounce' requires '_job_id'"):
+        await arq_redis.enqueue_job('foobar', _debounce=True)
+
+
 async def test_enqueue_multiple(arq_redis: ArqRedis, caplog):
     caplog.set_level(logging.DEBUG)
     results = await asyncio.gather(*[arq_redis.enqueue_job('foobar', i, _job_id='testing') for i in range(10)])

From 4d3e4e5da0af09bca8b45554c5dd536e27d1131b Mon Sep 17 00:00:00 2001
From: Florent Jaby
Date: Tue, 17 Feb 2026 20:33:09 +0100
Subject: [PATCH 2/6] Debounce overwrites queued job instead of returning None

When _debounce=True and job key exists without a result, the job data
and queue score are overwritten, allowing the defer time to be reset.
---
 arq/connections.py | 5 ++++-
 tests/test_main.py | 15 +++++++++++++++
 2 files changed, 19 insertions(+), 1 deletion(-)

diff --git a/arq/connections.py b/arq/connections.py
index b718e88f..6c1b4ed8 100644
--- a/arq/connections.py
+++ b/arq/connections.py
@@ -163,7 +163,10 @@ async def enqueue_job(
 
         async with self.pipeline(transaction=True) as pipe:
             await pipe.watch(job_key)
-            if await pipe.exists(job_key, result_key_prefix + job_id):
+            job_exists = await pipe.exists(job_key)
+            result_exists = await pipe.exists(result_key_prefix + job_id)
+
+            if (job_exists or result_exists) and not (_debounce and job_exists and not result_exists):
                 await pipe.reset()
                 return None
 
diff --git a/tests/test_main.py b/tests/test_main.py
index 10b7aa0c..56dee4e7 100644
--- a/tests/test_main.py
+++ b/tests/test_main.py
@@ -289,6 +289,21 @@ async def test_debounce_requires_job_id(arq_redis: ArqRedis):
         await arq_redis.enqueue_job('foobar', _debounce=True)
 
 
+async def test_debounce_updates_defer_time(arq_redis: ArqRedis):
+    # given: a job already enqueued with a defer
+    j1 = await arq_redis.enqueue_job('foobar', _job_id='debounce_id', _defer_by=5)
+    assert isinstance(j1, Job)
+    score1 = await arq_redis.zscore(default_queue_name, 'debounce_id')
+
+    # when: we enqueue the same job with debounce and a new defer
+    j2 = await arq_redis.enqueue_job('foobar', _job_id='debounce_id', _debounce=True, _defer_by=10)
+
+    # then: the job is returned (not None) and the score is updated
+    assert isinstance(j2, Job)
+    score2 = await arq_redis.zscore(default_queue_name, 'debounce_id')
+    assert score2 > score1
+
+
 async def test_enqueue_multiple(arq_redis: ArqRedis, caplog):
     caplog.set_level(logging.DEBUG)
     results = await asyncio.gather(*[arq_redis.enqueue_job('foobar', i, _job_id='testing') for i in range(10)])

From 08a76dc11610811c3f4e88c5d1cf01fe2ca64ec7 Mon Sep 17 00:00:00 2001
From: Florent Jaby
Date: Tue, 17 Feb 2026 20:35:22 +0100
Subject: [PATCH 3/6] Preserve original enqueue_time when debouncing a job

Read existing job data to extract the original enqueue timestamp and
use it when serializing the updated job.
---
 arq/connections.py | 8 ++++++--
 tests/test_main.py | 17 +++++++++++++++++
 2 files changed, 23 insertions(+), 2 deletions(-)

diff --git a/arq/connections.py b/arq/connections.py
index 6c1b4ed8..efce5c92 100644
--- a/arq/connections.py
+++ b/arq/connections.py
@@ -14,7 +14,7 @@
 from redis.exceptions import RedisError, WatchError
 
 from .constants import default_queue_name, expires_extra_ms, job_key_prefix, result_key_prefix
-from .jobs import Deserializer, Job, JobDef, JobResult, Serializer, deserialize_job, serialize_job
+from .jobs import Deserializer, Job, JobDef, JobResult, Serializer, deserialize_job, deserialize_job_raw, serialize_job
 from .utils import timestamp_ms, to_ms, to_unix_ms
 
 logger = logging.getLogger('arq.connections')
@@ -170,7 +170,11 @@ async def enqueue_job(
                 await pipe.reset()
                 return None
 
-            enqueue_time_ms = timestamp_ms()
+            if _debounce and job_exists:
+                existing_job_data = await pipe.get(job_key)
+                _, _, _, _, enqueue_time_ms = deserialize_job_raw(existing_job_data, deserializer=self.job_deserializer)
+            else:
+                enqueue_time_ms = timestamp_ms()
             if _defer_until is not None:
                 score = to_unix_ms(_defer_until)
             elif defer_by_ms:
diff --git a/tests/test_main.py b/tests/test_main.py
index 56dee4e7..72b90ed3 100644
--- a/tests/test_main.py
+++ b/tests/test_main.py
@@ -304,6 +304,23 @@ async def test_debounce_updates_defer_time(arq_redis: ArqRedis):
     assert score2 > score1
 
 
+async def test_debounce_preserves_enqueue_time(arq_redis: ArqRedis):
+    # given: a job enqueued
+    j1 = await arq_redis.enqueue_job('foobar', _job_id='debounce_id', _defer_by=5)
+    assert isinstance(j1, Job)
+    info1 = await j1.info()
+
+    await asyncio.sleep(0.05)
+
+    # when: debounced
+    j2 = await arq_redis.enqueue_job('foobar', _job_id='debounce_id', _debounce=True, _defer_by=10)
+    assert isinstance(j2, Job)
+    info2 = await j2.info()
+
+    # then: enqueue_time is preserved from the original job
+    assert info2.enqueue_time == info1.enqueue_time
+
+
 async def test_enqueue_multiple(arq_redis: ArqRedis, caplog):
     caplog.set_level(logging.DEBUG)
     results = await asyncio.gather(*[arq_redis.enqueue_job('foobar', i, _job_id='testing') for i in range(10)])

From 79628720b28d529ffe18f8394db44b46f606956c Mon Sep 17 00:00:00 2001
From: Florent Jaby
Date: Tue, 17 Feb 2026 20:37:06 +0100
Subject: [PATCH 4/6] Add _debounce_max to cap the total debounce window

When the time since original enqueue exceeds _debounce_max, the
debounce is refused and the existing job is left to run.
---
 arq/connections.py | 4 ++++
 tests/test_main.py | 13 +++++++++++++
 2 files changed, 17 insertions(+)

diff --git a/arq/connections.py b/arq/connections.py
index efce5c92..e71b10d4 100644
--- a/arq/connections.py
+++ b/arq/connections.py
@@ -160,6 +160,7 @@ async def enqueue_job(
 
         defer_by_ms = to_ms(_defer_by)
         expires_ms = to_ms(_expires)
+        debounce_max_ms = to_ms(_debounce_max)
 
         async with self.pipeline(transaction=True) as pipe:
             await pipe.watch(job_key)
@@ -173,6 +174,9 @@ async def enqueue_job(
             if _debounce and job_exists:
                 existing_job_data = await pipe.get(job_key)
                 _, _, _, _, enqueue_time_ms = deserialize_job_raw(existing_job_data, deserializer=self.job_deserializer)
+                if debounce_max_ms is not None and timestamp_ms() - enqueue_time_ms >= debounce_max_ms:
+                    await pipe.reset()
+                    return None
             else:
                 enqueue_time_ms = timestamp_ms()
             if _defer_until is not None:
diff --git a/tests/test_main.py b/tests/test_main.py
index 72b90ed3..039faf75 100644
--- a/tests/test_main.py
+++ b/tests/test_main.py
@@ -321,6 +321,19 @@ async def test_debounce_preserves_enqueue_time(arq_redis: ArqRedis):
     assert info2.enqueue_time == info1.enqueue_time
 
 
+async def test_debounce_max_stops_debouncing(arq_redis: ArqRedis):
+    # given: a job enqueued with a very short debounce_max
+    j1 = await arq_redis.enqueue_job('foobar', _job_id='debounce_id', _defer_by=5)
+    assert isinstance(j1, Job)
+
+    # when: we wait longer than debounce_max and try to debounce
+    await asyncio.sleep(0.1)
+    j2 = await arq_redis.enqueue_job('foobar', _job_id='debounce_id', _debounce=True, _defer_by=10, _debounce_max=0.05)
+
+    # then: debounce is refused, returns None (let existing job run)
+    assert j2 is None
+
+
 async def test_enqueue_multiple(arq_redis: ArqRedis, caplog):
     caplog.set_level(logging.DEBUG)
     results = await asyncio.gather(*[arq_redis.enqueue_job('foobar', i, _job_id='testing') for i in range(10)])

From 5c30f2edcccaf1f17439be45d885988946a0fc76 Mon Sep 17 00:00:00 2001
From: Florent Jaby
Date: Tue, 17 Feb 2026 20:39:49 +0100
Subject: [PATCH 5/6] Skip debounce for in-progress jobs

Check the in_progress key prefix before allowing debounce to ensure we
don't overwrite a job that a worker is currently executing.
---
 arq/connections.py | 7 +++++--
 tests/test_main.py | 17 +++++++++++++++++
 2 files changed, 22 insertions(+), 2 deletions(-)

diff --git a/arq/connections.py b/arq/connections.py
index e71b10d4..e5a3b47d 100644
--- a/arq/connections.py
+++ b/arq/connections.py
@@ -13,7 +13,7 @@
 from redis.asyncio.sentinel import Sentinel
 from redis.exceptions import RedisError, WatchError
 
-from .constants import default_queue_name, expires_extra_ms, job_key_prefix, result_key_prefix
+from .constants import default_queue_name, expires_extra_ms, in_progress_key_prefix, job_key_prefix, result_key_prefix
 from .jobs import Deserializer, Job, JobDef, JobResult, Serializer, deserialize_job, deserialize_job_raw, serialize_job
 from .utils import timestamp_ms, to_ms, to_unix_ms
 
@@ -166,8 +166,11 @@ async def enqueue_job(
             await pipe.watch(job_key)
             job_exists = await pipe.exists(job_key)
             result_exists = await pipe.exists(result_key_prefix + job_id)
+            in_progress = await pipe.exists(in_progress_key_prefix + job_id) if _debounce else False
 
-            if (job_exists or result_exists) and not (_debounce and job_exists and not result_exists):
+            if (job_exists or result_exists) and not (
+                _debounce and job_exists and not result_exists and not in_progress
+            ):
                 await pipe.reset()
                 return None
 
diff --git a/tests/test_main.py b/tests/test_main.py
index 039faf75..b6ccf8c1 100644
--- a/tests/test_main.py
+++ b/tests/test_main.py
@@ -334,6 +334,23 @@ async def test_debounce_max_stops_debouncing(arq_redis: ArqRedis):
     assert j2 is None
 
 
+async def test_debounce_does_not_touch_in_progress_job(arq_redis: ArqRedis):
+    # given: a job that is in progress (has in_progress key)
+    from arq.constants import in_progress_key_prefix
+
+    await arq_redis.enqueue_job('foobar', _job_id='debounce_id', _defer_by=5)
+    score_before = await arq_redis.zscore(default_queue_name, 'debounce_id')
+    await arq_redis.set(in_progress_key_prefix + 'debounce_id', b'1')
+
+    # when: we try to debounce
+    j2 = await arq_redis.enqueue_job('foobar', _job_id='debounce_id', _debounce=True, _defer_by=10)
+
+    # then: returns None, job score is unchanged
+    assert j2 is None
+    score_after = await arq_redis.zscore(default_queue_name, 'debounce_id')
+    assert score_after == score_before
+
+
 async def test_enqueue_multiple(arq_redis: ArqRedis, caplog):
     caplog.set_level(logging.DEBUG)
     results = await asyncio.gather(*[arq_redis.enqueue_job('foobar', i, _job_id='testing') for i in range(10)])

From d5fe748b4a65dd6a1ed130c95c9b0aca3b3c6ebf Mon Sep 17 00:00:00 2001
From: Florent Jaby
Date: Tue, 17 Feb 2026 20:43:58 +0100
Subject: [PATCH 6/6] Fix defer time to be relative to now, not original enqueue time

Refactor score computation to use current timestamp (now_ms) for
_defer_by calculations and expiry, while preserving the original
enqueue_time_ms only in the serialized job data.
---
 arq/connections.py | 18 +++++++++---------
 tests/test_main.py | 8 +++++---
 2 files changed, 14 insertions(+), 12 deletions(-)

diff --git a/arq/connections.py b/arq/connections.py
index e5a3b47d..250e3d36 100644
--- a/arq/connections.py
+++ b/arq/connections.py
@@ -167,29 +167,29 @@ async def enqueue_job(
             job_exists = await pipe.exists(job_key)
             result_exists = await pipe.exists(result_key_prefix + job_id)
             in_progress = await pipe.exists(in_progress_key_prefix + job_id) if _debounce else False
+            can_debounce = _debounce and job_exists and not result_exists and not in_progress
 
-            if (job_exists or result_exists) and not (
-                _debounce and job_exists and not result_exists and not in_progress
-            ):
+            if (job_exists or result_exists) and not can_debounce:
                 await pipe.reset()
                 return None
 
-            if _debounce and job_exists:
+            now_ms = timestamp_ms()
+            if can_debounce:
                 existing_job_data = await pipe.get(job_key)
                 _, _, _, _, enqueue_time_ms = deserialize_job_raw(existing_job_data, deserializer=self.job_deserializer)
-                if debounce_max_ms is not None and timestamp_ms() - enqueue_time_ms >= debounce_max_ms:
+                if debounce_max_ms is not None and now_ms - enqueue_time_ms >= debounce_max_ms:
                     await pipe.reset()
                     return None
             else:
-                enqueue_time_ms = timestamp_ms()
+                enqueue_time_ms = now_ms
             if _defer_until is not None:
                 score = to_unix_ms(_defer_until)
             elif defer_by_ms:
-                score = enqueue_time_ms + defer_by_ms
+                score = now_ms + defer_by_ms
             else:
-                score = enqueue_time_ms
+                score = now_ms
 
-            expires_ms = expires_ms or score - enqueue_time_ms + self.expires_extra_ms
+            expires_ms = expires_ms or score - now_ms + self.expires_extra_ms
 
             job = serialize_job(function, args, kwargs, _job_try, enqueue_time_ms, serializer=self.job_serializer)
             pipe.multi()
diff --git a/tests/test_main.py b/tests/test_main.py
index b6ccf8c1..6c1d5d45 100644
--- a/tests/test_main.py
+++ b/tests/test_main.py
@@ -295,10 +295,12 @@ async def test_debounce_updates_defer_time(arq_redis: ArqRedis):
     assert isinstance(j1, Job)
     score1 = await arq_redis.zscore(default_queue_name, 'debounce_id')
 
-    # when: we enqueue the same job with debounce and a new defer
-    j2 = await arq_redis.enqueue_job('foobar', _job_id='debounce_id', _debounce=True, _defer_by=10)
+    await asyncio.sleep(0.05)
+
+    # when: we enqueue the same job with debounce and the same defer
+    j2 = await arq_redis.enqueue_job('foobar', _job_id='debounce_id', _debounce=True, _defer_by=5)
 
-    # then: the job is returned (not None) and the score is updated
+    # then: the job is returned (not None) and the score is updated (deferred from now)
     assert isinstance(j2, Job)
     score2 = await arq_redis.zscore(default_queue_name, 'debounce_id')
     assert score2 > score1