Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 95 additions & 54 deletions plugins/acp/acp_plugin_gamesdk/acp_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ class AcpPluginOptions:
evaluator_cluster: Optional[str] = None
graduated: Optional[bool] = True
job_expiry_duration_mins: Optional[int] = None
keep_completed_jobs: Optional[int] = None
keep_cancelled_jobs: Optional[int] = None
keep_produced_inventory: Optional[int] = None

class AcpPlugin:
def __init__(self, options: AcpPluginOptions):
Expand Down Expand Up @@ -55,71 +58,109 @@ def __init__(self, options: AcpPluginOptions):
self.produced_inventory: List[IInventory] = []
self.acp_base_url = self.acp_client.acp_api_url
self.job_expiry_duration_mins = options.job_expiry_duration_mins if options.job_expiry_duration_mins is not None else 1440
self.keep_completed_jobs = options.keep_completed_jobs if options.keep_completed_jobs is not None else 1
self.keep_cancelled_jobs = options.keep_cancelled_jobs if options.keep_cancelled_jobs is not None else 0
self.keep_produced_inventory = options.keep_produced_inventory if options.keep_produced_inventory is not None else 1


def add_produce_item(self, item: IInventory) -> None:
self.produced_inventory.append(item)

def memo_to_dict(self, m):
return {
"id": m.id,
"type": m.type.name,
"content": m.content,
"next_phase": m.next_phase.name
}

def _to_state_acp_job(self, job: ACPJob) -> Dict:
memos = []
for memo in job.memos:
memos.append(self.memo_to_dict(memo))


return {
"jobId": job.id,
"clientName": job.client_agent.name if job.client_agent else "",
"providerName": job.provider_agent.name if job.provider_agent else "",
"desc": job.service_requirement or "",
"price": str(job.price),
"providerAddress": job.provider_address,
"phase": ACP_JOB_PHASE_MAP.get(job.phase),
"memo": list(reversed(memos)),
"tweetHistory": [
{
"type": tweet.get("type"),
"tweet_id": tweet.get("tweetId"),
"content": tweet.get("content"),
"created_at": tweet.get("createdAt")
}
for tweet in reversed(job.context.get('tweets', []) if job.context else [])
],
}

def get_acp_state(self) -> Dict:
agent_addr = self.acp_client.agent_address.lower()

def serialize_job(job: ACPJob, active: bool) -> Dict:
return {
"jobId": job.id,
"clientName": job.client_agent.name if job.client_agent else "",
"providerName": job.provider_agent.name if job.provider_agent else "",
"desc": job.service_requirement or "",
"price": str(job.price),
"providerAddress": job.provider_address,
"phase": ACP_JOB_PHASE_MAP.get(job.phase),
# Include memos only if active
"memo": [
{
"id": m.id,
"type": m.type.value,
"content": m.content,
"next_phase": m.next_phase.value,
}
for m in reversed(job.memos)
] if active and job.memos else [],
# Include tweetHistory only if active
"tweetHistory": [
{
"type": t.get("type"),
"tweet_id": t.get("tweetId"),
"content": t.get("content"),
"created_at": t.get("createdAt"),
}
for t in reversed(job.context.get("tweets", []))
] if active and job.context else [],
}

# Fetch job states
active_jobs = self.acp_client.get_active_jobs()
completed_jobs = self.acp_client.get_completed_jobs()
cancelled_jobs = self.acp_client.get_cancelled_jobs()

agent_addr = self.acp_client.agent_address.lower()
# Fetch completed jobs if not explicitly disabled
if self.keep_completed_jobs == 0:
completed_jobs = []
else:
completed_jobs = self.acp_client.get_completed_jobs()

active_buyer_jobs = []
active_seller_jobs = []

for job in active_jobs:
processed = self._to_state_acp_job(job)
client_addr = job.client_address.lower()
provider_addr = job.provider_address.lower()

if client_addr == agent_addr:
active_buyer_jobs.append(processed)
if provider_addr == agent_addr:
active_seller_jobs.append(processed)
# Fetch cancelled jobs if not explicitly disabled
if self.keep_cancelled_jobs == 0:
cancelled_jobs = []
else:
cancelled_jobs = self.acp_client.get_cancelled_jobs()

active_buyer_jobs = [
serialize_job(job, active=True)
for job in active_jobs
if job.client_address.lower() == agent_addr
]

active_seller_jobs = [
serialize_job(job, active=True)
for job in active_jobs
if job.provider_address.lower() == agent_addr
]

# Limit completed and cancelled jobs
completed = [
serialize_job(job, active=False)
for job in (
completed_jobs[:self.keep_completed_jobs]
if self.keep_completed_jobs is not None
else completed_jobs
)
]

cancelled = [
serialize_job(job, active=False)
for job in (
cancelled_jobs[:self.keep_cancelled_jobs]
if self.keep_cancelled_jobs is not None
else cancelled_jobs
)
]

# Produced inventory logic
produced = []
if self.produced_inventory and self.keep_produced_inventory > 0:
produced = [
item.model_dump() for item in (
self.produced_inventory[:self.keep_produced_inventory]
if self.keep_produced_inventory is not None
else self.produced_inventory
)
]

completed = [self._to_state_acp_job(job) for job in completed_jobs]
cancelled = [self._to_state_acp_job(job) for job in cancelled_jobs]

return {
"inventory": {
"acquired": [],
"produced": [item.model_dump() for item in self.produced_inventory] if self.produced_inventory else [],
"produced": produced,
},
"jobs": {
"active": {
Expand All @@ -128,7 +169,7 @@ def get_acp_state(self) -> Dict:
},
"completed": completed,
"cancelled": cancelled,
}
},
}

def get_worker(self, data: Optional[Dict] = None) -> WorkerConfig:
Expand Down