diff --git a/examples/README.md b/examples/README.md index 82ccbe1b..1d8c9bdc 100644 --- a/examples/README.md +++ b/examples/README.md @@ -36,8 +36,6 @@ See [ollama/docs/api.md](https://github.com/ollama/ollama/blob/main/docs/api.md) - [gpt-oss-tools.py](gpt-oss-tools.py) - [gpt-oss-tools-stream.py](gpt-oss-tools-stream.py) -- [gpt-oss-tools-browser.py](gpt-oss-tools-browser.py) - Using browser research tools with gpt-oss -- [gpt-oss-tools-browser-stream.py](gpt-oss-tools-browser-stream.py) - Using browser research tools with gpt-oss, with streaming enabled ### Web search @@ -48,6 +46,7 @@ export OLLAMA_API_KEY="your_api_key_here" ``` - [web-search.py](web-search.py) +- [web-search-gpt-oss.py](web-search-gpt-oss.py) - Using browser research tools with gpt-oss #### MCP server diff --git a/examples/gpt-oss-tools-browser-stream.py b/examples/gpt-oss-tools-browser-stream.py deleted file mode 100644 index 8bbab537..00000000 --- a/examples/gpt-oss-tools-browser-stream.py +++ /dev/null @@ -1,198 +0,0 @@ -# /// script -# requires-python = ">=3.11" -# dependencies = [ -# "gpt-oss", -# "ollama", -# "rich", -# ] -# /// - -import asyncio -import json -from typing import Iterator, Optional - -from gpt_oss.tools.simple_browser import ExaBackend, SimpleBrowserTool -from openai_harmony import Author, Role, TextContent -from openai_harmony import Message as HarmonyMessage -from rich import print - -from ollama import Client -from ollama._types import ChatResponse - -_backend = ExaBackend(source='web') -_browser_tool = SimpleBrowserTool(backend=_backend) - - -def heading(text): - print(text) - print('=' * (len(text) + 3)) - - -async def _browser_search_async(query: str, topn: int = 10, source: str | None = None) -> str: - # map Ollama message to Harmony format - harmony_message = HarmonyMessage( - author=Author(role=Role.USER), - content=[TextContent(text=json.dumps({'query': query, 'topn': topn}))], - recipient='browser.search', - ) - - result_text: str = '' - async for response in _browser_tool._process(harmony_message): - if response.content: - for content in response.content: - if isinstance(content, TextContent): - result_text += content.text - return result_text or f'No results for query: {query}' - - -async def _browser_open_async(id: int | str = -1, cursor: int = -1, loc: int = -1, num_lines: int = -1, *, view_source: bool = False, source: str | None = None) -> str: - payload = {'id': id, 'cursor': cursor, 'loc': loc, 'num_lines': num_lines, 'view_source': view_source, 'source': source} - - harmony_message = HarmonyMessage( - author=Author(role=Role.USER), - content=[TextContent(text=json.dumps(payload))], - recipient='browser.open', - ) - - result_text: str = '' - async for response in _browser_tool._process(harmony_message): - if response.content: - for content in response.content: - if isinstance(content, TextContent): - result_text += content.text - return result_text or f'Could not open: {id}' - - -async def _browser_find_async(pattern: str, cursor: int = -1) -> str: - payload = {'pattern': pattern, 'cursor': cursor} - - harmony_message = HarmonyMessage( - author=Author(role=Role.USER), - content=[TextContent(text=json.dumps(payload))], - recipient='browser.find', - ) - - result_text: str = '' - async for response in _browser_tool._process(harmony_message): - if response.content: - for content in response.content: - if isinstance(content, TextContent): - result_text += content.text - return result_text or f'Pattern not found: {pattern}' - - -def browser_search(query: str, topn: int = 10, source: Optional[str] = None) -> str: - return asyncio.run(_browser_search_async(query=query, topn=topn, source=source)) - - -def browser_open(id: int | str = -1, cursor: int = -1, loc: int = -1, num_lines: int = -1, *, view_source: bool = False, source: Optional[str] = None) -> str: - return asyncio.run(_browser_open_async(id=id, cursor=cursor, loc=loc, num_lines=num_lines, view_source=view_source, source=source)) - - -def browser_find(pattern: str, cursor: int = -1) -> str: - return asyncio.run(_browser_find_async(pattern=pattern, cursor=cursor)) - - -# Schema definitions for each browser tool -browser_search_schema = { - 'type': 'function', - 'function': { - 'name': 'browser.search', - }, -} - -browser_open_schema = { - 'type': 'function', - 'function': { - 'name': 'browser.open', - }, -} - -browser_find_schema = { - 'type': 'function', - 'function': { - 'name': 'browser.find', - }, -} - -available_tools = { - 'browser.search': browser_search, - 'browser.open': browser_open, - 'browser.find': browser_find, -} - - -model = 'gpt-oss:20b' -print('Model: ', model, '\n') - -prompt = 'What is Ollama?' -print('You: ', prompt, '\n') -messages = [{'role': 'user', 'content': prompt}] - -client = Client() - -# gpt-oss can call tools while "thinking" -# a loop is needed to call the tools and get the results -final = True -while True: - response_stream: Iterator[ChatResponse] = client.chat( - model=model, - messages=messages, - tools=[browser_search_schema, browser_open_schema, browser_find_schema], - options={'num_ctx': 8192}, # 8192 is the recommended lower limit for the context window - stream=True, - ) - - tool_calls = [] - thinking = '' - content = '' - - for chunk in response_stream: - if chunk.message.tool_calls: - tool_calls.extend(chunk.message.tool_calls) - - if chunk.message.content: - if not (chunk.message.thinking or chunk.message.thinking == '') and final: - heading('\n\nFinal result: ') - final = False - print(chunk.message.content, end='', flush=True) - - if chunk.message.thinking: - thinking += chunk.message.thinking - print(chunk.message.thinking, end='', flush=True) - - if thinking != '': - messages.append({'role': 'assistant', 'content': thinking, 'tool_calls': tool_calls}) - - print() - - if tool_calls: - for tool_call in tool_calls: - tool_name = tool_call.function.name - args = tool_call.function.arguments or {} - function_to_call = available_tools.get(tool_name) - - if function_to_call: - heading(f'\nCalling tool: {tool_name}') - if args: - print(f'Arguments: {args}') - - try: - result = function_to_call(**args) - print(f'Tool result: {result[:200]}') - if len(result) > 200: - heading('... [truncated]') - print() - - result_message = {'role': 'tool', 'content': result, 'tool_name': tool_name} - messages.append(result_message) - - except Exception as e: - err = f'Error from {tool_name}: {e}' - print(err) - messages.append({'role': 'tool', 'content': err, 'tool_name': tool_name}) - else: - print(f'Tool {tool_name} not found') - else: - # no more tool calls, we can stop the loop - break diff --git a/examples/gpt-oss-tools-browser.py b/examples/gpt-oss-tools-browser.py deleted file mode 100644 index 8dcc1439..00000000 --- a/examples/gpt-oss-tools-browser.py +++ /dev/null @@ -1,175 +0,0 @@ -# /// script -# requires-python = ">=3.11" -# dependencies = [ -# "gpt-oss", -# "ollama", -# "rich", -# ] -# /// - -import asyncio -import json -from typing import Optional - -from gpt_oss.tools.simple_browser import ExaBackend, SimpleBrowserTool -from openai_harmony import Author, Role, TextContent -from openai_harmony import Message as HarmonyMessage - -from ollama import Client - -_backend = ExaBackend(source='web') -_browser_tool = SimpleBrowserTool(backend=_backend) - - -def heading(text): - print(text) - print('=' * (len(text) + 3)) - - -async def _browser_search_async(query: str, topn: int = 10, source: str | None = None) -> str: - # map Ollama message to Harmony format - harmony_message = HarmonyMessage( - author=Author(role=Role.USER), - content=[TextContent(text=json.dumps({'query': query, 'topn': topn}))], - recipient='browser.search', - ) - - result_text: str = '' - async for response in _browser_tool._process(harmony_message): - if response.content: - for content in response.content: - if isinstance(content, TextContent): - result_text += content.text - return result_text or f'No results for query: {query}' - - -async def _browser_open_async(id: int | str = -1, cursor: int = -1, loc: int = -1, num_lines: int = -1, *, view_source: bool = False, source: str | None = None) -> str: - payload = {'id': id, 'cursor': cursor, 'loc': loc, 'num_lines': num_lines, 'view_source': view_source, 'source': source} - - harmony_message = HarmonyMessage( - author=Author(role=Role.USER), - content=[TextContent(text=json.dumps(payload))], - recipient='browser.open', - ) - - result_text: str = '' - async for response in _browser_tool._process(harmony_message): - if response.content: - for content in response.content: - if isinstance(content, TextContent): - result_text += content.text - return result_text or f'Could not open: {id}' - - -async def _browser_find_async(pattern: str, cursor: int = -1) -> str: - payload = {'pattern': pattern, 'cursor': cursor} - - harmony_message = HarmonyMessage( - author=Author(role=Role.USER), - content=[TextContent(text=json.dumps(payload))], - recipient='browser.find', - ) - - result_text: str = '' - async for response in _browser_tool._process(harmony_message): - if response.content: - for content in response.content: - if isinstance(content, TextContent): - result_text += content.text - return result_text or f'Pattern not found: {pattern}' - - -def browser_search(query: str, topn: int = 10, source: Optional[str] = None) -> str: - return asyncio.run(_browser_search_async(query=query, topn=topn, source=source)) - - -def browser_open(id: int | str = -1, cursor: int = -1, loc: int = -1, num_lines: int = -1, *, view_source: bool = False, source: Optional[str] = None) -> str: - return asyncio.run(_browser_open_async(id=id, cursor=cursor, loc=loc, num_lines=num_lines, view_source=view_source, source=source)) - - -def browser_find(pattern: str, cursor: int = -1) -> str: - return asyncio.run(_browser_find_async(pattern=pattern, cursor=cursor)) - - -# Schema definitions for each browser tool -browser_search_schema = { - 'type': 'function', - 'function': { - 'name': 'browser.search', - }, -} - -browser_open_schema = { - 'type': 'function', - 'function': { - 'name': 'browser.open', - }, -} - -browser_find_schema = { - 'type': 'function', - 'function': { - 'name': 'browser.find', - }, -} - -available_tools = { - 'browser.search': browser_search, - 'browser.open': browser_open, - 'browser.find': browser_find, -} - - -model = 'gpt-oss:20b' -print('Model: ', model, '\n') - -prompt = 'What is Ollama?' -print('You: ', prompt, '\n') -messages = [{'role': 'user', 'content': prompt}] - -client = Client() -while True: - response = client.chat( - model=model, - messages=messages, - tools=[browser_search_schema, browser_open_schema, browser_find_schema], - options={'num_ctx': 8192}, # 8192 is the recommended lower limit for the context window - ) - - if hasattr(response.message, 'thinking') and response.message.thinking: - heading('Thinking') - print(response.message.thinking.strip() + '\n') - - if hasattr(response.message, 'content') and response.message.content: - heading('Assistant') - print(response.message.content.strip() + '\n') - - # add message to chat history - messages.append(response.message) - - if response.message.tool_calls: - for tool_call in response.message.tool_calls: - tool_name = tool_call.function.name - args = tool_call.function.arguments or {} - function_to_call = available_tools.get(tool_name) - if not function_to_call: - print(f'Unknown tool: {tool_name}') - continue - - try: - result = function_to_call(**args) - heading(f'Tool: {tool_name}') - if args: - print(f'Arguments: {args}') - print(result[:200]) - if len(result) > 200: - print('... [truncated]') - print() - messages.append({'role': 'tool', 'content': result, 'tool_name': tool_name}) - except Exception as e: - err = f'Error from {tool_name}: {e}' - print(err) - messages.append({'role': 'tool', 'content': err, 'tool_name': tool_name}) - else: - # break on no more tool calls - break diff --git a/examples/web-search-gpt-oss.py b/examples/web-search-gpt-oss.py new file mode 100644 index 00000000..7ba8803b --- /dev/null +++ b/examples/web-search-gpt-oss.py @@ -0,0 +1,99 @@ +# /// script +# requires-python = ">=3.11" +# dependencies = [ +# "ollama", +# ] +# /// +from typing import Any, Dict, List + +from web_search_gpt_oss_helper import Browser + +from ollama import Client + + +def main() -> None: + client = Client() + browser = Browser(initial_state=None, client=client) + + def browser_search(query: str, topn: int = 10) -> str: + return browser.search(query=query, topn=topn)['pageText'] + + def browser_open(id: int | str | None = None, cursor: int = -1, loc: int = -1, num_lines: int = -1) -> str: + return browser.open(id=id, cursor=cursor, loc=loc, num_lines=num_lines)['pageText'] + + def browser_find(pattern: str, cursor: int = -1, **_: Any) -> str: + return browser.find(pattern=pattern, cursor=cursor)['pageText'] + + browser_search_schema = { + 'type': 'function', + 'function': { + 'name': 'browser.search', + }, + } + + browser_open_schema = { + 'type': 'function', + 'function': { + 'name': 'browser.open', + }, + } + + browser_find_schema = { + 'type': 'function', + 'function': { + 'name': 'browser.find', + }, + } + + available_tools = { + 'browser.search': browser_search, + 'browser.open': browser_open, + 'browser.find': browser_find, + } + + query = "what is ollama's new engine" + print('Prompt:', query, '\n') + + messages: List[Dict[str, Any]] = [{'role': 'user', 'content': query}] + + while True: + resp = client.chat( + model='gpt-oss:120b-cloud', + messages=messages, + tools=[browser_search_schema, browser_open_schema, browser_find_schema], + think=True, + ) + + if resp.message.thinking: + print('Thinking:\n========\n') + print(resp.message.thinking + '\n') + + if resp.message.content: + print('Response:\n========\n') + print(resp.message.content + '\n') + + messages.append(resp.message) + + if not resp.message.tool_calls: + break + + for tc in resp.message.tool_calls: + tool_name = tc.function.name + args = tc.function.arguments or {} + print(f'Tool name: {tool_name}, args: {args}') + fn = available_tools.get(tool_name) + if not fn: + messages.append({'role': 'tool', 'content': f'Tool {tool_name} not found', 'tool_name': tool_name}) + continue + + try: + result_text = fn(**args) + print('Result: ', result_text[:200] + '...') + except Exception as e: + result_text = f'Error from {tool_name}: {e}' + + messages.append({'role': 'tool', 'content': result_text, 'tool_name': tool_name}) + + +if __name__ == '__main__': + main() diff --git a/examples/web-search.py b/examples/web-search.py index fde367a0..72d08f25 100644 --- a/examples/web-search.py +++ b/examples/web-search.py @@ -49,7 +49,7 @@ def format_tool_results( messages = [{'role': 'user', 'content': query}] while True: - response = chat(model='qwen3', messages=messages, tools=[web_search, web_fetch], think=True) + response = chat(model='deepseek-v3.1:671b-cloud', messages=messages, tools=[web_search, web_fetch], think=True) if response.message.thinking: print('Thinking: ') print(response.message.thinking + '\n\n') diff --git a/examples/web_search_gpt_oss_helper.py b/examples/web_search_gpt_oss_helper.py new file mode 100644 index 00000000..e3e064c9 --- /dev/null +++ b/examples/web_search_gpt_oss_helper.py @@ -0,0 +1,514 @@ +from __future__ import annotations + +import re +from dataclasses import dataclass, field +from datetime import datetime +from typing import Any, Dict, List, Optional, Protocol, Tuple +from urllib.parse import urlparse + +from ollama import Client + + +@dataclass +class Page: + url: str + title: str + text: str + lines: List[str] + links: Dict[int, str] + fetched_at: datetime + + +@dataclass +class BrowserStateData: + page_stack: List[str] = field(default_factory=list) + view_tokens: int = 1024 + url_to_page: Dict[str, Page] = field(default_factory=dict) + + +@dataclass +class WebSearchResult: + title: str + url: str + content: Dict[str, str] + + +class SearchClient(Protocol): + def search(self, queries: List[str], max_results: Optional[int] = None): ... + + +class CrawlClient(Protocol): + def crawl(self, urls: List[str]): ... + + +# ---- Constants --------------------------------------------------------------- + +DEFAULT_VIEW_TOKENS = 1024 +CAPPED_TOOL_CONTENT_LEN = 8000 + +# ---- Helpers ---------------------------------------------------------------- + + +def cap_tool_content(text: str) -> str: + if not text: + return text + if len(text) <= CAPPED_TOOL_CONTENT_LEN: + return text + if CAPPED_TOOL_CONTENT_LEN <= 1: + return text[:CAPPED_TOOL_CONTENT_LEN] + return text[: CAPPED_TOOL_CONTENT_LEN - 1] + '…' + + +def _safe_domain(u: str) -> str: + try: + parsed = urlparse(u) + host = parsed.netloc or u + return host.replace('www.', '') if host else u + except Exception: + return u + + +# ---- BrowserState ------------------------------------------------------------ + + +class BrowserState: + def __init__(self, initial_state: Optional[BrowserStateData] = None): + self._data = initial_state or BrowserStateData(view_tokens=DEFAULT_VIEW_TOKENS) + + def get_data(self) -> BrowserStateData: + return self._data + + def set_data(self, data: BrowserStateData) -> None: + self._data = data + + +# ---- Browser ---------------------------------------------------------------- + + +class Browser: + def __init__( + self, + initial_state: Optional[BrowserStateData] = None, + client: Optional[Client] = None, + ): + self.state = BrowserState(initial_state) + self._client: Optional[Client] = client + + def set_client(self, client: Client) -> None: + self._client = client + + def get_state(self) -> BrowserStateData: + return self.state.get_data() + + # ---- internal utils ---- + + def _save_page(self, page: Page) -> None: + data = self.state.get_data() + data.url_to_page[page.url] = page + data.page_stack.append(page.url) + self.state.set_data(data) + + def _page_from_stack(self, url: str) -> Page: + data = self.state.get_data() + page = data.url_to_page.get(url) + if not page: + raise ValueError(f'Page not found for url {url}') + return page + + def _join_lines_with_numbers(self, lines: List[str]) -> str: + result = [] + for i, line in enumerate(lines): + result.append(f'L{i}: {line}') + return '\n'.join(result) + + def _wrap_lines(self, text: str, width: int = 80) -> List[str]: + if width <= 0: + width = 80 + src_lines = text.split('\n') + wrapped: List[str] = [] + for line in src_lines: + if line == '': + wrapped.append('') + elif len(line) <= width: + wrapped.append(line) + else: + words = re.split(r'\s+', line) + if not words: + wrapped.append(line) + continue + curr = '' + for w in words: + test = (curr + ' ' + w) if curr else w + if len(test) > width and curr: + wrapped.append(curr) + curr = w + else: + curr = test + if curr: + wrapped.append(curr) + return wrapped + + def _process_markdown_links(self, text: str) -> Tuple[str, Dict[int, str]]: + links: Dict[int, str] = {} + link_id = 0 + + multiline_pattern = re.compile(r'\[([^\]]+)\]\s*\n\s*\(([^)]+)\)') + text = multiline_pattern.sub(lambda m: f'[{m.group(1)}]({m.group(2)})', text) + text = re.sub(r'\s+', ' ', text) + + link_pattern = re.compile(r'\[([^\]]+)\]\(([^)]+)\)') + + def _repl(m: re.Match) -> str: + nonlocal link_id + link_text = m.group(1).strip() + link_url = m.group(2).strip() + domain = _safe_domain(link_url) + formatted = f'【{link_id}†{link_text}†{domain}】' + links[link_id] = link_url + link_id += 1 + return formatted + + processed = link_pattern.sub(_repl, text) + return processed, links + + def _get_end_loc(self, loc: int, num_lines: int, total_lines: int, lines: List[str]) -> int: + if num_lines <= 0: + txt = self._join_lines_with_numbers(lines[loc:]) + data = self.state.get_data() + chars_per_token = 4 + max_chars = min(data.view_tokens * chars_per_token, len(txt)) + num_lines = txt[:max_chars].count('\n') + 1 + return min(loc + num_lines, total_lines) + + def _display_page(self, page: Page, cursor: int, loc: int, num_lines: int) -> str: + total_lines = len(page.lines) or 0 + if total_lines == 0: + page.lines = [''] + total_lines = 1 + + if loc != loc or loc < 0: + loc = 0 + elif loc >= total_lines: + loc = max(0, total_lines - 1) + + end_loc = self._get_end_loc(loc, num_lines, total_lines, page.lines) + + header = f'[{cursor}] {page.title}' + header += f'({page.url})\n' if page.url else '\n' + header += f'**viewing lines [{loc} - {end_loc - 1}] of {total_lines - 1}**\n\n' + + body_lines = [] + for i in range(loc, end_loc): + body_lines.append(f'L{i}: {page.lines[i]}') + + return header + '\n'.join(body_lines) + + # ---- page builders ---- + + def _build_search_results_page_collection(self, query: str, results: Dict[str, Any]) -> Page: + page = Page( + url=f'search_results_{query}', + title=query, + text='', + lines=[], + links={}, + fetched_at=datetime.utcnow(), + ) + + tb = [] + tb.append('') + tb.append('# Search Results') + tb.append('') + + link_idx = 0 + for query_results in results.get('results', {}).values(): + for result in query_results: + domain = _safe_domain(result.get('url', '')) + link_fmt = f'* 【{link_idx}†{result.get("title", "")}†{domain}】' + tb.append(link_fmt) + + raw_snip = result.get('content') or '' + capped = (raw_snip[:400] + '…') if len(raw_snip) > 400 else raw_snip + cleaned = re.sub(r'\d{40,}', lambda m: m.group(0)[:40] + '…', capped) + cleaned = re.sub(r'\s{3,}', ' ', cleaned) + tb.append(cleaned) + page.links[link_idx] = result.get('url', '') + link_idx += 1 + + page.text = '\n'.join(tb) + page.lines = self._wrap_lines(page.text, 80) + return page + + def _build_search_result_page(self, result: WebSearchResult, link_idx: int) -> Page: + page = Page( + url=result.url, + title=result.title, + text='', + lines=[], + links={}, + fetched_at=datetime.utcnow(), + ) + + link_fmt = f'【{link_idx}†{result.title}】\n' + preview = link_fmt + f'URL: {result.url}\n' + full_text = result.content.get('fullText', '') if result.content else '' + preview += full_text[:300] + '\n\n' + + if not full_text: + page.links[link_idx] = result.url + + if full_text: + raw = f'URL: {result.url}\n{full_text}' + processed, links = self._process_markdown_links(raw) + page.text = processed + page.links = links + else: + page.text = preview + + page.lines = self._wrap_lines(page.text, 80) + return page + + def _build_page_from_fetch(self, requested_url: str, fetch_response: Dict[str, Any]) -> Page: + page = Page( + url=requested_url, + title=requested_url, + text='', + lines=[], + links={}, + fetched_at=datetime.utcnow(), + ) + + for url, url_results in fetch_response.get('results', {}).items(): + if url_results: + r0 = url_results[0] + if r0.get('content'): + page.text = r0['content'] + if r0.get('title'): + page.title = r0['title'] + page.url = url + break + + if not page.text: + page.text = 'No content could be extracted from this page.' + else: + page.text = f'URL: {page.url}\n{page.text}' + + processed, links = self._process_markdown_links(page.text) + page.text = processed + page.links = links + page.lines = self._wrap_lines(page.text, 80) + return page + + def _build_find_results_page(self, pattern: str, page: Page) -> Page: + find_page = Page( + url=f'find_results_{pattern}', + title=f'Find results for text: `{pattern}` in `{page.title}`', + text='', + lines=[], + links={}, + fetched_at=datetime.utcnow(), + ) + + max_results = 50 + num_show_lines = 4 + pattern_lower = pattern.lower() + + result_chunks: List[str] = [] + line_idx = 0 + while line_idx < len(page.lines): + line = page.lines[line_idx] + if pattern_lower not in line.lower(): + line_idx += 1 + continue + + end_line = min(line_idx + num_show_lines, len(page.lines)) + snippet = '\n'.join(page.lines[line_idx:end_line]) + link_fmt = f'【{len(result_chunks)}†match at L{line_idx}】' + result_chunks.append(f'{link_fmt}\n{snippet}') + + if len(result_chunks) >= max_results: + break + line_idx += num_show_lines + + if not result_chunks: + find_page.text = f'No `find` results for pattern: `{pattern}`' + else: + find_page.text = '\n\n'.join(result_chunks) + + find_page.lines = self._wrap_lines(find_page.text, 80) + return find_page + + # ---- public API: search / open / find ------------------------------------ + + def search(self, *, query: str, topn: int = 5) -> Dict[str, Any]: + if not self._client: + raise RuntimeError('Client not provided') + + resp = self._client.web_search(query, max_results=topn) + + normalized: Dict[str, Any] = {'results': {}} + rows: List[Dict[str, str]] = [] + for item in resp.results: + content = item.content or '' + rows.append( + { + 'title': item.title, + 'url': item.url, + 'content': content, + } + ) + normalized['results'][query] = rows + + search_page = self._build_search_results_page_collection(query, normalized) + self._save_page(search_page) + cursor = len(self.get_state().page_stack) - 1 + + for query_results in normalized.get('results', {}).values(): + for i, r in enumerate(query_results): + ws = WebSearchResult( + title=r.get('title', ''), + url=r.get('url', ''), + content={'fullText': r.get('content', '') or ''}, + ) + result_page = self._build_search_result_page(ws, i + 1) + data = self.get_state() + data.url_to_page[result_page.url] = result_page + self.state.set_data(data) + + page_text = self._display_page(search_page, cursor, loc=0, num_lines=-1) + return {'state': self.get_state(), 'pageText': cap_tool_content(page_text)} + + def open( + self, + *, + id: Optional[str | int] = None, + cursor: int = -1, + loc: int = 0, + num_lines: int = -1, + ) -> Dict[str, Any]: + if not self._client: + raise RuntimeError('Client not provided') + + state = self.get_state() + + if isinstance(id, str): + url = id + if url in state.url_to_page: + self._save_page(state.url_to_page[url]) + cursor = len(self.get_state().page_stack) - 1 + page_text = self._display_page(state.url_to_page[url], cursor, loc, num_lines) + return {'state': self.get_state(), 'pageText': cap_tool_content(page_text)} + + fetch_response = self._client.web_fetch(url) + normalized: Dict[str, Any] = { + 'results': { + url: [ + { + 'title': fetch_response.title or url, + 'url': url, + 'content': fetch_response.content or '', + } + ] + } + } + new_page = self._build_page_from_fetch(url, normalized) + self._save_page(new_page) + cursor = len(self.get_state().page_stack) - 1 + page_text = self._display_page(new_page, cursor, loc, num_lines) + return {'state': self.get_state(), 'pageText': cap_tool_content(page_text)} + + # Resolve current page from stack only if needed (int id or no id) + page: Optional[Page] = None + if cursor >= 0: + if state.page_stack: + if cursor >= len(state.page_stack): + cursor = max(0, len(state.page_stack) - 1) + page = self._page_from_stack(state.page_stack[cursor]) + else: + page = None + else: + if state.page_stack: + page = self._page_from_stack(state.page_stack[-1]) + + if isinstance(id, int): + if not page: + raise RuntimeError('No current page to resolve link from') + + link_url = page.links.get(id) + if not link_url: + err = Page( + url=f'invalid_link_{id}', + title=f'No link with id {id} on `{page.title}`', + text='', + lines=[], + links={}, + fetched_at=datetime.utcnow(), + ) + available = sorted(page.links.keys()) + available_list = ', '.join(map(str, available)) if available else '(none)' + err.text = '\n'.join( + [ + f'Requested link id: {id}', + f'Current page: {page.title}', + f'Available link ids on this page: {available_list}', + '', + 'Tips:', + '- To scroll this page, call browser_open with { loc, num_lines } (no id).', + '- To open a result from a search results page, pass the correct { cursor, id }.', + ] + ) + err.lines = self._wrap_lines(err.text, 80) + self._save_page(err) + cursor = len(self.get_state().page_stack) - 1 + page_text = self._display_page(err, cursor, 0, -1) + return {'state': self.get_state(), 'pageText': cap_tool_content(page_text)} + + new_page = state.url_to_page.get(link_url) + if not new_page: + fetch_response = self._client.web_fetch(link_url) + normalized: Dict[str, Any] = { + 'results': { + link_url: [ + { + 'title': fetch_response.title or link_url, + 'url': link_url, + 'content': fetch_response.content or '', + } + ] + } + } + new_page = self._build_page_from_fetch(link_url, normalized) + + self._save_page(new_page) + cursor = len(self.get_state().page_stack) - 1 + page_text = self._display_page(new_page, cursor, loc, num_lines) + return {'state': self.get_state(), 'pageText': cap_tool_content(page_text)} + + if not page: + raise RuntimeError('No current page to display') + + cur = self.get_state() + cur.page_stack.append(page.url) + self.state.set_data(cur) + cursor = len(cur.page_stack) - 1 + page_text = self._display_page(page, cursor, loc, num_lines) + return {'state': self.get_state(), 'pageText': cap_tool_content(page_text)} + + def find(self, *, pattern: str, cursor: int = -1) -> Dict[str, Any]: + state = self.get_state() + if cursor == -1: + if not state.page_stack: + raise RuntimeError('No pages to search in') + page = self._page_from_stack(state.page_stack[-1]) + cursor = len(state.page_stack) - 1 + else: + if cursor < 0 or cursor >= len(state.page_stack): + cursor = max(0, min(cursor, len(state.page_stack) - 1)) + page = self._page_from_stack(state.page_stack[cursor]) + + find_page = self._build_find_results_page(pattern, page) + self._save_page(find_page) + new_cursor = len(self.get_state().page_stack) - 1 + + page_text = self._display_page(find_page, new_cursor, 0, -1) + return {'state': self.get_state(), 'pageText': cap_tool_content(page_text)}