From 4e03f8f6638778eedb0e37295408eaffd9fe48c6 Mon Sep 17 00:00:00 2001 From: Mattt Zmuda Date: Tue, 6 May 2025 04:38:21 -0700 Subject: [PATCH 1/6] Add package dependency for github.com/loopwork-ai/EventSource --- Package.resolved | 11 ++++++++++- Package.swift | 3 +++ 2 files changed, 13 insertions(+), 1 deletion(-) diff --git a/Package.resolved b/Package.resolved index eaee8ae6..5e9023c5 100644 --- a/Package.resolved +++ b/Package.resolved @@ -1,6 +1,15 @@ { - "originHash" : "cce36cb33302c2f1c28458e19b8439f736fc28106e4c6ea95d7992c74594c242", + "originHash" : "08de61941b7919a65e36c0e34f8c1c41995469b86a39122158b75b4a68c4527d", "pins" : [ + { + "identity" : "eventsource", + "kind" : "remoteSourceControl", + "location" : "https://github.com/loopwork-ai/eventsource.git", + "state" : { + "revision" : "e83f076811f32757305b8bf69ac92d05626ffdd7", + "version" : "1.1.0" + } + }, { "identity" : "swift-log", "kind" : "remoteSourceControl", diff --git a/Package.swift b/Package.swift index 270b0458..3c57596e 100644 --- a/Package.swift +++ b/Package.swift @@ -22,6 +22,7 @@ let package = Package( dependencies: [ .package(url: "https://github.com/apple/swift-system.git", from: "1.0.0"), .package(url: "https://github.com/apple/swift-log.git", from: "1.5.0"), + .package(url: "https://github.com/loopwork-ai/eventsource.git", from: "1.1.0"), ], targets: [ // Targets are the basic building blocks of a package, defining a module or a test suite. @@ -31,6 +32,7 @@ let package = Package( dependencies: [ .product(name: "SystemPackage", package: "swift-system"), .product(name: "Logging", package: "swift-log"), + .product(name: "EventSource", package: "eventsource"), ]), .testTarget( name: "MCPTests", @@ -38,6 +40,7 @@ let package = Package( "MCP", .product(name: "SystemPackage", package: "swift-system"), .product(name: "Logging", package: "swift-log"), + .product(name: "EventSource", package: "eventsource"), ]), ] ) From ab586e800ee72a97df3cfd3d29e2758d284b6ae6 Mon Sep 17 00:00:00 2001 From: Mattt Zmuda Date: Tue, 6 May 2025 04:41:25 -0700 Subject: [PATCH 2/6] Use EventSource for SSE implementation --- .../Base/Transports/HTTPClientTransport.swift | 112 ++++-------------- 1 file changed, 22 insertions(+), 90 deletions(-) diff --git a/Sources/MCP/Base/Transports/HTTPClientTransport.swift b/Sources/MCP/Base/Transports/HTTPClientTransport.swift index 9cf93a0c..c39f5589 100644 --- a/Sources/MCP/Base/Transports/HTTPClientTransport.swift +++ b/Sources/MCP/Base/Transports/HTTPClientTransport.swift @@ -1,3 +1,4 @@ +import EventSource import Foundation import Logging @@ -11,7 +12,6 @@ public actor HTTPClientTransport: Actor, Transport { public private(set) var sessionID: String? private let streaming: Bool private var streamingTask: Task? - private var lastEventID: String? public nonisolated let logger: Logger private var isConnected = false @@ -183,17 +183,13 @@ public actor HTTPClientTransport: Actor, Transport { var request = URLRequest(url: endpoint) request.httpMethod = "GET" request.addValue("text/event-stream", forHTTPHeaderField: "Accept") + request.addValue("no-cache", forHTTPHeaderField: "Cache-Control") // Add session ID if available if let sessionID = sessionID { request.addValue(sessionID, forHTTPHeaderField: "Mcp-Session-Id") } - // Add Last-Event-ID header for resumability if available - if let lastEventID = lastEventID { - request.addValue(lastEventID, forHTTPHeaderField: "Last-Event-ID") - } - logger.debug("Starting SSE connection") // Create URLSession task for SSE @@ -217,95 +213,31 @@ public actor HTTPClientTransport: Actor, Transport { // Extract session ID if present if let newSessionID = httpResponse.value(forHTTPHeaderField: "Mcp-Session-Id") { self.sessionID = newSessionID + logger.debug("Session ID received", metadata: ["sessionID": "\(newSessionID)"]) } // Process the SSE stream - var buffer = "" - var eventType = "" - var eventID: String? - var eventData = "" - - for try await byte in stream { - if Task.isCancelled { break } - - guard let char = String(bytes: [byte], encoding: .utf8) else { continue } - buffer.append(char) - - // Process complete lines - while let newlineIndex = buffer.utf8.firstIndex(where: { $0 == 10 }) { - var line = buffer[.. Date: Tue, 6 May 2025 04:55:46 -0700 Subject: [PATCH 3/6] Refactor response stream and content type handling Co-authored-by: Stephen Tallent --- .../Base/Transports/HTTPClientTransport.swift | 40 +++++++++++++------ Tests/MCPTests/HTTPClientTransportTests.swift | 2 +- 2 files changed, 29 insertions(+), 13 deletions(-) diff --git a/Sources/MCP/Base/Transports/HTTPClientTransport.swift b/Sources/MCP/Base/Transports/HTTPClientTransport.swift index c39f5589..7d20a44a 100644 --- a/Sources/MCP/Base/Transports/HTTPClientTransport.swift +++ b/Sources/MCP/Base/Transports/HTTPClientTransport.swift @@ -103,7 +103,7 @@ public actor HTTPClientTransport: Actor, Transport { request.addValue(sessionID, forHTTPHeaderField: "Mcp-Session-Id") } - let (responseData, response) = try await session.data(for: request) + let (responseStream, response) = try await session.bytes(for: request) guard let httpResponse = response as? HTTPURLResponse else { throw MCPError.internalError("Invalid HTTP response") @@ -120,19 +120,20 @@ public actor HTTPClientTransport: Actor, Transport { // Handle different response types switch httpResponse.statusCode { - case 200, 201, 202: + case 200..<300 where contentType.contains("text/event-stream"): // For SSE, the processing happens in the streaming task - if contentType.contains("text/event-stream") { - logger.debug("Received SSE response, processing in streaming task") - // The streaming is handled by the SSE task if active - return - } + logger.debug("Received SSE response, processing in streaming task") + try await self.processSSE(responseStream) + case 200..<300 where contentType.contains("application/json"): // For JSON responses, deliver the data directly - if contentType.contains("application/json") && !responseData.isEmpty { - logger.debug("Received JSON response", metadata: ["size": "\(responseData.count)"]) - messageContinuation.yield(responseData) + var buffer = Data() + for try await byte in responseStream { + buffer.append(byte) } + logger.debug("Received JSON response", metadata: ["size": "\(buffer.count)"]) + messageContinuation.yield(buffer) + case 404: // If we get a 404 with a session ID, it means our session is invalid if sessionID != nil { @@ -141,8 +142,16 @@ public actor HTTPClientTransport: Actor, Transport { throw MCPError.internalError("Session expired") } throw MCPError.internalError("Endpoint not found") + + case 405: + // If we get a 405, it means the server does not support streaming, + // so we should cancel the streaming task. + self.streamingTask?.cancel() + throw MCPError.internalError("Server does not support streaming") + default: - throw MCPError.internalError("HTTP error: \(httpResponse.statusCode)") + throw MCPError.internalError( + "Unexpected HTTP response: \(httpResponse.statusCode) \(contentType)") } } @@ -175,6 +184,10 @@ public actor HTTPClientTransport: Actor, Transport { private func connectToEventStream() async throws { logger.warning("SSE is not supported on this platform") } + + private func processSSE(_ stream: URLSession.AsyncBytes) async throws { + logger.warning("SSE is not supported on this platform") + } #else /// Establishes an SSE connection to the server private func connectToEventStream() async throws { @@ -216,7 +229,10 @@ public actor HTTPClientTransport: Actor, Transport { logger.debug("Session ID received", metadata: ["sessionID": "\(newSessionID)"]) } - // Process the SSE stream + try await self.processSSE(stream) + } + + private func processSSE(_ stream: URLSession.AsyncBytes) async throws { do { for try await event in stream.events { // Check if task has been cancelled diff --git a/Tests/MCPTests/HTTPClientTransportTests.swift b/Tests/MCPTests/HTTPClientTransportTests.swift index 8085f5a5..888d0067 100644 --- a/Tests/MCPTests/HTTPClientTransportTests.swift +++ b/Tests/MCPTests/HTTPClientTransportTests.swift @@ -341,7 +341,7 @@ import Testing Issue.record("Expected MCPError.internalError, got \(error)") throw error } - #expect(message?.contains("HTTP error: 500") ?? false) + #expect(message?.contains("Unexpected HTTP response: 500") ?? false) } catch { Issue.record("Expected MCPError, got \(error)") throw error From cdfea35e15981136a65b0c5d088897ee734f1074 Mon Sep 17 00:00:00 2001 From: Mattt Zmuda Date: Tue, 6 May 2025 05:03:33 -0700 Subject: [PATCH 4/6] Conditionalize use of streaming for supported platforms --- .../Base/Transports/HTTPClientTransport.swift | 135 ++++++++++++------ 1 file changed, 93 insertions(+), 42 deletions(-) diff --git a/Sources/MCP/Base/Transports/HTTPClientTransport.swift b/Sources/MCP/Base/Transports/HTTPClientTransport.swift index 7d20a44a..49aabb1f 100644 --- a/Sources/MCP/Base/Transports/HTTPClientTransport.swift +++ b/Sources/MCP/Base/Transports/HTTPClientTransport.swift @@ -103,56 +103,107 @@ public actor HTTPClientTransport: Actor, Transport { request.addValue(sessionID, forHTTPHeaderField: "Mcp-Session-Id") } - let (responseStream, response) = try await session.bytes(for: request) + #if canImport(FoundationNetworking) + // Linux implementation using data(for:) instead of bytes(for:) + let (responseData, response) = try await session.data(for: request) - guard let httpResponse = response as? HTTPURLResponse else { - throw MCPError.internalError("Invalid HTTP response") - } + guard let httpResponse = response as? HTTPURLResponse else { + throw MCPError.internalError("Invalid HTTP response") + } - // Process the response based on content type and status code - let contentType = httpResponse.value(forHTTPHeaderField: "Content-Type") ?? "" + // Process the response based on content type and status code + let contentType = httpResponse.value(forHTTPHeaderField: "Content-Type") ?? "" - // Extract session ID if present - if let newSessionID = httpResponse.value(forHTTPHeaderField: "Mcp-Session-Id") { - self.sessionID = newSessionID - logger.debug("Session ID received", metadata: ["sessionID": "\(newSessionID)"]) - } + // Extract session ID if present + if let newSessionID = httpResponse.value(forHTTPHeaderField: "Mcp-Session-Id") { + self.sessionID = newSessionID + logger.debug("Session ID received", metadata: ["sessionID": "\(newSessionID)"]) + } + + // Handle different response types + switch httpResponse.statusCode { + case 200..<300 where contentType.contains("text/event-stream"): + // For SSE, we can't process streaming on this platform + logger.warning("SSE responses aren't fully supported on this platform") + messageContinuation.yield(responseData) + + case 200..<300 where contentType.contains("application/json"): + // For JSON responses, deliver the data directly + logger.debug("Received JSON response", metadata: ["size": "\(responseData.count)"]) + messageContinuation.yield(responseData) + + case 404: + // If we get a 404 with a session ID, it means our session is invalid + if sessionID != nil { + logger.warning("Session has expired") + sessionID = nil + throw MCPError.internalError("Session expired") + } + throw MCPError.internalError("Endpoint not found") + + case 405: + // If we get a 405, it means the server does not support streaming, + // so we should cancel the streaming task. + self.streamingTask?.cancel() + throw MCPError.internalError("Server does not support streaming") + + default: + throw MCPError.internalError( + "Unexpected HTTP response: \(httpResponse.statusCode) \(contentType)") + } + #else + // macOS and other platforms with bytes(for:) support + let (responseStream, response) = try await session.bytes(for: request) - // Handle different response types - switch httpResponse.statusCode { - case 200..<300 where contentType.contains("text/event-stream"): - // For SSE, the processing happens in the streaming task - logger.debug("Received SSE response, processing in streaming task") - try await self.processSSE(responseStream) - - case 200..<300 where contentType.contains("application/json"): - // For JSON responses, deliver the data directly - var buffer = Data() - for try await byte in responseStream { - buffer.append(byte) + guard let httpResponse = response as? HTTPURLResponse else { + throw MCPError.internalError("Invalid HTTP response") } - logger.debug("Received JSON response", metadata: ["size": "\(buffer.count)"]) - messageContinuation.yield(buffer) - - case 404: - // If we get a 404 with a session ID, it means our session is invalid - if sessionID != nil { - logger.warning("Session has expired") - sessionID = nil - throw MCPError.internalError("Session expired") + + // Process the response based on content type and status code + let contentType = httpResponse.value(forHTTPHeaderField: "Content-Type") ?? "" + + // Extract session ID if present + if let newSessionID = httpResponse.value(forHTTPHeaderField: "Mcp-Session-Id") { + self.sessionID = newSessionID + logger.debug("Session ID received", metadata: ["sessionID": "\(newSessionID)"]) } - throw MCPError.internalError("Endpoint not found") - case 405: - // If we get a 405, it means the server does not support streaming, - // so we should cancel the streaming task. - self.streamingTask?.cancel() - throw MCPError.internalError("Server does not support streaming") + // Handle different response types + switch httpResponse.statusCode { + case 200..<300 where contentType.contains("text/event-stream"): + // For SSE, the processing happens in the streaming task + logger.debug("Received SSE response, processing in streaming task") + try await self.processSSE(responseStream) + + case 200..<300 where contentType.contains("application/json"): + // For JSON responses, deliver the data directly + var buffer = Data() + for try await byte in responseStream { + buffer.append(byte) + } + logger.debug("Received JSON response", metadata: ["size": "\(buffer.count)"]) + messageContinuation.yield(buffer) + + case 404: + // If we get a 404 with a session ID, it means our session is invalid + if sessionID != nil { + logger.warning("Session has expired") + sessionID = nil + throw MCPError.internalError("Session expired") + } + throw MCPError.internalError("Endpoint not found") - default: - throw MCPError.internalError( - "Unexpected HTTP response: \(httpResponse.statusCode) \(contentType)") - } + case 405: + // If we get a 405, it means the server does not support streaming, + // so we should cancel the streaming task. + self.streamingTask?.cancel() + throw MCPError.internalError("Server does not support streaming") + + default: + throw MCPError.internalError( + "Unexpected HTTP response: \(httpResponse.statusCode) \(contentType)") + } + #endif } /// Receives data in an async sequence From a250581bd7bec22dae7bf386fbcbee1053f75539 Mon Sep 17 00:00:00 2001 From: Mattt Zmuda Date: Tue, 6 May 2025 05:07:01 -0700 Subject: [PATCH 5/6] Refactor conditional platform handling --- .../Base/Transports/HTTPClientTransport.swift | 189 ++++++++++-------- Tests/MCPTests/HTTPClientTransportTests.swift | 2 +- 2 files changed, 112 insertions(+), 79 deletions(-) diff --git a/Sources/MCP/Base/Transports/HTTPClientTransport.swift b/Sources/MCP/Base/Transports/HTTPClientTransport.swift index 49aabb1f..f972da16 100644 --- a/Sources/MCP/Base/Transports/HTTPClientTransport.swift +++ b/Sources/MCP/Base/Transports/HTTPClientTransport.swift @@ -106,7 +106,17 @@ public actor HTTPClientTransport: Actor, Transport { #if canImport(FoundationNetworking) // Linux implementation using data(for:) instead of bytes(for:) let (responseData, response) = try await session.data(for: request) + try await processResponse(response: response, data: responseData) + #else + // macOS and other platforms with bytes(for:) support + let (responseStream, response) = try await session.bytes(for: request) + try await processResponse(response: response, stream: responseStream) + #endif + } + #if canImport(FoundationNetworking) + // Process response with data payload (Linux) + private func processResponse(response: URLResponse, data: Data) async throws { guard let httpResponse = response as? HTTPURLResponse else { throw MCPError.internalError("Invalid HTTP response") } @@ -120,41 +130,24 @@ public actor HTTPClientTransport: Actor, Transport { logger.debug("Session ID received", metadata: ["sessionID": "\(newSessionID)"]) } - // Handle different response types - switch httpResponse.statusCode { - case 200..<300 where contentType.contains("text/event-stream"): - // For SSE, we can't process streaming on this platform - logger.warning("SSE responses aren't fully supported on this platform") - messageContinuation.yield(responseData) - - case 200..<300 where contentType.contains("application/json"): - // For JSON responses, deliver the data directly - logger.debug("Received JSON response", metadata: ["size": "\(responseData.count)"]) - messageContinuation.yield(responseData) - - case 404: - // If we get a 404 with a session ID, it means our session is invalid - if sessionID != nil { - logger.warning("Session has expired") - sessionID = nil - throw MCPError.internalError("Session expired") - } - throw MCPError.internalError("Endpoint not found") - - case 405: - // If we get a 405, it means the server does not support streaming, - // so we should cancel the streaming task. - self.streamingTask?.cancel() - throw MCPError.internalError("Server does not support streaming") + try processHTTPResponse(httpResponse, contentType: contentType) + guard case 200..<300 = httpResponse.statusCode else { return } - default: - throw MCPError.internalError( - "Unexpected HTTP response: \(httpResponse.statusCode) \(contentType)") + // For SSE or JSON responses, yield the data + if contentType.contains("text/event-stream") { + logger.warning("SSE responses aren't fully supported on this platform") + } else if contentType.contains("application/json") { + logger.debug("Received JSON response", metadata: ["size": "\(data.count)"]) + messageContinuation.yield(data) + } else { + logger.warning("Unexpected content type: \(contentType)") } - #else - // macOS and other platforms with bytes(for:) support - let (responseStream, response) = try await session.bytes(for: request) - + } + #else + // Process response with byte stream (macOS, iOS, etc.) + private func processResponse(response: URLResponse, stream: URLSession.AsyncBytes) + async throws + { guard let httpResponse = response as? HTTPURLResponse else { throw MCPError.internalError("Invalid HTTP response") } @@ -168,42 +161,77 @@ public actor HTTPClientTransport: Actor, Transport { logger.debug("Session ID received", metadata: ["sessionID": "\(newSessionID)"]) } - // Handle different response types - switch httpResponse.statusCode { - case 200..<300 where contentType.contains("text/event-stream"): - // For SSE, the processing happens in the streaming task - logger.debug("Received SSE response, processing in streaming task") - try await self.processSSE(responseStream) + try processHTTPResponse(httpResponse, contentType: contentType) + guard case 200..<300 = httpResponse.statusCode else { return } - case 200..<300 where contentType.contains("application/json"): - // For JSON responses, deliver the data directly + // Handle different response types based on content and status + if contentType.contains("text/event-stream") { + // For SSE, processing happens via the stream + logger.debug("Received SSE response, processing in streaming task") + try await self.processSSE(stream) + } else if contentType.contains("application/json") { + // For JSON responses, collect and deliver the data var buffer = Data() - for try await byte in responseStream { + for try await byte in stream { buffer.append(byte) } logger.debug("Received JSON response", metadata: ["size": "\(buffer.count)"]) messageContinuation.yield(buffer) + } else { + logger.warning("Unexpected content type: \(contentType)") + } + } + #endif - case 404: - // If we get a 404 with a session ID, it means our session is invalid - if sessionID != nil { - logger.warning("Session has expired") - sessionID = nil - throw MCPError.internalError("Session expired") - } - throw MCPError.internalError("Endpoint not found") + // Common HTTP response handling for all platforms + private func processHTTPResponse(_ response: HTTPURLResponse, contentType: String) throws { + // Handle status codes according to HTTP semantics + switch response.statusCode { + case 200..<300: + // Success range - these are handled by the platform-specific code + return + + case 400: + throw MCPError.internalError("Bad request") + + case 401: + throw MCPError.internalError("Authentication required") + + case 403: + throw MCPError.internalError("Access forbidden") + + case 404: + // If we get a 404 with a session ID, it means our session is invalid + if sessionID != nil { + logger.warning("Session has expired") + sessionID = nil + throw MCPError.internalError("Session expired") + } + throw MCPError.internalError("Endpoint not found") - case 405: - // If we get a 405, it means the server does not support streaming, - // so we should cancel the streaming task. + case 405: + // If we get a 405, it means the server does not support the requested method + // If streaming was requested, we should cancel the streaming task + if streaming { self.streamingTask?.cancel() throw MCPError.internalError("Server does not support streaming") - - default: - throw MCPError.internalError( - "Unexpected HTTP response: \(httpResponse.statusCode) \(contentType)") } - #endif + throw MCPError.internalError("Method not allowed") + + case 408: + throw MCPError.internalError("Request timeout") + + case 429: + throw MCPError.internalError("Too many requests") + + case 500..<600: + // Server error range + throw MCPError.internalError("Server error: \(response.statusCode)") + + default: + throw MCPError.internalError( + "Unexpected HTTP response: \(response.statusCode) (\(contentType))") + } } /// Receives data in an async sequence @@ -215,31 +243,36 @@ public actor HTTPClientTransport: Actor, Transport { /// Starts listening for server events using SSE private func startListeningForServerEvents() async { - guard isConnected else { return } + #if canImport(FoundationNetworking) + // SSE is not supported on this platform due to issues with EventSource dependency or URLSession.bytes. + // The `streaming` flag would have initiated this, but we cannot proceed. + if streaming { // Check self.streaming to be explicit about why we're logging + logger.warning( + "SSE streaming was requested but is not supported on this platform. SSE connection will not be attempted." + ) + } + // streamingTask will complete without looping. + #else + // This is the original code for platforms that support SSE + guard isConnected else { return } - // Retry loop for connection drops - while isConnected && !Task.isCancelled { - do { - try await connectToEventStream() - } catch { - if !Task.isCancelled { - logger.error("SSE connection error: \(error)") - // Wait before retrying - try? await Task.sleep(nanoseconds: 1_000_000_000) // 1 second + // Retry loop for connection drops + while isConnected && !Task.isCancelled { + do { + // This will call the non-Linux version of connectToEventStream + try await connectToEventStream() + } catch { + if !Task.isCancelled { + logger.error("SSE connection error: \(error)") + // Wait before retrying + try? await Task.sleep(for: .seconds(1)) + } } } - } + #endif } - #if canImport(FoundationNetworking) - private func connectToEventStream() async throws { - logger.warning("SSE is not supported on this platform") - } - - private func processSSE(_ stream: URLSession.AsyncBytes) async throws { - logger.warning("SSE is not supported on this platform") - } - #else + #if !canImport(FoundationNetworking) /// Establishes an SSE connection to the server private func connectToEventStream() async throws { guard isConnected else { return } diff --git a/Tests/MCPTests/HTTPClientTransportTests.swift b/Tests/MCPTests/HTTPClientTransportTests.swift index 888d0067..05721398 100644 --- a/Tests/MCPTests/HTTPClientTransportTests.swift +++ b/Tests/MCPTests/HTTPClientTransportTests.swift @@ -341,7 +341,7 @@ import Testing Issue.record("Expected MCPError.internalError, got \(error)") throw error } - #expect(message?.contains("Unexpected HTTP response: 500") ?? false) + #expect(message?.contains("Server error: 500") ?? false) } catch { Issue.record("Expected MCPError, got \(error)") throw error From d9c082b944b0de3796d1eb059f0c6ac1f0a5452a Mon Sep 17 00:00:00 2001 From: Mattt Zmuda Date: Tue, 6 May 2025 05:25:21 -0700 Subject: [PATCH 6/6] Conditionalize code and package dependency on Linux --- Package.swift | 38 +++++++++++-------- .../Base/Transports/HTTPClientTransport.swift | 28 +++++++------- 2 files changed, 36 insertions(+), 30 deletions(-) diff --git a/Package.swift b/Package.swift index 3c57596e..9f1cb351 100644 --- a/Package.swift +++ b/Package.swift @@ -3,6 +3,25 @@ import PackageDescription +// Base dependencies needed on all platforms +var dependencies: [Package.Dependency] = [ + .package(url: "https://github.com/apple/swift-system.git", from: "1.0.0"), + .package(url: "https://github.com/apple/swift-log.git", from: "1.5.0"), +] + +// Target dependencies needed on all platforms +var targetDependencies: [Target.Dependency] = [ + .product(name: "SystemPackage", package: "swift-system"), + .product(name: "Logging", package: "swift-log"), +] + +// Add EventSource only on Apple platforms (non-Linux) +#if !os(Linux) + dependencies.append( + .package(url: "https://github.com/loopwork-ai/eventsource.git", from: "1.1.0")) + targetDependencies.append(.product(name: "EventSource", package: "eventsource")) +#endif + let package = Package( name: "mcp-swift-sdk", platforms: [ @@ -19,28 +38,15 @@ let package = Package( name: "MCP", targets: ["MCP"]) ], - dependencies: [ - .package(url: "https://github.com/apple/swift-system.git", from: "1.0.0"), - .package(url: "https://github.com/apple/swift-log.git", from: "1.5.0"), - .package(url: "https://github.com/loopwork-ai/eventsource.git", from: "1.1.0"), - ], + dependencies: dependencies, targets: [ // Targets are the basic building blocks of a package, defining a module or a test suite. // Targets can depend on other targets in this package and products from dependencies. .target( name: "MCP", - dependencies: [ - .product(name: "SystemPackage", package: "swift-system"), - .product(name: "Logging", package: "swift-log"), - .product(name: "EventSource", package: "eventsource"), - ]), + dependencies: targetDependencies), .testTarget( name: "MCPTests", - dependencies: [ - "MCP", - .product(name: "SystemPackage", package: "swift-system"), - .product(name: "Logging", package: "swift-log"), - .product(name: "EventSource", package: "eventsource"), - ]), + dependencies: ["MCP"] + targetDependencies), ] ) diff --git a/Sources/MCP/Base/Transports/HTTPClientTransport.swift b/Sources/MCP/Base/Transports/HTTPClientTransport.swift index f972da16..f9f7e40d 100644 --- a/Sources/MCP/Base/Transports/HTTPClientTransport.swift +++ b/Sources/MCP/Base/Transports/HTTPClientTransport.swift @@ -1,7 +1,10 @@ -import EventSource import Foundation import Logging +#if !os(Linux) + import EventSource +#endif + #if canImport(FoundationNetworking) import FoundationNetworking #endif @@ -103,7 +106,7 @@ public actor HTTPClientTransport: Actor, Transport { request.addValue(sessionID, forHTTPHeaderField: "Mcp-Session-Id") } - #if canImport(FoundationNetworking) + #if os(Linux) // Linux implementation using data(for:) instead of bytes(for:) let (responseData, response) = try await session.data(for: request) try await processResponse(response: response, data: responseData) @@ -114,7 +117,7 @@ public actor HTTPClientTransport: Actor, Transport { #endif } - #if canImport(FoundationNetworking) + #if os(Linux) // Process response with data payload (Linux) private func processResponse(response: URLResponse, data: Data) async throws { guard let httpResponse = response as? HTTPURLResponse else { @@ -133,9 +136,10 @@ public actor HTTPClientTransport: Actor, Transport { try processHTTPResponse(httpResponse, contentType: contentType) guard case 200..<300 = httpResponse.statusCode else { return } - // For SSE or JSON responses, yield the data + // For JSON responses, yield the data if contentType.contains("text/event-stream") { - logger.warning("SSE responses aren't fully supported on this platform") + logger.warning("SSE responses aren't fully supported on Linux") + messageContinuation.yield(data) } else if contentType.contains("application/json") { logger.debug("Received JSON response", metadata: ["size": "\(data.count)"]) messageContinuation.yield(data) @@ -164,7 +168,6 @@ public actor HTTPClientTransport: Actor, Transport { try processHTTPResponse(httpResponse, contentType: contentType) guard case 200..<300 = httpResponse.statusCode else { return } - // Handle different response types based on content and status if contentType.contains("text/event-stream") { // For SSE, processing happens via the stream logger.debug("Received SSE response, processing in streaming task") @@ -243,15 +246,13 @@ public actor HTTPClientTransport: Actor, Transport { /// Starts listening for server events using SSE private func startListeningForServerEvents() async { - #if canImport(FoundationNetworking) - // SSE is not supported on this platform due to issues with EventSource dependency or URLSession.bytes. - // The `streaming` flag would have initiated this, but we cannot proceed. - if streaming { // Check self.streaming to be explicit about why we're logging + #if os(Linux) + // SSE is not fully supported on Linux + if streaming { logger.warning( - "SSE streaming was requested but is not supported on this platform. SSE connection will not be attempted." + "SSE streaming was requested but is not fully supported on Linux. SSE connection will not be attempted." ) } - // streamingTask will complete without looping. #else // This is the original code for platforms that support SSE guard isConnected else { return } @@ -259,7 +260,6 @@ public actor HTTPClientTransport: Actor, Transport { // Retry loop for connection drops while isConnected && !Task.isCancelled { do { - // This will call the non-Linux version of connectToEventStream try await connectToEventStream() } catch { if !Task.isCancelled { @@ -272,7 +272,7 @@ public actor HTTPClientTransport: Actor, Transport { #endif } - #if !canImport(FoundationNetworking) + #if !os(Linux) /// Establishes an SSE connection to the server private func connectToEventStream() async throws { guard isConnected else { return }