fix: address 24 open issues — security, concurrency, semantics, docs#66
nficano wants to merge 1 commit into
Conversation
Fixes a batch of correctness and security gaps in the runtime/client and narrows several over-advertised features in the docs:

Security / soundness
- JWT challenge binding: JWTAuthValidator now requires a matching `nonce` claim when the runtime issues a challenge (#58).
- ModelUse subset: rework `subsetViolation(of:)` to use a real glob-inclusion check so wildcard parents with suffix segments can't be escaped (#53).
- Lease durations validated at grant/refresh; `handleLeaseRefresh` surfaces errors instead of swallowing them (#48).

Concurrency
- PendingRegistry and `ARCPClient.ping` use a slot state machine so the waiter is registered before any timeout race (#42).
- Pending invocations are failed and progress/result streams are finished when the transport closes; invoke's send-error path cleans up state (#41).
- `ARCPRuntime.register` race window closed by inserting `jobManagers` entry before iterating `registeredHandlers` (#61).
- Subscriptions track owner session id; cleanup runs when the session ends (#54). `subscribe.event` envelopes are skipped during `route()`, preventing recursive cascade on empty filters (#40).
- `StreamManager.subscribeInbound` refuses a second subscriber for the same stream id with `failedPrecondition` (#62).

Client dispatch
- Unmatched `job.progress`, `job.result_chunk`, and `tool.error` envelopes fall through to `unhandled` instead of being silently dropped / double-delivered (#55, #59, #60).

Runtime semantics
- Idempotency keys persist the terminal `job.completed/failed/cancelled` payload (scoped by principal + key) and replay it on duplicate invokes without re-running the handler (#43).
- Trace fields default to `Tracing.current`; `JobManager` propagates the inbound trace context across job lifecycle and handler emissions (#45).
- Artifact retention sweep starts in `ARCPRuntime.init`; lease expiry sweep starts in `JobManager.init` (#47).
- `CredentialManager.rotate` keeps every credential returned by the provisioner instead of silently discarding extras (#63).
- ULID generator preserves strict monotonicity across equal/backward clocks and overflow (#56).

Mailbox
- O(1) drain via head index with periodic compaction (#49).

Docs
- Resume guide / CONFORMANCE narrowed to same-session replay only (#44).
- Interrupt support documented as wire-acked only (no handler observation callback) (#46).
- Heartbeat recovery documented as telemetry-only (#57).
- DocC comments added across the most user-facing public surface (#50).

Tests
- New unit / integration suites cover the fixes and previously untested files: Mailbox, LeaseManager, PendingRegistry, JWTAuth, client dispatch fallback, subscription cleanup, idempotency replay, ULID burst (#52).

Closes #40 #41 #42 #43 #44 #45 #46 #47 #48 #49 #50 #51 #52 #53 #54 #55 #56 #57 #58 #59 #60 #61 #62 #63

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
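For reviewers unfamiliar with the Mailbox item above (#49), the following is a minimal sketch of a head-index queue with periodic compaction. It is not the SDK's `Mailbox`; the type name `HeadIndexMailbox`, the `compactionThreshold` parameter, and the "dead prefix is at least half of storage" rule are all assumptions chosen for illustration.

```swift
/// Hypothetical FIFO queue illustrating "drain via head index".
/// Dequeue advances an index instead of shifting the array, so each
/// dequeue is O(1) amortized; the dead prefix is dropped only occasionally.
struct HeadIndexMailbox<Element> {
    private var storage: [Element?] = []
    private var head = 0
    /// Assumed tuning knob: minimum dead-prefix length before compacting.
    private let compactionThreshold: Int

    init(compactionThreshold: Int = 64) {
        self.compactionThreshold = compactionThreshold
    }

    /// Number of live (not yet dequeued) elements.
    var count: Int { storage.count - head }
    var isEmpty: Bool { count == 0 }

    mutating func enqueue(_ element: Element) {
        storage.append(element)
    }

    mutating func dequeue() -> Element? {
        guard head < storage.count else { return nil }
        let element = storage[head]
        storage[head] = nil          // release the slot's reference early
        head += 1
        // Compact only when the dead prefix is large and dominates storage,
        // which keeps the total copying cost linear in the number of dequeues.
        if head >= compactionThreshold && head * 2 >= storage.count {
            storage.removeFirst(head)
            head = 0
        }
        return element
    }
}
```

Compared with calling `removeFirst()` on every drain, which shifts the remaining elements each time, this trades a bounded amount of retained capacity for constant-time dequeues.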
Walkthrough
This PR advances the ARCP v1.1 Swift SDK with multi-layered improvements: JWT challenge nonce binding for replay prevention, ULID/queue/registry infrastructure refactors, stream single-subscriber enforcement, runtime initialization with idempotency caching and tracing propagation, client ping/pong state machine, and enhanced unhandled envelope routing. Includes comprehensive test coverage and specification clarifications.
Changes
ARCP v1.1 Runtime Enhancements
Estimated code review effort
🎯 4 (Complex) | ⏱️ ~75 minutes
🚥 Pre-merge checks | ✅ 4 | ❌ 1
❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
Actionable comments posted: 7
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
docs/guides/resume.md (1)
82-83: ⚠️ Potential issue | 🟡 Minor | ⚡ Quick win
Update `ResumePayload` table to match current SDK behavior.
These rows conflict with Line 22-25. The table currently implies support that the scope section explicitly says is not implemented/ignored.
Suggested doc fix
-| `checkpointId` | `String?` | Reserved for checkpoint-based resume (deferred to v0.2) |
-| `includeOpenStreams` | `Bool` | If true, re-emit `stream.open` for streams still alive |
+| `checkpointId` | `String?` | Not implemented in this SDK release (`ARCPError.unimplemented`) |
+| `includeOpenStreams` | `Bool` | Currently ignored; open streams are not re-emitted |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@docs/guides/resume.md` around lines 82 - 83, The ResumePayload table is inconsistent with the SDK scope (which says these fields are not implemented/ignored); update the ResumePayload table to reflect current behavior by removing or marking the `checkpointId` and `includeOpenStreams` rows as "Not implemented / Ignored (reserved for future release)" (or explicitly state deferred to v0.2 for `checkpointId`), ensuring the table entries for `checkpointId` and `includeOpenStreams` match the text in the scope section and no longer imply active support.
🧹 Nitpick comments (1)
docs/guides/resume.md (1)
68-68: 💤 Low value
Use `includeOpenStreams: false` in sample to avoid implying current support.
Since this release documents the field as ignored, setting it to `false` keeps the example aligned with actual behavior.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@docs/guides/resume.md` at line 68, Update the example ResumePayload call to reflect current behavior by setting includeOpenStreams to false: change the ResumePayload(...) sample that uses ResumePayload(afterMessageId: lastSeen, includeOpenStreams: true) so it passes includeOpenStreams: false instead, ensuring the example matches the documented ignored behavior of the field.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@docs/guides/resume.md`:
- Around line 12-14: The two sentences conflict: reconcile and make a single
clear statement that resume only succeeds when the original session identifier
survives reconnect (session_id must be the same) and that replay is performed
from that surviving session's event log starting after after_message_id; replace
the phrasing that says "new session" with explicit language describing the two
possible behaviors (either reconnect to same session_id and replay from that
session's log after after_message_id, or if a new session_id is allocated then
resume is not possible) and update all occurrences referencing session_id and
after_message_id to use this unified wording.
In `@Sources/ARCP/Client/ARCPClient.swift`:
- Around line 325-333: Pending ping slots are being dropped or left pending
after transport closure so pings can wait for timeout; update the logic to fail
them immediately: in finishUnhandled()/the loop that iterates pendingPongs,
treat the .pending case the same as .waiting by resuming the awaiting
continuation with closedError (use the existing closedError symbol), and in
awaitPong() change the branch that currently treats .none like .pending to
instead fail immediately with closedError (so .none/.pending no longer wait for
timeout). Ensure you update handling for the enum cases .waiting, .pending,
.readyValue, .readyError and keep using pendingPongs and closedError symbols.
In `@Sources/ARCP/Messages/Common.swift`:
- Around line 38-40: The documented comment says heartbeat ack tracking isn't
implemented but Capabilities.init currently defaults heartbeatRecovery to .fail
and thus advertises recovery support; modify the initialization in
Capabilities.init to use a non-recovery default (e.g., .none or an explicit
"unsupported" variant) or add logic to prevent advertising heartbeat recovery
until ack tracking exists by changing the default value of the heartbeatRecovery
property or gating where Capabilities is serialized/advertised (refer to the
HeartbeatRecovery enum and Capabilities.init to locate the code).
In `@Sources/ARCP/Messages/ModelUse.swift`:
- Around line 34-40: The function patternCovers currently treats an expanded
child pattern as covered by a literal parent (e.g., parent "abc" vs child
"abc*abc"); to fix, add a guard that when parent contains no '*' but child does,
patternCovers returns false (i.e., reject wildcard children for literal-only
parents) before falling back to matches(pattern:value:). Update patternCovers to
check parent.contains("*") and child.contains("*") (or explicitly: if
!parent.contains("*") && child.contains("*") { return false }) so
subsetViolation(of:) cannot miss capability expansions.
In `@Sources/ARCP/Runtime/CredentialManager.swift`:
- Around line 67-103: In rotate(jobId:credentialId:) do not append the
provisioner response to the prior tracked set — treat the
provisioner.issue(lease:jobId:sessionId:) result as the authoritative full set:
compute newSet = next, determine removedIds = existing.ids minus newSet.ids,
optionally call await revokeWithRetry(id) for each removedId if you want to
revoke dropped credentials, then replace credentialsByJob[jobId] = newSet and
call try await retention.persistOutstanding(newSet.map(\.id), jobId: jobId);
this ensures omitted credentials are not kept as stale tracked state.
In `@Sources/ARCP/Runtime/PendingRegistry.swift`:
- Around line 24-26: The method awaitResponse currently unconditionally does
slots[id] = .pending which can overwrite an existing .waiting and orphan its
continuation; update awaitResponse to first check slots[id] for an existing
value and if a .waiting entry (or any non-nil) exists, prevent registration by
returning/throwing a descriptive error (e.g. DuplicateAwaitError) instead of
overwriting, otherwise insert the new .pending entry and proceed to create the
timeout Task; reference the awaitResponse function, the slots dictionary,
MessageId key, and the .pending/.waiting slot cases when making the check and
error handling so the original continuation is never lost.
In `@Tests/ARCPTests/Integration/IdempotencyTests.swift`:
- Line 10: Reset the shared static counter on CountingTool before each test
invocation to avoid state leakage between in-process runs: locate the test cases
that create IntegrationFixture(handler: CountingTool()) in
IdempotencyTests.swift (instances referencing CountingTool and
IntegrationFixture) and set CountingTool.counter = 0 (or the appropriate initial
value) immediately before the invocation sequence/fixture setup in each test
(including the other spots noted around lines 46–47 and 56–63) so each test
starts with a deterministic counter state.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro Plus
Run ID: 1cecc6d7-8cad-4f41-82d7-9b7f920b0122
📒 Files selected for processing (32)
CONFORMANCE.md
Sources/ARCP/Auth/AuthScheme.swift
Sources/ARCP/Auth/BearerAuth.swift
Sources/ARCP/Auth/JWTAuth.swift
Sources/ARCP/Client/ARCPClient.swift
Sources/ARCP/Client/ResultChunkStream.swift
Sources/ARCP/Envelope/Envelope.swift
Sources/ARCP/Ids/Ulid.swift
Sources/ARCP/Messages/Common.swift
Sources/ARCP/Messages/ModelUse.swift
Sources/ARCP/Runtime/ARCPRuntime.swift
Sources/ARCP/Runtime/CredentialManager.swift
Sources/ARCP/Runtime/JobContext.swift
Sources/ARCP/Runtime/JobManager.swift
Sources/ARCP/Runtime/LeaseManager.swift
Sources/ARCP/Runtime/Mailbox.swift
Sources/ARCP/Runtime/PendingRegistry.swift
Sources/ARCP/Runtime/StreamManager.swift
Sources/ARCP/Runtime/SubscriptionManager.swift
Sources/ARCP/Runtime/ToolHandler.swift
Tests/ARCPTests/IdsTests.swift
Tests/ARCPTests/Integration/ClientDispatchTests.swift
Tests/ARCPTests/Integration/IdempotencyTests.swift
Tests/ARCPTests/Integration/SubscriptionCleanupTests.swift
Tests/ARCPTests/JWTAuthTests.swift
Tests/ARCPTests/LeaseManagerTests.swift
Tests/ARCPTests/MailboxTests.swift
Tests/ARCPTests/ModelUseTests.swift
Tests/ARCPTests/PendingRegistryTests.swift
docs/conformance.md
docs/guides/jobs.md
docs/guides/resume.md
  The runtime accepts the resume request on the *new* session and
  replays from the new session's event log starting after
  `after_message_id`.
Clarify contradictory session semantics in resume scope.
Line 12-14 implies replay on a new session log, while Line 19-21 says resume only works if the same session_id survives reconnect. These conflict and will confuse implementers.
Suggested doc fix
-- After a transport drop, you must reconnect over a fresh transport.
- The runtime accepts the resume request on the *new* session and
- replays from the new session's event log starting after
- `after_message_id`.
+- After a transport drop, you must reconnect over a fresh transport.
+ Resume replay is scoped to the active `session_id` on reconnect and
+ starts after `after_message_id` in that same session's event log.
Also applies to: 19-21
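To make the unified wording concrete, here is a minimal sketch of same-session replay. Everything in it is hypothetical: `LoggedEvent`, `ResumeOutcome`, and `resumeOutcome(...)` are illustration-only names, and treating an unknown `after_message_id` as "not resumable" is an assumption, not documented SDK behavior.

```swift
/// Hypothetical event-log entry; the real envelope type is not shown in this PR.
struct LoggedEvent: Equatable {
    let messageId: String
    let body: String
}

enum ResumeOutcome: Equatable {
    case replay([LoggedEvent])
    case notResumable
}

/// Replays only when the reconnecting client presents the session id that
/// survived the reconnect; a freshly allocated session id cannot be resumed.
func resumeOutcome(
    requestedSessionId: String,
    activeSessionId: String,
    afterMessageId: String,
    log: [LoggedEvent]
) -> ResumeOutcome {
    guard requestedSessionId == activeSessionId else { return .notResumable }
    // Assumption: an id that is not in the log cannot anchor a replay.
    guard let index = log.firstIndex(where: { $0.messageId == afterMessageId }) else {
        return .notResumable
    }
    // Everything strictly after `after_message_id`, in log order.
    return .replay(Array(log[(index + 1)...]))
}
```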
let pongSlots = pendingPongs
pendingPongs.removeAll()
for (_, slot) in pongSlots {
    switch slot {
    case .waiting(let cont):
        cont.resume(throwing: closedError)
    case .pending, .readyValue, .readyError:
        break
    }
Fail pending ping immediately after transport close.
finishUnhandled() drops .pending pong slots, and awaitPong() currently treats .none like .pending (Line 389), so a ping in that window waits for timeout instead of getting the transport-closed error immediately.
💡 Suggested fix
public actor ARCPClient {
+ private var isTransportClosed = false
@@
private func finishUnhandled() {
+ isTransportClosed = true
unhandledContinuation?.finish()
unhandledContinuation = nil
@@
private func awaitPong(id: MessageId) async throws -> PongPayload {
try await withCheckedThrowingContinuation { (cont: CheckedContinuation<PongPayload, Error>) in
switch pendingPongs[id] {
case .readyValue(let payload):
pendingPongs.removeValue(forKey: id)
cont.resume(returning: payload)
case .readyError(let err):
pendingPongs.removeValue(forKey: id)
cont.resume(throwing: err)
- case .pending, .none:
+ case .pending:
pendingPongs[id] = .waiting(cont)
+ case .none:
+ let err = ARCPError.unavailable(reason: "transport closed", retryAfter: nil)
+ cont.resume(throwing: err)
case .waiting:
cont.resume(throwing: ARCPError.internal(detail: "double-attach pong \(id)", cause: nil))
}
}
}
}
Also applies to: 389-391
/// Heartbeat recovery policy (RFC §10.3). **Heartbeat ack tracking is
/// not implemented in this release** — heartbeats are telemetry only.
public var heartbeatRecovery: HeartbeatRecovery
Align heartbeatRecovery default with documented behavior.
These docs correctly state heartbeat recovery is not implemented, but Capabilities.init still defaults heartbeatRecovery to .fail (Line 70). That can over-advertise behavior during negotiation. Consider defaulting to a non-recovery value (or gating advertisement) until ack tracking exists.
static func patternCovers(parent: String, child: String) -> Bool {
    if parent == "*" { return true }
    if parent == child { return true }
    // If child has no wildcards, defer to literal matching.
    if !child.contains("*") {
        return matches(pattern: parent, value: child)
    }
Reject wildcard children when the parent pattern is literal-only.
patternCovers can currently return true for expanded child patterns when parent has no * (e.g., parent = "abc", child = "abc*abc"). That is a false positive and can let subsetViolation(of:) miss a capability expansion.
🔧 Proposed fix
static func patternCovers(parent: String, child: String) -> Bool {
if parent == "*" { return true }
if parent == child { return true }
+ if !parent.contains("*") && child.contains("*") { return false }
// If child has no wildcards, defer to literal matching.
if !child.contains("*") {
return matches(pattern: parent, value: child)
}
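As a cross-check for the guard above, here is a self-contained sketch of one way to test pattern inclusion. It assumes `*` is the only metacharacter and matches any run of characters, including the empty run; the names `globMatches` and `covers` are hypothetical and this is not the SDK's `patternCovers`. Under that assumption, a parent covers a child exactly when the parent matches the child's text with the child's own `*` characters consumable only by a `*` in the parent, so a literal-only parent can never cover a wildcard child.

```swift
/// Standard greedy glob match with backtracking to the most recent `*`.
/// Literal pattern characters are compared only after the `*` check, so a
/// `*` inside `value` can be absorbed by a pattern `*` but never by a literal.
func globMatches(pattern: String, value: String) -> Bool {
    let p = Array(pattern), v = Array(value)
    var pi = 0, vi = 0
    var starIndex: Int? = nil   // position of the last `*` seen in the pattern
    var resumeVi = 0            // value position to retry from after backtracking
    while vi < v.count {
        if pi < p.count, p[pi] == "*" {
            starIndex = pi
            resumeVi = vi
            pi += 1
        } else if pi < p.count, p[pi] == v[vi] {
            pi += 1
            vi += 1
        } else if let s = starIndex {
            // Let the last `*` absorb one more character and retry.
            pi = s + 1
            resumeVi += 1
            vi = resumeVi
        } else {
            return false
        }
    }
    // Only trailing `*`s may remain unmatched in the pattern.
    while pi < p.count, p[pi] == "*" { pi += 1 }
    return pi == p.count
}

/// Hypothetical inclusion check: does every string matched by `child`
/// also match `parent`?
func covers(parent: String, child: String) -> Bool {
    globMatches(pattern: parent, value: child)
}
```

With this formulation, `covers(parent: "abc", child: "abc*abc")` is false without a special case, because the literal parent runs out before the child's `*` is consumed.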
 /// The `CredentialProvisioner` contract is treated as returning the full
 /// credential set for the lease (a provider may mint multiple paired
 /// credentials per issuance — e.g. access + refresh token, or several
 /// vendor credentials in a single transactional issue). The rotation
 /// flow therefore:
 ///
 /// 1. Revokes the credential that is being rotated.
 /// 2. Removes that credential from the job's tracked set.
 /// 3. Appends every credential returned by the provisioner to the job's
 ///    tracked set so the runtime can revoke them later — none are
 ///    silently discarded.
 /// 4. Persists the updated outstanding-id list through retention.
 ///
 /// Returns the credential identified as the replacement: the entry whose
 /// `id` matches `credentialId` if the provisioner is performing an
 /// in-place rotation, otherwise the first newly minted entry.
 public func rotate(jobId: JobId, credentialId: String) async throws -> ProvisionedCredential {
     guard let lease = leaseByJob[jobId] else {
         throw ARCPError.notFound(kind: "job credentials", id: jobId.rawValue)
     }
     let next = try await provisioner.issue(lease: lease, jobId: jobId, sessionId: sessionId)
-    guard let replacement = next.first else {
-        throw ARCPError.unavailable(reason: "credential rotation returned no credential", retryAfter: nil)
+    guard !next.isEmpty else {
+        throw ARCPError.unavailable(
+            reason: "credential rotation returned no credentials",
+            retryAfter: nil
+        )
     }
     var existing = credentialsByJob[jobId] ?? []
-    if let old = existing.first(where: { $0.id == credentialId }) {
-        await revokeWithRetry(old.id)
+    if existing.contains(where: { $0.id == credentialId }) {
+        await revokeWithRetry(credentialId)
         existing.removeAll { $0.id == credentialId }
     }
-    existing.append(replacement)
+    existing.append(contentsOf: next)
     credentialsByJob[jobId] = existing
     try await retention.persistOutstanding(existing.map(\.id), jobId: jobId)
+    let replacement = next.first(where: { $0.id == credentialId }) ?? next[0]
     return replacement
Avoid retaining stale credentials when rotation returns a full replacement set.
Line 67 says rotation treats provisioner output as the full lease credential set, but Lines 99-101 append next onto prior existing. Credentials omitted from next remain tracked/persisted, which can stale retention state and later revocation behavior. Replace tracked state with the returned set (optionally revoking dropped IDs).
Proposed fix
- var existing = credentialsByJob[jobId] ?? []
- if existing.contains(where: { $0.id == credentialId }) {
+ let existing = credentialsByJob[jobId] ?? []
+ if existing.contains(where: { $0.id == credentialId }) {
await revokeWithRetry(credentialId)
- existing.removeAll { $0.id == credentialId }
}
- existing.append(contentsOf: next)
- credentialsByJob[jobId] = existing
- try await retention.persistOutstanding(existing.map(\.id), jobId: jobId)
+ // Provisioner response is the authoritative replacement set.
+ let nextIds = Set(next.map(\.id))
+ for stale in existing where !nextIds.contains(stale.id) && stale.id != credentialId {
+ await revokeWithRetry(stale.id)
+ }
+ credentialsByJob[jobId] = next
+ try await retention.persistOutstanding(next.map(\.id), jobId: jobId)
 let replacement = next.first(where: { $0.id == credentialId }) ?? next[0]
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
/// The `CredentialProvisioner` contract is treated as returning the full
/// credential set for the lease (a provider may mint multiple paired
/// credentials per issuance — e.g. access + refresh token, or several
/// vendor credentials in a single transactional issue). The rotation
/// flow therefore:
///
/// 1. Revokes the credential that is being rotated.
/// 2. Removes that credential from the job's tracked set.
/// 3. Appends every credential returned by the provisioner to the job's
///    tracked set so the runtime can revoke them later — none are
///    silently discarded.
/// 4. Persists the updated outstanding-id list through retention.
///
/// Returns the credential identified as the replacement: the entry whose
/// `id` matches `credentialId` if the provisioner is performing an
/// in-place rotation, otherwise the first newly minted entry.
public func rotate(jobId: JobId, credentialId: String) async throws -> ProvisionedCredential {
    guard let lease = leaseByJob[jobId] else {
        throw ARCPError.notFound(kind: "job credentials", id: jobId.rawValue)
    }
    let next = try await provisioner.issue(lease: lease, jobId: jobId, sessionId: sessionId)
    guard !next.isEmpty else {
        throw ARCPError.unavailable(
            reason: "credential rotation returned no credentials",
            retryAfter: nil
        )
    }
    let existing = credentialsByJob[jobId] ?? []
    if existing.contains(where: { $0.id == credentialId }) {
        await revokeWithRetry(credentialId)
    }
    // Provisioner response is the authoritative replacement set.
    let nextIds = Set(next.map(\.id))
    for stale in existing where !nextIds.contains(stale.id) && stale.id != credentialId {
        await revokeWithRetry(stale.id)
    }
    credentialsByJob[jobId] = next
    try await retention.persistOutstanding(next.map(\.id), jobId: jobId)
    let replacement = next.first(where: { $0.id == credentialId }) ?? next[0]
    return replacement
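The stale-credential selection in the suggestion above can be isolated as a pure function, which may be easier to unit test than the full actor method. This is a sketch only: `staleCredentialIds` is a hypothetical helper, and plain `String` ids stand in for `ProvisionedCredential.id`.

```swift
/// Hypothetical helper mirroring the `for stale in existing where ...` loop:
/// returns the previously tracked ids that the provisioner's new set omits,
/// excluding the rotated id (which is revoked separately).
func staleCredentialIds(existing: [String], next: [String], rotated: String) -> [String] {
    let nextIds = Set(next)
    return existing.filter { !nextIds.contains($0) && $0 != rotated }
}
```

For example, with tracked ids `a`, `b`, `c`, a rotation of `a`, and a provisioner response of `b`, `d`, only `c` is reported as stale; `a` is handled by the explicit rotation revoke and `b` is still live.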
 public func awaitResponse(id: MessageId, deadline: Duration) async throws -> Response {
-    try await withThrowingTaskGroup(of: Response.self) { group in
-        group.addTask {
-            try await self.suspend(id: id)
-        }
-        group.addTask { [weak self] in
-            try await Task.sleep(for: deadline)
-            // Drop the waiter without resolving it — that lets the
-            // deadline error win the race deterministically. We don't
-            // want a cancel-resume here because that would race the
-            // resume against this task's throw.
-            await self?.dropForTimeout(id: id)
-            throw ARCPError.deadlineExceeded(operation: "pending response \(id)")
-        }
-        defer { group.cancelAll() }
-        guard let result = try await group.next() else {
-            throw ARCPError.internal(detail: "pending registry empty", cause: nil)
-        }
-        return result
+    slots[id] = .pending
+    let timeoutTask = Task { [weak self] in
Guard against duplicate awaitResponse registration for the same id.
At Line 25, unconditional slots[id] = .pending can overwrite an existing .waiting slot. A second awaitResponse for the same MessageId can orphan the first continuation (never resumed).
Suggested fix
public func awaitResponse(id: MessageId, deadline: Duration) async throws -> Response {
+ if slots[id] != nil {
+ throw ARCPError.internal(detail: "duplicate pending \(id)", cause: nil)
+ }
slots[id] = .pending
let timeoutTask = Task { [weak self] in
try? await Task.sleep(for: deadline)
if Task.isCancelled { return }
await self?.dropForTimeout(id: id)
}
defer { timeoutTask.cancel() }
return try await suspend(id: id)
 }
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| public func awaitResponse(id: MessageId, deadline: Duration) async throws -> Response { | |
| if slots[id] != nil { | |
| throw ARCPError.internal(detail: "duplicate pending \(id)", cause: nil) | |
| } | |
| slots[id] = .pending | |
| let timeoutTask = Task { [weak self] in |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@Sources/ARCP/Runtime/PendingRegistry.swift` around lines 24 - 26, The method
awaitResponse currently unconditionally does slots[id] = .pending which can
overwrite an existing .waiting and orphan its continuation; update awaitResponse
to first check slots[id] for an existing value and if a .waiting entry (or any
non-nil) exists, prevent registration by returning/throwing a descriptive error
(e.g. DuplicateAwaitError) instead of overwriting, otherwise insert the new
.pending entry and proceed to create the timeout Task; reference the
awaitResponse function, the slots dictionary, MessageId key, and the
.pending/.waiting slot cases when making the check and error handling so the
original continuation is never lost.
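The duplicate-registration guard described above can be sketched in isolation. This is a minimal, hypothetical model rather than the PR's `PendingRegistry`: `SketchRegistry`, `RegistryError`, the `String` ids and payloads, and the `resolved` slot case are all assumptions made for illustration. Caller cancellation is not handled, and the timeout path resumes the waiter with an error, which may differ from what `dropForTimeout` does in the real code.

```swift
// Hypothetical stand-ins; the real code uses MessageId, Response, and ARCPError.
enum RegistryError: Error, Equatable {
    case duplicatePending(String)
    case deadlineExceeded(String)
    case missingSlot(String)
}

actor SketchRegistry {
    private enum Slot {
        case pending                                      // registered, caller not yet suspended
        case waiting(CheckedContinuation<String, Error>)  // caller is suspended on this slot
        case resolved(Result<String, Error>)              // outcome arrived before suspension
    }

    private var slots: [String: Slot] = [:]

    func awaitResponse(id: String, deadline: Duration) async throws -> String {
        // Refuse to overwrite an existing slot: doing so would orphan its continuation.
        guard slots[id] == nil else { throw RegistryError.duplicatePending(id) }
        slots[id] = .pending
        let timeoutTask = Task { [weak self] in
            try? await Task.sleep(for: deadline)
            if Task.isCancelled { return }
            await self?.finish(id: id, with: .failure(RegistryError.deadlineExceeded(id)))
        }
        defer { timeoutTask.cancel() }
        return try await suspend(id: id)
    }

    func resolve(id: String, with value: String) {
        finish(id: id, with: .success(value))
    }

    private func suspend(id: String) async throws -> String {
        try await withCheckedThrowingContinuation { (continuation: CheckedContinuation<String, Error>) in
            switch slots[id] {
            case .pending?:
                slots[id] = .waiting(continuation)
            case .resolved(let result)?:
                slots[id] = nil
                continuation.resume(with: result)
            default:
                continuation.resume(throwing: RegistryError.missingSlot(id))
            }
        }
    }

    private func finish(id: String, with result: Result<String, Error>) {
        switch slots[id] {
        case .waiting(let continuation)?:
            slots[id] = nil
            continuation.resume(with: result)
        case .pending?:
            slots[id] = .resolved(result)
        default:
            break // unknown id or already finished: nothing to resume
        }
    }
}
```

With the guard in place, a second `awaitResponse` for an id that is still in flight fails immediately, and the first waiter is still resumed by `resolve(id:with:)`.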
struct IdempotencyTests {
    @Test("Second invoke with same idempotency key returns the cached terminal")
    func duplicateInvocationReplays() async throws {
        let fixture = IntegrationFixture(handler: CountingTool())
Reset shared counter to keep this test deterministic.
CountingTool.counter is static, so state can leak between in-process executions. Reset it before the invocation sequence.
💡 Suggested fix
 @Suite("Idempotency semantics (issue #43)")
 struct IdempotencyTests {
     @Test("Second invoke with same idempotency key returns the cached terminal")
     func duplicateInvocationReplays() async throws {
+        await CountingTool.counter.reset()
         let fixture = IntegrationFixture(handler: CountingTool())
         let open = try await fixture.open()
         defer { open.close() }
@@
 private actor AtomicCounter {
     private(set) var value: Int = 0
+    func reset() { value = 0 }
     @discardableResult
     func increment() -> Int {
         value += 1
         return value
     }
 }
Also applies to: 46-47, 56-63
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@Tests/ARCPTests/Integration/IdempotencyTests.swift` at line 10, reset the
shared static counter on CountingTool before each test invocation to avoid state
leakage between in-process runs: locate the test cases that create
IntegrationFixture(handler: CountingTool()) in IdempotencyTests.swift and call
`await CountingTool.counter.reset()` (adding a `reset()` method to the
AtomicCounter actor, since the counter's `value` is `private(set)` and cannot be
assigned from outside) immediately before the invocation sequence/fixture setup
in each test (including the other spots noted around lines 46–47 and 56–63) so
each test starts with a deterministic counter state.
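A self-contained sketch of the reset-before-use pattern follows. `AtomicCounter` mirrors the actor shown in the suggested fix; `SharedFixture` and `simulatedTestRun` are hypothetical names standing in for `CountingTool` and a test body.

```swift
actor AtomicCounter {
    private(set) var value: Int = 0

    func reset() { value = 0 }

    @discardableResult
    func increment() -> Int {
        value += 1
        return value
    }
}

// Hypothetical holder for the shared static state that leaks between tests.
enum SharedFixture {
    static let counter = AtomicCounter()
}

// Stand-in for one test body: reset first, then exercise the shared counter.
func simulatedTestRun() async -> Int {
    await SharedFixture.counter.reset()
    await SharedFixture.counter.increment()
    return await SharedFixture.counter.value
}
```

Because each run starts by resetting, two consecutive runs in the same process observe the same count regardless of execution order.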
Summary
Batch fix for 24 open issues spanning security, concurrency, protocol semantics, and documentation. Builds clean, all 137 tests (112 existing + 25 new) pass locally on macOS / Swift 6.3.
Security
- `JWTAuthValidator` now requires a `nonce` claim matching the runtime-issued challenge so captured tokens can't be replayed across challenges.
- `ModelUse.subsetViolation` reworked with a real glob-inclusion check (parent wildcards with required suffix segments can no longer be escaped, e.g. `openai:gpt-*o` does NOT cover `openai:gpt-5`).
Concurrency / lifecycle
- `PendingRegistry` and `ARCPClient.ping` use a slot state machine so the timeout vs. waiter race can't leave dangling continuations.
- `ARCPClient.finishUnhandled()` fails every pending invocation and finishes progress + result streams when the transport closes.
- `invoke` cleans up state if `transport.send` throws.
- `ARCPRuntime.register` race closed by inserting the `jobManagers` entry before iterating `registeredHandlers`.
- `SubscriptionManager.route` ignores `subscribe.event` envelopes to prevent recursive cascade on empty filters.
- Subscriptions track their owner `SessionId`; runtime cleans them up on session close.
- `StreamManager.subscribeInbound` rejects duplicate subscriptions on the same stream id with `failedPrecondition`.
Client dispatch
- Unmatched `job.progress`, `job.result_chunk`, and `tool.error` envelopes fall through to `client.unhandled` instead of being silently dropped or double-delivered.
Runtime semantics
- Idempotency keys persist the terminal `job.completed/failed/cancelled` payload (scoped by principal + key) and replay it on duplicate invokes without re-running the handler.
- `Envelope.init` reads `Tracing.current` for default trace fields; `JobManager` wraps invocation handling under the inbound trace context.
- Artifact retention sweep starts in `ARCPRuntime.init`; lease expiry sweep starts in `JobManager.init`.
- `CredentialManager.rotate` keeps every credential returned by the provisioner instead of silently discarding extras.
- `Ulid.next` preserves strict monotonicity across equal-and-backward clocks and tail overflow.
- `Mailbox` switched to a head-index queue with periodic compaction (O(1) amortized drain).
- Lease durations are validated at grant/refresh (`invalidArgument`); `handleLeaseRefresh` surfaces errors as `nack` instead of swallowing them.
Docs
- `CONFORMANCE.md` narrowed to same-session replay only (no cross-session, no checkpoint, no open-stream re-emission).
- `Capabilities.interrupt` should stay `false`.
Tests / coverage
- `Mailbox`, `LeaseManager`, `PendingRegistry`, `JWTAuth` (was 0% covered), client dispatch fallback, subscription cleanup, idempotency replay, ULID burst monotonicity.
Closes
#40 #41 #42 #43 #44 #45 #46 #47 #48 #49 #50 #51 #52 #53 #54 #55 #56 #57 #58 #59 #60 #61 #62 #63
Test plan
- `swift build`: clean
- `swift test`: 137 passed locally
🤖 Generated with Claude Code
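The glob-inclusion rule listed under Security (`openai:gpt-*o` does NOT cover `openai:gpt-5`) can be illustrated with a small sketch. This is not the PR's `subsetViolation(of:)` implementation: `globCovers` is a hypothetical helper, and it assumes `*` is the only wildcard and matches any run of characters, including none. The idea is to match the parent pattern against the child treated as literal text, so a `*` in the child can only be absorbed by a `*` in the parent.

```swift
/// Returns true when every string the `child` pattern can match is also
/// matched by the `parent` pattern. Hypothetical helper for illustration.
func globCovers(parent: String, child: String) -> Bool {
    let p = Array(parent)
    let t = Array(child)
    var pi = 0                  // cursor in parent pattern
    var ti = 0                  // cursor in child text
    var starIndex: Int? = nil   // position of the most recent `*` in parent
    var resumeAt = 0            // child index to retry from after backtracking

    while ti < t.count {
        if pi < p.count, p[pi] == "*" {
            // Record the wildcard and first try matching it against nothing.
            starIndex = pi
            resumeAt = ti
            pi += 1
        } else if pi < p.count, p[pi] == t[ti] {
            // Literal match (a child `*` never reaches here, since a parent
            // `*` is always handled by the branch above).
            pi += 1
            ti += 1
        } else if let star = starIndex {
            // Mismatch: let the last wildcard absorb one more child character.
            pi = star + 1
            resumeAt += 1
            ti = resumeAt
        } else {
            return false
        }
    }
    // Any remaining parent characters must all be wildcards.
    while pi < p.count, p[pi] == "*" { pi += 1 }
    return pi == p.count
}

print(globCovers(parent: "openai:gpt-*o", child: "openai:gpt-5"))  // false: required suffix "o" is missing
print(globCovers(parent: "openai:gpt-*", child: "openai:gpt-5"))   // true
print(globCovers(parent: "openai:gpt-4", child: "openai:gpt-*"))   // false: child is broader than parent
```

A prefix-only comparison would accept the first case because both sides share `openai:gpt-`; checking the full pattern, suffix included, is what closes that gap.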