diff --git a/plugins/acp/acp_plugin_gamesdk/acp_client.py b/plugins/acp/acp_plugin_gamesdk/acp_client.py index c64e5fbb..f1d86bd2 100644 --- a/plugins/acp/acp_plugin_gamesdk/acp_client.py +++ b/plugins/acp/acp_plugin_gamesdk/acp_client.py @@ -51,7 +51,7 @@ def browse_agents(self, cluster: Optional[str] = None, query: Optional[str] = No offerings = [AcpOffering(name=offering["name"], price=offering["price"]) for offering in agent["offerings"]] else: offerings = None - + result.append( AcpAgent( id=agent["id"], @@ -68,7 +68,7 @@ def create_job(self, provider_address: str, price: float, job_description: str) expire_at = datetime.now() + timedelta(days=1) tx_result = self.acp_token.create_job( provider_address=provider_address, - evaluator_address=provider_address, + evaluator_address=self.agent_wallet_address, expire_at=expire_at ) diff --git a/plugins/acp/acp_plugin_gamesdk/acp_plugin.py b/plugins/acp/acp_plugin_gamesdk/acp_plugin.py index 6d9342fc..34bc4e1b 100644 --- a/plugins/acp/acp_plugin_gamesdk/acp_plugin.py +++ b/plugins/acp/acp_plugin_gamesdk/acp_plugin.py @@ -1,15 +1,21 @@ +from collections.abc import Callable +import signal +import sys from typing import List, Dict, Any, Optional,Tuple import json from dataclasses import dataclass from datetime import datetime +import socketio +import socketio.client + from game_sdk.game.agent import WorkerConfig from game_sdk.game.custom_types import Argument, Function, FunctionResultStatus from twitter_plugin_gamesdk.twitter_plugin import TwitterPlugin from twitter_plugin_gamesdk.game_twitter_plugin import GameTwitterPlugin from acp_plugin_gamesdk.acp_client import AcpClient from acp_plugin_gamesdk.acp_token import AcpToken -from acp_plugin_gamesdk.interface import AcpJobPhasesDesc, IInventory +from acp_plugin_gamesdk.interface import AcpJobPhasesDesc, IDeliverable, IInventory @dataclass class AcpPluginOptions: @@ -17,13 +23,20 @@ class AcpPluginOptions: acp_token_client: AcpToken twitter_plugin: TwitterPlugin | GameTwitterPlugin = None cluster: Optional[str] = None - acp_base_url: Optional[str] = None + on_evaluate: Optional[Callable[[IDeliverable], Tuple[bool, str]]] = None +SocketEvents = { + "JOIN_EVALUATOR_ROOM": "joinEvaluatorRoom", + "LEAVE_EVALUATOR_ROOM": "leaveEvaluatorRoom", + "ON_EVALUATE": "onEvaluate", + "ROOM_JOINED" : "roomJoined" +} class AcpPlugin: def __init__(self, options: AcpPluginOptions): print("Initializing AcpPlugin") - self.acp_client = AcpClient(options.api_key, options.acp_token_client, options.acp_base_url) + self.acp_token_client = options.acp_token_client + self.acp_client = AcpClient(options.api_key, options.acp_token_client, options.acp_token_client.acp_base_url) self.id = "acp_worker" self.name = "ACP Worker" self.description = """ @@ -44,9 +57,70 @@ def __init__(self, options: AcpPluginOptions): self.cluster = options.cluster self.twitter_plugin = options.twitter_plugin self.produced_inventory: List[IInventory] = [] - self.acp_base_url = options.acp_base_url if options.acp_base_url else "https://acpx-staging.virtuals.io/api" - - + self.acp_base_url = self.acp_token_client.acp_base_url if self.acp_token_client.acp_base_url is None else "https://acpx-staging.virtuals.io/api" + if (options.on_evaluate is not None): + print("Initializing socket") + self.on_evaluate = options.on_evaluate + self.socket = None + self.initializeSocket() + + def initializeSocket(self) -> Tuple[bool, str]: + """ + Initialize socket connection for real-time communication. + Returns a tuple of (success, message). + """ + try: + print("Initializing socket after") + self.socket = socketio.Client() + + # Set up authentication before connecting + self.socket.auth = { + "evaluatorAddress": self.acp_token_client.agent_wallet_address + } + + # Connect socket to GAME SDK dev server + self.socket.connect("https://sdk-dev.game.virtuals.io", auth=self.socket.auth) + + if (self.socket.connected): + print("Connecting socket") + self.socket.emit(SocketEvents["JOIN_EVALUATOR_ROOM"], self.acp_token_client.agent_wallet_address) + print(f"Joined evaluator room with address: {self.acp_token_client.agent_wallet_address}") + + + # Set up event handler for evaluation requests + @self.socket.on(SocketEvents["ON_EVALUATE"]) + def on_evaluate(data): + if self.on_evaluate: + deliverable = data.get("deliverable") + memo_id = data.get("memoId") + + is_approved, reasoning = self.on_evaluate(deliverable) + + self.acp_token_client.sign_memo(memo_id, is_approved, reasoning) + + # Set up cleanup function for graceful shutdown + def cleanup(): + if self.socket: + print("Disconnecting socket") + self.socket.emit("leaveEvaluatorRoom", self.acp_token_client.agent_wallet_address, callback=lambda: print("Successfully left evaluator room")) + + import time + time.sleep(1) + self.socket.disconnect() + + def signal_handler(sig, frame): + cleanup() + sys.exit(0) + + signal.signal(signal.SIGINT, signal_handler) + signal.signal(signal.SIGTERM, signal_handler) + + return True, "Socket initialized successfully" + + except Exception as e: + return False, f"Failed to initialize socket: {str(e)}" + + def add_produce_item(self, item: IInventory) -> None: self.produced_inventory.append(item) @@ -112,7 +186,7 @@ def _search_agents_executable(self,reasoning: str, keyword: str) -> Tuple[Functi return FunctionResultStatus.FAILED, "No other trading agents found in the system. Please try again later when more agents are available.", {} return FunctionResultStatus.DONE, json.dumps({ - "availableAgents": [{"id": agent.id, "name": agent.name, "description": agent.description, "wallet_address": agent.wallet_address, "offerings": [{"name": offering.name, "price": offering.price} for offering in agent.offerings]} for agent in agents], + "availableAgents": [{"id": agent.id, "name": agent.name, "description": agent.description, "wallet_address": agent.wallet_address, "offerings": [{"name": offering.name, "price": offering.price} for offering in agent.offerings] if agent.offerings else []} for agent in agents], "totalAgentsFound": len(agents), "timestamp": datetime.now().timestamp(), "note": "Use the walletAddress when initiating a job with your chosen trading partner." @@ -165,20 +239,24 @@ def initiate_job(self) -> Function: description="Detailed specifications for service-based items", ) - tweet_content_arg = Argument( - name="tweetContent", - type="string", - description="Tweet content that will be posted about this job. Must include the seller's Twitter handle (with @ symbol) to notify them", - ) + args = [seller_wallet_address_arg, price_arg, reasoning_arg, service_requirements_arg] + + if self.twitter_plugin is not None: + tweet_content_arg = Argument( + name="tweetContent", + type="string", + description="Tweet content that will be posted about this job. Must include the seller's Twitter handle (with @ symbol) to notify them", + ) + args.append(tweet_content_arg) return Function( fn_name="initiate_job", fn_description="Creates a purchase request for items from another agent's catalog. Only for use when YOU are the buyer. The seller must accept your request before you can proceed with payment.", - args=[seller_wallet_address_arg, price_arg, reasoning_arg, service_requirements_arg, tweet_content_arg], + args=args, executable=self._initiate_job_executable ) - def _initiate_job_executable(self, sellerWalletAddress: str, price: str, reasoning: str, serviceRequirements: str, tweetContent : str) -> Tuple[FunctionResultStatus, str, dict]: + def _initiate_job_executable(self, sellerWalletAddress: str, price: str, reasoning: str, serviceRequirements: str, tweetContent : Optional[str] = None) -> Tuple[FunctionResultStatus, str, dict]: if not price: return FunctionResultStatus.FAILED, "Missing price - specify how much you're offering per unit", {} @@ -231,21 +309,25 @@ def respond_job(self) -> Function: type="string", description="Why you made this decision", ) - - tweet_content_arg = Argument( - name="tweetContent", - type="string", - description="Tweet content that will be posted about this job. Must include the seller's Twitter handle (with @ symbol) to notify them", - ) + + args = [job_id_arg, decision_arg, reasoning_arg] + + if self.twitter_plugin is not None: + tweet_content_arg = Argument( + name="tweetContent", + type="string", + description="Tweet content that will be posted about this job. Must include the seller's Twitter handle (with @ symbol) to notify them", + ) + args.append(tweet_content_arg) return Function( fn_name="respond_to_job", fn_description="Accepts or rejects an incoming 'request' job", - args=[job_id_arg, decision_arg, reasoning_arg, tweet_content_arg], + args=args, executable=self._respond_job_executable ) - def _respond_job_executable(self, jobId: int, decision: str, reasoning: str, tweetContent: str) -> Tuple[FunctionResultStatus, str, dict]: + def _respond_job_executable(self, jobId: int, decision: str, reasoning: str, tweetContent: Optional[str] = None) -> Tuple[FunctionResultStatus, str, dict]: if not jobId: return FunctionResultStatus.FAILED, "Missing job ID - specify which job you're responding to", {} @@ -276,7 +358,7 @@ def _respond_job_executable(self, jobId: int, decision: str, reasoning: str, twe reasoning ) - if (self.twitter_plugin is not None): + if (self.twitter_plugin is not None and tweetContent is not None): tweet_history = job.get("tweetHistory", []) tweet_id = tweet_history[-1].get("tweetId") if tweet_history else None if (tweet_id is not None): @@ -314,20 +396,24 @@ def pay_job(self) -> Function: description="Why you are making this payment", ) - tweet_content_arg = Argument( - name="tweetContent", - type="string", - description="Tweet content that will be posted about this job. Must include the seller's Twitter handle (with @ symbol) to notify them", - ) + args = [job_id_arg, amount_arg, reasoning_arg] + + if self.twitter_plugin is not None: + tweet_content_arg = Argument( + name="tweetContent", + type="string", + description="Tweet content that will be posted about this job. Must include the seller's Twitter handle (with @ symbol) to notify them", + ) + args.append(tweet_content_arg) return Function( fn_name="pay_job", fn_description="Processes payment for an accepted purchase request", - args=[job_id_arg, amount_arg, reasoning_arg, tweet_content_arg], + args=args, executable=self._pay_job_executable ) - def _pay_job_executable(self, jobId: int, amount: float, reasoning: str, tweetContent: str) -> Tuple[FunctionResultStatus, str, dict]: + def _pay_job_executable(self, jobId: int, amount: float, reasoning: str, tweetContent: Optional[str] = None) -> Tuple[FunctionResultStatus, str, dict]: if not jobId: return FunctionResultStatus.FAILED, "Missing job ID - specify which job you're paying for", {} @@ -359,7 +445,7 @@ def _pay_job_executable(self, jobId: int, amount: float, reasoning: str, tweetCo reasoning ) - if (self.twitter_plugin is not None): + if (self.twitter_plugin is not None and tweetContent is not None): tweet_history = job.get("tweetHistory", []) tweet_id = tweet_history[-1].get("tweetId") if tweet_history else None if (tweet_id is not None): @@ -403,20 +489,24 @@ def deliver_job(self) -> Function: description="Why you are making this delivery", ) - tweet_content_arg = Argument( - name="tweetContent", - type="string", - description="Tweet content that will be posted about this job. Must include the seller's Twitter handle (with @ symbol) to notify them", - ) + args = [job_id_arg, deliverable_type_arg, deliverable_arg, reasoning_arg] + + if self.twitter_plugin is not None: + tweet_content_arg = Argument( + name="tweetContent", + type="string", + description="Tweet content that will be posted about this job. Must include the seller's Twitter handle (with @ symbol) to notify them", + ) + args.append(tweet_content_arg) return Function( fn_name="deliver_job", fn_description="Completes a sale by delivering items to the buyer", - args=[job_id_arg, deliverable_type_arg, deliverable_arg, reasoning_arg, tweet_content_arg], + args=args, executable=self._deliver_job_executable ) - def _deliver_job_executable(self, jobId: int, deliverableType: str, deliverable: str, reasoning: str, tweetContent: str) -> Tuple[FunctionResultStatus, str, dict]: + def _deliver_job_executable(self, jobId: int, deliverableType: str, deliverable: str, reasoning: str, tweetContent: Optional[str] = None) -> Tuple[FunctionResultStatus, str, dict]: if not jobId: return FunctionResultStatus.FAILED, "Missing job ID - specify which job you're delivering for", {} @@ -458,7 +548,7 @@ def _deliver_job_executable(self, jobId: int, deliverableType: str, deliverable: json.dumps(deliverable), ) - if (self.twitter_plugin is not None): + if (self.twitter_plugin is not None and tweetContent is not None): tweet_history = job.get("tweetHistory", []) tweet_id = tweet_history[-1].get("tweetId") if tweet_history else None if (tweet_id is not None): diff --git a/plugins/acp/acp_plugin_gamesdk/interface.py b/plugins/acp/acp_plugin_gamesdk/interface.py index 806d8d02..d6f42fcd 100644 --- a/plugins/acp/acp_plugin_gamesdk/interface.py +++ b/plugins/acp/acp_plugin_gamesdk/interface.py @@ -1,6 +1,6 @@ from dataclasses import dataclass from enum import IntEnum, Enum -from typing import List, Dict, Optional +from typing import List, Dict, Literal, Optional @dataclass class AcpOffering: @@ -46,7 +46,7 @@ class AcpJob: @dataclass class IDeliverable: - type: str + type: Literal["url", "text"] value: str @dataclass diff --git a/plugins/acp/examples/test_buyer.py b/plugins/acp/examples/test_buyer.py index 65b2476f..1883f248 100644 --- a/plugins/acp/examples/test_buyer.py +++ b/plugins/acp/examples/test_buyer.py @@ -6,10 +6,14 @@ from acp_plugin_gamesdk.acp_token import AcpToken from twitter_plugin_gamesdk.game_twitter_plugin import GameTwitterPlugin from twitter_plugin_gamesdk.twitter_plugin import TwitterPlugin - +from acp_plugin_gamesdk.interface import IDeliverable def ask_question(query: str) -> str: return input(query) +def on_evaluate(deliverable: IDeliverable) -> Tuple[bool, str]: + print(f"Evaluating deliverable: {deliverable}") + return True, "Default evaluation" + # GAME Twitter Plugin options options = { "id": "test_game_twitter_plugin", @@ -46,7 +50,8 @@ def main(): "https://acpx-staging.virtuals.io/api" ), acp_base_url="https://acpx-staging.virtuals.io/api", - twitter_plugin=GameTwitterPlugin(options) + twitter_plugin=GameTwitterPlugin(options), + on_evaluate=on_evaluate # will initialize socket connection for real-time communication ) ) # Native Twitter Plugin diff --git a/plugins/acp/pyproject.toml b/plugins/acp/pyproject.toml index 925bf26a..5d7a804a 100644 --- a/plugins/acp/pyproject.toml +++ b/plugins/acp/pyproject.toml @@ -17,6 +17,8 @@ requests = "^2.32.3" pydantic = "^2.10.6" twitter-plugin-gamesdk = ">=0.2.2" game-sdk = ">=0.1.5" +python-socketio = "^5.11.1" +websocket-client = "^1.7.0" [build-system] requires = ["poetry-core"]