From 556f7e289b922611cb867d26e2c9dd862b7982dc Mon Sep 17 00:00:00 2001 From: stephentoub Date: Tue, 19 Jan 2016 17:52:08 -0500 Subject: [PATCH] Two Stream.Read/WriteAsync improvements A logical port of two of the three fixes in https://github.com/dotnet/coreclr/pull/2724. Makes it so that Read/WriteAsync calls are serialized asynchronously rather than synchronously, and so that Wait()'ing on a Read/WriteAsync task may be able to inline the execution onto the current thread. This also reduces allocations when calling Read/WriteAsync (though there are still more optimizations that could be done). --- src/System.IO/src/System/IO/Stream.cs | 68 +++++++++++---------------- 1 file changed, 28 insertions(+), 40 deletions(-) diff --git a/src/System.IO/src/System/IO/Stream.cs b/src/System.IO/src/System/IO/Stream.cs index b66d795102a1..b841ce16987b 100644 --- a/src/System.IO/src/System/IO/Stream.cs +++ b/src/System.IO/src/System/IO/Stream.cs @@ -270,29 +270,23 @@ public virtual Task ReadAsync(Byte[] buffer, int offset, int count, Cancell return new Task(() => 0, cancellationToken); } - return ReadAsyncTask(buffer, offset, count, cancellationToken); - } - - private async Task ReadAsyncTask(Byte[] buffer, int offset, int count, CancellationToken cancellationToken) - { // To avoid a race with a stream's position pointer & generating race // conditions with internal buffer indexes in our own streams that // don't natively support async IO operations when there are multiple - // async requests outstanding, we will block the application's main - // thread if it does a second IO request until the first one completes. - EnsureAsyncActiveSemaphoreInitialized().Wait(); - - try - { - return await Task.Factory.StartNew(() => Read(buffer, offset, count), - cancellationToken, - TaskCreationOptions.DenyChildAttach, - TaskScheduler.Default); - } - finally - { - _asyncActiveSemaphore.Release(); - } + // async requests outstanding, we will serialize the requests. + return EnsureAsyncActiveSemaphoreInitialized().WaitAsync().ContinueWith((completedWait, s) => + { + Debug.Assert(completedWait.Status == TaskStatus.RanToCompletion); + var state = (Tuple)s; + try + { + return state.Item1.Read(state.Item2, state.Item3, state.Item4); // this.Read(buffer, offset, count); + } + finally + { + state.Item1._asyncActiveSemaphore.Release(); + } + }, Tuple.Create(this, buffer, offset, count), CancellationToken.None, TaskContinuationOptions.DenyChildAttach, TaskScheduler.Default); } public Task WriteAsync(Byte[] buffer, int offset, int count) @@ -312,29 +306,23 @@ public virtual Task WriteAsync(Byte[] buffer, int offset, int count, Cancellatio return new Task(() => { }, cancellationToken); } - return WriteAsyncTask(buffer, offset, count, cancellationToken); - } - - private async Task WriteAsyncTask(Byte[] buffer, int offset, int count, CancellationToken cancellationToken) - { // To avoid a race with a stream's position pointer & generating race // conditions with internal buffer indexes in our own streams that // don't natively support async IO operations when there are multiple - // async requests outstanding, we will block the application's main - // thread if it does a second IO request until the first one completes. - EnsureAsyncActiveSemaphoreInitialized().Wait(); - - try - { - await Task.Factory.StartNew(() => Write(buffer, offset, count), - cancellationToken, - TaskCreationOptions.DenyChildAttach, - TaskScheduler.Default); - } - finally - { - _asyncActiveSemaphore.Release(); - } + // async requests outstanding, we will serialize the requests. + return EnsureAsyncActiveSemaphoreInitialized().WaitAsync().ContinueWith((completedWait, s) => + { + Debug.Assert(completedWait.Status == TaskStatus.RanToCompletion); + var state = (Tuple)s; + try + { + state.Item1.Write(state.Item2, state.Item3, state.Item4); // this.Write(buffer, offset, count); + } + finally + { + state.Item1._asyncActiveSemaphore.Release(); + } + }, Tuple.Create(this, buffer, offset, count), CancellationToken.None, TaskContinuationOptions.DenyChildAttach, TaskScheduler.Default); } public abstract long Seek(long offset, SeekOrigin origin);