From fe54ad21525b84e3ff0fe57f897d795d36394a01 Mon Sep 17 00:00:00 2001 From: Celeste Ang Date: Mon, 7 Jul 2025 02:42:48 +0800 Subject: [PATCH 1/4] refactor: agent state optimisations --- plugins/acp/acp_plugin_gamesdk/acp_plugin.py | 125 +++++++++++-------- 1 file changed, 73 insertions(+), 52 deletions(-) diff --git a/plugins/acp/acp_plugin_gamesdk/acp_plugin.py b/plugins/acp/acp_plugin_gamesdk/acp_plugin.py index 954e7d23..df46dee6 100644 --- a/plugins/acp/acp_plugin_gamesdk/acp_plugin.py +++ b/plugins/acp/acp_plugin_gamesdk/acp_plugin.py @@ -23,6 +23,9 @@ class AcpPluginOptions: evaluator_cluster: Optional[str] = None graduated: Optional[bool] = True job_expiry_duration_mins: Optional[int] = None + keep_completed_jobs: Optional[int] = 1 + keep_cancelled_jobs: Optional[int] = 0 + keep_produced_inventory: Optional[int] = 1 class AcpPlugin: def __init__(self, options: AcpPluginOptions): @@ -55,71 +58,89 @@ def __init__(self, options: AcpPluginOptions): self.produced_inventory: List[IInventory] = [] self.acp_base_url = self.acp_client.acp_api_url self.job_expiry_duration_mins = options.job_expiry_duration_mins if options.job_expiry_duration_mins is not None else 1440 + self.keep_completed_jobs = options.keep_completed_jobs or 1 + self.keep_cancelled_jobs = options.keep_cancelled_jobs or 0 + self.keep_produced_inventory = options.keep_produced_inventory or 1 + def add_produce_item(self, item: IInventory) -> None: self.produced_inventory.append(item) - - def memo_to_dict(self, m): - return { - "id": m.id, - "type": m.type.name, - "content": m.content, - "next_phase": m.next_phase.name - } - - def _to_state_acp_job(self, job: ACPJob) -> Dict: - memos = [] - for memo in job.memos: - memos.append(self.memo_to_dict(memo)) - - - return { - "jobId": job.id, - "clientName": job.client_agent.name if job.client_agent else "", - "providerName": job.provider_agent.name if job.provider_agent else "", - "desc": job.service_requirement or "", - "price": str(job.price), - "providerAddress": job.provider_address, - "phase": ACP_JOB_PHASE_MAP.get(job.phase), - "memo": list(reversed(memos)), - "tweetHistory": [ - { - "type": tweet.get("type"), - "tweet_id": tweet.get("tweetId"), - "content": tweet.get("content"), - "created_at": tweet.get("createdAt") - } - for tweet in reversed(job.context.get('tweets', []) if job.context else []) - ], - } def get_acp_state(self) -> Dict: + keep_completed_jobs = self.keep_completed_jobs + keep_cancelled_jobs = self.keep_cancelled_jobs + keep_produced_inventory = self.keep_produced_inventory + agent_addr = self.acp_client.agent_address.lower() + + def serialize_job(job: ACPJob, active: bool) -> Dict: + return { + "jobId": job.id, + "clientName": job.client_agent.name if job.client_agent else "", + "providerName": job.provider_agent.name if job.provider_agent else "", + "desc": job.service_requirement or "", + "price": str(job.price), + "providerAddress": job.provider_address, + "phase": ACP_JOB_PHASE_MAP.get(job.phase), + # Include memos only if active + "memo": [ + { + "id": m.id, + "type": m.type.value, + "content": m.content, + "next_phase": m.next_phase.value, + } + for m in reversed(job.memos) + ] if active and job.memos else [], + # Include tweetHistory only if active + "tweetHistory": [ + { + "type": t.get("type"), + "tweet_id": t.get("tweetId"), + "content": t.get("content"), + "created_at": t.get("createdAt"), + } + for t in reversed(job.context.get("tweets", [])) + ] if active and job.context else [], + } + + # Fetch job states active_jobs = self.acp_client.get_active_jobs() completed_jobs = self.acp_client.get_completed_jobs() cancelled_jobs = self.acp_client.get_cancelled_jobs() - agent_addr = self.acp_client.agent_address.lower() + # Partition active jobs + active_buyer_jobs = [ + serialize_job(job, active=True) + for job in active_jobs + if job.client_address.lower() == agent_addr + ] - active_buyer_jobs = [] - active_seller_jobs = [] - - for job in active_jobs: - processed = self._to_state_acp_job(job) - client_addr = job.client_address.lower() - provider_addr = job.provider_address.lower() - - if client_addr == agent_addr: - active_buyer_jobs.append(processed) - if provider_addr == agent_addr: - active_seller_jobs.append(processed) + active_seller_jobs = [ + serialize_job(job, active=True) + for job in active_jobs + if job.provider_address.lower() == agent_addr + ] + + # Limit completed and cancelled jobs + completed = [ + serialize_job(job, active=False) + for job in completed_jobs[:keep_completed_jobs] + ] + + cancelled = [ + serialize_job(job, active=False) + for job in cancelled_jobs[:keep_cancelled_jobs] + ] + + # Limit inventories + produced = [ + item.model_dump() for item in self.produced_inventory[:keep_produced_inventory] + ] if self.produced_inventory else [] - completed = [self._to_state_acp_job(job) for job in completed_jobs] - cancelled = [self._to_state_acp_job(job) for job in cancelled_jobs] - return { "inventory": { "acquired": [], - "produced": [item.model_dump() for item in self.produced_inventory] if self.produced_inventory else [], + "produced": produced, }, "jobs": { "active": { @@ -128,7 +149,7 @@ def get_acp_state(self) -> Dict: }, "completed": completed, "cancelled": cancelled, - } + }, } def get_worker(self, data: Optional[Dict] = None) -> WorkerConfig: From 48c58b01fd7df5c514c3cd242ea9f272eccaf02e Mon Sep 17 00:00:00 2001 From: Celeste Ang Date: Mon, 7 Jul 2025 10:34:56 +0800 Subject: [PATCH 2/4] refactor: call job apis only if needed for agent state --- plugins/acp/acp_plugin_gamesdk/acp_plugin.py | 29 ++++++++++++-------- 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/plugins/acp/acp_plugin_gamesdk/acp_plugin.py b/plugins/acp/acp_plugin_gamesdk/acp_plugin.py index df46dee6..4ce5577e 100644 --- a/plugins/acp/acp_plugin_gamesdk/acp_plugin.py +++ b/plugins/acp/acp_plugin_gamesdk/acp_plugin.py @@ -23,9 +23,9 @@ class AcpPluginOptions: evaluator_cluster: Optional[str] = None graduated: Optional[bool] = True job_expiry_duration_mins: Optional[int] = None - keep_completed_jobs: Optional[int] = 1 - keep_cancelled_jobs: Optional[int] = 0 - keep_produced_inventory: Optional[int] = 1 + keep_completed_jobs: Optional[int] = None + keep_cancelled_jobs: Optional[int] = None + keep_produced_inventory: Optional[int] = None class AcpPlugin: def __init__(self, options: AcpPluginOptions): @@ -58,9 +58,9 @@ def __init__(self, options: AcpPluginOptions): self.produced_inventory: List[IInventory] = [] self.acp_base_url = self.acp_client.acp_api_url self.job_expiry_duration_mins = options.job_expiry_duration_mins if options.job_expiry_duration_mins is not None else 1440 - self.keep_completed_jobs = options.keep_completed_jobs or 1 - self.keep_cancelled_jobs = options.keep_cancelled_jobs or 0 - self.keep_produced_inventory = options.keep_produced_inventory or 1 + self.keep_completed_jobs = options.keep_completed_jobs if options.keep_completed_jobs is not None else 1 + self.keep_cancelled_jobs = options.keep_cancelled_jobs if options.keep_cancelled_jobs is not None else 0 + self.keep_produced_inventory = options.keep_produced_inventory if options.keep_produced_inventory is not None else 1 def add_produce_item(self, item: IInventory) -> None: @@ -105,10 +105,15 @@ def serialize_job(job: ACPJob, active: bool) -> Dict: # Fetch job states active_jobs = self.acp_client.get_active_jobs() - completed_jobs = self.acp_client.get_completed_jobs() - cancelled_jobs = self.acp_client.get_cancelled_jobs() - # Partition active jobs + completed_jobs = [] + if self.keep_completed_jobs and self.keep_completed_jobs > 0: + completed_jobs = self.acp_client.get_completed_jobs() + + cancelled_jobs = [] + if self.keep_cancelled_jobs and self.keep_cancelled_jobs > 0: + cancelled_jobs = self.acp_client.get_cancelled_jobs() + active_buyer_jobs = [ serialize_job(job, active=True) for job in active_jobs @@ -125,17 +130,17 @@ def serialize_job(job: ACPJob, active: bool) -> Dict: completed = [ serialize_job(job, active=False) for job in completed_jobs[:keep_completed_jobs] - ] + ] if self.keep_completed_jobs else [] cancelled = [ serialize_job(job, active=False) for job in cancelled_jobs[:keep_cancelled_jobs] - ] + ] if self.keep_cancelled_jobs else [] # Limit inventories produced = [ item.model_dump() for item in self.produced_inventory[:keep_produced_inventory] - ] if self.produced_inventory else [] + ] if self.produced_inventory and self.keep_produced_inventory > 0 else [] return { "inventory": { From c0d223d093e495ade4dfa434851a7781d9d5eac1 Mon Sep 17 00:00:00 2001 From: Celeste Ang Date: Mon, 7 Jul 2025 10:49:22 +0800 Subject: [PATCH 3/4] refactor: handle no limit scenario for agent state --- plugins/acp/acp_plugin_gamesdk/acp_plugin.py | 42 ++++++++++++++------ 1 file changed, 30 insertions(+), 12 deletions(-) diff --git a/plugins/acp/acp_plugin_gamesdk/acp_plugin.py b/plugins/acp/acp_plugin_gamesdk/acp_plugin.py index 4ce5577e..07d75e72 100644 --- a/plugins/acp/acp_plugin_gamesdk/acp_plugin.py +++ b/plugins/acp/acp_plugin_gamesdk/acp_plugin.py @@ -106,12 +106,16 @@ def serialize_job(job: ACPJob, active: bool) -> Dict: # Fetch job states active_jobs = self.acp_client.get_active_jobs() - completed_jobs = [] - if self.keep_completed_jobs and self.keep_completed_jobs > 0: + # Fetch completed jobs if not explicitly disabled + if self.keep_completed_jobs == 0: + completed_jobs = [] + else: completed_jobs = self.acp_client.get_completed_jobs() - cancelled_jobs = [] - if self.keep_cancelled_jobs and self.keep_cancelled_jobs > 0: + # Fetch cancelled jobs if not explicitly disabled + if self.keep_cancelled_jobs == 0: + cancelled_jobs = [] + else: cancelled_jobs = self.acp_client.get_cancelled_jobs() active_buyer_jobs = [ @@ -129,18 +133,32 @@ def serialize_job(job: ACPJob, active: bool) -> Dict: # Limit completed and cancelled jobs completed = [ serialize_job(job, active=False) - for job in completed_jobs[:keep_completed_jobs] - ] if self.keep_completed_jobs else [] + for job in ( + completed_jobs[:self.keep_completed_jobs] + if self.keep_completed_jobs is not None + else completed_jobs + ) + ] cancelled = [ serialize_job(job, active=False) - for job in cancelled_jobs[:keep_cancelled_jobs] - ] if self.keep_cancelled_jobs else [] + for job in ( + cancelled_jobs[:self.keep_cancelled_jobs] + if self.keep_cancelled_jobs is not None + else cancelled_jobs + ) + ] - # Limit inventories - produced = [ - item.model_dump() for item in self.produced_inventory[:keep_produced_inventory] - ] if self.produced_inventory and self.keep_produced_inventory > 0 else [] + # Produced inventory logic + produced = [] + if self.produced_inventory and self.keep_produced_inventory > 0: + produced = [ + item.model_dump() for item in ( + self.produced_inventory[:self.keep_produced_inventory] + if self.keep_produced_inventory is not None + else self.produced_inventory + ) + ] return { "inventory": { From 279ee4ef463345bb72aaf9d5d69728ed47c4e61b Mon Sep 17 00:00:00 2001 From: Celeste Ang Date: Mon, 7 Jul 2025 10:53:36 +0800 Subject: [PATCH 4/4] refactor: remove unused vars for agent state --- plugins/acp/acp_plugin_gamesdk/acp_plugin.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/plugins/acp/acp_plugin_gamesdk/acp_plugin.py b/plugins/acp/acp_plugin_gamesdk/acp_plugin.py index 07d75e72..90b6fa61 100644 --- a/plugins/acp/acp_plugin_gamesdk/acp_plugin.py +++ b/plugins/acp/acp_plugin_gamesdk/acp_plugin.py @@ -67,9 +67,6 @@ def add_produce_item(self, item: IInventory) -> None: self.produced_inventory.append(item) def get_acp_state(self) -> Dict: - keep_completed_jobs = self.keep_completed_jobs - keep_cancelled_jobs = self.keep_cancelled_jobs - keep_produced_inventory = self.keep_produced_inventory agent_addr = self.acp_client.agent_address.lower() def serialize_job(job: ACPJob, active: bool) -> Dict: