Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ jobs:
runs-on: ubuntu-latest
strategy:
matrix:
python-version: ["3.11", "3.12", "3.13"]
python-version: ["3.12", "3.13"]
fail-fast: false
defaults:
run:
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ venv/
ENV/
env.bak/
venv.bak/
.pdm-python

# Spyder project settings
.spyderproject
Expand Down
5 changes: 0 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,6 @@ export BUB_API_KEY="sk-..."
export BUB_PROVIDER="anthropic"
export BUB_MODEL_NAME="claude-3-sonnet-20240229"
export BUB_API_KEY="your-anthropic-key"

# For local models with Ollama
export BUB_PROVIDER="ollama"
export BUB_MODEL_NAME="llama2"
# No API key needed for local models
```

### 2. Start using Bub
Expand Down
10 changes: 0 additions & 10 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,6 @@ export BUB_API_KEY="sk-..."
export BUB_PROVIDER="anthropic"
export BUB_MODEL_NAME="claude-3-5-sonnet-20241022"
export BUB_API_KEY="your-anthropic-key"

# For local models with Ollama
export BUB_PROVIDER="ollama"
export BUB_MODEL_NAME="llama3"
# No API key needed for local models

# For Groq (fast inference)
export BUB_PROVIDER="groq"
export BUB_MODEL_NAME="llama3-8b-8192"
export BUB_API_KEY="gsk_..."
```

### Usage
Expand Down
10 changes: 6 additions & 4 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
[project]
name = "bub"
version = "0.1.0"
version = "0.1.1"
description = "Bub it. Build it."
authors = [{ name = "Chojan Shang", email = "psiace@apache.org" }]
readme = "README.md"
keywords = ['python']
requires-python = ">=3.11,<4.0"
requires-python = ">=3.12,<4.0"
classifiers = [
"Intended Audience :: Developers",
"Programming Language :: Python",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.11",
"Programming Language :: Python :: 3.12",
"Programming Language :: Python :: 3.13",
"Topic :: Software Development :: Libraries :: Python Modules",
Expand All @@ -20,8 +19,11 @@ dependencies = [
"pydantic>=2.0.0",
"pydantic-settings>=2.0.0",
"typer>=0.9.0",
"any-llm-sdk[openai,anthropic,google,azure,aws]>=0.1.0",
"any-llm-sdk[openai,anthropic,google,azure,aws]>=0.1.1",
"rich>=13.0.0",
"huey>=2.5.0",
"eventure>=0.4.0",
"uuid-extension>=0.2.0",
]

[project.urls]
Expand Down
2 changes: 1 addition & 1 deletion src/bub/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""Bub - Bub it. Build it."""

__version__ = "0.1.0"
__version__ = "0.1.1"

from .agent import Agent, ToolRegistry

Expand Down
3 changes: 3 additions & 0 deletions src/bub/agent/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@

from .context import Context
from .core import Agent, ReActPromptFormatter
from .runtime import ToolActionSubscriber, build_event_runtime
from .tools import Tool, ToolExecutor, ToolRegistry, ToolResult

__all__ = [
"Agent",
"Context",
"ReActPromptFormatter",
"Tool",
"ToolActionSubscriber",
"ToolExecutor",
"ToolRegistry",
"ToolResult",
"build_event_runtime",
]
186 changes: 160 additions & 26 deletions src/bub/agent/core.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,21 @@
"""Core agent implementation for Bub."""
"""Core agent implementation for Bub.

Supports two execution modes:
- Legacy synchronous mode (default) executes tools inline.
- Evented mode (when provided an EventBus) publishes ReACT events and
waits for observations, powered by Eventure + Huey runtime.
"""

import json
import re
import threading
from pathlib import Path
from typing import Callable, Optional

from any_llm import completion # type: ignore[import-untyped]
from eventure import EventBus, EventLog
from openai.types.chat import ChatCompletion, ChatCompletionMessageParam
from uuid_extension import uuid7

from .context import Context
from .tools import ToolExecutor, ToolRegistry
Expand Down Expand Up @@ -57,13 +68,17 @@ def __init__(
max_tokens: Optional[int] = None,
workspace_path: Optional[Path] = None,
system_prompt: Optional[str] = None,
bus: Optional[EventBus] = None,
event_log: Optional[EventLog] = None,
context_window_tokens: int = 32000,
):
self.provider = provider
self.model_name = model_name
self.api_key = api_key
self.api_base = api_base
self.max_tokens = max_tokens
self.conversation_history: list[ChatCompletionMessageParam] = []
self.context_window_tokens: int = context_window_tokens

# Initialize context and tool registry
self.context: Context = Context(workspace_path=workspace_path)
Expand All @@ -85,6 +100,11 @@ def __init__(
config_prompt = self.context.get_system_prompt()
self.system_prompt = self.prompt_formatter.format_prompt(config_prompt)

# Evented runtime (required)
self.bus: EventBus = bus or EventBus(EventLog())
self.event_log: EventLog = event_log or self.bus.event_log
self._evented: bool = True

@property
def model(self) -> str:
"""Get the full model string in provider/model format."""
Expand All @@ -93,19 +113,118 @@ def model(self) -> str:
def reset_conversation(self) -> None:
"""Reset the conversation history."""
self.conversation_history = []
# Notify observers/renderer
self._publish("agent.reset", {"reason": "user_requested"})

def _publish(self, event_type: str, data: dict) -> None:
self.bus.publish(event_type, data)

def _wait_for_observation(self, action_id: str, timeout_seconds: float = 30.0) -> dict:
done = threading.Event()
payload: dict = {}

def on_observation(event, _aid=action_id, _payload=payload, _done=done) -> None:
data = event.data
if data.get("id") == _aid:
_payload.update(data)
_done.set()

unsubscribe = self.bus.subscribe("agent.observation", on_observation)
try:
done.wait(timeout=timeout_seconds)
finally:
unsubscribe()
return payload

def _process_tool_calls(self, tool_calls: list[dict], on_step: Optional[Callable[[str, str], None]]) -> None:
for tool_call in tool_calls:
tool_name = tool_call.get("tool")
parameters = tool_call.get("parameters", {})
if not tool_name:
continue
action_id = str(uuid7())
# Subscribe BEFORE publishing action to avoid race (immediate execution)
done = threading.Event()
payload: dict = {}

def on_observation(event, _aid=action_id, _payload=payload, _done=done) -> None:
data = event.data
if data.get("id") == _aid:
_payload.update(data)
_done.set()

unsubscribe = self.bus.subscribe("agent.observation", on_observation)
try:
self._publish("agent.action", {"id": action_id, "tool": tool_name, "input": parameters})
# Extend wait based on tool-specific timeout if provided
wait_seconds = float(parameters.get("timeout", 30)) + 5.0
done.wait(timeout=wait_seconds)
finally:
unsubscribe()
obs_payload = payload
if not obs_payload:
obs_text = f'Observation: {{"ok": false, "error": "Tool {tool_name} timed out"}}'
self.conversation_history.append({"role": "user", "content": obs_text})
if on_step:
on_step("observation", obs_text)
continue
observation = json.dumps(obs_payload.get("result", {}), ensure_ascii=False)
obs_text = f"Observation: {observation}"
self.conversation_history.append({"role": "user", "content": obs_text})
if on_step:
on_step("observation", obs_text)
# Advance tick after processing observations
self.event_log.advance_tick()

def _strip_final_answer(self, text: str) -> str:
"""Remove any 'Final Answer:' section from a ReACT message (used when tools will run)."""
idx = text.find("Final Answer:")
return text if idx == -1 else text[:idx].rstrip()

def _extract_final_answer(self, text: str) -> Optional[str]:
m = re.search(r"Final Answer:\s*(.+)\s*$", text, re.DOTALL | re.IGNORECASE)
return m.group(1).strip() if m else None

def _estimate_tokens_for_text(self, text: str) -> int:
# Approximate: ~4 chars per token
return max(1, len(text) // 4)

def _estimate_tokens_for_messages(self, messages: list[ChatCompletionMessageParam]) -> int:
total = 0
for m in messages:
content = str(m.get("content", ""))
total += self._estimate_tokens_for_text(content) + 4 # small overhead per message
return total

def _build_messages(self) -> list[ChatCompletionMessageParam]:
system_msgs: list[ChatCompletionMessageParam] = [
{"role": "system", "content": self.system_prompt},
{"role": "system", "content": self.context.build_context_message()},
]
history = list(self.conversation_history)
# Reserve room for model output; default to 1000 if unspecified
output_budget = self.max_tokens or 1000
# Keep a safety margin for tool/use overhead
budget = max(1000, self.context_window_tokens - output_budget - 500)
while True:
msgs = system_msgs + history
if self._estimate_tokens_for_messages(msgs) <= budget:
return msgs
if not history:
return msgs
# Trim from the oldest entries (drop oldest user+assistant pair when possible)
history = history[2:] if len(history) >= 2 else history[1:]

def chat(self, message: str, on_step: Optional[Callable[[str, str], None]] = None) -> str:
"""Chat with the agent. If on_step is provided, call it with each intermediate message/observation."""
self.conversation_history.append({"role": "user", "content": message})

while True:
context_msg = self.context.build_context_message()
# Evented: advance tick and publish user input
self.event_log.advance_tick()
self._publish("user.input", {"message": message})

messages: list[ChatCompletionMessageParam] = [
{"role": "system", "content": self.system_prompt},
{"role": "system", "content": context_msg},
]
messages.extend(self.conversation_history)
while True:
messages: list[ChatCompletionMessageParam] = self._build_messages()

try:
response: ChatCompletion = completion(
Expand All @@ -116,30 +235,45 @@ def chat(self, message: str, on_step: Optional[Callable[[str, str], None]] = Non
api_base=self.api_base,
)
assistant_message = str(response.choices[0].message.content)
self.conversation_history.append({"role": "assistant", "content": assistant_message})
if on_step:
on_step("assistant", assistant_message)

tool_calls = self.tool_executor.extract_tool_calls(assistant_message)
if not tool_calls:
# Publish explicit final event for renderer clarity
final = self._extract_final_answer(assistant_message) or assistant_message
self.conversation_history.append({"role": "assistant", "content": assistant_message})
self._publish("agent.final", {"content": final})
if on_step:
on_step("assistant", final)
return final

if tool_calls:
for tool_call in tool_calls:
tool_name = tool_call.get("tool")
parameters = tool_call.get("parameters", {})
if not tool_name:
continue
result = self.tool_executor.execute_tool(tool_name, **parameters)
observation = f"Observation: {result.format_result()}"
self.conversation_history.append({"role": "user", "content": observation})
if on_step:
on_step("observation", observation)
continue
else:
return assistant_message

# Evented tool execution
# Do not surface premature Final Answer lines prior to tool execution
cleaned = self._strip_final_answer(assistant_message)
self.conversation_history.append({"role": "assistant", "content": cleaned})
self._publish("agent.thought", {"content": cleaned})
if on_step:
on_step("assistant", cleaned)
self._process_tool_calls(tool_calls, on_step)
# Immediately get the final answer after tools complete
messages = self._build_messages()
response2: ChatCompletion = completion(
model=self.model,
messages=messages,
max_tokens=self.max_tokens,
api_key=self.api_key,
api_base=self.api_base,
)
final_message = str(response2.choices[0].message.content)
self.conversation_history.append({"role": "assistant", "content": final_message})
final = self._extract_final_answer(final_message) or final_message
self._publish("agent.final", {"content": final})
if on_step:
on_step("assistant", final)
except Exception as e:
error_message = f"Error communicating with AI: {e!s}"
self.conversation_history.append({"role": "assistant", "content": error_message})
if on_step:
on_step("error", error_message)
return error_message
else:
return assistant_message
Loading
Loading