diff --git a/CONFORMANCE.md b/CONFORMANCE.md index f567853..1fe342f 100644 --- a/CONFORMANCE.md +++ b/CONFORMANCE.md @@ -19,7 +19,12 @@ below. ### Durable jobs (§9 – §14) - Job state machine: `queued → running → completed | failed | cancelled` -- Heartbeats, cooperative cancellation (`job.cancel`), interrupts (`job.interrupt`) +- Heartbeats (telemetry only — see below), cooperative cancellation (`job.cancel`). + Interrupts (`interrupt`) are **not advertised by default** — the current + runtime only transitions the job state to `.blocked` and acks the + envelope; there is no handler-visible callback that lets the running + job observe and respond to an interrupt. Leave + `Capabilities.interrupt = false` unless you have wired your own observer. - `tool.invoke` + `ToolHandler` adapter pattern - `JobContext`: `checkLeaseExpiration`, `checkCancellation`, `charge`, `log`, `metric`, `requestPermission`, `reportProgress`, `openStream`, `emitResultChunk` @@ -65,7 +70,11 @@ below. ### Subscriptions (§18) - `subscription.filter`, backfill, `subscription.backfill_complete` boundary -- Resume by `after_message_id` +- Resume by `after_message_id` — **same-session only**: the runtime replays + envelopes for the current session id. Cross-session resume (carrying a + prior session's `after_message_id` into a fresh session) is not + implemented. `checkpoint_id` and `include_open_streams` are currently + ignored. ### Artifacts (§21) diff --git a/Sources/ARCP/Auth/AuthScheme.swift b/Sources/ARCP/Auth/AuthScheme.swift index 19fcde7..3dc8252 100644 --- a/Sources/ARCP/Auth/AuthScheme.swift +++ b/Sources/ARCP/Auth/AuthScheme.swift @@ -15,9 +15,12 @@ public protocol AuthValidator: Sendable { /// Result of successful credential validation. public struct AuthenticatedPrincipal: Sendable, Hashable { + /// Subject (`sub`) identifier of the authenticated principal. public var subject: String + /// Trust level granted to the principal by the validator. public var trustLevel: TrustLevel + /// Create an authenticated principal. public init(subject: String, trustLevel: TrustLevel = .trusted) { self.subject = subject self.trustLevel = trustLevel @@ -26,16 +29,20 @@ public struct AuthenticatedPrincipal: Sendable, Hashable { /// Composite validator that fans out to scheme-specific validators. public struct CompositeAuthValidator: AuthValidator { + /// Validators that this composite fans out to in order. public let validators: [any AuthValidator] + /// Wrap a list of scheme-specific validators behind a single facade. public init(_ validators: [any AuthValidator]) { self.validators = validators } + /// Return `true` if any wrapped validator supports `scheme`. public func supports(_ scheme: AuthScheme) -> Bool { validators.contains { $0.supports(scheme) } } + /// Validate `auth` against the first wrapped validator that supports its scheme. public func validate( auth: AuthBlock, challenge nonce: String? diff --git a/Sources/ARCP/Auth/BearerAuth.swift b/Sources/ARCP/Auth/BearerAuth.swift index dacbd43..7497fd7 100644 --- a/Sources/ARCP/Auth/BearerAuth.swift +++ b/Sources/ARCP/Auth/BearerAuth.swift @@ -4,6 +4,7 @@ public struct BearerAuthValidator: AuthValidator { private let principals: [String: AuthenticatedPrincipal] + /// Initialize with an explicit token → principal map. public init(_ principals: [String: AuthenticatedPrincipal]) { self.principals = principals } @@ -17,10 +18,13 @@ public struct BearerAuthValidator: AuthValidator { self.principals = map } + /// Returns `true` for `bearer`. public func supports(_ scheme: AuthScheme) -> Bool { scheme == .bearer } + /// Validate the bearer token and return the mapped principal, or throw + /// `ARCPError.unauthenticated` on failure. public func validate( auth: AuthBlock, challenge nonce: String? diff --git a/Sources/ARCP/Auth/JWTAuth.swift b/Sources/ARCP/Auth/JWTAuth.swift index 8469d5d..5f18c19 100644 --- a/Sources/ARCP/Auth/JWTAuth.swift +++ b/Sources/ARCP/Auth/JWTAuth.swift @@ -3,9 +3,12 @@ import JWTKit /// `signed_jwt` validator. RFC §8.2. /// -/// Validates the JWT's signature against the configured key collection -/// and asserts `aud` matches the runtime identity before returning the -/// authenticated principal. +/// Validates the JWT's signature against the configured key collection, +/// asserts `aud` matches the runtime identity, and — when the runtime +/// issued a challenge nonce — requires the JWT to carry the same nonce +/// (claim `nonce`). The nonce binding prevents a captured token from being +/// replayed against a fresh challenge until it expires. When no nonce is +/// supplied (no challenge was issued), the JWT need not carry one. public struct JWTAuthValidator: AuthValidator { private let keys: JWTKeyCollection private let audience: String @@ -29,7 +32,9 @@ public struct JWTAuthValidator: AuthValidator { /// /// - Parameters: /// - auth: Authentication block received during the handshake. - /// - nonce: Optional challenge nonce from the runtime. + /// - nonce: Optional challenge nonce from the runtime. When supplied, + /// the JWT MUST carry a matching `nonce` claim or the token is + /// rejected with `ARCPError.unauthenticated`. /// - Returns: The authenticated principal. /// - Throws: `ARCPError.unauthenticated` when the token is missing or invalid. public func validate( @@ -45,6 +50,16 @@ public struct JWTAuthValidator: AuthValidator { do { let payload = try await keys.verify(token, as: ARCPClaims.self) try payload.aud.verifyIntendedAudience(includes: audience) + if let nonce { + guard let tokenNonce = payload.nonce, !tokenNonce.isEmpty else { + throw ARCPError.unauthenticated( + detail: "challenge required but JWT carries no nonce claim" + ) + } + guard tokenNonce == nonce else { + throw ARCPError.unauthenticated(detail: "JWT nonce does not match challenge") + } + } return AuthenticatedPrincipal(subject: payload.sub.value, trustLevel: trustLevel) } catch let error as ARCPError { throw error @@ -55,13 +70,17 @@ public struct JWTAuthValidator: AuthValidator { } /// Minimal claim set used by ARCP JWTs. Deployments are free to issue richer -/// JWTs; only the standard fields are inspected here. +/// JWTs; only the standard fields plus the optional `nonce` (used for +/// challenge binding) and `jti` (replay-tracking identifier) are inspected +/// here. struct ARCPClaims: JWTPayload { var sub: SubjectClaim var aud: AudienceClaim var exp: ExpirationClaim var iat: IssuedAtClaim? var iss: IssuerClaim? + var nonce: String? + var jti: String? func verify(using algorithm: some JWTAlgorithm) async throws { try exp.verifyNotExpired() diff --git a/Sources/ARCP/Client/ARCPClient.swift b/Sources/ARCP/Client/ARCPClient.swift index 78492f5..1c510ac 100644 --- a/Sources/ARCP/Client/ARCPClient.swift +++ b/Sources/ARCP/Client/ARCPClient.swift @@ -11,7 +11,14 @@ public actor ARCPClient { private let mailbox: Mailbox private let drainer: Task private var dispatcher: Task? - private var pendingPongs: [MessageId: CheckedContinuation] = [:] + private var pendingPongs: [MessageId: PongSlot] = [:] + + private enum PongSlot { + case pending + case waiting(CheckedContinuation) + case readyValue(PongPayload) + case readyError(any Error) + } private var pendingByInvoke: [MessageId: JobInvocationState] = [:] private var invokeByJobId: [JobId: MessageId] = [:] private var resultChunkStreams: [JobId: ResultChunkStream] = [:] @@ -165,9 +172,17 @@ public actor ARCPClient { switch envelope.payload { case .pong(let payload): if let id = envelope.correlationId, - let cont = pendingPongs.removeValue(forKey: id) + let slot = pendingPongs[id] { - cont.resume(returning: payload) + switch slot { + case .pending: + pendingPongs[id] = .readyValue(payload) + case .waiting(let cont): + pendingPongs.removeValue(forKey: id) + cont.resume(returning: payload) + case .readyValue, .readyError: + unhandledContinuation?.yield(envelope) + } } else { unhandledContinuation?.yield(envelope) } @@ -184,6 +199,8 @@ public actor ARCPClient { let state = pendingByInvoke[invokeId] { state.progressContinuation?.yield(payload) + } else { + unhandledContinuation?.yield(envelope) } case .jobCompleted(let payload): if let jobId = envelope.jobId { @@ -222,11 +239,14 @@ public actor ARCPClient { case .jobResultChunk(let payload): if let jobId = envelope.jobId, let stream = resultChunkStreams[jobId] { try? await stream.push(payload) + } else { + unhandledContinuation?.yield(envelope) } - unhandledContinuation?.yield(envelope) case .toolError(let payload): if let invokeId = envelope.correlationId, pendingByInvoke[invokeId] != nil { resolve(invokeId: invokeId, outcome: .failed(payload.error)) + } else { + unhandledContinuation?.yield(envelope) } case .permissionRequest(let payload): Task { [transport, permissionHandler] in @@ -301,10 +321,31 @@ public actor ARCPClient { private func finishUnhandled() { unhandledContinuation?.finish() unhandledContinuation = nil - for (_, cont) in pendingPongs { - cont.resume(throwing: ARCPError.unavailable(reason: "transport closed", retryAfter: nil)) - } + let closedError = ARCPError.unavailable(reason: "transport closed", retryAfter: nil) + let pongSlots = pendingPongs pendingPongs.removeAll() + for (_, slot) in pongSlots { + switch slot { + case .waiting(let cont): + cont.resume(throwing: closedError) + case .pending, .readyValue, .readyError: + break + } + } + let pending = pendingByInvoke + pendingByInvoke.removeAll() + invokeByJobId.removeAll() + for (_, state) in pending { + state.progressContinuation?.finish() + state.continuation?.resume(throwing: closedError) + } + let streams = resultChunkStreams + resultChunkStreams.removeAll() + Task { + for (_, stream) in streams { + await stream.fail(closedError) + } + } } /// Send a `ping`, await the corresponding `pong`. Times out after `timeout`. @@ -315,41 +356,55 @@ public actor ARCPClient { -> PongPayload { let id = MessageId.random() + pendingPongs[id] = .pending let envelope = Envelope( id: id, sessionId: info.sessionId, payload: .ping(PingPayload(nonce: nonce)) ) - try await transport.send(envelope) - return try await withThrowingTaskGroup(of: PongPayload.self) { group in - group.addTask { [weak self] in - guard let self = self else { - throw ARCPError.unavailable(reason: "client released", retryAfter: nil) - } - return try await self.awaitPong(id: id) - } - group.addTask { [weak self] in - try await Task.sleep(for: timeout) - await self?.failPongWaiter(id: id) - throw ARCPError.deadlineExceeded(operation: "ping \(id)") - } - defer { group.cancelAll() } - guard let result = try await group.next() else { - throw ARCPError.internal(detail: "ping group empty", cause: nil) - } - return result + let timeoutTask = Task { [weak self] in + try? await Task.sleep(for: timeout) + if Task.isCancelled { return } + await self?.failPongWaiter(id: id) + } + defer { timeoutTask.cancel() } + do { + try await transport.send(envelope) + } catch { + pendingPongs.removeValue(forKey: id) + throw error } + return try await awaitPong(id: id) } private func awaitPong(id: MessageId) async throws -> PongPayload { try await withCheckedThrowingContinuation { (cont: CheckedContinuation) in - pendingPongs[id] = cont + switch pendingPongs[id] { + case .readyValue(let payload): + pendingPongs.removeValue(forKey: id) + cont.resume(returning: payload) + case .readyError(let err): + pendingPongs.removeValue(forKey: id) + cont.resume(throwing: err) + case .pending, .none: + pendingPongs[id] = .waiting(cont) + case .waiting: + cont.resume(throwing: ARCPError.internal(detail: "double-attach pong \(id)", cause: nil)) + } } } private func failPongWaiter(id: MessageId) { - if let cont = pendingPongs.removeValue(forKey: id) { - cont.resume(throwing: ARCPError.deadlineExceeded(operation: "ping \(id)")) + guard let slot = pendingPongs[id] else { return } + let err = ARCPError.deadlineExceeded(operation: "ping \(id)") + switch slot { + case .pending: + pendingPongs[id] = .readyError(err) + case .waiting(let cont): + pendingPongs.removeValue(forKey: id) + cont.resume(throwing: err) + case .readyValue, .readyError: + break } } @@ -426,7 +481,14 @@ public actor ARCPClient { ) ) ) - try await transport.send(envelope) + do { + try await transport.send(envelope) + } catch { + if let state = pendingByInvoke.removeValue(forKey: invokeId) { + state.progressContinuation?.finish() + } + throw error + } let (outcome, jobId) = try await withCheckedThrowingContinuation { cont in attachContinuation(invokeId: invokeId, cont: cont) } diff --git a/Sources/ARCP/Client/ResultChunkStream.swift b/Sources/ARCP/Client/ResultChunkStream.swift index 919bae1..7ec9e18 100644 --- a/Sources/ARCP/Client/ResultChunkStream.swift +++ b/Sources/ARCP/Client/ResultChunkStream.swift @@ -1,21 +1,31 @@ import Foundation +/// Typed async sequence of `job.result_chunk` payloads for a single job. +/// +/// Returned by `ARCPClient.resultChunks(for:)` and (on the runtime side) +/// produced as part of result-chunk delivery. The stream completes when the +/// job reaches its terminal envelope or when `finish()`/`fail(_:)` is called. public final class ResultChunkStream: AsyncSequence, @unchecked Sendable { + /// The element type yielded by the sequence — one `job.result_chunk` payload. public typealias Element = JobResultChunkPayload + /// Iterator over the underlying throwing async stream of result chunks. public struct AsyncIterator: AsyncIteratorProtocol { fileprivate var base: AsyncThrowingStream.Iterator + /// Advance the iterator. Throws if the stream was failed. public mutating func next() async throws -> Element? { try await base.next() } } + /// Logical result identifier carried by the chunks (RFC §8.4). public let resultId: String private let state = ResultChunkStreamState() private let stream: AsyncThrowingStream private let continuation: AsyncThrowingStream.Continuation + /// Create a new stream optionally tagged with a `resultId`. public init(resultId: String = "") { self.resultId = resultId var continuation: AsyncThrowingStream.Continuation! @@ -23,6 +33,10 @@ public final class ResultChunkStream: AsyncSequence, @unchecked Sendable { self.continuation = continuation } + /// Push a chunk through the underlying assembler and yield it to subscribers. + /// + /// - Parameter chunk: Chunk to deliver. + /// - Throws: Rethrows any assembler error after failing the stream. public func push(_ chunk: JobResultChunkPayload) async throws { do { try await state.push(chunk) @@ -33,19 +47,28 @@ public final class ResultChunkStream: AsyncSequence, @unchecked Sendable { } } + /// Finish the stream gracefully. Subsequent iterators return `nil`. public func finish() async { continuation.finish() } + /// Finish the stream with an error. Subsequent iterators rethrow it. public func fail(_ error: any Error) async { continuation.finish(throwing: error) } + /// Drain the stream to completion and return the assembled bytes. + /// + /// - Returns: The fully assembled result bytes. + /// - Throws: Any assembler error or upstream failure. public func collect() async throws -> Data { for try await _ in self {} return try await state.finish() } + /// Drain the stream and return the assembled bytes as a UTF-8 string. + /// + /// - Throws: `ARCPError.dataLoss` if the bytes are not valid UTF-8. public func collectUTF8() async throws -> String { let data = try await collect() guard let value = String(data: data, encoding: .utf8) else { @@ -54,6 +77,7 @@ public final class ResultChunkStream: AsyncSequence, @unchecked Sendable { return value } + /// Conform to `AsyncSequence`. public func makeAsyncIterator() -> AsyncIterator { AsyncIterator(base: stream.makeAsyncIterator()) } diff --git a/Sources/ARCP/Envelope/Envelope.swift b/Sources/ARCP/Envelope/Envelope.swift index 8b3c4db..70fc23a 100644 --- a/Sources/ARCP/Envelope/Envelope.swift +++ b/Sources/ARCP/Envelope/Envelope.swift @@ -6,23 +6,43 @@ import Foundation /// snake_case key names defined in the RFC. The `payload` field is decoded by /// dispatching on the `type` string into `MessageType.decodePayload(...)`. public struct Envelope: Sendable, Hashable { + /// ARCP wire-version string (e.g. `"1.1"`). RFC §6.1. public var arcp: String + /// Globally unique message identifier (ULID). RFC §6.1. public var id: MessageId + /// Wall-clock creation timestamp. public var timestamp: Date + /// Free-form sender identifier (typically a deployment name). public var source: String? + /// Free-form recipient identifier. public var target: String? + /// Session this message belongs to. Required for any envelope persisted + /// to the event log. public var sessionId: SessionId? + /// Job this message belongs to, if any. RFC §7. public var jobId: JobId? + /// Stream this message belongs to, if any. RFC §11. public var streamId: StreamId? + /// Subscription this message belongs to, if any. RFC §13. public var subscriptionId: SubscriptionId? + /// Distributed-trace identifier (RFC §17.1). Inherits ambient + /// `Tracing.current` if not explicitly set. public var traceId: TraceId? + /// Span identifier for this envelope (RFC §17.1). public var spanId: SpanId? + /// Parent span identifier (RFC §17.1). public var parentSpanId: SpanId? + /// Identifier of the request this envelope responds to (RFC §6.1). public var correlationId: MessageId? + /// Identifier of the envelope that caused this one to be emitted. public var causationId: MessageId? + /// Idempotency key for tool-invocation retry safety (RFC §6.4). public var idempotencyKey: IdempotencyKey? + /// Priority hint for transports and subscription filters. public var priority: Priority + /// Free-form extension fields (RFC §21). public var extensions: [String: JSONValue]? + /// The strongly-typed payload (`type` + body) carried by the envelope. public var payload: MessageType public init( @@ -54,9 +74,19 @@ public struct Envelope: Sendable, Hashable { self.jobId = jobId self.streamId = streamId self.subscriptionId = subscriptionId - self.traceId = traceId - self.spanId = spanId - self.parentSpanId = parentSpanId + // When trace fields are not explicitly set, inherit them from the + // ambient `Tracing.current` task-local context. The observability + // guide promises envelopes built inside `Tracing.withTrace` propagate + // the trace automatically (RFC §17.1). + if let trace = Tracing.current { + self.traceId = traceId ?? trace.traceId + self.spanId = spanId ?? trace.spanId + self.parentSpanId = parentSpanId ?? trace.parentSpanId + } else { + self.traceId = traceId + self.spanId = spanId + self.parentSpanId = parentSpanId + } self.correlationId = correlationId self.causationId = causationId self.idempotencyKey = idempotencyKey diff --git a/Sources/ARCP/Ids/Ulid.swift b/Sources/ARCP/Ids/Ulid.swift index 34d7448..6cb3ccc 100644 --- a/Sources/ARCP/Ids/Ulid.swift +++ b/Sources/ARCP/Ids/Ulid.swift @@ -30,30 +30,46 @@ private final class MonotonicGenerator: @unchecked Sendable { lock.lock() defer { lock.unlock() } var random = [UInt8](repeating: 0, count: 10) - let timeMs: UInt64 - if nowMs == lastTime { - timeMs = lastTime - random = lastRandom - increment(&random) - } else { + var timeMs: UInt64 + if nowMs > lastTime { + // System clock advanced — start a fresh random tail. timeMs = nowMs for index in 0..<10 { random[index] = UInt8.random(in: 0...255) } + } else { + // Clock equals or moved backwards. Hold the previous timestamp + // and increment the previous random tail so the new id sorts + // strictly after the last one. RFC §6.2 monotonicity. + timeMs = lastTime + random = lastRandom + if !increment(&random) { + // Tail overflowed. Advance the timestamp by one millisecond + // and restart the tail with a fresh random value so we never + // emit an id with a lower-or-equal sort key under the same + // timestamp. + timeMs = lastTime &+ 1 + for index in 0..<10 { + random[index] = UInt8.random(in: 0...255) + } + } } lastTime = timeMs lastRandom = random return encode(time: timeMs, random: random) } - private func increment(_ bytes: inout [UInt8]) { + /// Increment `bytes` as a big-endian integer. Returns `false` when the + /// value overflowed (all bytes were `0xFF`), `true` otherwise. + private func increment(_ bytes: inout [UInt8]) -> Bool { for index in (0.. String { diff --git a/Sources/ARCP/Messages/Common.swift b/Sources/ARCP/Messages/Common.swift index c6a2fa7..4b4129a 100644 --- a/Sources/ARCP/Messages/Common.swift +++ b/Sources/ARCP/Messages/Common.swift @@ -2,25 +2,51 @@ import Foundation /// Negotiated client/runtime capabilities. RFC §7. public struct Capabilities: Sendable, Codable, Hashable { + /// Whether the peer supports multi-kind streams (RFC §11). public var streaming: Bool + /// Whether the peer supports durable-job semantics (RFC §10). public var durableJobs: Bool + /// Whether the peer supports checkpointed resume (RFC §19) — currently + /// not implemented in this SDK. public var checkpoints: Bool + /// Whether the peer supports binary stream frames (RFC §11.3). public var binaryStreams: Bool + /// Whether the peer supports cross-agent handoff (RFC §14). public var agentHandoff: Bool + /// Whether inline-base64 artifacts are supported (RFC §16). public var artifacts: Bool + /// Whether subscriptions are supported (RFC §13). public var subscriptions: Bool + /// Whether the peer supports scheduled jobs (RFC §14) — currently not + /// implemented. public var scheduledJobs: Bool + /// Whether `scheme=none` (anonymous) handshakes are accepted. public var anonymous: Bool + /// Whether the peer advertises the `interrupt` control flow. **Leave + /// `false`** in this SDK release — see [Interrupts in the jobs + /// guide](https://github.com/agentruntimecontrolprotocol/swift-sdk/blob/main/docs/guides/jobs.md); + /// the runtime acks interrupts but does not surface them to handlers. public var interrupt: Bool + /// Whether `job.result_chunk` streaming is supported (RFC §8.4). public var resultChunk: Bool + /// Whether `cost.budget` accounting is supported (RFC §9.6). public var costBudget: Bool + /// Whether `model.use` lease pattern enforcement is supported (RFC §9.7). public var modelUse: Bool + /// Whether provisioned-credential issuance is supported (RFC §9.8). public var provisionedCredentials: Bool + /// Heartbeat recovery policy (RFC §10.3). **Heartbeat ack tracking is + /// not implemented in this release** — heartbeats are telemetry only. public var heartbeatRecovery: HeartbeatRecovery + /// Heartbeat interval in seconds. public var heartbeatIntervalSeconds: Int + /// Accepted binary stream encodings. public var binaryEncoding: [BinaryEncoding] + /// Artifact retention policy (RFC §16.3). public var artifactRetention: ArtifactRetention? + /// Negotiated set of advertised extension namespaces (RFC §21). public var extensions: [String] + /// Free-form extras the spec does not enumerate. public var extras: [String: Bool] /// Runtime-side advertisement of available agents (ARCP v1.1 §7.5). /// Accepts both v1.0 flat-name and v1.1 rich shapes. diff --git a/Sources/ARCP/Messages/ModelUse.swift b/Sources/ARCP/Messages/ModelUse.swift index 1e73fb4..0e8ed5f 100644 --- a/Sources/ARCP/Messages/ModelUse.swift +++ b/Sources/ARCP/Messages/ModelUse.swift @@ -14,7 +14,9 @@ public struct ModelUse: Sendable, Codable, Hashable { patterns.contains { Self.matches(pattern: $0, value: model) } } - /// Returns the first child pattern not covered by this parent set. + /// Returns the first child pattern not covered by this parent set, where + /// "covers" means every concrete model name the child pattern can match + /// is also matched by some parent pattern. public func subsetViolation(of child: ModelUse) -> String? { for childPattern in child.patterns where !covers(childPattern) { return childPattern @@ -23,20 +25,77 @@ public struct ModelUse: Sendable, Codable, Hashable { } private func covers(_ childPattern: String) -> Bool { - patterns.contains { parent in - parent == "*" - || parent == childPattern - || Self.literalPrefix(parent).map { childPattern.hasPrefix($0) } == true + patterns.contains { Self.patternCovers(parent: $0, child: childPattern) } + } + + /// Returns `true` when every model string matched by `child` is also + /// matched by `parent`. The check is conservative: it returns `false` + /// when it cannot prove subset. + static func patternCovers(parent: String, child: String) -> Bool { + if parent == "*" { return true } + if parent == child { return true } + // If child has no wildcards, defer to literal matching. + if !child.contains("*") { + return matches(pattern: parent, value: child) + } + // Both contain wildcards. Decompose into literal segments and check + // whether every concrete instantiation of `child` is matchable by + // `parent`. We do this by computing the constraints `child`'s + // segments impose and ensuring `parent`'s segments are at least as + // permissive (prefix and suffix) and that every parent middle segment + // can be located inside the child's segment sequence. + let parentSegs = split(parent) + let childSegs = split(child) + + // Prefix constraint: parent's first literal must be a prefix of + // child's first literal (parent allows at least the strings that + // start with parent.first; child's strings start with child.first). + let parentFirst = parentSegs.first ?? "" + let childFirst = childSegs.first ?? "" + if !childFirst.hasPrefix(parentFirst) { return false } + + // Suffix constraint: the last segment (after the final `*` for + // patterns containing one) anchors the suffix. If parent ends with a + // literal (i.e. does NOT end with `*`), child must also end with a + // literal that has parent's suffix as a suffix. + let parentEndsWithWildcard = parent.hasSuffix("*") + let childEndsWithWildcard = child.hasSuffix("*") + let parentLast = parentSegs.last ?? "" + let childLast = childSegs.last ?? "" + if !parentEndsWithWildcard { + if childEndsWithWildcard { return false } + if !childLast.hasSuffix(parentLast) { return false } } + + // Middle constraint: every interior literal of parent must appear in + // some interior literal of child, in order. We do not try to fold a + // parent middle into the child's anchored prefix/suffix segments — a + // wildcard in the child between two literals would otherwise let the + // child match a string the parent cannot. + let parentMiddle = Array(parentSegs.dropFirst().dropLast()) + let childMiddle = Array(childSegs.dropFirst().dropLast()) + var cursor = 0 + for needle in parentMiddle where !needle.isEmpty { + var found = false + while cursor < childMiddle.count { + if childMiddle[cursor].contains(needle) { + found = true + cursor += 1 + break + } + cursor += 1 + } + if !found { return false } + } + return true } - private static func literalPrefix(_ pattern: String) -> String? { - guard let star = pattern.firstIndex(of: "*") else { return nil } - return String(pattern[.. [String] { + pattern.split(separator: "*", omittingEmptySubsequences: false).map(String.init) } private static func matches(pattern: String, value: String) -> Bool { - let parts = pattern.split(separator: "*", omittingEmptySubsequences: false).map(String.init) + let parts = split(pattern) if parts.count == 1 { return pattern == value } var cursor = value.startIndex diff --git a/Sources/ARCP/Runtime/ARCPRuntime.swift b/Sources/ARCP/Runtime/ARCPRuntime.swift index 57b45ae..7911073 100644 --- a/Sources/ARCP/Runtime/ARCPRuntime.swift +++ b/Sources/ARCP/Runtime/ARCPRuntime.swift @@ -67,6 +67,16 @@ public actor ARCPRuntime { self.subscriptionManager = SubscriptionManager(eventLog: self.eventLog) self.artifactStore = ArtifactStore(eventLog: self.eventLog) self.log = Logger(label: "arcp.runtime") + // RFC §16.3: artifact retention sweep runs unconditionally so + // expired artifacts are removed without callers having to opt in. + let store = self.artifactStore + Task { await store.startSweep() } + } + + /// Stop the runtime's owned background tasks (currently the artifact + /// retention sweep). Called by `shutdown()` and on actor deinit. + public func stop() async { + await artifactStore.stopSweep() } /// Snapshot of currently-open session metadata. @@ -77,7 +87,8 @@ public actor ARCPRuntime { /// Register a tool handler globally. Future sessions inherit it. public func register(_ handler: any ToolHandler) { registeredHandlers.append(handler) - for manager in jobManagers.values { + let managers = jobManagers + for manager in managers.values { Task { await manager.register(handler) } } } @@ -106,19 +117,27 @@ public actor ARCPRuntime { sessionId: info.sessionId ) }, + eventLog: eventLog, + principalSubject: info.principal.subject, send: { [weak self] envelope in guard let self else { return } try await self.send(envelope, transport: transport) } ) + // Insert into jobManagers BEFORE iterating registeredHandlers so a + // concurrent `register(_:)` either sees the new entry and fans the + // handler out itself, or our subsequent iteration picks up the newly + // appended handler. `JobManager.register` is idempotent by name, so + // double-registration in the overlap window is safe. + jobManagers[info.sessionId] = jobManager for handler in registeredHandlers { await jobManager.register(handler) } await jobManager.setAgentInventory(supportedCapabilities.agents) - jobManagers[info.sessionId] = jobManager log.info("session opened", metadata: ["session": "\(info.sessionId)"]) await dispatchLoop(transport: transport, mailbox: mailbox, info: info, jobManager: jobManager) await jobManager.shutdown() + await subscriptionManager.removeAllOwned(by: info.sessionId, reason: "session closed") jobManagers.removeValue(forKey: info.sessionId) sessions.removeValue(forKey: info.sessionId) log.info("session closed", metadata: ["session": "\(info.sessionId)"]) @@ -308,7 +327,9 @@ public actor ARCPRuntime { await jobManager.handleLeaseRefresh(envelope: envelope, payload: payload) return false case .subscribe(let payload): - try await handleSubscribe(envelope: envelope, payload: payload, transport: transport) + try await handleSubscribe( + envelope: envelope, payload: payload, info: info, transport: transport + ) return false case .unsubscribe(let payload): await subscriptionManager.unsubscribe(payload.subscriptionId) @@ -361,18 +382,23 @@ public actor ARCPRuntime { private func handleSubscribe( envelope: Envelope, payload: SubscribePayload, + info: SessionInfo, transport: any Transport ) async throws { let subscriptionId = SubscriptionId.random() - let send: @Sendable (Envelope) async throws -> Void = { [weak self] env in - guard let self else { return } - try await self.send(env, transport: transport) + // Deliveries to the subscriber must NOT be re-routed through + // SubscriptionManager, or an empty-filter subscriber will see its own + // wrapped delivery cascade through itself. Write straight to the + // owning transport instead. + let deliver: @Sendable (Envelope) async throws -> Void = { env in + try await transport.send(env) } await subscriptionManager.subscribe( subscriptionId: subscriptionId, + ownerSessionId: info.sessionId, filter: payload.filter, since: payload.since, - send: send + send: deliver ) try await transport.send( Envelope( diff --git a/Sources/ARCP/Runtime/CredentialManager.swift b/Sources/ARCP/Runtime/CredentialManager.swift index d49a2c9..5de86e3 100644 --- a/Sources/ARCP/Runtime/CredentialManager.swift +++ b/Sources/ARCP/Runtime/CredentialManager.swift @@ -62,22 +62,44 @@ public actor CredentialManager { try? await retention.persistOutstanding([], jobId: jobId) } + /// Rotate the credential identified by `credentialId` for `jobId`. + /// + /// The `CredentialProvisioner` contract is treated as returning the full + /// credential set for the lease (a provider may mint multiple paired + /// credentials per issuance — e.g. access + refresh token, or several + /// vendor credentials in a single transactional issue). The rotation + /// flow therefore: + /// + /// 1. Revokes the credential that is being rotated. + /// 2. Removes that credential from the job's tracked set. + /// 3. Appends every credential returned by the provisioner to the job's + /// tracked set so the runtime can revoke them later — none are + /// silently discarded. + /// 4. Persists the updated outstanding-id list through retention. + /// + /// Returns the credential identified as the replacement: the entry whose + /// `id` matches `credentialId` if the provisioner is performing an + /// in-place rotation, otherwise the first newly minted entry. public func rotate(jobId: JobId, credentialId: String) async throws -> ProvisionedCredential { guard let lease = leaseByJob[jobId] else { throw ARCPError.notFound(kind: "job credentials", id: jobId.rawValue) } let next = try await provisioner.issue(lease: lease, jobId: jobId, sessionId: sessionId) - guard let replacement = next.first else { - throw ARCPError.unavailable(reason: "credential rotation returned no credential", retryAfter: nil) + guard !next.isEmpty else { + throw ARCPError.unavailable( + reason: "credential rotation returned no credentials", + retryAfter: nil + ) } var existing = credentialsByJob[jobId] ?? [] - if let old = existing.first(where: { $0.id == credentialId }) { - await revokeWithRetry(old.id) + if existing.contains(where: { $0.id == credentialId }) { + await revokeWithRetry(credentialId) existing.removeAll { $0.id == credentialId } } - existing.append(replacement) + existing.append(contentsOf: next) credentialsByJob[jobId] = existing try await retention.persistOutstanding(existing.map(\.id), jobId: jobId) + let replacement = next.first(where: { $0.id == credentialId }) ?? next[0] return replacement } diff --git a/Sources/ARCP/Runtime/JobContext.swift b/Sources/ARCP/Runtime/JobContext.swift index 0144e1c..4f73b70 100644 --- a/Sources/ARCP/Runtime/JobContext.swift +++ b/Sources/ARCP/Runtime/JobContext.swift @@ -4,7 +4,9 @@ import Foundation /// streaming, cancellation observation, permission/lease hooks, and /// provisioned-credential rotation. public protocol JobContext: Sendable { + /// The job identifier currently being executed. var jobId: JobId { get } + /// The session in which the job is running. var sessionId: SessionId { get } /// The `lease_constraints.expires_at` declared on `tool.invoke`, @@ -133,9 +135,14 @@ extension JobContext { /// Handle returned by `JobContext.openStream` for emitting chunks until close. public protocol StreamHandle: Sendable { + /// Identifier of the open stream. var streamId: StreamId { get } + /// Send a text chunk; optional ordinal `sequence` for ordering hints. func sendText(_ text: String, sequence: Int?) async throws + /// Send a fully-formed `stream.chunk` payload. func sendChunk(_ payload: StreamChunkPayload) async throws + /// Close the stream gracefully with an optional reason. func close(reason: String?) async throws + /// Close the stream with an error (`stream.error` envelope). func error(_ error: ARCPError) async throws } diff --git a/Sources/ARCP/Runtime/JobManager.swift b/Sources/ARCP/Runtime/JobManager.swift index 7983619..82445c6 100644 --- a/Sources/ARCP/Runtime/JobManager.swift +++ b/Sources/ARCP/Runtime/JobManager.swift @@ -16,6 +16,11 @@ public actor JobManager { private let rawSend: @Sendable (Envelope) async throws -> Void private var handlers: [String: any ToolHandler] = [:] private var jobs: [JobId: JobRecord] = [:] + private let eventLog: EventLog? + private let principalSubject: String? + /// Tracks idempotency keys for in-flight jobs so the terminal envelope can + /// be persisted on completion. + private var idempotencyByJob: [JobId: IdempotencyKey] = [:] /// Optional advertised agent inventory (ARCP v1.1 §7.5). When set, the /// runtime validates `agent@version` references on `tool.invoke` against /// it and surfaces `agentVersionNotAvailable` for unknown pins. @@ -44,6 +49,8 @@ public actor JobManager { heartbeatInterval: TimeInterval = 30, cancelDeadline: TimeInterval = 5, credentialManager: CredentialManager? = nil, + eventLog: EventLog? = nil, + principalSubject: String? = nil, send: @escaping @Sendable (Envelope) async throws -> Void ) { self.sessionId = sessionId @@ -53,7 +60,14 @@ public actor JobManager { self.streamManager = StreamManager(sessionId: sessionId, send: send) self.leaseManager = LeaseManager(sessionId: sessionId, send: send) self.credentialManager = credentialManager + self.eventLog = eventLog + self.principalSubject = principalSubject self.log = Logger(label: "arcp.jobs.\(sessionId)") + // Start the lease expiry sweep automatically (RFC §15.5). Without + // this, expired permission leases would not be revoked until a + // caller manually reached into the subsystem and started the sweep. + let manager = self.leaseManager + Task { await manager.startSweep() } } /// Register a handler for a named tool. Replaces a previous registration. @@ -74,6 +88,29 @@ public actor JobManager { /// Handle a `tool.invoke` envelope. Spawns a child Task to run the handler. public func handleToolInvoke(envelope: Envelope, payload: ToolInvokePayload) async throws { + if let trace = Self.traceContext(from: envelope) { + try await Tracing.$current.withValue(trace) { + try await self.handleToolInvokeInner(envelope: envelope, payload: payload) + } + } else { + try await handleToolInvokeInner(envelope: envelope, payload: payload) + } + } + + private static func traceContext(from envelope: Envelope) -> TraceContext? { + guard let traceId = envelope.traceId, let spanId = envelope.spanId else { return nil } + return TraceContext(traceId: traceId, spanId: spanId, parentSpanId: envelope.parentSpanId) + } + + private func handleToolInvokeInner(envelope: Envelope, payload: ToolInvokePayload) async throws { + // §6.4: when the invoke carries an idempotency key and we have a + // cached terminal response for the same (principal, key), re-emit it + // correlated to the new invoke id without re-executing the handler. + if let key = envelope.idempotencyKey, + try await replayCachedIdempotency(key: key, invokeId: envelope.id) + { + return + } // Parse the tool/agent identifier per ARCP v1.1 §7.5. Bare names and // `name@version` are both accepted; an unparseable identifier falls // through to a notFound error. @@ -226,14 +263,30 @@ public actor JobManager { invokeCorrelationId: envelope.id ) + if let key = envelope.idempotencyKey { + idempotencyByJob[jobId] = key + } + let inboundTrace = Self.traceContext(from: envelope) let runTask = Task { [weak self] in - await self?.runJob( - jobId: jobId, - invokeId: envelope.id, - handler: handler, - invocation: invocation, - context: context - ) + if let inboundTrace { + await Tracing.$current.withValue(inboundTrace) { + await self?.runJob( + jobId: jobId, + invokeId: envelope.id, + handler: handler, + invocation: invocation, + context: context + ) + } + } else { + await self?.runJob( + jobId: jobId, + invokeId: envelope.id, + handler: handler, + invocation: invocation, + context: context + ) + } return () } let heartbeatTask = Task { [weak self] in @@ -345,7 +398,28 @@ public actor JobManager { } public func handleLeaseRefresh(envelope: Envelope, payload: LeaseRefreshPayload) async { - try? await leaseManager.refresh(leaseId: payload.leaseId, seconds: payload.requestedSeconds) + do { + try await leaseManager.refresh( + leaseId: payload.leaseId, seconds: payload.requestedSeconds + ) + } catch let error as ARCPError { + try? await send( + Envelope( + sessionId: sessionId, + correlationId: envelope.id, + payload: .nack(NackPayload(error: error.toEnvelope())) + ) + ) + } catch { + let wrapped = ARCPError.internal(detail: "lease refresh failed: \(error)", cause: nil) + try? await send( + Envelope( + sessionId: sessionId, + correlationId: envelope.id, + payload: .nack(NackPayload(error: wrapped.toEnvelope())) + ) + ) + } } /// Send a permission challenge and await grant/deny. @@ -437,69 +511,135 @@ public actor JobManager { ) ) + let terminal: MessageType + let terminalState: JobState do { let result = try await handler.execute(invocation: invocation, context: context) try Task.checkCancellation() let completedPayload = try toCompleted(jobId: jobId, result) - try? await send( - Envelope( - sessionId: sessionId, - jobId: jobId, - correlationId: invokeId, - payload: .jobCompleted(completedPayload) - ) - ) - transition(jobId: jobId, to: .completed) + terminal = .jobCompleted(completedPayload) + terminalState = .completed } catch let error as ARCPError where error.code == .cancelled { - try? await send( - Envelope( - sessionId: sessionId, - jobId: jobId, - correlationId: invokeId, - payload: .jobCancelled( - JobCancelledPayload(reason: error.message, code: .cancelled) - ) - ) - ) - transition(jobId: jobId, to: .cancelled) + terminal = .jobCancelled(JobCancelledPayload(reason: error.message, code: .cancelled)) + terminalState = .cancelled } catch is CancellationError { - try? await send( - Envelope( - sessionId: sessionId, - jobId: jobId, - correlationId: invokeId, - payload: .jobCancelled( - JobCancelledPayload(reason: "task cancelled", code: .cancelled) - ) - ) - ) - transition(jobId: jobId, to: .cancelled) + terminal = .jobCancelled(JobCancelledPayload(reason: "task cancelled", code: .cancelled)) + terminalState = .cancelled } catch let error as ARCPError { - try? await send( - Envelope( - sessionId: sessionId, - jobId: jobId, - correlationId: invokeId, - payload: .jobFailed(JobFailedPayload(error: error.toEnvelope())) - ) - ) - transition(jobId: jobId, to: .failed) + terminal = .jobFailed(JobFailedPayload(error: error.toEnvelope())) + terminalState = .failed } catch { let wrapped = ARCPError.internal(detail: "\(error)", cause: error) - try? await send( - Envelope( - sessionId: sessionId, - jobId: jobId, - correlationId: invokeId, - payload: .jobFailed(JobFailedPayload(error: wrapped.toEnvelope())) - ) - ) - transition(jobId: jobId, to: .failed) + terminal = .jobFailed(JobFailedPayload(error: wrapped.toEnvelope())) + terminalState = .failed } + // Persist idempotency BEFORE emitting the terminal envelope so any + // racing duplicate invocation sees the cached response. + await persistIdempotencyIfNeeded(jobId: jobId, terminal: terminal) + try? await send( + Envelope( + sessionId: sessionId, + jobId: jobId, + correlationId: invokeId, + payload: terminal + ) + ) + transition(jobId: jobId, to: terminalState) if let record = jobs[jobId] { record.heartbeatTask?.cancel() } await credentialManager?.revokeAll(jobId: jobId) } + /// If `jobId` carries a tracked idempotency key, persist the terminal + /// `MessageType` for future lookups by the same (principal, key). + private func persistIdempotencyIfNeeded(jobId: JobId, terminal: MessageType) async { + guard let key = idempotencyByJob.removeValue(forKey: jobId), + let eventLog, + let principal = principalSubject + else { return } + guard let payloadValue = Self.encodePayloadBody(terminal) else { return } + let cached: JSONValue = .object([ + "job_id": .string(jobId.rawValue), + "type": .string(terminal.typeName), + "payload": payloadValue, + ]) + let expiresAt = Date(timeIntervalSinceNow: 24 * 60 * 60) + try? await eventLog.recordIdempotency( + principal: principal, + key: key, + response: cached, + expiresAt: expiresAt + ) + } + + /// Replay a cached idempotency response for `key`. Returns `true` when a + /// hit was found and emitted, `false` otherwise (caller should proceed + /// with normal handling). + private func replayCachedIdempotency( + key: IdempotencyKey, + invokeId: MessageId + ) async throws -> Bool { + guard let eventLog, let principal = principalSubject else { return false } + guard let cached = try await eventLog.lookupIdempotency(principal: principal, key: key) + else { return false } + guard case .object(let dict) = cached, + case .string(let jobIdValue) = dict["job_id"] ?? .null, + case .string(let typeName) = dict["type"] ?? .null, + let payloadValue = dict["payload"], + let terminal = Self.decodePayloadBody(typeName: typeName, payload: payloadValue) + else { + // Cached response present but malformed — treat as miss. + return false + } + let jobId = JobId(jobIdValue) + try? await send( + Envelope( + sessionId: sessionId, + jobId: jobId, + correlationId: invokeId, + payload: .jobAccepted(JobAcceptedPayload(jobId: jobId, credentials: nil)) + ) + ) + try? await send( + Envelope( + sessionId: sessionId, + jobId: jobId, + correlationId: invokeId, + payload: terminal + ) + ) + return true + } + + /// Encode a `MessageType` payload body as JSON (just the payload object — + /// the `type` discriminant is stored separately). + private static func encodePayloadBody(_ payload: MessageType) -> JSONValue? { + let envelope = Envelope(payload: payload) + guard let data = try? envelope.toJSON(), + let value = try? Envelope.makeDecoder().decode(JSONValue.self, from: data), + case .object(let dict) = value + else { return nil } + return dict["payload"] + } + + /// Decode a payload body previously written with `encodePayloadBody`, + /// using `typeName` as the dispatch discriminant. + private static func decodePayloadBody( + typeName: String, + payload: JSONValue + ) -> MessageType? { + let synthetic: JSONValue = .object([ + "arcp": .string("1.1"), + "id": .string("idempotency_replay"), + "type": .string(typeName), + "timestamp": .string(ISO8601DateFormatter().string(from: Date())), + "payload": payload, + ]) + guard let data = try? Envelope.makeEncoder().encode(synthetic), + let envelope = try? Envelope.makeDecoder().decode(Envelope.self, from: data) + else { return nil } + return envelope.payload + } + private func transition(jobId: JobId, to state: JobState) { if var record = jobs[jobId] { record.state = state diff --git a/Sources/ARCP/Runtime/LeaseManager.swift b/Sources/ARCP/Runtime/LeaseManager.swift index 8342f13..ffe076b 100644 --- a/Sources/ARCP/Runtime/LeaseManager.swift +++ b/Sources/ARCP/Runtime/LeaseManager.swift @@ -47,6 +47,12 @@ public actor LeaseManager { costBudget: CostBudget? = nil, modelUse: ModelUse? = nil ) async throws -> LeaseId { + guard seconds > 0 else { + throw ARCPError.invalidArgument( + field: "lease_seconds", + detail: "lease duration must be positive, got \(seconds)" + ) + } let leaseId = LeaseId.random() let expiresAt = Date(timeIntervalSinceNow: TimeInterval(seconds)) leases[leaseId] = LeaseRecord( @@ -80,6 +86,12 @@ public actor LeaseManager { /// Refresh a lease, extending it by `seconds`. RFC §15.5. public func refresh(leaseId: LeaseId, seconds: Int) async throws { + guard seconds > 0 else { + throw ARCPError.invalidArgument( + field: "requested_seconds", + detail: "lease refresh duration must be positive, got \(seconds)" + ) + } guard var record = leases[leaseId] else { throw ARCPError.notFound(kind: "lease", id: leaseId.rawValue) } diff --git a/Sources/ARCP/Runtime/Mailbox.swift b/Sources/ARCP/Runtime/Mailbox.swift index b6db8c0..29367dd 100644 --- a/Sources/ARCP/Runtime/Mailbox.swift +++ b/Sources/ARCP/Runtime/Mailbox.swift @@ -6,8 +6,14 @@ /// Producers call `put`; consumers call `next()`. When the producer side is /// done it calls `finish()`; subsequent `next()` calls return `nil` once the /// buffer is drained. +/// +/// The buffer uses a head index with periodic compaction so draining N +/// envelopes is O(N) total rather than O(N²). `Array.removeFirst()` shifts +/// the remaining elements, which made bursty resume / subscription replay +/// quadratic. actor Mailbox { private var buffer: [Element] = [] + private var head: Int = 0 private var waiter: CheckedContinuation? private var closed = false @@ -33,10 +39,25 @@ actor Mailbox { /// the buffer is empty. Multiple concurrent waiters are not supported — /// the mailbox is single-consumer. func next() async -> Element? { - if !buffer.isEmpty { return buffer.removeFirst() } + if head < buffer.count { + let value = buffer[head] + head += 1 + compactIfNeeded() + return value + } if closed { return nil } return await withCheckedContinuation { (cont: CheckedContinuation) in self.waiter = cont } } + + private func compactIfNeeded() { + // Compact when the dead prefix is at least half of the array AND not + // tiny. This keeps amortized work O(1) per element while bounding the + // wasted prefix to roughly the live tail. + if head >= 64 && head >= buffer.count / 2 { + buffer.removeFirst(head) + head = 0 + } + } } diff --git a/Sources/ARCP/Runtime/PendingRegistry.swift b/Sources/ARCP/Runtime/PendingRegistry.swift index 49da7eb..7f0dab0 100644 --- a/Sources/ARCP/Runtime/PendingRegistry.swift +++ b/Sources/ARCP/Runtime/PendingRegistry.swift @@ -8,78 +8,102 @@ import Foundation /// timeout deadline races each pending await; if the deadline elapses first, /// the awaiter sees `ARCPError.deadlineExceeded`. public actor PendingRegistry { - private var waiters: [MessageId: CheckedContinuation] = [:] + private enum Slot { + case pending + case waiting(CheckedContinuation) + case readyValue(Response) + case readyError(any Error) + } + + private var slots: [MessageId: Slot] = [:] public init() {} /// Register a waiter for `id` and return its response when it arrives or /// `ARCPError.deadlineExceeded` when `deadline` elapses first. public func awaitResponse(id: MessageId, deadline: Duration) async throws -> Response { - try await withThrowingTaskGroup(of: Response.self) { group in - group.addTask { - try await self.suspend(id: id) - } - group.addTask { [weak self] in - try await Task.sleep(for: deadline) - // Drop the waiter without resolving it — that lets the - // deadline error win the race deterministically. We don't - // want a cancel-resume here because that would race the - // resume against this task's throw. - await self?.dropForTimeout(id: id) - throw ARCPError.deadlineExceeded(operation: "pending response \(id)") - } - defer { group.cancelAll() } - guard let result = try await group.next() else { - throw ARCPError.internal(detail: "pending registry empty", cause: nil) - } - return result + slots[id] = .pending + let timeoutTask = Task { [weak self] in + try? await Task.sleep(for: deadline) + if Task.isCancelled { return } + await self?.dropForTimeout(id: id) } + defer { timeoutTask.cancel() } + return try await suspend(id: id) } /// Resolve a pending await with a value. Returns `true` if a waiter was /// found and resumed. @discardableResult public func resolve(id: MessageId, value: Response) -> Bool { - guard let cont = waiters.removeValue(forKey: id) else { return false } - cont.resume(returning: value) - return true + guard let slot = slots[id] else { return false } + switch slot { + case .pending: + slots[id] = .readyValue(value) + return true + case .waiting(let cont): + slots.removeValue(forKey: id) + cont.resume(returning: value) + return true + case .readyValue, .readyError: + return false + } } /// Resolve a pending await with an error. @discardableResult public func reject(id: MessageId, error: any Error) -> Bool { - guard let cont = waiters.removeValue(forKey: id) else { return false } - cont.resume(throwing: error) - return true + guard let slot = slots[id] else { return false } + switch slot { + case .pending: + slots[id] = .readyError(error) + return true + case .waiting(let cont): + slots.removeValue(forKey: id) + cont.resume(throwing: error) + return true + case .readyValue, .readyError: + return false + } } /// Cancel any in-flight pending request — used by external callers (e.g. /// session shutdown) to forcibly fail an awaiter. func cancel(id: MessageId) { - guard let cont = waiters.removeValue(forKey: id) else { return } - cont.resume(throwing: ARCPError.cancelled(operation: "pending \(id)", reason: "cancelled")) + _ = reject(id: id, error: ARCPError.cancelled(operation: "pending \(id)", reason: "cancelled")) } /// Quietly drop a waiter without delivering a response. Used by the /// deadline race so the deadline task's throw is the one that surfaces. func dropForTimeout(id: MessageId) { - guard let cont = waiters.removeValue(forKey: id) else { return } - cont.resume( - throwing: ARCPError.deadlineExceeded(operation: "pending response \(id)") - ) + _ = reject(id: id, error: ARCPError.deadlineExceeded(operation: "pending response \(id)")) } /// Reject every pending awaiter — called when the session ends. public func failAll(error: any Error) { - for (_, cont) in waiters { - cont.resume(throwing: error) + let snapshot = slots + slots.removeAll() + for (_, slot) in snapshot { + if case .waiting(let cont) = slot { + cont.resume(throwing: error) + } } - waiters.removeAll() } private func suspend(id: MessageId) async throws -> Response { try await withCheckedThrowingContinuation { cont in - waiters[id] = cont + switch slots[id] { + case .readyValue(let value): + slots.removeValue(forKey: id) + cont.resume(returning: value) + case .readyError(let err): + slots.removeValue(forKey: id) + cont.resume(throwing: err) + case .pending, .none: + slots[id] = .waiting(cont) + case .waiting: + cont.resume(throwing: ARCPError.internal(detail: "double-attach pending \(id)", cause: nil)) + } } } } diff --git a/Sources/ARCP/Runtime/StreamManager.swift b/Sources/ARCP/Runtime/StreamManager.swift index fe0bacd..23844ac 100644 --- a/Sources/ARCP/Runtime/StreamManager.swift +++ b/Sources/ARCP/Runtime/StreamManager.swift @@ -65,7 +65,18 @@ public actor StreamManager { } /// Subscribe to inbound chunks of a stream. - public func subscribeInbound(streamId: StreamId) -> AsyncStream { + /// + /// A stream id is single-subscriber: calling this more than once for the + /// same `streamId` throws `ARCPError.failedPrecondition`. The previous + /// behavior of silently replacing the existing continuation orphaned the + /// first subscriber, so subsequent chunks were never delivered and the + /// original stream never finished. + public func subscribeInbound(streamId: StreamId) throws -> AsyncStream { + if inboundContinuations[streamId] != nil { + throw ARCPError.failedPrecondition( + detail: "stream \(streamId) already has an inbound subscriber" + ) + } var continuation: AsyncStream.Continuation! let stream = AsyncStream { continuation = $0 } inboundContinuations[streamId] = continuation diff --git a/Sources/ARCP/Runtime/SubscriptionManager.swift b/Sources/ARCP/Runtime/SubscriptionManager.swift index c4b1eb3..034b6f6 100644 --- a/Sources/ARCP/Runtime/SubscriptionManager.swift +++ b/Sources/ARCP/Runtime/SubscriptionManager.swift @@ -23,11 +23,16 @@ public actor SubscriptionManager { /// match, ending with a synthetic `subscription.backfill_complete` event. public func subscribe( subscriptionId: SubscriptionId, + ownerSessionId: SessionId? = nil, filter: SubscriptionFilter, since: SubscriptionSince?, send: @escaping @Sendable (Envelope) async throws -> Void ) async { - subscriptions[subscriptionId] = SubscriptionRecord(filter: filter, send: send) + subscriptions[subscriptionId] = SubscriptionRecord( + ownerSessionId: ownerSessionId, + filter: filter, + send: send + ) if let since { Task { [weak self] in await self?.backfill(subscriptionId: subscriptionId, since: since) @@ -39,8 +44,36 @@ public actor SubscriptionManager { subscriptions.removeValue(forKey: subscriptionId) } + /// Remove every subscription owned by `sessionId`, attempting to notify + /// each one with a `subscribe.closed` envelope. Cleanup proceeds even if + /// the notification fails (the owning transport is typically already shut). + public func removeAllOwned(by sessionId: SessionId, reason: String) async { + let owned = subscriptions.filter { $0.value.ownerSessionId == sessionId } + for (subId, record) in owned { + subscriptions.removeValue(forKey: subId) + try? await record.send( + Envelope( + sessionId: sessionId, + subscriptionId: subId, + payload: .subscribeClosed( + SubscribeClosedPayload( + subscriptionId: subId, + code: .unavailable, + reason: reason + ) + ) + ) + ) + } + } + /// Push `envelope` to every subscriber whose filter matches. + /// + /// `subscribe.event` envelopes are ignored to prevent a subscriber with an + /// empty filter from re-wrapping its own delivery into another + /// `subscribe.event` and cascading without bound. public func route(envelope: Envelope) async { + if case .subscribeEvent = envelope.payload { return } for (subId, record) in subscriptions where record.matches(envelope) { try? await record.send( Envelope( @@ -137,6 +170,7 @@ public actor SubscriptionManager { } private struct SubscriptionRecord: Sendable { + let ownerSessionId: SessionId? let filter: SubscriptionFilter let send: @Sendable (Envelope) async throws -> Void diff --git a/Sources/ARCP/Runtime/ToolHandler.swift b/Sources/ARCP/Runtime/ToolHandler.swift index 06d9955..3ef8261 100644 --- a/Sources/ARCP/Runtime/ToolHandler.swift +++ b/Sources/ARCP/Runtime/ToolHandler.swift @@ -19,12 +19,19 @@ public protocol ToolHandler: Sendable { /// Inputs handed to a `ToolHandler`. public struct ToolInvocation: Sendable { + /// Job assigned to this invocation by the runtime. public let jobId: JobId + /// Session that hosts the invocation. public let sessionId: SessionId + /// Tool arguments as decoded from the inbound `tool.invoke` envelope. public let arguments: JSONValue + /// Optional idempotency key — duplicate invokes with the same key replay + /// the cached terminal response instead of re-executing the handler. public let idempotencyKey: IdempotencyKey? + /// Inbound trace id (if any) for cross-runtime distributed tracing. public let traceId: TraceId? + /// Create an invocation. Normally only the runtime constructs these. public init( jobId: JobId, sessionId: SessionId, @@ -46,8 +53,11 @@ public struct ToolInvocation: Sendable { /// `job.completed` carries `resultId` / `resultSize` / `summary` /// markers instead of an inline value. public enum ToolOutput: Sendable { + /// Inline JSON value returned to the caller. case value(JSONValue) + /// Reference to a stored artifact instead of an inline value. case ref(ArtifactRef) + /// No result body — the job succeeded without producing data. case empty /// Result was streamed as `job.result_chunk` events; the terminal /// `job.completed` envelope references the same `resultId`. diff --git a/Tests/ARCPTests/IdsTests.swift b/Tests/ARCPTests/IdsTests.swift index e168d7a..5b6f8a3 100644 --- a/Tests/ARCPTests/IdsTests.swift +++ b/Tests/ARCPTests/IdsTests.swift @@ -54,4 +54,15 @@ struct IdsTests { #expect(a < b || a == b) #expect(b < c || b == c) } + + // Issue #56: bursts within a single millisecond must remain strictly + // monotonic — the random tail is incremented on collision. + @Test("ULID burst stays strictly monotonic within a millisecond") + func ulidStrictlyMonotonicBurst() { + var ids: [String] = [] + for _ in 0..<1024 { ids.append(Ulid.next()) } + let sorted = ids.sorted() + #expect(ids == sorted) + #expect(Set(ids).count == ids.count) + } } diff --git a/Tests/ARCPTests/Integration/ClientDispatchTests.swift b/Tests/ARCPTests/Integration/ClientDispatchTests.swift new file mode 100644 index 0000000..2ff074c --- /dev/null +++ b/Tests/ARCPTests/Integration/ClientDispatchTests.swift @@ -0,0 +1,125 @@ +import Foundation +import Testing + +@testable import ARCP + +@Suite("Client dispatcher unhandled-fallback (issues #55, #59, #60)") +struct ClientDispatchTests { + @Test("Unknown-job job.progress falls through to unhandled (#55)") + func progressUnmatched() async throws { + let pair = MemoryTransport.makePair() + let client = try await openClient(pair: pair) + defer { Task { await client.close() } } + + // Inject a job.progress for a job id this client never invoked. + let strayJobId = JobId.random() + try await pair.server.send( + Envelope( + sessionId: client.info.sessionId, + jobId: strayJobId, + payload: .jobProgress(JobProgressPayload(percent: 50.0)) + ) + ) + + let envelope = try await firstUnhandled(client: client, predicate: { env in + if case .jobProgress = env.payload { return env.jobId == strayJobId } + return false + }) + #expect(envelope.jobId == strayJobId) + } + + @Test("Unknown-job job.result_chunk falls through to unhandled (#59)") + func resultChunkUnmatched() async throws { + let pair = MemoryTransport.makePair() + let client = try await openClient(pair: pair) + defer { Task { await client.close() } } + + let strayJobId = JobId.random() + try await pair.server.send( + Envelope( + sessionId: client.info.sessionId, + jobId: strayJobId, + payload: .jobResultChunk( + JobResultChunkPayload( + resultId: "r1", + chunkSeq: 0, + data: "hi", + encoding: .utf8, + more: true + ) + ) + ) + ) + + let envelope = try await firstUnhandled(client: client, predicate: { env in + if case .jobResultChunk = env.payload { return env.jobId == strayJobId } + return false + }) + #expect(envelope.jobId == strayJobId) + } + + @Test("Uncorrelated tool.error falls through to unhandled (#60)") + func toolErrorUnmatched() async throws { + let pair = MemoryTransport.makePair() + let client = try await openClient(pair: pair) + defer { Task { await client.close() } } + + try await pair.server.send( + Envelope( + sessionId: client.info.sessionId, + payload: .toolError( + ToolErrorPayload( + error: ARCPError.notFound(kind: "tool", id: "ghost").toEnvelope() + ) + ) + ) + ) + + let envelope = try await firstUnhandled(client: client, predicate: { env in + if case .toolError = env.payload { return true } + return false + }) + if case .toolError = envelope.payload { + // matched + } else { + Issue.record("expected toolError envelope") + } + } + + private func openClient( + pair: (client: MemoryTransport, server: MemoryTransport) + ) async throws -> ARCPClient { + let runtime = try ARCPRuntime( + identity: IdentityBlock(kind: "rt", version: "1"), + supportedCapabilities: Capabilities(durableJobs: true), + auth: BearerAuthValidator(subjectsByToken: ["t": "alice"]) + ) + Task { _ = try? await runtime.acceptSession(over: pair.server) } + return try await ARCPClient.open( + transport: pair.client, + auth: AuthBlock(scheme: .bearer, token: "t"), + client: IdentityBlock(kind: "tester", version: "1"), + capabilities: Capabilities(durableJobs: true) + ) + } + + private func firstUnhandled( + client: ARCPClient, + predicate: @escaping @Sendable (Envelope) -> Bool + ) async throws -> Envelope { + try await withThrowingTaskGroup(of: Envelope.self) { group in + group.addTask { + for await env in client.unhandled where predicate(env) { + return env + } + throw ARCPError.unavailable(reason: "unhandled closed", retryAfter: nil) + } + group.addTask { + try await Task.sleep(for: .seconds(2)) + throw ARCPError.deadlineExceeded(operation: "wait unhandled") + } + defer { group.cancelAll() } + return try await group.next()! + } + } +} diff --git a/Tests/ARCPTests/Integration/IdempotencyTests.swift b/Tests/ARCPTests/Integration/IdempotencyTests.swift new file mode 100644 index 0000000..e7e9641 --- /dev/null +++ b/Tests/ARCPTests/Integration/IdempotencyTests.swift @@ -0,0 +1,63 @@ +import Foundation +import Testing + +@testable import ARCP + +@Suite("Idempotency semantics (issue #43)") +struct IdempotencyTests { + @Test("Second invoke with same idempotency key returns the cached terminal") + func duplicateInvocationReplays() async throws { + let fixture = IntegrationFixture(handler: CountingTool()) + let open = try await fixture.open() + defer { open.close() } + + let key = IdempotencyKey("idem_dup_test_01") + + let first = try await open.client.invoke( + tool: "count", + arguments: .null, + idempotencyKey: key + ) + guard case .completed(let firstPayload) = first.outcome else { + Issue.record("first invocation did not complete") + return + } + let firstResult = firstPayload.result ?? .null + + let second = try await open.client.invoke( + tool: "count", + arguments: .null, + idempotencyKey: key + ) + guard case .completed(let secondPayload) = second.outcome else { + Issue.record("second invocation did not complete") + return + } + // Same handler-produced result re-delivered; the handler must not have + // executed twice. CountingTool's actor exposes the call count. + #expect(secondPayload.result == firstResult) + #expect(first.jobId == second.jobId) + let count = await CountingTool.callCount + #expect(count == 1) + } +} + +private struct CountingTool: ToolHandler { + static let counter = AtomicCounter() + static var callCount: Int { get async { await counter.value } } + + let name = "count" + func execute(invocation: ToolInvocation, context: any JobContext) async throws -> ToolOutput { + let next = await Self.counter.increment() + return .value(.int(Int64(next))) + } +} + +private actor AtomicCounter { + private(set) var value: Int = 0 + @discardableResult + func increment() -> Int { + value += 1 + return value + } +} diff --git a/Tests/ARCPTests/Integration/SubscriptionCleanupTests.swift b/Tests/ARCPTests/Integration/SubscriptionCleanupTests.swift new file mode 100644 index 0000000..b0694a6 --- /dev/null +++ b/Tests/ARCPTests/Integration/SubscriptionCleanupTests.swift @@ -0,0 +1,114 @@ +import Foundation +import Testing + +@testable import ARCP + +@Suite("Subscription cleanup and recursion (issues #40, #54)") +struct SubscriptionCleanupTests { + @Test("Empty filter does not recursively wrap subscribe.event") + func emptyFilterNoCascade() async throws { + let pair = MemoryTransport.makePair() + let runtime = try ARCPRuntime( + identity: IdentityBlock(kind: "rt", version: "1"), + supportedCapabilities: Capabilities(durableJobs: true, subscriptions: true), + auth: BearerAuthValidator(subjectsByToken: ["t": "alice"]) + ) + await runtime.register(NoopTool()) + let serverTask = Task { try await runtime.acceptSession(over: pair.server) } + let client = try await ARCPClient.open( + transport: pair.client, + auth: AuthBlock(scheme: .bearer, token: "t"), + client: IdentityBlock(kind: "c", version: "1"), + capabilities: Capabilities(durableJobs: true, subscriptions: true) + ) + defer { + Task { + await client.close() + _ = try? await serverTask.value + } + } + + let observed = Counter() + let drainer = Task { + for await env in client.unhandled { + if case .subscribeEvent = env.payload { + await observed.increment() + } + } + } + try await client.send( + Envelope( + sessionId: client.info.sessionId, + payload: .subscribe(SubscribePayload(filter: SubscriptionFilter())) + ) + ) + _ = try await client.invoke(tool: "noop", arguments: .null) + try await Task.sleep(for: .milliseconds(200)) + drainer.cancel() + let count = await observed.value + // We invoked one tool that emits a job lifecycle. Whether the subscriber sees + // 0+ events is fine; the critical assertion is that we did NOT cascade into a + // huge number of recursive subscribe.event envelopes. + #expect(count < 32) + } + + @Test("removeAllOwned drops session-scoped subscriptions") + func removeAllOwned() async { + let eventLog = try? EventLog.inMemory() + guard let eventLog else { + Issue.record("could not create event log") + return + } + let manager = SubscriptionManager(eventLog: eventLog) + let owner = SessionId.random() + let foreign = SessionId.random() + let subA = SubscriptionId.random() + let subB = SubscriptionId.random() + await manager.subscribe( + subscriptionId: subA, + ownerSessionId: owner, + filter: SubscriptionFilter(), + since: nil, + send: { _ in } + ) + await manager.subscribe( + subscriptionId: subB, + ownerSessionId: foreign, + filter: SubscriptionFilter(), + since: nil, + send: { _ in } + ) + await manager.removeAllOwned(by: owner, reason: "session closed") + // Subsequent routing should not invoke the dropped subscriber. We use a + // sentinel envelope to confirm only the foreign subscriber would match. + let calls = Counter() + let subC = SubscriptionId.random() + await manager.subscribe( + subscriptionId: subC, + ownerSessionId: foreign, + filter: SubscriptionFilter(), + since: nil, + send: { _ in await calls.increment() } + ) + await manager.route( + envelope: Envelope( + sessionId: foreign, + payload: .log(LogPayload(level: .info, message: "hi", attributes: nil)) + ) + ) + let value = await calls.value + #expect(value == 1) + } +} + +private struct NoopTool: ToolHandler { + let name = "noop" + func execute(invocation: ToolInvocation, context: any JobContext) async throws -> ToolOutput { + .value(.null) + } +} + +private actor Counter { + private(set) var value: Int = 0 + func increment() { value += 1 } +} diff --git a/Tests/ARCPTests/JWTAuthTests.swift b/Tests/ARCPTests/JWTAuthTests.swift new file mode 100644 index 0000000..73b0603 --- /dev/null +++ b/Tests/ARCPTests/JWTAuthTests.swift @@ -0,0 +1,109 @@ +import Foundation +import JWTKit +import Testing + +@testable import ARCP + +@Suite("JWT challenge binding (issue #58)") +struct JWTAuthTests { + @Test("Validator rejects token without nonce when challenge present") + func rejectsMissingNonce() async throws { + let keys = JWTKeyCollection() + await keys.add(hmac: HMACKey(stringLiteral: "super-secret"), digestAlgorithm: .sha256) + + let validator = JWTAuthValidator(keys: keys, audience: "arcp") + let signed = try await keys.sign( + JWTPayloadFixture( + sub: "alice", + aud: "arcp", + exp: Date().addingTimeInterval(60), + nonce: nil + ) + ) + await #expect(throws: ARCPError.self) { + _ = try await validator.validate( + auth: AuthBlock(scheme: .signedJwt, token: signed), + challenge: "nonce-123" + ) + } + } + + @Test("Validator rejects mismatched nonce") + func rejectsWrongNonce() async throws { + let keys = JWTKeyCollection() + await keys.add(hmac: HMACKey(stringLiteral: "super-secret"), digestAlgorithm: .sha256) + let validator = JWTAuthValidator(keys: keys, audience: "arcp") + let signed = try await keys.sign( + JWTPayloadFixture( + sub: "alice", + aud: "arcp", + exp: Date().addingTimeInterval(60), + nonce: "wrong" + ) + ) + await #expect(throws: ARCPError.self) { + _ = try await validator.validate( + auth: AuthBlock(scheme: .signedJwt, token: signed), + challenge: "correct" + ) + } + } + + @Test("Validator accepts matching nonce") + func acceptsMatchingNonce() async throws { + let keys = JWTKeyCollection() + await keys.add(hmac: HMACKey(stringLiteral: "super-secret"), digestAlgorithm: .sha256) + let validator = JWTAuthValidator(keys: keys, audience: "arcp") + let signed = try await keys.sign( + JWTPayloadFixture( + sub: "alice", + aud: "arcp", + exp: Date().addingTimeInterval(60), + nonce: "n-123" + ) + ) + let principal = try await validator.validate( + auth: AuthBlock(scheme: .signedJwt, token: signed), + challenge: "n-123" + ) + #expect(principal.subject == "alice") + } + + @Test("Validator accepts token without nonce when no challenge issued") + func acceptsAbsentChallenge() async throws { + let keys = JWTKeyCollection() + await keys.add(hmac: HMACKey(stringLiteral: "super-secret"), digestAlgorithm: .sha256) + let validator = JWTAuthValidator(keys: keys, audience: "arcp") + let signed = try await keys.sign( + JWTPayloadFixture( + sub: "bob", + aud: "arcp", + exp: Date().addingTimeInterval(60), + nonce: nil + ) + ) + let principal = try await validator.validate( + auth: AuthBlock(scheme: .signedJwt, token: signed), + challenge: nil + ) + #expect(principal.subject == "bob") + } +} + +private struct JWTPayloadFixture: JWTPayload { + var sub: SubjectClaim + var aud: AudienceClaim + var exp: ExpirationClaim + var nonce: String? + + init(sub: String, aud: String, exp: Date, nonce: String?) { + self.sub = SubjectClaim(value: sub) + self.aud = AudienceClaim(value: aud) + self.exp = ExpirationClaim(value: exp) + self.nonce = nonce + } + + func verify(using algorithm: some JWTAlgorithm) async throws { + try exp.verifyNotExpired() + } +} diff --git a/Tests/ARCPTests/LeaseManagerTests.swift b/Tests/ARCPTests/LeaseManagerTests.swift new file mode 100644 index 0000000..4178532 --- /dev/null +++ b/Tests/ARCPTests/LeaseManagerTests.swift @@ -0,0 +1,63 @@ +import Foundation +import Testing + +@testable import ARCP + +@Suite("LeaseManager validation (issue #48)") +struct LeaseManagerTests { + @Test("grant rejects zero seconds with invalidArgument") + func grantRejectsZero() async { + let manager = LeaseManager(sessionId: SessionId.random(), send: { _ in }) + await #expect(throws: ARCPError.self) { + try await manager.grant( + permission: "fs.read", + resource: "/tmp", + operation: "read", + seconds: 0 + ) + } + } + + @Test("grant rejects negative seconds") + func grantRejectsNegative() async { + let manager = LeaseManager(sessionId: SessionId.random(), send: { _ in }) + await #expect(throws: ARCPError.self) { + try await manager.grant( + permission: "fs.read", + resource: "/tmp", + operation: "read", + seconds: -5 + ) + } + } + + @Test("refresh rejects nonpositive seconds") + func refreshRejectsNonpositive() async throws { + let manager = LeaseManager(sessionId: SessionId.random(), send: { _ in }) + let leaseId = try await manager.grant( + permission: "fs.read", + resource: "/tmp", + operation: "read", + seconds: 60 + ) + await #expect(throws: ARCPError.self) { + try await manager.refresh(leaseId: leaseId, seconds: 0) + } + await #expect(throws: ARCPError.self) { + try await manager.refresh(leaseId: leaseId, seconds: -10) + } + } + + @Test("grant accepts positive seconds") + func grantPositive() async throws { + let manager = LeaseManager(sessionId: SessionId.random(), send: { _ in }) + let id = try await manager.grant( + permission: "fs.read", + resource: "/tmp", + operation: "read", + seconds: 60 + ) + let status = await manager.status(id) + #expect(status == .active) + } +} diff --git a/Tests/ARCPTests/MailboxTests.swift b/Tests/ARCPTests/MailboxTests.swift new file mode 100644 index 0000000..ed2dd52 --- /dev/null +++ b/Tests/ARCPTests/MailboxTests.swift @@ -0,0 +1,33 @@ +import Foundation +import Testing + +@testable import ARCP + +@Suite("Mailbox queue (issue #49)") +struct MailboxTests { + @Test("drain returns items in FIFO order") + func fifoOrder() async { + let mailbox = Mailbox() + for i in 0..<10 { await mailbox.put(i) } + await mailbox.finish() + var out: [Int] = [] + while let value = await mailbox.next() { out.append(value) } + #expect(out == Array(0..<10)) + } + + @Test("large bursty drain compacts without quadratic shifting") + func largeBurst() async { + let mailbox = Mailbox() + let total = 5_000 + for i in 0..() + let id = MessageId.random() + Task { + // Delay slightly to ensure awaitResponse has suspended. + try? await Task.sleep(for: .milliseconds(20)) + _ = await registry.resolve(id: id, value: 99) + } + let value = try await registry.awaitResponse(id: id, deadline: .seconds(2)) + #expect(value == 99) + } + + @Test("timeout fails the waiter") + func timeoutFails() async { + let registry = PendingRegistry() + let id = MessageId.random() + await #expect(throws: ARCPError.self) { + _ = try await registry.awaitResponse(id: id, deadline: .milliseconds(50)) + } + } + + @Test("reject delivered after suspend resumes with error") + func rejectAfterSuspend() async { + let registry = PendingRegistry() + let id = MessageId.random() + Task { + // Small delay so awaitResponse has a chance to suspend. + try? await Task.sleep(for: .milliseconds(20)) + _ = await registry.reject(id: id, error: ARCPError.notFound(kind: "x", id: id.rawValue)) + } + await #expect(throws: ARCPError.self) { + _ = try await registry.awaitResponse(id: id, deadline: .seconds(2)) + } + } +} diff --git a/docs/conformance.md b/docs/conformance.md index 45ac892..811657d 100644 --- a/docs/conformance.md +++ b/docs/conformance.md @@ -11,7 +11,7 @@ Swift SDK. The authoritative detail is in | Envelope format | §6.1 | All fields; ULID IDs | | Four-step handshake | §8.1 | `bearer`, `signed_jwt`, `none` | | Capability negotiation | §7 | Intersection of client + runtime caps | -| Durable jobs | §10 | State machine, heartbeats, cancel, interrupt | +| Durable jobs | §10 | State machine, heartbeat telemetry, cancel. `interrupt` is wire-acked only — no handler observation callback. | | Multi-kind streams | §11 | text, event, log, thought, metric, base64 binary + backpressure | | Permission challenges | §15.4 | Grant, refresh, revoke, expiry sweep | | Leases: `expires_at` | §9.5 | Submission validation + in-handler expiry check | diff --git a/docs/guides/jobs.md b/docs/guides/jobs.md index 4de06c1..4b4830e 100644 --- a/docs/guides/jobs.md +++ b/docs/guides/jobs.md @@ -128,11 +128,13 @@ the full request/observe loop. ## Interrupts -An interrupt is a separate control signal; the runtime delivers -`job.interrupt` to the handler without changing the job's terminal -disposition. Different from cancellation — the job continues running -once the interrupt is observed. See `interrupt` capability flag in -`Capabilities` and the runtime's `handleInterrupt` path. +An interrupt is a separate control signal. In this SDK the runtime's +`handleInterrupt` path currently only transitions the job state to +`.blocked` and sends an `ack` — there is **no handler-visible callback** +that lets the running job observe or respond to an interrupt. As a +result, `Capabilities.interrupt` defaults to `false` and should be left +that way unless you have wired your own observer. A future release may +add a `JobContext` API that surfaces interrupts to the handler. ## Listing jobs @@ -168,9 +170,15 @@ See the [`ListJobs` sample](../../Samples/ListJobs). The runtime emits `job.heartbeat` envelopes (carrying `sequence`, `deadlineMs`, and `JobState`) automatically while a handler runs. The default heartbeat interval is 30 seconds and is configurable on -`Capabilities.heartbeatIntervalSeconds`. If the runtime stops receiving -acks within the configured window it emits `HEARTBEAT_LOST` and -transitions the job to `failed`. +`Capabilities.heartbeatIntervalSeconds`. + +> **Heartbeat recovery is not implemented in this release.** The runtime +> emits heartbeat telemetry only — it does **not** track inbound `ack` +> envelopes, does **not** detect missed acknowledgements, and does +> **not** transition the job to `failed` with `HEARTBEAT_LOST`. Treat +> heartbeats as a one-way liveness signal for the client and avoid +> advertising `Capabilities.heartbeatRecovery` as `.fail` until a real +> ack-tracking implementation lands. Handlers don't call a heartbeat method directly — keep the work cooperative (`Task.yield()` inside CPU-bound loops) so the runtime's diff --git a/docs/guides/resume.md b/docs/guides/resume.md index 2328cbe..b1bed36 100644 --- a/docs/guides/resume.md +++ b/docs/guides/resume.md @@ -4,14 +4,36 @@ ARCP supports resuming a session from a specific message id, recovering after a network drop or process crash without replaying from the beginning (RFC §19). +## Scope in this SDK + +This release implements **same-session event-log replay only**: + +- After a transport drop, you must reconnect over a fresh transport. + The runtime accepts the resume request on the *new* session and + replays from the new session's event log starting after + `after_message_id`. +- **Cross-session resume is not implemented** — i.e. you cannot use + `after_message_id` from a prior session to recover events that the + prior session emitted. The runtime has no mapping from one session id + to another, and the replay query is scoped to the current session id. + Resume in this SDK works only when the same session id survives the + reconnect (e.g. when the transport drops but the runtime can be told + to continue using the same `session_id`). +- `checkpoint_id` is **not** implemented and returns + `ARCPError.unimplemented`. +- `include_open_streams` is currently ignored — open streams are not + re-emitted by the runtime. + +A future release may add a resume-token handshake that authorizes +cross-session continuation. + ## How it works Every envelope is stored in the `EventLog` with its `id` (a ULID). -After a transport drop, open a fresh session and send a `resume` -envelope carrying `after_message_id`; the runtime replays every -envelope with id greater than that cutoff and then resumes live -delivery. It terminates the replay with an `ack` whose `detail` -reports the number of replayed events. +A `resume` envelope carrying `after_message_id` triggers the runtime to +replay every envelope with id greater than that cutoff (for the current +session) and then resume live delivery. It terminates the replay with +an `ack` whose `detail` reports the number of replayed events. ## Reconnecting with a resume point