From 730cc61bf8716b5d00765d0001ea24892e63c9bd Mon Sep 17 00:00:00 2001 From: Stijn Willems Date: Tue, 17 Mar 2026 00:16:27 +0100 Subject: [PATCH] fix continuation fix rebase with upstream fix rebase with upstream fix continuation (#9) Co-authored-by: Stijn Willems --- .swift-version | 1 + Package.swift | 12 +-- .../Base/Transports/NetworkTransport.swift | 100 ++++++++---------- Sources/MCP/Client/Client.swift | 12 ++- Tests/MCPTests/ClientTests.swift | 29 ++--- 5 files changed, 71 insertions(+), 83 deletions(-) create mode 100644 .swift-version diff --git a/.swift-version b/.swift-version new file mode 100644 index 00000000..34ac1028 --- /dev/null +++ b/.swift-version @@ -0,0 +1 @@ +6.3-snapshot-2026-03-05 \ No newline at end of file diff --git a/Package.swift b/Package.swift index 2647e8bd..52e872b2 100644 --- a/Package.swift +++ b/Package.swift @@ -6,12 +6,12 @@ import PackageDescription let package = Package( name: "mcp-swift-sdk", platforms: [ - .macOS("13.0"), - .macCatalyst("16.0"), - .iOS("16.0"), - .watchOS("9.0"), - .tvOS("16.0"), - .visionOS("1.0"), + .macOS("15.0"), + .macCatalyst("18.0"), + .iOS("18.0"), + .watchOS("11.0"), + .tvOS("18.0"), + .visionOS("2.0"), ], products: [ // Products define the executables and libraries a package produces, making them visible to other packages. diff --git a/Sources/MCP/Base/Transports/NetworkTransport.swift b/Sources/MCP/Base/Transports/NetworkTransport.swift index 62b623c8..bef86de3 100644 --- a/Sources/MCP/Base/Transports/NetworkTransport.swift +++ b/Sources/MCP/Base/Transports/NetworkTransport.swift @@ -3,6 +3,7 @@ import Logging #if canImport(Network) import Network + import Synchronization /// Protocol that abstracts the Network.NWConnection functionality needed for NetworkTransport @preconcurrency protocol NetworkConnectionProtocol { @@ -511,8 +512,7 @@ import Logging var messageWithNewline = message messageWithNewline.append(UInt8(ascii: "\n")) - // Use a local actor-isolated variable to track continuation state - var sendContinuationResumed = false + let sendResumed = Atomic(false) try await withCheckedThrowingContinuation { [weak self] (continuation: CheckedContinuation) in @@ -528,47 +528,39 @@ import Logging completion: .contentProcessed { [weak self] error in guard let self = self else { return } - Task { @MainActor in - if !sendContinuationResumed { - sendContinuationResumed = true - if let error = error { - self.logger.error("Send error: \(error)") - - // Check if we should attempt to reconnect on send failure - let isStopping = await self.isStopping // Await actor-isolated property - if !isStopping && self.reconnectionConfig.enabled { - let isConnected = await self.isConnected - if isConnected { - if error.isConnectionLost { - self.logger.warning( - "Connection appears broken, will attempt to reconnect..." - ) - - // Schedule connection restart - Task { [weak self] in // Operate on self's executor - guard let self = self else { return } - - await self.setIsConnected(false) - - try? await Task.sleep(for: .milliseconds(500)) - - let currentIsStopping = await self.isStopping - if !currentIsStopping { - // Cancel the connection, then attempt to reconnect fully. - self.connection.cancel() - try? await self.connect() - } - } - } + let (exchanged, _) = sendResumed.compareExchange( + expected: false, desired: true, ordering: .acquiringAndReleasing) + guard exchanged else { return } + + if let error = error { + self.logger.error("Send error: \(error)") + + // Schedule reconnection check on a separate task + Task { [weak self] in + guard let self = self else { return } + let isStopping = await self.isStopping + if !isStopping && self.reconnectionConfig.enabled { + let isConnected = await self.isConnected + if isConnected && error.isConnectionLost { + self.logger.warning( + "Connection appears broken, will attempt to reconnect..." + ) + await self.setIsConnected(false) + try? await Task.sleep(for: .milliseconds(500)) + + let currentIsStopping = await self.isStopping + if !currentIsStopping { + self.connection.cancel() + try? await self.connect() } } - - continuation.resume( - throwing: MCPError.internalError("Send error: \(error)")) - } else { - continuation.resume() } } + + continuation.resume( + throwing: MCPError.internalError("Send error: \(error)")) + } else { + continuation.resume() } }) } @@ -747,7 +739,7 @@ import Logging /// - Returns: The received data chunk /// - Throws: Network errors or transport failures private func receiveData() async throws -> Data { - var receiveContinuationResumed = false + let receiveResumed = Atomic(false) return try await withCheckedThrowingContinuation { [weak self] (continuation: CheckedContinuation) in @@ -759,21 +751,19 @@ import Logging let maxLength = bufferConfig.maxReceiveBufferSize ?? Int.max connection.receive(minimumIncompleteLength: 1, maximumLength: maxLength) { content, _, isComplete, error in - Task { @MainActor in - if !receiveContinuationResumed { - receiveContinuationResumed = true - if let error = error { - continuation.resume(throwing: MCPError.transportError(error)) - } else if let content = content { - continuation.resume(returning: content) - } else if isComplete { - self.logger.trace("Connection completed by peer") - continuation.resume(throwing: MCPError.connectionClosed) - } else { - // EOF: Resume with empty data instead of throwing an error - continuation.resume(returning: Data()) - } - } + let (exchanged, _) = receiveResumed.compareExchange( + expected: false, desired: true, ordering: .acquiringAndReleasing) + guard exchanged else { return } + + if let error = error { + continuation.resume(throwing: MCPError.transportError(error)) + } else if let content = content { + continuation.resume(returning: content) + } else if isComplete { + self.logger.trace("Connection completed by peer") + continuation.resume(throwing: MCPError.connectionClosed) + } else { + continuation.resume(returning: Data()) } } } diff --git a/Sources/MCP/Client/Client.swift b/Sources/MCP/Client/Client.swift index 57485ade..b972d95a 100644 --- a/Sources/MCP/Client/Client.swift +++ b/Sources/MCP/Client/Client.swift @@ -592,7 +592,10 @@ public actor Client { /// Use this object to add requests to the batch. /// - Throws: `MCPError.internalError` if the client is not connected. /// Can also rethrow errors from the `body` closure or from sending the batch request. - public func withBatch(body: @escaping @Sendable (Batch) async throws -> Void) async throws { + @discardableResult + public func withBatch( + body: @escaping @Sendable (Batch) async throws -> T + ) async throws -> T { guard let connection = connection else { throw MCPError.internalError("Client connection not initialized") } @@ -600,8 +603,8 @@ public actor Client { // Create Batch actor, passing self (Client) let batch = Batch(client: self) - // Populate the batch actor by calling the user's closure. - try await body(batch) + // Populate the batch actor by calling the user's closure and capture result. + let result = try await body(batch) // Get the collected requests from the batch actor let requests = await batch.requests @@ -609,7 +612,7 @@ public actor Client { // Check if there are any requests to send guard !requests.isEmpty else { await logger?.debug("Batch requested but no requests were added.") - return // Nothing to send + return result // Return result even if no requests } await logger?.debug( @@ -620,6 +623,7 @@ public actor Client { try await connection.send(data) // Responses will be handled asynchronously by the message loop and handleBatchResponse/handleResponse. + return result } // MARK: - Lifecycle diff --git a/Tests/MCPTests/ClientTests.swift b/Tests/MCPTests/ClientTests.swift index 8a2882a4..a0de5d43 100644 --- a/Tests/MCPTests/ClientTests.swift +++ b/Tests/MCPTests/ClientTests.swift @@ -345,12 +345,10 @@ struct ClientTests { let request1 = Ping.request() let request2 = Ping.request() - nonisolated(unsafe) var resultTask1: Task? - nonisolated(unsafe) var resultTask2: Task? - - try await client.withBatch { batch in - resultTask1 = try await batch.addRequest(request1) - resultTask2 = try await batch.addRequest(request2) + let (resultTask1, resultTask2) = try await client.withBatch { batch in + let task1 = try await batch.addRequest(request1) + let task2 = try await batch.addRequest(request2) + return (task1, task2) } // Check if batch message was sent (after initialize and initialized notification) @@ -381,13 +379,8 @@ struct ClientTests { try await transport.queue(batch: [anyResponse1, anyResponse2]) // Wait for results and verify - guard let task1 = resultTask1, let task2 = resultTask2 else { - #expect(Bool(false), "Result tasks not created") - return - } - - _ = try await task1.value // Should succeed - _ = try await task2.value // Should succeed + _ = try await resultTask1.value // Should succeed + _ = try await resultTask2.value // Should succeed #expect(Bool(true)) // Reaching here means success @@ -426,11 +419,11 @@ struct ClientTests { let request1 = Ping.request() // Success let request2 = Ping.request() // Error - nonisolated(unsafe) var resultTasks: [Task] = [] - - try await client.withBatch { batch in - resultTasks.append(try await batch.addRequest(request1)) - resultTasks.append(try await batch.addRequest(request2)) + let resultTasks = try await client.withBatch { batch in + [ + try await batch.addRequest(request1), + try await batch.addRequest(request2), + ] } // Check if batch message was sent (after initialize and initialized notification)