14 changes: 6 additions & 8 deletions README.md
@@ -19,12 +19,6 @@ There are two main APIs:
The compress/decompress APIs mirror that of lz4, while the streaming API was
designed to be a drop-in replacement for zlib.

## Current status

There is currently a potential issue with the streaming `Writer` interface. See https://github.com/DataDog/zstd/issues/22 for more details.
If you intend to use the streaming interface instead of `Compress` method, it is currently recommended to use [release 1.3.0](https://github.com/DataDog/zstd/releases/tag/v1.3.0) or earlier.
If you are able to contribute or reproduce the issue, please let us know!

### Simple `Compress/Decompress`


@@ -79,8 +73,6 @@ NewReader(r io.Reader) io.ReadCloser
NewReaderDict(r io.Reader, dict []byte) io.ReadCloser
```

See section above `Current Status`

### Benchmarks (benchmarked with v0.5.0)

The author of Zstd also wrote lz4. Zstd is intended to occupy a speed/ratio
@@ -124,3 +116,9 @@ Testing with size: 45951... czlib: 201.62 MB/s, zstd: 1951.57 MB/s
```

zstd starts to shine with payloads > 1KB

### Stability - Current state: STABLE

The C library seems to be pretty stable and, according to the author, has been tested and fuzzed.

For the Go wrapper, the tests cover the most usual cases and we have successfully tested it on all staging and prod data.
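
For reference, a minimal sketch of the two APIs described above, assuming the `github.com/DataDog/zstd` import path; the one-shot and streaming calls mirror the signatures shown in this diff:

```go
package main

import (
	"bytes"
	"fmt"
	"log"

	"github.com/DataDog/zstd"
)

func main() {
	payload := []byte("some payload worth compressing")

	// One-shot API, mirroring lz4: passing nil lets the library size dst.
	compressed, err := zstd.Compress(nil, payload)
	if err != nil {
		log.Fatal(err)
	}
	decompressed, err := zstd.Decompress(nil, compressed)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(bytes.Equal(payload, decompressed)) // true

	// Streaming API, designed as a drop-in replacement for zlib.
	var buf bytes.Buffer
	w := zstd.NewWriter(&buf)
	if _, err := w.Write(payload); err != nil {
		log.Fatal(err)
	}
	if err := w.Close(); err != nil { // Close flushes the final frame.
		log.Fatal(err)
	}
}
```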
5 changes: 5 additions & 0 deletions helpers_test.go
@@ -1,5 +1,10 @@
package zstd

/*
From https://github.com/dustin/randbo
All credit for the code below goes there :) (There wasn't a license, so I'm distributing it as is)
*/

import (
"io"
"math/rand"
93 changes: 67 additions & 26 deletions zstd_stream.go
@@ -2,10 +2,8 @@ package zstd

/*
#define ZSTD_STATIC_LINKING_ONLY
#define ZBUFF_DISABLE_DEPRECATE_WARNINGS
#include "stdint.h" // for uintptr_t
#include "zstd.h"
#include "zbuff.h"

typedef struct compressStream2_result_s {
size_t return_code;
@@ -32,6 +30,23 @@ static void ZSTD_compressStream2_finish(compressStream2_result* result, ZSTD_CCt
result->bytes_consumed = inBuffer.pos;
result->bytes_written = outBuffer.pos;
}

// decompressStream2_result is identical to compressStream2_result, but we keep two separate structs to make future changes easier
typedef struct decompressStream2_result_s {
size_t return_code;
size_t bytes_consumed;
size_t bytes_written;
} decompressStream2_result;
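// Note: src and dst are received as uintptr_t rather than void* so that the
// Go caller can pass buffer addresses without tripping cgo's Go-pointer
// checks; the Go side keeps those buffers alive across the call (see the
// runtime.KeepAlive in reader.Read below).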

static void ZSTD_decompressStream_wrapper(decompressStream2_result* result, ZSTD_DCtx* ctx, uintptr_t dst, size_t maxDstSize, const uintptr_t src, size_t srcSize) {
ZSTD_outBuffer outBuffer = { (void*)dst, maxDstSize, 0 };
ZSTD_inBuffer inBuffer = { (void*)src, srcSize, 0 };
size_t retCode = ZSTD_decompressStream(ctx, &outBuffer, &inBuffer);

result->return_code = retCode;
result->bytes_consumed = inBuffer.pos;
result->bytes_written = outBuffer.pos;
}
*/
import "C"
import (
@@ -44,6 +59,7 @@ import (
)

var errShortRead = errors.New("short read")
var errReaderClosed = errors.New("Reader is closed")

// Writer is an io.WriteCloser that zstd-compresses its input.
type Writer struct {
@@ -231,19 +247,19 @@ func (w *Writer) Close() error {
// cSize is the recommended size of reader.compressionBuffer. This func and
// invocation allow for a one-time check for validity.
var cSize = func() int {
v := int(C.ZBUFF_recommendedDInSize())
v := int(C.ZSTD_DStreamInSize())
if v <= 0 {
panic(fmt.Errorf("ZBUFF_recommendedDInSize() returned invalid size: %v", v))
panic(fmt.Errorf("ZSTD_DStreamInSize() returned invalid size: %v", v))
}
return v
}()

// dSize is the recommended size of reader.decompressionBuffer. This func and
// invocation allow for a one-time check for validity.
var dSize = func() int {
v := int(C.ZBUFF_recommendedDOutSize())
v := int(C.ZSTD_DStreamOutSize())
if v <= 0 {
panic(fmt.Errorf("ZBUFF_recommendedDOutSize() returned invalid size: %v", v))
panic(fmt.Errorf("ZSTD_DStreamOutSize() returned invalid size: %v", v))
}
return v
}()
@@ -272,7 +288,7 @@ var dPool = sync.Pool{

// reader is an io.ReadCloser that decompresses when read from.
type reader struct {
ctx *C.ZBUFF_DCtx
ctx *C.ZSTD_DCtx
compressionBuffer []byte
compressionLeft int
decompressionBuffer []byte
@@ -281,6 +297,7 @@ type reader struct {
dict []byte
firstError error
recommendedSrcSize int
resultBuffer *C.decompressStream2_result
underlyingReader io.Reader
}

Expand All @@ -296,14 +313,18 @@ func NewReader(r io.Reader) io.ReadCloser {
// ignores the dictionary if it is nil.
func NewReaderDict(r io.Reader, dict []byte) io.ReadCloser {
var err error
ctx := C.ZBUFF_createDCtx()
ctx := C.ZSTD_createDStream()
if len(dict) == 0 {
err = getError(int(C.ZBUFF_decompressInit(ctx)))
err = getError(int(C.ZSTD_initDStream(ctx)))
} else {
err = getError(int(C.ZBUFF_decompressInitDictionary(
ctx,
unsafe.Pointer(&dict[0]),
C.size_t(len(dict)))))
err = getError(int(C.ZSTD_DCtx_reset(ctx, C.ZSTD_reset_session_only)))
if err == nil {
// Only load the dictionary if we successfully initialized the context
err = getError(int(C.ZSTD_DCtx_loadDictionary(
ctx,
unsafe.Pointer(&dict[0]),
C.size_t(len(dict)))))
}
}
compressionBufferP := cPool.Get().(*[]byte)
decompressionBufferP := dPool.Get().(*[]byte)
Expand All @@ -314,20 +335,33 @@ func NewReaderDict(r io.Reader, dict []byte) io.ReadCloser {
decompressionBuffer: *decompressionBufferP,
firstError: err,
recommendedSrcSize: cSize,
resultBuffer: new(C.decompressStream2_result),
underlyingReader: r,
}
}
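
A hedged round-trip sketch of the dictionary path set up above; `NewWriterLevelDict` and `DefaultCompression` are assumed from this package's writer API, and plain shared bytes are enough for a demo dictionary:

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"log"

	"github.com/DataDog/zstd"
)

func main() {
	// Both sides must use the same dictionary bytes; zstd can load plain
	// content (no dictID header) as a raw dictionary.
	dict := []byte("a dictionary seeded with common substrings")
	var buf bytes.Buffer

	w := zstd.NewWriterLevelDict(&buf, zstd.DefaultCompression, dict)
	if _, err := w.Write([]byte("payload sharing common substrings")); err != nil {
		log.Fatal(err)
	}
	if err := w.Close(); err != nil {
		log.Fatal(err)
	}

	r := zstd.NewReaderDict(&buf, dict)
	defer r.Close()
	out, err := io.ReadAll(r)
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("%s\n", out)
}
```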

// Close frees the allocated C objects
func (r *reader) Close() error {
if r.firstError != nil {
return r.firstError
}

cb := r.compressionBuffer
db := r.decompressionBuffer
// Ensure that we won't reuse these buffers
r.firstError = errReaderClosed
r.compressionBuffer = nil
r.decompressionBuffer = nil

cPool.Put(&cb)
dPool.Put(&db)
return getError(int(C.ZBUFF_freeDCtx(r.ctx)))
return getError(int(C.ZSTD_freeDStream(r.ctx)))
}

func (r *reader) Read(p []byte) (int, error) {
if r.firstError != nil {
return 0, r.firstError
}

// If we already have enough bytes, return
if r.decompSize-r.decompOff >= len(p) {
@@ -354,28 +388,35 @@ func (r *reader) Read(p []byte) (int, error) {
src = src[:r.compressionLeft+n]

// C code
cSrcSize := C.size_t(len(src))
cDstSize := C.size_t(len(r.decompressionBuffer))
retCode := int(C.ZBUFF_decompressContinue(
srcPtr := C.uintptr_t(uintptr(0)) // Do not point anywhere if src is empty
if len(src) > 0 {
srcPtr = C.uintptr_t(uintptr(unsafe.Pointer(&src[0])))
}

C.ZSTD_decompressStream_wrapper(
r.resultBuffer,
r.ctx,
unsafe.Pointer(&r.decompressionBuffer[0]),
&cDstSize,
unsafe.Pointer(&src[0]),
&cSrcSize))
C.uintptr_t(uintptr(unsafe.Pointer(&r.decompressionBuffer[0]))),
C.size_t(len(r.decompressionBuffer)),
srcPtr,
C.size_t(len(src)),
)
retCode := int(r.resultBuffer.return_code)

// Keep src here eventhough, we reuse later, the code might be deleted at some point
// Keep src alive here even though we reuse it later, as that code might be deleted at some point
runtime.KeepAlive(src)
if err = getError(retCode); err != nil {
return 0, fmt.Errorf("failed to decompress: %s", err)
}

// Put everything in buffer
if int(cSrcSize) < len(src) {
left := src[int(cSrcSize):]
bytesConsumed := int(r.resultBuffer.bytes_consumed)
if bytesConsumed < len(src) {
left := src[bytesConsumed:]
copy(r.compressionBuffer, left)
}
r.compressionLeft = len(src) - int(cSrcSize)
r.decompSize = int(cDstSize)
r.compressionLeft = len(src) - int(bytesConsumed)
r.decompSize = int(r.resultBuffer.bytes_written)
r.decompOff = copy(p[got:], r.decompressionBuffer[:r.decompSize])
got += r.decompOff

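To make the read loop's contract concrete, a minimal usage sketch (same assumed import path): `io.Copy` calls `Read` repeatedly, exercising the leftover-input bookkeeping above whenever the output buffer fills before all compressed input is consumed:

```go
package main

import (
	"bytes"
	"io"
	"log"

	"github.com/DataDog/zstd"
)

func main() {
	compressed, err := zstd.Compress(nil, bytes.Repeat([]byte("abc"), 1<<16))
	if err != nil {
		log.Fatal(err)
	}

	r := zstd.NewReader(bytes.NewReader(compressed))
	defer r.Close()

	var out bytes.Buffer
	if _, err := io.Copy(&out, r); err != nil {
		log.Fatal(err)
	}
	log.Printf("decompressed %d bytes", out.Len())
}
```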
36 changes: 36 additions & 0 deletions zstd_stream_test.go
@@ -212,6 +212,42 @@ func TestStreamCompressionChunks(t *testing.T) {
}
}

func TestStreamDecompressionChunks(t *testing.T) {
MB := 1024 * 1024
totalSize := 100 * MB
chunk := 1 * MB

rawData := make([]byte, totalSize)
r := NewRandBytes()
r.Read(rawData)

compressed, _ := Compress(nil, rawData)
streamDecompressed := bytes.NewReader(compressed)
reader := NewReader(streamDecompressed)

result := make([]byte, 0, totalSize)
for {
chunkBytes := make([]byte, chunk)
n, err := reader.Read(chunkBytes)
if err != nil && err != io.EOF {
t.Fatalf("Got an error while reading: %s", err)
}
result = append(result, chunkBytes[:n]...)
if err == io.EOF {
break
}
}

err := reader.Close()
if err != nil {
t.Fatalf("Failed to close writer: %s", err)
}

if !bytes.Equal(rawData, result) {
t.Fatalf("Decompression data is not equal to original data")
}
}

func BenchmarkStreamCompression(b *testing.B) {
if raw == nil {
b.Fatal(ErrNoPayloadEnv)