From 3916b8cb6a5c9aac0ab215c13bea3a8635994e26 Mon Sep 17 00:00:00 2001 From: stephentoub Date: Sat, 16 Apr 2016 20:58:48 -0400 Subject: [PATCH] Replace StreamAsyncHelper with StreamApmExtensions in System.Net.Security System.Net.Security was originally written to use Stream.Begin/EndRead/Write. When it was ported to .NET Core, it took a dependency on a helper that provided such Begin/End methods, but this helper was originally written for a different purpose: providing async Begin/End wrappers for the synchronous Read/Write methods. As a result of this mismatch, the async calls in System.Net.Security are queueing work items that then do work synchronously, blocking thread pool threads unnecessarily. This commit changes those helpers to sit on top of ReadAsync/WriteAsync instead. Eventually System.Net.Security should have its async I/O redone to sit natively on ReadAsync/WriteAsync using async/await, as it'll result in less overhead, but for now with minimal changes this improves the scalability of the library. --- .../src/System/IO/StreamApmExtensions.cs | 25 +++ src/Common/src/System/IO/StreamAsyncHelper.cs | 209 ------------------ .../VirtualNetwork/VirtualNetworkStream.cs | 13 +- .../src/Resources/Strings.resx | 6 - .../src/System.Net.Security.csproj | 7 +- .../Net/SecureProtocols/FixedSizeReader.cs | 9 +- .../InternalNegotiateStream.cs | 18 +- .../Net/SecureProtocols/NegotiateStream.cs | 8 +- .../System/Net/SecureProtocols/SslState.cs | 22 +- .../Net/SecureProtocols/SslStreamInternal.cs | 6 +- .../src/System/Net/StreamFramer.cs | 21 +- 11 files changed, 59 insertions(+), 285 deletions(-) create mode 100644 src/Common/src/System/IO/StreamApmExtensions.cs delete mode 100644 src/Common/src/System/IO/StreamAsyncHelper.cs diff --git a/src/Common/src/System/IO/StreamApmExtensions.cs b/src/Common/src/System/IO/StreamApmExtensions.cs new file mode 100644 index 000000000000..33e6e7b8684e --- /dev/null +++ b/src/Common/src/System/IO/StreamApmExtensions.cs @@ -0,0 +1,25 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. +// See the LICENSE file in the project root for more information. + +using System.IO; +using System.Threading.Tasks; + +namespace System.IO +{ + /// Provides Stream.Begin/EndRead/Write wrappers for Stream.Read/WriteAsync. + internal static class StreamApmExtensions + { + public static IAsyncResult BeginRead(this Stream stream, byte[] buffer, int offset, int count, AsyncCallback callback, object state) => + TaskToApm.Begin(stream.ReadAsync(buffer, offset, count), callback, state); + + public static int EndRead(this Stream stream, IAsyncResult asyncResult) => + TaskToApm.End(asyncResult); + + public static IAsyncResult BeginWrite(this Stream stream, byte[] buffer, int offset, int count, AsyncCallback callback, object state) => + TaskToApm.Begin(stream.WriteAsync(buffer, offset, count), callback, state); + + public static void EndWrite(this Stream stream, IAsyncResult asyncResult) => + TaskToApm.End(asyncResult); + } +} diff --git a/src/Common/src/System/IO/StreamAsyncHelper.cs b/src/Common/src/System/IO/StreamAsyncHelper.cs deleted file mode 100644 index 7061f2f99b05..000000000000 --- a/src/Common/src/System/IO/StreamAsyncHelper.cs +++ /dev/null @@ -1,209 +0,0 @@ -// Licensed to the .NET Foundation under one or more agreements. -// The .NET Foundation licenses this file to you under the MIT license. -// See the LICENSE file in the project root for more information. - -using System.Diagnostics; -using System.Diagnostics.Contracts; -using System.Threading; -using System.Threading.Tasks; - -namespace System.IO -{ - /// Provides support for implementing asynchronous operations on Streams. - internal sealed class StreamAsyncHelper - { - private SemaphoreSlim _asyncActiveSemaphore; - private Task _activeReadWriteTask; - private readonly Stream _stream; - - internal StreamAsyncHelper(Stream stream) - { - Debug.Assert(stream != null); - _stream = stream; - } - - private SemaphoreSlim EnsureAsyncActiveSemaphoreInitialized() - { - // Lazily-initialize _asyncActiveSemaphore. As we're never accessing the SemaphoreSlim's - // WaitHandle, we don't need to worry about Disposing it. - return LazyInitializer.EnsureInitialized(ref _asyncActiveSemaphore, () => new SemaphoreSlim(1, 1)); - } - - internal IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state) - { - if (!_stream.CanRead) - { - throw __Error.GetReadNotSupported(); - } - - return BeginReadWrite(true, buffer, offset, count, callback, state); - } - - internal IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, Object state) - { - if (!_stream.CanWrite) - { - throw __Error.GetWriteNotSupported(); - } - - return BeginReadWrite(false, buffer, offset, count, callback, state); - } - - private IAsyncResult BeginReadWrite(bool isRead, byte[] buffer, int offset, int count, AsyncCallback callback, object state) - { - // To avoid a race with a stream's position pointer & generating race - // conditions with internal buffer indexes in our own streams that - // don't natively support async IO operations when there are multiple - // async requests outstanding, we block the calling thread until - // the active one completes. - SemaphoreSlim sem = EnsureAsyncActiveSemaphoreInitialized(); - sem.Wait(); - Debug.Assert(_activeReadWriteTask == null); - - // Create the task to asynchronously run the read or write. Even though Task implements IAsyncResult, - // we wrap it in a special IAsyncResult object that stores all of the state for the operation - // and that we can pass around as a state parameter to all of our delegates. Even though this - // is an allocation, this allows us to avoid any closures or non-statically cached delegates - // for both the Task and its continuation, saving more allocations. - var asyncResult = new StreamReadWriteAsyncResult(_stream, buffer, offset, count, callback, state); - Task t; - if (isRead) - { - t = new Task(obj => - { - var ar = (StreamReadWriteAsyncResult)obj; - return ar._stream.Read(ar._buffer, ar._offset, ar._count); - }, asyncResult, CancellationToken.None, TaskCreationOptions.DenyChildAttach); - } - else - { - t = new Task(obj => - { - var ar = (StreamReadWriteAsyncResult)obj; - ar._stream.Write(ar._buffer, ar._offset, ar._count); - }, asyncResult, CancellationToken.None, TaskCreationOptions.DenyChildAttach); - } - - asyncResult._task = t; // this doesn't happen in the async result's ctor because the Task needs to reference the AR, and vice versa - - if (callback != null) - { - t.ContinueWith((_, obj) => - { - var ar = (StreamReadWriteAsyncResult)obj; - ar._callback(ar); - }, asyncResult, CancellationToken.None, TaskContinuationOptions.DenyChildAttach | TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default); - } - - _activeReadWriteTask = t; - t.Start(TaskScheduler.Default); - - return asyncResult; - } - - internal int EndRead(IAsyncResult asyncResult) - { - if (asyncResult == null) - { - throw new ArgumentNullException(nameof(asyncResult)); - } - - Contract.Ensures(Contract.Result() >= 0); - Contract.EndContractBlock(); - - var ar = asyncResult as StreamReadWriteAsyncResult; - var task = _activeReadWriteTask; - - if (task == null || ar == null) - { - throw new ArgumentException(SR.InvalidOperation_WrongAsyncResultOrEndReadCalledMultiple); - } - else if (task != ar._task) - { - throw new InvalidOperationException(SR.InvalidOperation_WrongAsyncResultOrEndReadCalledMultiple); - } - - Task readTask = task as Task; - if (readTask == null) - { - throw new ArgumentException(SR.InvalidOperation_WrongAsyncResultOrEndReadCalledMultiple); - } - - try - { - return readTask.GetAwaiter().GetResult(); // block until completion, then get result / propagate any exception - } - finally - { - _activeReadWriteTask = null; - Debug.Assert(_asyncActiveSemaphore != null, "Must have been initialized in order to get here."); - _asyncActiveSemaphore.Release(); - } - } - - internal void EndWrite(IAsyncResult asyncResult) - { - if (asyncResult == null) - { - throw new ArgumentNullException(nameof(asyncResult)); - } - - Contract.EndContractBlock(); - - var ar = asyncResult as StreamReadWriteAsyncResult; - var writeTask = _activeReadWriteTask; - - if (writeTask == null || ar == null) - { - throw new ArgumentException(SR.InvalidOperation_WrongAsyncResultOrEndWriteCalledMultiple); - } - else if (writeTask != ar._task) - { - throw new InvalidOperationException(SR.InvalidOperation_WrongAsyncResultOrEndWriteCalledMultiple); - } - else if (writeTask is Task) - { - throw new ArgumentException(SR.InvalidOperation_WrongAsyncResultOrEndWriteCalledMultiple); - } - - try - { - writeTask.GetAwaiter().GetResult(); // block until completion, then propagate any exceptions - } - finally - { - _activeReadWriteTask = null; - Debug.Assert(_asyncActiveSemaphore != null, "Must have been initialized in order to get here."); - _asyncActiveSemaphore.Release(); - } - } - - private sealed class StreamReadWriteAsyncResult : IAsyncResult - { - internal readonly Stream _stream; - internal readonly byte[] _buffer; - internal readonly int _offset; - internal readonly int _count; - internal readonly AsyncCallback _callback; - internal readonly object _state; - - internal Task _task; - - internal StreamReadWriteAsyncResult(Stream stream, byte[] buffer, int offset, int count, AsyncCallback callback, object state) - { - _stream = stream; - _buffer = buffer; - _offset = offset; - _count = count; - _callback = callback; - _state = state; - } - - object IAsyncResult.AsyncState { get { return _state; } } // return caller-provided state, not that from the task - bool IAsyncResult.CompletedSynchronously { get { return false; } } // we always complete asynchronously - - bool IAsyncResult.IsCompleted { get { return _task.IsCompleted; } } - WaitHandle IAsyncResult.AsyncWaitHandle { get { return ((IAsyncResult)_task).AsyncWaitHandle; } } - } - } -} diff --git a/src/Common/tests/System/Net/VirtualNetwork/VirtualNetworkStream.cs b/src/Common/tests/System/Net/VirtualNetwork/VirtualNetworkStream.cs index 7f35cc3a39cf..73ed76184af0 100644 --- a/src/Common/tests/System/Net/VirtualNetwork/VirtualNetworkStream.cs +++ b/src/Common/tests/System/Net/VirtualNetwork/VirtualNetworkStream.cs @@ -107,16 +107,9 @@ public override void Write(byte[] buffer, int offset, int count) public override Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) { - try - { - cancellationToken.ThrowIfCancellationRequested(); - int bytesRead = Read(buffer, offset, count); - return Task.FromResult(bytesRead); - } - catch (Exception e) - { - return Task.FromException(e); - } + return cancellationToken.IsCancellationRequested ? + Task.FromCanceled(cancellationToken) : + Task.Run(() => Read(buffer, offset, count)); } public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) diff --git a/src/System.Net.Security/src/Resources/Strings.resx b/src/System.Net.Security/src/Resources/Strings.resx index be553506a4f9..e04870df8caf 100644 --- a/src/System.Net.Security/src/Resources/Strings.resx +++ b/src/System.Net.Security/src/Resources/Strings.resx @@ -354,12 +354,6 @@ A service name must not be null or empty. - - Either the IAsyncResult object did not come from the corresponding async method on this type, or EndRead was called multiple times with the same IAsyncResult. - - - Either the IAsyncResult object did not come from the corresponding async method on this type, or EndWrite was called multiple times with the same IAsyncResult. - Cannot access a closed Stream. diff --git a/src/System.Net.Security/src/System.Net.Security.csproj b/src/System.Net.Security/src/System.Net.Security.csproj index 243e716af203..e3df7ba10594 100644 --- a/src/System.Net.Security/src/System.Net.Security.csproj +++ b/src/System.Net.Security/src/System.Net.Security.csproj @@ -139,8 +139,11 @@ Common\Microsoft\Win32\SafeHandles\SafeHandleZeroOrMinusOneIsInvalid.cs - - Common\System\IO\StreamAsyncHelper.cs + + Common\System\IO\StreamApmExtensions.cs + + + Common\System\Threading\Tasks\TaskToApm.cs Common\System\IO\Error.cs diff --git a/src/System.Net.Security/src/System/Net/SecureProtocols/FixedSizeReader.cs b/src/System.Net.Security/src/System/Net/SecureProtocols/FixedSizeReader.cs index c19c355bba36..9d904edb424f 100644 --- a/src/System.Net.Security/src/System/Net/SecureProtocols/FixedSizeReader.cs +++ b/src/System.Net.Security/src/System/Net/SecureProtocols/FixedSizeReader.cs @@ -15,8 +15,6 @@ internal class FixedSizeReader { private static readonly AsyncCallback s_readCallback = new AsyncCallback(ReadCallback); - // TODO (Issue #3114): Implement this using TPL instead of APM. - private readonly StreamAsyncHelper _transportAPM; private readonly Stream _transport; private AsyncProtocolRequest _request; private int _totalRead; @@ -24,7 +22,6 @@ internal class FixedSizeReader public FixedSizeReader(Stream transport) { _transport = transport; - _transportAPM = new StreamAsyncHelper(transport); } // @@ -73,7 +70,7 @@ private void StartReading() { while (true) { - IAsyncResult ar = _transportAPM.BeginRead(_request.Buffer, _request.Offset + _totalRead, _request.Count - _totalRead, s_readCallback, this); + IAsyncResult ar = _transport.BeginRead(_request.Buffer, _request.Offset + _totalRead, _request.Count - _totalRead, s_readCallback, this); if (!ar.CompletedSynchronously) { #if DEBUG @@ -82,7 +79,7 @@ private void StartReading() break; } - int bytes = _transportAPM.EndRead(ar); + int bytes = _transport.EndRead(ar); if (CheckCompletionBeforeNextRead(bytes)) { @@ -148,7 +145,7 @@ private static void ReadCallback(IAsyncResult transportResult) // Async completion. try { - int bytes = reader._transportAPM.EndRead(transportResult); + int bytes = reader._transport.EndRead(transportResult); if (reader.CheckCompletionBeforeNextRead(bytes)) { diff --git a/src/System.Net.Security/src/System/Net/SecureProtocols/InternalNegotiateStream.cs b/src/System.Net.Security/src/System/Net/SecureProtocols/InternalNegotiateStream.cs index 2715dfacc402..7217d462450a 100644 --- a/src/System.Net.Security/src/System/Net/SecureProtocols/InternalNegotiateStream.cs +++ b/src/System.Net.Security/src/System/Net/SecureProtocols/InternalNegotiateStream.cs @@ -28,22 +28,10 @@ public partial class NegotiateStream : AuthenticatedStream private FixedSizeReader _FrameReader; - // TODO (Issue #3114): Implement using TPL instead of APM. - private StreamAsyncHelper _innerStreamAPM; - - internal StreamAsyncHelper InnerStreamAPM - { - get - { - return _innerStreamAPM; - } - } - private void InitializeStreamPart() { _ReadHeader = new byte[4]; _FrameReader = new FixedSizeReader(InnerStream); - _innerStreamAPM = new StreamAsyncHelper(InnerStream); } private byte[] InternalBuffer @@ -179,13 +167,13 @@ private void StartWriting(byte[] buffer, int offset, int count, AsyncProtocolReq { // prepare for the next request asyncRequest.SetNextRequest(buffer, offset + chunkBytes, count - chunkBytes, null); - IAsyncResult ar = InnerStreamAPM.BeginWrite(outBuffer, 0, encryptedBytes, s_writeCallback, asyncRequest); + IAsyncResult ar = InnerStream.BeginWrite(outBuffer, 0, encryptedBytes, s_writeCallback, asyncRequest); if (!ar.CompletedSynchronously) { return; } - InnerStreamAPM.EndWrite(ar); + InnerStream.EndWrite(ar); } else { @@ -415,7 +403,7 @@ private static void WriteCallback(IAsyncResult transportResult) try { NegotiateStream negoStream = (NegotiateStream)asyncRequest.AsyncObject; - negoStream.InnerStreamAPM.EndWrite(transportResult); + negoStream.InnerStream.EndWrite(transportResult); if (asyncRequest.Count == 0) { // This was the last chunk. diff --git a/src/System.Net.Security/src/System/Net/SecureProtocols/NegotiateStream.cs b/src/System.Net.Security/src/System/Net/SecureProtocols/NegotiateStream.cs index bb757a063b12..6930a4f69a29 100644 --- a/src/System.Net.Security/src/System/Net/SecureProtocols/NegotiateStream.cs +++ b/src/System.Net.Security/src/System/Net/SecureProtocols/NegotiateStream.cs @@ -507,7 +507,7 @@ private IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallba if (!_negoState.CanGetSecureStream) { - return InnerStreamAPM.BeginRead(buffer, offset, count, asyncCallback, asyncState); + return InnerStream.BeginRead(buffer, offset, count, asyncCallback, asyncState); } BufferAsyncResult bufferResult = new BufferAsyncResult(this, buffer, offset, count, asyncState, asyncCallback); @@ -529,7 +529,7 @@ private int EndRead(IAsyncResult asyncResult) if (!_negoState.CanGetSecureStream) { - return InnerStreamAPM.EndRead(asyncResult); + return InnerStream.EndRead(asyncResult); } @@ -579,7 +579,7 @@ private IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallb if (!_negoState.CanGetSecureStream) { - return InnerStreamAPM.BeginWrite(buffer, offset, count, asyncCallback, asyncState); + return InnerStream.BeginWrite(buffer, offset, count, asyncCallback, asyncState); } BufferAsyncResult bufferResult = new BufferAsyncResult(this, buffer, offset, count, true, asyncState, asyncCallback); @@ -602,7 +602,7 @@ private void EndWrite(IAsyncResult asyncResult) if (!_negoState.CanGetSecureStream) { - InnerStreamAPM.EndWrite(asyncResult); + InnerStream.EndWrite(asyncResult); return; } diff --git a/src/System.Net.Security/src/System/Net/SecureProtocols/SslState.cs b/src/System.Net.Security/src/System/Net/SecureProtocols/SslState.cs index 71e5fa7d5c15..1ddebcda6d0d 100644 --- a/src/System.Net.Security/src/System/Net/SecureProtocols/SslState.cs +++ b/src/System.Net.Security/src/System/Net/SecureProtocols/SslState.cs @@ -26,9 +26,6 @@ internal class SslState private Stream _innerStream; - // TODO (Issue #3114): Implement using TPL instead of APM. - private StreamAsyncHelper _innerStreamAPM; - private SslStreamInternal _secureStream; private FixedSizeReader _reader; @@ -81,7 +78,6 @@ private enum CachedSessionStatus : byte internal SslState(Stream innerStream, RemoteCertValidationCallback certValidationCallback, LocalCertSelectionCallback certSelectionCallback, EncryptionPolicy encryptionPolicy) { _innerStream = innerStream; - _innerStreamAPM = new StreamAsyncHelper(innerStream); _reader = new FixedSizeReader(innerStream); _certValidationDelegate = certValidationCallback; _certSelectionDelegate = certSelectionCallback; @@ -388,14 +384,6 @@ internal Stream InnerStream } } - internal StreamAsyncHelper InnerStreamAPM - { - get - { - return _innerStreamAPM; - } - } - internal SslStreamInternal SecureStream { get @@ -784,7 +772,7 @@ private void StartSendBlob(byte[] incoming, int count, AsyncProtocolRequest asyn else { asyncRequest.AsyncState = message; - IAsyncResult ar = InnerStreamAPM.BeginWrite(message.Payload, 0, message.Size, s_writeCallback, asyncRequest); + IAsyncResult ar = InnerStream.BeginWrite(message.Payload, 0, message.Size, s_writeCallback, asyncRequest); if (!ar.CompletedSynchronously) { #if DEBUG @@ -793,7 +781,7 @@ private void StartSendBlob(byte[] incoming, int count, AsyncProtocolRequest asyn return; } - InnerStreamAPM.EndWrite(ar); + InnerStream.EndWrite(ar); } } @@ -987,12 +975,12 @@ private void StartSendAuthResetSignal(ProtocolToken message, AsyncProtocolReques else { asyncRequest.AsyncState = exception; - IAsyncResult ar = InnerStreamAPM.BeginWrite(message.Payload, 0, message.Size, s_writeCallback, asyncRequest); + IAsyncResult ar = InnerStream.BeginWrite(message.Payload, 0, message.Size, s_writeCallback, asyncRequest); if (!ar.CompletedSynchronously) { return; } - InnerStreamAPM.EndWrite(ar); + InnerStream.EndWrite(ar); } exception.Throw(); @@ -1072,7 +1060,7 @@ private static void WriteCallback(IAsyncResult transportResult) // Async completion. try { - sslState.InnerStreamAPM.EndWrite(transportResult); + sslState.InnerStream.EndWrite(transportResult); // Special case for an error notification. object asyncState = asyncRequest.AsyncState; diff --git a/src/System.Net.Security/src/System/Net/SecureProtocols/SslStreamInternal.cs b/src/System.Net.Security/src/System/Net/SecureProtocols/SslStreamInternal.cs index 3a0c79c7f973..b87cc3ea4001 100644 --- a/src/System.Net.Security/src/System/Net/SecureProtocols/SslStreamInternal.cs +++ b/src/System.Net.Security/src/System/Net/SecureProtocols/SslStreamInternal.cs @@ -421,13 +421,13 @@ private void StartWriting(byte[] buffer, int offset, int count, AsyncProtocolReq { // Prepare for the next request. asyncRequest.SetNextRequest(buffer, offset + chunkBytes, count - chunkBytes, s_resumeAsyncWriteCallback); - IAsyncResult ar = _sslState.InnerStreamAPM.BeginWrite(outBuffer, 0, encryptedBytes, s_writeCallback, asyncRequest); + IAsyncResult ar = _sslState.InnerStream.BeginWrite(outBuffer, 0, encryptedBytes, s_writeCallback, asyncRequest); if (!ar.CompletedSynchronously) { return; } - _sslState.InnerStreamAPM.EndWrite(ar); + _sslState.InnerStream.EndWrite(ar); } else @@ -761,7 +761,7 @@ private static void WriteCallback(IAsyncResult transportResult) try { - sslStream._sslState.InnerStreamAPM.EndWrite(transportResult); + sslStream._sslState.InnerStream.EndWrite(transportResult); sslStream._sslState.FinishWrite(); if (asyncRequest.Count == 0) diff --git a/src/System.Net.Security/src/System/Net/StreamFramer.cs b/src/System.Net.Security/src/System/Net/StreamFramer.cs index 1dc879463cd0..c92a08121cd0 100644 --- a/src/System.Net.Security/src/System/Net/StreamFramer.cs +++ b/src/System.Net.Security/src/System/Net/StreamFramer.cs @@ -12,9 +12,6 @@ internal class StreamFramer { private Stream _transport; - // TODO (Issue #3114): Implement using TPL instead of APM. - private StreamAsyncHelper _transportAPM; - private bool _eof; private FrameHeader _writeHeader = new FrameHeader(); @@ -43,8 +40,6 @@ public StreamFramer(Stream Transport) _readFrameCallback = new AsyncCallback(ReadFrameCallback); _beginWriteCallback = new AsyncCallback(BeginWriteCallback); - - _transportAPM = new StreamAsyncHelper(_transport); } public FrameHeader ReadHeader @@ -141,7 +136,7 @@ public IAsyncResult BeginReadMessage(AsyncCallback asyncCallback, object stateOb _readHeaderBuffer, 0, _readHeaderBuffer.Length); - IAsyncResult result = _transportAPM.BeginRead(_readHeaderBuffer, 0, _readHeaderBuffer.Length, + IAsyncResult result = _transport.BeginRead(_readHeaderBuffer, 0, _readHeaderBuffer.Length, _readFrameCallback, workerResult); if (result.CompletedSynchronously) @@ -214,7 +209,7 @@ private void ReadFrameComplete(IAsyncResult transportResult) WorkerAsyncResult workerResult = (WorkerAsyncResult)transportResult.AsyncState; - int bytesRead = _transportAPM.EndRead(transportResult); + int bytesRead = _transport.EndRead(transportResult); workerResult.Offset += bytesRead; if (!(workerResult.Offset <= workerResult.End)) @@ -291,7 +286,7 @@ private void ReadFrameComplete(IAsyncResult transportResult) } // This means we need more data to complete the data block. - transportResult = _transportAPM.BeginRead(workerResult.Buffer, workerResult.Offset, workerResult.End - workerResult.Offset, + transportResult = _transport.BeginRead(workerResult.Buffer, workerResult.Offset, workerResult.End - workerResult.Offset, _readFrameCallback, workerResult); } while (transportResult.CompletedSynchronously); } @@ -372,7 +367,7 @@ public IAsyncResult BeginWriteMessage(byte[] message, AsyncCallback asyncCallbac if (message.Length == 0) { - return _transportAPM.BeginWrite(_writeHeaderBuffer, 0, _writeHeaderBuffer.Length, + return _transport.BeginWrite(_writeHeaderBuffer, 0, _writeHeaderBuffer.Length, asyncCallback, stateObject); } @@ -381,7 +376,7 @@ public IAsyncResult BeginWriteMessage(byte[] message, AsyncCallback asyncCallbac message, 0, message.Length); // Charge the first: - IAsyncResult result = _transportAPM.BeginWrite(_writeHeaderBuffer, 0, _writeHeaderBuffer.Length, + IAsyncResult result = _transport.BeginWrite(_writeHeaderBuffer, 0, _writeHeaderBuffer.Length, _beginWriteCallback, workerResult); if (result.CompletedSynchronously) @@ -437,7 +432,7 @@ private void BeginWriteComplete(IAsyncResult transportResult) WorkerAsyncResult workerResult = (WorkerAsyncResult)transportResult.AsyncState; // First, complete the previous portion write. - _transportAPM.EndWrite(transportResult); + _transport.EndWrite(transportResult); // Check on exit criterion. if (workerResult.Offset == workerResult.End) @@ -450,7 +445,7 @@ private void BeginWriteComplete(IAsyncResult transportResult) workerResult.Offset = workerResult.End; // Write next portion (frame body) using Async IO. - transportResult = _transportAPM.BeginWrite(workerResult.Buffer, 0, workerResult.End, + transportResult = _transport.BeginWrite(workerResult.Buffer, 0, workerResult.End, _beginWriteCallback, workerResult); } while (transportResult.CompletedSynchronously); @@ -479,7 +474,7 @@ public void EndWriteMessage(IAsyncResult asyncResult) } else { - _transportAPM.EndWrite(asyncResult); + _transport.EndWrite(asyncResult); } } }