From 7f86b1ef83472b988d8c18cf2091eff2ce6f9f90 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Mon, 5 Sep 2016 23:05:27 -0700 Subject: [PATCH 1/3] reduce memory usage in Read() --- zstd_stream.go | 67 ++++++++++++++++++++++++++++++-------------------- 1 file changed, 40 insertions(+), 27 deletions(-) diff --git a/zstd_stream.go b/zstd_stream.go index 0e2a76d..e1fde34 100644 --- a/zstd_stream.go +++ b/zstd_stream.go @@ -147,15 +147,13 @@ func (w *Writer) Close() error { type reader struct { ctx *C.ZBUFF_DCtx compressionBuffer []byte + compressionLeft int decompressionBuffer []byte dict []byte firstError error - // Reuse previous bytes from source that were not consumed - // Hopefully because we use recommended size, we will never need to use that - srcBuffer bytes.Buffer - dstBuffer bytes.Buffer - recommendedSrcSize int - underlyingReader io.Reader + dstBuffer bytes.Buffer + recommendedSrcSize int + underlyingReader io.Reader } // NewReader creates a new io.ReadCloser. Reads from the returned ReadCloser @@ -194,6 +192,7 @@ func NewReaderDict(r io.Reader, dict []byte) io.ReadCloser { ctx: ctx, dict: dict, compressionBuffer: compressionBuffer, + compressionLeft: 0, decompressionBuffer: decompressionBuffer, firstError: err, recommendedSrcSize: cSize, @@ -213,20 +212,25 @@ func (r *reader) Read(p []byte) (int, error) { return r.dstBuffer.Read(p) } - for r.dstBuffer.Len() < len(p) { + got := 0 + if r.dstBuffer.Len() > 0 { + n := r.dstBuffer.Len() + r.dstBuffer.Read(p[:n]) + r.dstBuffer.Reset() + got += n + } + + for got < len(p) { // Populate src src := r.compressionBuffer reader := r.underlyingReader - if r.srcBuffer.Len() != 0 { - reader = io.MultiReader(&r.srcBuffer, r.underlyingReader) - } - n, err := io.ReadFull(reader, src) - if err == io.EOF { - break - } else if err != nil && err != io.ErrUnexpectedEOF { + n, err := io.ReadFull(reader, src[r.compressionLeft:]) + if err == io.EOF && n == 0 && r.compressionLeft == 0 { + return got, io.EOF + } else if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF { return 0, fmt.Errorf("failed to read from underlying reader: %s", err) } - src = src[:n] + src = src[:r.compressionLeft+n] // C code cSrcSize := C.size_t(len(src)) @@ -245,23 +249,32 @@ func (r *reader) Read(p []byte) (int, error) { // Put everything in buffer if int(cSrcSize) < len(src) { // We did not read everything, put in buffer toSave := src[int(cSrcSize):] - _, err = r.srcBuffer.Write(toSave) + copy(r.compressionBuffer, toSave) + r.compressionLeft = len(toSave) + } + toCopy := int(cDstSize) + if toCopy > len(p)-got { + toCopy = len(p) - got + } + copy(p[got:], r.decompressionBuffer[:toCopy]) + got += toCopy + if int(cDstSize) > toCopy { + _, err = r.dstBuffer.Write(r.decompressionBuffer[toCopy:int(cDstSize)]) if err != nil { - return 0, fmt.Errorf("failed to store temporary src buffer: %s", err) + return 0, fmt.Errorf("failed to store temporary result: %s", err) } } - _, err = r.dstBuffer.Write(r.decompressionBuffer[:int(cDstSize)]) - if err != nil { - return 0, fmt.Errorf("failed to store temporary result: %s", err) - } // Resize buffers - if retCode > 0 { // Hint for next src buffer size - r.compressionBuffer = resize(r.compressionBuffer, retCode) - } else { // Reset to recommended size - r.compressionBuffer = resize(r.compressionBuffer, r.recommendedSrcSize) + nsize := retCode // Hint for next src buffer size + if nsize <= 0 { + // Reset to recommended size + nsize = r.recommendedSrcSize + } + if nsize < r.compressionLeft { + nsize = r.compressionLeft } + r.compressionBuffer = resize(r.compressionBuffer, nsize) } - // Write to dst - return r.dstBuffer.Read(p) + return got, nil } From f48c28d9fbdad4074db3e60f68fd97cccca332f3 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Thu, 8 Sep 2016 22:52:58 -0700 Subject: [PATCH 2/3] remove dstBuffer --- zstd_stream.go | 49 +++++++++++++++++++------------------------------ 1 file changed, 19 insertions(+), 30 deletions(-) diff --git a/zstd_stream.go b/zstd_stream.go index e1fde34..29ba307 100644 --- a/zstd_stream.go +++ b/zstd_stream.go @@ -8,7 +8,6 @@ package zstd */ import "C" import ( - "bytes" "fmt" "io" "unsafe" @@ -149,9 +148,10 @@ type reader struct { compressionBuffer []byte compressionLeft int decompressionBuffer []byte + decompOff int + decompSize int dict []byte firstError error - dstBuffer bytes.Buffer recommendedSrcSize int underlyingReader io.Reader } @@ -192,7 +192,6 @@ func NewReaderDict(r io.Reader, dict []byte) io.ReadCloser { ctx: ctx, dict: dict, compressionBuffer: compressionBuffer, - compressionLeft: 0, decompressionBuffer: decompressionBuffer, firstError: err, recommendedSrcSize: cSize, @@ -208,27 +207,26 @@ func (r *reader) Close() error { func (r *reader) Read(p []byte) (int, error) { // If we already have enough bytes, return - if r.dstBuffer.Len() >= len(p) { - return r.dstBuffer.Read(p) + if r.decompSize-r.decompOff >= len(p) { + copy(p, r.decompressionBuffer[r.decompOff:]) + r.decompOff += len(p) + return len(p), nil } - got := 0 - if r.dstBuffer.Len() > 0 { - n := r.dstBuffer.Len() - r.dstBuffer.Read(p[:n]) - r.dstBuffer.Reset() - got += n - } + copy(p, r.decompressionBuffer[r.decompOff:r.decompSize]) + got := r.decompSize - r.decompOff + r.decompSize = 0 + r.decompOff = 0 for got < len(p) { // Populate src src := r.compressionBuffer reader := r.underlyingReader n, err := io.ReadFull(reader, src[r.compressionLeft:]) - if err == io.EOF && n == 0 && r.compressionLeft == 0 { + if err == io.EOF && r.compressionLeft == 0 { return got, io.EOF } else if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF { - return 0, fmt.Errorf("failed to read from underlying reader: %s", err) + return got, fmt.Errorf("failed to read from underlying reader: %s", err) } src = src[:r.compressionLeft+n] @@ -247,23 +245,14 @@ func (r *reader) Read(p []byte) (int, error) { } // Put everything in buffer - if int(cSrcSize) < len(src) { // We did not read everything, put in buffer - toSave := src[int(cSrcSize):] - copy(r.compressionBuffer, toSave) - r.compressionLeft = len(toSave) - } - toCopy := int(cDstSize) - if toCopy > len(p)-got { - toCopy = len(p) - got - } - copy(p[got:], r.decompressionBuffer[:toCopy]) - got += toCopy - if int(cDstSize) > toCopy { - _, err = r.dstBuffer.Write(r.decompressionBuffer[toCopy:int(cDstSize)]) - if err != nil { - return 0, fmt.Errorf("failed to store temporary result: %s", err) - } + if int(cSrcSize) < len(src) { + left := src[int(cSrcSize):] + copy(r.compressionBuffer, left) } + r.compressionLeft = len(src) - int(cSrcSize) + r.decompSize = int(cDstSize) + r.decompOff = copy(p[got:], r.decompressionBuffer[:r.decompSize]) + got += r.decompOff // Resize buffers nsize := retCode // Hint for next src buffer size From 8581291ec09b178f100161e2a245c8aa0856f333 Mon Sep 17 00:00:00 2001 From: Davies Liu Date: Fri, 2 Feb 2018 18:10:27 -0800 Subject: [PATCH 3/3] Update zstd_stream.go --- zstd_stream.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/zstd_stream.go b/zstd_stream.go index 29ba307..28dfd3e 100644 --- a/zstd_stream.go +++ b/zstd_stream.go @@ -226,7 +226,7 @@ func (r *reader) Read(p []byte) (int, error) { if err == io.EOF && r.compressionLeft == 0 { return got, io.EOF } else if err != nil && err != io.EOF && err != io.ErrUnexpectedEOF { - return got, fmt.Errorf("failed to read from underlying reader: %s", err) + return 0, fmt.Errorf("failed to read from underlying reader: %s", err) } src = src[:r.compressionLeft+n]