diff --git a/src/libraries/Common/src/System/Net/StreamBuffer.cs b/src/libraries/Common/src/System/Net/StreamBuffer.cs
index e8ea8f961d85ad..aa7c74622b07d2 100644
--- a/src/libraries/Common/src/System/Net/StreamBuffer.cs
+++ b/src/libraries/Common/src/System/Net/StreamBuffer.cs
@@ -343,7 +343,6 @@ public void Reset()
{
if (_hasWaiter != 0)
{
- Debug.Fail("Concurrent use is not supported");
throw new InvalidOperationException("Concurrent use is not supported");
}
diff --git a/src/libraries/Common/tests/Common.Tests.csproj b/src/libraries/Common/tests/Common.Tests.csproj
index b7c9f6b3ebabc4..12e815eb4a5535 100644
--- a/src/libraries/Common/tests/Common.Tests.csproj
+++ b/src/libraries/Common/tests/Common.Tests.csproj
@@ -102,6 +102,12 @@
Link="Common\System\Threading\Tasks\TaskToApm.cs" />
+
+
+
+
+
+
UncompressedTestFiles()
{
@@ -36,8 +38,22 @@ public abstract class CompressionStreamTestBase : CompressionTestBase
public abstract Stream CreateStream(Stream stream, CompressionLevel level);
public abstract Stream CreateStream(Stream stream, CompressionLevel level, bool leaveOpen);
public abstract Stream BaseStream(Stream stream);
- public virtual bool FlushCompletes { get => true; }
- public virtual bool FlushNoOps { get => false; }
public virtual int BufferSize { get => 8192; }
+
+ protected override Task CreateConnectedStreamsAsync()
+ {
+ (Stream stream1, Stream stream2) = ConnectedStreams.CreateBidirectional(4 * 1024, 16 * 1024);
+ return Task.FromResult((CreateStream(stream1, CompressionMode.Compress), CreateStream(stream2, CompressionMode.Decompress)));
+ }
+
+ protected override Task CreateWrappedConnectedStreamsAsync(StreamPair wrapped, bool leaveOpen) =>
+ Task.FromResult((CreateStream(wrapped.Stream1, CompressionMode.Compress, leaveOpen), CreateStream(wrapped.Stream2, CompressionMode.Decompress, leaveOpen)));
+
+ protected override int BufferedSize => 16 * 1024 + BufferSize;
+ protected override bool UsableAfterCanceledReads => false;
+ protected override Type UnsupportedReadWriteExceptionType => typeof(InvalidOperationException);
+ protected override bool WrappedUsableAfterClose => false;
+ protected override bool FlushRequiredToWriteData => true;
+ protected override bool FlushGuaranteesAllDataWritten => false;
}
}
diff --git a/src/libraries/Common/tests/System/IO/Compression/CompressionStreamUnitTestBase.cs b/src/libraries/Common/tests/System/IO/Compression/CompressionStreamUnitTestBase.cs
index 96db35c4be9a37..7e81c077709312 100644
--- a/src/libraries/Common/tests/System/IO/Compression/CompressionStreamUnitTestBase.cs
+++ b/src/libraries/Common/tests/System/IO/Compression/CompressionStreamUnitTestBase.cs
@@ -2,7 +2,6 @@
// The .NET Foundation licenses this file to you under the MIT license.
using System.Collections.Generic;
-using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
@@ -17,8 +16,6 @@ public abstract class CompressionStreamUnitTestBase : CompressionStreamTestBase
[ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
public virtual void FlushAsync_DuringWriteAsync()
{
- if (FlushNoOps)
- return;
byte[] buffer = new byte[100000];
Random rand = new Random();
rand.NextBytes(buffer);
@@ -52,8 +49,6 @@ public virtual void FlushAsync_DuringWriteAsync()
[ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
public async Task FlushAsync_DuringReadAsync()
{
- if (FlushNoOps)
- return;
byte[] buffer = new byte[32];
string testFilePath = CompressedTestFile(UncompressedTestFile());
using (var readStream = await ManualSyncMemoryStream.GetStreamFromFileAsync(testFilePath, false))
@@ -81,8 +76,6 @@ public async Task FlushAsync_DuringReadAsync()
[ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
public async Task FlushAsync_DuringFlushAsync()
{
- if (FlushNoOps)
- return;
byte[] buffer = null;
string testFilePath = CompressedTestFile(UncompressedTestFile());
using (var origStream = await LocalMemoryStream.readAppFileAsync(testFilePath))
@@ -121,64 +114,6 @@ public async Task FlushAsync_DuringFlushAsync()
}
}
- [ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
- public virtual void WriteAsync_DuringWriteAsync()
- {
- byte[] buffer = new byte[100000];
- Random rand = new Random();
- rand.NextBytes(buffer);
-
- using (var writeStream = new ManualSyncMemoryStream(false))
- using (var compressor = CreateStream(writeStream, CompressionMode.Compress))
- {
- Task task = null;
- try
- {
- // Write needs to be big enough to trigger a write to the underlying base stream so the WriteAsync call doesn't immediately complete.
- task = compressor.WriteAsync(buffer, 0, buffer.Length);
- while (task.IsCompleted)
- {
- rand.NextBytes(buffer);
- task = compressor.WriteAsync(buffer, 0, buffer.Length);
- }
- Assert.Throws(() => { compressor.WriteAsync(buffer, 32, 32); }); // "overlapping write"
- }
- finally
- {
- // Unblock Async operations
- writeStream.manualResetEvent.Set();
- // The original WriteAsync should be able to complete
- Assert.True(task.Wait(TaskTimeout), "Original WriteAsync Task did not complete in time");
- Assert.True(writeStream.WriteHit, "BaseStream Write function was not called");
- }
- }
- }
-
- [ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
- public async Task ReadAsync_DuringReadAsync()
- {
- byte[] buffer = new byte[32];
- string testFilePath = CompressedTestFile(UncompressedTestFile());
- using (var readStream = await ManualSyncMemoryStream.GetStreamFromFileAsync(testFilePath, false))
- using (var decompressor = CreateStream(readStream, CompressionMode.Decompress, true))
- {
- Task task = null;
- try
- {
- task = decompressor.ReadAsync(buffer, 0, 32);
- Assert.Throws(() => { decompressor.ReadAsync(buffer, 0, 32); }); // "overlapping read"
- }
- finally
- {
- // Unblock Async operations
- readStream.manualResetEvent.Set();
- // The original ReadAsync should be able to complete
- Assert.True(task.Wait(TaskTimeout), "The original ReadAsync should be able to complete");
- Assert.True(readStream.ReadHit, "BaseStream ReadAsync should have been called");
- }
- }
- }
-
[ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
public virtual async Task Dispose_WithUnfinishedReadAsync()
{
@@ -287,20 +222,6 @@ public async Task Read_BaseStreamSlowly()
}
}
- [Theory]
- [InlineData(CompressionMode.Compress)]
- [InlineData(CompressionMode.Decompress)]
- public void CanReadCanWrite(CompressionMode mode)
- {
- var ms = new MemoryStream();
- var compressor = CreateStream(ms, mode);
- Assert.True(mode == CompressionMode.Compress ? compressor.CanWrite : compressor.CanRead);
-
- compressor.Dispose();
- Assert.False(compressor.CanRead);
- Assert.False(compressor.CanWrite);
- }
-
[Theory]
[InlineData(CompressionMode.Compress)]
[InlineData(CompressionMode.Decompress)]
@@ -332,7 +253,7 @@ public void Ctor_DisposedBaseStream(CompressionMode mode)
[Theory]
[InlineData(CompressionMode.Compress)]
[InlineData(CompressionMode.Decompress)]
- public void Ctor_InvalidStreamCanReadCanWrite(CompressionMode mode)
+ public void Ctor_InvalidStream_Throws(CompressionMode mode)
{
LocalMemoryStream ms = new LocalMemoryStream();
ms.SetCanRead(mode == CompressionMode.Compress);
@@ -341,27 +262,26 @@ public void Ctor_InvalidStreamCanReadCanWrite(CompressionMode mode)
AssertExtensions.Throws("stream", () => CreateStream(ms, mode));
}
- public IEnumerable> CtorFunctions()
- {
- CompressionLevel[] legalValues = new CompressionLevel[] { CompressionLevel.Optimal, CompressionLevel.Fastest, CompressionLevel.NoCompression };
- yield return new Func((stream) => CreateStream(stream, CompressionMode.Compress));
- foreach (CompressionLevel level in legalValues)
- {
- yield return new Func((stream) => CreateStream(stream, level));
- bool[] boolValues = new bool[] { true, false };
+ [Fact]
+ public void TestCompressCtor()
+ {
+ IEnumerable> CtorFunctions()
+ {
+ yield return new Func((stream) => CreateStream(stream, CompressionMode.Compress));
- foreach (bool remainsOpen in boolValues)
+ foreach (CompressionLevel level in new[] { CompressionLevel.Optimal, CompressionLevel.Fastest, CompressionLevel.NoCompression })
{
- yield return new Func((stream) => CreateStream(stream, level, remainsOpen));
+ yield return new Func((stream) => CreateStream(stream, level));
+
+ foreach (bool remainsOpen in new[] { true, false })
+ {
+ yield return new Func((stream) => CreateStream(stream, level, remainsOpen));
+ }
}
}
- }
- [Fact]
- public void TestCompressCtor()
- {
Assert.All(CtorFunctions(), (create) =>
{
//Create the Stream
@@ -400,41 +320,6 @@ public void TestCompressCtor()
});
}
- [Theory]
- [InlineData(CompressionMode.Compress)]
- [InlineData(CompressionMode.Decompress)]
- public void TestLeaveOpen(CompressionMode mode)
- {
- //Create the Stream
- var baseStream = new MemoryStream();
- Stream compressor = CreateStream(baseStream, mode, leaveOpen: false);
- compressor.Dispose();
-
- //Check that Close has really closed the underlying stream
- Assert.Throws(() => baseStream.Write(new byte[] { }, 0, 0));
- }
-
- [Fact]
- public void TestLeaveOpenAfterValidCompress()
- {
- //Create the Stream
- int _bufferSize = 1024;
- var bytes = new byte[_bufferSize];
- var baseStream = new MemoryStream(bytes, writable: true);
- Stream compressor = CreateStream(baseStream, CompressionMode.Compress, leaveOpen: false);
-
- //Write some data and Close the stream
- string strData = "Test Data";
- var encoding = Encoding.UTF8;
- byte[] data = encoding.GetBytes(strData);
- compressor.Write(data, 0, data.Length);
- compressor.Flush();
- compressor.Dispose();
-
- //Check that Close has really closed the underlying stream
- Assert.Throws(() => baseStream.Write(bytes, 0, bytes.Length));
- }
-
[Fact]
public async Task TestLeaveOpenAfterValidDecompress()
{
@@ -470,619 +355,6 @@ public void Ctor_ArgumentValidation()
AssertExtensions.Throws("stream", () => CreateStream(new MemoryStream(new byte[1], writable: false), CompressionLevel.Optimal));
}
- [Theory]
- [InlineData(CompressionMode.Compress)]
- [InlineData(CompressionMode.Decompress)]
- public async Task Flush(CompressionMode mode)
- {
- var ms = new MemoryStream();
- var compressor = CreateStream(ms, mode);
- compressor.Flush();
- await compressor.FlushAsync();
- }
-
- [Theory]
- [InlineData(CompressionMode.Compress)]
- [InlineData(CompressionMode.Decompress)]
- public void Flush_Double(CompressionMode mode)
- {
- var ms = new MemoryStream();
- var compressor = CreateStream(ms, mode);
- compressor.Flush();
- compressor.Flush();
- }
-
- [Theory]
- [InlineData(CompressionMode.Compress)]
- [InlineData(CompressionMode.Decompress)]
- public void Dispose_Double(CompressionMode mode)
- {
- var ms = new MemoryStream();
- var compressor = CreateStream(ms, mode);
- compressor.Dispose();
- compressor.Dispose();
- }
-
- [Theory]
- [InlineData(CompressionMode.Compress)]
- [InlineData(CompressionMode.Decompress)]
- public void Flush_FollowedByDispose(CompressionMode mode)
- {
- var ms = new MemoryStream();
- var compressor = CreateStream(ms, mode);
- compressor.Flush();
- compressor.Dispose();
- }
-
- [Theory]
- [InlineData(CompressionMode.Compress)]
- [InlineData(CompressionMode.Decompress)]
- public void Dispose_FollowedBySyncOperations(CompressionMode mode)
- {
- var ms = new MemoryStream();
- var compressor = CreateStream(ms, mode);
- compressor.Dispose();
-
- if (mode == CompressionMode.Compress)
- Assert.Throws(() => compressor.Write(new byte[1], 0, 1));
- else
- Assert.Throws(() => compressor.Read(new byte[1], 0, 1));
- Assert.Throws(() => compressor.Flush());
- Assert.Throws(() => compressor.CopyTo(new MemoryStream()));
- }
-
- [Theory]
- [InlineData(CompressionMode.Compress)]
- [InlineData(CompressionMode.Decompress)]
- public virtual async Task Dispose_FollowedByAsyncOperations(CompressionMode mode)
- {
- var ms = new MemoryStream();
- var compressor = CreateStream(ms, mode);
- compressor.Dispose();
-
- if (mode == CompressionMode.Compress)
- await Assert.ThrowsAsync(async () => await compressor.WriteAsync(new byte[1], 0, 1));
- else
- await Assert.ThrowsAsync(async () => await compressor.ReadAsync(new byte[1], 0, 1));
- await Assert.ThrowsAsync(async () => await compressor.FlushAsync());
- await Assert.ThrowsAsync(async () => await compressor.CopyToAsync(new MemoryStream()));
- }
-
- [Theory]
- [InlineData(CompressionMode.Compress)]
- [InlineData(CompressionMode.Decompress)]
- public void TestSeekMethods(CompressionMode mode)
- {
- var ms = new MemoryStream();
- var decompressor = CreateStream(ms, mode);
- Assert.False(decompressor.CanSeek, "CanSeek should be false");
- Assert.Throws(() => decompressor.Length);
- Assert.Throws(() => decompressor.SetLength(1));
- Assert.Throws(() => decompressor.Position);
- Assert.Throws(() => decompressor.Position = 100L);
- Assert.Throws(() => decompressor.Seek(100L, SeekOrigin.Begin));
- }
-
- [Theory]
- [InlineData(false)]
- [InlineData(true)]
- public virtual void Write_ArgumentValidation(bool useAsync)
- {
- using (var decompressor = CreateStream(new MemoryStream(), CompressionMode.Compress))
- {
- Assert.Throws(() => { if (useAsync) { decompressor.WriteAsync(null, 0, 0).Wait(); } else { decompressor.Write(null, 0, 0); } });
- Assert.Throws(() => { if (useAsync) { decompressor.WriteAsync(new byte[1], -1, 0).Wait(); } else { decompressor.Write(new byte[1], -1, 0); } });
- Assert.Throws(() => { if (useAsync) { decompressor.WriteAsync(new byte[1], 0, -1).Wait(); } else { decompressor.Write(new byte[1], 0, -1); } });
- Assert.Throws(null, () => { if (useAsync) { decompressor.WriteAsync(new byte[1], 0, 2).Wait(); } else { decompressor.Write(new byte[1], 0, 2); } });
- Assert.Throws(null, () => { if (useAsync) { decompressor.WriteAsync(new byte[1], 1, 1).Wait(); } else { decompressor.Write(new byte[1], 1, 1); } });
- Assert.Throws(() => useAsync ? decompressor.ReadAsync(new byte[1], 0, 1).Result : decompressor.Read(new byte[1], 0, 1));
- Assert.Throws(null, () => useAsync ? decompressor.ReadAsync(new byte[1], 1, 1).Result : decompressor.Read(new byte[1], 1, 1));
- if (useAsync)
- { decompressor.WriteAsync(new byte[1], 0, 1).Wait(); }
- else
- { decompressor.Write(new byte[1], 0, 1); }
- }
- }
-
- [Theory]
- [InlineData(false)]
- [InlineData(true)]
- public virtual void Read_ArgumentValidation(bool useAsync)
- {
- using (var decompressor = CreateStream(new MemoryStream(), CompressionMode.Decompress))
- {
- Assert.Throws(() => useAsync ? decompressor.ReadAsync(null, 0, 0).Result : decompressor.Read(null, 0, 0));
- Assert.Throws(() => useAsync ? decompressor.ReadAsync(new byte[1], -1, 0).Result : decompressor.Read(new byte[1], -1, 0));
- Assert.Throws(() => useAsync ? decompressor.ReadAsync(new byte[1], 0, -1).Result : decompressor.Read(new byte[1], 0, -1));
- AssertExtensions.Throws(null, () => useAsync ? decompressor.ReadAsync(new byte[1], 0, 2).Result : decompressor.Read(new byte[1], 0, 2));
- AssertExtensions.Throws(null, () => useAsync ? decompressor.ReadAsync(new byte[1], 1, 1).Result : decompressor.Read(new byte[1], 1, 1));
- Assert.Throws(() => { if (useAsync) { decompressor.WriteAsync(new byte[1], 0, 1).Wait(); } else { decompressor.Write(new byte[1], 0, 1); } });
- Assert.Throws(null, () => { if (useAsync) { decompressor.WriteAsync(new byte[1], 1, 1).Wait(); } else { decompressor.Write(new byte[1], 1, 1); } });
-
- var data = new byte[1] { 42 };
- Assert.Equal(0, useAsync ? decompressor.ReadAsync(data, 0, 0).Result : decompressor.Read(data, 0, 0));
- Assert.Equal(42, data[0]);
- }
- }
-
- [Theory]
- [InlineData(CompressionMode.Compress)]
- [InlineData(CompressionMode.Decompress)]
- public void CopyToAsync_ArgumentValidation(CompressionMode mode)
- {
- using (Stream compressor = CreateStream(new MemoryStream(), mode))
- {
- AssertExtensions.Throws("destination", () => { compressor.CopyToAsync(null); });
- AssertExtensions.Throws("bufferSize", () => { compressor.CopyToAsync(new MemoryStream(), 0); });
- Assert.Throws(() => { compressor.CopyToAsync(new MemoryStream(new byte[1], writable: false)); });
- compressor.Dispose();
- Assert.Throws(() => { compressor.CopyToAsync(new MemoryStream()); });
- }
- }
-
- [Theory]
- [InlineData(CompressionMode.Compress)]
- [InlineData(CompressionMode.Decompress)]
- public void CopyTo_ArgumentValidation(CompressionMode mode)
- {
- using (Stream compressor = CreateStream(new MemoryStream(), mode))
- {
- AssertExtensions.Throws("destination", () => { compressor.CopyTo(null); });
- AssertExtensions.Throws("bufferSize", () => { compressor.CopyTo(new MemoryStream(), 0); });
- Assert.Throws(() => { compressor.CopyTo(new MemoryStream(new byte[1], writable: false)); });
- compressor.Dispose();
- Assert.Throws(() => { compressor.CopyTo(new MemoryStream()); });
- }
- }
-
- public enum ReadWriteMode
- {
- SyncArray,
- SyncSpan,
- AsyncArray,
- AsyncMemory,
- AsyncBeginEnd
- }
-
- public static IEnumerable