diff --git a/src/coreclr/vm/wasm/callhelpers-pinvoke.cpp b/src/coreclr/vm/wasm/callhelpers-pinvoke.cpp
index 2be5428681fe54..3bc02cd24132eb 100644
--- a/src/coreclr/vm/wasm/callhelpers-pinvoke.cpp
+++ b/src/coreclr/vm/wasm/callhelpers-pinvoke.cpp
@@ -133,7 +133,6 @@ extern "C" {
int32_t SystemNative_PosixFAdvise (void *, int64_t, int64_t, int32_t);
int32_t SystemNative_Read (void *, void *, int32_t);
int32_t SystemNative_ReadDir (void *, void *);
- int32_t SystemNative_ReadFromNonblocking (void *, void *, int32_t);
int32_t SystemNative_ReadLink (void *, void *, int32_t);
int64_t SystemNative_ReadV (void *, void *, int32_t);
void * SystemNative_Realloc (void *, void *);
@@ -151,7 +150,6 @@ extern "C" {
int32_t SystemNative_UTimensat (void *, void *);
int32_t SystemNative_Unlink (void *);
int32_t SystemNative_Write (void *, void *, int32_t);
- int32_t SystemNative_WriteToNonblocking (void *, void *, int32_t);
int64_t SystemNative_WriteV (void *, void *, int32_t);
} // extern "C"
@@ -277,7 +275,6 @@ static const Entry s_libSystem_Native [] = {
DllImportEntry(SystemNative_PosixFAdvise) // System.Private.CoreLib
DllImportEntry(SystemNative_Read) // System.Private.CoreLib
DllImportEntry(SystemNative_ReadDir) // System.Private.CoreLib
- DllImportEntry(SystemNative_ReadFromNonblocking) // System.Private.CoreLib
DllImportEntry(SystemNative_ReadLink) // System.Private.CoreLib
DllImportEntry(SystemNative_ReadV) // System.Private.CoreLib
DllImportEntry(SystemNative_Realloc) // System.Private.CoreLib
@@ -294,8 +291,7 @@ static const Entry s_libSystem_Native [] = {
DllImportEntry(SystemNative_SysLog) // System.Private.CoreLib
DllImportEntry(SystemNative_UTimensat) // System.Private.CoreLib
DllImportEntry(SystemNative_Unlink) // System.IO.MemoryMappedFiles, System.Private.CoreLib
- DllImportEntry(SystemNative_Write) // System.Console, System.Private.CoreLib
- DllImportEntry(SystemNative_WriteToNonblocking) // System.Private.CoreLib
+ DllImportEntry(SystemNative_Write) // System.Console, System.Private.Core
DllImportEntry(SystemNative_WriteV) // System.Private.CoreLib
};
diff --git a/src/libraries/Common/src/Interop/Unix/System.Native/Interop.HandleEvents.cs b/src/libraries/Common/src/Interop/Unix/System.Native/Interop.HandleEvents.cs
new file mode 100644
index 00000000000000..7da71b356c4ab4
--- /dev/null
+++ b/src/libraries/Common/src/Interop/Unix/System.Native/Interop.HandleEvents.cs
@@ -0,0 +1,51 @@
+// Licensed to the .NET Foundation under one or more agreements.
+// The .NET Foundation licenses this file to you under the MIT license.
+
+using System;
+using System.Runtime.InteropServices;
+
+internal static partial class Interop
+{
+ internal static partial class Sys
+ {
+ [Flags]
+ internal enum HandleEvents : int
+ {
+ None = 0x00,
+ Read = 0x01,
+ Write = 0x02,
+ ReadClose = 0x04,
+ Close = 0x08,
+ Error = 0x10
+ }
+
+ [StructLayout(LayoutKind.Sequential)]
+ internal struct HandleEvent
+ {
+ public IntPtr Data;
+ public HandleEvents Events;
+ private int _padding;
+ }
+
+ [LibraryImport(Libraries.SystemNative, EntryPoint = "SystemNative_CreateHandleEventPort")]
+ internal static unsafe partial Error CreateHandleEventPort(IntPtr* port);
+
+ [LibraryImport(Libraries.SystemNative, EntryPoint = "SystemNative_CloseHandleEventPort")]
+ internal static partial Error CloseHandleEventPort(IntPtr port);
+
+ [LibraryImport(Libraries.SystemNative, EntryPoint = "SystemNative_CreateHandleEventBuffer")]
+ internal static unsafe partial Error CreateHandleEventBuffer(int count, HandleEvent** buffer);
+
+ [LibraryImport(Libraries.SystemNative, EntryPoint = "SystemNative_FreeHandleEventBuffer")]
+ internal static unsafe partial Error FreeHandleEventBuffer(HandleEvent* buffer);
+
+ [LibraryImport(Libraries.SystemNative, EntryPoint = "SystemNative_TryChangeHandleEventRegistration")]
+ internal static partial Error TryChangeHandleEventRegistration(IntPtr port, SafeHandle socket, HandleEvents currentEvents, HandleEvents newEvents, IntPtr data);
+
+ [LibraryImport(Libraries.SystemNative, EntryPoint = "SystemNative_TryChangeHandleEventRegistration")]
+ internal static partial Error TryChangeHandleEventRegistration(IntPtr port, IntPtr socket, HandleEvents currentEvents, HandleEvents newEvents, IntPtr data);
+
+ [LibraryImport(Libraries.SystemNative, EntryPoint = "SystemNative_WaitForHandleEvents")]
+ internal static unsafe partial Error WaitForHandleEvents(IntPtr port, HandleEvent* buffer, int* count);
+ }
+}
diff --git a/src/libraries/Common/src/Interop/Unix/System.Native/Interop.Read.Pipe.cs b/src/libraries/Common/src/Interop/Unix/System.Native/Interop.Read.Pipe.cs
deleted file mode 100644
index ef2c39ddbc0656..00000000000000
--- a/src/libraries/Common/src/Interop/Unix/System.Native/Interop.Read.Pipe.cs
+++ /dev/null
@@ -1,24 +0,0 @@
-// Licensed to the .NET Foundation under one or more agreements.
-// The .NET Foundation licenses this file to you under the MIT license.
-
-using System.Runtime.InteropServices;
-using Microsoft.Win32.SafeHandles;
-
-internal static partial class Interop
-{
- internal static partial class Sys
- {
- ///
- /// Reads a number of bytes from an open file descriptor into a specified buffer.
- ///
- /// The open file descriptor to try to read from
- /// The buffer to read info into
- /// The size of the buffer
- ///
- /// Returns the number of bytes read on success; otherwise, -1 is returned
- /// Note - on fail. the position of the stream may change depending on the platform; consult man 2 read for more info
- ///
- [LibraryImport(Libraries.SystemNative, EntryPoint = "SystemNative_Read", SetLastError = true)]
- internal static unsafe partial int Read(SafePipeHandle fd, byte* buffer, int count);
- }
-}
diff --git a/src/libraries/Common/src/Interop/Unix/System.Native/Interop.Read.cs b/src/libraries/Common/src/Interop/Unix/System.Native/Interop.Read.cs
index d1005d8740d26e..8318d1822c67c0 100644
--- a/src/libraries/Common/src/Interop/Unix/System.Native/Interop.Read.cs
+++ b/src/libraries/Common/src/Interop/Unix/System.Native/Interop.Read.cs
@@ -20,8 +20,5 @@ internal static partial class Sys
///
[LibraryImport(Libraries.SystemNative, EntryPoint = "SystemNative_Read", SetLastError = true)]
internal static unsafe partial int Read(SafeHandle fd, byte* buffer, int count);
-
- [LibraryImport(Libraries.SystemNative, EntryPoint = "SystemNative_ReadFromNonblocking", SetLastError = true)]
- internal static unsafe partial int ReadFromNonblocking(SafeHandle fd, byte* buffer, int count);
}
}
diff --git a/src/libraries/Common/src/Interop/Unix/System.Native/Interop.SocketEvent.cs b/src/libraries/Common/src/Interop/Unix/System.Native/Interop.SocketEvent.cs
deleted file mode 100644
index ef14ce83a11713..00000000000000
--- a/src/libraries/Common/src/Interop/Unix/System.Native/Interop.SocketEvent.cs
+++ /dev/null
@@ -1,51 +0,0 @@
-// Licensed to the .NET Foundation under one or more agreements.
-// The .NET Foundation licenses this file to you under the MIT license.
-
-using System;
-using System.Runtime.InteropServices;
-
-internal static partial class Interop
-{
- internal static partial class Sys
- {
- [Flags]
- internal enum SocketEvents : int
- {
- None = 0x00,
- Read = 0x01,
- Write = 0x02,
- ReadClose = 0x04,
- Close = 0x08,
- Error = 0x10
- }
-
- [StructLayout(LayoutKind.Sequential)]
- internal struct SocketEvent
- {
- public IntPtr Data;
- public SocketEvents Events;
- private int _padding;
- }
-
- [LibraryImport(Libraries.SystemNative, EntryPoint = "SystemNative_CreateSocketEventPort")]
- internal static unsafe partial Error CreateSocketEventPort(IntPtr* port);
-
- [LibraryImport(Libraries.SystemNative, EntryPoint = "SystemNative_CloseSocketEventPort")]
- internal static partial Error CloseSocketEventPort(IntPtr port);
-
- [LibraryImport(Libraries.SystemNative, EntryPoint = "SystemNative_CreateSocketEventBuffer")]
- internal static unsafe partial Error CreateSocketEventBuffer(int count, SocketEvent** buffer);
-
- [LibraryImport(Libraries.SystemNative, EntryPoint = "SystemNative_FreeSocketEventBuffer")]
- internal static unsafe partial Error FreeSocketEventBuffer(SocketEvent* buffer);
-
- [LibraryImport(Libraries.SystemNative, EntryPoint = "SystemNative_TryChangeSocketEventRegistration")]
- internal static partial Error TryChangeSocketEventRegistration(IntPtr port, SafeHandle socket, SocketEvents currentEvents, SocketEvents newEvents, IntPtr data);
-
- [LibraryImport(Libraries.SystemNative, EntryPoint = "SystemNative_TryChangeSocketEventRegistration")]
- internal static partial Error TryChangeSocketEventRegistration(IntPtr port, IntPtr socket, SocketEvents currentEvents, SocketEvents newEvents, IntPtr data);
-
- [LibraryImport(Libraries.SystemNative, EntryPoint = "SystemNative_WaitForSocketEvents")]
- internal static unsafe partial Error WaitForSocketEvents(IntPtr port, SocketEvent* buffer, int* count);
- }
-}
diff --git a/src/libraries/Common/src/Interop/Unix/System.Native/Interop.Write.Pipe.cs b/src/libraries/Common/src/Interop/Unix/System.Native/Interop.Write.Pipe.cs
deleted file mode 100644
index bd192c24b8edf2..00000000000000
--- a/src/libraries/Common/src/Interop/Unix/System.Native/Interop.Write.Pipe.cs
+++ /dev/null
@@ -1,23 +0,0 @@
-// Licensed to the .NET Foundation under one or more agreements.
-// The .NET Foundation licenses this file to you under the MIT license.
-
-using System.Runtime.InteropServices;
-using Microsoft.Win32.SafeHandles;
-
-internal static partial class Interop
-{
- internal static partial class Sys
- {
- ///
- /// Writes the specified buffer to the provided open file descriptor
- ///
- /// The file descriptor to try and write to
- /// The data to attempt to write
- /// The amount of data to write, in bytes
- ///
- /// Returns the number of bytes written on success; otherwise, returns -1 and sets errno
- ///
- [LibraryImport(Libraries.SystemNative, EntryPoint = "SystemNative_Write", SetLastError = true)]
- internal static unsafe partial int Write(SafePipeHandle fd, byte* buffer, int bufferSize);
- }
-}
diff --git a/src/libraries/Common/src/Interop/Unix/System.Native/Interop.Write.cs b/src/libraries/Common/src/Interop/Unix/System.Native/Interop.Write.cs
index b5bad8429e1937..e6dd4a900837a9 100644
--- a/src/libraries/Common/src/Interop/Unix/System.Native/Interop.Write.cs
+++ b/src/libraries/Common/src/Interop/Unix/System.Native/Interop.Write.cs
@@ -23,8 +23,5 @@ internal static partial class Sys
[LibraryImport(Libraries.SystemNative, EntryPoint = "SystemNative_Write", SetLastError = true)]
internal static unsafe partial int Write(IntPtr fd, byte* buffer, int bufferSize);
-
- [LibraryImport(Libraries.SystemNative, EntryPoint = "SystemNative_WriteToNonblocking", SetLastError = true)]
- internal static unsafe partial int WriteToNonblocking(SafeHandle fd, byte* buffer, int bufferSize);
}
}
diff --git a/src/libraries/Common/src/Interop/Wasi/System.Native/Interop.SocketEvent.cs b/src/libraries/Common/src/Interop/Wasi/System.Native/Interop.HandleEvents.cs
similarity index 94%
rename from src/libraries/Common/src/Interop/Wasi/System.Native/Interop.SocketEvent.cs
rename to src/libraries/Common/src/Interop/Wasi/System.Native/Interop.HandleEvents.cs
index 2e8742bc4c7708..141ce7937c14a2 100644
--- a/src/libraries/Common/src/Interop/Wasi/System.Native/Interop.SocketEvent.cs
+++ b/src/libraries/Common/src/Interop/Wasi/System.Native/Interop.HandleEvents.cs
@@ -9,7 +9,7 @@ internal static partial class Interop
internal static partial class Sys
{
[Flags]
- internal enum SocketEvents : int
+ internal enum HandleEvents : int
{
None = 0x00,
Read = 0x01,
diff --git a/src/libraries/System.IO.Pipes/src/Microsoft/Win32/SafeHandles/SafePipeHandle.Unix.cs b/src/libraries/System.IO.Pipes/src/Microsoft/Win32/SafeHandles/SafePipeHandle.Unix.cs
index eea6ef6410a907..7412cd2f36347b 100644
--- a/src/libraries/System.IO.Pipes/src/Microsoft/Win32/SafeHandles/SafePipeHandle.Unix.cs
+++ b/src/libraries/System.IO.Pipes/src/Microsoft/Win32/SafeHandles/SafePipeHandle.Unix.cs
@@ -4,10 +4,10 @@
using System;
using System.Diagnostics;
using System.Net.Sockets;
-using System.Reflection;
using System.Runtime.InteropServices;
-using System.Security;
using System.Threading;
+using System.Threading.Tasks;
+using System.Threading.Tasks.Sources;
namespace Microsoft.Win32.SafeHandles
{
@@ -15,130 +15,523 @@ public sealed partial class SafePipeHandle : SafeHandleZeroOrMinusOneIsInvalid
{
private const int DefaultInvalidHandle = -1;
- // For anonymous pipes, SafePipeHandle.handle is the file descriptor of the pipe.
- // For named pipes, SafePipeHandle.handle is a copy of the file descriptor
- // extracted from the Socket's SafeHandle.
- // This allows operations related to file descriptors to be performed directly on the SafePipeHandle,
- // and operations that should go through the Socket to be done via PipeSocket. We keep the
- // Socket's SafeHandle alive as long as this SafeHandle is alive.
+ private NullableBool _isBlocking;
+ private PollableHandle? _pollHandle;
+ private ReadOperation? _cachedReadOp;
+ private WriteOperation? _cachedWriteOp;
- private Socket? _pipeSocket;
- private SafeHandle? _pipeSocketHandle;
- private volatile int _disposed;
+ private ReadOperation RentReadOperation()
+ => Interlocked.Exchange(ref _cachedReadOp, null) ?? new ReadOperation(this);
- internal SafePipeHandle(Socket namedPipeSocket) : base(ownsHandle: true)
+ private WriteOperation RentWriteOperation()
+ => Interlocked.Exchange(ref _cachedWriteOp, null) ?? new WriteOperation(this);
+
+ private void ReturnReadOperation(ReadOperation op)
+ {
+ op.Reset();
+ Volatile.Write(ref _cachedReadOp, op);
+ }
+
+ private void ReturnWriteOperation(WriteOperation op)
+ {
+ op.Reset();
+ Volatile.Write(ref _cachedWriteOp, op);
+ }
+
+ private bool IsBlocking
+ {
+ get
+ {
+ NullableBool isBlocking = _isBlocking;
+ if (isBlocking == NullableBool.Undefined)
+ {
+ if (Interop.Sys.Fcntl.GetIsNonBlocking(this, out bool nonBlocking) != 0)
+ {
+ throw Interop.GetExceptionForIoErrno(Interop.Sys.GetLastErrorInfo());
+ }
+
+ _isBlocking = isBlocking = nonBlocking ? NullableBool.False : NullableBool.True;
+ }
+
+ return isBlocking == NullableBool.True;
+ }
+ }
+
+ private void SetHandleNonBlocking()
+ {
+ if (_isBlocking != NullableBool.False)
+ {
+ if (Interop.Sys.Fcntl.SetIsNonBlocking(this, 1) != 0)
+ {
+ throw Interop.GetExceptionForIoErrno(Interop.Sys.GetLastErrorInfo());
+ }
+ _isBlocking = NullableBool.False;
+ }
+ }
+
+ private PollableHandle PollHandle
{
- SetPipeSocketInterlocked(namedPipeSocket, ownsHandle: true);
- base.SetHandle(_pipeSocketHandle!.DangerousGetHandle());
+ get
+ {
+ if (_pollHandle == null)
+ {
+ SetHandleNonBlocking();
+ PollableHandle.Create(this, ref _pollHandle);
+ }
+ return _pollHandle!;
+ }
}
- internal Socket PipeSocket => _pipeSocket ?? CreatePipeSocket();
+ internal SafePipeHandle(Socket namedPipeSocket) : base(ownsHandle: true)
+ {
+ Debug.Assert(namedPipeSocket != null);
- internal SafeHandle? PipeSocketHandle => _pipeSocketHandle;
+ _isBlocking = namedPipeSocket.Blocking ? NullableBool.True : NullableBool.False;
+
+ // Transfer ownership of the file descriptor from the Socket to this SafeHandle.
+ SafeHandle socketHandle = namedPipeSocket.SafeHandle;
+ base.SetHandle(socketHandle.DangerousGetHandle());
+ socketHandle.SetHandleAsInvalid();
+ namedPipeSocket.Dispose();
+ }
protected override void Dispose(bool disposing)
{
- base.Dispose(disposing); // must be called before trying to Dispose the socket
- _disposed = 1;
- if (disposing && Volatile.Read(ref _pipeSocket) is Socket socket)
+ if (disposing)
{
- socket.Dispose();
- _pipeSocket = null;
+ _pollHandle?.Dispose();
}
+ base.Dispose(disposing);
}
protected override bool ReleaseHandle()
{
- Debug.Assert(!IsInvalid);
+ return (long)handle >= 0 && Interop.Sys.Close(handle) == 0;
+ }
+
+ public override bool IsInvalid
+ {
+ get { return (long)handle < 0; }
+ }
+
+ internal new void SetHandle(IntPtr descriptor)
+ {
+ base.SetHandle(descriptor);
+ }
+
+ // Named pipes on Unix are implemented using Unix domain sockets.
+ // Returns 0 for non-socket handles (getsockopt returns ENOTSOCK).
+ internal unsafe int GetSocketBufferSize(SocketOptionName optionName)
+ {
+ int value;
+ int optLen = sizeof(int);
+ Interop.Error error = Interop.Sys.GetSockOpt(this, SocketOptionLevel.Socket, optionName, (byte*)&value, &optLen);
+ return error == Interop.Error.SUCCESS ? value : 0;
+ }
- if (_pipeSocketHandle != null)
+ internal unsafe (int BytesRead, Interop.ErrorInfo ErrorInfo) Read(Span buffer)
+ {
+ int sequenceNumber = 0;
+ bool isBlocking = IsBlocking;
+
+ bool doSync = isBlocking || PollHandle.IsReadReady(out sequenceNumber);
+ if (doSync)
{
- base.SetHandle((IntPtr)DefaultInvalidHandle);
- _pipeSocketHandle.DangerousRelease();
- _pipeSocketHandle = null;
- return true;
+ if (TryCompleteRead(buffer, out var result, out bool pending))
+ {
+ return result;
+ }
+ if (isBlocking)
+ {
+ // The handle changed to non-blocking due to a concurrent operation.
+ Debug.Assert(pending);
+ if (PollHandle.IsReadReady(out sequenceNumber) && TryCompleteRead(buffer, out result, out _))
+ {
+ return result;
+ }
+ }
}
- else
+
+ ReadOperation op = RentReadOperation();
+ fixed (byte* bufPtr = &MemoryMarshal.GetReference(buffer))
{
- return (long)handle >= 0 ?
- Interop.Sys.Close(handle) == 0 :
- true;
+ op.Init(bufPtr, buffer.Length);
+
+ PollOperationSyncResult result = PollHandle.ReadSync(op, sequenceNumber, timeout: -1);
+
+ if (result == PollOperationSyncResult.Completed)
+ {
+ var readResult = op.Result;
+
+ ReturnReadOperation(op);
+
+ return readResult;
+ }
+
+ return (-1, new Interop.ErrorInfo(Interop.Error.ECANCELED));
}
}
- public override bool IsInvalid
+ internal ValueTask<(int BytesRead, Interop.ErrorInfo ErrorInfo)> ReadAsync(Memory destination, CancellationToken cancellationToken)
{
- get { return (long)handle < 0 && _pipeSocket == null; }
+ if (PollHandle.IsReadReady(out int sequenceNumber) &&
+ TryCompleteRead(destination.Span, out var readResult, out _))
+ {
+ return new ValueTask<(int, Interop.ErrorInfo)>(readResult);
+ }
+
+ ReadOperation op = RentReadOperation();
+ op.Init(destination, cancellationToken);
+
+ PollOperationAsyncResult result = PollHandle.ReadAsync(op, sequenceNumber, cancellationToken);
+
+ if (result == PollOperationAsyncResult.Pending)
+ {
+ return new ValueTask<(int, Interop.ErrorInfo)>(op, op.Version);
+ }
+ else if (result == PollOperationAsyncResult.Completed)
+ {
+ readResult = op.Result;
+ ReturnReadOperation(op);
+ return new ValueTask<(int, Interop.ErrorInfo)>(readResult);
+ }
+
+ return new ValueTask<(int, Interop.ErrorInfo)>((-1, new Interop.ErrorInfo(Interop.Error.ECANCELED)));
}
- private Socket CreatePipeSocket(bool ownsHandle = true)
+ internal unsafe Interop.ErrorInfo Write(ReadOnlySpan buffer)
{
- Socket? socket = null;
- if (_disposed == 0)
+ int sequenceNumber = 0;
+ bool isBlocking = IsBlocking;
+
+ bool doSync = isBlocking || PollHandle.IsWriteReady(out sequenceNumber);
+ while (doSync)
+ {
+ if (TryCompleteWrite(buffer, out int bytesWritten, out Interop.ErrorInfo errorInfo, out bool pending))
+ {
+ return errorInfo;
+ }
+ buffer = buffer.Slice(bytesWritten);
+ if (isBlocking && pending)
+ {
+ // The handle changed to non-blocking due to a concurrent operation.
+ isBlocking = false;
+ doSync = PollHandle.IsWriteReady(out sequenceNumber);
+ }
+ else
+ {
+ // There are bytes remaining for a blocking write.
+ Debug.Assert(buffer.Length != 0);
+ doSync = isBlocking;
+ }
+ }
+
+ WriteOperation op = RentWriteOperation();
+ fixed (byte* bufPtr = &MemoryMarshal.GetReference(buffer))
+ {
+ op.Init(bufPtr, buffer.Length);
+
+ PollOperationSyncResult result = PollHandle.WriteSync(op, sequenceNumber, timeout: -1);
+
+ if (result == PollOperationSyncResult.Completed)
+ {
+ Interop.ErrorInfo errorInfo = op.WriteResult;
+
+ ReturnWriteOperation(op);
+
+ return errorInfo;
+ }
+
+ return new Interop.ErrorInfo(Interop.Error.ECANCELED);
+ }
+ }
+
+ internal ValueTask WriteAsync(ReadOnlyMemory source, CancellationToken cancellationToken)
+ {
+ int bytesWritten = 0;
+ if (PollHandle.IsWriteReady(out int sequenceNumber) &&
+ TryCompleteWrite(source.Span, out bytesWritten, out Interop.ErrorInfo writeResult, out _))
+ {
+ return new ValueTask(writeResult);
+ }
+
+ WriteOperation op = RentWriteOperation();
+ op.Init(source.Slice(bytesWritten), cancellationToken);
+
+ PollOperationAsyncResult result = PollHandle.WriteAsync(op, sequenceNumber, cancellationToken);
+
+ if (result == PollOperationAsyncResult.Pending)
+ {
+ return new ValueTask(op, op.Version);
+ }
+ else if (result == PollOperationAsyncResult.Completed)
{
- bool refAdded = false;
+ writeResult = op.WriteResult;
+ ReturnWriteOperation(op);
+ return new ValueTask(writeResult);
+ }
+
+ return new ValueTask(new Interop.ErrorInfo(Interop.Error.ECANCELED));
+ }
+
+ private sealed unsafe class ReadOperation : PollTriggeredOperation, IValueTaskSource<(int BytesRead, Interop.ErrorInfo ErrorInfo)>
+ {
+ private readonly SafePipeHandle _owner;
+ internal (int BytesRead, Interop.ErrorInfo ErrorInfo) Result;
+ private ManualResetValueTaskSourceCore<(int, Interop.ErrorInfo)> _mrvtsc;
+ private Memory _buffer;
+ private byte* _syncBuffer;
+ private int _syncBufferLength;
+ private CancellationToken _cancellationToken;
+
+ internal ReadOperation(SafePipeHandle owner)
+ => _owner = owner;
+
+ internal short Version
+ => _mrvtsc.Version;
+
+ internal void Init(byte* syncBuffer, int syncBufferLength)
+ {
+ _syncBuffer = syncBuffer;
+ _syncBufferLength = syncBufferLength;
+ }
+
+ internal void Init(Memory buffer, CancellationToken cancellationToken)
+ {
+ _buffer = buffer;
+ _cancellationToken = cancellationToken;
+ }
+
+ internal void Reset()
+ {
+ _buffer = default;
+ _syncBuffer = null;
+ _cancellationToken = default;
+ _mrvtsc.Reset();
+ }
+
+ protected override bool TryCompleteOperation(SafeHandle handle)
+ {
+ if (_syncBuffer != null)
+ {
+ Debug.Assert(_syncBufferLength > 0);
+ return _owner.TryCompleteRead(_syncBuffer, _syncBufferLength, out Result, out _);
+ }
+
+ Span span = _buffer.Span;
+ Debug.Assert(!span.IsEmpty);
+
+ fixed (byte* bufPtr = &MemoryMarshal.GetReference(span))
+ {
+ return _owner.TryCompleteRead(bufPtr, span.Length, out Result, out _);
+ }
+ }
+
+ protected override void OnCompleted(PollOperationOnCompletedResult result)
+ {
+ if (result == PollOperationOnCompletedResult.Completed)
+ {
+ _mrvtsc.SetResult(Result);
+ }
+ else if (result == PollOperationOnCompletedResult.Canceled)
+ {
+ _mrvtsc.SetException(new OperationCanceledException(_cancellationToken));
+ }
+ else
+ {
+ Debug.Assert(result == PollOperationOnCompletedResult.Aborted);
+ _mrvtsc.SetException(new OperationCanceledException());
+ }
+ }
+
+ ValueTaskSourceStatus IValueTaskSource<(int BytesRead, Interop.ErrorInfo ErrorInfo)>.GetStatus(short token)
+ => _mrvtsc.GetStatus(token);
+
+ void IValueTaskSource<(int BytesRead, Interop.ErrorInfo ErrorInfo)>.OnCompleted(Action