< Summary

Information
Class: Renci.SshNet.Common.PipeStream
Assembly: Renci.SshNet
File(s): \home\appveyor\projects\ssh-net\src\Renci.SshNet\Common\PipeStream.cs
Line coverage
75%
Covered lines: 106
Uncovered lines: 34
Coverable lines: 140
Total lines: 425
Line coverage: 75.7%
Branch coverage
72%
Covered branches: 45
Total branches: 62
Branch coverage: 72.5%
Method coverage

Feature is only available for sponsors

Upgrade to PRO version

Metrics

MethodBranch coverage Cyclomatic complexity Line coverage
.ctor()100%1100%
get_MaxBufferLength()100%1100%
get_BlockLastReadBuffer()100%1100%
set_BlockLastReadBuffer(...)0%20%
Flush()100%1100%
Seek(...)100%1100%
SetLength(...)100%1100%
Read(...)69.23%2668.42%
ReadAvailable(...)100%6100%
Write(...)68.75%1473.33%
Dispose(...)100%2100%
get_CanRead()100%1100%
get_CanSeek()100%1100%
get_CanWrite()100%1100%
get_Length()100%1100%
get_Position()100%1100%
set_Position(...)100%1100%
CreateObjectDisposedException()100%1100%

File(s)

\home\appveyor\projects\ssh-net\src\Renci.SshNet\Common\PipeStream.cs

#LineLine coverage
 1using System;
 2using System.Collections.Generic;
 3using System.Globalization;
 4using System.IO;
 5using System.Threading;
 6
 7namespace Renci.SshNet.Common
 8{
 9    /// <summary>
 10    /// PipeStream is a thread-safe read/write data stream for use between two threads in a
 11    /// single-producer/single-consumer type problem.
 12    /// </summary>
 13    /// <license>
 14    /// Copyright (c) 2006 James Kolpack (james dot kolpack at google mail)
 15    ///
 16    /// Permission is hereby granted, free of charge, to any person obtaining a copy of this software and
 17    /// associated documentation files (the "Software"), to deal in the Software without restriction,
 18    /// including without limitation the rights to use, copy, modify, merge, publish, distribute,
 19    /// sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is
 20    /// furnished to do so, subject to the following conditions:
 21    ///
 22    /// The above copyright notice and this permission notice shall be included in all copies or
 23    /// substantial portions of the Software.
 24    ///
 25    /// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
 26    /// INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR
 27    /// PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
 28    /// LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT
 29    /// OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
 30    /// OTHER DEALINGS IN THE SOFTWARE.
 31    /// </license>
 32    public class PipeStream : Stream
 33    {
 34        /// <summary>
 35        /// Queue of bytes provides the datastructure for transmitting from an
 36        /// input stream to an output stream.
 37        /// </summary>
 38        /// <remarks>Possible more effecient ways to accomplish this.</remarks>
 214639        private readonly Queue<byte> _buffer = new Queue<byte>();
 40
 41        /// <summary>
 42        /// Indicates that the input stream has been flushed and that
 43        /// all remaining data should be written to the output stream.
 44        /// </summary>
 45        private bool _isFlushed;
 46
 47        /// <summary>
 48        /// Setting this to true will cause Read() to block if it appears
 49        /// that it will run out of data.
 50        /// </summary>
 51        private bool _canBlockLastRead;
 52
 53        /// <summary>
 54        /// Indicates whether the current <see cref="PipeStream"/> is disposed.
 55        /// </summary>
 56        private bool _isDisposed;
 57
 58        /// <summary>
 59        /// Gets or sets the maximum number of bytes to store in the buffer.
 60        /// </summary>
 61        /// <value>The length of the max buffer.</value>
 541562        public long MaxBufferLength { get; set; } = 200 * 1024 * 1024;
 63
 64        /// <summary>
 65        /// Gets or sets a value indicating whether to block last read method before the buffer is empty.
 66        /// When true, Read() will block until it can fill the passed in buffer and count.
 67        /// When false, Read() will not block, returning all the available buffer data.
 68        /// </summary>
 69        /// <remarks>
 70        /// Setting to true will remove the possibility of ending a stream reader prematurely.
 71        /// </remarks>
 72        /// <value>
 73        /// <see langword="true"/> if block last read method before the buffer is empty; otherwise, <see langword="false
 74        /// </value>
 75        /// <exception cref="ObjectDisposedException">Methods were called after the stream was closed.</exception>
 76        public bool BlockLastReadBuffer
 77        {
 78            get
 1848579            {
 80#if NET7_0_OR_GREATER
 1841581                ObjectDisposedException.ThrowIf(_isDisposed, this);
 82#else
 7083                if (_isDisposed)
 484                {
 485                    throw CreateObjectDisposedException();
 86                }
 87#endif // NET7_0_OR_GREATER
 88
 1847389                return _canBlockLastRead;
 1847390            }
 91            set
 092            {
 93#if NET7_0_OR_GREATER
 094                ObjectDisposedException.ThrowIf(_isDisposed, this);
 95#else
 096                if (_isDisposed)
 097                {
 098                    throw CreateObjectDisposedException();
 99                }
 100#endif // NET7_0_OR_GREATER
 101
 0102                _canBlockLastRead = value;
 103
 104                // when turning off the block last read, signal Read() that it may now read the rest of the buffer.
 0105                if (!_canBlockLastRead)
 0106                {
 0107                    lock (_buffer)
 0108                    {
 0109                        Monitor.Pulse(_buffer);
 0110                    }
 0111                }
 0112            }
 113        }
 114
 115        /// <summary>
 116        /// When overridden in a derived class, clears all buffers for this stream and causes any buffered data to be wr
 117        /// </summary>
 118        /// <exception cref="IOException">An I/O error occurs.</exception>
 119        /// <exception cref="ObjectDisposedException">Methods were called after the stream was closed.</exception>
 120        /// <remarks>
 121        /// Once flushed, any subsequent read operations no longer block until requested bytes are available. Any write 
 122        /// reads.
 123        /// </remarks>
 124        public override void Flush()
 1983125        {
 126#if NET7_0_OR_GREATER
 1919127            ObjectDisposedException.ThrowIf(_isDisposed, this);
 128#else
 64129            if (_isDisposed)
 1130            {
 1131                throw CreateObjectDisposedException();
 132            }
 133#endif // NET7_0_OR_GREATER
 134
 1980135            _isFlushed = true;
 1980136            lock (_buffer)
 1980137            {
 138                // unblock read hereby allowing buffer to be partially filled
 1980139                Monitor.Pulse(_buffer);
 1980140            }
 1980141        }
 142
 143        /// <summary>
 144        /// When overridden in a derived class, sets the position within the current stream.
 145        /// </summary>
 146        /// <returns>
 147        /// The new position within the current stream.
 148        /// </returns>
 149        /// <param name="offset">A byte offset relative to the origin parameter.</param>
 150        /// <param name="origin">A value of type <see cref="SeekOrigin"/> indicating the reference point used to obtain 
 151        /// <exception cref="NotSupportedException">The stream does not support seeking, such as if the stream is constr
 152        public override long Seek(long offset, SeekOrigin origin)
 6153        {
 6154            throw new NotSupportedException();
 155        }
 156
 157        /// <summary>
 158        /// When overridden in a derived class, sets the length of the current stream.
 159        /// </summary>
 160        /// <param name="value">The desired length of the current stream in bytes.</param>
 161        /// <exception cref="NotSupportedException">The stream does not support both writing and seeking, such as if the
 162        public override void SetLength(long value)
 6163        {
 6164            throw new NotSupportedException();
 165        }
 166
 167        /// <summary>
 168        /// When overridden in a derived class, reads a sequence of bytes from the current stream and advances the posit
 169        /// </summary>
 170        /// <returns>
 171        /// The total number of bytes read into the buffer. This can be less than the number of bytes requested if that 
 172        /// </returns>
 173        /// <param name="buffer">An array of bytes. When this method returns, the buffer contains the specified byte arr
 174        /// <param name="offset">The zero-based byte offset in buffer at which to begin storing the data read from the c
 175        /// <param name="count">The maximum number of bytes to be read from the current stream.</param>
 176        /// <exception cref="ArgumentException">The sum of offset and count is larger than the buffer length.</exception
 177        /// <exception cref="ObjectDisposedException">Methods were called after the stream was closed.</exception>
 178        /// <exception cref="NotSupportedException">The stream does not support reading.</exception>
 179        /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is <see langword="null"/>.</exception>
 180        /// <exception cref="IOException">An I/O error occurs.</exception>
 181        /// <exception cref="ArgumentOutOfRangeException">offset or count is negative.</exception>
 182        public override int Read(byte[] buffer, int offset, int count)
 13514183        {
 13514184            if (offset != 0)
 0185            {
 0186                throw new NotSupportedException("Offsets with value of non-zero are not supported");
 187            }
 188
 13514189            if (buffer is null)
 0190            {
 0191                throw new ArgumentNullException(nameof(buffer));
 192            }
 193
 13514194            if (offset + count > buffer.Length)
 0195            {
 0196                throw new ArgumentException("The sum of offset and count is greater than the buffer length.");
 197            }
 198
 13514199            if (offset < 0 || count < 0)
 0200            {
 0201                throw new ArgumentOutOfRangeException(nameof(offset), "offset or count is negative.");
 202            }
 203
 13514204            if (BlockLastReadBuffer && count >= MaxBufferLength)
 0205            {
 0206                throw new ArgumentException(string.Format(CultureInfo.CurrentCulture, "count({0}) > mMaxBufferLength({1}
 207            }
 208
 209#if NET7_0_OR_GREATER
 13462210            ObjectDisposedException.ThrowIf(_isDisposed, this);
 211#else
 40212            if (_isDisposed)
 0213            {
 0214                throw CreateObjectDisposedException();
 215            }
 216#endif // NET7_0_OR_GREATER
 217
 13502218            if (count == 0)
 4219            {
 4220                return 0;
 221            }
 222
 13498223            var readLength = 0;
 224
 13498225            lock (_buffer)
 13498226            {
 14871227                while (!_isDisposed && !ReadAvailable(count))
 1374228                {
 1374229                    _ = Monitor.Wait(_buffer);
 1373230                }
 231
 232                // return zero when the read is interrupted by a close/dispose of the stream
 13497233                if (_isDisposed)
 6234                {
 6235                    return 0;
 236                }
 237
 238                // fill the read buffer
 109187111239                for (; readLength < count && _buffer.Count > 0; readLength++)
 54586810240                {
 54586810241                    buffer[readLength] = _buffer.Dequeue();
 54586810242                }
 243
 13491244                Monitor.Pulse(_buffer);
 13491245            }
 246
 13491247            return readLength;
 13501248        }
 249
 250        /// <summary>
 251        /// Returns a value indicating whether data is available.
 252        /// </summary>
 253        /// <param name="count">The count.</param>
 254        /// <returns>
 255        /// <see langword="true"/> if data is available; otherwise, <see langword="false"/>.
 256        /// </returns>
 257        private bool ReadAvailable(int count)
 14865258        {
 14865259            var length = Length;
 14865260            return (_isFlushed || length >= count) && (length >= (count + 1) || !BlockLastReadBuffer);
 14865261        }
 262
 263        /// <summary>
 264        /// Writes a sequence of bytes to the current stream and advances the current position within this stream by the
 265        /// </summary>
 266        /// <param name="buffer">An array of bytes. This method copies count bytes from buffer to the current stream.</p
 267        /// <param name="offset">The zero-based byte offset in buffer at which to begin copying bytes to the current str
 268        /// <param name="count">The number of bytes to be written to the current stream.</param>
 269        /// <exception cref="IOException">An I/O error occurs.</exception>
 270        /// <exception cref="NotSupportedException">The stream does not support writing.</exception>
 271        /// <exception cref="ObjectDisposedException">Methods were called after the stream was closed.</exception>
 272        /// <exception cref="ArgumentNullException"><paramref name="buffer"/> is <see langword="null"/>.</exception>
 273        /// <exception cref="ArgumentException">The sum of offset and count is greater than the buffer length.</exceptio
 274        /// <exception cref="ArgumentOutOfRangeException">offset or count is negative.</exception>
 275        public override void Write(byte[] buffer, int offset, int count)
 3251276        {
 3251277            if (buffer is null)
 0278            {
 0279                throw new ArgumentNullException(nameof(buffer));
 280            }
 281
 3251282            if (offset + count > buffer.Length)
 0283            {
 0284                throw new ArgumentException("The sum of offset and count is greater than the buffer length.");
 285            }
 286
 3251287            if (offset < 0 || count < 0)
 0288            {
 0289                throw new ArgumentOutOfRangeException(nameof(offset), "offset or count is negative.");
 290            }
 291
 292#if NET7_0_OR_GREATER
 3149293            ObjectDisposedException.ThrowIf(_isDisposed, this);
 294#else
 102295            if (_isDisposed)
 2296            {
 2297                throw CreateObjectDisposedException();
 298            }
 299#endif // NET7_0_OR_GREATER
 300
 3245301            if (count == 0)
 0302            {
 0303                return;
 304            }
 305
 3245306            lock (_buffer)
 3245307            {
 308                // wait until the buffer isn't full
 3251309                while (Length >= MaxBufferLength)
 6310                {
 6311                    _ = Monitor.Wait(_buffer);
 6312                }
 313
 3239314                _isFlushed = false; // if it were flushed before, it soon will not be.
 315
 316                // queue up the buffer data
 109187150317                for (var i = offset; i < offset + count; i++)
 54590336318                {
 54590336319                    _buffer.Enqueue(buffer[i]);
 54590336320                }
 321
 3239322                Monitor.Pulse(_buffer); // signal that write has occurred
 3239323            }
 3239324        }
 325
 326        /// <summary>
 327        /// Releases the unmanaged resources used by the Stream and optionally releases the managed resources.
 328        /// </summary>
 329        /// <param name="disposing"><see langword="true"/> to release both managed and unmanaged resources; <see langwor
 330        /// <remarks>
 331        /// Disposing a <see cref="PipeStream"/> will interrupt blocking read and write operations.
 332        /// </remarks>
 333        protected override void Dispose(bool disposing)
 608334        {
 608335            base.Dispose(disposing);
 336
 608337            if (!_isDisposed)
 608338            {
 608339                lock (_buffer)
 608340                {
 608341                    _isDisposed = true;
 608342                    Monitor.Pulse(_buffer);
 608343                }
 608344            }
 608345        }
 346
 347        /// <summary>
 348        /// Gets a value indicating whether the current stream supports reading.
 349        /// </summary>
 350        /// <returns>
 351        /// true if the stream supports reading; otherwise, false.
 352        /// </returns>
 353        public override bool CanRead
 354        {
 1119355            get { return !_isDisposed; }
 356        }
 357
 358        /// <summary>
 359        /// Gets a value indicating whether the current stream supports seeking.
 360        /// </summary>
 361        /// <returns>
 362        /// <see langword="true"/> if the stream supports seeking; otherwise, <see langword="false"/>.
 363        /// </returns>
 364        public override bool CanSeek
 365        {
 9366            get { return false; }
 367        }
 368
 369        /// <summary>
 370        /// Gets a value indicating whether the current stream supports writing.
 371        /// </summary>
 372        /// <returns>
 373        /// <see langword="true"/> if the stream supports writing; otherwise, <see langword="false"/>.
 374        /// </returns>
 375        public override bool CanWrite
 376        {
 9377            get { return !_isDisposed; }
 378        }
 379
 380        /// <summary>
 381        /// Gets the length in bytes of the stream.
 382        /// </summary>
 383        /// <returns>
 384        /// A long value representing the length of the stream in bytes.
 385        /// </returns>
 386        /// <exception cref="NotSupportedException">A class derived from Stream does not support seeking.</exception>
 387        /// <exception cref="ObjectDisposedException">Methods were called after the stream was closed.</exception>
 388        public override long Length
 389        {
 390            get
 18941391            {
 392#if NET7_0_OR_GREATER
 18768393                ObjectDisposedException.ThrowIf(_isDisposed, this);
 394#else
 173395                if (_isDisposed)
 3396                {
 3397                    throw CreateObjectDisposedException();
 398                }
 399#endif // NET7_0_OR_GREATER
 400
 18932401                return _buffer.Count;
 18932402            }
 403        }
 404
 405        /// <summary>
 406        /// Gets or sets the position within the current stream.
 407        /// </summary>
 408        /// <returns>
 409        /// The current position within the stream.
 410        /// </returns>
 411        /// <exception cref="NotSupportedException">The stream does not support seeking.</exception>
 412        public override long Position
 413        {
 36414            get { return 0; }
 12415            set { throw new NotSupportedException(); }
 416        }
 417
 418#if !NET7_0_OR_GREATER
 419        private ObjectDisposedException CreateObjectDisposedException()
 10420        {
 10421            return new ObjectDisposedException(GetType().FullName);
 10422        }
 423#endif // !NET7_0_OR_GREATER
 424    }
 425}