diff --git a/src/libraries/System.IO.FileSystem/tests/FileStream/ReadAsync.cs b/src/libraries/System.IO.FileSystem/tests/FileStream/ReadAsync.cs index da429bbad21874..e5b8ecdfab992a 100644 --- a/src/libraries/System.IO.FileSystem/tests/FileStream/ReadAsync.cs +++ b/src/libraries/System.IO.FileSystem/tests/FileStream/ReadAsync.cs @@ -1,6 +1,8 @@ // Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the MIT license. +using System.Linq; +using System.Security.Cryptography; using System.Threading; using System.Threading.Tasks; using Xunit; @@ -92,6 +94,38 @@ public async Task ReadAsyncCanceledFile() } } } + + [ConditionalTheory(typeof(PlatformDetection), nameof(PlatformDetection.IsThreadingSupported))] + [InlineData(FileShare.None, FileOptions.Asynchronous)] // FileShare.None: exclusive access + [InlineData(FileShare.ReadWrite, FileOptions.Asynchronous)] // FileShare.ReadWrite: others can write to the file, the length can't be cached + [InlineData(FileShare.None, FileOptions.None)] + [InlineData(FileShare.ReadWrite, FileOptions.None)] + public async Task IncompleteReadCantSetPositionBeyondEndOfFile(FileShare fileShare, FileOptions options) + { + const int fileSize = 10_000; + string filePath = GetTestFilePath(); + byte[] content = RandomNumberGenerator.GetBytes(fileSize); + File.WriteAllBytes(filePath, content); + + byte[][] buffers = Enumerable.Repeat(Enumerable.Repeat(byte.MaxValue, fileSize * 2).ToArray(), 10).ToArray(); + + using (FileStream fs = new FileStream(filePath, FileMode.Open, FileAccess.Read, fileShare, bufferSize: 0, options)) + { + Task[] reads = buffers.Select(buffer => fs.ReadAsync(buffer, 0, buffer.Length)).ToArray(); + + // the reads were not awaited, it's an anti-pattern and Position can be (0, buffersLength) now: + Assert.InRange(fs.Position, 0, buffers.Sum(buffer => buffer.Length)); + + await Task.WhenAll(reads); + // but when they are finished, the first buffer should contain valid data: + Assert.Equal(fileSize, reads.First().Result); + AssertExtensions.SequenceEqual(content, buffers.First().AsSpan(0, fileSize)); + // and other reads should return 0: + Assert.All(reads.Skip(1), read => Assert.Equal(0, read.Result)); + // and the Position must be correct: + Assert.Equal(fileSize, fs.Position); + } + } } [ActiveIssue("https://github.com/dotnet/runtime/issues/34582", TestPlatforms.Windows, TargetFrameworkMonikers.Netcoreapp, TestRuntimes.Mono)] diff --git a/src/libraries/System.Private.CoreLib/src/Microsoft/Win32/SafeHandles/SafeFileHandle.OverlappedValueTaskSource.Windows.cs b/src/libraries/System.Private.CoreLib/src/Microsoft/Win32/SafeHandles/SafeFileHandle.OverlappedValueTaskSource.Windows.cs index 367184e9a455ea..26e17f1d161d82 100644 --- a/src/libraries/System.Private.CoreLib/src/Microsoft/Win32/SafeHandles/SafeFileHandle.OverlappedValueTaskSource.Windows.cs +++ b/src/libraries/System.Private.CoreLib/src/Microsoft/Win32/SafeHandles/SafeFileHandle.OverlappedValueTaskSource.Windows.cs @@ -5,6 +5,7 @@ using System.Buffers; using System.Diagnostics; using System.IO; +using System.IO.Strategies; using System.Threading; using System.Threading.Tasks.Sources; @@ -45,7 +46,9 @@ internal sealed unsafe class OverlappedValueTaskSource : IValueTaskSource, internal readonly PreAllocatedOverlapped _preallocatedOverlapped; internal readonly SafeFileHandle _fileHandle; + private AsyncWindowsFileStreamStrategy? _strategy; internal MemoryHandle _memoryHandle; + private int _bufferSize; internal ManualResetValueTaskSourceCore _source; // mutable struct; do not make this readonly private NativeOverlapped* _overlapped; private CancellationTokenRegistration _cancellationRegistration; @@ -74,9 +77,11 @@ internal static Exception GetIOError(int errorCode, string? path) ? ThrowHelper.CreateEndOfFileException() : Win32Marshal.GetExceptionForWin32Error(errorCode, path); - internal NativeOverlapped* PrepareForOperation(ReadOnlyMemory memory, long fileOffset) + internal NativeOverlapped* PrepareForOperation(ReadOnlyMemory memory, long fileOffset, AsyncWindowsFileStreamStrategy? strategy = null) { _result = 0; + _strategy = strategy; + _bufferSize = memory.Length; _memoryHandle = memory.Pin(); _overlapped = _fileHandle.ThreadPoolBinding!.AllocateNativeOverlapped(_preallocatedOverlapped); _overlapped->OffsetLow = (int)fileOffset; @@ -132,8 +137,9 @@ internal void RegisterForCancellation(CancellationToken cancellationToken) } } - internal void ReleaseResources() + private void ReleaseResources() { + _strategy = null; // Unpin any pinned buffer. _memoryHandle.Dispose(); @@ -187,11 +193,19 @@ private static void IOCallback(uint errorCode, uint numBytes, NativeOverlapped* internal void Complete(uint errorCode, uint numBytes) { + Debug.Assert(errorCode == Interop.Errors.ERROR_SUCCESS || numBytes == 0, $"Callback returned {errorCode} error and {numBytes} bytes"); + + AsyncWindowsFileStreamStrategy? strategy = _strategy; ReleaseResources(); + if (strategy is not null && _bufferSize != numBytes) // true only for incomplete reads + { + strategy.OnIncompleteRead(_bufferSize, (int)numBytes); + } + switch (errorCode) { - case 0: + case Interop.Errors.ERROR_SUCCESS: case Interop.Errors.ERROR_BROKEN_PIPE: case Interop.Errors.ERROR_NO_DATA: case Interop.Errors.ERROR_HANDLE_EOF: // logically success with 0 bytes read (read at end of file) diff --git a/src/libraries/System.Private.CoreLib/src/System/IO/FileStream.cs b/src/libraries/System.Private.CoreLib/src/System/IO/FileStream.cs index a2be26fa231030..6a94ad202fa190 100644 --- a/src/libraries/System.Private.CoreLib/src/System/IO/FileStream.cs +++ b/src/libraries/System.Private.CoreLib/src/System/IO/FileStream.cs @@ -280,9 +280,14 @@ public override Task ReadAsync(byte[] buffer, int offset, int count, Cancel { return Task.FromCanceled(cancellationToken); } - else if (_strategy.IsClosed) + else if (!_strategy.CanRead) { - ThrowHelper.ThrowObjectDisposedException_FileClosed(); + if (_strategy.IsClosed) + { + ThrowHelper.ThrowObjectDisposedException_FileClosed(); + } + + ThrowHelper.ThrowNotSupportedException_UnreadableStream(); } return _strategy.ReadAsync(buffer, offset, count, cancellationToken); @@ -294,9 +299,14 @@ public override ValueTask ReadAsync(Memory buffer, CancellationToken { return ValueTask.FromCanceled(cancellationToken); } - else if (_strategy.IsClosed) + else if (!_strategy.CanRead) { - ThrowHelper.ThrowObjectDisposedException_FileClosed(); + if (_strategy.IsClosed) + { + ThrowHelper.ThrowObjectDisposedException_FileClosed(); + } + + ThrowHelper.ThrowNotSupportedException_UnreadableStream(); } return _strategy.ReadAsync(buffer, cancellationToken); @@ -319,9 +329,14 @@ public override Task WriteAsync(byte[] buffer, int offset, int count, Cancellati { return Task.FromCanceled(cancellationToken); } - else if (_strategy.IsClosed) + else if (!_strategy.CanWrite) { - ThrowHelper.ThrowObjectDisposedException_FileClosed(); + if (_strategy.IsClosed) + { + ThrowHelper.ThrowObjectDisposedException_FileClosed(); + } + + ThrowHelper.ThrowNotSupportedException_UnwritableStream(); } return _strategy.WriteAsync(buffer, offset, count, cancellationToken); @@ -333,9 +348,14 @@ public override ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationTo { return ValueTask.FromCanceled(cancellationToken); } - else if (_strategy.IsClosed) + else if (!_strategy.CanWrite) { - ThrowHelper.ThrowObjectDisposedException_FileClosed(); + if (_strategy.IsClosed) + { + ThrowHelper.ThrowObjectDisposedException_FileClosed(); + } + + ThrowHelper.ThrowNotSupportedException_UnwritableStream(); } return _strategy.WriteAsync(buffer, cancellationToken); diff --git a/src/libraries/System.Private.CoreLib/src/System/IO/RandomAccess.Windows.cs b/src/libraries/System.Private.CoreLib/src/System/IO/RandomAccess.Windows.cs index 8711ce23e9d2c6..2bb8d46706dccd 100644 --- a/src/libraries/System.Private.CoreLib/src/System/IO/RandomAccess.Windows.cs +++ b/src/libraries/System.Private.CoreLib/src/System/IO/RandomAccess.Windows.cs @@ -242,21 +242,23 @@ internal static ValueTask ReadAtOffsetAsync(SafeFileHandle handle, Memory buffer, long fileOffset, CancellationToken cancellationToken) + internal static unsafe (SafeFileHandle.OverlappedValueTaskSource? vts, int errorCode) QueueAsyncReadFile(SafeFileHandle handle, Memory buffer, long fileOffset, + CancellationToken cancellationToken, AsyncWindowsFileStreamStrategy? strategy = null) { handle.EnsureThreadPoolBindingInitialized(); SafeFileHandle.OverlappedValueTaskSource vts = handle.GetOverlappedValueTaskSource(); + int errorCode = 0; try { - NativeOverlapped* nativeOverlapped = vts.PrepareForOperation(buffer, fileOffset); + NativeOverlapped* nativeOverlapped = vts.PrepareForOperation(buffer, fileOffset, strategy); Debug.Assert(vts._memoryHandle.Pointer != null); // Queue an async ReadFile operation. if (Interop.Kernel32.ReadFile(handle, (byte*)vts._memoryHandle.Pointer, buffer.Length, IntPtr.Zero, nativeOverlapped) == 0) { // The operation failed, or it's pending. - int errorCode = FileStreamHelpers.GetLastWin32ErrorAndDisposeHandleIfInvalid(handle); + errorCode = FileStreamHelpers.GetLastWin32ErrorAndDisposeHandleIfInvalid(handle); switch (errorCode) { case Interop.Errors.ERROR_IO_PENDING: @@ -286,6 +288,13 @@ internal static unsafe (SafeFileHandle.OverlappedValueTaskSource? vts, int error vts.Dispose(); throw; } + finally + { + if (errorCode != Interop.Errors.ERROR_IO_PENDING && errorCode != Interop.Errors.ERROR_SUCCESS) + { + strategy?.OnIncompleteRead(buffer.Length, 0); + } + } // Completion handled by callback. vts.FinishedScheduling(); diff --git a/src/libraries/System.Private.CoreLib/src/System/IO/Strategies/AsyncWindowsFileStreamStrategy.cs b/src/libraries/System.Private.CoreLib/src/System/IO/Strategies/AsyncWindowsFileStreamStrategy.cs index 913ce597be00a7..9bc49658bfa991 100644 --- a/src/libraries/System.Private.CoreLib/src/System/IO/Strategies/AsyncWindowsFileStreamStrategy.cs +++ b/src/libraries/System.Private.CoreLib/src/System/IO/Strategies/AsyncWindowsFileStreamStrategy.cs @@ -27,40 +27,29 @@ public override Task ReadAsync(byte[] buffer, int offset, int count, Cancel public override ValueTask ReadAsync(Memory destination, CancellationToken cancellationToken = default) => ReadAsyncInternal(destination, cancellationToken); - private unsafe ValueTask ReadAsyncInternal(Memory destination, CancellationToken cancellationToken) + private ValueTask ReadAsyncInternal(Memory destination, CancellationToken cancellationToken) { - if (!CanRead) + if (!CanSeek) { - ThrowHelper.ThrowNotSupportedException_UnreadableStream(); + return RandomAccess.ReadAtOffsetAsync(_fileHandle, destination, fileOffset: -1, cancellationToken); } - long positionBefore = _filePosition; - if (CanSeek) + if (LengthCachingSupported && _length >= 0 && Volatile.Read(ref _filePosition) >= _length) { - long len = Length; - if (positionBefore + destination.Length > len) - { - destination = positionBefore <= len ? - destination.Slice(0, (int)(len - positionBefore)) : - default; - } - - // When using overlapped IO, the OS is not supposed to - // touch the file pointer location at all. We will adjust it - // ourselves, but only in memory. This isn't threadsafe. - _filePosition += destination.Length; - - // We know for sure that there is nothing to read, so we just return here and avoid a sys-call. - if (destination.IsEmpty && LengthCachingSupported) - { - return ValueTask.FromResult(0); - } + // We know for sure that the file length can be safely cached and it has already been obtained. + // If we have reached EOF we just return here and avoid a sys-call. + return ValueTask.FromResult(0); } - (SafeFileHandle.OverlappedValueTaskSource? vts, int errorCode) = RandomAccess.QueueAsyncReadFile(_fileHandle, destination, positionBefore, cancellationToken); + // This implementation updates the file position before the operation starts and updates it after incomplete read. + // This is done to keep backward compatibility for concurrent reads. + // It uses Interlocked as there can be multiple concurrent incomplete reads updating position at the same time. + long readOffset = Interlocked.Add(ref _filePosition, destination.Length) - destination.Length; + + (SafeFileHandle.OverlappedValueTaskSource? vts, int errorCode) = RandomAccess.QueueAsyncReadFile(_fileHandle, destination, readOffset, cancellationToken, this); return vts != null ? new ValueTask(vts, vts.Version) - : (errorCode == 0) ? ValueTask.FromResult(0) : ValueTask.FromException(HandleIOError(positionBefore, errorCode)); + : (errorCode == 0) ? ValueTask.FromResult(0) : ValueTask.FromException(HandleIOError(readOffset, errorCode)); } public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) @@ -69,35 +58,22 @@ public override Task WriteAsync(byte[] buffer, int offset, int count, Cancellati public override ValueTask WriteAsync(ReadOnlyMemory buffer, CancellationToken cancellationToken = default) => WriteAsyncInternal(buffer, cancellationToken); - private unsafe ValueTask WriteAsyncInternal(ReadOnlyMemory source, CancellationToken cancellationToken) + private ValueTask WriteAsyncInternal(ReadOnlyMemory source, CancellationToken cancellationToken) { - if (!CanWrite) - { - ThrowHelper.ThrowNotSupportedException_UnwritableStream(); - } - - long positionBefore = _filePosition; - if (CanSeek) - { - // When using overlapped IO, the OS is not supposed to - // touch the file pointer location at all. We will adjust it - // ourselves, but only in memory. This isn't threadsafe. - _filePosition += source.Length; - UpdateLengthOnChangePosition(); - } + long writeOffset = CanSeek ? Interlocked.Add(ref _filePosition, source.Length) - source.Length : -1; - (SafeFileHandle.OverlappedValueTaskSource? vts, int errorCode) = RandomAccess.QueueAsyncWriteFile(_fileHandle, source, positionBefore, cancellationToken); + (SafeFileHandle.OverlappedValueTaskSource? vts, int errorCode) = RandomAccess.QueueAsyncWriteFile(_fileHandle, source, writeOffset, cancellationToken); return vts != null ? new ValueTask(vts, vts.Version) - : (errorCode == 0) ? ValueTask.CompletedTask : ValueTask.FromException(HandleIOError(positionBefore, errorCode)); + : (errorCode == 0) ? ValueTask.CompletedTask : ValueTask.FromException(HandleIOError(writeOffset, errorCode)); } private Exception HandleIOError(long positionBefore, int errorCode) { - if (!_fileHandle.IsClosed && CanSeek) + if (_fileHandle.CanSeek) { // Update Position... it could be anywhere. - _filePosition = positionBefore; + Interlocked.Exchange(ref _filePosition, positionBefore); } return SafeFileHandle.OverlappedValueTaskSource.GetIOError(errorCode, _fileHandle.Path); diff --git a/src/libraries/System.Private.CoreLib/src/System/IO/Strategies/OSFileStreamStrategy.cs b/src/libraries/System.Private.CoreLib/src/System/IO/Strategies/OSFileStreamStrategy.cs index 226765ff8f0ee5..2faf4c2e47ae8a 100644 --- a/src/libraries/System.Private.CoreLib/src/System/IO/Strategies/OSFileStreamStrategy.cs +++ b/src/libraries/System.Private.CoreLib/src/System/IO/Strategies/OSFileStreamStrategy.cs @@ -15,9 +15,9 @@ internal abstract class OSFileStreamStrategy : FileStreamStrategy private readonly FileAccess _access; // What file was opened for. protected long _filePosition; + protected long _length = -1; // negative means that hasn't been fetched. private long _appendStart; // When appending, prevent overwriting file. - private long _length = -1; // When the file is locked for writes on Windows ((share & FileShare.Write) == 0) cache file length in-memory, negative means that hasn't been fetched. - private bool _lengthCanBeCached; // SafeFileHandle hasn't been exposed and FileShare.Write was not specified when the handle was opened. + private bool _lengthCanBeCached; // SafeFileHandle hasn't been exposed, file has been opened for reading and not shared for writing. internal OSFileStreamStrategy(SafeFileHandle handle, FileAccess access) { @@ -44,7 +44,7 @@ internal OSFileStreamStrategy(string path, FileMode mode, FileAccess access, Fil string fullPath = Path.GetFullPath(path); _access = access; - _lengthCanBeCached = (share & FileShare.Write) == 0; + _lengthCanBeCached = (share & FileShare.Write) == 0 && (access & FileAccess.Write) == 0; _fileHandle = SafeFileHandle.Open(fullPath, mode, access, share, options, preallocationSize); @@ -96,21 +96,9 @@ public unsafe sealed override long Length } } - protected void UpdateLengthOnChangePosition() - { - // Do not update the cached length if the file is not locked - // or if the length hasn't been fetched. - if (!LengthCachingSupported || _length < 0) - { - Debug.Assert(_length < 0); - return; - } - - if (_filePosition > _length) - { - _length = _filePosition; - } - } + // in case of concurrent incomplete reads, there can be multiple threads trying to update the position + // at the same time. That is why we are using Interlocked here. + internal void OnIncompleteRead(int expectedBytesRead, int actualBytesRead) => Interlocked.Add(ref _filePosition, actualBytesRead - expectedBytesRead); protected bool LengthCachingSupported => OperatingSystem.IsWindows() && _lengthCanBeCached; @@ -287,18 +275,8 @@ public sealed override void Write(ReadOnlySpan buffer) ThrowHelper.ThrowNotSupportedException_UnwritableStream(); } - try - { - RandomAccess.WriteAtOffset(_fileHandle, buffer, _filePosition); - } - catch - { - _length = -1; // invalidate cached length - throw; - } - + RandomAccess.WriteAtOffset(_fileHandle, buffer, _filePosition); _filePosition += buffer.Length; - UpdateLengthOnChangePosition(); } } } diff --git a/src/libraries/System.Private.CoreLib/src/System/IO/Strategies/UnixFileStreamStrategy.cs b/src/libraries/System.Private.CoreLib/src/System/IO/Strategies/UnixFileStreamStrategy.cs index e9f00f254ad290..e1a64a6e123323 100644 --- a/src/libraries/System.Private.CoreLib/src/System/IO/Strategies/UnixFileStreamStrategy.cs +++ b/src/libraries/System.Private.CoreLib/src/System/IO/Strategies/UnixFileStreamStrategy.cs @@ -1,6 +1,7 @@ // Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the MIT license. +using System.Diagnostics; using System.Threading; using System.Threading.Tasks; using System.Threading.Tasks.Sources; @@ -34,20 +35,16 @@ public override Task ReadAsync(byte[] buffer, int offset, int count, Cancel public override ValueTask ReadAsync(Memory destination, CancellationToken cancellationToken) { - if (!CanRead) + if (!CanSeek) { - ThrowHelper.ThrowNotSupportedException_UnreadableStream(); + return RandomAccess.ReadAtOffsetAsync(_fileHandle, destination, fileOffset: -1, cancellationToken); } - if (CanSeek) - { - // This implementation updates the file position after the operation completes, rather than before. - // Also, unlike the Net5CompatFileStreamStrategy implementation, this implementation doesn't serialize operations. - ReadAsyncTaskSource rats = Interlocked.Exchange(ref _readAsyncTaskSource, null) ?? new ReadAsyncTaskSource(this); - return rats.QueueRead(destination, cancellationToken); - } - - return RandomAccess.ReadAtOffsetAsync(_fileHandle, destination, fileOffset: -1, cancellationToken); + // This implementation updates the file position before the operation starts and updates it after incomplete read. + // Also, unlike the Net5CompatFileStreamStrategy implementation, this implementation doesn't serialize operations. + long readOffset = Interlocked.Add(ref _filePosition, destination.Length) - destination.Length; + ReadAsyncTaskSource rats = Interlocked.Exchange(ref _readAsyncTaskSource, null) ?? new ReadAsyncTaskSource(this); + return rats.QueueRead(destination, readOffset, cancellationToken); } public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback? callback, object? state) => @@ -61,19 +58,8 @@ public override Task WriteAsync(byte[] buffer, int offset, int count, Cancellati public override ValueTask WriteAsync(ReadOnlyMemory source, CancellationToken cancellationToken) { - if (!CanWrite) - { - ThrowHelper.ThrowNotSupportedException_UnwritableStream(); - } - - long filePositionBefore = -1; - if (CanSeek) - { - filePositionBefore = _filePosition; - _filePosition += source.Length; - } - - return RandomAccess.WriteAtOffsetAsync(_fileHandle, source, filePositionBefore, cancellationToken); + long writeOffset = CanSeek ? Interlocked.Add(ref _filePosition, source.Length) - source.Length : -1; + return RandomAccess.WriteAtOffsetAsync(_fileHandle, source, writeOffset, cancellationToken); } /// Provides a reusable ValueTask-backing object for implementing ReadAsync. @@ -83,14 +69,16 @@ private sealed class ReadAsyncTaskSource : IValueTaskSource, IThreadPoolWor private ManualResetValueTaskSourceCore _source; private Memory _destination; + private long _readOffset; private ExecutionContext? _context; private CancellationToken _cancellationToken; public ReadAsyncTaskSource(UnixFileStreamStrategy stream) => _stream = stream; - public ValueTask QueueRead(Memory destination, CancellationToken cancellationToken) + public ValueTask QueueRead(Memory destination, long readOffset, CancellationToken cancellationToken) { _destination = destination; + _readOffset = readOffset; _cancellationToken = cancellationToken; _context = ExecutionContext.Capture(); @@ -123,7 +111,7 @@ private void Read() } else { - result = _stream.Read(_destination.Span); + result = RandomAccess.ReadAtOffset(_stream._fileHandle, _destination.Span, _readOffset); } } catch (Exception e) @@ -132,7 +120,14 @@ private void Read() } finally { + // if the read was incomplete, we need to update the file position: + if (result != _destination.Length) + { + _stream.OnIncompleteRead(_destination.Length, result); + } + _destination = default; + _readOffset = -1; _cancellationToken = default; _context = null; }