diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 8dc1a5f..15ead54 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -8,7 +8,7 @@ on: jobs: test-macos: - name: Swift ${{ matrix.swift }} on macOS ${{ matrix.macos }} with Xcode ${{ matrix.xcode }} + name: Swift ${{ matrix.swift }} on macOS ${{ matrix.macos }} with Xcode ${{ matrix.xcode }}${{ matrix.variant == 'async_http_client' && ' (AsyncHTTPClient)' || '' }} runs-on: macos-${{ matrix.macos }} env: DEVELOPER_DIR: "/Applications/Xcode_${{ matrix.xcode }}.app/Contents/Developer" @@ -19,12 +19,23 @@ jobs: - macos: "15" swift: "6.0" xcode: "16.0" + variant: default - macos: "15" swift: "6.1" xcode: "16.3" + variant: default - macos: "26" swift: "6.2" xcode: "26.0" + variant: default + - macos: "15" + swift: "6.1" + xcode: "16.3" + variant: async_http_client + - macos: "26" + swift: "6.2" + xcode: "26.0" + variant: async_http_client timeout-minutes: 5 steps: - name: Checkout code @@ -36,27 +47,40 @@ jobs: path: | ~/.cache/org.swift.swiftpm .build - key: ${{ runner.os }}-swift-${{ matrix.swift }}-spm-${{ hashFiles('**/Package.resolved') }} + key: ${{ runner.os }}-swift-${{ matrix.swift }}-${{ matrix.variant }}-spm-${{ hashFiles('**/Package.resolved') }} restore-keys: | - ${{ runner.os }}-swift-${{ matrix.swift }}-spm- + ${{ runner.os }}-swift-${{ matrix.swift }}-${{ matrix.variant }}-spm- + + - name: Configure variant + run: | + if [ "${{ matrix.variant }}" = "async_http_client" ]; then + echo "SWIFT_VARIANT_ARGS=--traits AsyncHTTPClient --scratch-path .build/async-http-client" >> "$GITHUB_ENV" + else + echo "SWIFT_VARIANT_ARGS=--scratch-path .build/default" >> "$GITHUB_ENV" + fi - name: Lint run: swift format lint --recursive . --strict - name: Build - run: swift build -v + run: swift build $SWIFT_VARIANT_ARGS - name: Test - run: swift test -v + run: swift test $SWIFT_VARIANT_ARGS test-linux: - name: Swift ${{ matrix.swift-version }} on Linux + name: Swift ${{ matrix.swift-version }} on Linux${{ matrix.variant == 'async_http_client' && ' (AsyncHTTPClient)' || '' }} runs-on: ubuntu-latest strategy: fail-fast: false matrix: - swift-version: - - 6.1.0 + include: + - swift-version: "6.0.3" + variant: default + - swift-version: "6.1.0" + variant: default + - swift-version: "6.1.0" + variant: async_http_client timeout-minutes: 10 steps: - name: Checkout code @@ -67,8 +91,16 @@ jobs: with: toolchain: ${{ matrix.swift-version }} + - name: Configure variant + run: | + if [ "${{ matrix.variant }}" = "async_http_client" ]; then + echo "SWIFT_VARIANT_ARGS=--traits AsyncHTTPClient --scratch-path .build/async-http-client" >> "$GITHUB_ENV" + else + echo "SWIFT_VARIANT_ARGS=--scratch-path .build/default" >> "$GITHUB_ENV" + fi + - name: Build - run: swift build -v + run: swift build $SWIFT_VARIANT_ARGS - name: Test - run: swift test -v + run: swift test $SWIFT_VARIANT_ARGS diff --git a/Package.resolved b/Package.resolved new file mode 100644 index 0000000..58fcc12 --- /dev/null +++ b/Package.resolved @@ -0,0 +1,204 @@ +{ + "originHash" : "3f810b80f7bc6b6b9cbfce334e5ef9b8ad4870a334c4bcd33df43e381565b749", + "pins" : [ + { + "identity" : "async-http-client", + "kind" : "remoteSourceControl", + "location" : "https://github.com/swift-server/async-http-client.git", + "state" : { + "revision" : "52ed9d172018e31f2dbb46f0d4f58d66e13c281e", + "version" : "1.31.0" + } + }, + { + "identity" : "swift-algorithms", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-algorithms.git", + "state" : { + "revision" : "87e50f483c54e6efd60e885f7f5aa946cee68023", + "version" : "1.2.1" + } + }, + { + "identity" : "swift-asn1", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-asn1.git", + "state" : { + "revision" : "810496cf121e525d660cd0ea89a758740476b85f", + "version" : "1.5.1" + } + }, + { + "identity" : "swift-async-algorithms", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-async-algorithms.git", + "state" : { + "revision" : "2971dd5d9f6e0515664b01044826bcea16e59fac", + "version" : "1.1.2" + } + }, + { + "identity" : "swift-atomics", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-atomics.git", + "state" : { + "revision" : "b601256eab081c0f92f059e12818ac1d4f178ff7", + "version" : "1.3.0" + } + }, + { + "identity" : "swift-certificates", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-certificates.git", + "state" : { + "revision" : "24ccdeeeed4dfaae7955fcac9dbf5489ed4f1a25", + "version" : "1.18.0" + } + }, + { + "identity" : "swift-collections", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-collections", + "state" : { + "revision" : "7b847a3b7008b2dc2f47ca3110d8c782fb2e5c7e", + "version" : "1.3.0" + } + }, + { + "identity" : "swift-configuration", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-configuration.git", + "state" : { + "revision" : "1bb939fe7bbb00b8f8bab664cc90020c035c08d9", + "version" : "1.1.0" + } + }, + { + "identity" : "swift-crypto", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-crypto.git", + "state" : { + "revision" : "6f70fa9eab24c1fd982af18c281c4525d05e3095", + "version" : "4.2.0" + } + }, + { + "identity" : "swift-distributed-tracing", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-distributed-tracing.git", + "state" : { + "revision" : "e109d8b5308d0e05201d9a1dd1c475446a946a11", + "version" : "1.4.0" + } + }, + { + "identity" : "swift-http-structured-headers", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-http-structured-headers.git", + "state" : { + "revision" : "76d7627bd88b47bf5a0f8497dd244885960dde0b", + "version" : "1.6.0" + } + }, + { + "identity" : "swift-http-types", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-http-types.git", + "state" : { + "revision" : "45eb0224913ea070ec4fba17291b9e7ecf4749ca", + "version" : "1.5.1" + } + }, + { + "identity" : "swift-log", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-log.git", + "state" : { + "revision" : "bbd81b6725ae874c69e9b8c8804d462356b55523", + "version" : "1.10.1" + } + }, + { + "identity" : "swift-nio", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-nio.git", + "state" : { + "revision" : "e932d3c4d8f77433c8f7093b5ebcbf91463948a0", + "version" : "2.95.0" + } + }, + { + "identity" : "swift-nio-extras", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-nio-extras.git", + "state" : { + "revision" : "3df009d563dc9f21a5c85b33d8c2e34d2e4f8c3b", + "version" : "1.32.1" + } + }, + { + "identity" : "swift-nio-http2", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-nio-http2.git", + "state" : { + "revision" : "b6571f3db40799df5a7fc0e92c399aa71c883edd", + "version" : "1.40.0" + } + }, + { + "identity" : "swift-nio-ssl", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-nio-ssl.git", + "state" : { + "revision" : "173cc69a058623525a58ae6710e2f5727c663793", + "version" : "2.36.0" + } + }, + { + "identity" : "swift-nio-transport-services", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-nio-transport-services.git", + "state" : { + "revision" : "60c3e187154421171721c1a38e800b390680fb5d", + "version" : "1.26.0" + } + }, + { + "identity" : "swift-numerics", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-numerics.git", + "state" : { + "revision" : "0c0290ff6b24942dadb83a929ffaaa1481df04a2", + "version" : "1.1.1" + } + }, + { + "identity" : "swift-service-context", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-service-context.git", + "state" : { + "revision" : "d0997351b0c7779017f88e7a93bc30a1878d7f29", + "version" : "1.3.0" + } + }, + { + "identity" : "swift-service-lifecycle", + "kind" : "remoteSourceControl", + "location" : "https://github.com/swift-server/swift-service-lifecycle", + "state" : { + "revision" : "1de37290c0ab3c5a96028e0f02911b672fd42348", + "version" : "2.9.1" + } + }, + { + "identity" : "swift-system", + "kind" : "remoteSourceControl", + "location" : "https://github.com/apple/swift-system", + "state" : { + "revision" : "7c6ad0fc39d0763e0b699210e4124afd5041c5df", + "version" : "1.6.4" + } + } + ], + "version" : 3 +} diff --git a/Package.swift b/Package.swift index b7e2404..272e51f 100644 --- a/Package.swift +++ b/Package.swift @@ -1,4 +1,4 @@ -// swift-tools-version: 6.0 +// swift-tools-version: 6.1 // The swift-tools-version declares the minimum version of Swift required to build this package. import PackageDescription @@ -20,15 +20,35 @@ let package = Package( targets: ["EventSource"] ) ], + traits: [ + .trait(name: "AsyncHTTPClient") + ], + dependencies: [ + .package(url: "https://github.com/swift-server/async-http-client.git", from: "1.24.0") + ], targets: [ // Targets are the basic building blocks of a package, defining a module or a test suite. // Targets can depend on other targets in this package and products from dependencies. .target( - name: "EventSource" + name: "EventSource", + dependencies: [ + .product( + name: "AsyncHTTPClient", + package: "async-http-client", + condition: .when(traits: ["AsyncHTTPClient"]) + ) + ] ), .testTarget( name: "EventSourceTests", - dependencies: ["EventSource"] + dependencies: [ + "EventSource", + .product( + name: "AsyncHTTPClient", + package: "async-http-client", + condition: .when(traits: ["AsyncHTTPClient"]) + ), + ] ), ] ) diff --git a/Package@swift-6.0.swift b/Package@swift-6.0.swift new file mode 100644 index 0000000..2bfac39 --- /dev/null +++ b/Package@swift-6.0.swift @@ -0,0 +1,31 @@ +// swift-tools-version: 6.0 +// The swift-tools-version declares the minimum version of Swift required to build this package. + +import PackageDescription + +let package = Package( + name: "EventSource", + platforms: [ + .iOS("15.0"), + .macOS("12.0"), + .macCatalyst("15.0"), + .watchOS("8.0"), + .tvOS("15.0"), + .visionOS("1.0"), + ], + products: [ + .library( + name: "EventSource", + targets: ["EventSource"] + ) + ], + targets: [ + .target( + name: "EventSource" + ), + .testTarget( + name: "EventSourceTests", + dependencies: ["EventSource"] + ), + ] +) diff --git a/README.md b/README.md index 692df96..d318f2b 100644 --- a/README.md +++ b/README.md @@ -28,6 +28,37 @@ dependencies: [ ] ``` +### AsyncHTTPClient support + +EventSource can optionally integrate with +[AsyncHTTPClient](https://github.com/swift-server/async-http-client) +through a package trait (Swift 6.1+): + +```swift +dependencies: [ + .package( + url: "https://github.com/mattt/EventSource.git", + from: "1.4.0", + traits: ["AsyncHTTPClient"] + ) +] +``` + +Build and test with the trait enabled: + +```bash +swift build --traits AsyncHTTPClient +swift test --traits AsyncHTTPClient +``` + +> [!NOTE] +> `AsyncHTTPClient` uses SwiftNIO instead of Foundation URL Loading System. +> If traffic is routed through `AsyncHTTPClient`, `URLProtocol`-based interception +> does not apply. +> On Linux, EventSource starts with URLSession transport and switches to +> AsyncHTTPClient only after a retryable URLSession failure. Once switched, +> that EventSource instance continues using AsyncHTTPClient for reconnect attempts. + ## Usage ### Connecting to an EventSource @@ -222,4 +253,4 @@ This project is available under the MIT license. See the LICENSE file for more info. [mdn]: https://developer.mozilla.org/en-US/docs/Web/API/EventSource -[spec]: https://html.spec.whatwg.org/multipage/server-sent-events.html#the-eventsource-interface \ No newline at end of file +[spec]: https://html.spec.whatwg.org/multipage/server-sent-events.html#the-eventsource-interface diff --git a/Sources/EventSource/EventSource+AsyncHTTPClient.swift b/Sources/EventSource/EventSource+AsyncHTTPClient.swift new file mode 100644 index 0000000..25d3452 --- /dev/null +++ b/Sources/EventSource/EventSource+AsyncHTTPClient.swift @@ -0,0 +1,151 @@ +#if canImport(AsyncHTTPClient) + import AsyncHTTPClient + import Foundation + import NIOCore + import NIOHTTP1 + + #if canImport(FoundationNetworking) + import FoundationNetworking + #endif + + /// Errors thrown by the AsyncHTTPClient transport adapter. + enum EventSourceAsyncHTTPClientError: Error { + /// The request does not contain a valid URL. + case invalidRequestURL + /// The AsyncHTTPClient response cannot be converted to ``HTTPURLResponse``. + case invalidResponse + } + + /// Executes EventSource requests and returns a byte stream with response metadata. + protocol EventSourceByteStreamingBackend: Sendable { + /// Executes the given request with the provided timeout. + /// + /// - Parameters: + /// - request: The request to execute. + /// - timeout: The request timeout. + /// - Returns: A converted HTTP response and a streaming byte sequence. + /// - Throws: An error if the request fails or response conversion is invalid. + func execute(_ request: URLRequest, timeout: TimeAmount) async throws -> ( + response: HTTPURLResponse, bytes: AsyncThrowingStream + ) + } + + /// Coordinates one-time shutdown for a shared ``HTTPClient`` instance. + actor ShutdownCoordinator { + private var hasShutdown = false + + /// Shuts down the client once, and ignores subsequent calls. + func shutdown(client: HTTPClient) async { + guard !hasShutdown else { return } + hasShutdown = true + try? await client.shutdown() + } + } + + /// AsyncHTTPClient-backed implementation of ``EventSourceByteStreamingBackend``. + struct AsyncHTTPClientBackend: EventSourceByteStreamingBackend { + func execute(_ request: URLRequest, timeout: TimeAmount) async throws -> ( + response: HTTPURLResponse, bytes: AsyncThrowingStream + ) { + guard let url = request.url else { + throw EventSourceAsyncHTTPClientError.invalidRequestURL + } + + let client = HTTPClient() + let shutdownCoordinator = ShutdownCoordinator() + var clientRequest = HTTPClientRequest(url: url.absoluteString) + + if let method = request.httpMethod { + clientRequest.method = HTTPMethod(rawValue: method) + } + + for (name, value) in request.allHTTPHeaderFields ?? [:] { + clientRequest.headers.add(name: name, value: value) + } + + if let body = request.httpBody { + clientRequest.body = .bytes(body) + } + + do { + let response = try await client.execute(clientRequest, timeout: timeout) + + // HTTPURLResponse requires a [String: String] map, so duplicate header fields + // (for example, multiple Set-Cookie values) cannot be preserved independently. + var responseHeaders: [String: String] = [:] + for header in response.headers { + if let existing = responseHeaders[header.name] { + responseHeaders[header.name] = existing + ", " + header.value + } else { + responseHeaders[header.name] = header.value + } + } + + // Convert the response to an HTTPURLResponse. + guard + let httpResponse = HTTPURLResponse( + url: url, + statusCode: Int(response.status.code), + httpVersion: nil, + headerFields: responseHeaders + ) + else { + throw EventSourceAsyncHTTPClientError.invalidResponse + } + + // Convert the response body to a stream of bytes. + let bytes = AsyncThrowingStream { continuation in + let task = Task { + do { + for try await chunk in response.body { + for byte in chunk.readableBytesView { + continuation.yield(byte) + } + } + continuation.finish() + } catch { + continuation.finish(throwing: error) + } + await shutdownCoordinator.shutdown(client: client) + } + continuation.onTermination = { _ in + task.cancel() + Task { + await shutdownCoordinator.shutdown(client: client) + } + } + } + + return (httpResponse, bytes) + } catch { + await shutdownCoordinator.shutdown(client: client) + throw error + } + } + } + + /// Decides when Linux URLSession failures should fall back to AsyncHTTPClient. + enum EventSourceFallbackPolicy { + /// Returns whether fallback should occur for the given error. + /// + /// - Parameters: + /// - useAsyncHTTPClientOnLinux: Whether this instance already uses AsyncHTTPClient. + /// - error: The failure from the current connection attempt. + /// - Returns: `true` when fallback should switch transports, otherwise `false`. + static func shouldFallback( + useAsyncHTTPClientOnLinux: Bool, + error: Error + ) -> Bool { + guard !useAsyncHTTPClientOnLinux else { return false } + guard error is EventSourceError == false else { return false } + if let urlError = error as? URLError { + let nonRetryableCodes: Set = [ + .badURL, .unsupportedURL, .userAuthenticationRequired, + ] + return !nonRetryableCodes.contains(urlError.code) + } + return false + } + } + +#endif diff --git a/Sources/EventSource/EventSource.swift b/Sources/EventSource/EventSource.swift index b097f4e..1e28162 100644 --- a/Sources/EventSource/EventSource.swift +++ b/Sources/EventSource/EventSource.swift @@ -103,7 +103,7 @@ public enum EventSourceError: Swift.Error, LocalizedError { guard let self else { return } await last?.value await parser.finish() - if let e = error { await owner?.emitError(e) } + await owner?.setLinuxCompletionError(error) await owner?.resumeLinuxCompletion() } } @@ -375,6 +375,11 @@ public actor EventSource { private var linuxSession: URLSession? private var linuxTask: URLSessionDataTask? private var linuxCompletion: CheckedContinuation? + private var linuxCompletionError: Error? + + #if canImport(AsyncHTTPClient) + private var useAsyncHTTPClientOnLinux = false + #endif #endif /// The callback to invoke when the connection is opened. @@ -445,7 +450,7 @@ public actor EventSource { readyState = .closed linuxTask?.cancel() linuxSession?.invalidateAndCancel() - await resumeLinuxCompletion() + resumeLinuxCompletion() } fileprivate func waitForLinuxCompletion() async { @@ -458,6 +463,16 @@ public actor EventSource { linuxCompletion?.resume() linuxCompletion = nil } + + fileprivate func setLinuxCompletionError(_ error: Error?) { + linuxCompletionError = error + } + + fileprivate func consumeLinuxCompletionError() -> Error? { + let error = linuxCompletionError + linuxCompletionError = nil + return error + } #endif /// Initializes a new EventSource and begins connecting to the given URL. @@ -541,22 +556,38 @@ public actor EventSource { // Perform the HTTP request with streaming support #if canImport(FoundationNetworking) - // Linux: Use URLSessionDataTask with delegate for streaming - let delegate = LinuxSSEDelegate(owner: self, parser: parser) - let delegateQueue = OperationQueue() - delegateQueue.maxConcurrentOperationCount = 1 - let linuxSession = URLSession( - configuration: session.configuration, - delegate: delegate, - delegateQueue: delegateQueue - ) - self.linuxSession = linuxSession - let task = linuxSession.dataTask(with: currentRequest) - self.linuxTask = task - task.resume() - - // Wait until the stream completes (server closed or error) - await waitForLinuxCompletion() + #if canImport(AsyncHTTPClient) + if shouldUseAsyncHTTPClientOnLinux() { + try await connectUsingAsyncHTTPClient( + request: currentRequest, + parser: parser + ) + } else { + do { + try await connectUsingLinuxURLSession( + request: currentRequest, + parser: parser + ) + } catch { + if shouldFallbackToAsyncHTTPClient(for: error) { + // Once this instance falls back to AsyncHTTPClient on Linux, + // subsequent reconnect attempts continue using it. + useAsyncHTTPClientOnLinux = true + try await connectUsingAsyncHTTPClient( + request: currentRequest, + parser: parser + ) + } else { + throw error + } + } + } + #else + try await connectUsingLinuxURLSession( + request: currentRequest, + parser: parser + ) + #endif #else // Apple platforms: Use URLSession.bytes for true streaming let (byteStream, response) = try await session.bytes( @@ -566,14 +597,7 @@ public actor EventSource { // Validate HTTP response (status code and content type). if let httpResponse = response as? HTTPURLResponse { - let status = httpResponse.statusCode - if status != 200 { - throw EventSourceError.invalidHTTPStatus(status) - } - let contentType = httpResponse.value(forHTTPHeaderField: "Content-Type") - if contentType?.lowercased().hasPrefix("text/event-stream") != true { - throw EventSourceError.invalidContentType(contentType) - } + try validateHTTPResponse(httpResponse) } else { throw EventSourceError.invalidHTTPStatus(0) } @@ -585,19 +609,7 @@ public actor EventSource { } // Read the incoming byte stream and parse events. - for try await byte in byteStream { - if Task.isCancelled || readyState == .closed { - break - } - - await parser.consume(byte) - - while let event = await parser.getNextEvent() { - if let onMessage = _onMessageCallback { - await onMessage(event) - } - } - } + try await processIncomingBytes(from: byteStream, parser: parser) #endif // End of stream reached (server closed connection). @@ -643,6 +655,98 @@ public actor EventSource { // Update state to `.closed`. readyState = .closed } + + private func processIncomingBytes( + from byteStream: S, + parser: Parser + ) async throws where S.Element == UInt8 { + for try await byte in byteStream { + if Task.isCancelled || readyState == .closed { + break + } + + await parser.consume(byte) + + while let event = await parser.getNextEvent() { + if let onMessage = _onMessageCallback { + await onMessage(event) + } + } + } + } + + #if canImport(FoundationNetworking) + private func connectUsingLinuxURLSession( + request currentRequest: URLRequest, + parser: Parser + ) async throws { + let delegate = LinuxSSEDelegate(owner: self, parser: parser) + let delegateQueue = OperationQueue() + delegateQueue.maxConcurrentOperationCount = 1 + let linuxSession = URLSession( + configuration: session.configuration, + delegate: delegate, + delegateQueue: delegateQueue + ) + self.linuxSession = linuxSession + self.linuxCompletionError = nil + let task = linuxSession.dataTask(with: currentRequest) + self.linuxTask = task + task.resume() + + await waitForLinuxCompletion() + if let completionError = consumeLinuxCompletionError() { + throw completionError + } + } + + private func shouldUseAsyncHTTPClientOnLinux() -> Bool { + #if canImport(AsyncHTTPClient) + return useAsyncHTTPClientOnLinux + #else + return false + #endif + } + + #if canImport(AsyncHTTPClient) + private func shouldFallbackToAsyncHTTPClient(for error: Error) -> Bool { + EventSourceFallbackPolicy.shouldFallback( + useAsyncHTTPClientOnLinux: useAsyncHTTPClientOnLinux, + error: error + ) + } + + private func connectUsingAsyncHTTPClient( + request currentRequest: URLRequest, + parser: Parser + ) async throws { + let backend = AsyncHTTPClientBackend() + let timeoutSeconds = max(1.0, session.configuration.timeoutIntervalForResource) + let timeoutNanos = Int64(min(timeoutSeconds * 1_000_000_000, Double(Int64.max))) + let (response, byteStream) = try await backend.execute( + currentRequest, + timeout: .nanoseconds(timeoutNanos) + ) + try validateHTTPResponse(response) + readyState = .open + if let onOpen = _onOpenCallback { + await onOpen() + } + try await processIncomingBytes(from: byteStream, parser: parser) + } + #endif + #endif + + private func validateHTTPResponse(_ httpResponse: HTTPURLResponse) throws { + let status = httpResponse.statusCode + if status != 200 { + throw EventSourceError.invalidHTTPStatus(status) + } + let contentType = httpResponse.value(forHTTPHeaderField: "Content-Type") + if contentType?.lowercased().hasPrefix("text/event-stream") != true { + throw EventSourceError.invalidContentType(contentType) + } + } } /// A type alias for `EventSource.Event`. diff --git a/Tests/EventSourceTests/AsyncHTTPClientTraitTests.swift b/Tests/EventSourceTests/AsyncHTTPClientTraitTests.swift new file mode 100644 index 0000000..618b856 --- /dev/null +++ b/Tests/EventSourceTests/AsyncHTTPClientTraitTests.swift @@ -0,0 +1,212 @@ +#if canImport(AsyncHTTPClient) + import AsyncHTTPClient + import Foundation + import NIOCore + import NIOHTTP1 + #if canImport(NIOPosix) + import NIOPosix + #endif + import Testing + + @testable import EventSource + + #if canImport(FoundationNetworking) + import FoundationNetworking + #endif + + @Suite("AsyncHTTPClient Trait Tests") + struct AsyncHTTPClientTraitTests { + @Test("AsyncHTTPClient backend conforms to streaming backend protocol") + func backendProtocolConformance() { + func acceptsBackend(_: some EventSourceByteStreamingBackend) {} + acceptsBackend(AsyncHTTPClientBackend()) + } + + @Test("AsyncHTTPClient backend surfaces execution errors") + func executionError() async { + let backend = AsyncHTTPClientBackend() + let request = URLRequest(url: URL(string: "ftp://127.0.0.1/unavailable")!) + do { + _ = try await backend.execute(request, timeout: .seconds(1)) + Issue.record("Expected backend.execute to throw for unreachable host") + } catch { + // expected + } + } + + @Test("Falls back for retryable URL errors") + func fallbackForRetryableURLError() { + let shouldFallback = EventSourceFallbackPolicy.shouldFallback( + useAsyncHTTPClientOnLinux: false, + error: URLError(.cannotConnectToHost) + ) + #expect(shouldFallback) + } + + @Test("Does not fall back for non-retryable URL errors") + func noFallbackForNonRetryableURLError() { + let shouldFallback = EventSourceFallbackPolicy.shouldFallback( + useAsyncHTTPClientOnLinux: false, + error: URLError(.unsupportedURL) + ) + #expect(!shouldFallback) + } + + @Test("Does not fall back for EventSource errors") + func noFallbackForEventSourceError() { + let shouldFallback = EventSourceFallbackPolicy.shouldFallback( + useAsyncHTTPClientOnLinux: false, + error: EventSourceError.invalidHTTPStatus(500) + ) + #expect(!shouldFallback) + } + + @Test("Does not fall back when AsyncHTTPClient is already active") + func noFallbackWhenAlreadyUsingAsyncHTTPClient() { + let shouldFallback = EventSourceFallbackPolicy.shouldFallback( + useAsyncHTTPClientOnLinux: true, + error: URLError(.cannotConnectToHost) + ) + #expect(!shouldFallback) + } + + @Test("Does not fall back for non-URL errors") + func noFallbackForNonURLError() { + struct SomeError: Error {} + let shouldFallback = EventSourceFallbackPolicy.shouldFallback( + useAsyncHTTPClientOnLinux: false, + error: SomeError() + ) + #expect(!shouldFallback) + } + + #if canImport(NIOPosix) + @Test("AsyncHTTPClient backend executes request and streams bytes") + func backendExecutionAndStreaming() async throws { + try await withLocalSSEHTTPServer( + responseBody: "data: hello\n\ndata: world\n\n", + headers: [ + ("Content-Type", "text/event-stream"), + ("Set-Cookie", "a=1"), + ("Set-Cookie", "b=2"), + ] + ) { server in + var request = URLRequest(url: server.url) + request.setValue("text/event-stream", forHTTPHeaderField: "Accept") + let backend = AsyncHTTPClientBackend() + let (response, bytes) = try await backend.execute(request, timeout: .seconds(5)) + + #expect(response.statusCode == 200) + #expect(response.value(forHTTPHeaderField: "Content-Type") == "text/event-stream") + #expect(response.value(forHTTPHeaderField: "Set-Cookie")?.contains("a=1") == true) + #expect(response.value(forHTTPHeaderField: "Set-Cookie")?.contains("b=2") == true) + + var collected: [UInt8] = [] + for try await byte in bytes { + collected.append(byte) + } + let payload = String(decoding: collected, as: UTF8.self) + #expect(payload == "data: hello\n\ndata: world\n\n") + } + } + + private func withLocalSSEHTTPServer( + responseBody: String, + headers: [(String, String)], + operation: (LocalSSEHTTPServer) async throws -> T + ) async throws -> T { + let server = try await LocalSSEHTTPServer( + responseBody: responseBody, + headers: headers + ) + do { + let result = try await operation(server) + try? await server.shutdown() + return result + } catch { + try? await server.shutdown() + throw error + } + } + #endif + } + + #if canImport(NIOPosix) + private final class StaticResponseHandler: ChannelInboundHandler, @unchecked Sendable { + typealias InboundIn = HTTPServerRequestPart + typealias OutboundOut = HTTPServerResponsePart + + private let responseBody: String + private let headers: [(String, String)] + + init(responseBody: String, headers: [(String, String)]) { + self.responseBody = responseBody + self.headers = headers + } + + func channelRead(context: ChannelHandlerContext, data: NIOAny) { + let part = unwrapInboundIn(data) + switch part { + case .end: + var httpHeaders = HTTPHeaders() + for (name, value) in headers { + httpHeaders.add(name: name, value: value) + } + let head = HTTPResponseHead( + version: .http1_1, + status: .ok, + headers: httpHeaders + ) + context.write(wrapOutboundOut(.head(head)), promise: nil) + var buffer = context.channel.allocator.buffer(capacity: responseBody.utf8.count) + buffer.writeString(responseBody) + context.write(wrapOutboundOut(.body(.byteBuffer(buffer))), promise: nil) + context.writeAndFlush(wrapOutboundOut(.end(nil)), promise: nil) + default: + break + } + } + } + + private struct LocalSSEHTTPServer { + let url: URL + + private let group: MultiThreadedEventLoopGroup + private let channel: Channel + + init(responseBody: String, headers: [(String, String)]) async throws { + let group = MultiThreadedEventLoopGroup(numberOfThreads: 1) + let bootstrap = ServerBootstrap(group: group) + .serverChannelOption(ChannelOptions.backlog, value: 256) + .serverChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1) + .childChannelInitializer { channel in + channel.pipeline.configureHTTPServerPipeline().flatMap { + channel.pipeline.addHandler( + StaticResponseHandler( + responseBody: responseBody, + headers: headers + ) + ) + } + } + .childChannelOption(ChannelOptions.socketOption(.so_reuseaddr), value: 1) + + let channel = try await bootstrap.bind(host: "127.0.0.1", port: 0).get() + guard let port = channel.localAddress?.port else { + try? await channel.close().get() + try? await group.shutdownGracefully() + throw URLError(.cannotFindHost) + } + + self.group = group + self.channel = channel + self.url = URL(string: "http://127.0.0.1:\(port)/events")! + } + + func shutdown() async throws { + try await channel.close().get() + try await group.shutdownGracefully() + } + } + #endif +#endif