diff --git a/docs/skills/README.md b/docs/skills/README.md index 7f09f5f..efb9986 100644 --- a/docs/skills/README.md +++ b/docs/skills/README.md @@ -38,6 +38,7 @@ Enforces privacy, guardrails, and secure handling of sensitive data before it re | :--- | :--- | :--- | | **[PII Masker](pii_masker.md)** | `compliance/pii_masker` | High-precision, local PII (Personally Identifiable Information) detection and redaction using the micro-f1-mask model. | | **[MiCA Module](mica_module.md)** | `compliance/mica_module` | Self-contained local Policy Enforcement and RAG engine strictly adhering to MiCA crypto-asset regulation. | +| **[Terms of Service Evaluator](tos_evaluator.md)** | `compliance/tos_evaluator` | Local-first evaluation of robots.txt and website legal pages to decide whether an intended automated action appears permissible. | --- diff --git a/docs/skills/tos_evaluator.md b/docs/skills/tos_evaluator.md new file mode 100644 index 0000000..6e5017c --- /dev/null +++ b/docs/skills/tos_evaluator.md @@ -0,0 +1,89 @@ +# Terms of Service Evaluator + +**Domain:** `compliance` +**Skill ID:** `compliance/tos_evaluator` + +A local-first compliance guardrail that checks whether an intended automated action appears permissible on a target website. It evaluates `robots.txt`, discovers candidate legal pages, extracts relevant clauses, and can optionally use a low-cost LLM to interpret ambiguous policy language. + +## What It Checks + +1. `robots.txt` rules for the exact target URL and user-agent. +2. Likely Terms, Legal, Acceptable Use, and API policy pages on the same site. +3. Clauses related to scraping, crawling, indexing, monitoring, downloading, and API-only access. +4. Optional LLM-backed clause review when local heuristics cannot confidently classify the policy language. + +## Manifest Details + +**Parameters Schema:** +* `target_url` (string): Full URL the agent intends to access. +* `intended_action` (string): Natural-language action such as `scrape pricing data` or `index docs`. +* `user_agent` (string, optional): User-agent used for `robots.txt` checks. +* `fetch_mode` (string, optional): `lightweight` or `deep`. +* `use_llm_evaluator` (boolean, optional): Enables optional clause interpretation for low-confidence cases. +* `llm_provider` (string, optional): Provider name for the optional evaluator. +* `llm_model` (string, optional): Model name such as `gemini-2.5-flash-lite`. +* `assume_authenticated_session` (boolean, optional): Helps represent paid or logged-in usage contexts. +* `max_terms_pages` (integer, optional): Caps discovery breadth. + +**Outputs Schema:** +* `is_safe_to_proceed` (boolean): Whether the action was approved. +* `confidence_score` (number): Confidence in the verdict. +* `verdict` (string): `SAFE`, `UNSAFE`, `CAUTION`, or `INSUFFICIENT_EVIDENCE`. +* `reason` (string): Short explanation of the verdict. +* `robots_assessment` (object): Structured `robots.txt` result. +* `tos_assessment` (object): Structured policy discovery and clause result. +* `llm_assessment` (object): Optional evaluator result. +* `evidence` (array): Supporting snippets and sources. + +## Verdict Semantics + +* `SAFE`: strong evidence suggests the requested action is allowed, and `robots.txt` does not block it. +* `UNSAFE`: `robots.txt` blocks the path or discovered policy text explicitly restricts the automation. +* `CAUTION`: the site may allow access, but only with conditions such as API usage, permission, or strict rate limits. 
+* `INSUFFICIENT_EVIDENCE`: the evaluator could not find enough trustworthy evidence to safely approve the action. + +## Example Usage (Direct) + +```python +from skillware.core.loader import SkillLoader + +bundle = SkillLoader.load_skill("compliance/tos_evaluator") +TOSEvaluatorSkill = bundle["module"].TOSEvaluatorSkill +skill = TOSEvaluatorSkill() + +result = skill.execute( + { + "target_url": "https://hackernoon.com/tagged/ai", + "intended_action": "crawl tagged article pages for research indexing", + "use_llm_evaluator": True, + "llm_provider": "gemini", + "llm_model": "gemini-2.5-flash-lite", + } +) + +print(result["verdict"]) +print(result["reason"]) +``` + +## Gemini Example + +Use the skill through `SkillLoader.to_gemini_tool(...)` and pass the skill instructions as the `system_instruction`. See `examples/gemini_tos_evaluator.py`. + +## Claude Example + +Use the skill through `SkillLoader.to_claude_tool(...)` and return the structured result back to Claude as a tool result. See `examples/claude_tos_evaluator.py`. + +## Ollama Example + +Use the text-based prompt adapter from `SkillLoader.to_ollama_prompt(...)` and execute the skill locally when the model emits a JSON tool block. See `examples/ollama_tos_evaluator.py`. + +## Notes + +This skill is a practical operational safeguard, not legal counsel. If the result is `CAUTION` or `INSUFFICIENT_EVIDENCE`, the safe default is manual review or an official API/developer integration path. + +To run tests specifically for this skill: + +```bash +pytest tests/skills/compliance/test_tos_evaluator.py +pytest skills/compliance/tos_evaluator/test_skill.py +``` diff --git a/examples/claude_tos_evaluator.py b/examples/claude_tos_evaluator.py new file mode 100644 index 0000000..6da2a17 --- /dev/null +++ b/examples/claude_tos_evaluator.py @@ -0,0 +1,68 @@ +import json +import os + +import anthropic + +from skillware.core.env import load_env_file +from skillware.core.loader import SkillLoader + +load_env_file() + +bundle = SkillLoader.load_skill("compliance/tos_evaluator") +print(f"Loaded Skill: {bundle['manifest']['name']}") + +TOSEvaluatorSkill = bundle["module"].TOSEvaluatorSkill +tos_skill = TOSEvaluatorSkill() + +client = anthropic.Anthropic(api_key=os.environ.get("ANTHROPIC_API_KEY")) +tools = [SkillLoader.to_claude_tool(bundle)] + +user_query = ( + "Can I use an automated crawler against https://hackernoon.com/tagged/devops " + "for research indexing? Check first." 
+) +print(f"User: {user_query}") + +message = client.messages.create( + model="claude-3-5-sonnet-latest", + max_tokens=1024, + system=bundle["instructions"], + messages=[{"role": "user", "content": user_query}], + tools=tools, +) + +if message.stop_reason == "tool_use": + tool_use = next(block for block in message.content if block.type == "tool_use") + tool_name = tool_use.name + tool_input = tool_use.input + + print(f"Claude requested tool: {tool_name}") + print(f"Input: {tool_input}") + + if tool_name == "compliance/tos_evaluator": + result = tos_skill.execute(tool_input) + print(json.dumps(result, indent=2)) + + response = client.messages.create( + model="claude-3-5-sonnet-latest", + max_tokens=1024, + system=bundle["instructions"], + tools=tools, + messages=[ + {"role": "user", "content": user_query}, + {"role": "assistant", "content": message.content}, + { + "role": "user", + "content": [ + { + "type": "tool_result", + "tool_use_id": tool_use.id, + "content": json.dumps(result), + } + ], + }, + ], + ) + + print("\nFinal Response:") + print(response.content[0].text) diff --git a/examples/gemini_tos_evaluator.py b/examples/gemini_tos_evaluator.py new file mode 100644 index 0000000..9cbcb6f --- /dev/null +++ b/examples/gemini_tos_evaluator.py @@ -0,0 +1,61 @@ +import json +import os + +import google.generativeai as genai + +from skillware.core.env import load_env_file +from skillware.core.loader import SkillLoader + +load_env_file() + +bundle = SkillLoader.load_skill("compliance/tos_evaluator") +print(f"Loaded Skill: {bundle['manifest']['name']}") + +TOSEvaluatorSkill = bundle["module"].TOSEvaluatorSkill +tos_skill = TOSEvaluatorSkill() + +genai.configure(api_key=os.environ.get("GOOGLE_API_KEY")) +tools = [SkillLoader.to_gemini_tool(bundle)] + +model = genai.GenerativeModel( + "gemini-2.5-flash-lite", + tools=tools, + system_instruction=bundle["instructions"], +) + +chat = model.start_chat(enable_automatic_function_calling=True) +user_query = ( + "Before scraping Hackernoon tagged AI pages, check whether automated crawling " + "appears allowed for https://hackernoon.com/tagged/ai." 
+) +print(f"User: {user_query}") + +response = chat.send_message(user_query) +while response.candidates and response.candidates[0].content.parts: + part = response.candidates[0].content.parts[0] + if not part.function_call: + break + + fn_name = part.function_call.name + fn_args = dict(part.function_call.args) + print(f"Gemini requested tool: {fn_name}") + print(f"Input: {fn_args}") + + if fn_name == "compliance/tos_evaluator": + result = tos_skill.execute(fn_args) + print(json.dumps(result, indent=2)) + response = chat.send_message( + [ + { + "function_response": { + "name": fn_name, + "response": {"result": result}, + } + } + ] + ) + else: + break + +print("\nFinal Response:") +print(response.text) diff --git a/examples/ollama_tos_evaluator.py b/examples/ollama_tos_evaluator.py new file mode 100644 index 0000000..4c75f72 --- /dev/null +++ b/examples/ollama_tos_evaluator.py @@ -0,0 +1,71 @@ +import json +import re + +import ollama + +from skillware.core.env import load_env_file +from skillware.core.loader import SkillLoader + +load_env_file() + +bundle = SkillLoader.load_skill("compliance/tos_evaluator") +TOSEvaluatorSkill = bundle["module"].TOSEvaluatorSkill +tos_skill = TOSEvaluatorSkill() + +tool_description = SkillLoader.to_ollama_prompt(bundle) +tool_description += f"\n**Cognitive Instructions:**\n{bundle['instructions']}\n" + +system_prompt = f"""You are an intelligent agent equipped with a local website policy evaluation skill. +To use a skill, output exactly one JSON code block: +```json +{{ + "tool": "the_tool_name", + "arguments": {{ + "param_name": "value" + }} +}} +``` +Wait for the system response containing the tool result before continuing. + +Available skill: +{tool_description} +""" + +messages = [ + {"role": "system", "content": system_prompt}, + { + "role": "user", + "content": ( + "Before I crawl https://hackernoon.com/tagged/startups with a bot for research, " + "check whether that seems allowed." + ), + }, +] + +model_name = "llama3" +response = ollama.chat(model=model_name, messages=messages) +message_content = response.get("message", {}).get("content", "") +print(message_content) + +tool_match = re.search(r"```json\s*({.*?})\s*```", message_content, re.DOTALL) +if tool_match: + tool_call = json.loads(tool_match.group(1)) + fn_name = tool_call.get("tool") + fn_args = tool_call.get("arguments", {}) + + if fn_name == "compliance/tos_evaluator": + result = tos_skill.execute(fn_args) + print(json.dumps(result, indent=2)) + messages.append({"role": "assistant", "content": message_content}) + messages.append( + { + "role": "user", + "content": ( + "SYSTEM RESPONSE:\n" + f"```json\n{json.dumps(result)}\n```\n" + "Please provide the final answer." 
+ ), + } + ) + final_response = ollama.chat(model=model_name, messages=messages) + print(final_response.get("message", {}).get("content", "")) diff --git a/pyproject.toml b/pyproject.toml index 2570c5e..5782e52 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -23,6 +23,7 @@ dependencies = [ "anthropic", "google-generativeai", "pymupdf", + "beautifulsoup4", ] requires-python = ">=3.10" diff --git a/requirements.txt b/requirements.txt index b0b0f4e..5e83241 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,6 +2,7 @@ pyyaml python-dotenv requests +beautifulsoup4 # LLM SDKs (Optional but recommended for examples) google-generativeai diff --git a/skills/compliance/tos_evaluator/__init__.py b/skills/compliance/tos_evaluator/__init__.py new file mode 100644 index 0000000..22d028d --- /dev/null +++ b/skills/compliance/tos_evaluator/__init__.py @@ -0,0 +1,3 @@ +from .skill import TOSEvaluatorSkill + +__all__ = ["TOSEvaluatorSkill"] diff --git a/skills/compliance/tos_evaluator/card.json b/skills/compliance/tos_evaluator/card.json new file mode 100644 index 0000000..1f08b52 --- /dev/null +++ b/skills/compliance/tos_evaluator/card.json @@ -0,0 +1,23 @@ +{ + "name": "TOS Evaluator", + "description": "Checks robots.txt and legal pages before automated website access.", + "icon": "shield", + "color": "green", + "ui_schema": { + "type": "card", + "fields": [ + { + "key": "verdict", + "label": "Verdict" + }, + { + "key": "confidence_score", + "label": "Confidence" + }, + { + "key": "reason", + "label": "Reason" + } + ] + } +} diff --git a/skills/compliance/tos_evaluator/instructions.md b/skills/compliance/tos_evaluator/instructions.md new file mode 100644 index 0000000..97aca6f --- /dev/null +++ b/skills/compliance/tos_evaluator/instructions.md @@ -0,0 +1,34 @@ +# Operational Instructions: TOS Evaluator + +You are an agent equipped with the `compliance/tos_evaluator` skill. + +## When to use this skill +- Use this skill before scraping, crawling, indexing, bulk downloading, automated monitoring, or other programmatic access to a website. +- Use it when the user asks whether a website permits bots, scraping, or non-browser automation. +- Use it when you need a conservative compliance check before a downstream agent or script touches a domain. + +## What this skill actually checks +The skill evaluates: +1. `robots.txt` access rules for the requested URL and user-agent. +2. Candidate Terms of Service, legal, acceptable-use, and API policy pages discovered on the same site. +3. Optional low-cost LLM review for ambiguous legal clauses when enabled. + +## How to interpret the output +- `SAFE`: strong evidence suggests the requested action is allowed, and `robots.txt` does not block it. +- `UNSAFE`: `robots.txt` blocks the path or the discovered policy text explicitly restricts the requested automation. +- `CAUTION`: the site may allow some access, but the policy text contains conditions, ambiguities, or API-only restrictions. +- `INSUFFICIENT_EVIDENCE`: the skill could not find enough trustworthy evidence to safely approve the action. + +Always explain the result conservatively. This tool is an operational guardrail, not legal counsel. + +## Important behavior rules +- Do not claim legal certainty. +- If the result is `CAUTION` or `INSUFFICIENT_EVIDENCE`, recommend manual review or an official API path. +- If the result is `UNSAFE`, clearly tell the user not to proceed without explicit permission. 
+- If the result relied on `llm_assessment`, mention that it was an auxiliary interpretation layer on top of fetched evidence. + +## Example uses +- User: "Can I scrape pricing from this page?" -> call the tool with `target_url` and `intended_action=\"scrape pricing data\"` +- User: "Can I index these docs into search?" -> call the tool with `intended_action=\"index documentation pages\"` +- User: "Can I use a bot to monitor changes daily?" -> call the tool with `intended_action=\"monitor content with automated bot\"` +- If legal language is vague and the user enabled fallback review, set `use_llm_evaluator=true`. diff --git a/skills/compliance/tos_evaluator/manifest.yaml b/skills/compliance/tos_evaluator/manifest.yaml new file mode 100644 index 0000000..c520b63 --- /dev/null +++ b/skills/compliance/tos_evaluator/manifest.yaml @@ -0,0 +1,66 @@ +name: "compliance/tos_evaluator" +version: "0.1.0" +description: "Evaluates whether an intended automated website action appears permissible by checking robots.txt, discovered legal pages, and optional low-cost LLM review for ambiguous policy language." +category: "compliance" +parameters: + type: object + properties: + target_url: + type: string + description: "The full website URL the agent intends to access." + intended_action: + type: string + description: "Natural-language description of the automation, such as scrape pricing data, index docs, or use the API." + user_agent: + type: string + description: "Optional user-agent string used for robots.txt evaluation." + fetch_mode: + type: string + description: "How broadly to search for legal pages. Options: lightweight or deep." + enum: ["lightweight", "deep"] + use_llm_evaluator: + type: boolean + description: "If true, uses an optional LLM fallback for low-confidence or ambiguous legal clauses." + llm_provider: + type: string + description: "Optional provider for the clause evaluator. Gemini is the documented example." + llm_model: + type: string + description: "Optional model name for the clause evaluator, such as gemini-2.5-flash-lite." + assume_authenticated_session: + type: boolean + description: "Whether to evaluate the request as if the agent is acting within a logged-in or paid session." + max_terms_pages: + type: integer + description: "Maximum number of discovered legal pages to fetch and inspect." + required: + - target_url + - intended_action +outputs: + is_safe_to_proceed: + type: boolean + description: "Whether the evaluator found enough evidence to consider the action safe." + confidence_score: + type: number + description: "The evaluator's confidence in the final verdict." + verdict: + type: string + description: "One of SAFE, UNSAFE, CAUTION, or INSUFFICIENT_EVIDENCE." + reason: + type: string + description: "Short explanation of the primary reason for the verdict." +requirements: + - requests + - bs4 +constitution: | + 1. LEGAL CAUTION: Never represent the result as formal legal advice or a definitive legal interpretation. + 2. CONSERVATIVE VERDICTS: Prefer CAUTION or INSUFFICIENT_EVIDENCE over falsely approving ambiguous automated access. + 3. SCOPE LIMITATION: Only evaluate permissibility signals; do not perform the target scraping, crawling, or extraction task. + 4. PRIVACY: Do not send fetched policy text to an external model unless the caller explicitly enables the optional LLM evaluator. +env_vars: + GOOGLE_API_KEY: + description: "Optional. Required only when llm_provider is Gemini and use_llm_evaluator is enabled." 
+ required: false +presentation: + icon: "shield" + color: "#16a34a" diff --git a/skills/compliance/tos_evaluator/skill.py b/skills/compliance/tos_evaluator/skill.py new file mode 100644 index 0000000..f8a89a4 --- /dev/null +++ b/skills/compliance/tos_evaluator/skill.py @@ -0,0 +1,587 @@ +import json +import os +import re +from typing import Any, Dict, List, Optional +from urllib.parse import urljoin, urlparse +from urllib.robotparser import RobotFileParser + +import requests +import yaml +from bs4 import BeautifulSoup +from skillware.core.base_skill import BaseSkill + +try: + import google.generativeai as genai +except ImportError: # pragma: no cover - dependency is optional at runtime + genai = None + + +class TOSEvaluatorSkill(BaseSkill): + """ + Evaluates whether an automated website action appears permissible based on + robots.txt and discovered legal policy pages. + """ + + WELL_KNOWN_POLICY_PATHS = [ + "/terms", + "/terms-of-service", + "/terms-of-use", + "/tos", + "/legal/terms", + "/legal", + "/conditions", + "/policies/terms", + "/acceptable-use", + "/aup", + "/developer-terms", + "/api-terms", + ] + + POLICY_KEYWORDS = { + "terms": 8, + "terms of service": 12, + "terms of use": 12, + "tos": 6, + "legal": 5, + "conditions": 4, + "user agreement": 8, + "acceptable use": 10, + "developer terms": 8, + "api terms": 10, + "api policy": 9, + "robots": 3, + } + + ACTION_PATTERNS = { + "scrape": [r"\bscrap", r"\bextract", r"\bharvest", r"\bcollect data\b"], + "crawl": [r"\bcrawl", r"\bspider", r"\bbot\b", r"\bautomated access\b"], + "index": [r"\bindex", r"\bsearch engine", r"\barchive", r"\bmirror"], + "api_use": [r"\bapi\b", r"\bdeveloper\b", r"\bintegration\b"], + "monitor": [r"\bmonitor", r"\bwatch", r"\btrack", r"\bcheck periodically\b"], + "download": [r"\bdownload", r"\bexport", r"\bbulk\b"], + "automated_access": [r"\bautomation\b", r"\bscript", r"\bprogrammatic\b"], + } + + CLAUSE_PATTERNS = [ + { + "label": "hard_block", + "severity": "high", + "weight": -45, + "patterns": [ + r"may not scrape", + r"must not scrape", + r"no scraping", + r"no crawlers", + r"no robots", + r"no automated means", + r"prohibited automated access", + r"automated means.*prohibited", + r"harvest.*prohibited", + r"crawl.*prohibited", + ], + }, + { + "label": "soft_caution", + "severity": "medium", + "weight": -20, + "patterns": [ + r"prior written consent", + r"without our permission", + r"reasonable rate", + r"rate limit", + r"commercial use.*restricted", + r"access.*subject to", + r"must comply with.*api", + r"use the api", + ], + }, + { + "label": "permission", + "severity": "low", + "weight": 18, + "patterns": [ + r"permitted to access", + r"you may use.*api", + r"public api", + r"developers may access", + r"search engines may crawl", + ], + }, + ] + + def __init__(self, config: Optional[Dict[str, Any]] = None): + super().__init__(config) + self.session = requests.Session() + self.session.headers.update( + { + "User-Agent": "Skillware-TOS-Evaluator/0.1 (+https://github.com/ARPAHLS/skillware)" + } + ) + + @property + def manifest(self) -> Dict[str, Any]: + manifest_path = os.path.join(os.path.dirname(__file__), "manifest.yaml") + if os.path.exists(manifest_path): + with open(manifest_path, "r", encoding="utf-8") as f: + return yaml.safe_load(f) + return {} + + def execute(self, params: Dict[str, Any]) -> Dict[str, Any]: + normalized = self._normalize_input(params) + if "error" in normalized: + return normalized + + robots_assessment = self._evaluate_robots( + normalized["origin"], 
normalized["target_url"], normalized["user_agent"] + ) + policy_candidates = self._discover_policy_pages( + normalized["origin"], robots_assessment, normalized["max_terms_pages"] + ) + tos_assessment = self._evaluate_policy_pages( + normalized, policy_candidates, normalized["max_terms_pages"] + ) + + llm_assessment = None + if self._should_use_llm(normalized, robots_assessment, tos_assessment): + llm_assessment = self._run_llm_evaluator(normalized, tos_assessment) + + return self._build_final_result( + normalized, + robots_assessment, + tos_assessment, + llm_assessment, + policy_candidates, + ) + + def _normalize_input(self, params: Dict[str, Any]) -> Dict[str, Any]: + target_url = params.get("target_url", "").strip() + intended_action = params.get("intended_action", "").strip() + if not target_url: + return {"error": "target_url is required."} + if not intended_action: + return {"error": "intended_action is required."} + + parsed = urlparse(target_url) + if not parsed.scheme or not parsed.netloc: + return {"error": "target_url must be a fully qualified URL."} + + origin = f"{parsed.scheme}://{parsed.netloc}" + action_type = self._classify_action(intended_action) + user_agent = params.get("user_agent", self.session.headers["User-Agent"]) + + return { + "target_url": target_url, + "intended_action": intended_action, + "action_type": action_type, + "origin": origin, + "path": parsed.path or "/", + "user_agent": user_agent, + "fetch_mode": params.get("fetch_mode", "lightweight"), + "use_llm_evaluator": bool(params.get("use_llm_evaluator", False)), + "llm_provider": params.get("llm_provider", "gemini"), + "llm_model": params.get("llm_model", "gemini-2.5-flash-lite"), + "assume_authenticated_session": bool( + params.get("assume_authenticated_session", False) + ), + "max_terms_pages": max(1, min(int(params.get("max_terms_pages", 5)), 10)), + } + + def _classify_action(self, intended_action: str) -> str: + lowered = intended_action.lower() + for action, patterns in self.ACTION_PATTERNS.items(): + if any(re.search(pattern, lowered) for pattern in patterns): + return action + return "automated_access" + + def _evaluate_robots( + self, origin: str, target_url: str, user_agent: str + ) -> Dict[str, Any]: + robots_url = f"{origin}/robots.txt" + assessment = { + "status": "unavailable", + "robots_url": robots_url, + "can_fetch": None, + "matched_rule": "unknown", + "crawl_delay": None, + "request_rate": None, + "sitemaps": [], + "reason": "robots.txt could not be retrieved.", + } + + try: + response = self.session.get(robots_url, timeout=10) + if response.status_code >= 400: + assessment["reason"] = f"robots.txt returned HTTP {response.status_code}." + return assessment + + parser = RobotFileParser() + parser.set_url(robots_url) + parser.parse(response.text.splitlines()) + + can_fetch = parser.can_fetch(user_agent, target_url) + assessment["status"] = "parsed" + assessment["can_fetch"] = can_fetch + assessment["matched_rule"] = "allowed" if can_fetch else "disallowed" + assessment["crawl_delay"] = parser.crawl_delay(user_agent) + assessment["request_rate"] = parser.request_rate(user_agent) + assessment["sitemaps"] = parser.site_maps() or [] + assessment["reason"] = ( + "robots.txt allows the target path." + if can_fetch + else "robots.txt disallows the target path for the supplied user-agent." 
+ ) + return assessment + except requests.RequestException as exc: + assessment["reason"] = f"robots.txt request failed: {str(exc)}" + return assessment + + def _discover_policy_pages( + self, origin: str, robots_assessment: Dict[str, Any], max_terms_pages: int + ) -> Dict[str, List[Dict[str, Any]]]: + candidates: Dict[str, Dict[str, Any]] = {} + + for path in self.WELL_KNOWN_POLICY_PATHS: + url = urljoin(origin, path) + score = 50 if "terms" in path or "acceptable" in path else 35 + candidates[url] = { + "url": url, + "score": score, + "source": "well_known_path", + "label": path.strip("/") or "legal", + } + + html_pages = [origin] + for sitemap_url in robots_assessment.get("sitemaps", [])[:2]: + html_pages.append(sitemap_url) + + for url in html_pages: + discovered = self._extract_candidate_links(url, origin) + for item in discovered: + existing = candidates.get(item["url"]) + if existing: + existing["score"] = max(existing["score"], item["score"]) + existing["source"] = f"{existing['source']},{item['source']}" + else: + candidates[item["url"]] = item + + ordered = sorted(candidates.values(), key=lambda item: item["score"], reverse=True) + return {"candidates": ordered[:max_terms_pages]} + + def _extract_candidate_links(self, page_url: str, origin: str) -> List[Dict[str, Any]]: + links: List[Dict[str, Any]] = [] + response = self._safe_get(page_url, timeout=10) + if not response or "html" not in response.headers.get("Content-Type", "").lower(): + return links + + soup = BeautifulSoup(response.text[:300000], "html.parser") + for anchor in soup.find_all("a", href=True): + href = anchor.get("href", "").strip() + text = " ".join(anchor.stripped_strings).strip().lower() + if not href: + continue + + absolute = urljoin(page_url, href) + parsed = urlparse(absolute) + if not parsed.scheme.startswith("http"): + continue + if f"{parsed.scheme}://{parsed.netloc}" != origin: + continue + + score = self._score_policy_link(absolute.lower(), text) + if score <= 0: + continue + + links.append( + { + "url": absolute, + "score": score, + "source": "link_discovery", + "label": text or parsed.path, + } + ) + + return links + + def _score_policy_link(self, href: str, text: str) -> int: + combined = f"{href} {text}".lower() + score = 0 + for keyword, weight in self.POLICY_KEYWORDS.items(): + if keyword in combined: + score += weight + return score + + def _evaluate_policy_pages( + self, + normalized: Dict[str, Any], + policy_candidates: Dict[str, List[Dict[str, Any]]], + max_terms_pages: int, + ) -> Dict[str, Any]: + pages_evaluated = [] + clause_hits = [] + + for candidate in policy_candidates.get("candidates", [])[:max_terms_pages]: + url = candidate["url"] + response = self._safe_get(url, timeout=12) + if not response: + continue + + content_type = response.headers.get("Content-Type", "").lower() + if "html" not in content_type: + pages_evaluated.append( + { + "url": url, + "status": "skipped", + "reason": f"Unsupported content type: {content_type or 'unknown'}", + } + ) + continue + + extracted_sections = self._extract_policy_sections(response.text) + page_hits = self._score_policy_sections( + normalized["action_type"], extracted_sections, url + ) + clause_hits.extend(page_hits) + pages_evaluated.append( + { + "url": url, + "status": "parsed", + "candidate_score": candidate["score"], + "matched_clauses": len(page_hits), + } + ) + + clause_hits.sort(key=lambda item: abs(item["score_delta"]), reverse=True) + aggregate_score = sum(item["score_delta"] for item in clause_hits) + if not pages_evaluated: + 
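+            # No candidate page produced parseable HTML, so there is no clause
+            # evidence to weigh in either direction.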
status = "insufficient_evidence" + summary = "No candidate Terms or policy pages could be parsed." + elif any(item["classification"] == "hard_block" for item in clause_hits): + status = "blocked" + summary = "Discovered policy text contains an explicit restriction on the requested automated behavior." + elif aggregate_score <= -20: + status = "caution" + summary = "Discovered policy text suggests restrictions or conditions on the requested automated behavior." + elif aggregate_score > 0: + status = "allowed" + summary = "Discovered policy text includes language that appears permissive for the requested behavior." + else: + status = "insufficient_evidence" + summary = "Policy pages were found, but none produced strong action-specific evidence." + + return { + "status": status, + "summary": summary, + "pages_evaluated": pages_evaluated, + "matched_clauses": clause_hits[:10], + "aggregate_score": aggregate_score, + } + + def _extract_policy_sections(self, html: str) -> List[Dict[str, str]]: + soup = BeautifulSoup(html[:400000], "html.parser") + for tag in soup(["script", "style", "noscript", "svg"]): + tag.decompose() + + body = soup.body or soup + sections: List[Dict[str, str]] = [] + current_heading = "General" + + for element in body.find_all(["h1", "h2", "h3", "p", "li"]): + text = " ".join(element.stripped_strings) + text = re.sub(r"\s+", " ", text).strip() + if not text or len(text) < 20: + continue + + if element.name in {"h1", "h2", "h3"}: + current_heading = text[:160] + continue + + sections.append({"heading": current_heading, "text": text[:1200]}) + + return sections[:200] + + def _score_policy_sections( + self, action_type: str, sections: List[Dict[str, str]], page_url: str + ) -> List[Dict[str, Any]]: + hits = [] + action_relevance_patterns = self.ACTION_PATTERNS.get(action_type, []) + for section in sections: + text_lower = section["text"].lower() + heading_lower = section["heading"].lower() + combined = f"{heading_lower} {text_lower}" + + if action_relevance_patterns and not any( + re.search(pattern, combined) for pattern in action_relevance_patterns + ): + generic_automation = re.search( + r"automated|bot|crawl|scrap|harvest|api|programmatic", combined + ) + if not generic_automation: + continue + + for clause in self.CLAUSE_PATTERNS: + for pattern in clause["patterns"]: + if re.search(pattern, combined): + hits.append( + { + "url": page_url, + "heading": section["heading"], + "snippet": section["text"], + "classification": clause["label"], + "severity": clause["severity"], + "score_delta": clause["weight"], + } + ) + break + + return hits + + def _should_use_llm( + self, + normalized: Dict[str, Any], + robots_assessment: Dict[str, Any], + tos_assessment: Dict[str, Any], + ) -> bool: + if not normalized.get("use_llm_evaluator"): + return False + if tos_assessment["status"] in {"blocked", "allowed"}: + return False + if robots_assessment.get("can_fetch") is False: + return False + return bool(tos_assessment.get("matched_clauses") or tos_assessment["status"] == "caution") + + def _run_llm_evaluator( + self, normalized: Dict[str, Any], tos_assessment: Dict[str, Any] + ) -> Dict[str, Any]: + provider = normalized["llm_provider"].lower() + if provider != "gemini": + return { + "status": "skipped", + "reason": f"Unsupported llm_provider '{normalized['llm_provider']}'.", + } + if genai is None: + return {"status": "skipped", "reason": "google-generativeai is not installed."} + + api_key = os.environ.get("GOOGLE_API_KEY") + if not api_key: + return {"status": "skipped", "reason": 
"GOOGLE_API_KEY is not configured."} + + genai.configure(api_key=api_key) + prompt = { + "target_url": normalized["target_url"], + "intended_action": normalized["intended_action"], + "action_type": normalized["action_type"], + "matched_clauses": tos_assessment.get("matched_clauses", [])[:6], + "task": ( + "Classify whether these policy snippets forbid, allow, or condition " + "the requested action. Return strict JSON with keys: " + "verdict, confidence_score, rationale." + ), + } + + try: + model = genai.GenerativeModel(normalized["llm_model"]) + response = model.generate_content( + json.dumps(prompt, ensure_ascii=True), + generation_config=genai.GenerationConfig( + response_mime_type="application/json", temperature=0.0 + ), + ) + parsed = json.loads(response.text) + return { + "status": "used", + "provider": provider, + "model": normalized["llm_model"], + "verdict": parsed.get("verdict", "CAUTION"), + "confidence_score": float(parsed.get("confidence_score", 0.5)), + "rationale": parsed.get("rationale", "No rationale returned."), + } + except Exception as exc: + return {"status": "error", "reason": f"LLM evaluator failed: {str(exc)}"} + + def _build_final_result( + self, + normalized: Dict[str, Any], + robots_assessment: Dict[str, Any], + tos_assessment: Dict[str, Any], + llm_assessment: Optional[Dict[str, Any]], + policy_candidates: Dict[str, List[Dict[str, Any]]], + ) -> Dict[str, Any]: + verdict = "INSUFFICIENT_EVIDENCE" + confidence_score = 0.35 + reason = "Insufficient policy evidence to safely approve the requested action." + recommended_next_step = "Review the discovered policy pages manually before proceeding." + + if robots_assessment.get("can_fetch") is False: + verdict = "UNSAFE" + confidence_score = 0.98 + reason = robots_assessment["reason"] + recommended_next_step = ( + "Do not automate access to this path unless you have explicit permission." + ) + elif tos_assessment["status"] == "blocked": + verdict = "UNSAFE" + confidence_score = 0.9 + reason = tos_assessment["summary"] + recommended_next_step = "Avoid the requested action or obtain explicit written permission." + elif tos_assessment["status"] == "caution": + verdict = "CAUTION" + confidence_score = 0.65 + reason = tos_assessment["summary"] + recommended_next_step = "Prefer an official API or documented integration path if one exists." + elif tos_assessment["status"] == "allowed" and robots_assessment.get("can_fetch") is not False: + verdict = "SAFE" + confidence_score = 0.72 + reason = tos_assessment["summary"] + recommended_next_step = "Proceed conservatively and continue honoring crawl delays and rate limits." 
+ + if llm_assessment and llm_assessment.get("status") == "used": + verdict = llm_assessment.get("verdict", verdict) + confidence_score = max( + confidence_score, + llm_assessment.get("confidence_score", confidence_score), + ) + reason = llm_assessment.get("rationale", reason) + + evidence = [] + if robots_assessment.get("reason"): + evidence.append( + { + "source": robots_assessment.get("robots_url"), + "type": "robots", + "snippet": robots_assessment["reason"], + } + ) + for clause in tos_assessment.get("matched_clauses", [])[:5]: + evidence.append( + { + "source": clause["url"], + "type": clause["classification"], + "heading": clause["heading"], + "snippet": clause["snippet"], + } + ) + + return { + "is_safe_to_proceed": verdict == "SAFE", + "confidence_score": round(float(confidence_score), 2), + "verdict": verdict, + "reason": reason, + "recommended_next_step": recommended_next_step, + "action_type": normalized["action_type"], + "robots_assessment": robots_assessment, + "tos_assessment": tos_assessment, + "llm_assessment": llm_assessment or {"status": "not_used"}, + "discovered_policy_urls": { + "candidates": [item["url"] for item in policy_candidates.get("candidates", [])] + }, + "evidence": evidence, + } + + def _safe_get(self, url: str, timeout: int = 10) -> Optional[requests.Response]: + try: + response = self.session.get(url, timeout=timeout, allow_redirects=True) + if response.status_code >= 400: + return None + return response + except requests.RequestException: + return None diff --git a/skills/compliance/tos_evaluator/test_skill.py b/skills/compliance/tos_evaluator/test_skill.py new file mode 100644 index 0000000..e4b9bf2 --- /dev/null +++ b/skills/compliance/tos_evaluator/test_skill.py @@ -0,0 +1,30 @@ +import os + +import pytest +import yaml + +from .skill import TOSEvaluatorSkill + + +@pytest.fixture +def skill(): + return TOSEvaluatorSkill() + + +@pytest.fixture +def manifest(): + manifest_path = os.path.join(os.path.dirname(__file__), "manifest.yaml") + with open(manifest_path, "r", encoding="utf-8") as f: + return yaml.safe_load(f) + + +def test_skill_manifest_consistency(skill, manifest): + skill_manifest = skill.manifest + assert skill_manifest["name"] == manifest["name"] + assert skill_manifest["version"] == manifest["version"] + + +def test_skill_execution_requires_inputs(skill): + result = skill.execute({}) + assert "error" in result + assert "target_url" in result["error"] diff --git a/tests/skills/compliance/test_tos_evaluator.py b/tests/skills/compliance/test_tos_evaluator.py new file mode 100644 index 0000000..6351a7b --- /dev/null +++ b/tests/skills/compliance/test_tos_evaluator.py @@ -0,0 +1,203 @@ +import json +from unittest.mock import MagicMock, patch + +from skillware.core.loader import SkillLoader + + +def get_skill(): + bundle = SkillLoader.load_skill("compliance/tos_evaluator") + return bundle["module"].TOSEvaluatorSkill() + + +def make_response(text="", status_code=200, content_type="text/html; charset=utf-8"): + response = MagicMock() + response.text = text + response.status_code = status_code + response.headers = {"Content-Type": content_type} + return response + + +def test_tos_evaluator_manifest_loads(): + bundle = SkillLoader.load_skill("compliance/tos_evaluator") + assert bundle["manifest"]["name"] == "compliance/tos_evaluator" + assert "target_url" in bundle["manifest"]["parameters"]["properties"] + assert "intended_action" in bundle["manifest"]["parameters"]["properties"] + + 
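+# Sketch of an additional offline unit test (no HTTP mocking needed): it assumes
+# _classify_action stays a pure helper backed by the ACTION_PATTERNS table in skill.py.
+def test_tos_evaluator_classifies_intended_actions():
+    skill = get_skill()
+    assert skill._classify_action("scrape pricing data") == "scrape"
+    assert skill._classify_action("index documentation pages") == "index"
+    assert skill._classify_action("use the api for automated integration") == "api_use"
+    assert skill._classify_action("read the page") == "automated_access"
+
+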
+@patch("skills.compliance.tos_evaluator.skill.requests.Session.get") +def test_tos_evaluator_robots_disallow_returns_unsafe(mock_get): + mock_get.return_value = make_response( + text="User-agent: *\nDisallow: /\n", content_type="text/plain" + ) + + skill = get_skill() + result = skill.execute( + { + "target_url": "https://hackernoon.com/tagged/ai", + "intended_action": "scrape pricing data", + } + ) + + assert result["verdict"] == "UNSAFE" + assert result["robots_assessment"]["can_fetch"] is False + + +@patch("skills.compliance.tos_evaluator.skill.requests.Session.get") +def test_tos_evaluator_missing_robots_and_terms_returns_insufficient_evidence(mock_get): + mock_get.return_value = make_response(status_code=404, text="not found") + + skill = get_skill() + result = skill.execute( + { + "target_url": "https://hackernoon.com/archive", + "intended_action": "index documentation pages", + "max_terms_pages": 2, + } + ) + + assert result["verdict"] == "INSUFFICIENT_EVIDENCE" + assert result["tos_assessment"]["status"] == "insufficient_evidence" + + +@patch("skills.compliance.tos_evaluator.skill.requests.Session.get") +def test_tos_evaluator_policy_clause_blocks_scraping(mock_get): + html = """ + + + Terms of Service +

+    <h1>Terms of Service</h1>
+    <h2>Automated Access</h2>
+    <p>You may not scrape, crawl, or use automated means to extract content from this site.</p>

+ + + """ + + def side_effect(url, **kwargs): + if url.endswith("/robots.txt"): + return make_response(text="User-agent: *\nAllow: /\n", content_type="text/plain") + return make_response(text=html) + + mock_get.side_effect = side_effect + + skill = get_skill() + result = skill.execute( + { + "target_url": "https://hackernoon.com/tagged/startups", + "intended_action": "scrape product listings", + } + ) + + assert result["verdict"] == "UNSAFE" + assert result["tos_assessment"]["status"] == "blocked" + assert result["evidence"] + + +@patch("skills.compliance.tos_evaluator.skill.requests.Session.get") +def test_tos_evaluator_api_only_language_returns_caution(mock_get): + html = """ + + + API Terms +

+    <h1>Developer Terms</h1>
+    <p>Automated access is permitted only through our official API and is subject to reasonable rate limits.</p>

+ + + """ + + def side_effect(url, **kwargs): + if url.endswith("/robots.txt"): + return make_response(text="User-agent: *\nAllow: /\n", content_type="text/plain") + return make_response(text=html) + + mock_get.side_effect = side_effect + + skill = get_skill() + result = skill.execute( + { + "target_url": "https://hackernoon.com/api", + "intended_action": "crawl catalog pages with a bot", + } + ) + + assert result["verdict"] == "CAUTION" + assert "API" in result["recommended_next_step"] + + +@patch("skills.compliance.tos_evaluator.skill.requests.Session.get") +def test_tos_evaluator_allowed_policy_can_return_safe(mock_get): + html = """ + + + Developer Terms +

+    <h1>Developer Terms</h1>
+    <p>Developers may access our public API for automated integrations.</p>

+ + + """ + + def side_effect(url, **kwargs): + if url.endswith("/robots.txt"): + return make_response(text="User-agent: *\nAllow: /\n", content_type="text/plain") + return make_response(text=html) + + mock_get.side_effect = side_effect + + skill = get_skill() + result = skill.execute( + { + "target_url": "https://hackernoon.com/api/v1/stories", + "intended_action": "use the api for automated integration", + } + ) + + assert result["verdict"] == "SAFE" + assert result["is_safe_to_proceed"] is True + + +@patch("skills.compliance.tos_evaluator.skill.requests.Session.get") +def test_tos_evaluator_llm_fallback_is_mockable(mock_get): + html = """ + + + Terms +

+    <h1>Terms</h1>
+    <p>Automated access may be allowed only with prior written consent.</p>

+ + + """ + + def side_effect(url, **kwargs): + if url.endswith("/robots.txt"): + return make_response(text="User-agent: *\nAllow: /\n", content_type="text/plain") + return make_response(text=html) + + mock_get.side_effect = side_effect + + bundle = SkillLoader.load_skill("compliance/tos_evaluator") + mock_genai = MagicMock() + mock_model = MagicMock() + mock_model.generate_content.return_value.text = json.dumps( + { + "verdict": "CAUTION", + "confidence_score": 0.81, + "rationale": "The clause conditions automation on prior written consent.", + } + ) + mock_genai.GenerativeModel.return_value = mock_model + mock_genai.GenerationConfig.return_value = object() + bundle["module"].genai = mock_genai + skill = bundle["module"].TOSEvaluatorSkill() + with patch.dict("os.environ", {"GOOGLE_API_KEY": "test-key"}): + result = skill.execute( + { + "target_url": "https://hackernoon.com/tagged/devops", + "intended_action": "crawl documentation pages", + "use_llm_evaluator": True, + "llm_provider": "gemini", + "llm_model": "gemini-2.5-flash-lite", + } + ) + + assert result["verdict"] == "CAUTION" + assert result["llm_assessment"]["status"] == "used" + assert result["llm_assessment"]["model"] == "gemini-2.5-flash-lite"