Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// See the LICENSE file in the project root for more information.

using System.Threading.Tasks;
using Microsoft.DotNet.XUnitExtensions;
using Xunit;

namespace System.Threading.Channels.Tests
Expand Down Expand Up @@ -177,7 +178,7 @@ public void WriteAsync_TryRead_Many_DropNewest(int bufferedCapacity)
Assert.Equal(0, result);
}

[ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
[Fact]
public async Task TryWrite_DropNewest_WrappedAroundInternalQueue()
{
var c = Channel.CreateBounded<int>(new BoundedChannelOptions(3) { FullMode = BoundedChannelFullMode.DropNewest });
Expand Down Expand Up @@ -249,7 +250,7 @@ public void WriteAsync_TryRead_Many_Ignore(int bufferedCapacity)
Assert.Equal(0, result);
}

[ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
[Fact]
public async Task CancelPendingWrite_Reading_DataTransferredFromCorrectWriter()
{
var c = Channel.CreateBounded<int>(1);
Expand Down Expand Up @@ -371,7 +372,7 @@ public void ManyProducerConsumer_ConcurrentReadWrite_WithBufferedCapacity_Succes
Assert.Equal((NumItems * (NumItems + 1L)) / 2, readTotal);
}

[ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
[Fact]
public async Task WaitToWriteAsync_AfterFullThenRead_ReturnsTrue()
{
var c = Channel.CreateBounded<int>(1);
Expand Down Expand Up @@ -409,11 +410,16 @@ public void AllowSynchronousContinuations_Reading_ContinuationsInvokedAccordingT
r.GetAwaiter().GetResult();
}

[ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
[ConditionalTheory]
[InlineData(false)]
[InlineData(true)]
public void AllowSynchronousContinuations_CompletionTask_ContinuationsInvokedAccordingToSetting(bool allowSynchronousContinuations)
{
if (!allowSynchronousContinuations && !PlatformDetection.IsThreadingSupported)
{
throw new SkipTestException(nameof(PlatformDetection.IsThreadingSupported));
}

var c = Channel.CreateBounded<int>(new BoundedChannelOptions(1) { AllowSynchronousContinuations = allowSynchronousContinuations });

int expectedId = Environment.CurrentManagedThreadId;
Expand All @@ -427,7 +433,7 @@ public void AllowSynchronousContinuations_CompletionTask_ContinuationsInvokedAcc
r.GetAwaiter().GetResult();
}

[ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
[Fact]
public async Task TryWrite_NoBlockedReaders_WaitingReader_WaiterNotified()
{
Channel<int> c = CreateChannel();
Expand Down
66 changes: 33 additions & 33 deletions src/libraries/System.Threading.Channels/tests/ChannelTestBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,15 @@ public void Completion_Idempotent()
Assert.Equal(TaskStatus.RanToCompletion, completion.Status);
}

[ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
[Fact]
public async Task Complete_AfterEmpty_NoWaiters_TriggersCompletion()
{
Channel<int> c = CreateChannel();
c.Writer.Complete();
await c.Reader.Completion;
}

[ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
[Fact]
public async Task Complete_AfterEmpty_WaitingReader_TriggersCompletion()
{
Channel<int> c = CreateChannel();
Expand All @@ -85,7 +85,7 @@ public async Task Complete_AfterEmpty_WaitingReader_TriggersCompletion()
await Assert.ThrowsAnyAsync<InvalidOperationException>(() => r);
}

[ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
[Fact]
public async Task Complete_BeforeEmpty_WaitingReaders_TriggersCompletion()
{
Channel<int> c = CreateChannel();
Expand All @@ -112,7 +112,7 @@ public void TryComplete_Twice_ReturnsTrueThenFalse()
Assert.False(c.Writer.TryComplete());
}

[ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
[Fact]
public async Task TryComplete_ErrorsPropage()
{
Channel<int> c;
Expand Down Expand Up @@ -326,7 +326,7 @@ public void WaitToWriteAsync_EmptyChannel_SynchronouslyCompletes()
Assert.True(write.Result);
}

[ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
[Fact]
public async Task WaitToWriteAsync_ManyConcurrent_SatisifedByReaders()
{
if (RequiresSingleReader || RequiresSingleWriter)
Expand Down Expand Up @@ -375,15 +375,15 @@ public void TryWrite_AfterComplete_ReturnsFalse()
Assert.False(c.Writer.TryWrite(42));
}

[ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
[Fact]
public async Task WriteAsync_AfterComplete_ThrowsException()
{
Channel<int> c = CreateChannel();
c.Writer.Complete();
await Assert.ThrowsAnyAsync<InvalidOperationException>(async () => await c.Writer.WriteAsync(42));
}

[ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
[Fact]
public async Task Complete_WithException_PropagatesToCompletion()
{
Channel<int> c = CreateChannel();
Expand All @@ -392,7 +392,7 @@ public async Task Complete_WithException_PropagatesToCompletion()
Assert.Same(exc, await Assert.ThrowsAsync<FormatException>(() => c.Reader.Completion));
}

[ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
[Fact]
public async Task Complete_WithCancellationException_PropagatesToCompletion()
{
Channel<int> c = CreateChannel();
Expand All @@ -407,7 +407,7 @@ public async Task Complete_WithCancellationException_PropagatesToCompletion()
await AssertCanceled(c.Reader.Completion, cts.Token);
}

[ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
[Fact]
public async Task Complete_WithException_PropagatesToExistingWriter()
{
Channel<int> c = CreateFullChannel();
Expand All @@ -420,7 +420,7 @@ public async Task Complete_WithException_PropagatesToExistingWriter()
}
}

[ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
[Fact]
public async Task Complete_WithException_PropagatesToNewWriter()
{
Channel<int> c = CreateChannel();
Expand All @@ -430,7 +430,7 @@ public async Task Complete_WithException_PropagatesToNewWriter()
Assert.Same(exc, (await Assert.ThrowsAsync<ChannelClosedException>(async () => await write)).InnerException);
}

[ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
[Fact]
public async Task Complete_WithException_PropagatesToExistingWaitingReader()
{
Channel<int> c = CreateChannel();
Expand All @@ -440,7 +440,7 @@ public async Task Complete_WithException_PropagatesToExistingWaitingReader()
await Assert.ThrowsAsync<FormatException>(async () => await read);
}

[ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
[Fact]
public async Task Complete_WithException_PropagatesToNewWaitingReader()
{
Channel<int> c = CreateChannel();
Expand All @@ -450,7 +450,7 @@ public async Task Complete_WithException_PropagatesToNewWaitingReader()
await Assert.ThrowsAsync<FormatException>(async () => await read);
}

[ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
[Fact]
public async Task Complete_WithException_PropagatesToNewWaitingWriter()
{
Channel<int> c = CreateChannel();
Expand Down Expand Up @@ -524,7 +524,7 @@ public void Precancellation_WaitToReadAsync_ReturnsImmediately(bool dataAvailabl
Assert.True(waitTask.IsCanceled);
}

[ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
[Theory]
[InlineData(false)]
[InlineData(true)]
public async Task WaitToReadAsync_DataWritten_CompletesSuccessfully(bool cancelable)
Expand All @@ -540,7 +540,7 @@ public async Task WaitToReadAsync_DataWritten_CompletesSuccessfully(bool cancela
Assert.True(await read);
}

[ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
[Fact]
public async Task WaitToReadAsync_NoDataWritten_Canceled_CompletesAsCanceled()
{
Channel<int> c = CreateChannel();
Expand All @@ -552,7 +552,7 @@ public async Task WaitToReadAsync_NoDataWritten_Canceled_CompletesAsCanceled()
await Assert.ThrowsAnyAsync<OperationCanceledException>(async () => await read);
}

[ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
[Fact]
public async Task ReadAsync_ThenWriteAsync_Succeeds()
{
Channel<int> c = CreateChannel();
Expand All @@ -566,7 +566,7 @@ public async Task ReadAsync_ThenWriteAsync_Succeeds()
Assert.Equal(42, await r);
}

[ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
[Fact]
public async Task WriteAsync_ReadAsync_Succeeds()
{
Channel<int> c = CreateChannel();
Expand Down Expand Up @@ -594,7 +594,7 @@ public void Precancellation_ReadAsync_ReturnsImmediately(bool dataAvailable)
Assert.True(readTask.IsCanceled);
}

[ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
[Fact]
public async Task ReadAsync_Canceled_CanceledAsynchronously()
{
Channel<int> c = CreateChannel();
Expand All @@ -613,7 +613,7 @@ public async Task ReadAsync_Canceled_CanceledAsynchronously()
}
}

[ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
[Fact]
public async Task ReadAsync_WriteAsync_ManyConcurrentReaders_SerializedWriters_Success()
{
if (RequiresSingleReader)
Expand All @@ -633,7 +633,7 @@ public async Task ReadAsync_WriteAsync_ManyConcurrentReaders_SerializedWriters_S
Assert.Equal((Items * (Items - 1)) / 2, Enumerable.Sum(await Task.WhenAll(readers.Select(r => r.AsTask()))));
}

[ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
[Fact]
public async Task ReadAsync_TryWrite_ManyConcurrentReaders_SerializedWriters_Success()
{
if (RequiresSingleReader)
Expand All @@ -658,15 +658,15 @@ public async Task ReadAsync_TryWrite_ManyConcurrentReaders_SerializedWriters_Suc
Assert.Equal((Items * (Items - 1)) / 2, Enumerable.Sum(await Task.WhenAll(readers)));
}

[ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
[Fact]
public async Task ReadAsync_AlreadyCompleted_Throws()
{
Channel<int> c = CreateChannel();
c.Writer.Complete();
await Assert.ThrowsAsync<ChannelClosedException>(() => c.Reader.ReadAsync().AsTask());
}

[ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
[Fact]
public async Task ReadAsync_SubsequentlyCompleted_Throws()
{
Channel<int> c = CreateChannel();
Expand All @@ -676,7 +676,7 @@ public async Task ReadAsync_SubsequentlyCompleted_Throws()
await Assert.ThrowsAsync<ChannelClosedException>(() => r);
}

[ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
[Fact]
public async Task ReadAsync_AfterFaultedChannel_Throws()
{
Channel<int> c = CreateChannel();
Expand All @@ -689,7 +689,7 @@ public async Task ReadAsync_AfterFaultedChannel_Throws()
Assert.Same(e, cce.InnerException);
}

[ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
[Fact]
public async Task ReadAsync_AfterCanceledChannel_Throws()
{
Channel<int> c = CreateChannel();
Expand All @@ -701,7 +701,7 @@ public async Task ReadAsync_AfterCanceledChannel_Throws()
await Assert.ThrowsAnyAsync<OperationCanceledException>(() => c.Reader.ReadAsync().AsTask());
}

[ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
[Fact]
public async Task ReadAsync_Canceled_WriteAsyncCompletesNextReader()
{
Channel<int> c = CreateChannel();
Expand All @@ -722,7 +722,7 @@ public async Task ReadAsync_Canceled_WriteAsyncCompletesNextReader()
}
}

[ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
[Fact]
public async Task ReadAsync_ConsecutiveReadsSucceed()
{
Channel<int> c = CreateChannel();
Expand All @@ -734,7 +734,7 @@ public async Task ReadAsync_ConsecutiveReadsSucceed()
}
}

[ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
[Fact]
public async Task WaitToReadAsync_ConsecutiveReadsSucceed()
{
Channel<int> c = CreateChannel();
Expand Down Expand Up @@ -832,7 +832,7 @@ public void ReadAsync_MultipleContinuations_Throws(bool onCompleted, bool? conti
}
}

[ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
[Fact]
public async Task WaitToReadAsync_AwaitThenGetResult_Throws()
{
Channel<int> c = CreateChannel();
Expand All @@ -845,7 +845,7 @@ public async Task WaitToReadAsync_AwaitThenGetResult_Throws()
Assert.Throws<InvalidOperationException>(() => read.GetAwaiter().GetResult());
}

[ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
[Fact]
public async Task ReadAsync_AwaitThenGetResult_Throws()
{
Channel<int> c = CreateChannel();
Expand All @@ -858,7 +858,7 @@ public async Task ReadAsync_AwaitThenGetResult_Throws()
Assert.Throws<InvalidOperationException>(() => read.GetAwaiter().GetResult());
}

[ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
[Fact]
public async Task WaitToWriteAsync_AwaitThenGetResult_Throws()
{
Channel<int> c = CreateFullChannel();
Expand All @@ -875,7 +875,7 @@ public async Task WaitToWriteAsync_AwaitThenGetResult_Throws()
Assert.Throws<InvalidOperationException>(() => write.GetAwaiter().GetResult());
}

[ConditionalFact(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
[Fact]
public async Task WriteAsync_AwaitThenGetResult_Throws()
{
Channel<int> c = CreateFullChannel();
Expand Down Expand Up @@ -992,7 +992,7 @@ public static IEnumerable<object[]> Reader_ContinuesOnCurrentContextIfDesired_Me
from setNonDefaultTaskScheduler in new[] { true, false }
select new object[] { readOrWait, completeBeforeOnCompleted, flowExecutionContext, continueOnCapturedContext, setNonDefaultTaskScheduler };

[ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
[Theory]
[MemberData(nameof(Reader_ContinuesOnCurrentContextIfDesired_MemberData))]
public async Task Reader_ContinuesOnCurrentSynchronizationContextIfDesired(
bool readOrWait, bool completeBeforeOnCompleted, bool flowExecutionContext, bool? continueOnCapturedContext, bool setNonDefaultTaskScheduler)
Expand Down Expand Up @@ -1088,7 +1088,7 @@ public static IEnumerable<object[]> Reader_ContinuesOnCurrentSchedulerIfDesired_
from setDefaultSyncContext in new[] { true, false }
select new object[] { readOrWait, completeBeforeOnCompleted, flowExecutionContext, continueOnCapturedContext, setDefaultSyncContext };

[ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))]
[Theory]
[MemberData(nameof(Reader_ContinuesOnCurrentSchedulerIfDesired_MemberData))]
public async Task Reader_ContinuesOnCurrentTaskSchedulerIfDesired(
bool readOrWait, bool completeBeforeOnCompleted, bool flowExecutionContext, bool? continueOnCapturedContext, bool setDefaultSyncContext)
Expand Down
Loading