< Summary

Information
Class: Renci.SshNet.Sftp.SftpFileReader
Assembly: Renci.SshNet
File(s): \home\appveyor\projects\ssh-net\src\Renci.SshNet\Sftp\SftpFileReader.cs
Line coverage
91%
Covered lines: 228
Uncovered lines: 22
Coverable lines: 250
Total lines: 478
Line coverage: 91.2%
Branch coverage
80%
Covered branches: 37
Total branches: 46
Branch coverage: 80.4%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

MethodBranch coverage Cyclomatic complexity Line coverage
.ctor(...)100%1100%
Read()76.66%2870.31%
Finalize()100%1100%
Dispose()100%1100%
Dispose(...)87.5%8100%
StartReadAhead()100%197.36%
ContinueReadAhead()75%491.66%
ReadCompleted(...)100%2100%
ReadCompletedCore(...)100%2100%
HandleFailure(...)100%1100%
get_ChunkIndex()100%1100%
get_Data()100%1100%
get_Offset()100%1100%
.ctor(...)100%1100%
Complete(...)100%1100%

File(s)

\home\appveyor\projects\ssh-net\src\Renci.SshNet\Sftp\SftpFileReader.cs

#LineLine coverage
 1using System;
 2using System.Collections.Generic;
 3using System.Globalization;
 4using System.Runtime.ExceptionServices;
 5using System.Threading;
 6
 7using Renci.SshNet.Abstractions;
 8using Renci.SshNet.Common;
 9
 10namespace Renci.SshNet.Sftp
 11{
 12    internal sealed class SftpFileReader : ISftpFileReader
 13    {
 14        private const int ReadAheadWaitTimeoutInMilliseconds = 1000;
 15
 16        private readonly byte[] _handle;
 17        private readonly ISftpSession _sftpSession;
 18        private readonly uint _chunkSize;
 19        private readonly SemaphoreLight _semaphore;
 20        private readonly object _readLock;
 21        private readonly ManualResetEvent _disposingWaitHandle;
 22        private readonly ManualResetEvent _readAheadCompleted;
 23        private readonly Dictionary<int, BufferedRead> _queue;
 24        private readonly WaitHandle[] _waitHandles;
 25
 26        /// <summary>
 27        /// Holds the size of the file, when available.
 28        /// </summary>
 29        private readonly long? _fileSize;
 30
 31        private ulong _offset;
 32        private int _readAheadChunkIndex;
 33        private ulong _readAheadOffset;
 34        private int _nextChunkIndex;
 35
 36        /// <summary>
 37        /// Holds a value indicating whether EOF has already been signaled by the SSH server.
 38        /// </summary>
 39        private bool _endOfFileReceived;
 40
 41        /// <summary>
 42        /// Holds a value indicating whether the client has read up to the end of the file.
 43        /// </summary>
 44        private bool _isEndOfFileRead;
 45
 46        private bool _disposingOrDisposed;
 47
 48        private Exception _exception;
 49
 50        /// <summary>
 51        /// Initializes a new instance of the <see cref="SftpFileReader"/> class with the specified handle,
 52        /// <see cref="ISftpSession"/> and the maximum number of pending reads.
 53        /// </summary>
 54        /// <param name="handle">The file handle.</param>
 55        /// <param name="sftpSession">The SFT session.</param>
 56        /// <param name="chunkSize">The size of a individual read-ahead chunk.</param>
 57        /// <param name="maxPendingReads">The maximum number of pending reads.</param>
 58        /// <param name="fileSize">The size of the file, if known; otherwise, <see langword="null"/>.</param>
 22059        public SftpFileReader(byte[] handle, ISftpSession sftpSession, uint chunkSize, int maxPendingReads, long? fileSi
 22060        {
 22061            _handle = handle;
 22062            _sftpSession = sftpSession;
 22063            _chunkSize = chunkSize;
 22064            _fileSize = fileSize;
 22065            _semaphore = new SemaphoreLight(maxPendingReads);
 22066            _queue = new Dictionary<int, BufferedRead>(maxPendingReads);
 22067            _readLock = new object();
 22068            _readAheadCompleted = new ManualResetEvent(initialState: false);
 22069            _disposingWaitHandle = new ManualResetEvent(initialState: false);
 22070            _waitHandles = _sftpSession.CreateWaitHandleArray(_disposingWaitHandle, _semaphore.AvailableWaitHandle);
 71
 22072            StartReadAhead();
 22073        }
 74
 75        public byte[] Read()
 424876        {
 77#if NET7_0_OR_GREATER
 406378            ObjectDisposedException.ThrowIf(_disposingOrDisposed, this);
 79#else
 18580            if (_disposingOrDisposed)
 481            {
 482                throw new ObjectDisposedException(GetType().FullName);
 83            }
 84#endif // NET7_0_OR_GREATER
 85
 423686            if (_exception is not null)
 2487            {
 2488                ExceptionDispatchInfo.Capture(_exception).Throw();
 089            }
 90
 421291            if (_isEndOfFileRead)
 1292            {
 1293                throw new SshException("Attempting to read beyond the end of the file.");
 94            }
 95
 96            BufferedRead nextChunk;
 97
 420098            lock (_readLock)
 420099            {
 100                // wait until either the next chunk is available, an exception has occurred or the current
 101                // instance is already disposed
 7996102                while (!_queue.TryGetValue(_nextChunkIndex, out nextChunk) && _exception is null)
 3796103                {
 3796104                    _ =Monitor.Wait(_readLock);
 3796105                }
 106
 107                // throw when exception occured in read-ahead, or the current instance is already disposed
 4200108                if (_exception != null)
 60109                {
 60110                    ExceptionDispatchInfo.Capture(_exception).Throw();
 0111                }
 112
 4140113                var data = nextChunk.Data;
 114
 4140115                if (nextChunk.Offset == _offset)
 4003116                {
 117                    // have we reached EOF?
 4003118                    if (data.Length == 0)
 65119                    {
 120                        // PERF: we do not bother updating all of the internal state when we've reached EOF
 65121                        _isEndOfFileRead = true;
 65122                    }
 123                    else
 3938124                    {
 125                        // remove processed chunk
 3938126                        _ = _queue.Remove(_nextChunkIndex);
 127
 128                        // update offset
 3938129                        _offset += (ulong) data.Length;
 130
 131                        // move to next chunk
 3938132                        _nextChunkIndex++;
 3938133                    }
 134
 135                    // unblock wait in read-ahead
 4003136                    _ = _semaphore.Release();
 137
 4003138                    return data;
 139                }
 140
 141                // When we received an EOF for the next chunk and the size of the file is known, then
 142                // we only complete the current chunk if we haven't already read up to the file size.
 143                // This way we save an extra round-trip to the server.
 137144                if (data.Length == 0 && _fileSize.HasValue && _offset == (ulong) _fileSize.Value)
 59145                {
 146                    // avoid future reads
 59147                    _isEndOfFileRead = true;
 148
 149                    // unblock wait in read-ahead
 59150                    _ = _semaphore.Release();
 151
 152                    // signal EOF to caller
 59153                    return nextChunk.Data;
 154                }
 78155            }
 156
 157            /*
 158             * When the server returned less bytes than requested (for the previous chunk)
 159             * we'll synchronously request the remaining data.
 160             *
 161             * Due to the optimization above, we'll only get here in one of the following cases:
 162             * - an EOF situation for files for which we were unable to obtain the file size
 163             * - fewer bytes that requested were returned
 164             *
 165             * According to the SSH specification, this last case should never happen for normal
 166             * disk files (but can happen for device files). In practice, OpenSSH - for example -
 167             * returns less bytes than requested when requesting more than 64 KB.
 168             *
 169             * Important:
 170             * To avoid a deadlock, this read must be done outside of the read lock.
 171             */
 172
 78173            var bytesToCatchUp = nextChunk.Offset - _offset;
 174
 175            /*
 176             * TODO: break loop and interrupt blocking wait in case of exception
 177             */
 178
 78179            var read = _sftpSession.RequestRead(_handle, _offset, (uint) bytesToCatchUp);
 78180            if (read.Length == 0)
 0181            {
 182                // process data in read lock to avoid ObjectDisposedException while releasing semaphore
 0183                lock (_readLock)
 0184                {
 185                    // a zero-length (EOF) response is only valid for the read-back when EOF has
 186                    // been signaled for the next read-ahead chunk
 0187                    if (nextChunk.Data.Length == 0)
 0188                    {
 0189                        _isEndOfFileRead = true;
 190
 191                        // ensure we've not yet disposed the current instance
 0192                        if (!_disposingOrDisposed)
 0193                        {
 194                            // unblock wait in read-ahead
 0195                            _ = _semaphore.Release();
 0196                        }
 197
 198                        // signal EOF to caller
 0199                        return read;
 200                    }
 201
 202                    // move reader to error state
 0203                    _exception = new SshException("Unexpectedly reached end of file.");
 204
 205                    // ensure we've not yet disposed the current instance
 0206                    if (!_disposingOrDisposed)
 0207                    {
 208                        // unblock wait in read-ahead
 0209                        _ = _semaphore.Release();
 0210                    }
 211
 212                    // notify caller of error
 0213                    throw _exception;
 214                }
 215            }
 216
 78217            _offset += (uint) read.Length;
 218
 78219            return read;
 4140220        }
 221
 222        ~SftpFileReader()
 210223        {
 105224            Dispose(disposing: false);
 210225        }
 226
 227        public void Dispose()
 118228        {
 118229            Dispose(disposing: true);
 118230            GC.SuppressFinalize(this);
 118231        }
 232
 233        /// <summary>
 234        /// Releases unmanaged and - optionally - managed resources.
 235        /// </summary>
 236        /// <param name="disposing"><see langword="true"/> to release both managed and unmanaged resources; <see langwor
 237        private void Dispose(bool disposing)
 223238        {
 223239            if (_disposingOrDisposed)
 3240            {
 3241                return;
 242            }
 243
 244            // transition to disposing state
 220245            _disposingOrDisposed = true;
 246
 220247            if (disposing)
 115248            {
 249                // record exception to break prevent future Read()
 115250                _exception = new ObjectDisposedException(GetType().FullName);
 251
 252                // signal that we're disposing to interrupt wait in read-ahead
 115253                _ = _disposingWaitHandle.Set();
 254
 255                // wait until the read-ahead thread has completed
 115256                _ = _readAheadCompleted.WaitOne();
 257
 258                // unblock the Read()
 115259                lock (_readLock)
 115260                {
 261                    // dispose semaphore in read lock to ensure we don't run into an ObjectDisposedException
 262                    // in Read()
 115263                    _semaphore.Dispose();
 264
 265                    // awake Read
 115266                    Monitor.PulseAll(_readLock);
 115267                }
 268
 115269                _readAheadCompleted.Dispose();
 115270                _disposingWaitHandle.Dispose();
 271
 115272                if (_sftpSession.IsOpen)
 106273                {
 274                    try
 106275                    {
 106276                        var closeAsyncResult = _sftpSession.BeginClose(_handle, callback: null, state: null);
 97277                        _sftpSession.EndClose(closeAsyncResult);
 88278                    }
 18279                    catch (Exception ex)
 18280                    {
 18281                        DiagnosticAbstraction.Log("Failure closing handle: " + ex);
 18282                    }
 106283                }
 115284            }
 223285        }
 286
 287        private void StartReadAhead()
 220288        {
 220289            ThreadAbstraction.ExecuteThread(() =>
 220290            {
 4384291                while (!_endOfFileReceived && _exception is null)
 4235292                {
 220293                    // check if we should continue with the read-ahead loop
 220294                    // note that the EOF and exception check are not included
 220295                    // in this check as they do not require Read() to be
 220296                    // unblocked (or have already done this)
 4235297                    if (!ContinueReadAhead())
 33298                    {
 220299                        // unblock the Read()
 33300                        lock (_readLock)
 33301                        {
 33302                            Monitor.PulseAll(_readLock);
 33303                        }
 220304
 220305                        // break the read-ahead loop
 33306                        break;
 220307                    }
 220308
 220309                    // attempt to obtain the semaphore; this may time out when all semaphores are
 220310                    // in use due to pending read-aheads (which in turn can happen when the server
 220311                    // is slow to respond or when the session is broken)
 4202312                    if (!_semaphore.Wait(ReadAheadWaitTimeoutInMilliseconds))
 0313                    {
 220314                        // re-evaluate whether an exception occurred, and - if not - wait again
 0315                        continue;
 220316                    }
 220317
 220318                    // don't bother reading any more chunks if we received EOF, an exception has occurred
 220319                    // or the current instance is disposed
 4202320                    if (_endOfFileReceived || _exception != null)
 29321                    {
 29322                        break;
 220323                    }
 220324
 220325                    // start reading next chunk
 4173326                    var bufferedRead = new BufferedRead(_readAheadChunkIndex, _readAheadOffset);
 220327
 220328                    try
 4173329                    {
 220330                        // even if we know the size of the file and have read up to EOF, we still want
 220331                        // to keep reading (ahead) until we receive zero bytes from the remote host as
 220332                        // we do not want to rely purely on the reported file size
 220333                        //
 220334                        // if the offset of the read-ahead chunk is greater than that file size, then
 220335                        // we can expect to be reading the last (zero-byte) chunk and switch to synchronous
 220336                        // mode to avoid having multiple read-aheads that read beyond EOF
 4173337                        if (_fileSize != null && (long) _readAheadOffset > _fileSize.Value)
 59338                        {
 59339                            var asyncResult = _sftpSession.BeginRead(_handle, _readAheadOffset, _chunkSize, callback: nu
 59340                            var data = _sftpSession.EndRead(asyncResult);
 59341                            ReadCompletedCore(bufferedRead, data);
 59342                        }
 220343                        else
 4114344                        {
 4114345                            _ = _sftpSession.BeginRead(_handle, _readAheadOffset, _chunkSize, ReadCompleted, bufferedRea
 4105346                        }
 4164347                    }
 9348                    catch (Exception ex)
 9349                    {
 9350                        HandleFailure(ex);
 9351                        break;
 220352                    }
 220353
 220354                    // advance read-ahead offset
 4164355                    _readAheadOffset += _chunkSize;
 220356
 220357                    // increment index of read-ahead chunk
 4164358                    _readAheadChunkIndex++;
 4164359                }
 220360
 220361                _ = _readAheadCompleted.Set();
 440362            });
 220363        }
 364
 365        /// <summary>
 366        /// Returns a value indicating whether the read-ahead loop should be continued.
 367        /// </summary>
 368        /// <returns>
 369        /// <see langword="true"/> if the read-ahead loop should be continued; otherwise, <see langword="false"/>.
 370        /// </returns>
 371        private bool ContinueReadAhead()
 4235372        {
 373            try
 4235374            {
 4235375                var waitResult = _sftpSession.WaitAny(_waitHandles, _sftpSession.OperationTimeout);
 4217376                switch (waitResult)
 377                {
 378                    case 0: // disposing
 15379                        return false;
 380                    case 1: // semaphore available
 4202381                        return true;
 382                    default:
 0383                        throw new NotImplementedException(string.Format(CultureInfo.InvariantCulture, "WaitAny return va
 384                }
 385            }
 18386            catch (Exception ex)
 18387            {
 18388                _ = Interlocked.CompareExchange(ref _exception, ex, comparand: null);
 18389                return false;
 390            }
 4235391        }
 392
 393        private void ReadCompleted(IAsyncResult result)
 4066394        {
 4066395            if (_disposingOrDisposed)
 12396            {
 397                // skip further processing if we're disposing the current instance
 398                // to avoid accessing disposed members
 12399                return;
 400            }
 401
 4054402            var readAsyncResult = (SftpReadAsyncResult) result;
 403
 404            byte[] data;
 405
 406            try
 4054407            {
 4054408                data = readAsyncResult.EndInvoke();
 4027409            }
 27410            catch (Exception ex)
 27411            {
 27412                HandleFailure(ex);
 27413                return;
 414            }
 415
 416            // a read that completes with a zero-byte result signals EOF
 417            // but there may be pending reads before that read
 4027418            var bufferedRead = (BufferedRead) readAsyncResult.AsyncState;
 4027419            ReadCompletedCore(bufferedRead, data);
 4066420        }
 421
 422        private void ReadCompletedCore(BufferedRead bufferedRead, byte[] data)
 4086423        {
 4086424            bufferedRead.Complete(data);
 425
 4086426            lock (_readLock)
 4086427            {
 428                // add item to queue
 4086429                _queue.Add(bufferedRead.ChunkIndex, bufferedRead);
 430
 431                // Signal that a chunk has been read or EOF has been reached.
 432                // In both cases, Read() will eventually also unblock the "read-ahead" thread.
 4086433                Monitor.PulseAll(_readLock);
 4086434            }
 435
 436            // check if server signaled EOF
 4086437            if (data.Length == 0)
 124438            {
 439                // set a flag to stop read-aheads
 124440                _endOfFileReceived = true;
 124441            }
 4086442        }
 443
 444        private void HandleFailure(Exception cause)
 36445        {
 36446            _ = Interlocked.CompareExchange(ref _exception, cause, comparand: null);
 447
 448            // unblock read-ahead
 36449            _ = _semaphore.Release();
 450
 451            // unblock Read()
 36452            lock (_readLock)
 36453            {
 36454                Monitor.PulseAll(_readLock);
 36455            }
 36456        }
 457
 458        internal sealed class BufferedRead
 459        {
 4086460            public int ChunkIndex { get; }
 461
 8285462            public byte[] Data { get; private set; }
 463
 4218464            public ulong Offset { get; }
 465
 4173466            public BufferedRead(int chunkIndex, ulong offset)
 4173467            {
 4173468                ChunkIndex = chunkIndex;
 4173469                Offset = offset;
 4173470            }
 471
 472            public void Complete(byte[] data)
 4086473            {
 4086474                Data = data;
 4086475            }
 476        }
 477    }
 478}