From a20179958b014fcacc00a94d82e8aec59671936c Mon Sep 17 00:00:00 2001 From: Stijn Willems Date: Sun, 22 Mar 2026 11:44:09 +0100 Subject: [PATCH 1/3] direnv for git --- .envrc | 1 + 1 file changed, 1 insertion(+) create mode 100644 .envrc diff --git a/.envrc b/.envrc new file mode 100644 index 00000000..26bce532 --- /dev/null +++ b/.envrc @@ -0,0 +1 @@ +export GIT_DIR=$PWD/.jj/repo/store/git From 5d5071839a8f21e9178e4cc8437346e2359ed60c Mon Sep 17 00:00:00 2001 From: Stijn Willems Date: Wed, 18 Mar 2026 10:34:44 +0100 Subject: [PATCH 2/3] fix: use LockedValue for continuation properties (Swift 6.3 compat) (#10) Replace nonisolated(unsafe) on non-Sendable CheckedContinuation properties with a thread-safe LockedValue wrapper using NSLock. This fixes Swift 6.3-dev build failures without bumping platform minimums (preserves macOS 13+/iOS 16+). Fixes #211 Co-authored-by: Stijn Willems --- .../Base/Transports/NetworkTransport.swift | 91 +++++++------------ 1 file changed, 34 insertions(+), 57 deletions(-) diff --git a/Sources/MCP/Base/Transports/NetworkTransport.swift b/Sources/MCP/Base/Transports/NetworkTransport.swift index 62b623c8..48dc1c78 100644 --- a/Sources/MCP/Base/Transports/NetworkTransport.swift +++ b/Sources/MCP/Base/Transports/NetworkTransport.swift @@ -511,9 +511,6 @@ import Logging var messageWithNewline = message messageWithNewline.append(UInt8(ascii: "\n")) - // Use a local actor-isolated variable to track continuation state - var sendContinuationResumed = false - try await withCheckedThrowingContinuation { [weak self] (continuation: CheckedContinuation) in guard let self = self else { @@ -528,47 +525,35 @@ import Logging completion: .contentProcessed { [weak self] error in guard let self = self else { return } - Task { @MainActor in - if !sendContinuationResumed { - sendContinuationResumed = true - if let error = error { - self.logger.error("Send error: \(error)") - - // Check if we should attempt to reconnect on send failure - let isStopping = await self.isStopping // Await actor-isolated property - if !isStopping && self.reconnectionConfig.enabled { - let isConnected = await self.isConnected - if isConnected { - if error.isConnectionLost { - self.logger.warning( - "Connection appears broken, will attempt to reconnect..." - ) - - // Schedule connection restart - Task { [weak self] in // Operate on self's executor - guard let self = self else { return } - - await self.setIsConnected(false) - - try? await Task.sleep(for: .milliseconds(500)) - - let currentIsStopping = await self.isStopping - if !currentIsStopping { - // Cancel the connection, then attempt to reconnect fully. - self.connection.cancel() - try? await self.connect() - } - } - } + if let error = error { + self.logger.error("Send error: \(error)") + + // Schedule reconnection check on a separate task + Task { [weak self] in + guard let self = self else { return } + let isStopping = await self.isStopping + if !isStopping && self.reconnectionConfig.enabled { + let isConnected = await self.isConnected + if isConnected && error.isConnectionLost { + self.logger.warning( + "Connection appears broken, will attempt to reconnect..." + ) + await self.setIsConnected(false) + try? await Task.sleep(for: .milliseconds(500)) + + let currentIsStopping = await self.isStopping + if !currentIsStopping { + self.connection.cancel() + try? await self.connect() } } - - continuation.resume( - throwing: MCPError.internalError("Send error: \(error)")) - } else { - continuation.resume() } } + + continuation.resume( + throwing: MCPError.internalError("Send error: \(error)")) + } else { + continuation.resume() } }) } @@ -747,8 +732,6 @@ import Logging /// - Returns: The received data chunk /// - Throws: Network errors or transport failures private func receiveData() async throws -> Data { - var receiveContinuationResumed = false - return try await withCheckedThrowingContinuation { [weak self] (continuation: CheckedContinuation) in guard let self = self else { @@ -759,21 +742,15 @@ import Logging let maxLength = bufferConfig.maxReceiveBufferSize ?? Int.max connection.receive(minimumIncompleteLength: 1, maximumLength: maxLength) { content, _, isComplete, error in - Task { @MainActor in - if !receiveContinuationResumed { - receiveContinuationResumed = true - if let error = error { - continuation.resume(throwing: MCPError.transportError(error)) - } else if let content = content { - continuation.resume(returning: content) - } else if isComplete { - self.logger.trace("Connection completed by peer") - continuation.resume(throwing: MCPError.connectionClosed) - } else { - // EOF: Resume with empty data instead of throwing an error - continuation.resume(returning: Data()) - } - } + if let error = error { + continuation.resume(throwing: MCPError.transportError(error)) + } else if let content = content { + continuation.resume(returning: content) + } else if isComplete { + self.logger.trace("Connection completed by peer") + continuation.resume(throwing: MCPError.connectionClosed) + } else { + continuation.resume(returning: Data()) } } } From 70ea8de4aeeceee3547d340d404697d2783f76bb Mon Sep 17 00:00:00 2001 From: Stijn Willems Date: Tue, 17 Mar 2026 16:35:08 +0100 Subject: [PATCH 3/3] fix continuation (#9) --- .gitignore | 4 +- .../Base/Transports/NetworkTransport.swift | 123 +++++++++++++++++- Sources/MCP/Client/Client.swift | 12 +- Tests/MCPTests/ClientTests.swift | 72 +++++++--- 4 files changed, 187 insertions(+), 24 deletions(-) diff --git a/.gitignore b/.gitignore index 2d0abd04..4679ed9d 100644 --- a/.gitignore +++ b/.gitignore @@ -6,4 +6,6 @@ DerivedData/ .swiftpm/configuration/registries.json .swiftpm/xcode/package.xcworkspace/contents.xcworkspacedata .netrc -.index-build/ \ No newline at end of file +.index-build/ +.claude/settings.local.json +.swift-version \ No newline at end of file diff --git a/Sources/MCP/Base/Transports/NetworkTransport.swift b/Sources/MCP/Base/Transports/NetworkTransport.swift index 48dc1c78..67cd4fab 100644 --- a/Sources/MCP/Base/Transports/NetworkTransport.swift +++ b/Sources/MCP/Base/Transports/NetworkTransport.swift @@ -459,7 +459,14 @@ import Logging content: Heartbeat().data, contentContext: .defaultMessage, isComplete: true, - completion: .contentProcessed { [weak self] error in + completion: .contentProcessed { [weak self, continuation] error in + guard let self = self else { + continuation.resume( + throwing: MCPError.internalError( + "Transport deallocated during heartbeat")) + return + } + if let error = error { continuation.resume(throwing: error) } else { @@ -525,6 +532,86 @@ import Logging completion: .contentProcessed { [weak self] error in guard let self = self else { return } + if let error = error { + self.logger.error("Send error: \(error)") + + // Schedule reconnection check on a separate task + Task { [weak self] in + guard let self = self else { return } + let isStopping = await self.isStopping + if !isStopping && self.reconnectionConfig.enabled { + let isConnected = await self.isConnected + if isConnected && error.isConnectionLost { + self.logger.warning( + "Connection appears broken, will attempt to reconnect..." + ) + await self.setIsConnected(false) + try? await Task.sleep(for: .milliseconds(500)) + + let currentIsStopping = await self.isStopping + if !currentIsStopping { + self.connection.cancel() + try? await self.connect() + Task { @MainActor in + if !sendContinuationResumed { + sendContinuationResumed = true + if let error = error { + self.logger.error("Send error: \(error)") + + // Check if we should attempt to reconnect on send failure + let isStopping = await self.isStopping // Await actor-isolated property + if !isStopping && self.reconnectionConfig.enabled { + let isConnected = await self.isConnected + if isConnected { + if error.isConnectionLost { + self.logger.warning( + "Connection appears broken, will attempt to reconnect..." + ) + + // Schedule connection restart + Task { [weak self] in // Operate on self's executor + guard let self = self else { return } + + await self.setIsConnected(false) + + try? await Task.sleep(for: .milliseconds(500)) + + let currentIsStopping = await self.isStopping + if !currentIsStopping { + // Cancel the connection, then attempt to reconnect fully. + self.connection.cancel() + try? await self.connect() + } + } + } + if let error = error { + self.logger.error("Send error: \(error)") + + // Schedule reconnection check on a separate task + Task { [weak self] in + guard let self = self else { return } + let isStopping = await self.isStopping + if !isStopping && self.reconnectionConfig.enabled { + let isConnected = await self.isConnected + if isConnected && error.isConnectionLost { + self.logger.warning( + "Connection appears broken, will attempt to reconnect..." + ) + await self.setIsConnected(false) + try? await Task.sleep(for: .milliseconds(500)) + + let currentIsStopping = await self.isStopping + if !currentIsStopping { + self.connection.cancel() + try? await self.connect() + completion: .contentProcessed { [weak self, continuation] error in + guard let self = self else { + continuation.resume( + throwing: MCPError.internalError( + "Transport deallocated during send")) + return + } + if let error = error { self.logger.error("Send error: \(error)") @@ -751,6 +838,40 @@ import Logging continuation.resume(throwing: MCPError.connectionClosed) } else { continuation.resume(returning: Data()) + Task { @MainActor in + if !receiveContinuationResumed { + receiveContinuationResumed = true + if let error = error { + continuation.resume(throwing: MCPError.transportError(error)) + } else if let content = content { + continuation.resume(returning: content) + } else if isComplete { + self.logger.trace("Connection completed by peer") + continuation.resume(throwing: MCPError.connectionClosed) + } else { + // EOF: Resume with empty data instead of throwing an error + continuation.resume(returning: Data()) + } + } + if let error = error { + continuation.resume(throwing: MCPError.transportError(error)) + } else if let content = content { + continuation.resume(returning: content) + } else if isComplete { + self.logger.trace("Connection completed by peer") + continuation.resume(throwing: MCPError.connectionClosed) + } else { + continuation.resume(returning: Data()) + [weak self, continuation] content, _, isComplete, error in + if let error = error { + continuation.resume(throwing: MCPError.transportError(error)) + } else if let content = content { + continuation.resume(returning: content) + } else if isComplete { + self?.logger.trace("Connection completed by peer") + continuation.resume(throwing: MCPError.connectionClosed) + } else { + continuation.resume(returning: Data()) } } } diff --git a/Sources/MCP/Client/Client.swift b/Sources/MCP/Client/Client.swift index 57485ade..b972d95a 100644 --- a/Sources/MCP/Client/Client.swift +++ b/Sources/MCP/Client/Client.swift @@ -592,7 +592,10 @@ public actor Client { /// Use this object to add requests to the batch. /// - Throws: `MCPError.internalError` if the client is not connected. /// Can also rethrow errors from the `body` closure or from sending the batch request. - public func withBatch(body: @escaping @Sendable (Batch) async throws -> Void) async throws { + @discardableResult + public func withBatch( + body: @escaping @Sendable (Batch) async throws -> T + ) async throws -> T { guard let connection = connection else { throw MCPError.internalError("Client connection not initialized") } @@ -600,8 +603,8 @@ public actor Client { // Create Batch actor, passing self (Client) let batch = Batch(client: self) - // Populate the batch actor by calling the user's closure. - try await body(batch) + // Populate the batch actor by calling the user's closure and capture result. + let result = try await body(batch) // Get the collected requests from the batch actor let requests = await batch.requests @@ -609,7 +612,7 @@ public actor Client { // Check if there are any requests to send guard !requests.isEmpty else { await logger?.debug("Batch requested but no requests were added.") - return // Nothing to send + return result // Return result even if no requests } await logger?.debug( @@ -620,6 +623,7 @@ public actor Client { try await connection.send(data) // Responses will be handled asynchronously by the message loop and handleBatchResponse/handleResponse. + return result } // MARK: - Lifecycle diff --git a/Tests/MCPTests/ClientTests.swift b/Tests/MCPTests/ClientTests.swift index 8a2882a4..ef2e55ed 100644 --- a/Tests/MCPTests/ClientTests.swift +++ b/Tests/MCPTests/ClientTests.swift @@ -345,12 +345,10 @@ struct ClientTests { let request1 = Ping.request() let request2 = Ping.request() - nonisolated(unsafe) var resultTask1: Task? - nonisolated(unsafe) var resultTask2: Task? - - try await client.withBatch { batch in - resultTask1 = try await batch.addRequest(request1) - resultTask2 = try await batch.addRequest(request2) + let (resultTask1, resultTask2) = try await client.withBatch { batch in + let task1 = try await batch.addRequest(request1) + let task2 = try await batch.addRequest(request2) + return (task1, task2) } // Check if batch message was sent (after initialize and initialized notification) @@ -381,13 +379,8 @@ struct ClientTests { try await transport.queue(batch: [anyResponse1, anyResponse2]) // Wait for results and verify - guard let task1 = resultTask1, let task2 = resultTask2 else { - #expect(Bool(false), "Result tasks not created") - return - } - - _ = try await task1.value // Should succeed - _ = try await task2.value // Should succeed + _ = try await resultTask1.value // Should succeed + _ = try await resultTask2.value // Should succeed #expect(Bool(true)) // Reaching here means success @@ -426,11 +419,11 @@ struct ClientTests { let request1 = Ping.request() // Success let request2 = Ping.request() // Error - nonisolated(unsafe) var resultTasks: [Task] = [] - - try await client.withBatch { batch in - resultTasks.append(try await batch.addRequest(request1)) - resultTasks.append(try await batch.addRequest(request2)) + let resultTasks = try await client.withBatch { batch in + [ + try await batch.addRequest(request1), + try await batch.addRequest(request2), + ] } // Check if batch message was sent (after initialize and initialized notification) @@ -514,6 +507,49 @@ struct ClientTests { await client.disconnect() } + @Test("Batch request - empty with non-Void return") + func testBatchRequestEmptyNonVoid() async throws { + let transport = MockTransport() + let client = Client(name: "TestClient", version: "1.0") + + // Set up a task to handle the initialize response + let initTask = Task { + try await Task.sleep(for: .milliseconds(10)) + if let lastMessage = await transport.sentMessages.last, + let data = lastMessage.data(using: .utf8), + let request = try? JSONDecoder().decode(Request.self, from: data) + { + let response = Initialize.response( + id: request.id, + result: .init( + protocolVersion: Version.latest, + capabilities: .init(), + serverInfo: .init(name: "TestServer", version: "1.0"), + instructions: nil + ) + ) + try await transport.queue(response: response) + } + } + + try await client.connect(transport: transport) + try await Task.sleep(for: .milliseconds(10)) + initTask.cancel() + + // Call withBatch with non-Void return but don't add any requests + let result: Int = try await client.withBatch { _ in + 42 + } + + // Verify the closure's return value is passed through + #expect(result == 42) + + // Check that only initialize message and initialized notification were sent + #expect(await transport.sentMessages.count == 2) + + await client.disconnect() + } + @Test("Notify method sends notifications") func testClientNotify() async throws { let transport = MockTransport()