Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ printf("resolved value = %s\n", json_encode($result->value));

### Consuming events

Iterate the ordered event stream — `log`, `thought`, `tool_call`, `tool_result`, `status`, `metric`, `artifact_ref`, `progress`, `result_chunk` — and optionally acknowledge progress so the runtime can release buffered events early.
Iterate the ordered event stream — `log`, `metric`, `event.emit`, `tool.invoke`, `tool.result`, `job.progress`, `job.result_chunk`, `artifact.ref` — and optionally acknowledge progress so the runtime can release buffered events early.

```php
use Arcp\Envelope\Envelope;
Expand Down
9 changes: 7 additions & 2 deletions docs/guides/auth.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ $runtime = new ARCPRuntime(
);
```

`BearerAuth` always resolves the principal from the token → principal
map. The `principal` field a client may set in `PeerInfo` is ignored on
the runtime side — only the server-side mapping is authoritative.

## Custom verifier

Implement `Arcp\Auth\AuthScheme`:
Expand All @@ -54,8 +58,9 @@ final class HeaderAuth implements AuthScheme
## Where the principal lives

After handshake, the runtime stores the resolved principal on
`Session::$principal`. The client stores its own principal from
`PeerInfo`.
`Session::$principal` — server-authoritative, derived from the auth
scheme, not from any client-supplied `PeerInfo`. The client stores its
own principal from `PeerInfo` purely for local logging.

## Sessions, resume, and auth

Expand Down
5 changes: 3 additions & 2 deletions src/Auth/BearerAuth.php
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ public function verify(Auth $auth, PeerInfo $client): AuthResult
if (!isset($this->tokens[$auth->token])) {
return AuthResult::reject('invalid token');
}
$principal = $client->principal ?? $this->tokens[$auth->token];
return AuthResult::accept($principal);
// Always use the server-side principal mapped to the token; do not
// trust the principal supplied in the untrusted PeerInfo block.
return AuthResult::accept($this->tokens[$auth->token]);
}
}
98 changes: 95 additions & 3 deletions src/Client/ARCPClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
use Arcp\Messages\Artifacts\ArtifactFetch;
use Arcp\Messages\Artifacts\ArtifactPut;
use Arcp\Messages\Artifacts\ArtifactRef;
use Arcp\Messages\Artifacts\ArtifactRelease;
use Arcp\Messages\Control\Ack;
use Arcp\Messages\Control\Cancel;
use Arcp\Messages\Control\Nack;
use Arcp\Messages\Control\Ping;
Expand Down Expand Up @@ -141,6 +143,11 @@ public static function withConfig(ClientConfig $config): self
/**
* Send `session.open`, await `session.accepted`, and start the read-loop.
*
* @throws \Arcp\Errors\UnauthenticatedException when the runtime rejects credentials.
* @throws \Arcp\Errors\UnimplementedException for capability mismatches.
* @throws \Arcp\Errors\ARCPExceptionInterface for other handshake errors.
* @throws \Arcp\Errors\TransportClosedException if the transport drops.
*
* @size-check-suppress public BC; mirrors RFC §8.3 session.open shape.
*/
public function open(
Expand Down Expand Up @@ -169,6 +176,14 @@ public function open(
*
* @param array<string, mixed> $arguments
*
* @throws \Arcp\Errors\ARCPExceptionInterface mapped from `tool.error`
* or correlated `nack` (e.g. `PermissionDeniedException`,
* `BudgetExhaustedException`, `NotFoundException`).
* @throws \Arcp\Errors\DeadlineExceededException when `deadlineSeconds`
* elapses before a terminal response arrives.
* @throws \Arcp\Errors\CancelledException when `$cancellation` fires.
* @throws InvalidArgumentException for unexpected response shapes.
*
* @size-check-suppress public BC; tool.invoke options are RFC §10 wire fields.
*/
public function invokeTool(
Expand Down Expand Up @@ -208,6 +223,11 @@ public function invokeTool(
* @param array<string, mixed> $filter
* @param \Closure(Envelope): void $onEvent Called with the unwrapped envelope.
*
* @throws \Arcp\Errors\PermissionDeniedException when the filter
* crosses session-scope boundaries.
* @throws \Arcp\Errors\ARCPExceptionInterface for other runtime errors.
* @throws InvalidArgumentException for unexpected response shapes.
*
* @size-check-suppress public BC; subscribe is the RFC §13 entry-point.
*/
public function subscribe(
Expand Down Expand Up @@ -249,7 +269,13 @@ public function unsubscribe(SubscriptionId $id): void
}

/**
* Page through jobs visible to this session.
*
* @param array<string, mixed> $filter
*
* @throws \Arcp\Errors\ARCPExceptionInterface for runtime errors mapped
* from a correlated `nack`.
* @throws InvalidArgumentException for unexpected response shapes.
*/
public function listJobs(
array $filter = [],
Expand Down Expand Up @@ -289,6 +315,13 @@ public function cancelJob(
$this->session->transport->send($env);
}

/**
* Round-trip a ping/pong heartbeat.
*
* @throws \Arcp\Errors\ARCPExceptionInterface when the runtime
* returns a Nack instead of a Pong.
* @throws InvalidArgumentException for an unexpected response type.
*/
public function ping(?string $nonce = null, float $deadlineSeconds = 5.0): Pong
{
$id = MessageId::random();
Expand All @@ -299,29 +332,60 @@ public function ping(?string $nonce = null, float $deadlineSeconds = 5.0): Pong
sessionId: $this->session->sessionId,
);
$this->session->transport->send($env);
/** @var Pong $resp */
$resp = $this->pending->awaitResponse($id, $deadlineSeconds);
if ($resp instanceof Nack) {
throw $this->errorMapper->raise($resp->error);
}
if (!$resp instanceof Pong) {
throw new InvalidArgumentException('expected pong as ping response');
}
return $resp;
}

/**
* Upload an artifact and receive its server-issued reference.
*
* @param string|null $sha256 hex-encoded SHA-256 digest of `$bytes`.
* When supplied, the runtime rejects the upload if the digest does
* not match the decoded payload.
*
* @throws \Arcp\Errors\InvalidArgumentException on digest mismatch or
* malformed payload.
* @throws \Arcp\Errors\ARCPExceptionInterface on other runtime errors.
*/
public function putArtifact(
string $mediaType,
string $bytes,
?int $retentionSeconds = null,
?string $sha256 = null,
): ArtifactRef {
$id = MessageId::random();
$env = new Envelope(
id: $id,
payload: new ArtifactPut($mediaType, base64_encode($bytes), $retentionSeconds),
payload: new ArtifactPut($mediaType, base64_encode($bytes), $retentionSeconds, $sha256),
timestamp: $this->clock->now(),
sessionId: $this->session->sessionId,
);
$this->session->transport->send($env);
/** @var ArtifactRef $resp */
$resp = $this->pending->awaitResponse($id, 30.0);
if ($resp instanceof Nack) {
throw $this->errorMapper->raise($resp->error);
}
if (!$resp instanceof ArtifactRef) {
throw new InvalidArgumentException('expected artifact.ref as put response');
}
return $resp;
}

/**
* Fetch the bytes of an artifact owned by this session.
*
* @throws \Arcp\Errors\PermissionDeniedException for cross-session ids.
* @throws \Arcp\Errors\NotFoundException if the artifact is unknown
* or has expired.
* @throws \Arcp\Errors\ARCPExceptionInterface for other runtime errors.
* @throws InvalidArgumentException for malformed payload or response.
*/
public function fetchArtifact(ArtifactId $artifactId): string
{
$id = MessageId::random();
Expand All @@ -345,6 +409,34 @@ public function fetchArtifact(ArtifactId $artifactId): string
: throw new InvalidArgumentException('artifact data not base64');
}

/**
* Release an artifact owned by this session. Returns true if the
* runtime confirmed deletion, false if it was already unknown.
*
* @throws \Arcp\Errors\PermissionDeniedException when the artifact
* belongs to a different session.
* @throws \Arcp\Errors\ARCPExceptionInterface on other runtime errors.
*/
public function releaseArtifact(ArtifactId $artifactId): bool
{
$id = MessageId::random();
$env = new Envelope(
id: $id,
payload: new ArtifactRelease($artifactId),
timestamp: $this->clock->now(),
sessionId: $this->session->sessionId,
);
$this->session->transport->send($env);
$resp = $this->pending->awaitResponse($id, 30.0);
if ($resp instanceof Nack) {
throw $this->errorMapper->raise($resp->error);
}
if (!$resp instanceof Ack) {
throw new InvalidArgumentException('expected ack as artifact.release response');
}
return $resp->note === 'released';
}

public function close(): void
{
if ($this->session->state === SessionState::Closed) {
Expand Down
51 changes: 50 additions & 1 deletion src/Client/ResultChunkAssembler.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,12 @@
use Arcp\Errors\InvalidArgumentException;
use Arcp\Messages\Execution\ResultChunk;

/** Collects `job.result_chunk` messages by result id and assembles final bytes. */
/**
* Collects `job.result_chunk` messages by result id and assembles final
* bytes. Enforces sequence contiguity (0..N), terminal-chunk delivery,
* and duplicate consistency so a truncated or out-of-order stream never
* silently produces a corrupted result.
*/
final class ResultChunkAssembler
{
/** @var array<string, array<int, ResultChunk>> */
Expand All @@ -18,6 +23,13 @@ final class ResultChunkAssembler

public function push(ResultChunk $chunk): void
{
$existing = $this->chunks[$chunk->resultId][$chunk->chunkSeq] ?? null;
if ($existing instanceof ResultChunk && !$this->sameChunkPayload($existing, $chunk)) {
throw new InvalidArgumentException(
'result_chunk duplicate with conflicting payload',
['result_id' => $chunk->resultId, 'chunk_seq' => $chunk->chunkSeq],
);
}
$this->chunks[$chunk->resultId][$chunk->chunkSeq] = $chunk;
if (!$chunk->more) {
$this->complete[$chunk->resultId] = true;
Expand All @@ -35,7 +47,14 @@ public function assemble(string $resultId): string
if ($chunks === []) {
throw new InvalidArgumentException('unknown result_id: ' . $resultId);
}
if (!isset($this->complete[$resultId])) {
throw new InvalidArgumentException(
'result_chunk stream incomplete: terminal chunk not yet received',
['result_id' => $resultId],
);
}
ksort($chunks);
$this->assertContiguous($resultId, $chunks);
$out = '';
foreach ($chunks as $chunk) {
$out .= $chunk->encoding === 'base64'
Expand All @@ -45,6 +64,36 @@ public function assemble(string $resultId): string
return $out;
}

/** @param array<int, ResultChunk> $chunks */
private function assertContiguous(string $resultId, array $chunks): void
{
$expected = 0;
$terminal = null;
foreach ($chunks as $seq => $chunk) {
if ($seq !== $expected) {
throw new InvalidArgumentException(
'result_chunk sequence not contiguous from 0',
['result_id' => $resultId, 'expected_seq' => $expected, 'actual_seq' => $seq],
);
}
++$expected;
$terminal = $chunk;
}
if ($terminal instanceof ResultChunk && $terminal->more) {
throw new InvalidArgumentException(
'result_chunk highest sequence has more=true',
['result_id' => $resultId],
);
}
}

private function sameChunkPayload(ResultChunk $a, ResultChunk $b): bool
{
return $a->data === $b->data
&& $a->encoding === $b->encoding
&& $a->more === $b->more;
}

private function decodeBase64(ResultChunk $chunk): string
{
$decoded = base64_decode($chunk->data, strict: true);
Expand Down
34 changes: 31 additions & 3 deletions src/Internal/Runtime/ArtifactDispatcher.php
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,28 @@ public function put(Session $session, Envelope $env, ArtifactPut $msg): void
);
return;
}
if ($msg->sha256 !== null) {
$normalized = strtolower(trim($msg->sha256));
if (preg_match('/^[0-9a-f]{64}$/', $normalized) !== 1) {
$this->lifecycle->nack(
$session,
$env,
'INVALID_ARGUMENT',
'artifact.put sha256 must be 64 lowercase hex chars',
);
Comment on lines +45 to +51
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟡 Minor | ⚡ Quick win

Adjust SHA-256 validation error text to match actual behavior.

Line 45 normalizes with strtolower(...), so uppercase hex is accepted. The current message says “must be 64 lowercase hex chars,” which is misleading.

Suggested fix
-                    'artifact.put sha256 must be 64 lowercase hex chars',
+                    'artifact.put sha256 must be 64 hex characters',
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
if (preg_match('/^[0-9a-f]{64}$/', $normalized) !== 1) {
$this->lifecycle->nack(
$session,
$env,
'INVALID_ARGUMENT',
'artifact.put sha256 must be 64 lowercase hex chars',
);
if (preg_match('/^[0-9a-f]{64}$/', $normalized) !== 1) {
$this->lifecycle->nack(
$session,
$env,
'INVALID_ARGUMENT',
'artifact.put sha256 must be 64 hex characters',
);
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/Internal/Runtime/ArtifactDispatcher.php` around lines 45 - 51, The error
message in the SHA-256 validation is misleading because $normalized is derived
with strtolower(), so uppercase hex is accepted; update the lifecycle->nack call
(the error string passed in the artifact.put SHA-256 validation block using
preg_match('/^[0-9a-f]{64}$/', $normalized)) to a neutral message such as
"artifact.put sha256 must be 64 hexadecimal characters" (or similar) that does
not claim lowercase-only hex.

return;
}
$computed = hash('sha256', $bytes);
if (!hash_equals($computed, $normalized)) {
$this->lifecycle->nack(
$session,
$env,
'INVALID_ARGUMENT',
'artifact.put sha256 does not match payload',
);
return;
}
}
$ref = $this->runtime->artifacts->put(
$session,
new ArtifactBlob($msg->mediaType, $bytes, $msg->retentionSeconds),
Expand All @@ -50,21 +72,27 @@ public function put(Session $session, Envelope $env, ArtifactPut $msg): void
public function fetch(Session $session, Envelope $env, ArtifactFetch $msg): void
{
try {
$bytes = $this->runtime->artifacts->fetch($msg->artifactId);
$bytes = $this->runtime->artifacts->fetch($msg->artifactId, $session);
$mediaType = $this->runtime->artifacts->ref($msg->artifactId, $session)->mediaType;
} catch (ARCPException $e) {
$this->lifecycle->nack($session, $env, $e->code()->value, $e->getMessage());
return;
}
$put = new ArtifactPut(
mediaType: $this->runtime->artifacts->ref($msg->artifactId)->mediaType,
mediaType: $mediaType,
data: base64_encode($bytes),
);
$this->runtime->emit($session, $put, ['correlation_id' => $env->id]);
}

public function release(Session $session, Envelope $env, ArtifactRelease $msg): void
{
$ok = $this->runtime->artifacts->release($msg->artifactId);
try {
$ok = $this->runtime->artifacts->release($msg->artifactId, $session);
} catch (ARCPException $e) {
$this->lifecycle->nack($session, $env, $e->code()->value, $e->getMessage());
return;
}
$this->runtime->emit($session, new Ack($ok ? 'released' : 'unknown'), [
'correlation_id' => $env->id,
]);
Expand Down
Loading