diff --git a/scripts/check_doc_frontmatter.py b/scripts/check_doc_frontmatter.py index 40a4d59b..98cf87df 100755 --- a/scripts/check_doc_frontmatter.py +++ b/scripts/check_doc_frontmatter.py @@ -463,7 +463,8 @@ def _agent_rule_optional_frontmatter_lines(draft: _AgentRuleFrontmatterDraft) -> @require(lambda path: isinstance(path, Path), "Path must be Path object") @ensure(lambda result: isinstance(result, str), "Must return string") def _format_agent_rules_suggested_frontmatter(path: Path, canonical_id: str, draft: _AgentRuleFrontmatterDraft) -> str: - title_guess = path.stem.replace("-", " ").title().replace('"', '\\"') + slug_for_title = canonical_id.removeprefix("agent-rules-") + title_guess = slug_for_title.replace("-", " ").title().replace('"', '\\"') optional_lines = _agent_rule_optional_frontmatter_lines(draft) return f"""--- layout: {_yaml_plain_or_quoted_scalar(draft.layout_val)} diff --git a/scripts/verify_safe_project_writes.py b/scripts/verify_safe_project_writes.py index 4689b752..6177da81 100644 --- a/scripts/verify_safe_project_writes.py +++ b/scripts/verify_safe_project_writes.py @@ -6,6 +6,7 @@ import ast import sys from pathlib import Path +from typing import Any from beartype import beartype from icontract import ensure, require @@ -57,23 +58,237 @@ def _json_bindings(tree: ast.AST) -> tuple[dict[str, str], frozenset[str]]: return func_aliases, frozenset(module_locals) +def _add_shadows_from_target( + target: ast.AST, + func_aliases: dict[str, str], + module_locals: frozenset[str], + shadow_func: set[str], + shadow_mod: set[str], +) -> None: + if isinstance(target, ast.Name): + if target.id in func_aliases: + shadow_func.add(target.id) + if target.id in module_locals: + shadow_mod.add(target.id) + elif isinstance(target, (ast.Tuple, ast.List)): + for elt in target.elts: + _add_shadows_from_target(elt, func_aliases, module_locals, shadow_func, shadow_mod) + + def _collect_json_io_offenders(tree: ast.AST) -> list[tuple[int, str]]: func_aliases, module_locals = _json_bindings(tree) offenders: list[tuple[int, str]] = [] - for node in ast.walk(tree): - if not isinstance(node, ast.Call): - continue - func = node.func - if isinstance(func, ast.Name) and func.id in func_aliases: - offenders.append((node.lineno, func_aliases[func.id])) - continue - if ( - isinstance(func, ast.Attribute) - and isinstance(func.value, ast.Name) - and func.value.id in module_locals - and func.attr in _JSON_IO_NAMES - ): - offenders.append((node.lineno, f"json.{func.attr}")) + + class _Visitor(ast.NodeVisitor): + def __init__(self) -> None: + self.shadow_func: set[str] = set() + self.shadow_mod: set[str] = set() + + def _push_scope(self) -> tuple[set[str], set[str]]: + return (set(self.shadow_func), set(self.shadow_mod)) + + def _pop_scope(self, saved: tuple[set[str], set[str]]) -> None: + self.shadow_func, self.shadow_mod = saved + + def _shadow_arguments(self, args: ast.arguments) -> None: + for a in list(args.posonlyargs) + list(args.args) + list(args.kwonlyargs): + _add_shadows_from_target( + ast.Name(id=a.arg, ctx=ast.Store()), + func_aliases, + module_locals, + self.shadow_func, + self.shadow_mod, + ) + if args.vararg: + _add_shadows_from_target( + ast.Name(id=args.vararg.arg, ctx=ast.Store()), + func_aliases, + module_locals, + self.shadow_func, + self.shadow_mod, + ) + if args.kwarg: + _add_shadows_from_target( + ast.Name(id=args.kwarg.arg, ctx=ast.Store()), + func_aliases, + module_locals, + self.shadow_func, + self.shadow_mod, + ) + + def _visit_function_defaults_before_scope(self, args: ast.arguments) -> None: + """Defaults and decorators run in the enclosing scope; visit before parameter shadows apply.""" + pos_and_regular = list(args.posonlyargs) + list(args.args) + defaults = list(args.defaults) + if defaults: + for _param, default in zip(pos_and_regular[-len(defaults) :], defaults, strict=True): + self.visit(default) + for _kw_arg, default in zip(args.kwonlyargs, args.kw_defaults, strict=True): + if default is not None: + self.visit(default) + + def _visit_function_body(self, node: ast.FunctionDef | ast.AsyncFunctionDef) -> None: + for d in node.decorator_list: + self.visit(d) + self._visit_function_defaults_before_scope(node.args) + saved = self._push_scope() + self._shadow_arguments(node.args) + for child in node.body: + self.visit(child) + self._pop_scope(saved) + + def visit_FunctionDef(self, node: ast.FunctionDef) -> Any: + self._visit_function_body(node) + return None + + def visit_AsyncFunctionDef(self, node: ast.AsyncFunctionDef) -> Any: + self._visit_function_body(node) + return None + + def visit_ClassDef(self, node: ast.ClassDef) -> Any: + for d in node.decorator_list: + self.visit(d) + for b in node.bases: + self.visit(b) + for k in node.keywords: + self.visit(k.value) + saved = self._push_scope() + for child in node.body: + self.visit(child) + self._pop_scope(saved) + return None + + def visit_Assign(self, node: ast.Assign) -> Any: + self.visit(node.value) + for t in node.targets: + self.visit(t) + _add_shadows_from_target(t, func_aliases, module_locals, self.shadow_func, self.shadow_mod) + return None + + def visit_AnnAssign(self, node: ast.AnnAssign) -> Any: + self.visit(node.annotation) + if node.value is not None: + self.visit(node.value) + if node.target is not None: + self.visit(node.target) + _add_shadows_from_target(node.target, func_aliases, module_locals, self.shadow_func, self.shadow_mod) + return None + + def visit_AugAssign(self, node: ast.AugAssign) -> Any: + self.visit(node.value) + self.visit(node.target) + _add_shadows_from_target(node.target, func_aliases, module_locals, self.shadow_func, self.shadow_mod) + return None + + def visit_NamedExpr(self, node: ast.NamedExpr) -> Any: + self.visit(node.value) + self.visit(node.target) + _add_shadows_from_target(node.target, func_aliases, module_locals, self.shadow_func, self.shadow_mod) + return None + + def visit_For(self, node: ast.For) -> Any: + self.visit(node.iter) + self.visit(node.target) + _add_shadows_from_target(node.target, func_aliases, module_locals, self.shadow_func, self.shadow_mod) + for stmt in node.body: + self.visit(stmt) + for stmt in node.orelse: + self.visit(stmt) + return None + + def visit_AsyncFor(self, node: ast.AsyncFor) -> Any: + return self.visit_For(node) # type: ignore[arg-type] + + def visit_With(self, node: ast.With) -> Any: + for item in node.items: + self.visit(item.context_expr) + if item.optional_vars is not None: + self.visit(item.optional_vars) + _add_shadows_from_target( + item.optional_vars, + func_aliases, + module_locals, + self.shadow_func, + self.shadow_mod, + ) + for stmt in node.body: + self.visit(stmt) + return None + + def visit_AsyncWith(self, node: ast.AsyncWith) -> Any: + return self.visit_With(node) # type: ignore[arg-type] + + def visit_ExceptHandler(self, node: ast.ExceptHandler) -> Any: + if node.type is not None: + self.visit(node.type) + if node.name: + _add_shadows_from_target( + ast.Name(id=node.name, ctx=ast.Store()), + func_aliases, + module_locals, + self.shadow_func, + self.shadow_mod, + ) + for stmt in node.body: + self.visit(stmt) + return None + + def _visit_comprehensions_then_elt( + self, + generators: list[ast.comprehension], + visit_elt: Any | None = None, + ) -> None: + saved = self._push_scope() + for gen in generators: + self.visit(gen.iter) + self.visit(gen.target) + _add_shadows_from_target(gen.target, func_aliases, module_locals, self.shadow_func, self.shadow_mod) + for if_clause in gen.ifs: + self.visit(if_clause) + if visit_elt is not None: + visit_elt() + self._pop_scope(saved) + + def visit_ListComp(self, node: ast.ListComp) -> Any: + self._visit_comprehensions_then_elt(node.generators, lambda: self.visit(node.elt)) + return None + + def visit_SetComp(self, node: ast.SetComp) -> Any: + self._visit_comprehensions_then_elt(node.generators, lambda: self.visit(node.elt)) + return None + + def visit_GeneratorExp(self, node: ast.GeneratorExp) -> Any: + self._visit_comprehensions_then_elt(node.generators, lambda: self.visit(node.elt)) + return None + + def visit_DictComp(self, node: ast.DictComp) -> Any: + def visit_kv() -> None: + self.visit(node.key) + self.visit(node.value) + + self._visit_comprehensions_then_elt(node.generators, visit_kv) + return None + + def visit_Call(self, node: ast.Call) -> Any: + func = node.func + if isinstance(func, ast.Name) and func.id in func_aliases and func.id not in self.shadow_func: + offenders.append((node.lineno, func_aliases[func.id])) + elif ( + isinstance(func, ast.Attribute) + and isinstance(func.value, ast.Name) + and func.value.id in module_locals + and func.value.id not in self.shadow_mod + and func.attr in _JSON_IO_NAMES + ): + offenders.append((node.lineno, f"json.{func.attr}")) + self.visit(func) + for arg in node.args: + self.visit(arg) + for kw in node.keywords: + self.visit(kw.value) + return None + + _Visitor().visit(tree) return offenders @@ -84,7 +299,12 @@ def main() -> int: if not IDE_SETUP.is_file(): _write_stderr(f"Expected ide_setup at {IDE_SETUP}") return 2 - tree = ast.parse(IDE_SETUP.read_text(encoding="utf-8"), filename=str(IDE_SETUP)) + try: + source = IDE_SETUP.read_text(encoding="utf-8") + tree = ast.parse(source, filename=str(IDE_SETUP)) + except (OSError, UnicodeDecodeError, SyntaxError) as exc: + _write_stderr(f"Unable to analyze {IDE_SETUP}: {exc}") + return 1 offenders = _collect_json_io_offenders(tree) if offenders: lines = ", ".join(f"line {ln} ({name})" for ln, name in offenders) diff --git a/src/specfact_cli/adapters/ado.py b/src/specfact_cli/adapters/ado.py index c6dd5732..f11496da 100644 --- a/src/specfact_cli/adapters/ado.py +++ b/src/specfact_cli/adapters/ado.py @@ -707,26 +707,6 @@ def _configure_api_token(self, api_token: str | None) -> None: self.api_token = None self.auth_scheme = None - @staticmethod - def _work_item_id_from_source_tracking(source_tracking: Any, target_repo: str) -> Any: - if isinstance(source_tracking, dict): - return _as_str_dict(source_tracking).get("source_id") - if not isinstance(source_tracking, list): - return None - for entry in source_tracking: - if not isinstance(entry, dict): - continue - ed = _as_str_dict(entry) - entry_repo = ed.get("source_repo") - if entry_repo == target_repo: - return ed.get("source_id") - if entry_repo: - continue - source_url = ed.get("source_url", "") - if source_url and target_repo in source_url: - return ed.get("source_id") - return None - def _ado_create_patch_document(self, title: str, body: str, ado_state: str) -> list[dict[str, Any]]: return [ {"op": "add", "path": "/fields/System.Title", "value": title}, @@ -764,7 +744,7 @@ def _merge_created_work_item_source_tracking( }, } source_tracking = proposal_data.get("source_tracking") - if not source_tracking: + if source_tracking is None: proposal_data["source_tracking"] = tracking_update return if isinstance(source_tracking, dict): @@ -773,6 +753,9 @@ def _merge_created_work_item_source_tracking( proposal_data["source_tracking"] = st return if isinstance(source_tracking, list): + if not source_tracking: + proposal_data["source_tracking"] = [tracking_update] + return cast(list[dict[str, Any]], source_tracking).append(tracking_update) def _is_on_premise(self) -> bool: @@ -2287,27 +2270,6 @@ def _update_work_item_status( Returns: Dict with updated work item data: {"work_item_id": int, "work_item_url": str, "state": str} """ - target_repo = f"{org}/{project}" - work_item_id = self._work_item_id_from_source_tracking( - proposal_data.get("source_tracking", {}), - target_repo, - ) - - if not work_item_id: - msg = ( - f"Work item ID not found in source_tracking for repository {target_repo}. " - "Work item must be created first." - ) - raise ValueError(msg) - - # Ensure work_item_id is an integer for API call - if isinstance(work_item_id, str): - try: - work_item_id = int(work_item_id) - except ValueError: - msg = f"Invalid work item ID format: {work_item_id}" - raise ValueError(msg) from None - target_repo = f"{org}/{project}" work_item_id = self._get_source_tracking_work_item_id(proposal_data.get("source_tracking", {}), target_repo) ado_state = self._resolve_proposal_ado_state(proposal_data) diff --git a/src/specfact_cli/adapters/github.py b/src/specfact_cli/adapters/github.py index 27cfb431..7f2de29a 100644 --- a/src/specfact_cli/adapters/github.py +++ b/src/specfact_cli/adapters/github.py @@ -159,7 +159,7 @@ def _get_github_token_from_gh_cli() -> str | None: return None -_GITHUB_GIT_CONFIG_URL_RE = re.compile(r"url\s*=\s*(https?://[^\s]+|ssh://[^\s]+|git://[^\s]+|git@[^:]+:[^\s]+)") +_GITHUB_GIT_CONFIG_URL_RE = re.compile(r"(?im)^\s*url\s*=\s*(https?://\S+|ssh://\S+|git://\S+|git@[^:\s]+:\S+)\s*$") def _git_config_content_indicates_github(config_content: str) -> bool: diff --git a/src/specfact_cli/adapters/speckit.py b/src/specfact_cli/adapters/speckit.py index f6f95918..2530faad 100644 --- a/src/specfact_cli/adapters/speckit.py +++ b/src/specfact_cli/adapters/speckit.py @@ -462,13 +462,15 @@ def _build_stories_from_spec(self, spec_data: dict[str, Any]) -> list[Any]: story_title = sd.get("title", "Unknown Story") priority = sd.get("priority", "P3") acceptance_raw = sd.get("acceptance", []) + default_acceptance = [f"{story_title} is implemented"] if isinstance(acceptance_raw, list) and acceptance_raw: if all(isinstance(x, str) for x in acceptance_raw): - acceptance = list(acceptance_raw) + acceptance = list(acceptance_raw) or default_acceptance else: - acceptance = self._extract_text_list(cast(list[Any], acceptance_raw)) + extracted = self._extract_text_list(cast(list[Any], acceptance_raw)) + acceptance = extracted or default_acceptance else: - acceptance = [f"{story_title} is implemented"] + acceptance = default_acceptance story_points = priority_map.get(str(priority), 3) stories.append( Story( diff --git a/src/specfact_cli/analyzers/code_analyzer.py b/src/specfact_cli/analyzers/code_analyzer.py index ef946b91..40b7fa88 100644 --- a/src/specfact_cli/analyzers/code_analyzer.py +++ b/src/specfact_cli/analyzers/code_analyzer.py @@ -738,7 +738,19 @@ def _extract_themes_from_imports(self, tree: ast.AST) -> None: @staticmethod def _themes_for_import_module(module_name: str, theme_keywords: dict[str, str]) -> set[str]: lowered = module_name.lower() - return {theme for keyword, theme in theme_keywords.items() if keyword in lowered} + tokens = [t for t in re.split(r"[._-]", lowered) if t] + top_level = lowered.split(".", 1)[0] + found: set[str] = set() + for keyword, theme in theme_keywords.items(): + if keyword == lowered or lowered.startswith(f"{keyword}."): + found.add(theme) + continue + if keyword == top_level or top_level.startswith(f"{keyword}."): + found.add(theme) + continue + if keyword in tokens: + found.add(theme) + return found def _themes_for_import_node(self, node: ast.Import | ast.ImportFrom, theme_keywords: dict[str, str]) -> set[str]: if isinstance(node, ast.Import): @@ -1733,16 +1745,6 @@ def _detect_async_patterns(self, tree: ast.AST, file_path: Path) -> list[str]: self.async_patterns[module_name].extend(async_methods) return async_methods - @staticmethod - def _function_name_holding_ast_subtree(tree: ast.AST, target: ast.AST) -> str | None: - for parent in ast.walk(tree): - if not isinstance(parent, (ast.FunctionDef, ast.AsyncFunctionDef)): - continue - for child in ast.walk(parent): - if child is target: - return parent.name - return None - def _detect_async_patterns_parallel(self, tree: ast.AST, file_path: Path) -> list[str]: """ Detect async/await patterns in code (thread-safe version). @@ -1750,18 +1752,7 @@ def _detect_async_patterns_parallel(self, tree: ast.AST, file_path: Path) -> lis Returns: List of async method/function names """ - async_methods: list[str] = [] - - for node in ast.walk(tree): - if isinstance(node, ast.AsyncFunctionDef): - async_methods.append(node.name) - if not isinstance(node, ast.Await): - continue - host = self._function_name_holding_ast_subtree(tree, node) - if host and host not in async_methods: - async_methods.append(host) - - return async_methods + return [node.name for node in ast.walk(tree) if isinstance(node, ast.AsyncFunctionDef)] def _apply_commit_hash_to_matching_features(self, feature_num: str, commit_hash: str) -> None: for feature in self.features: diff --git a/src/specfact_cli/analyzers/graph_analyzer.py b/src/specfact_cli/analyzers/graph_analyzer.py index 2df648d5..909827b0 100644 --- a/src/specfact_cli/analyzers/graph_analyzer.py +++ b/src/specfact_cli/analyzers/graph_analyzer.py @@ -193,7 +193,17 @@ def _add_call_graph_edges_for_module( call_graph: dict[str, list[str]], python_files: list[Path], ) -> None: - """Add directed edges from ``module_name`` to callees that resolve to known graph nodes.""" + """Add call-graph-derived dependency edges for one module. + + Args: + graph: Dependency graph being populated. + module_name: Source module name for emitted edges. + call_graph: Mapping of caller symbols to callee symbol lists. + python_files: Repository Python files used for callee module resolution. + + Returns: + None. + """ for _caller, callees in call_graph.items(): for callee in callees: callee_module = self._resolve_module_from_function(callee, python_files) diff --git a/tests/integration/test_command_package_runtime_validation.py b/tests/integration/test_command_package_runtime_validation.py index deaf95c7..334e7327 100644 --- a/tests/integration/test_command_package_runtime_validation.py +++ b/tests/integration/test_command_package_runtime_validation.py @@ -1,5 +1,6 @@ from __future__ import annotations +import contextlib import hashlib import json import os @@ -44,8 +45,9 @@ def _resolve_modules_repo() -> Path: candidates = [ REPO_ROOT / "specfact-cli-modules", REPO_ROOT.parent / "specfact-cli-modules", - REPO_ROOT.parents[2] / "specfact-cli-modules", ] + with contextlib.suppress(IndexError): + candidates.append(REPO_ROOT.parents[2] / "specfact-cli-modules") for candidate in candidates: if candidate.exists(): return candidate diff --git a/tests/unit/adapters/test_ado.py b/tests/unit/adapters/test_ado.py index 016513a8..594ec51d 100644 --- a/tests/unit/adapters/test_ado.py +++ b/tests/unit/adapters/test_ado.py @@ -8,13 +8,14 @@ import os from pathlib import Path +from typing import Any from unittest.mock import MagicMock, patch import pytest import requests from beartype import beartype -from specfact_cli.adapters.ado import AdoAdapter +from specfact_cli.adapters.ado import AdoAdapter, _AdoCreatedWorkItemRef from specfact_cli.models.bridge import AdapterType, BridgeConfig from specfact_cli.models.change import ChangeProposal, ChangeTracking from specfact_cli.models.source_tracking import SourceTracking @@ -39,6 +40,24 @@ def bridge_config() -> BridgeConfig: class TestAdoAdapter: """Test Azure DevOps adapter implementation.""" + @beartype + def test_merge_created_work_item_preserves_empty_list_carrier(self, ado_adapter: AdoAdapter) -> None: + """Empty ``source_tracking`` list must stay a list and receive the new entry.""" + proposal_data: dict[str, Any] = {"source_tracking": []} + created = _AdoCreatedWorkItemRef( + work_item_id=42, + work_item_url="https://dev.azure.com/test-org/test-project/_workitems/edit/42", + org="test-org", + project="test-project", + work_item_type="Feature", + ado_state="New", + ) + ado_adapter._merge_created_work_item_source_tracking(proposal_data, created) + st = proposal_data["source_tracking"] + assert isinstance(st, list) + assert len(st) == 1 + assert st[0]["source_id"] == 42 + @beartype def test_detect_ado_repo(self, ado_adapter: AdoAdapter, tmp_path: Path, bridge_config: BridgeConfig) -> None: """Test Azure DevOps repository detection.""" diff --git a/tests/unit/adapters/test_github.py b/tests/unit/adapters/test_github.py index 709cdc67..35d7d64e 100644 --- a/tests/unit/adapters/test_github.py +++ b/tests/unit/adapters/test_github.py @@ -14,7 +14,7 @@ import requests from beartype import beartype -from specfact_cli.adapters.github import GitHubAdapter +from specfact_cli.adapters.github import GitHubAdapter, _git_config_content_indicates_github from specfact_cli.models.bridge import AdapterType, BridgeConfig @@ -57,6 +57,20 @@ def test_detect_non_github_repo(self, github_adapter: GitHubAdapter, tmp_path: P assert github_adapter.detect(tmp_path) is False + @beartype + def test_git_config_pushurl_only_does_not_indicate_github(self) -> None: + """Only ``url =`` lines count; ``pushurl`` must not imply GitHub.""" + content = '[remote "origin"]\npushurl = git@github.com:owner/repo.git\n' + assert _git_config_content_indicates_github(content) is False + + @beartype + def test_detect_pushurl_only_remote_is_not_github(self, github_adapter: GitHubAdapter, tmp_path: Path) -> None: + """``detect`` must not treat GitHub ``pushurl`` alone as a GitHub remote.""" + git_config = tmp_path / ".git" / "config" + git_config.parent.mkdir(parents=True) + git_config.write_text('[remote "origin"]\npushurl = git@github.com:owner/repo.git\n') + assert github_adapter.detect(tmp_path) is False + @beartype def test_detect_with_bridge_config( self, github_adapter: GitHubAdapter, tmp_path: Path, bridge_config: BridgeConfig diff --git a/tests/unit/adapters/test_openspec_parser.py b/tests/unit/adapters/test_openspec_parser.py index e8ef22ff..2f1dcdb7 100644 --- a/tests/unit/adapters/test_openspec_parser.py +++ b/tests/unit/adapters/test_openspec_parser.py @@ -207,6 +207,8 @@ def test_parse_change_spec_delta_content_keeps_markdown_headings( parsed = parser.parse_change_spec_delta(delta_path) assert parsed is not None + assert parsed.get("type") == "MODIFIED" + assert parsed.get("feature_id") == "001-auth" assert "## Subheading inside content" in (parsed.get("content") or "") assert "Intro line." in (parsed.get("content") or "") assert "More detail." in (parsed.get("content") or "") diff --git a/tests/unit/adapters/test_speckit.py b/tests/unit/adapters/test_speckit.py index 2dbd49da..befb5bb9 100644 --- a/tests/unit/adapters/test_speckit.py +++ b/tests/unit/adapters/test_speckit.py @@ -107,6 +107,23 @@ def speckit_repo_modern(tmp_path: Path) -> Path: class TestSpecKitAdapter: """Test Spec-Kit adapter implementation.""" + def test_build_stories_acceptance_defaults_when_non_string_list_yields_no_text( + self, speckit_adapter: SpecKitAdapter + ) -> None: + """Non-string acceptance items with no extractable text fall back to the default sentence.""" + spec_data: dict[str, Any] = { + "stories": [ + { + "key": "S1", + "title": "Do thing", + "priority": "P3", + "acceptance": [42, None], + } + ] + } + stories = speckit_adapter._build_stories_from_spec(spec_data) + assert stories[0].acceptance == ["Do thing is implemented"] + def test_detect_same_repo_classic(self, speckit_adapter: SpecKitAdapter, speckit_repo_classic: Path) -> None: """Test detecting classic Spec-Kit in same repository.""" assert speckit_adapter.detect(speckit_repo_classic) is True diff --git a/tests/unit/analyzers/test_code_analyzer.py b/tests/unit/analyzers/test_code_analyzer.py index c9eff122..b42a8608 100644 --- a/tests/unit/analyzers/test_code_analyzer.py +++ b/tests/unit/analyzers/test_code_analyzer.py @@ -3,6 +3,7 @@ Focus: Business logic and edge cases only (@beartype handles type validation). """ +import ast import os import tempfile from pathlib import Path @@ -16,6 +17,32 @@ class TestCodeAnalyzer: """Test suite for CodeAnalyzer.""" + def test_themes_for_import_module_avoids_substring_false_positive(self) -> None: + """``click`` theme must not match ``clickhouse_connect`` via substring.""" + theme_keywords = {"click": "CLI", "fastapi": "API"} + assert CodeAnalyzer._themes_for_import_module("clickhouse_connect", theme_keywords) == set() + assert CodeAnalyzer._themes_for_import_module("click", theme_keywords) == {"CLI"} + assert CodeAnalyzer._themes_for_import_module("click.extra", theme_keywords) == {"CLI"} + + def test_detect_async_patterns_parallel_lists_async_defs_only(self) -> None: + """Async function names are collected without redundant await walks.""" + source = dedent( + """ + async def outer(): + await inner() + + async def inner(): + pass + + def sync(): + return 1 + """ + ) + tree = ast.parse(source) + analyzer = CodeAnalyzer(Path(".")) + names = analyzer._detect_async_patterns_parallel(tree, Path("mod.py")) + assert set(names) == {"outer", "inner"} + def test_should_skip_test_files(self): """Test that test files are skipped.""" analyzer = CodeAnalyzer(Path(".")) diff --git a/tests/unit/scripts/test_verify_safe_project_writes.py b/tests/unit/scripts/test_verify_safe_project_writes.py index 0065598b..bb3d569e 100644 --- a/tests/unit/scripts/test_verify_safe_project_writes.py +++ b/tests/unit/scripts/test_verify_safe_project_writes.py @@ -4,8 +4,6 @@ import ast import importlib.util -import subprocess -import sys from pathlib import Path @@ -48,13 +46,41 @@ def test_collect_json_io_flags_from_json_star_import() -> None: assert any(name == "json.dump" for _, name in offenders) -def test_verify_safe_project_writes_passes_on_repo() -> None: - """Gate must succeed while ide_setup routes settings through project_artifact_write.""" - script = Path(__file__).resolve().parents[3] / "scripts" / "verify_safe_project_writes.py" - completed = subprocess.run( - [sys.executable, str(script)], - check=False, - capture_output=True, - text=True, - ) - assert completed.returncode == 0, completed.stderr +def test_collect_json_io_ignores_shadowed_loads_name() -> None: + mod = _load_verify_module() + tree = ast.parse('from json import loads\ndef f(loads):\n return loads("{}")\n') + offenders = mod._collect_json_io_offenders(tree) + assert not offenders + + +def test_collect_json_io_flags_loads_in_function_default() -> None: + mod = _load_verify_module() + tree = ast.parse('from json import loads\ndef f(x=loads("{}")):\n pass\n') + offenders = mod._collect_json_io_offenders(tree) + assert any(name == "json.loads" for _, name in offenders) + + +def test_collect_json_io_flags_loads_in_kwonly_default() -> None: + mod = _load_verify_module() + tree = ast.parse('from json import loads\ndef f(*, x=loads("{}")):\n pass\n') + offenders = mod._collect_json_io_offenders(tree) + assert any(name == "json.loads" for _, name in offenders) + + +def test_verify_safe_project_writes_passes_for_safe_stub(tmp_path: Path) -> None: + """Gate succeeds when IDE setup stub has no direct json I/O offenders.""" + mod = _load_verify_module() + ide_setup = tmp_path / "ide_setup.py" + ide_setup.write_text("def noop() -> None:\n return None\n", encoding="utf-8") + mod.ROOT = tmp_path + mod.IDE_SETUP = ide_setup + assert mod.main() == 0 + + +def test_verify_safe_project_writes_parse_error_returns_one(tmp_path: Path) -> None: + mod = _load_verify_module() + ide_setup = tmp_path / "ide_setup.py" + ide_setup.write_text("def broken(\n", encoding="utf-8") + mod.ROOT = tmp_path + mod.IDE_SETUP = ide_setup + assert mod.main() == 1