diff --git a/src/Common/src/System/IO/StreamApmExtensions.cs b/src/Common/src/System/IO/StreamApmExtensions.cs new file mode 100644 index 000000000000..33e6e7b8684e --- /dev/null +++ b/src/Common/src/System/IO/StreamApmExtensions.cs @@ -0,0 +1,25 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System.IO; +using System.Threading.Tasks; + +namespace System.IO +{ + /// Provides Stream.Begin/EndRead/Write wrappers for Stream.Read/WriteAsync. + internal static class StreamApmExtensions + { + public static IAsyncResult BeginRead(this Stream stream, byte[] buffer, int offset, int count, AsyncCallback callback, object state) => + TaskToApm.Begin(stream.ReadAsync(buffer, offset, count), callback, state); + + public static int EndRead(this Stream stream, IAsyncResult asyncResult) => + TaskToApm.End(asyncResult); + + public static IAsyncResult BeginWrite(this Stream stream, byte[] buffer, int offset, int count, AsyncCallback callback, object state) => + TaskToApm.Begin(stream.WriteAsync(buffer, offset, count), callback, state); + + public static void EndWrite(this Stream stream, IAsyncResult asyncResult) => + TaskToApm.End(asyncResult); + } +} diff --git a/src/Common/src/System/IO/StreamAsyncHelper.cs b/src/Common/src/System/IO/StreamAsyncHelper.cs deleted file mode 100644 index 7061f2f99b05..000000000000 --- a/src/Common/src/System/IO/StreamAsyncHelper.cs +++ /dev/null @@ -1,209 +0,0 @@ -// Licensed to the .NET Foundation under one or more agreements. -// The .NET Foundation licenses this file to you under the MIT license. -// See the LICENSE file in the project root for more information. - -using System.Diagnostics; -using System.Diagnostics.Contracts; -using System.Threading; -using System.Threading.Tasks; - -namespace System.IO -{ - /// Provides support for implementing asynchronous operations on Streams. - internal sealed class StreamAsyncHelper - { - private SemaphoreSlim _asyncActiveSemaphore; - private Task _activeReadWriteTask; - private readonly Stream _stream; - - internal StreamAsyncHelper(Stream stream) - { - Debug.Assert(stream != null); - _stream = stream; - } - - private SemaphoreSlim EnsureAsyncActiveSemaphoreInitialized() - { - // Lazily-initialize _asyncActiveSemaphore. As we're never accessing the SemaphoreSlim's - // WaitHandle, we don't need to worry about Disposing it. - return LazyInitializer.EnsureInitialized(ref _asyncActiveSemaphore, () => new SemaphoreSlim(1, 1)); - } - - internal IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state) - { - if (!_stream.CanRead) - { - throw __Error.GetReadNotSupported(); - } - - return BeginReadWrite(true, buffer, offset, count, callback, state); - } - - internal IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, Object state) - { - if (!_stream.CanWrite) - { - throw __Error.GetWriteNotSupported(); - } - - return BeginReadWrite(false, buffer, offset, count, callback, state); - } - - private IAsyncResult BeginReadWrite(bool isRead, byte[] buffer, int offset, int count, AsyncCallback callback, object state) - { - // To avoid a race with a stream's position pointer & generating race - // conditions with internal buffer indexes in our own streams that - // don't natively support async IO operations when there are multiple - // async requests outstanding, we block the calling thread until - // the active one completes. - SemaphoreSlim sem = EnsureAsyncActiveSemaphoreInitialized(); - sem.Wait(); - Debug.Assert(_activeReadWriteTask == null); - - // Create the task to asynchronously run the read or write. Even though Task implements IAsyncResult, - // we wrap it in a special IAsyncResult object that stores all of the state for the operation - // and that we can pass around as a state parameter to all of our delegates. Even though this - // is an allocation, this allows us to avoid any closures or non-statically cached delegates - // for both the Task and its continuation, saving more allocations. - var asyncResult = new StreamReadWriteAsyncResult(_stream, buffer, offset, count, callback, state); - Task t; - if (isRead) - { - t = new Task(obj => - { - var ar = (StreamReadWriteAsyncResult)obj; - return ar._stream.Read(ar._buffer, ar._offset, ar._count); - }, asyncResult, CancellationToken.None, TaskCreationOptions.DenyChildAttach); - } - else - { - t = new Task(obj => - { - var ar = (StreamReadWriteAsyncResult)obj; - ar._stream.Write(ar._buffer, ar._offset, ar._count); - }, asyncResult, CancellationToken.None, TaskCreationOptions.DenyChildAttach); - } - - asyncResult._task = t; // this doesn't happen in the async result's ctor because the Task needs to reference the AR, and vice versa - - if (callback != null) - { - t.ContinueWith((_, obj) => - { - var ar = (StreamReadWriteAsyncResult)obj; - ar._callback(ar); - }, asyncResult, CancellationToken.None, TaskContinuationOptions.DenyChildAttach | TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default); - } - - _activeReadWriteTask = t; - t.Start(TaskScheduler.Default); - - return asyncResult; - } - - internal int EndRead(IAsyncResult asyncResult) - { - if (asyncResult == null) - { - throw new ArgumentNullException(nameof(asyncResult)); - } - - Contract.Ensures(Contract.Result() >= 0); - Contract.EndContractBlock(); - - var ar = asyncResult as StreamReadWriteAsyncResult; - var task = _activeReadWriteTask; - - if (task == null || ar == null) - { - throw new ArgumentException(SR.InvalidOperation_WrongAsyncResultOrEndReadCalledMultiple); - } - else if (task != ar._task) - { - throw new InvalidOperationException(SR.InvalidOperation_WrongAsyncResultOrEndReadCalledMultiple); - } - - Task readTask = task as Task; - if (readTask == null) - { - throw new ArgumentException(SR.InvalidOperation_WrongAsyncResultOrEndReadCalledMultiple); - } - - try - { - return readTask.GetAwaiter().GetResult(); // block until completion, then get result / propagate any exception - } - finally - { - _activeReadWriteTask = null; - Debug.Assert(_asyncActiveSemaphore != null, "Must have been initialized in order to get here."); - _asyncActiveSemaphore.Release(); - } - } - - internal void EndWrite(IAsyncResult asyncResult) - { - if (asyncResult == null) - { - throw new ArgumentNullException(nameof(asyncResult)); - } - - Contract.EndContractBlock(); - - var ar = asyncResult as StreamReadWriteAsyncResult; - var writeTask = _activeReadWriteTask; - - if (writeTask == null || ar == null) - { - throw new ArgumentException(SR.InvalidOperation_WrongAsyncResultOrEndWriteCalledMultiple); - } - else if (writeTask != ar._task) - { - throw new InvalidOperationException(SR.InvalidOperation_WrongAsyncResultOrEndWriteCalledMultiple); - } - else if (writeTask is Task) - { - throw new ArgumentException(SR.InvalidOperation_WrongAsyncResultOrEndWriteCalledMultiple); - } - - try - { - writeTask.GetAwaiter().GetResult(); // block until completion, then propagate any exceptions - } - finally - { - _activeReadWriteTask = null; - Debug.Assert(_asyncActiveSemaphore != null, "Must have been initialized in order to get here."); - _asyncActiveSemaphore.Release(); - } - } - - private sealed class StreamReadWriteAsyncResult : IAsyncResult - { - internal readonly Stream _stream; - internal readonly byte[] _buffer; - internal readonly int _offset; - internal readonly int _count; - internal readonly AsyncCallback _callback; - internal readonly object _state; - - internal Task _task; - - internal StreamReadWriteAsyncResult(Stream stream, byte[] buffer, int offset, int count, AsyncCallback callback, object state) - { - _stream = stream; - _buffer = buffer; - _offset = offset; - _count = count; - _callback = callback; - _state = state; - } - - object IAsyncResult.AsyncState { get { return _state; } } // return caller-provided state, not that from the task - bool IAsyncResult.CompletedSynchronously { get { return false; } } // we always complete asynchronously - - bool IAsyncResult.IsCompleted { get { return _task.IsCompleted; } } - WaitHandle IAsyncResult.AsyncWaitHandle { get { return ((IAsyncResult)_task).AsyncWaitHandle; } } - } - } -} diff --git a/src/Common/tests/System/Net/VirtualNetwork/VirtualNetworkStream.cs b/src/Common/tests/System/Net/VirtualNetwork/VirtualNetworkStream.cs index 7f35cc3a39cf..73ed76184af0 100644 --- a/src/Common/tests/System/Net/VirtualNetwork/VirtualNetworkStream.cs +++ b/src/Common/tests/System/Net/VirtualNetwork/VirtualNetworkStream.cs @@ -107,16 +107,9 @@ public override void Write(byte[] buffer, int offset, int count) public override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) { - try - { - cancellationToken.ThrowIfCancellationRequested(); - int bytesRead = Read(buffer, offset, count); - return Task.FromResult(bytesRead); - } - catch (Exception e) - { - return Task.FromException(e); - } + return cancellationToken.IsCancellationRequested ? + Task.FromCanceled(cancellationToken) : + Task.Run(() => Read(buffer, offset, count)); } public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) diff --git a/src/System.Net.Security/src/Resources/Strings.resx b/src/System.Net.Security/src/Resources/Strings.resx index be553506a4f9..e04870df8caf 100644 --- a/src/System.Net.Security/src/Resources/Strings.resx +++ b/src/System.Net.Security/src/Resources/Strings.resx @@ -354,12 +354,6 @@ A service name must not be null or empty. - - Either the IAsyncResult object did not come from the corresponding async method on this type, or EndRead was called multiple times with the same IAsyncResult. - - - Either the IAsyncResult object did not come from the corresponding async method on this type, or EndWrite was called multiple times with the same IAsyncResult. - Cannot access a closed Stream. diff --git a/src/System.Net.Security/src/System.Net.Security.csproj b/src/System.Net.Security/src/System.Net.Security.csproj index 243e716af203..e3df7ba10594 100644 --- a/src/System.Net.Security/src/System.Net.Security.csproj +++ b/src/System.Net.Security/src/System.Net.Security.csproj @@ -139,8 +139,11 @@ Common\Microsoft\Win32\SafeHandles\SafeHandleZeroOrMinusOneIsInvalid.cs - - Common\System\IO\StreamAsyncHelper.cs + + Common\System\IO\StreamApmExtensions.cs + + + Common\System\Threading\Tasks\TaskToApm.cs Common\System\IO\Error.cs diff --git a/src/System.Net.Security/src/System/Net/SecureProtocols/FixedSizeReader.cs b/src/System.Net.Security/src/System/Net/SecureProtocols/FixedSizeReader.cs index c19c355bba36..9d904edb424f 100644 --- a/src/System.Net.Security/src/System/Net/SecureProtocols/FixedSizeReader.cs +++ b/src/System.Net.Security/src/System/Net/SecureProtocols/FixedSizeReader.cs @@ -15,8 +15,6 @@ internal class FixedSizeReader { private static readonly AsyncCallback s_readCallback = new AsyncCallback(ReadCallback); - // TODO (Issue #3114): Implement this using TPL instead of APM. - private readonly StreamAsyncHelper _transportAPM; private readonly Stream _transport; private AsyncProtocolRequest _request; private int _totalRead; @@ -24,7 +22,6 @@ internal class FixedSizeReader public FixedSizeReader(Stream transport) { _transport = transport; - _transportAPM = new StreamAsyncHelper(transport); } // @@ -73,7 +70,7 @@ private void StartReading() { while (true) { - IAsyncResult ar = _transportAPM.BeginRead(_request.Buffer, _request.Offset + _totalRead, _request.Count - _totalRead, s_readCallback, this); + IAsyncResult ar = _transport.BeginRead(_request.Buffer, _request.Offset + _totalRead, _request.Count - _totalRead, s_readCallback, this); if (!ar.CompletedSynchronously) { #if DEBUG @@ -82,7 +79,7 @@ private void StartReading() break; } - int bytes = _transportAPM.EndRead(ar); + int bytes = _transport.EndRead(ar); if (CheckCompletionBeforeNextRead(bytes)) { @@ -148,7 +145,7 @@ private static void ReadCallback(IAsyncResult transportResult) // Async completion. try { - int bytes = reader._transportAPM.EndRead(transportResult); + int bytes = reader._transport.EndRead(transportResult); if (reader.CheckCompletionBeforeNextRead(bytes)) { diff --git a/src/System.Net.Security/src/System/Net/SecureProtocols/InternalNegotiateStream.cs b/src/System.Net.Security/src/System/Net/SecureProtocols/InternalNegotiateStream.cs index 2715dfacc402..7217d462450a 100644 --- a/src/System.Net.Security/src/System/Net/SecureProtocols/InternalNegotiateStream.cs +++ b/src/System.Net.Security/src/System/Net/SecureProtocols/InternalNegotiateStream.cs @@ -28,22 +28,10 @@ public partial class NegotiateStream : AuthenticatedStream private FixedSizeReader _FrameReader; - // TODO (Issue #3114): Implement using TPL instead of APM. - private StreamAsyncHelper _innerStreamAPM; - - internal StreamAsyncHelper InnerStreamAPM - { - get - { - return _innerStreamAPM; - } - } - private void InitializeStreamPart() { _ReadHeader = new byte[4]; _FrameReader = new FixedSizeReader(InnerStream); - _innerStreamAPM = new StreamAsyncHelper(InnerStream); } private byte[] InternalBuffer @@ -179,13 +167,13 @@ private void StartWriting(byte[] buffer, int offset, int count, AsyncProtocolReq { // prepare for the next request asyncRequest.SetNextRequest(buffer, offset + chunkBytes, count - chunkBytes, null); - IAsyncResult ar = InnerStreamAPM.BeginWrite(outBuffer, 0, encryptedBytes, s_writeCallback, asyncRequest); + IAsyncResult ar = InnerStream.BeginWrite(outBuffer, 0, encryptedBytes, s_writeCallback, asyncRequest); if (!ar.CompletedSynchronously) { return; } - InnerStreamAPM.EndWrite(ar); + InnerStream.EndWrite(ar); } else { @@ -415,7 +403,7 @@ private static void WriteCallback(IAsyncResult transportResult) try { NegotiateStream negoStream = (NegotiateStream)asyncRequest.AsyncObject; - negoStream.InnerStreamAPM.EndWrite(transportResult); + negoStream.InnerStream.EndWrite(transportResult); if (asyncRequest.Count == 0) { // This was the last chunk. diff --git a/src/System.Net.Security/src/System/Net/SecureProtocols/NegotiateStream.cs b/src/System.Net.Security/src/System/Net/SecureProtocols/NegotiateStream.cs index bb757a063b12..6930a4f69a29 100644 --- a/src/System.Net.Security/src/System/Net/SecureProtocols/NegotiateStream.cs +++ b/src/System.Net.Security/src/System/Net/SecureProtocols/NegotiateStream.cs @@ -507,7 +507,7 @@ private IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallba if (!_negoState.CanGetSecureStream) { - return InnerStreamAPM.BeginRead(buffer, offset, count, asyncCallback, asyncState); + return InnerStream.BeginRead(buffer, offset, count, asyncCallback, asyncState); } BufferAsyncResult bufferResult = new BufferAsyncResult(this, buffer, offset, count, asyncState, asyncCallback); @@ -529,7 +529,7 @@ private int EndRead(IAsyncResult asyncResult) if (!_negoState.CanGetSecureStream) { - return InnerStreamAPM.EndRead(asyncResult); + return InnerStream.EndRead(asyncResult); } @@ -579,7 +579,7 @@ private IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallb if (!_negoState.CanGetSecureStream) { - return InnerStreamAPM.BeginWrite(buffer, offset, count, asyncCallback, asyncState); + return InnerStream.BeginWrite(buffer, offset, count, asyncCallback, asyncState); } BufferAsyncResult bufferResult = new BufferAsyncResult(this, buffer, offset, count, true, asyncState, asyncCallback); @@ -602,7 +602,7 @@ private void EndWrite(IAsyncResult asyncResult) if (!_negoState.CanGetSecureStream) { - InnerStreamAPM.EndWrite(asyncResult); + InnerStream.EndWrite(asyncResult); return; } diff --git a/src/System.Net.Security/src/System/Net/SecureProtocols/SslState.cs b/src/System.Net.Security/src/System/Net/SecureProtocols/SslState.cs index 71e5fa7d5c15..1ddebcda6d0d 100644 --- a/src/System.Net.Security/src/System/Net/SecureProtocols/SslState.cs +++ b/src/System.Net.Security/src/System/Net/SecureProtocols/SslState.cs @@ -26,9 +26,6 @@ internal class SslState private Stream _innerStream; - // TODO (Issue #3114): Implement using TPL instead of APM. - private StreamAsyncHelper _innerStreamAPM; - private SslStreamInternal _secureStream; private FixedSizeReader _reader; @@ -81,7 +78,6 @@ private enum CachedSessionStatus : byte internal SslState(Stream innerStream, RemoteCertValidationCallback certValidationCallback, LocalCertSelectionCallback certSelectionCallback, EncryptionPolicy encryptionPolicy) { _innerStream = innerStream; - _innerStreamAPM = new StreamAsyncHelper(innerStream); _reader = new FixedSizeReader(innerStream); _certValidationDelegate = certValidationCallback; _certSelectionDelegate = certSelectionCallback; @@ -388,14 +384,6 @@ internal Stream InnerStream } } - internal StreamAsyncHelper InnerStreamAPM - { - get - { - return _innerStreamAPM; - } - } - internal SslStreamInternal SecureStream { get @@ -784,7 +772,7 @@ private void StartSendBlob(byte[] incoming, int count, AsyncProtocolRequest asyn else { asyncRequest.AsyncState = message; - IAsyncResult ar = InnerStreamAPM.BeginWrite(message.Payload, 0, message.Size, s_writeCallback, asyncRequest); + IAsyncResult ar = InnerStream.BeginWrite(message.Payload, 0, message.Size, s_writeCallback, asyncRequest); if (!ar.CompletedSynchronously) { #if DEBUG @@ -793,7 +781,7 @@ private void StartSendBlob(byte[] incoming, int count, AsyncProtocolRequest asyn return; } - InnerStreamAPM.EndWrite(ar); + InnerStream.EndWrite(ar); } } @@ -987,12 +975,12 @@ private void StartSendAuthResetSignal(ProtocolToken message, AsyncProtocolReques else { asyncRequest.AsyncState = exception; - IAsyncResult ar = InnerStreamAPM.BeginWrite(message.Payload, 0, message.Size, s_writeCallback, asyncRequest); + IAsyncResult ar = InnerStream.BeginWrite(message.Payload, 0, message.Size, s_writeCallback, asyncRequest); if (!ar.CompletedSynchronously) { return; } - InnerStreamAPM.EndWrite(ar); + InnerStream.EndWrite(ar); } exception.Throw(); @@ -1072,7 +1060,7 @@ private static void WriteCallback(IAsyncResult transportResult) // Async completion. try { - sslState.InnerStreamAPM.EndWrite(transportResult); + sslState.InnerStream.EndWrite(transportResult); // Special case for an error notification. object asyncState = asyncRequest.AsyncState; diff --git a/src/System.Net.Security/src/System/Net/SecureProtocols/SslStreamInternal.cs b/src/System.Net.Security/src/System/Net/SecureProtocols/SslStreamInternal.cs index 3a0c79c7f973..b87cc3ea4001 100644 --- a/src/System.Net.Security/src/System/Net/SecureProtocols/SslStreamInternal.cs +++ b/src/System.Net.Security/src/System/Net/SecureProtocols/SslStreamInternal.cs @@ -421,13 +421,13 @@ private void StartWriting(byte[] buffer, int offset, int count, AsyncProtocolReq { // Prepare for the next request. asyncRequest.SetNextRequest(buffer, offset + chunkBytes, count - chunkBytes, s_resumeAsyncWriteCallback); - IAsyncResult ar = _sslState.InnerStreamAPM.BeginWrite(outBuffer, 0, encryptedBytes, s_writeCallback, asyncRequest); + IAsyncResult ar = _sslState.InnerStream.BeginWrite(outBuffer, 0, encryptedBytes, s_writeCallback, asyncRequest); if (!ar.CompletedSynchronously) { return; } - _sslState.InnerStreamAPM.EndWrite(ar); + _sslState.InnerStream.EndWrite(ar); } else @@ -761,7 +761,7 @@ private static void WriteCallback(IAsyncResult transportResult) try { - sslStream._sslState.InnerStreamAPM.EndWrite(transportResult); + sslStream._sslState.InnerStream.EndWrite(transportResult); sslStream._sslState.FinishWrite(); if (asyncRequest.Count == 0) diff --git a/src/System.Net.Security/src/System/Net/StreamFramer.cs b/src/System.Net.Security/src/System/Net/StreamFramer.cs index 1dc879463cd0..c92a08121cd0 100644 --- a/src/System.Net.Security/src/System/Net/StreamFramer.cs +++ b/src/System.Net.Security/src/System/Net/StreamFramer.cs @@ -12,9 +12,6 @@ internal class StreamFramer { private Stream _transport; - // TODO (Issue #3114): Implement using TPL instead of APM. - private StreamAsyncHelper _transportAPM; - private bool _eof; private FrameHeader _writeHeader = new FrameHeader(); @@ -43,8 +40,6 @@ public StreamFramer(Stream Transport) _readFrameCallback = new AsyncCallback(ReadFrameCallback); _beginWriteCallback = new AsyncCallback(BeginWriteCallback); - - _transportAPM = new StreamAsyncHelper(_transport); } public FrameHeader ReadHeader @@ -141,7 +136,7 @@ public IAsyncResult BeginReadMessage(AsyncCallback asyncCallback, object stateOb _readHeaderBuffer, 0, _readHeaderBuffer.Length); - IAsyncResult result = _transportAPM.BeginRead(_readHeaderBuffer, 0, _readHeaderBuffer.Length, + IAsyncResult result = _transport.BeginRead(_readHeaderBuffer, 0, _readHeaderBuffer.Length, _readFrameCallback, workerResult); if (result.CompletedSynchronously) @@ -214,7 +209,7 @@ private void ReadFrameComplete(IAsyncResult transportResult) WorkerAsyncResult workerResult = (WorkerAsyncResult)transportResult.AsyncState; - int bytesRead = _transportAPM.EndRead(transportResult); + int bytesRead = _transport.EndRead(transportResult); workerResult.Offset += bytesRead; if (!(workerResult.Offset <= workerResult.End)) @@ -291,7 +286,7 @@ private void ReadFrameComplete(IAsyncResult transportResult) } // This means we need more data to complete the data block. - transportResult = _transportAPM.BeginRead(workerResult.Buffer, workerResult.Offset, workerResult.End - workerResult.Offset, + transportResult = _transport.BeginRead(workerResult.Buffer, workerResult.Offset, workerResult.End - workerResult.Offset, _readFrameCallback, workerResult); } while (transportResult.CompletedSynchronously); } @@ -372,7 +367,7 @@ public IAsyncResult BeginWriteMessage(byte[] message, AsyncCallback asyncCallbac if (message.Length == 0) { - return _transportAPM.BeginWrite(_writeHeaderBuffer, 0, _writeHeaderBuffer.Length, + return _transport.BeginWrite(_writeHeaderBuffer, 0, _writeHeaderBuffer.Length, asyncCallback, stateObject); } @@ -381,7 +376,7 @@ public IAsyncResult BeginWriteMessage(byte[] message, AsyncCallback asyncCallbac message, 0, message.Length); // Charge the first: - IAsyncResult result = _transportAPM.BeginWrite(_writeHeaderBuffer, 0, _writeHeaderBuffer.Length, + IAsyncResult result = _transport.BeginWrite(_writeHeaderBuffer, 0, _writeHeaderBuffer.Length, _beginWriteCallback, workerResult); if (result.CompletedSynchronously) @@ -437,7 +432,7 @@ private void BeginWriteComplete(IAsyncResult transportResult) WorkerAsyncResult workerResult = (WorkerAsyncResult)transportResult.AsyncState; // First, complete the previous portion write. - _transportAPM.EndWrite(transportResult); + _transport.EndWrite(transportResult); // Check on exit criterion. if (workerResult.Offset == workerResult.End) @@ -450,7 +445,7 @@ private void BeginWriteComplete(IAsyncResult transportResult) workerResult.Offset = workerResult.End; // Write next portion (frame body) using Async IO. - transportResult = _transportAPM.BeginWrite(workerResult.Buffer, 0, workerResult.End, + transportResult = _transport.BeginWrite(workerResult.Buffer, 0, workerResult.End, _beginWriteCallback, workerResult); } while (transportResult.CompletedSynchronously); @@ -479,7 +474,7 @@ public void EndWriteMessage(IAsyncResult asyncResult) } else { - _transportAPM.EndWrite(asyncResult); + _transport.EndWrite(asyncResult); } } }