Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .swift-version
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
6.3-snapshot-2026-03-05
12 changes: 6 additions & 6 deletions Package.swift
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,12 @@ import PackageDescription
let package = Package(
name: "mcp-swift-sdk",
platforms: [
.macOS("13.0"),
.macCatalyst("16.0"),
.iOS("16.0"),
.watchOS("9.0"),
.tvOS("16.0"),
.visionOS("1.0"),
.macOS("15.0"),
.macCatalyst("18.0"),
.iOS("18.0"),
.watchOS("11.0"),
.tvOS("18.0"),
.visionOS("2.0"),
],
products: [
// Products define the executables and libraries a package produces, making them visible to other packages.
Expand Down
100 changes: 45 additions & 55 deletions Sources/MCP/Base/Transports/NetworkTransport.swift
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import Logging

#if canImport(Network)
import Network
import Synchronization

/// Protocol that abstracts the Network.NWConnection functionality needed for NetworkTransport
@preconcurrency protocol NetworkConnectionProtocol {
Expand Down Expand Up @@ -511,8 +512,7 @@ import Logging
var messageWithNewline = message
messageWithNewline.append(UInt8(ascii: "\n"))

// Use a local actor-isolated variable to track continuation state
var sendContinuationResumed = false
let sendResumed = Atomic<Bool>(false)

try await withCheckedThrowingContinuation {
[weak self] (continuation: CheckedContinuation<Void, Swift.Error>) in
Expand All @@ -528,47 +528,39 @@ import Logging
completion: .contentProcessed { [weak self] error in
guard let self = self else { return }

Task { @MainActor in
if !sendContinuationResumed {
sendContinuationResumed = true
if let error = error {
self.logger.error("Send error: \(error)")

// Check if we should attempt to reconnect on send failure
let isStopping = await self.isStopping // Await actor-isolated property
if !isStopping && self.reconnectionConfig.enabled {
let isConnected = await self.isConnected
if isConnected {
if error.isConnectionLost {
self.logger.warning(
"Connection appears broken, will attempt to reconnect..."
)

// Schedule connection restart
Task { [weak self] in // Operate on self's executor
guard let self = self else { return }

await self.setIsConnected(false)

try? await Task.sleep(for: .milliseconds(500))

let currentIsStopping = await self.isStopping
if !currentIsStopping {
// Cancel the connection, then attempt to reconnect fully.
self.connection.cancel()
try? await self.connect()
}
}
}
let (exchanged, _) = sendResumed.compareExchange(
expected: false, desired: true, ordering: .acquiringAndReleasing)
guard exchanged else { return }

if let error = error {
self.logger.error("Send error: \(error)")

// Schedule reconnection check on a separate task
Task { [weak self] in
guard let self = self else { return }
let isStopping = await self.isStopping
if !isStopping && self.reconnectionConfig.enabled {
let isConnected = await self.isConnected
if isConnected && error.isConnectionLost {
self.logger.warning(
"Connection appears broken, will attempt to reconnect..."
)
await self.setIsConnected(false)
try? await Task.sleep(for: .milliseconds(500))

let currentIsStopping = await self.isStopping
if !currentIsStopping {
self.connection.cancel()
try? await self.connect()
}
}

continuation.resume(
throwing: MCPError.internalError("Send error: \(error)"))
} else {
continuation.resume()
}
}

continuation.resume(
throwing: MCPError.internalError("Send error: \(error)"))
} else {
continuation.resume()
}
})
}
Expand Down Expand Up @@ -747,7 +739,7 @@ import Logging
/// - Returns: The received data chunk
/// - Throws: Network errors or transport failures
private func receiveData() async throws -> Data {
var receiveContinuationResumed = false
let receiveResumed = Atomic<Bool>(false)

return try await withCheckedThrowingContinuation {
[weak self] (continuation: CheckedContinuation<Data, Swift.Error>) in
Expand All @@ -759,21 +751,19 @@ import Logging
let maxLength = bufferConfig.maxReceiveBufferSize ?? Int.max
connection.receive(minimumIncompleteLength: 1, maximumLength: maxLength) {
content, _, isComplete, error in
Task { @MainActor in
if !receiveContinuationResumed {
receiveContinuationResumed = true
if let error = error {
continuation.resume(throwing: MCPError.transportError(error))
} else if let content = content {
continuation.resume(returning: content)
} else if isComplete {
self.logger.trace("Connection completed by peer")
continuation.resume(throwing: MCPError.connectionClosed)
} else {
// EOF: Resume with empty data instead of throwing an error
continuation.resume(returning: Data())
}
}
let (exchanged, _) = receiveResumed.compareExchange(
expected: false, desired: true, ordering: .acquiringAndReleasing)
guard exchanged else { return }

if let error = error {
continuation.resume(throwing: MCPError.transportError(error))
} else if let content = content {
continuation.resume(returning: content)
} else if isComplete {
self.logger.trace("Connection completed by peer")
continuation.resume(throwing: MCPError.connectionClosed)
} else {
continuation.resume(returning: Data())
}
}
}
Expand Down
12 changes: 8 additions & 4 deletions Sources/MCP/Client/Client.swift
Original file line number Diff line number Diff line change
Expand Up @@ -592,24 +592,27 @@ public actor Client {
/// Use this object to add requests to the batch.
/// - Throws: `MCPError.internalError` if the client is not connected.
/// Can also rethrow errors from the `body` closure or from sending the batch request.
public func withBatch(body: @escaping @Sendable (Batch) async throws -> Void) async throws {
@discardableResult
public func withBatch<T: Sendable>(
body: @escaping @Sendable (Batch) async throws -> T
) async throws -> T {
guard let connection = connection else {
throw MCPError.internalError("Client connection not initialized")
}

// Create Batch actor, passing self (Client)
let batch = Batch(client: self)

// Populate the batch actor by calling the user's closure.
try await body(batch)
// Populate the batch actor by calling the user's closure and capture result.
let result = try await body(batch)

// Get the collected requests from the batch actor
let requests = await batch.requests

// Check if there are any requests to send
guard !requests.isEmpty else {
await logger?.debug("Batch requested but no requests were added.")
return // Nothing to send
return result // Return result even if no requests
}

await logger?.debug(
Expand All @@ -620,6 +623,7 @@ public actor Client {
try await connection.send(data)

// Responses will be handled asynchronously by the message loop and handleBatchResponse/handleResponse.
return result
}

// MARK: - Lifecycle
Expand Down
29 changes: 11 additions & 18 deletions Tests/MCPTests/ClientTests.swift
Original file line number Diff line number Diff line change
Expand Up @@ -345,12 +345,10 @@ struct ClientTests {

let request1 = Ping.request()
let request2 = Ping.request()
nonisolated(unsafe) var resultTask1: Task<Ping.Result, Swift.Error>?
nonisolated(unsafe) var resultTask2: Task<Ping.Result, Swift.Error>?

try await client.withBatch { batch in
resultTask1 = try await batch.addRequest(request1)
resultTask2 = try await batch.addRequest(request2)
let (resultTask1, resultTask2) = try await client.withBatch { batch in
let task1 = try await batch.addRequest(request1)
let task2 = try await batch.addRequest(request2)
return (task1, task2)
}

// Check if batch message was sent (after initialize and initialized notification)
Expand Down Expand Up @@ -381,13 +379,8 @@ struct ClientTests {
try await transport.queue(batch: [anyResponse1, anyResponse2])

// Wait for results and verify
guard let task1 = resultTask1, let task2 = resultTask2 else {
#expect(Bool(false), "Result tasks not created")
return
}

_ = try await task1.value // Should succeed
_ = try await task2.value // Should succeed
_ = try await resultTask1.value // Should succeed
_ = try await resultTask2.value // Should succeed

#expect(Bool(true)) // Reaching here means success

Expand Down Expand Up @@ -426,11 +419,11 @@ struct ClientTests {
let request1 = Ping.request() // Success
let request2 = Ping.request() // Error

nonisolated(unsafe) var resultTasks: [Task<Ping.Result, Swift.Error>] = []

try await client.withBatch { batch in
resultTasks.append(try await batch.addRequest(request1))
resultTasks.append(try await batch.addRequest(request2))
let resultTasks = try await client.withBatch { batch in
[
try await batch.addRequest(request1),
try await batch.addRequest(request2),
]
}

// Check if batch message was sent (after initialize and initialized notification)
Expand Down