Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ RUN dnf -y install dnf-plugins-core \
nodejs \
npm \
which \
tini \
&& dnf clean all \
&& rm -rf /var/cache /var/log/dnf* /var/log/yum.*

Expand Down Expand Up @@ -78,4 +79,4 @@ RUN uv sync

HEALTHCHECK CMD curl --fail http://127.0.0.1:5000/webhook_server/healthcheck || exit 1

ENTRYPOINT ["uv", "run", "entrypoint.py"]
ENTRYPOINT ["tini", "--", "uv", "run", "entrypoint.py"]
159 changes: 159 additions & 0 deletions webhook_server/tests/test_helpers.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import asyncio
import datetime
import logging
import os
import subprocess as sp
import sys
from unittest.mock import Mock, patch

Expand Down Expand Up @@ -346,3 +348,160 @@ def log(self, msg):
assert futures[0].logged == "success"
assert futures[1].logged == "fail"
# futures[2] raised exception so no log attribute set

@pytest.mark.asyncio
async def test_run_command_timeout_cleanup(self) -> None:
    """Test that subprocess is properly cleaned up on timeout.

    Times out a long-running command, then compares the ``<defunct>``
    count reported by ``ps`` before and after to detect leaked zombies.
    """
    # Baseline zombie count; None means ps is unavailable on this host.
    initial_zombies = None
    try:
        proc = sp.run(["ps", "aux"], capture_output=True, text=True, timeout=5)
        initial_zombies = proc.stdout.count("<defunct>")
    except Exception:
        pass

    # Run command that times out: a 1s timeout on a 100s sleep must fail.
    result = await run_command("sleep 100", log_prefix="[TEST]", timeout=1)
    assert result[0] is False
    assert "timed out" in result[2].lower()

    # Wait for the event loop to reap the killed child.
    await asyncio.sleep(0.2)

    # BUG FIX (two issues in the original):
    # 1. The assertion lived inside `try ... except Exception: pass`, so an
    #    AssertionError was silently swallowed and the check could never fail.
    # 2. A failed baseline `ps` left initial_zombies at 0, making a later
    #    successful `ps` fail spuriously on pre-existing zombies.
    final_zombies = None
    try:
        proc = sp.run(["ps", "aux"], capture_output=True, text=True, timeout=5)
        final_zombies = proc.stdout.count("<defunct>")
    except Exception:
        pass  # ps not available, but the timeout assertions above still hold
    if initial_zombies is not None and final_zombies is not None:
        assert final_zombies == initial_zombies, (
            f"Zombie count increased from {initial_zombies} to {final_zombies}"
        )

@pytest.mark.asyncio
async def test_run_command_cancelled_cleanup(self) -> None:
    """Test that subprocess is properly cleaned up when cancelled."""
    # Kick off a long-running command as a background task.
    long_running = asyncio.create_task(run_command("sleep 100", log_prefix="[TEST]"))

    # Give the subprocess a chance to spawn before cancelling.
    await asyncio.sleep(0.1)
    long_running.cancel()

    # Cancellation must propagate out to the awaiter.
    with pytest.raises(asyncio.CancelledError):
        await long_running

    # Leave time for the child to be reaped; lingering zombies would
    # surface as interference in later tests if cleanup failed.
    await asyncio.sleep(0.1)

@pytest.mark.asyncio
async def test_run_command_oserror_cleanup(self) -> None:
    """Test that subprocess is properly cleaned up on OSError."""
    # A command that cannot be found must surface as a failed result
    # tuple, not as a raised exception.
    ok, _out, _err = await run_command("totally_nonexistent_command_12345", log_prefix="[TEST]")
    assert ok is False

    # Allow any spawned process to be reaped (implicit verification).
    await asyncio.sleep(0.1)

@pytest.mark.asyncio
async def test_run_command_no_zombie_processes(self) -> None:
    """Test that multiple failed commands don't create zombie processes.

    Runs timeouts, OSErrors, and ordinary failures concurrently, then
    verifies via ``ps`` that the ``<defunct>`` count did not grow.
    """
    # Get initial zombie count at the start; skip when ps is unavailable.
    try:
        proc = sp.run(["ps", "aux"], capture_output=True, text=True, timeout=5)
        initial_zombies = proc.stdout.count("<defunct>")
    except Exception:
        pytest.skip("ps command not available")

    # Run more iterations to trigger potential race conditions.
    tasks = [
        run_command("sleep 10", log_prefix="[TEST]", timeout=0.5),  # Timeout
        run_command("sleep 10", log_prefix="[TEST]", timeout=0.5),  # Timeout
        run_command("sleep 10", log_prefix="[TEST]", timeout=0.5),  # Timeout
        run_command("nonexistent_cmd", log_prefix="[TEST]"),  # OSError
        run_command("nonexistent_cmd", log_prefix="[TEST]"),  # OSError
        run_command("false", log_prefix="[TEST]"),  # Normal failure
        run_command("false", log_prefix="[TEST]"),  # Normal failure
    ]

    results = await asyncio.gather(*tasks, return_exceptions=True)

    # BUG FIX: the old loop only inspected tuple results, so an unexpected
    # exception from run_command slipped through unasserted. Nothing here
    # is cancelled, so no exception is acceptable.
    for result in results:
        assert not isinstance(result, BaseException), f"Unexpected exception: {result!r}"
        assert result[0] is False, "All test commands should fail"

    # Wait longer for cleanup with multiple processes.
    await asyncio.sleep(0.5)

    # BUG FIX: the final assertion used to sit inside
    # `try ... except Exception: pass`, which swallowed AssertionError and
    # made the zombie check a no-op. Only the ps call stays guarded now.
    try:
        proc = sp.run(["ps", "aux"], capture_output=True, text=True, timeout=5)
        final_zombies = proc.stdout.count("<defunct>")
    except Exception:
        # ps failed on the second pass; the gather assertions above still
        # validated behavior, so don't fail the test on a ps hiccup.
        return
    assert final_zombies == initial_zombies, (
        f"Zombie processes created: {final_zombies - initial_zombies} "
        f"(initial: {initial_zombies}, final: {final_zombies})"
    )

@pytest.mark.asyncio
async def test_run_command_race_condition_cleanup(self) -> None:
    """Test that zombie is reaped even in race condition where returncode is set quickly.

    Fires many concurrent timeouts, then compares the ``<defunct>`` count
    reported by ``ps`` before and after.
    """
    # Baseline zombie count; None means ps is unavailable on this host.
    initial_zombies = None
    try:
        proc = sp.run(["ps", "aux"], capture_output=True, text=True, timeout=5)
        initial_zombies = proc.stdout.count("<defunct>")
    except Exception:
        pass

    # Run multiple timeouts concurrently to trigger race conditions.
    tasks = [run_command("sleep 100", log_prefix="[TEST]", timeout=0.5) for _ in range(10)]
    results = await asyncio.gather(*tasks, return_exceptions=True)

    # BUG FIX: exceptions previously slipped past the
    # `isinstance(result, tuple)` guard unasserted; nothing here is
    # cancelled, so none are acceptable.
    for result in results:
        assert not isinstance(result, BaseException), f"Unexpected exception: {result!r}"
        assert result[0] is False, "All commands should timeout"

    # Wait for all cleanup.
    await asyncio.sleep(0.5)

    # BUG FIX (two issues in the original):
    # 1. The assertion sat inside `except Exception: pass`, swallowing
    #    AssertionError and making the zombie check a no-op.
    # 2. A failed baseline `ps` left initial_zombies at 0, so a later
    #    successful `ps` could fail spuriously on pre-existing zombies.
    final_zombies = None
    try:
        proc = sp.run(["ps", "aux"], capture_output=True, text=True, timeout=5)
        final_zombies = proc.stdout.count("<defunct>")
    except Exception:
        pass
    if initial_zombies is not None and final_zombies is not None:
        assert final_zombies == initial_zombies, (
            f"Zombie processes created: {final_zombies - initial_zombies}"
        )

@pytest.mark.asyncio
async def test_run_command_stdin_cleanup(self) -> None:
    """Test that subprocess is properly cleaned up when using stdin."""
    # The child reads all of stdin and then sleeps, simulating slow
    # processing so the cancellation lands during the communicate() phase.
    slow_reader = f"{sys.executable} -c 'import sys, time; sys.stdin.read(); time.sleep(10)'"
    task = asyncio.create_task(
        run_command(slow_reader, log_prefix="[TEST]", stdin_input="test data")
    )

    # Let the child spawn and start consuming stdin, then cancel.
    await asyncio.sleep(0.1)
    task.cancel()

    # Cancellation must propagate out to the awaiter.
    with pytest.raises(asyncio.CancelledError):
        await task

    # Leave time for the child to be reaped (implicit zombie check —
    # leftovers would interfere with later tests).
    await asyncio.sleep(0.1)
45 changes: 44 additions & 1 deletion webhook_server/utils/helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ async def run_command(
logger = get_logger_with_params()
out_decoded: str = ""
err_decoded: str = ""
sub_process = None # Initialize to None for finally block cleanup
# Don't override caller-provided pipes - use setdefault to respect provided kwargs
kwargs.setdefault("stdout", subprocess.PIPE)
kwargs.setdefault("stderr", subprocess.PIPE)
Expand Down Expand Up @@ -337,7 +338,7 @@ async def run_command(
logger.error(f"{log_prefix} Command '{logged_command}' timed out after {timeout}s")
try:
sub_process.kill()
await sub_process.wait()
# Cleanup handled by finally block
except Exception:
pass # Process may already be dead
return False, "", f"Command timed out after {timeout}s"
Expand Down Expand Up @@ -372,10 +373,52 @@ async def run_command(

except asyncio.CancelledError:
logger.debug(f"{log_prefix} Command '{logged_command}' cancelled")
# Re-raise after finally block cleanup (prevents zombies on cancellation)
raise
except (OSError, subprocess.SubprocessError, ValueError):
logger.exception(f"{log_prefix} Failed to run '{logged_command}' command")
return False, out_decoded, err_decoded
finally:
# CRITICAL RACE CONDITION FIX:
#
# Original Bug: Zombies created when checking `if returncode is None` before wait()
# Root Cause: Event loop's child watcher can set returncode AFTER check but BEFORE wait()
# Result: wait() skipped → zombie process never reaped
#
# Solution: ALWAYS call wait() regardless of returncode
# Why This Works:
# - wait() is idempotent - calling on already-reaped process is safe
# - wait() is the ONLY API that guarantees zombie reaping
# - Even if returncode is set, we MUST call wait() to cleanup OS resources
#
# Defense in Depth:
# - ALWAYS try to kill() (may fail if already dead - that's OK via ProcessLookupError)
# - ALWAYS call wait() (may raise ProcessLookupError if already reaped - that's OK)
# - Handle expected exceptions (ProcessLookupError = already reaped = success)
# - Re-raise CancelledError (don't suppress task cancellation)
# - Log critical failures (unexpected exceptions = potential zombie)

if sub_process:
# Always try to kill - don't check returncode (racy!)
try:
sub_process.kill()
except ProcessLookupError:
pass # Already dead - this is OK
except Exception:
logger.debug(f"{log_prefix} Exception while killing process")

# ALWAYS wait - this is the ONLY way to guarantee zombie reaping
try:
await sub_process.wait()
except ProcessLookupError:
# Process was already reaped (e.g., by event loop child watcher) - this is OK
pass
except asyncio.CancelledError:
# Don't suppress cancellation - cleanup is done, now propagate cancellation
raise
except Exception:
# Genuinely critical - wait() failed for unknown reason
logger.exception(f"{log_prefix} CRITICAL: Failed to wait for subprocess - potential zombie")


def get_apis_and_tokes_from_config(config: Config) -> list[tuple[github.Github, str]]:
Expand Down