Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions plugins/acp/acp_plugin_gamesdk/acp_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def browse_agents(self, cluster: Optional[str] = None, query: Optional[str] = No
offerings = [AcpOffering(name=offering["name"], price=offering["price"]) for offering in agent["offerings"]]
else:
offerings = None

result.append(
AcpAgent(
id=agent["id"],
Expand All @@ -68,7 +68,7 @@ def create_job(self, provider_address: str, price: float, job_description: str)
expire_at = datetime.now() + timedelta(days=1)
tx_result = self.acp_token.create_job(
provider_address=provider_address,
evaluator_address=provider_address,
evaluator_address=self.agent_wallet_address,
expire_at=expire_at
)

Expand Down
168 changes: 129 additions & 39 deletions plugins/acp/acp_plugin_gamesdk/acp_plugin.py
Original file line number Diff line number Diff line change
@@ -1,29 +1,42 @@
from collections.abc import Callable
import signal
import sys
from typing import List, Dict, Any, Optional,Tuple
import json
from dataclasses import dataclass
from datetime import datetime

import socketio
import socketio.client

from game_sdk.game.agent import WorkerConfig
from game_sdk.game.custom_types import Argument, Function, FunctionResultStatus
from twitter_plugin_gamesdk.twitter_plugin import TwitterPlugin
from twitter_plugin_gamesdk.game_twitter_plugin import GameTwitterPlugin
from acp_plugin_gamesdk.acp_client import AcpClient
from acp_plugin_gamesdk.acp_token import AcpToken
from acp_plugin_gamesdk.interface import AcpJobPhasesDesc, IInventory
from acp_plugin_gamesdk.interface import AcpJobPhasesDesc, IDeliverable, IInventory

@dataclass
class AcpPluginOptions:
api_key: str
acp_token_client: AcpToken
twitter_plugin: TwitterPlugin | GameTwitterPlugin = None
cluster: Optional[str] = None
acp_base_url: Optional[str] = None
on_evaluate: Optional[Callable[[IDeliverable], Tuple[bool, str]]] = None

SocketEvents = {
"JOIN_EVALUATOR_ROOM": "joinEvaluatorRoom",
"LEAVE_EVALUATOR_ROOM": "leaveEvaluatorRoom",
"ON_EVALUATE": "onEvaluate",
"ROOM_JOINED" : "roomJoined"
}

class AcpPlugin:
def __init__(self, options: AcpPluginOptions):
print("Initializing AcpPlugin")
self.acp_client = AcpClient(options.api_key, options.acp_token_client, options.acp_base_url)
self.acp_token_client = options.acp_token_client
self.acp_client = AcpClient(options.api_key, options.acp_token_client, options.acp_token_client.acp_base_url)
self.id = "acp_worker"
self.name = "ACP Worker"
self.description = """
Expand All @@ -44,9 +57,70 @@ def __init__(self, options: AcpPluginOptions):
self.cluster = options.cluster
self.twitter_plugin = options.twitter_plugin
self.produced_inventory: List[IInventory] = []
self.acp_base_url = options.acp_base_url if options.acp_base_url else "https://acpx-staging.virtuals.io/api"


self.acp_base_url = self.acp_token_client.acp_base_url if self.acp_token_client.acp_base_url is None else "https://acpx-staging.virtuals.io/api"
if (options.on_evaluate is not None):
print("Initializing socket")
self.on_evaluate = options.on_evaluate
self.socket = None
self.initializeSocket()

def initializeSocket(self) -> Tuple[bool, str]:
"""
Initialize socket connection for real-time communication.
Returns a tuple of (success, message).
"""
try:
print("Initializing socket after")
self.socket = socketio.Client()

# Set up authentication before connecting
self.socket.auth = {
"evaluatorAddress": self.acp_token_client.agent_wallet_address
}

# Connect socket to GAME SDK dev server
self.socket.connect("https://sdk-dev.game.virtuals.io", auth=self.socket.auth)

if (self.socket.connected):
print("Connecting socket")
self.socket.emit(SocketEvents["JOIN_EVALUATOR_ROOM"], self.acp_token_client.agent_wallet_address)
print(f"Joined evaluator room with address: {self.acp_token_client.agent_wallet_address}")


# Set up event handler for evaluation requests
@self.socket.on(SocketEvents["ON_EVALUATE"])
def on_evaluate(data):
if self.on_evaluate:
deliverable = data.get("deliverable")
memo_id = data.get("memoId")

is_approved, reasoning = self.on_evaluate(deliverable)

self.acp_token_client.sign_memo(memo_id, is_approved, reasoning)

# Set up cleanup function for graceful shutdown
def cleanup():
if self.socket:
print("Disconnecting socket")
self.socket.emit("leaveEvaluatorRoom", self.acp_token_client.agent_wallet_address, callback=lambda: print("Successfully left evaluator room"))

import time
time.sleep(1)
self.socket.disconnect()

def signal_handler(sig, frame):
cleanup()
sys.exit(0)

signal.signal(signal.SIGINT, signal_handler)
signal.signal(signal.SIGTERM, signal_handler)

return True, "Socket initialized successfully"

except Exception as e:
return False, f"Failed to initialize socket: {str(e)}"



def add_produce_item(self, item: IInventory) -> None:
self.produced_inventory.append(item)
Expand Down Expand Up @@ -112,7 +186,7 @@ def _search_agents_executable(self,reasoning: str, keyword: str) -> Tuple[Functi
return FunctionResultStatus.FAILED, "No other trading agents found in the system. Please try again later when more agents are available.", {}

return FunctionResultStatus.DONE, json.dumps({
"availableAgents": [{"id": agent.id, "name": agent.name, "description": agent.description, "wallet_address": agent.wallet_address, "offerings": [{"name": offering.name, "price": offering.price} for offering in agent.offerings]} for agent in agents],
"availableAgents": [{"id": agent.id, "name": agent.name, "description": agent.description, "wallet_address": agent.wallet_address, "offerings": [{"name": offering.name, "price": offering.price} for offering in agent.offerings] if agent.offerings else []} for agent in agents],
"totalAgentsFound": len(agents),
"timestamp": datetime.now().timestamp(),
"note": "Use the walletAddress when initiating a job with your chosen trading partner."
Expand Down Expand Up @@ -165,20 +239,24 @@ def initiate_job(self) -> Function:
description="Detailed specifications for service-based items",
)

tweet_content_arg = Argument(
name="tweetContent",
type="string",
description="Tweet content that will be posted about this job. Must include the seller's Twitter handle (with @ symbol) to notify them",
)
args = [seller_wallet_address_arg, price_arg, reasoning_arg, service_requirements_arg]

if self.twitter_plugin is not None:
tweet_content_arg = Argument(
name="tweetContent",
type="string",
description="Tweet content that will be posted about this job. Must include the seller's Twitter handle (with @ symbol) to notify them",
)
args.append(tweet_content_arg)

return Function(
fn_name="initiate_job",
fn_description="Creates a purchase request for items from another agent's catalog. Only for use when YOU are the buyer. The seller must accept your request before you can proceed with payment.",
args=[seller_wallet_address_arg, price_arg, reasoning_arg, service_requirements_arg, tweet_content_arg],
args=args,
executable=self._initiate_job_executable
)

def _initiate_job_executable(self, sellerWalletAddress: str, price: str, reasoning: str, serviceRequirements: str, tweetContent : str) -> Tuple[FunctionResultStatus, str, dict]:
def _initiate_job_executable(self, sellerWalletAddress: str, price: str, reasoning: str, serviceRequirements: str, tweetContent : Optional[str] = None) -> Tuple[FunctionResultStatus, str, dict]:
if not price:
return FunctionResultStatus.FAILED, "Missing price - specify how much you're offering per unit", {}

Expand Down Expand Up @@ -231,21 +309,25 @@ def respond_job(self) -> Function:
type="string",
description="Why you made this decision",
)

tweet_content_arg = Argument(
name="tweetContent",
type="string",
description="Tweet content that will be posted about this job. Must include the seller's Twitter handle (with @ symbol) to notify them",
)

args = [job_id_arg, decision_arg, reasoning_arg]

if self.twitter_plugin is not None:
tweet_content_arg = Argument(
name="tweetContent",
type="string",
description="Tweet content that will be posted about this job. Must include the seller's Twitter handle (with @ symbol) to notify them",
)
args.append(tweet_content_arg)

return Function(
fn_name="respond_to_job",
fn_description="Accepts or rejects an incoming 'request' job",
args=[job_id_arg, decision_arg, reasoning_arg, tweet_content_arg],
args=args,
executable=self._respond_job_executable
)

def _respond_job_executable(self, jobId: int, decision: str, reasoning: str, tweetContent: str) -> Tuple[FunctionResultStatus, str, dict]:
def _respond_job_executable(self, jobId: int, decision: str, reasoning: str, tweetContent: Optional[str] = None) -> Tuple[FunctionResultStatus, str, dict]:
if not jobId:
return FunctionResultStatus.FAILED, "Missing job ID - specify which job you're responding to", {}

Expand Down Expand Up @@ -276,7 +358,7 @@ def _respond_job_executable(self, jobId: int, decision: str, reasoning: str, twe
reasoning
)

if (self.twitter_plugin is not None):
if (self.twitter_plugin is not None and tweetContent is not None):
tweet_history = job.get("tweetHistory", [])
tweet_id = tweet_history[-1].get("tweetId") if tweet_history else None
if (tweet_id is not None):
Expand Down Expand Up @@ -314,20 +396,24 @@ def pay_job(self) -> Function:
description="Why you are making this payment",
)

tweet_content_arg = Argument(
name="tweetContent",
type="string",
description="Tweet content that will be posted about this job. Must include the seller's Twitter handle (with @ symbol) to notify them",
)
args = [job_id_arg, amount_arg, reasoning_arg]

if self.twitter_plugin is not None:
tweet_content_arg = Argument(
name="tweetContent",
type="string",
description="Tweet content that will be posted about this job. Must include the seller's Twitter handle (with @ symbol) to notify them",
)
args.append(tweet_content_arg)

return Function(
fn_name="pay_job",
fn_description="Processes payment for an accepted purchase request",
args=[job_id_arg, amount_arg, reasoning_arg, tweet_content_arg],
args=args,
executable=self._pay_job_executable
)

def _pay_job_executable(self, jobId: int, amount: float, reasoning: str, tweetContent: str) -> Tuple[FunctionResultStatus, str, dict]:
def _pay_job_executable(self, jobId: int, amount: float, reasoning: str, tweetContent: Optional[str] = None) -> Tuple[FunctionResultStatus, str, dict]:
if not jobId:
return FunctionResultStatus.FAILED, "Missing job ID - specify which job you're paying for", {}

Expand Down Expand Up @@ -359,7 +445,7 @@ def _pay_job_executable(self, jobId: int, amount: float, reasoning: str, tweetCo
reasoning
)

if (self.twitter_plugin is not None):
if (self.twitter_plugin is not None and tweetContent is not None):
tweet_history = job.get("tweetHistory", [])
tweet_id = tweet_history[-1].get("tweetId") if tweet_history else None
if (tweet_id is not None):
Expand Down Expand Up @@ -403,20 +489,24 @@ def deliver_job(self) -> Function:
description="Why you are making this delivery",
)

tweet_content_arg = Argument(
name="tweetContent",
type="string",
description="Tweet content that will be posted about this job. Must include the seller's Twitter handle (with @ symbol) to notify them",
)
args = [job_id_arg, deliverable_type_arg, deliverable_arg, reasoning_arg]

if self.twitter_plugin is not None:
tweet_content_arg = Argument(
name="tweetContent",
type="string",
description="Tweet content that will be posted about this job. Must include the seller's Twitter handle (with @ symbol) to notify them",
)
args.append(tweet_content_arg)

return Function(
fn_name="deliver_job",
fn_description="Completes a sale by delivering items to the buyer",
args=[job_id_arg, deliverable_type_arg, deliverable_arg, reasoning_arg, tweet_content_arg],
args=args,
executable=self._deliver_job_executable
)

def _deliver_job_executable(self, jobId: int, deliverableType: str, deliverable: str, reasoning: str, tweetContent: str) -> Tuple[FunctionResultStatus, str, dict]:
def _deliver_job_executable(self, jobId: int, deliverableType: str, deliverable: str, reasoning: str, tweetContent: Optional[str] = None) -> Tuple[FunctionResultStatus, str, dict]:
if not jobId:
return FunctionResultStatus.FAILED, "Missing job ID - specify which job you're delivering for", {}

Expand Down Expand Up @@ -458,7 +548,7 @@ def _deliver_job_executable(self, jobId: int, deliverableType: str, deliverable:
json.dumps(deliverable),
)

if (self.twitter_plugin is not None):
if (self.twitter_plugin is not None and tweetContent is not None):
tweet_history = job.get("tweetHistory", [])
tweet_id = tweet_history[-1].get("tweetId") if tweet_history else None
if (tweet_id is not None):
Expand Down
4 changes: 2 additions & 2 deletions plugins/acp/acp_plugin_gamesdk/interface.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from dataclasses import dataclass
from enum import IntEnum, Enum
from typing import List, Dict, Optional
from typing import List, Dict, Literal, Optional

@dataclass
class AcpOffering:
Expand Down Expand Up @@ -46,7 +46,7 @@ class AcpJob:

@dataclass
class IDeliverable:
type: str
type: Literal["url", "text"]
value: str

@dataclass
Expand Down
9 changes: 7 additions & 2 deletions plugins/acp/examples/test_buyer.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,14 @@
from acp_plugin_gamesdk.acp_token import AcpToken
from twitter_plugin_gamesdk.game_twitter_plugin import GameTwitterPlugin
from twitter_plugin_gamesdk.twitter_plugin import TwitterPlugin

from acp_plugin_gamesdk.interface import IDeliverable
def ask_question(query: str) -> str:
return input(query)

def on_evaluate(deliverable: IDeliverable) -> Tuple[bool, str]:
print(f"Evaluating deliverable: {deliverable}")
return True, "Default evaluation"

# GAME Twitter Plugin options
options = {
"id": "test_game_twitter_plugin",
Expand Down Expand Up @@ -46,7 +50,8 @@ def main():
"https://acpx-staging.virtuals.io/api"
),
acp_base_url="https://acpx-staging.virtuals.io/api",
twitter_plugin=GameTwitterPlugin(options)
twitter_plugin=GameTwitterPlugin(options),
on_evaluate=on_evaluate # will initialize socket connection for real-time communication
)
)
# Native Twitter Plugin
Expand Down
2 changes: 2 additions & 0 deletions plugins/acp/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ requests = "^2.32.3"
pydantic = "^2.10.6"
twitter-plugin-gamesdk = ">=0.2.2"
game-sdk = ">=0.1.5"
python-socketio = "^5.11.1"
websocket-client = "^1.7.0"

[build-system]
requires = ["poetry-core"]
Expand Down