Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ System.Threading.Channel&lt;T&gt;</PackageDescription>
<Compile Include="System\Threading\Channels\Channel_1.cs" />
<Compile Include="System\Threading\Channels\Channel_2.cs" />
<Compile Include="System\Threading\Channels\IDebugEnumerator.cs" />
<Compile Include="System\Threading\Channels\RendezvousChannel.cs" />
<Compile Include="System\Threading\Channels\SingleConsumerUnboundedChannel.cs" />
<Compile Include="System\Threading\Channels\UnboundedChannel.cs" />
<Compile Include="$(CommonPath)Internal\Padding.cs" Link="Common\Internal\Padding.cs" />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -388,13 +388,7 @@ public override bool TryWrite(T item)
// There are no items in the channel, which means we may have blocked/waiting readers.

// Try to get a blocked reader that we can transfer the item to.
while (ChannelUtilities.TryDequeue(ref parent._blockedReadersHead, out blockedReader))
{
if (blockedReader.TryReserveCompletionIfCancelable())
{
break;
}
}
blockedReader = ChannelUtilities.TryDequeueAndReserveCompletionIfCancelable(ref parent._blockedReadersHead);

// If we weren't able to get a reader, instead queue the item and get any waiters that need to be notified.
if (blockedReader is null)
Expand Down Expand Up @@ -551,13 +545,7 @@ public override ValueTask WriteAsync(T item, CancellationToken cancellationToken
// There are no items in the channel, which means we may have blocked/waiting readers.

// Try to get a blocked reader that we can transfer the item to.
while (ChannelUtilities.TryDequeue(ref parent._blockedReadersHead, out blockedReader))
{
if (blockedReader.TryReserveCompletionIfCancelable())
{
break;
}
}
blockedReader = ChannelUtilities.TryDequeueAndReserveCompletionIfCancelable(ref parent._blockedReadersHead);

// If we weren't able to get a reader, instead queue the item and get any waiters that need to be notified.
if (blockedReader is null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ public static Channel<T> CreateUnbounded<T>() =>
/// <typeparam name="T">Specifies the type of data in the channel.</typeparam>
/// <param name="options">Options that guide the behavior of the channel.</param>
/// <returns>The created channel.</returns>
/// <exception cref="ArgumentNullException"><paramref name="options"/> is <see langword="null"/>.</exception>
public static Channel<T> CreateUnbounded<T>(UnboundedChannelOptions options)
{
ArgumentNullException.ThrowIfNull(options);
Expand All @@ -35,35 +36,33 @@ public static Channel<T> CreateUnbounded<T>(UnboundedChannelOptions options)
/// Channels created with this method apply the <see cref="BoundedChannelFullMode.Wait"/>
/// behavior and prohibit continuations from running synchronously.
/// </remarks>
public static Channel<T> CreateBounded<T>(int capacity)
{
if (capacity < 1)
{
throw new ArgumentOutOfRangeException(nameof(capacity));
}

return new BoundedChannel<T>(capacity, BoundedChannelFullMode.Wait, runContinuationsAsynchronously: true, itemDropped: null);
}
/// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative.</exception>
public static Channel<T> CreateBounded<T>(int capacity) =>
capacity > 0 ? new BoundedChannel<T>(capacity, BoundedChannelFullMode.Wait, runContinuationsAsynchronously: true, itemDropped: null) :
capacity == 0 ? new RendezvousChannel<T>(BoundedChannelFullMode.Wait, runContinuationsAsynchronously: true, itemDropped: null) :
throw new ArgumentOutOfRangeException(nameof(capacity));

/// <summary>Creates a channel subject to the provided options.</summary>
/// <typeparam name="T">Specifies the type of data in the channel.</typeparam>
/// <param name="options">Options that guide the behavior of the channel.</param>
/// <returns>The created channel.</returns>
public static Channel<T> CreateBounded<T>(BoundedChannelOptions options)
{
return CreateBounded<T>(options, itemDropped: null);
}
/// <exception cref="ArgumentNullException"><paramref name="options"/> is <see langword="null"/>.</exception>
public static Channel<T> CreateBounded<T>(BoundedChannelOptions options) =>
CreateBounded<T>(options, itemDropped: null);

/// <summary>Creates a channel subject to the provided options.</summary>
/// <typeparam name="T">Specifies the type of data in the channel.</typeparam>
/// <param name="options">Options that guide the behavior of the channel.</param>
/// <param name="itemDropped">Delegate that will be called when item is being dropped from channel. See <see cref="BoundedChannelFullMode"/>.</param>
/// <returns>The created channel.</returns>
/// <exception cref="ArgumentNullException"><paramref name="options"/> is <see langword="null"/>.</exception>
public static Channel<T> CreateBounded<T>(BoundedChannelOptions options, Action<T>? itemDropped)
{
ArgumentNullException.ThrowIfNull(options);

return new BoundedChannel<T>(options.Capacity, options.FullMode, !options.AllowSynchronousContinuations, itemDropped);
return
options.Capacity > 0 ? new BoundedChannel<T>(options.Capacity, options.FullMode, !options.AllowSynchronousContinuations, itemDropped) :
new RendezvousChannel<T>(options.FullMode, !options.AllowSynchronousContinuations, itemDropped);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,10 @@ public sealed class BoundedChannelOptions : ChannelOptions

/// <summary>Initializes the options.</summary>
/// <param name="capacity">The maximum number of items the bounded channel may store.</param>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is negative.</exception>
public BoundedChannelOptions(int capacity)
{
if (capacity < 1)
if (capacity < 0)
{
throw new ArgumentOutOfRangeException(nameof(capacity));
}
Expand All @@ -60,12 +61,13 @@ public BoundedChannelOptions(int capacity)
}

/// <summary>Gets or sets the maximum number of items the bounded channel may store.</summary>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="value"/> is negative.</exception>
public int Capacity
{
get => _capacity;
set
{
if (value < 1)
if (value < 0)
{
throw new ArgumentOutOfRangeException(nameof(value));
}
Expand All @@ -74,6 +76,7 @@ public int Capacity
}

/// <summary>Gets or sets the behavior incurred by write operations when the channel is full.</summary>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="value"/> is an invalid enum value.</exception>
public BoundedChannelFullMode FullMode
{
get => _mode;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,23 @@ internal static ValueTask<T> GetInvalidCompletionValueTask<T>(Exception error)
return new ValueTask<T>(t);
}

/// <summary>Dequeues from <paramref name="head"/> until an element is dequeued that can have completion reserved.</summary>
/// <param name="head">The head of the list, with items dequeued up through the returned element, or entirely if <see langword="null"/> is returned.</param>
/// <returns>The operation on which completion has been reserved, or null if none can be found.</returns>
internal static TAsyncOp? TryDequeueAndReserveCompletionIfCancelable<TAsyncOp>(ref TAsyncOp? head)
where TAsyncOp : AsyncOperation<TAsyncOp>
{
while (ChannelUtilities.TryDequeue(ref head, out var op))
{
if (op.TryReserveCompletionIfCancelable())
{
return op;
}
}

return null;
}

/// <summary>Dequeues an operation from the circular doubly-linked list referenced by <paramref name="head"/>.</summary>
/// <param name="head">The head of the list.</param>
/// <param name="op">The dequeued operation.</param>
Expand Down Expand Up @@ -317,6 +334,29 @@ internal static void AssertAll<TAsyncOp>(TAsyncOp? head, Func<TAsyncOp, bool> co
}
}

/// <summary>Counts the number of operations in the list.</summary>
/// <param name="head">The head of the queue of operations to count.</param>
internal static long CountOperations<TAsyncOp>(TAsyncOp? head)
Comment thread
stephentoub marked this conversation as resolved.
where TAsyncOp : AsyncOperation<TAsyncOp>
{
TAsyncOp? current = head;
long count = 0;

if (current is not null)
{
do
{
count++;

Debug.Assert(current is not null);
current = current.Next;
}
while (current != head);
}

return count;
}

/// <summary>Creates and returns an exception object to indicate that a channel has been closed.</summary>
internal static Exception CreateInvalidCompletionException(Exception? inner = null) =>
inner is OperationCanceledException ? inner :
Expand Down
Loading
Loading