2 changes: 1 addition & 1 deletion scraper.py
@@ -1344,7 +1344,7 @@ async def fetch_policy_pdf_bytes(
task = asyncio.create_task(fetch_with_retry(client, url, response_type="bytes"))
pdf_cache[url] = task

result = await task
result = await asyncio.shield(task)
Action required

1. Shield breaks semaphore bound 🐞 Bug ☼ Reliability

Because fetch_policy_pdf_bytes now awaits asyncio.shield(task), a per-certificate timeout
(asyncio.wait_for) cancels only the waiter and releases pdf_semaphore while the underlying PDF fetch
task keeps running. Under rate limiting or slow fetches this can exceed PDF_FETCH_CONCURRENCY and
degrade stability by accumulating many concurrent or sleeping background fetch tasks.
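A runnable sketch of the failure mode, using stand-in names and timings (`slow_fetch`, `waiter`, the 0.1 s timeout) rather than the scraper's actual call sites:

```python
import asyncio

PDF_FETCH_CONCURRENCY = 2  # stand-in for the real setting


async def slow_fetch(url: str) -> bytes:
    # Stand-in for fetch_with_retry sleeping on a long Retry-After.
    await asyncio.sleep(60)
    return b"%PDF-1.7"


async def waiter(url: str, cache: dict, sem: asyncio.Semaphore) -> bytes:
    # The semaphore is held by the waiter, mirroring the current call site.
    async with sem:
        task = cache.setdefault(url, asyncio.create_task(slow_fetch(url)))
        # shield() keeps the fetch task alive even if this waiter is cancelled.
        return await asyncio.shield(task)


async def main() -> None:
    sem = asyncio.Semaphore(PDF_FETCH_CONCURRENCY)
    cache: dict = {}
    for i in range(5):
        try:
            # wait_for cancels the waiter; the semaphore slot is released
            # while slow_fetch keeps running in the background.
            await asyncio.wait_for(waiter(f"https://example.test/{i}.pdf", cache, sem), 0.1)
        except asyncio.TimeoutError:
            pass
    running = sum(not t.done() for t in cache.values())
    print(f"background fetches still running: {running}")  # 5 > PDF_FETCH_CONCURRENCY
    for t in cache.values():
        t.cancel()


asyncio.run(main())
```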
Agent Prompt
## Issue description
`asyncio.shield(task)` prevents cancellation of the shared fetch task, but the current concurrency control (`pdf_semaphore`) is held by the *waiter* coroutine, not by the underlying fetch task. When a waiter is cancelled by `asyncio.wait_for(..., timeout=CERT_PROCESS_TIMEOUT)`, it releases `pdf_semaphore` while the shielded fetch keeps running, allowing more fetches to start and exceeding the intended PDF concurrency bound.

## Issue Context
- `fetch_certificate_algorithms()` uses `async with pdf_semaphore:` around `await fetch_policy_pdf_bytes(...)`.
- `process_certificate_record_with_timeout()` cancels work on timeout via `asyncio.wait_for`.
- `fetch_with_retry()` can sleep for long `Retry-After` values on 429, making it plausible for the waiter to time out while the fetch task continues.

## Fix Focus Areas
- scraper.py[1333-1348]
- scraper.py[1426-1492]

## Suggested fix approach
- Move semaphore acquisition into the cached fetch task itself, so the semaphore is held for the *lifetime of the real fetch*, independent of waiter cancellation.
  - Option A (preferred): pass `pdf_semaphore` into `fetch_policy_pdf_bytes()` and create the cached task as a wrapper coroutine (see the sketch after this list):
    - `async def _run(): async with pdf_semaphore: return await fetch_with_retry(...)`
    - Cache `asyncio.create_task(_run())`
    - Remove the outer `async with pdf_semaphore:` around `fetch_policy_pdf_bytes()` to avoid double-limiting.
  - Ensure the cache/lock still guarantees one task per URL.
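A minimal sketch of Option A inside scraper.py, assuming `fetch_policy_pdf_bytes()` gains a `pdf_semaphore` parameter and that the surrounding cache check looks roughly like the diff context above (names and the exact signature are illustrative, not the repository's current API):

```python
async def fetch_policy_pdf_bytes(client, url, pdf_cache, pdf_cache_lock, pdf_semaphore):
    """Return (pdf_bytes_or_none, cache_hit); the semaphore bounds real fetches."""
    async with pdf_cache_lock:
        cache_hit = url in pdf_cache
        if not cache_hit:
            async def _run():
                # The semaphore is held by the cached task itself, so a waiter
                # cancelled by asyncio.wait_for() no longer frees a fetch slot
                # while the underlying download is still in flight.
                async with pdf_semaphore:
                    return await fetch_with_retry(client, url, response_type="bytes")

            pdf_cache[url] = asyncio.create_task(_run())
        task = pdf_cache[url]

    # Shielding still protects the shared task from any single waiter's cancellation.
    result = await asyncio.shield(task)
    return result if isinstance(result, bytes) else None, cache_hit
```

Callers such as `fetch_certificate_algorithms()` would then drop their outer `async with pdf_semaphore:` so the fetch is not double-limited.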

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools

return result if isinstance(result, bytes) else None, cache_hit


60 changes: 60 additions & 0 deletions test_scraper.py
@@ -809,6 +809,65 @@ async def scenario():
print("✓ Policy PDF cache reuse test passed")


def test_fetch_policy_pdf_bytes_shields_shared_cache_task_from_cancellation():
    """Cancelling one waiter should not cancel the shared PDF cache fetch task."""
    class FakeResponse:
        status_code = 200
        headers = {}
        text = ""
        content = b"%PDF-1.7 slow fixture"

        def raise_for_status(self):
            return None

    class FakeClient:
        def __init__(self):
            self.calls = 0

        async def get(self, url):
            self.calls += 1
            await asyncio.sleep(0.02)
            return FakeResponse()

    async def scenario():
        client = FakeClient()
        pdf_cache = {}
        pdf_cache_lock = asyncio.Lock()
        first_waiter = asyncio.create_task(
            fetch_policy_pdf_bytes(
                client,
                "https://csrc.nist.gov/slow.pdf",
                pdf_cache,
                pdf_cache_lock,
            )
        )
        await asyncio.sleep(0)
        first_waiter.cancel()
        try:
            await first_waiter
        except asyncio.CancelledError:
            pass

        cached_task = pdf_cache["https://csrc.nist.gov/slow.pdf"]
        was_cancelled = cached_task.cancelled()
        second_bytes, second_hit = await fetch_policy_pdf_bytes(
            client,
            "https://csrc.nist.gov/slow.pdf",
            pdf_cache,
            pdf_cache_lock,
        )
        return client.calls, was_cancelled, second_bytes, second_hit

    calls, was_cancelled, second_bytes, second_hit = asyncio.run(scenario())

    assert was_cancelled is False, "Shared PDF cache task should survive cancellation of one waiter"
    assert calls == 1, "Second PDF waiter should reuse the original cache task"
    assert second_bytes == b"%PDF-1.7 slow fixture", "Second waiter should receive bytes from the shared task"
    assert second_hit is True, "Second waiter should report a cache hit"

    print("✓ Policy PDF cache cancellation shielding test passed")


def test_process_certificate_record_applies_cached_algorithm_provenance():
"""Cached algorithm reuse should still attach explicit provenance to outputs."""
module = {
@@ -1360,6 +1419,7 @@ def main():
    test_should_reuse_cached_algorithms()
    test_algorithm_extraction_provenance_and_metrics()
    test_fetch_policy_pdf_bytes_reuses_in_run_cache()
    test_fetch_policy_pdf_bytes_shields_shared_cache_task_from_cancellation()
    test_process_certificate_record_applies_cached_algorithm_provenance()
    test_process_certificate_record_timeout_preserves_cached_data()
    test_build_certificate_artifacts_bounds_active_tasks()