diff --git a/Dockerfile b/Dockerfile
index 71a899d..edc9c58 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -9,7 +9,7 @@ RUN useradd -m -u 1000 codeassureuser
 WORKDIR /app
 
 # Install codeassure
-RUN uv pip install --system --no-cache git+https://github.com/accuknox/codeassure-cli.git@v0.1.0
+RUN uv pip install --system --no-cache git+https://github.com/accuknox/codeassure-cli.git@v0.2.0
 
 # Set ownership
 RUN chown -R codeassureuser:codeassureuser /app
diff --git a/brev_docker_files/docker-compose.yml b/brev_docker_files/docker-compose.yml
index fe614ca..3935e52 100644
--- a/brev_docker_files/docker-compose.yml
+++ b/brev_docker_files/docker-compose.yml
@@ -33,6 +33,10 @@ services:
       - qwen3_coder
       - --reasoning-parser
       - qwen3
+      - --enable-prefix-caching
+      - --enable-chunked-prefill
+      - --max-num-batched-tokens
+      - "8192"
       - --trust-remote-code
       - --host
       - 0.0.0.0
diff --git a/codeassure.json b/codeassure.json
index 7cb5062..c9d8555 100644
--- a/codeassure.json
+++ b/codeassure.json
@@ -1,11 +1,11 @@
 {
   "model": {
     "provider": "openai",
-    "name": "qwen/qwen3.5-9b",
-    "api_base": "https://openrouter.ai/api",
-    "api_key": "$OPENROUTER_KEY"
+    "name": "qwen35-nvfp4",
+    "api_base": "http://localhost:5000",
+    "temperature": 0.1
   },
-  "concurrency": 2,
+  "concurrency": 7,
   "stage_timeout": 300,
   "finding_timeout": 600
 }
diff --git a/pyproject.toml b/pyproject.toml
index 33316fe..e80b5e0 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
 
 [project]
 name = "codeassure"
-version = "0.1.1"
+version = "0.2.0"
 description = "AI-powered SAST finding verification"
 readme = "README.md"
 requires-python = ">=3.11"
diff --git a/sast_verify/__init__.py b/sast_verify/__init__.py
index 3dc1f76..d3ec452 100644
--- a/sast_verify/__init__.py
+++ b/sast_verify/__init__.py
@@ -1 +1 @@
-__version__ = "0.1.0"
+__version__ = "0.2.0"
diff --git a/sast_verify/agents/__init__.py b/sast_verify/agents/__init__.py
index 19e51fc..844570a 100644
--- a/sast_verify/agents/__init__.py
+++ b/sast_verify/agents/__init__.py
@@ -1,4 +1,4 @@
-from .analyzer import build_analyzer, build_verdict_formatter
+from .analyzer import build_analyzer
 from .deps import AnalyzerDeps
 from .runner import analyze_all
 from .tools import grep_code, read_file
@@ -6,7 +6,6 @@
 __all__ = [
     "AnalyzerDeps",
     "build_analyzer",
-    "build_verdict_formatter",
    "analyze_all",
    "read_file",
    "grep_code",
diff --git a/sast_verify/agents/analyzer.py b/sast_verify/agents/analyzer.py
index f9fc3ae..f7be822 100644
--- a/sast_verify/agents/analyzer.py
+++ b/sast_verify/agents/analyzer.py
@@ -38,6 +38,13 @@ def build_verdict_formatter() -> Agent[None, str]:
     )
 
 
+def build_group_verdict_formatter() -> Agent[None, str]:
+    return Agent(
+        get_config().build_model(),
+        instructions=GROUP_VERDICT_FORMATTER_INSTRUCTION,
+    )
+
+
 def build_group_analyzer() -> Agent[AnalyzerDeps, str]:
     cfg = get_config()
     if cfg.model.tool_calling:
@@ -52,10 +59,3 @@ def build_group_analyzer() -> Agent[AnalyzerDeps, str]:
         deps_type=AnalyzerDeps,
         instructions=GROUP_ANALYZER_INSTRUCTION_NO_TOOLS,
     )
-
-
-def build_group_verdict_formatter() -> Agent[None, str]:
-    return Agent(
-        get_config().build_model(),
-        instructions=GROUP_VERDICT_FORMATTER_INSTRUCTION,
-    )
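
Reviewer note: codeassure.json now points at a local OpenAI-compatible endpoint instead of OpenRouter. A quick smoke test for that wiring, assuming the vLLM service from docker-compose.yml is up on port 5000 (vLLM exposes the standard `/v1/models` listing route):

```python
# Hypothetical smoke test, not part of this PR. Assumes the vLLM container
# from docker-compose.yml is serving on port 5000.
import json
import urllib.request

with urllib.request.urlopen("http://localhost:5000/v1/models", timeout=5) as resp:
    models = json.load(resp)

# The served model id should match the "name" field in codeassure.json.
print([m["id"] for m in models.get("data", [])])  # expect ["qwen35-nvfp4"]
```
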
diff --git a/sast_verify/agents/runner.py b/sast_verify/agents/runner.py
index de5063e..ffdad8f 100644
--- a/sast_verify/agents/runner.py
+++ b/sast_verify/agents/runner.py
@@ -4,6 +4,7 @@
 import json
 import logging
 import os
+import time
 from pathlib import Path
 
 import anthropic
@@ -28,7 +29,7 @@
 log = logging.getLogger(__name__)
 
-DEFAULT_CONCURRENCY = 4
+DEFAULT_CONCURRENCY = 7
 
 MAX_GREP_FILE_SIZE_DEFAULT = 512 * 1024
 MAX_GREP_BYTES_DEFAULT = 5 * 1024 * 1024
@@ -273,6 +274,34 @@ def _validate_group_evidence(
     return validated
 
 
+# ---------------------------------------------------------------------------
+# Majority voting
+# ---------------------------------------------------------------------------
+
+_CONFIDENCE_WEIGHT = {"high": 3, "medium": 2, "low": 1}
+
+
+def _majority_verdict(verdicts: list[Verdict]) -> Verdict:
+    """Pick verdict with the most votes; break ties by total confidence weight."""
+    from collections import Counter
+    counts: Counter = Counter(v.verdict for v in verdicts)
+    max_votes = max(counts.values())
+    candidates = [label for label, n in counts.items() if n == max_votes]
+
+    if len(candidates) == 1:
+        winner = candidates[0]
+    else:
+        weights: dict[str, int] = {}
+        for v in verdicts:
+            weights[v.verdict] = weights.get(v.verdict, 0) + _CONFIDENCE_WEIGHT.get(v.confidence, 1)
+        winner = max(candidates, key=lambda lbl: weights.get(lbl, 0))
+
+    winners = [v for v in verdicts if v.verdict == winner]
+    best = max(winners, key=lambda v: _CONFIDENCE_WEIGHT.get(v.confidence, 0))
+    best.voting_tally = dict(counts)
+    return best
+
+
 # ---------------------------------------------------------------------------
 # Shared primitives
 # ---------------------------------------------------------------------------
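
Reviewer note: for reference, a self-contained sketch of how the tally and tie-break above behave; `FakeVerdict` is a stand-in for `sast_verify.schema.Verdict`, not the real model:

```python
# Standalone illustration of the vote count and tie-break in _majority_verdict.
from collections import Counter
from dataclasses import dataclass

_CONFIDENCE_WEIGHT = {"high": 3, "medium": 2, "low": 1}

@dataclass
class FakeVerdict:
    verdict: str
    confidence: str

rounds = [
    FakeVerdict("true_positive", "low"),
    FakeVerdict("false_positive", "high"),
    FakeVerdict("false_positive", "medium"),
]
counts = Counter(v.verdict for v in rounds)
print(dict(counts))  # {'true_positive': 1, 'false_positive': 2} -> false_positive wins

# On a 1-1-1 split every label ties on votes, so the label whose votes carry
# the most total confidence weight wins instead.
```
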
@@ -318,9 +347,8 @@ def _severity_rank(s: str) -> int:
 # ---------------------------------------------------------------------------
 # Single-finding analysis
 # ---------------------------------------------------------------------------
 
-async def _analyze_one(
+async def _analyze_one_round(
     analyzer,
-    formatter,
     bundle: EvidenceBundle,
     codebase: Path,
     index: int,
@@ -329,7 +357,9 @@
     grep_max_bytes: int = MAX_GREP_BYTES_DEFAULT,
     request_limit: int = 200,
     thinking_settings: dict | None = None,
+    formatter=None,
 ) -> Verdict:
+    """Single analysis pass for one finding. Returns a Verdict (possibly uncertain on failure)."""
     finding_dir = Path(bundle.finding.path).parent
     anchor_root = _compute_anchor_root(finding_dir)
     deps = _build_deps(codebase, finding_dir, anchor_root, grep_max_file_size, grep_max_bytes)
@@ -338,21 +368,13 @@
     run_kwargs: dict = {"deps": deps, "usage_limits": limits}
     if thinking_settings:
         run_kwargs["model_settings"] = thinking_settings
-        formatter_kwargs: dict = {"model_settings": thinking_settings}
-        mode = "full" if not thinking_settings["extra_body"]["chat_template_kwargs"].get("low_effort") else "low"
-        if not thinking_settings["extra_body"]["chat_template_kwargs"]["enable_thinking"]:
-            mode = "off"
-        log.info("Finding %d [%s] → thinking=%s", index, bundle.finding.severity, mode)
-    else:
-        formatter_kwargs = {}
 
-    # Stage 1: Tool-using analysis
     try:
-        analysis_result = await asyncio.wait_for(
+        result = await asyncio.wait_for(
             _run_with_retry(analyzer, build_user_message(bundle), **run_kwargs),
             timeout=stage_timeout,
         )
-        analysis = analysis_result.output
+        analysis = result.output
     except asyncio.TimeoutError:
         log.warning("Analyzer timed out for finding %d", index)
         return Verdict(verdict="uncertain", confidence="low",
@@ -369,68 +391,22 @@
 
     accessed_paths = deps.accessed_paths
 
-    # Stage 2: Verdict extraction with validation-error repair loop
-    format_message = build_formatter_message(analysis, bundle)
-    format_result = None
-
+    verdict = None
     try:
-        format_result = await asyncio.wait_for(
-            _run_with_retry(formatter, format_message, **formatter_kwargs),
-            timeout=stage_timeout,
-        )
-        response = format_result.output
-    except asyncio.TimeoutError:
-        log.warning("Formatter timed out for finding %d", index)
-        response = ""
+        verdict = _parse_verdict(analysis)
     except Exception as exc:
-        log.error("Formatter failed for finding %d: %s", index, type(exc).__name__)
-        response = ""
-
-    verdict = None
-    if response.strip():
-        try:
-            verdict = _parse_verdict(response)
-        except Exception as exc:
-            log.warning("Formatter parse failed for finding %d: %s", index, exc)
-
-            repair_message = (
-                f"Your response could not be parsed: {exc}\n\n"
-                "Return ONLY a valid JSON object with these exact keys:\n"
-                '{"verdict": "true_positive|false_positive|uncertain", '
-                '"is_security_vulnerability": true or false, '
-                '"confidence": "high|medium|low", '
-                '"severity": "critical|high|medium|low", '
-                '"reason": "...", "evidence_locations": ["file:line"]}\n'
-                "No markdown fences, no prose."
-            )
+        log.warning("Direct parse failed for finding %d: %s — trying formatter fallback", index, exc)
+        if formatter is not None:
             try:
-                repair_result = await asyncio.wait_for(
-                    _run_with_retry(
-                        formatter,
-                        repair_message,
-                        message_history=format_result.all_messages(),
-                        **formatter_kwargs,
-                    ),
+                fmt_result = await asyncio.wait_for(
+                    _run_with_retry(formatter, build_formatter_message(analysis, bundle)),
                     timeout=stage_timeout,
                 )
-                repair_response = repair_result.output
-            except Exception:
-                repair_response = ""
-
-            if repair_response.strip():
-                try:
-                    verdict = _parse_verdict(repair_response)
-                except Exception as repair_exc:
-                    log.warning("Formatter repair failed for finding %d: %s", index, repair_exc)
-
-    if verdict is None:
-        try:
-            verdict = _parse_verdict(analysis)
-        except Exception:
-            pass
+                verdict = _parse_verdict(fmt_result.output)
+            except Exception as fmt_exc:
+                log.error("Formatter fallback also failed for finding %d: %s", index, fmt_exc)
 
     if verdict is None:
-        log.error("All parse attempts failed for finding %d", index)
         return Verdict(verdict="uncertain", confidence="low",
                        reason="Could not extract a valid verdict from LLM output.")
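
Reviewer note: `_parse_verdict` itself is outside this diff. As a sketch only, one plausible shape for the direct-parse path it now carries under the new trailing-JSON prompt contract:

```python
# Sketch only: _parse_verdict is not shown in this diff. One plausible way to
# lift a trailing JSON object off free-form analyzer output, matching the new
# "end your response with a JSON verdict on its own line" prompt contract.
import json

def extract_trailing_json(text: str) -> dict:
    for line in reversed(text.strip().splitlines()):
        line = line.strip()
        if line.startswith("{") and line.endswith("}"):
            return json.loads(line)  # malformed JSON raises -> formatter fallback
    raise ValueError("no trailing JSON object found")

analysis = 'The sink is reachable...\n{"verdict": "true_positive", "confidence": "high"}'
print(extract_trailing_json(analysis)["verdict"])  # true_positive
```
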
@@ -446,13 +422,57 @@
     return verdict
 
 
+async def _analyze_one(
+    analyzer,
+    bundle: EvidenceBundle,
+    codebase: Path,
+    index: int,
+    stage_timeout: float = 500,
+    grep_max_file_size: int = MAX_GREP_FILE_SIZE_DEFAULT,
+    grep_max_bytes: int = MAX_GREP_BYTES_DEFAULT,
+    request_limit: int = 200,
+    thinking_settings: dict | None = None,
+    formatter=None,
+    voting_rounds: int = 1,
+) -> Verdict:
+    if voting_rounds <= 1:
+        return await _analyze_one_round(
+            analyzer, bundle, codebase, index,
+            stage_timeout=stage_timeout,
+            grep_max_file_size=grep_max_file_size,
+            grep_max_bytes=grep_max_bytes,
+            request_limit=request_limit,
+            thinking_settings=thinking_settings,
+            formatter=formatter,
+        )
+
+    round_kwargs = dict(
+        stage_timeout=stage_timeout,
+        grep_max_file_size=grep_max_file_size,
+        grep_max_bytes=grep_max_bytes,
+        request_limit=request_limit,
+        thinking_settings=thinking_settings,
+        formatter=formatter,
+    )
+    tasks = [
+        _analyze_one_round(analyzer, bundle, codebase, index, **round_kwargs)
+        for _ in range(voting_rounds)
+    ]
+    results = await asyncio.gather(*tasks)
+    verdict = _majority_verdict(list(results))
+    log.info(
+        "Finding %d voting (%d rounds): %s → %s",
+        index, voting_rounds, verdict.voting_tally, verdict.verdict,
+    )
+    return verdict
+
+
 # ---------------------------------------------------------------------------
 # Group analysis
 # ---------------------------------------------------------------------------
 
 async def _analyze_one_group(
     analyzer,
-    formatter,
     group: FindingGroup,
     codebase: Path,
     stage_timeout: float = 500,
@@ -460,6 +480,7 @@
     grep_max_bytes: int = MAX_GREP_BYTES_DEFAULT,
     request_limit: int = 200,
     thinking_settings: dict | None = None,
+    formatter=None,
 ) -> dict[int, Verdict]:
     """Analyze a co-located group. Returns dict[original_index → Verdict]."""
     finding_dir = Path(group.bundles[0].finding.path).parent
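
Reviewer note: the wrapper above fans rounds out concurrently, so the `finding_timeout * voting_rounds` cap applied by the callers later in this file is a conservative budget for rounds contending for the same server. A toy model of that scheduling:

```python
# Toy model of the voting fan-out: rounds run concurrently under one scaled
# budget, mirroring asyncio.gather in _analyze_one and the
# finding_timeout * voting_rounds cap in analyze_all.
import asyncio
import random

async def one_round() -> str:
    await asyncio.sleep(random.uniform(0.1, 0.3))  # stands in for an LLM round
    return random.choice(["true_positive", "false_positive"])

async def main() -> None:
    voting_rounds, finding_timeout = 3, 1.0
    results = await asyncio.wait_for(
        asyncio.gather(*(one_round() for _ in range(voting_rounds))),
        timeout=finding_timeout * voting_rounds,
    )
    print(results)  # e.g. ['false_positive', 'false_positive', 'true_positive']

asyncio.run(main())
```
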
@@ -468,10 +489,8 @@
     limits = UsageLimits(request_limit=request_limit)
 
     run_kwargs: dict = {"deps": deps, "usage_limits": limits}
-    formatter_kwargs: dict = {}
     if thinking_settings:
         run_kwargs["model_settings"] = thinking_settings
-        formatter_kwargs["model_settings"] = thinking_settings
 
     expected_keys = [str(i) for i in range(len(group.bundles))]
 
@@ -479,15 +498,12 @@
     def _uncertain_all(reason: str) -> dict[int, Verdict]:
         return {idx: Verdict(verdict="uncertain", confidence="low", reason=reason)
                 for idx in group.original_indices}
 
-    log.info("Group %s (%d findings, %s)", group.group_key, len(group.bundles), group.relationship)
-
-    # Stage 1: Tool-using analysis
     try:
-        analysis_result = await asyncio.wait_for(
+        result = await asyncio.wait_for(
             _run_with_retry(analyzer, build_group_message(group), **run_kwargs),
             timeout=stage_timeout,
         )
-        analysis = analysis_result.output
+        analysis = result.output
     except asyncio.TimeoutError:
         log.warning("Group analyzer timed out for %s", group.group_key)
         return _uncertain_all(f"Analyzer stage timed out after {stage_timeout}s.")
@@ -501,72 +517,22 @@ def _uncertain_all(reason: str) -> dict[int, Verdict]:
 
     accessed_paths = deps.accessed_paths
 
-    # Stage 2: Group verdict formatting
-    format_message = build_group_formatter_message(analysis, group)
-    format_result = None
-
+    verdicts = None
     try:
-        format_result = await asyncio.wait_for(
-            _run_with_retry(formatter, format_message, **formatter_kwargs),
-            timeout=stage_timeout,
-        )
-        response = format_result.output
-    except asyncio.TimeoutError:
-        log.warning("Group formatter timed out for %s", group.group_key)
-        response = ""
+        verdicts = _parse_group_verdicts(analysis, expected_keys)
     except Exception as exc:
-        log.error("Group formatter failed for %s: %s", group.group_key, type(exc).__name__)
-        response = ""
-
-    verdicts: dict[str, Verdict] | None = None
-    if response.strip():
-        try:
-            verdicts = _parse_group_verdicts(response, expected_keys)
-        except Exception as exc:
-            log.warning("Group verdict parse failed for %s: %s", group.group_key, exc)
-
-            key_lines = "\n".join(
-                f'    "{k}": {{"verdict": "true_positive|false_positive|uncertain", '
-                f'"is_security_vulnerability": true, "confidence": "high|medium|low", '
-                f'"severity": "critical|high|medium|low", '
-                f'"reason": "...", "evidence_locations": []}}'
-                for k in expected_keys
-            )
-            repair_msg = (
-                f"Your response could not be parsed: {exc}\n\n"
-                "Return ONLY a JSON object:\n"
-                '{\n  "verdicts": {\n'
-                + key_lines
-                + "\n  }\n}\nNo markdown fences, no prose."
-            )
+        log.warning("Direct group parse failed for %s: %s — trying formatter fallback", group.group_key, exc)
+        if formatter is not None:
             try:
-                history = format_result.all_messages() if format_result is not None else None
-                repair_kw = dict(formatter_kwargs)
-                if history:
-                    repair_kw["message_history"] = history
-                repair_result = await asyncio.wait_for(
-                    formatter.run(repair_msg, **repair_kw),
+                fmt_result = await asyncio.wait_for(
+                    _run_with_retry(formatter, build_group_formatter_message(analysis, group)),
                     timeout=stage_timeout,
                 )
-                repair_response = repair_result.output
-            except Exception:
-                repair_response = ""
+                verdicts = _parse_group_verdicts(fmt_result.output, expected_keys)
+            except Exception as fmt_exc:
+                log.error("Formatter fallback also failed for group %s: %s", group.group_key, fmt_exc)
 
-            if repair_response.strip():
-                try:
-                    verdicts = _parse_group_verdicts(repair_response, expected_keys)
-                except Exception as repair_exc:
-                    log.warning("Group verdict repair failed for %s: %s", group.group_key, repair_exc)
-
-    # Fallback: try parsing analyzer output directly
     if verdicts is None:
-        try:
-            verdicts = _parse_group_verdicts(analysis, expected_keys)
-        except Exception:
-            pass
-
-    if verdicts is None:
-        log.error("All group parse attempts failed for %s", group.group_key)
         return _uncertain_all("Could not extract group verdicts from LLM output.")
 
     verdicts = _validate_group_evidence(group, verdicts, accessed_paths)
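
Reviewer note: `_parse_group_verdicts` is also outside this diff; whatever it does internally, the `expected_keys` contract requires exactly one entry per finding. A hypothetical shape check under that assumption:

```python
# Hypothetical shape check: a group response must carry exactly one entry per
# finding, keyed "0".."n-1", or the whole group falls back to uncertain.
import json

def check_group_shape(payload: str, expected_keys: list[str]) -> dict:
    verdicts = json.loads(payload)["verdicts"]
    if sorted(verdicts) != sorted(expected_keys):
        raise ValueError(f"expected keys {expected_keys}, got {sorted(verdicts)}")
    return verdicts

print(check_group_shape('{"verdicts": {"0": {}, "1": {}}}', ["0", "1"]))
```
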
@@ -658,31 +624,33 @@ async def analyze_all(
     grep_max_file_size = cfg.grep_max_file_kb * 1024
     grep_max_bytes = cfg.grep_max_scan_mb * 1024 * 1024
     request_limit = cfg.request_limit
+    voting_rounds = cfg.voting_rounds
 
     analyzer = build_analyzer()
     formatter = build_verdict_formatter()
 
     semaphore = asyncio.Semaphore(concurrency)
     total = len(bundles)
-    counter = [0]
+    done_counter = [0]
 
     async def _bounded(index: int, bundle: EvidenceBundle) -> Verdict:
         async with semaphore:
-            counter[0] += 1
             thinking = cfg.get_thinking_settings(bundle.finding.severity)
-            log.info("Analysing %d/%d finding #%d", counter[0], total, index)
+            t0 = time.perf_counter()
             try:
                 verdict = await asyncio.wait_for(
                     _analyze_one(
-                        analyzer, formatter,
+                        analyzer,
                         bundle, codebase, index,
                         stage_timeout=stage_timeout,
                         grep_max_file_size=grep_max_file_size,
                         grep_max_bytes=grep_max_bytes,
                         request_limit=request_limit,
                         thinking_settings=thinking,
+                        formatter=formatter,
+                        voting_rounds=voting_rounds,
                     ),
-                    timeout=finding_timeout,
+                    timeout=finding_timeout * voting_rounds,
                 )
             except asyncio.TimeoutError:
-                log.error("Finding %d timed out after %ds", index, finding_timeout)
+                log.error("Finding %d timed out after %ds", index, finding_timeout * voting_rounds)
@@ -693,6 +661,14 @@
                 return Verdict(verdict="uncertain", confidence="low",
                                reason=f"Analysis error: {type(exc).__name__}")
 
+            done_counter[0] += 1
+            elapsed = time.perf_counter() - t0
+            tally_str = f" votes={verdict.voting_tally}" if verdict.voting_tally else ""
+            print(
+                f"[{done_counter[0]}/{total}] Finding #{index} — {elapsed:.1f}s{tally_str}",
+                flush=True,
+            )
+
             if claude_verification:
                 verdict_agrees, vuln_agrees, claude_reason = await _claude_validate(bundle, verdict)
                 verdict.claude_verdict_agrees = verdict_agrees
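
Reviewer note: a minimal model of the semaphore pattern `analyze_all` relies on, for readers tuning `concurrency`:

```python
# Minimal model of the semaphore cap in analyze_all: at most `concurrency`
# analyses run at once, matching the server-side batching headroom that
# concurrency=7 is tuned against.
import asyncio

async def bounded(sem: asyncio.Semaphore, i: int) -> int:
    async with sem:
        await asyncio.sleep(0.1)  # stands in for one finding's analysis
        return i

async def main() -> None:
    sem = asyncio.Semaphore(7)
    print(await asyncio.gather(*(bounded(sem, i) for i in range(20))))

asyncio.run(main())
```
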
@@ -722,6 +698,7 @@ async def analyze_all_grouped(
     grep_max_file_size = cfg.grep_max_file_kb * 1024
     grep_max_bytes = cfg.grep_max_scan_mb * 1024 * 1024
     request_limit = cfg.request_limit
+    voting_rounds = cfg.voting_rounds
 
     solo_analyzer = build_analyzer()
     solo_formatter = build_verdict_formatter()
@@ -730,28 +707,29 @@
     semaphore = asyncio.Semaphore(concurrency)
     total = len(groups)
-    counter = [0]
+    done_counter = [0]
 
     async def _bounded_group(group: FindingGroup) -> dict[int, Verdict]:
         async with semaphore:
-            counter[0] += 1
             if group.relationship == "solo":
                 bundle = group.bundles[0]
                 orig_idx = group.original_indices[0]
                 thinking = cfg.get_thinking_settings(bundle.finding.severity)
-                log.info("Analysing %d/%d finding #%d", counter[0], total, orig_idx)
+                t0 = time.perf_counter()
                 try:
                     verdict = await asyncio.wait_for(
                         _analyze_one(
-                            solo_analyzer, solo_formatter,
+                            solo_analyzer,
                             bundle, codebase, orig_idx,
                             stage_timeout=stage_timeout,
                             grep_max_file_size=grep_max_file_size,
                             grep_max_bytes=grep_max_bytes,
                             request_limit=request_limit,
                             thinking_settings=thinking,
+                            formatter=solo_formatter,
+                            voting_rounds=voting_rounds,
                         ),
-                        timeout=finding_timeout,
+                        timeout=finding_timeout * voting_rounds,
                     )
                 except asyncio.TimeoutError:
-                    log.error("Finding %d timed out after %ds", orig_idx, finding_timeout)
+                    log.error("Finding %d timed out after %ds", orig_idx, finding_timeout * voting_rounds)
@@ -762,6 +740,13 @@
                     verdict = Verdict(verdict="uncertain", confidence="low",
                                       reason=f"Analysis error: {type(exc).__name__}")
 
+                done_counter[0] += 1
+                tally_str = f" votes={verdict.voting_tally}" if verdict.voting_tally else ""
+                print(
+                    f"[{done_counter[0]}/{total}] Finding #{orig_idx} — {time.perf_counter() - t0:.1f}s{tally_str}",
+                    flush=True,
+                )
+
                 if claude_verification:
                     va, vua, cr = await _claude_validate(bundle, verdict)
                     verdict.claude_verdict_agrees = va
@@ -780,17 +765,18 @@
                 )
                 thinking = cfg.get_thinking_settings(max_severity)
                 timeout = finding_timeout + 60 * (len(group.bundles) - 1)
-                log.info("Analysing %d/%d group %s (%d findings)", counter[0], total, group.group_key, len(group.bundles))
+                t0 = time.perf_counter()
                 try:
                     result = await asyncio.wait_for(
                         _analyze_one_group(
-                            group_analyzer, group_formatter,
+                            group_analyzer,
                             group, codebase,
                             stage_timeout=stage_timeout,
                             grep_max_file_size=grep_max_file_size,
                             grep_max_bytes=grep_max_bytes,
                             request_limit=request_limit,
+                            formatter=group_formatter,
                             thinking_settings=thinking,
                         ),
                         timeout=timeout,
@@ -806,6 +792,15 @@
                                       reason=f"Group analysis error: {type(exc).__name__}")
                     result = {idx: uncertain for idx in group.original_indices}
 
+                done_counter[0] += 1
+                elapsed = time.perf_counter() - t0
+                indices_str = ", ".join(f"#{i}" for i in group.original_indices)
+                print(
+                    f"[{done_counter[0]}/{total}] Group [{indices_str}] — "
+                    f"{len(group.bundles)} findings — {elapsed:.1f}s",
+                    flush=True,
+                )
+
                 if claude_verification:
                     for i, orig_idx in enumerate(group.original_indices):
                         if orig_idx in result:
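
Reviewer note: the solo and group paths budget wall time differently; solo findings scale with voting rounds, groups with member count. Illustrative arithmetic (assuming `voting_rounds = 3` and the config default `finding_timeout = 300`; codeassure.json raises the latter to 600):

```python
# Illustration of the two wall-time budgets in analyze_all_grouped.
finding_timeout, voting_rounds = 300, 3

solo_timeout = finding_timeout * voting_rounds                      # 900
group_timeouts = [finding_timeout + 60 * (n - 1) for n in (2, 4)]   # [360, 480]
print(solo_timeout, group_timeouts)
```
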
diff --git a/sast_verify/config.py b/sast_verify/config.py
index a8f47d0..b7eb1a9 100644
--- a/sast_verify/config.py
+++ b/sast_verify/config.py
@@ -40,16 +40,18 @@ class ModelConfig(BaseModel):
     api_base: str | None = Field(default=None, description="API base URL (for self-hosted endpoints)")
     api_key: str | None = Field(default=None, description="API key (overrides env vars; supports $VAR_NAME syntax for env var references)")
     tool_calling: bool = Field(default=True, description="Set to false for models that don't support tool/function calling")
+    temperature: float | None = Field(default=0.1, description="Sampling temperature (0.0 = deterministic). Set null to use model default.")
 
 
 class Config(BaseModel):
     model: ModelConfig
-    concurrency: int = Field(default=4, ge=1)
+    concurrency: int = Field(default=7, ge=1)
     stage_timeout: int = Field(default=120, ge=10, description="Seconds per LLM stage (analyzer or formatter)")
     finding_timeout: int = Field(default=300, ge=30, description="Seconds for the entire finding (both stages + repair)")
     grep_max_file_kb: int = Field(default=512, ge=1, description="Skip files larger than this in grep (KB)")
     grep_max_scan_mb: int = Field(default=5, ge=1, description="Stop grep scanning after this many MB read")
     request_limit: int = Field(default=200, ge=1, description="Max requests per agent.run() call (reasoning models need more)")
+    voting_rounds: int = Field(default=1, ge=1, description="Run each finding N times and take majority verdict (3 recommended for non-deterministic local models)")
     thinking_map: dict[str, ThinkingMode] | None = Field(
         # default_factory=lambda: dict(_DEFAULT_THINKING_MAP),
         default=None,
@@ -57,12 +59,20 @@ class Config(BaseModel):
         "Set to null/omit to disable (no extra_body sent).",
     )
 
+    def base_model_settings(self) -> dict[str, Any] | None:
+        """Return base model_settings with temperature, or None if nothing to set."""
+        if self.model.temperature is None:
+            return None
+        return {"temperature": self.model.temperature}
+
     def get_thinking_settings(self, severity: str) -> dict[str, Any] | None:
         """Return model_settings dict for the given severity, or None if thinking control is disabled."""
+        base = self.base_model_settings() or {}
         if self.thinking_map is None:
-            return None
+            return base or None
         mode = self.thinking_map.get(severity.upper(), "low")  # default to low for unknown severities
-        return thinking_model_settings(mode)
+        thinking = thinking_model_settings(mode)
+        return {**base, **thinking}
 
     @property
     def litellm_model(self) -> str:
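
Reviewer note: `get_thinking_settings` composes the two dicts by plain merge, with the thinking dict winning on key collisions. Illustrative values, with the `extra_body` shape taken from the runner code this PR removes:

```python
# How base_model_settings and thinking settings compose: a plain dict merge.
base = {"temperature": 0.1}
thinking = {"extra_body": {"chat_template_kwargs": {"enable_thinking": True}}}

merged = {**base, **thinking}
print(merged)
# {'temperature': 0.1, 'extra_body': {'chat_template_kwargs': {'enable_thinking': True}}}
```
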
f"{ai_elapsed / n:.1f}s avg per finding | " + f"{n} finding(s) | concurrency={concurrency}", + flush=True, + ) + raw = json.loads(findings_path.read_text(encoding="utf-8")) for result, verdict in zip(raw["results"], verdicts): verification: dict = { @@ -90,6 +110,12 @@ def run( } output_path.write_text(json.dumps(raw, indent=2)) + total_elapsed = time.perf_counter() - wall_start + print( + f"[timing] done — total wall time: {total_elapsed:.1f}s " + f"(retrieval {retrieval_elapsed:.1f}s + AI {ai_elapsed:.1f}s + output {total_elapsed - retrieval_elapsed - ai_elapsed:.1f}s)", + flush=True, + ) def verify( diff --git a/sast_verify/prompts/__init__.py b/sast_verify/prompts/__init__.py index f69be36..ff5b092 100644 --- a/sast_verify/prompts/__init__.py +++ b/sast_verify/prompts/__init__.py @@ -40,9 +40,9 @@ def build_user_message(bundle: EvidenceBundle) -> str: return "\n".join(parts) + def build_formatter_message(analysis: str, bundle: EvidenceBundle) -> str: f = bundle.finding - parts = [ "## Analysis Record", analysis, @@ -53,7 +53,19 @@ def build_formatter_message(analysis: str, bundle: EvidenceBundle) -> str: f"- **severity**: {f.severity}", f"- **claim**: {f.message}", ] + return "\n".join(parts) + +def build_group_formatter_message(analysis: str, group: "FindingGroup") -> str: + parts = ["## Analysis Record", analysis, "\n## Original Findings (cross-reference)"] + for i, bundle in enumerate(group.bundles): + f = bundle.finding + parts.append(f"\n### Finding {i}") + parts.append(f"- **check_id**: {f.check_id}") + parts.append(f"- **path**: {f.path}") + parts.append(f"- **lines**: {f.line}–{f.end_line}") + parts.append(f"- **severity**: {f.severity}") + parts.append(f"- **claim**: {f.message}") return "\n".join(parts) @@ -94,15 +106,3 @@ def build_group_message(group: "FindingGroup") -> str: return "\n".join(parts) -def build_group_formatter_message(analysis: str, group: "FindingGroup") -> str: - """Build the formatter input for a group: analysis + all original findings.""" - parts = ["## Analysis Record", analysis, "\n## Original Findings (cross-reference)"] - for i, bundle in enumerate(group.bundles): - f = bundle.finding - parts.append(f"\n### Finding {i}") - parts.append(f"- **check_id**: {f.check_id}") - parts.append(f"- **path**: {f.path}") - parts.append(f"- **lines**: {f.line}–{f.end_line}") - parts.append(f"- **severity**: {f.severity}") - parts.append(f"- **claim**: {f.message}") - return "\n".join(parts) diff --git a/sast_verify/prompts/analyzer.py b/sast_verify/prompts/analyzer.py index 042c9b7..2b7bcc0 100644 --- a/sast_verify/prompts/analyzer.py +++ b/sast_verify/prompts/analyzer.py @@ -58,30 +58,18 @@ class name — that pulls in unrelated code. ## Output format -After gathering sufficient evidence, respond with a structured analysis record -using these exact field labels: - -- **verdict_candidate**: true_positive | false_positive | uncertain -- **is_security_vulnerability**: true | false — Could an attacker exploit - this to cause harm (confidentiality, integrity, availability, or - privilege)? When in doubt, lean toward true. Answer false only when - there is no plausible attack scenario. -- **confidence**: high | medium | low -- **severity**: critical | high | medium | low — Assessed severity of the - vulnerability. If verdict_candidate is true_positive, assess based on - exploitability and potential impact (data loss, RCE, privilege escalation = - critical/high; limited-scope or hard-to-reach = medium/low). If - false_positive or uncertain, always use "low". 
diff --git a/sast_verify/prompts/__init__.py b/sast_verify/prompts/__init__.py
index f69be36..ff5b092 100644
--- a/sast_verify/prompts/__init__.py
+++ b/sast_verify/prompts/__init__.py
@@ -40,9 +40,9 @@ def build_user_message(bundle: EvidenceBundle) -> str:
     return "\n".join(parts)
 
+
 def build_formatter_message(analysis: str, bundle: EvidenceBundle) -> str:
     f = bundle.finding
-
     parts = [
         "## Analysis Record",
         analysis,
@@ -53,7 +53,19 @@ def build_formatter_message(analysis: str, bundle: EvidenceBundle) -> str:
         f"- **severity**: {f.severity}",
         f"- **claim**: {f.message}",
     ]
+    return "\n".join(parts)
+
+
+def build_group_formatter_message(analysis: str, group: "FindingGroup") -> str:
+    """Build the formatter input for a group: analysis + all original findings."""
+    parts = ["## Analysis Record", analysis, "\n## Original Findings (cross-reference)"]
+    for i, bundle in enumerate(group.bundles):
+        f = bundle.finding
+        parts.append(f"\n### Finding {i}")
+        parts.append(f"- **check_id**: {f.check_id}")
+        parts.append(f"- **path**: {f.path}")
+        parts.append(f"- **lines**: {f.line}–{f.end_line}")
+        parts.append(f"- **severity**: {f.severity}")
+        parts.append(f"- **claim**: {f.message}")
     return "\n".join(parts)
 
@@ -94,15 +106,3 @@ def build_group_message(group: "FindingGroup") -> str:
     return "\n".join(parts)
 
 
-def build_group_formatter_message(analysis: str, group: "FindingGroup") -> str:
-    """Build the formatter input for a group: analysis + all original findings."""
-    parts = ["## Analysis Record", analysis, "\n## Original Findings (cross-reference)"]
-    for i, bundle in enumerate(group.bundles):
-        f = bundle.finding
-        parts.append(f"\n### Finding {i}")
-        parts.append(f"- **check_id**: {f.check_id}")
-        parts.append(f"- **path**: {f.path}")
-        parts.append(f"- **lines**: {f.line}–{f.end_line}")
-        parts.append(f"- **severity**: {f.severity}")
-        parts.append(f"- **claim**: {f.message}")
-    return "\n".join(parts)
diff --git a/sast_verify/prompts/analyzer.py b/sast_verify/prompts/analyzer.py
index 042c9b7..2b7bcc0 100644
--- a/sast_verify/prompts/analyzer.py
+++ b/sast_verify/prompts/analyzer.py
@@ -58,30 +58,18 @@ class name — that pulls in unrelated code.
 
 ## Output format
 
-After gathering sufficient evidence, respond with a structured analysis record
-using these exact field labels:
-
-- **verdict_candidate**: true_positive | false_positive | uncertain
-- **is_security_vulnerability**: true | false — Could an attacker exploit
-  this to cause harm (confidentiality, integrity, availability, or
-  privilege)? When in doubt, lean toward true. Answer false only when
-  there is no plausible attack scenario.
-- **confidence**: high | medium | low
-- **severity**: critical | high | medium | low — Assessed severity of the
-  vulnerability. If verdict_candidate is true_positive, assess based on
-  exploitability and potential impact (data loss, RCE, privilege escalation =
-  critical/high; limited-scope or hard-to-reach = medium/low). If
-  false_positive or uncertain, always use "low".
-- **mitigations_found**: List any sanitizers, validators, or framework protections found (or "none")
-- **assumptions**: List any assumptions you made during analysis (or "none")
-- **unresolved_questions**: List anything you could not determine (or "none")
-- **evidence_locations**: List the file:line references you examined
-- **reasoning**: Why you reached this verdict
-
-Definitions:
-- **true_positive** — the finding is correct given actual code context
-- **false_positive** — the finding is incorrect given actual code context
-- **uncertain** — not enough evidence to decide even after using tools
+After gathering sufficient evidence, end your response with a JSON verdict
+on its own line (no markdown fences):
+
+{"verdict": "true_positive|false_positive|uncertain", "is_security_vulnerability": true|false, "confidence": "high|medium|low", "severity": "critical|high|medium|low", "reason": "one or two sentence explanation", "evidence_locations": ["file:line"]}
+
+Field rules:
+- **verdict**: true_positive = finding is correct; false_positive = finding is wrong; uncertain = insufficient evidence
+- **is_security_vulnerability**: true if an attacker could exploit this; false only when no plausible attack scenario exists
+- **confidence**: how certain you are of the verdict
+- **severity**: for true_positive assess exploitability/impact; for false_positive or uncertain always use "low"
+- **reason**: concise explanation covering verdict and security assessment
+- **evidence_locations**: file:line references you examined
 """
 
 
@@ -131,20 +119,13 @@ class name — that pulls in unrelated code.
 
 ## Output format
 
-For each finding, use the exact format:
-
-### Finding Analysis
-- **verdict_candidate**: true_positive | false_positive | uncertain
-- **is_security_vulnerability**: true | false
-- **confidence**: high | medium | low
-- **severity**: critical | high | medium | low — Assessed severity. If
-  true_positive, assess based on exploitability and impact. If false_positive
-  or uncertain, always use "low".
-- **mitigations_found**: ...
-- **assumptions**: ...
-- **unresolved_questions**: ...
-- **evidence_locations**: file:line references
-- **reasoning**: why you reached this verdict
+After analyzing all findings, end your response with a single JSON object
+on its own line (no markdown fences):
+
+{"verdicts": {"0": {"verdict": "true_positive|false_positive|uncertain", "is_security_vulnerability": true|false, "confidence": "high|medium|low", "severity": "critical|high|medium|low", "reason": "...", "evidence_locations": ["file:line"]}, "1": {...}}}
+
+Keys must be the finding numbers as strings ("0", "1", ...). Include exactly one entry per finding.
+For false_positive or uncertain verdicts, always set severity to "low".
 """
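
Reviewer note: since the severity rule now lives only in prompt text, a hypothetical post-parse guard (not part of this PR) could enforce it when the model ignores the instruction:

```python
# Hypothetical normalizer enforcing the prompt's severity rule after parsing.
def normalize_severity(verdict: str, severity: str) -> str:
    if verdict != "true_positive":
        return "low"  # prompts require "low" for false_positive/uncertain
    return severity if severity in {"critical", "high", "medium", "low"} else "low"

print(normalize_severity("false_positive", "critical"))  # low
```
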
@@ -274,25 +255,16 @@ class name — that pulls in unrelated code.
 
 ## Output format
 
-After analyzing the provided code, respond with a structured analysis record
-using these exact field labels:
-
-- **verdict_candidate**: true_positive | false_positive | uncertain
-- **is_security_vulnerability**: true | false — Could an attacker exploit
-  this to cause harm (confidentiality, integrity, availability, or
-  privilege)? When in doubt, lean toward true. Answer false only when
-  there is no plausible attack scenario.
-- **confidence**: high | medium | low
-- **mitigations_found**: List any sanitizers, validators, or framework protections found (or "none")
-- **assumptions**: List any assumptions you made during analysis (or "none")
-- **unresolved_questions**: List anything you could not determine (or "none")
-- **evidence_locations**: List the file:line references you examined
-- **reasoning**: Why you reached this verdict
-
-Definitions:
-- **true_positive** — the finding is correct given actual code context
-- **false_positive** — the finding is incorrect given actual code context
-- **uncertain** — not enough evidence to decide from the provided code
+After analyzing the provided code, end your response with a JSON verdict
+on its own line (no markdown fences):
+
+{"verdict": "true_positive|false_positive|uncertain", "is_security_vulnerability": true|false, "confidence": "high|medium|low", "severity": "critical|high|medium|low", "reason": "one or two sentence explanation", "evidence_locations": ["file:line"]}
+
+Field rules:
+- **verdict**: true_positive = finding is correct; false_positive = finding is wrong; uncertain = insufficient evidence
+- **is_security_vulnerability**: true if an attacker could exploit this; false only when no plausible attack scenario exists
+- **severity**: for true_positive assess exploitability/impact; for false_positive or uncertain always use "low"
+- **reason**: concise explanation covering verdict and security assessment
 """
 
 
@@ -339,16 +311,12 @@ class name — that pulls in unrelated code.
 
 ## Output format
 
-For each finding, use the exact format:
-
-### Finding Analysis
-- **verdict_candidate**: true_positive | false_positive | uncertain
-- **is_security_vulnerability**: true | false
-- **confidence**: high | medium | low
-- **mitigations_found**: ...
-- **assumptions**: ...
-- **unresolved_questions**: ...
-- **evidence_locations**: file:line references
-- **reasoning**: why you reached this verdict
+After analyzing all findings, end your response with a single JSON object
+on its own line (no markdown fences):
+
+{"verdicts": {"0": {"verdict": "true_positive|false_positive|uncertain", "is_security_vulnerability": true|false, "confidence": "high|medium|low", "severity": "critical|high|medium|low", "reason": "...", "evidence_locations": ["file:line"]}, "1": {...}}}
+
+Keys must be the finding numbers as strings ("0", "1", ...). Include exactly one entry per finding.
+For false_positive or uncertain verdicts, always set severity to "low".
 """
diff --git a/sast_verify/schema.py b/sast_verify/schema.py
index 267056a..699a662 100644
--- a/sast_verify/schema.py
+++ b/sast_verify/schema.py
@@ -59,6 +59,10 @@ class Verdict(BaseModel):
         default=[],
         description="file:line references that support the verdict",
     )
+    voting_tally: dict[str, int] | None = Field(
+        default=None,
+        description="Vote counts per verdict label when voting_rounds > 1 (e.g. {\"false_positive\": 2, \"true_positive\": 1})",
+    )
     claude_verdict_agrees: bool | None = Field(
         default=None,
         description="Whether Claude agrees with the verdict (true_positive/false_positive/uncertain)",