diff --git a/plugins/acp/acp_plugin_gamesdk/acp_plugin.py b/plugins/acp/acp_plugin_gamesdk/acp_plugin.py index 954e7d2..90b6fa6 100644 --- a/plugins/acp/acp_plugin_gamesdk/acp_plugin.py +++ b/plugins/acp/acp_plugin_gamesdk/acp_plugin.py @@ -23,6 +23,9 @@ class AcpPluginOptions: evaluator_cluster: Optional[str] = None graduated: Optional[bool] = True job_expiry_duration_mins: Optional[int] = None + keep_completed_jobs: Optional[int] = None + keep_cancelled_jobs: Optional[int] = None + keep_produced_inventory: Optional[int] = None class AcpPlugin: def __init__(self, options: AcpPluginOptions): @@ -55,71 +58,109 @@ def __init__(self, options: AcpPluginOptions): self.produced_inventory: List[IInventory] = [] self.acp_base_url = self.acp_client.acp_api_url self.job_expiry_duration_mins = options.job_expiry_duration_mins if options.job_expiry_duration_mins is not None else 1440 + self.keep_completed_jobs = options.keep_completed_jobs if options.keep_completed_jobs is not None else 1 + self.keep_cancelled_jobs = options.keep_cancelled_jobs if options.keep_cancelled_jobs is not None else 0 + self.keep_produced_inventory = options.keep_produced_inventory if options.keep_produced_inventory is not None else 1 + def add_produce_item(self, item: IInventory) -> None: self.produced_inventory.append(item) - - def memo_to_dict(self, m): - return { - "id": m.id, - "type": m.type.name, - "content": m.content, - "next_phase": m.next_phase.name - } - - def _to_state_acp_job(self, job: ACPJob) -> Dict: - memos = [] - for memo in job.memos: - memos.append(self.memo_to_dict(memo)) - - - return { - "jobId": job.id, - "clientName": job.client_agent.name if job.client_agent else "", - "providerName": job.provider_agent.name if job.provider_agent else "", - "desc": job.service_requirement or "", - "price": str(job.price), - "providerAddress": job.provider_address, - "phase": ACP_JOB_PHASE_MAP.get(job.phase), - "memo": list(reversed(memos)), - "tweetHistory": [ - { - "type": tweet.get("type"), - "tweet_id": tweet.get("tweetId"), - "content": tweet.get("content"), - "created_at": tweet.get("createdAt") - } - for tweet in reversed(job.context.get('tweets', []) if job.context else []) - ], - } def get_acp_state(self) -> Dict: + agent_addr = self.acp_client.agent_address.lower() + + def serialize_job(job: ACPJob, active: bool) -> Dict: + return { + "jobId": job.id, + "clientName": job.client_agent.name if job.client_agent else "", + "providerName": job.provider_agent.name if job.provider_agent else "", + "desc": job.service_requirement or "", + "price": str(job.price), + "providerAddress": job.provider_address, + "phase": ACP_JOB_PHASE_MAP.get(job.phase), + # Include memos only if active + "memo": [ + { + "id": m.id, + "type": m.type.value, + "content": m.content, + "next_phase": m.next_phase.value, + } + for m in reversed(job.memos) + ] if active and job.memos else [], + # Include tweetHistory only if active + "tweetHistory": [ + { + "type": t.get("type"), + "tweet_id": t.get("tweetId"), + "content": t.get("content"), + "created_at": t.get("createdAt"), + } + for t in reversed(job.context.get("tweets", [])) + ] if active and job.context else [], + } + + # Fetch job states active_jobs = self.acp_client.get_active_jobs() - completed_jobs = self.acp_client.get_completed_jobs() - cancelled_jobs = self.acp_client.get_cancelled_jobs() - agent_addr = self.acp_client.agent_address.lower() + # Fetch completed jobs if not explicitly disabled + if self.keep_completed_jobs == 0: + completed_jobs = [] + else: + completed_jobs = self.acp_client.get_completed_jobs() - active_buyer_jobs = [] - active_seller_jobs = [] - - for job in active_jobs: - processed = self._to_state_acp_job(job) - client_addr = job.client_address.lower() - provider_addr = job.provider_address.lower() - - if client_addr == agent_addr: - active_buyer_jobs.append(processed) - if provider_addr == agent_addr: - active_seller_jobs.append(processed) + # Fetch cancelled jobs if not explicitly disabled + if self.keep_cancelled_jobs == 0: + cancelled_jobs = [] + else: + cancelled_jobs = self.acp_client.get_cancelled_jobs() + + active_buyer_jobs = [ + serialize_job(job, active=True) + for job in active_jobs + if job.client_address.lower() == agent_addr + ] + + active_seller_jobs = [ + serialize_job(job, active=True) + for job in active_jobs + if job.provider_address.lower() == agent_addr + ] + + # Limit completed and cancelled jobs + completed = [ + serialize_job(job, active=False) + for job in ( + completed_jobs[:self.keep_completed_jobs] + if self.keep_completed_jobs is not None + else completed_jobs + ) + ] + + cancelled = [ + serialize_job(job, active=False) + for job in ( + cancelled_jobs[:self.keep_cancelled_jobs] + if self.keep_cancelled_jobs is not None + else cancelled_jobs + ) + ] + + # Produced inventory logic + produced = [] + if self.produced_inventory and self.keep_produced_inventory > 0: + produced = [ + item.model_dump() for item in ( + self.produced_inventory[:self.keep_produced_inventory] + if self.keep_produced_inventory is not None + else self.produced_inventory + ) + ] - completed = [self._to_state_acp_job(job) for job in completed_jobs] - cancelled = [self._to_state_acp_job(job) for job in cancelled_jobs] - return { "inventory": { "acquired": [], - "produced": [item.model_dump() for item in self.produced_inventory] if self.produced_inventory else [], + "produced": produced, }, "jobs": { "active": { @@ -128,7 +169,7 @@ def get_acp_state(self) -> Dict: }, "completed": completed, "cancelled": cancelled, - } + }, } def get_worker(self, data: Optional[Dict] = None) -> WorkerConfig: