From 2a204d10a6611e2027a7286201c00721ec500f6c Mon Sep 17 00:00:00 2001 From: Vinicius Brasil Date: Wed, 18 Mar 2026 14:39:11 -0300 Subject: [PATCH 1/3] Fix lock_store crash when redis package is not installed `REDIS_URL` being set was enough to trigger a Redis lock, which would raise `ImportError` if the `redis` package wasn't available. Added `_redis_available()` to guard on both the env var and the import. --- lib/crewai/src/crewai/utilities/lock_store.py | 18 ++- lib/crewai/tests/utilities/test_lock_store.py | 129 ++++++++++++++++++ 2 files changed, 144 insertions(+), 3 deletions(-) create mode 100644 lib/crewai/tests/utilities/test_lock_store.py diff --git a/lib/crewai/src/crewai/utilities/lock_store.py b/lib/crewai/src/crewai/utilities/lock_store.py index b2ac4d81cb..363448d8d2 100644 --- a/lib/crewai/src/crewai/utilities/lock_store.py +++ b/lib/crewai/src/crewai/utilities/lock_store.py @@ -1,7 +1,7 @@ """Centralised lock factory. -If ``REDIS_URL`` is set, locks are distributed via ``portalocker.RedisLock``. Otherwise, falls -back to the standard ``portalocker.Lock``. +If ``REDIS_URL`` is set and the ``redis`` package is installed, locks are distributed via +``portalocker.RedisLock``. Otherwise, falls back to the standard ``portalocker.Lock``. """ from __future__ import annotations @@ -30,6 +30,18 @@ _DEFAULT_TIMEOUT: Final[int] = 120 +def _redis_available() -> bool: + """Return True if redis is installed and REDIS_URL is set.""" + if not _REDIS_URL: + return False + try: + import redis # noqa: F401 + + return True + except ImportError: + return False + + @lru_cache(maxsize=1) def _redis_connection() -> redis.Redis: """Return a cached Redis connection, creating one on first call.""" @@ -51,7 +63,7 @@ def lock(name: str, *, timeout: float = _DEFAULT_TIMEOUT) -> Iterator[None]: """ channel = f"crewai:{md5(name.encode(), usedforsecurity=False).hexdigest()}" - if _REDIS_URL: + if _redis_available(): with portalocker.RedisLock( channel=channel, connection=_redis_connection(), diff --git a/lib/crewai/tests/utilities/test_lock_store.py b/lib/crewai/tests/utilities/test_lock_store.py new file mode 100644 index 0000000000..1ddafb6587 --- /dev/null +++ b/lib/crewai/tests/utilities/test_lock_store.py @@ -0,0 +1,129 @@ +"""Tests for lock_store.""" + +from __future__ import annotations + +import threading +from unittest import mock + +import portalocker.exceptions +import pytest + +import crewai.utilities.lock_store as lock_store +from crewai.utilities.lock_store import lock + + +# --------------------------------------------------------------------------- +# _redis_available +# --------------------------------------------------------------------------- + + +def test_redis_available_no_url(): + with mock.patch.object(lock_store, "_REDIS_URL", None): + assert lock_store._redis_available() is False + + +def test_redis_available_url_but_no_package(): + with mock.patch.object(lock_store, "_REDIS_URL", "redis://localhost:6379"): + with mock.patch("builtins.__import__", side_effect=ImportError): + assert lock_store._redis_available() is False + + +def test_redis_available_url_and_package(): + with mock.patch.object(lock_store, "_REDIS_URL", "redis://localhost:6379"): + with mock.patch.dict("sys.modules", {"redis": mock.MagicMock()}): + assert lock_store._redis_available() is True + + +# --------------------------------------------------------------------------- +# file-based lock (no Redis) +# --------------------------------------------------------------------------- + + +def test_lock_acquires_and_releases(): + with mock.patch.object(lock_store, "_redis_available", return_value=False): + entered = False + with lock("test_basic"): + entered = True + assert entered + + +def test_lock_releases_on_exception(): + with mock.patch.object(lock_store, "_redis_available", return_value=False): + with pytest.raises(ValueError): + with lock("test_exception"): + raise ValueError("boom") + + # lock should be released — acquiring again must succeed + with lock("test_exception"): + pass + + +def test_lock_mutual_exclusion_across_threads(): + with mock.patch.object(lock_store, "_redis_available", return_value=False): + results: list[int] = [] + barrier = threading.Barrier(2) + + def worker(val: int) -> None: + barrier.wait() + with lock("test_mutex", timeout=5): + results.append(val) + + threads = [threading.Thread(target=worker, args=(i,)) for i in range(2)] + for t in threads: + t.start() + for t in threads: + t.join() + + assert sorted(results) == [0, 1] + + +def test_lock_timeout_raises(): + with mock.patch.object(lock_store, "_redis_available", return_value=False): + acquired = threading.Event() + hold = threading.Event() + + def holder() -> None: + with lock("test_timeout", timeout=5): + acquired.set() + hold.wait() + + t = threading.Thread(target=holder) + t.start() + acquired.wait() + + try: + with pytest.raises(portalocker.exceptions.LockException): + with lock("test_timeout", timeout=0.1): + pass + finally: + hold.set() + t.join() + + +def test_lock_namespaced_separately(): + """Two different lock names must not block each other.""" + with mock.patch.object(lock_store, "_redis_available", return_value=False): + with lock("lock_a"): + with lock("lock_b"): + pass + + +# --------------------------------------------------------------------------- +# Redis path (mocked) +# --------------------------------------------------------------------------- + + +def test_lock_uses_redis_when_available(): + fake_redis_lock = mock.MagicMock() + fake_redis_lock.__enter__ = mock.Mock(return_value=None) + fake_redis_lock.__exit__ = mock.Mock(return_value=False) + + with mock.patch.object(lock_store, "_redis_available", return_value=True): + with mock.patch.object(lock_store, "_redis_connection", return_value=mock.MagicMock()): + with mock.patch("portalocker.RedisLock", return_value=fake_redis_lock) as mock_rl: + with lock("test_redis"): + pass + + mock_rl.assert_called_once() + _, kwargs = mock_rl.call_args + assert kwargs["channel"].startswith("crewai:") From e514449841147bac0250ff281c6dba12bc98d131 Mon Sep 17 00:00:00 2001 From: Vinicius Brasil Date: Wed, 18 Mar 2026 14:51:46 -0300 Subject: [PATCH 2/3] Simplify tests --- lib/crewai/tests/utilities/test_lock_store.py | 172 +++++++++--------- 1 file changed, 91 insertions(+), 81 deletions(-) diff --git a/lib/crewai/tests/utilities/test_lock_store.py b/lib/crewai/tests/utilities/test_lock_store.py index 1ddafb6587..e50b45eba4 100644 --- a/lib/crewai/tests/utilities/test_lock_store.py +++ b/lib/crewai/tests/utilities/test_lock_store.py @@ -2,7 +2,9 @@ from __future__ import annotations +import sys import threading +import time from unittest import mock import portalocker.exceptions @@ -12,118 +14,126 @@ from crewai.utilities.lock_store import lock +@pytest.fixture(autouse=True) +def no_redis_url(monkeypatch): + """Unset REDIS_URL for every test so file-based locking is used by default.""" + monkeypatch.setattr(lock_store, "_REDIS_URL", None) + + # --------------------------------------------------------------------------- # _redis_available # --------------------------------------------------------------------------- -def test_redis_available_no_url(): - with mock.patch.object(lock_store, "_REDIS_URL", None): - assert lock_store._redis_available() is False +def test_redis_not_available_without_url(): + assert lock_store._redis_available() is False -def test_redis_available_url_but_no_package(): - with mock.patch.object(lock_store, "_REDIS_URL", "redis://localhost:6379"): - with mock.patch("builtins.__import__", side_effect=ImportError): - assert lock_store._redis_available() is False +def test_redis_not_available_when_package_missing(monkeypatch): + monkeypatch.setattr(lock_store, "_REDIS_URL", "redis://localhost:6379") + # Setting a key to None in sys.modules causes ImportError on import + monkeypatch.setitem(sys.modules, "redis", None) + assert lock_store._redis_available() is False -def test_redis_available_url_and_package(): - with mock.patch.object(lock_store, "_REDIS_URL", "redis://localhost:6379"): - with mock.patch.dict("sys.modules", {"redis": mock.MagicMock()}): - assert lock_store._redis_available() is True +def test_redis_available_with_url_and_package(monkeypatch): + monkeypatch.setattr(lock_store, "_REDIS_URL", "redis://localhost:6379") + monkeypatch.setitem(sys.modules, "redis", mock.MagicMock()) + assert lock_store._redis_available() is True # --------------------------------------------------------------------------- -# file-based lock (no Redis) +# file-based lock # --------------------------------------------------------------------------- -def test_lock_acquires_and_releases(): - with mock.patch.object(lock_store, "_redis_available", return_value=False): - entered = False - with lock("test_basic"): - entered = True - assert entered +def test_lock_yields(): + with lock("basic"): + pass def test_lock_releases_on_exception(): - with mock.patch.object(lock_store, "_redis_available", return_value=False): - with pytest.raises(ValueError): - with lock("test_exception"): - raise ValueError("boom") - - # lock should be released — acquiring again must succeed - with lock("test_exception"): - pass - + with pytest.raises(ValueError): + with lock("on_exception"): + raise ValueError("boom") + + # would hang or raise LockException if the lock was not released + with lock("on_exception"): + pass + + +def test_lock_is_mutually_exclusive_across_threads(): + concurrent_holders = 0 + max_concurrent = 0 + counter_lock = threading.Lock() + barrier = threading.Barrier(5) + + def worker(): + nonlocal concurrent_holders, max_concurrent + barrier.wait() # all threads compete at the same time + with lock("mutex", timeout=10): + with counter_lock: + concurrent_holders += 1 + max_concurrent = max(max_concurrent, concurrent_holders) + time.sleep(0.01) # hold briefly so overlap is detectable if locking fails + with counter_lock: + concurrent_holders -= 1 + + threads = [threading.Thread(target=worker) for _ in range(5)] + for t in threads: + t.start() + for t in threads: + t.join(timeout=10) + assert all(not t.is_alive() for t in threads), "threads did not finish in time" -def test_lock_mutual_exclusion_across_threads(): - with mock.patch.object(lock_store, "_redis_available", return_value=False): - results: list[int] = [] - barrier = threading.Barrier(2) + assert max_concurrent == 1 - def worker(val: int) -> None: - barrier.wait() - with lock("test_mutex", timeout=5): - results.append(val) - threads = [threading.Thread(target=worker, args=(i,)) for i in range(2)] - for t in threads: - t.start() - for t in threads: - t.join() +def test_lock_timeout_raises_when_held(): + acquired = threading.Event() + release = threading.Event() - assert sorted(results) == [0, 1] + def holder(): + with lock("timeout_test", timeout=10): + acquired.set() + release.wait() + t = threading.Thread(target=holder) + t.start() + acquired.wait() -def test_lock_timeout_raises(): - with mock.patch.object(lock_store, "_redis_available", return_value=False): - acquired = threading.Event() - hold = threading.Event() + try: + with pytest.raises(portalocker.exceptions.LockException): + with lock("timeout_test", timeout=0.1): + pass + finally: + release.set() + t.join(timeout=10) + assert not t.is_alive(), "holder thread did not finish in time" - def holder() -> None: - with lock("test_timeout", timeout=5): - acquired.set() - hold.wait() - t = threading.Thread(target=holder) - t.start() - acquired.wait() - - try: - with pytest.raises(portalocker.exceptions.LockException): - with lock("test_timeout", timeout=0.1): - pass - finally: - hold.set() - t.join() - - -def test_lock_namespaced_separately(): - """Two different lock names must not block each other.""" - with mock.patch.object(lock_store, "_redis_available", return_value=False): - with lock("lock_a"): - with lock("lock_b"): - pass +def test_different_names_are_independent(): + with lock("alpha"): + with lock("beta"): + pass # would deadlock if names mapped to the same lock # --------------------------------------------------------------------------- -# Redis path (mocked) +# Redis path # --------------------------------------------------------------------------- -def test_lock_uses_redis_when_available(): - fake_redis_lock = mock.MagicMock() - fake_redis_lock.__enter__ = mock.Mock(return_value=None) - fake_redis_lock.__exit__ = mock.Mock(return_value=False) +def test_redis_lock_used_when_available(monkeypatch): + fake_conn = mock.MagicMock() + monkeypatch.setattr(lock_store, "_REDIS_URL", "redis://localhost:6379") + monkeypatch.setattr(lock_store, "_redis_available", mock.Mock(return_value=True)) + monkeypatch.setattr(lock_store, "_redis_connection", mock.Mock(return_value=fake_conn)) - with mock.patch.object(lock_store, "_redis_available", return_value=True): - with mock.patch.object(lock_store, "_redis_connection", return_value=mock.MagicMock()): - with mock.patch("portalocker.RedisLock", return_value=fake_redis_lock) as mock_rl: - with lock("test_redis"): - pass + with mock.patch("portalocker.RedisLock") as mock_redis_lock: + with lock("redis_test"): + pass - mock_rl.assert_called_once() - _, kwargs = mock_rl.call_args - assert kwargs["channel"].startswith("crewai:") + mock_redis_lock.assert_called_once() + kwargs = mock_redis_lock.call_args.kwargs + assert kwargs["channel"].startswith("crewai:") + assert kwargs["connection"] is fake_conn From 5aae2392e48279f00b56bbd82f0cd72fa66bc6a7 Mon Sep 17 00:00:00 2001 From: Vinicius Brasil Date: Wed, 18 Mar 2026 14:54:26 -0300 Subject: [PATCH 3/3] Simplify tests #2 --- lib/crewai/tests/utilities/test_lock_store.py | 97 +++---------------- 1 file changed, 14 insertions(+), 83 deletions(-) diff --git a/lib/crewai/tests/utilities/test_lock_store.py b/lib/crewai/tests/utilities/test_lock_store.py index e50b45eba4..8e0e6babc4 100644 --- a/lib/crewai/tests/utilities/test_lock_store.py +++ b/lib/crewai/tests/utilities/test_lock_store.py @@ -1,13 +1,14 @@ -"""Tests for lock_store.""" +"""Tests for lock_store. + +We verify our own logic: the _redis_available guard and which portalocker +backend is selected. We trust portalocker to handle actual locking mechanics. +""" from __future__ import annotations import sys -import threading -import time from unittest import mock -import portalocker.exceptions import pytest import crewai.utilities.lock_store as lock_store @@ -16,7 +17,6 @@ @pytest.fixture(autouse=True) def no_redis_url(monkeypatch): - """Unset REDIS_URL for every test so file-based locking is used by default.""" monkeypatch.setattr(lock_store, "_REDIS_URL", None) @@ -31,8 +31,7 @@ def test_redis_not_available_without_url(): def test_redis_not_available_when_package_missing(monkeypatch): monkeypatch.setattr(lock_store, "_REDIS_URL", "redis://localhost:6379") - # Setting a key to None in sys.modules causes ImportError on import - monkeypatch.setitem(sys.modules, "redis", None) + monkeypatch.setitem(sys.modules, "redis", None) # None → ImportError on import assert lock_store._redis_available() is False @@ -43,89 +42,21 @@ def test_redis_available_with_url_and_package(monkeypatch): # --------------------------------------------------------------------------- -# file-based lock +# lock strategy selection # --------------------------------------------------------------------------- -def test_lock_yields(): - with lock("basic"): - pass - - -def test_lock_releases_on_exception(): - with pytest.raises(ValueError): - with lock("on_exception"): - raise ValueError("boom") - - # would hang or raise LockException if the lock was not released - with lock("on_exception"): - pass - - -def test_lock_is_mutually_exclusive_across_threads(): - concurrent_holders = 0 - max_concurrent = 0 - counter_lock = threading.Lock() - barrier = threading.Barrier(5) - - def worker(): - nonlocal concurrent_holders, max_concurrent - barrier.wait() # all threads compete at the same time - with lock("mutex", timeout=10): - with counter_lock: - concurrent_holders += 1 - max_concurrent = max(max_concurrent, concurrent_holders) - time.sleep(0.01) # hold briefly so overlap is detectable if locking fails - with counter_lock: - concurrent_holders -= 1 - - threads = [threading.Thread(target=worker) for _ in range(5)] - for t in threads: - t.start() - for t in threads: - t.join(timeout=10) - assert all(not t.is_alive() for t in threads), "threads did not finish in time" - - assert max_concurrent == 1 - - -def test_lock_timeout_raises_when_held(): - acquired = threading.Event() - release = threading.Event() - - def holder(): - with lock("timeout_test", timeout=10): - acquired.set() - release.wait() - - t = threading.Thread(target=holder) - t.start() - acquired.wait() - - try: - with pytest.raises(portalocker.exceptions.LockException): - with lock("timeout_test", timeout=0.1): - pass - finally: - release.set() - t.join(timeout=10) - assert not t.is_alive(), "holder thread did not finish in time" - - -def test_different_names_are_independent(): - with lock("alpha"): - with lock("beta"): - pass # would deadlock if names mapped to the same lock +def test_uses_file_lock_when_redis_unavailable(): + with mock.patch("portalocker.Lock") as mock_lock: + with lock("file_test"): + pass + mock_lock.assert_called_once() + assert "crewai:" in mock_lock.call_args.args[0] -# --------------------------------------------------------------------------- -# Redis path -# --------------------------------------------------------------------------- - -def test_redis_lock_used_when_available(monkeypatch): +def test_uses_redis_lock_when_redis_available(monkeypatch): fake_conn = mock.MagicMock() - monkeypatch.setattr(lock_store, "_REDIS_URL", "redis://localhost:6379") monkeypatch.setattr(lock_store, "_redis_available", mock.Mock(return_value=True)) monkeypatch.setattr(lock_store, "_redis_connection", mock.Mock(return_value=fake_conn))