Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion scripts/check_doc_frontmatter.py
Original file line number Diff line number Diff line change
Expand Up @@ -463,7 +463,8 @@ def _agent_rule_optional_frontmatter_lines(draft: _AgentRuleFrontmatterDraft) ->
@require(lambda path: isinstance(path, Path), "Path must be Path object")
@ensure(lambda result: isinstance(result, str), "Must return string")
def _format_agent_rules_suggested_frontmatter(path: Path, canonical_id: str, draft: _AgentRuleFrontmatterDraft) -> str:
title_guess = path.stem.replace("-", " ").title().replace('"', '\\"')
slug_for_title = canonical_id.removeprefix("agent-rules-")
title_guess = slug_for_title.replace("-", " ").title().replace('"', '\\"')
optional_lines = _agent_rule_optional_frontmatter_lines(draft)
return f"""---
layout: {_yaml_plain_or_quoted_scalar(draft.layout_val)}
Expand Down
250 changes: 235 additions & 15 deletions scripts/verify_safe_project_writes.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import ast
import sys
from pathlib import Path
from typing import Any

from beartype import beartype
from icontract import ensure, require
Expand Down Expand Up @@ -57,23 +58,237 @@ def _json_bindings(tree: ast.AST) -> tuple[dict[str, str], frozenset[str]]:
return func_aliases, frozenset(module_locals)


def _add_shadows_from_target(
target: ast.AST,
func_aliases: dict[str, str],
module_locals: frozenset[str],
shadow_func: set[str],
shadow_mod: set[str],
) -> None:
if isinstance(target, ast.Name):
if target.id in func_aliases:
shadow_func.add(target.id)
if target.id in module_locals:
shadow_mod.add(target.id)
elif isinstance(target, (ast.Tuple, ast.List)):
for elt in target.elts:
_add_shadows_from_target(elt, func_aliases, module_locals, shadow_func, shadow_mod)


def _collect_json_io_offenders(tree: ast.AST) -> list[tuple[int, str]]:
func_aliases, module_locals = _json_bindings(tree)
offenders: list[tuple[int, str]] = []
for node in ast.walk(tree):
if not isinstance(node, ast.Call):
continue
func = node.func
if isinstance(func, ast.Name) and func.id in func_aliases:
offenders.append((node.lineno, func_aliases[func.id]))
continue
if (
isinstance(func, ast.Attribute)
and isinstance(func.value, ast.Name)
and func.value.id in module_locals
and func.attr in _JSON_IO_NAMES
):
offenders.append((node.lineno, f"json.{func.attr}"))

class _Visitor(ast.NodeVisitor):
def __init__(self) -> None:
self.shadow_func: set[str] = set()
self.shadow_mod: set[str] = set()

def _push_scope(self) -> tuple[set[str], set[str]]:
return (set(self.shadow_func), set(self.shadow_mod))

def _pop_scope(self, saved: tuple[set[str], set[str]]) -> None:
self.shadow_func, self.shadow_mod = saved

def _shadow_arguments(self, args: ast.arguments) -> None:
for a in list(args.posonlyargs) + list(args.args) + list(args.kwonlyargs):
_add_shadows_from_target(
ast.Name(id=a.arg, ctx=ast.Store()),
func_aliases,
module_locals,
self.shadow_func,
self.shadow_mod,
)
if args.vararg:
_add_shadows_from_target(
ast.Name(id=args.vararg.arg, ctx=ast.Store()),
func_aliases,
module_locals,
self.shadow_func,
self.shadow_mod,
)
if args.kwarg:
_add_shadows_from_target(
ast.Name(id=args.kwarg.arg, ctx=ast.Store()),
func_aliases,
module_locals,
self.shadow_func,
self.shadow_mod,
)

def _visit_function_defaults_before_scope(self, args: ast.arguments) -> None:
"""Defaults and decorators run in the enclosing scope; visit before parameter shadows apply."""
pos_and_regular = list(args.posonlyargs) + list(args.args)
defaults = list(args.defaults)
if defaults:
for _param, default in zip(pos_and_regular[-len(defaults) :], defaults, strict=True):
self.visit(default)
for _kw_arg, default in zip(args.kwonlyargs, args.kw_defaults, strict=True):
if default is not None:
self.visit(default)

def _visit_function_body(self, node: ast.FunctionDef | ast.AsyncFunctionDef) -> None:
for d in node.decorator_list:
self.visit(d)
self._visit_function_defaults_before_scope(node.args)
saved = self._push_scope()
self._shadow_arguments(node.args)
for child in node.body:
self.visit(child)
Comment thread
djm81 marked this conversation as resolved.
self._pop_scope(saved)

def visit_FunctionDef(self, node: ast.FunctionDef) -> Any:
self._visit_function_body(node)
return None

def visit_AsyncFunctionDef(self, node: ast.AsyncFunctionDef) -> Any:
self._visit_function_body(node)
return None

def visit_ClassDef(self, node: ast.ClassDef) -> Any:
for d in node.decorator_list:
self.visit(d)
for b in node.bases:
self.visit(b)
for k in node.keywords:
self.visit(k.value)
saved = self._push_scope()
for child in node.body:
self.visit(child)
self._pop_scope(saved)
return None

def visit_Assign(self, node: ast.Assign) -> Any:
self.visit(node.value)
for t in node.targets:
self.visit(t)
_add_shadows_from_target(t, func_aliases, module_locals, self.shadow_func, self.shadow_mod)
return None

def visit_AnnAssign(self, node: ast.AnnAssign) -> Any:
self.visit(node.annotation)
if node.value is not None:
self.visit(node.value)
if node.target is not None:
self.visit(node.target)
_add_shadows_from_target(node.target, func_aliases, module_locals, self.shadow_func, self.shadow_mod)
return None

def visit_AugAssign(self, node: ast.AugAssign) -> Any:
self.visit(node.value)
self.visit(node.target)
_add_shadows_from_target(node.target, func_aliases, module_locals, self.shadow_func, self.shadow_mod)
return None

def visit_NamedExpr(self, node: ast.NamedExpr) -> Any:
self.visit(node.value)
self.visit(node.target)
_add_shadows_from_target(node.target, func_aliases, module_locals, self.shadow_func, self.shadow_mod)
return None

def visit_For(self, node: ast.For) -> Any:
self.visit(node.iter)
self.visit(node.target)
_add_shadows_from_target(node.target, func_aliases, module_locals, self.shadow_func, self.shadow_mod)
for stmt in node.body:
self.visit(stmt)
for stmt in node.orelse:
self.visit(stmt)
return None

def visit_AsyncFor(self, node: ast.AsyncFor) -> Any:
return self.visit_For(node) # type: ignore[arg-type]

def visit_With(self, node: ast.With) -> Any:
for item in node.items:
self.visit(item.context_expr)
if item.optional_vars is not None:
self.visit(item.optional_vars)
_add_shadows_from_target(
item.optional_vars,
func_aliases,
module_locals,
self.shadow_func,
self.shadow_mod,
)
for stmt in node.body:
self.visit(stmt)
return None

def visit_AsyncWith(self, node: ast.AsyncWith) -> Any:
return self.visit_With(node) # type: ignore[arg-type]

def visit_ExceptHandler(self, node: ast.ExceptHandler) -> Any:
if node.type is not None:
self.visit(node.type)
if node.name:
_add_shadows_from_target(
ast.Name(id=node.name, ctx=ast.Store()),
func_aliases,
module_locals,
self.shadow_func,
self.shadow_mod,
)
for stmt in node.body:
self.visit(stmt)
return None

def _visit_comprehensions_then_elt(
self,
generators: list[ast.comprehension],
visit_elt: Any | None = None,
) -> None:
saved = self._push_scope()
for gen in generators:
self.visit(gen.iter)
self.visit(gen.target)
_add_shadows_from_target(gen.target, func_aliases, module_locals, self.shadow_func, self.shadow_mod)
for if_clause in gen.ifs:
self.visit(if_clause)
if visit_elt is not None:
visit_elt()
self._pop_scope(saved)

def visit_ListComp(self, node: ast.ListComp) -> Any:
self._visit_comprehensions_then_elt(node.generators, lambda: self.visit(node.elt))
return None

def visit_SetComp(self, node: ast.SetComp) -> Any:
self._visit_comprehensions_then_elt(node.generators, lambda: self.visit(node.elt))
return None

def visit_GeneratorExp(self, node: ast.GeneratorExp) -> Any:
self._visit_comprehensions_then_elt(node.generators, lambda: self.visit(node.elt))
return None

def visit_DictComp(self, node: ast.DictComp) -> Any:
def visit_kv() -> None:
self.visit(node.key)
self.visit(node.value)

self._visit_comprehensions_then_elt(node.generators, visit_kv)
return None

def visit_Call(self, node: ast.Call) -> Any:
func = node.func
if isinstance(func, ast.Name) and func.id in func_aliases and func.id not in self.shadow_func:
offenders.append((node.lineno, func_aliases[func.id]))
elif (
isinstance(func, ast.Attribute)
and isinstance(func.value, ast.Name)
and func.value.id in module_locals
and func.value.id not in self.shadow_mod
and func.attr in _JSON_IO_NAMES
):
offenders.append((node.lineno, f"json.{func.attr}"))
self.visit(func)
for arg in node.args:
self.visit(arg)
for kw in node.keywords:
self.visit(kw.value)
return None

_Visitor().visit(tree)
return offenders


Expand All @@ -84,7 +299,12 @@ def main() -> int:
if not IDE_SETUP.is_file():
_write_stderr(f"Expected ide_setup at {IDE_SETUP}")
return 2
tree = ast.parse(IDE_SETUP.read_text(encoding="utf-8"), filename=str(IDE_SETUP))
try:
source = IDE_SETUP.read_text(encoding="utf-8")
tree = ast.parse(source, filename=str(IDE_SETUP))
except (OSError, UnicodeDecodeError, SyntaxError) as exc:
_write_stderr(f"Unable to analyze {IDE_SETUP}: {exc}")
return 1
offenders = _collect_json_io_offenders(tree)
if offenders:
lines = ", ".join(f"line {ln} ({name})" for ln, name in offenders)
Expand Down
46 changes: 4 additions & 42 deletions src/specfact_cli/adapters/ado.py
Original file line number Diff line number Diff line change
Expand Up @@ -707,26 +707,6 @@ def _configure_api_token(self, api_token: str | None) -> None:
self.api_token = None
self.auth_scheme = None

@staticmethod
def _work_item_id_from_source_tracking(source_tracking: Any, target_repo: str) -> Any:
if isinstance(source_tracking, dict):
return _as_str_dict(source_tracking).get("source_id")
if not isinstance(source_tracking, list):
return None
for entry in source_tracking:
if not isinstance(entry, dict):
continue
ed = _as_str_dict(entry)
entry_repo = ed.get("source_repo")
if entry_repo == target_repo:
return ed.get("source_id")
if entry_repo:
continue
source_url = ed.get("source_url", "")
if source_url and target_repo in source_url:
return ed.get("source_id")
return None

def _ado_create_patch_document(self, title: str, body: str, ado_state: str) -> list[dict[str, Any]]:
return [
{"op": "add", "path": "/fields/System.Title", "value": title},
Expand Down Expand Up @@ -764,7 +744,7 @@ def _merge_created_work_item_source_tracking(
},
}
source_tracking = proposal_data.get("source_tracking")
if not source_tracking:
if source_tracking is None:
proposal_data["source_tracking"] = tracking_update
return
if isinstance(source_tracking, dict):
Expand All @@ -773,6 +753,9 @@ def _merge_created_work_item_source_tracking(
proposal_data["source_tracking"] = st
return
if isinstance(source_tracking, list):
if not source_tracking:
proposal_data["source_tracking"] = [tracking_update]
return
cast(list[dict[str, Any]], source_tracking).append(tracking_update)

def _is_on_premise(self) -> bool:
Expand Down Expand Up @@ -2287,27 +2270,6 @@ def _update_work_item_status(
Returns:
Dict with updated work item data: {"work_item_id": int, "work_item_url": str, "state": str}
"""
target_repo = f"{org}/{project}"
work_item_id = self._work_item_id_from_source_tracking(
proposal_data.get("source_tracking", {}),
target_repo,
)

if not work_item_id:
msg = (
f"Work item ID not found in source_tracking for repository {target_repo}. "
"Work item must be created first."
)
raise ValueError(msg)

# Ensure work_item_id is an integer for API call
if isinstance(work_item_id, str):
try:
work_item_id = int(work_item_id)
except ValueError:
msg = f"Invalid work item ID format: {work_item_id}"
raise ValueError(msg) from None

target_repo = f"{org}/{project}"
work_item_id = self._get_source_tracking_work_item_id(proposal_data.get("source_tracking", {}), target_repo)
ado_state = self._resolve_proposal_ado_state(proposal_data)
Expand Down
2 changes: 1 addition & 1 deletion src/specfact_cli/adapters/github.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ def _get_github_token_from_gh_cli() -> str | None:
return None


_GITHUB_GIT_CONFIG_URL_RE = re.compile(r"url\s*=\s*(https?://[^\s]+|ssh://[^\s]+|git://[^\s]+|git@[^:]+:[^\s]+)")
_GITHUB_GIT_CONFIG_URL_RE = re.compile(r"(?im)^\s*url\s*=\s*(https?://\S+|ssh://\S+|git://\S+|git@[^:\s]+:\S+)\s*$")


def _git_config_content_indicates_github(config_content: str) -> bool:
Expand Down
8 changes: 5 additions & 3 deletions src/specfact_cli/adapters/speckit.py
Original file line number Diff line number Diff line change
Expand Up @@ -462,13 +462,15 @@ def _build_stories_from_spec(self, spec_data: dict[str, Any]) -> list[Any]:
story_title = sd.get("title", "Unknown Story")
priority = sd.get("priority", "P3")
acceptance_raw = sd.get("acceptance", [])
default_acceptance = [f"{story_title} is implemented"]
if isinstance(acceptance_raw, list) and acceptance_raw:
if all(isinstance(x, str) for x in acceptance_raw):
acceptance = list(acceptance_raw)
acceptance = list(acceptance_raw) or default_acceptance
else:
acceptance = self._extract_text_list(cast(list[Any], acceptance_raw))
extracted = self._extract_text_list(cast(list[Any], acceptance_raw))
acceptance = extracted or default_acceptance
else:
acceptance = [f"{story_title} is implemented"]
acceptance = default_acceptance
story_points = priority_map.get(str(priority), 3)
stories.append(
Story(
Expand Down
Loading