Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
d78ba8a
Refactored fileio.c:
yoniko Jan 21, 2022
f16ed30
Bugfix - rename fileio_asycio to fileio_asyncio
yoniko Jan 21, 2022
d95ef84
Added copyrights & license to new files
yoniko Jan 22, 2022
6a36ac7
CR fixes
yoniko Jan 24, 2022
1bccf20
Compression asyncio:
yoniko Jan 24, 2022
4ef1ddd
CR fixes (naming conventions)
yoniko Jan 24, 2022
2255280
Reverted whitespace changes
yoniko Jan 22, 2022
7366119
CR fixes (naming conventions)
yoniko Jan 24, 2022
307a9cd
Merge remote-tracking branch 'origin/dev' into compression_asyncio_af…
yoniko Jan 24, 2022
2cbc104
Small cosmetic changes
yoniko Jan 24, 2022
512e5a3
CR fixes
yoniko Jan 25, 2022
f1715ef
CR fixes - disabled sparse write for compression
yoniko Jan 25, 2022
4cac292
Asyncio Compression: significantly reduced the number of memcpys
yoniko Jan 26, 2022
a5de446
Asyncio Compression: fixed write error messages
yoniko Jan 26, 2022
494d81f
Merge remote-tracking branch 'origin/dev' into compression_asyncio_af…
yoniko Jan 26, 2022
0f5f6ed
Asyncio Compression: CR fixes
yoniko Jan 26, 2022
53d68e0
Asyncio Compression: Changed asyncio to be default behaviour and adju…
yoniko Jan 26, 2022
00ecfcb
fix output line
yoniko Jan 26, 2022
99a0b5d
Asyncio compression: fix a bug in lz4 usage
yoniko Jan 27, 2022
c989e44
CR fixes
yoniko Jan 27, 2022
7007744
Bug fix
yoniko Jan 27, 2022
a74321f
addFrame now uses a specific level so it won't crash with lz4
yoniko Jan 29, 2022
cd20f44
remove tmplimit when tests done
yoniko Jan 31, 2022
5946fe5
turn off autoflush for lz4 to mitigate level 12 bug
yoniko Jan 31, 2022
9336a2b
Updated license for new files
yoniko Jan 31, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions programs/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,7 @@ Advanced arguments :
--filelist FILE : read list of files to operate upon from FILE
--output-dir-flat DIR : processed files are stored into DIR
--output-dir-mirror DIR : processed files are stored into DIR respecting original directory structure
--[no-]asyncio : use asynchronous IO (default: enabled)
--[no-]check : during compression, add XXH64 integrity checksum to frame (default: enabled). If specified with -d, decompressor will ignore/validate checksums in compressed frame (default: validate).
-- : All arguments after "--" are treated as files

Expand Down
425 changes: 196 additions & 229 deletions programs/fileio.c

Large diffs are not rendered by default.

353 changes: 305 additions & 48 deletions programs/fileio_asyncio.c

Large diffs are not rendered by default.

85 changes: 73 additions & 12 deletions programs/fileio_asyncio.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) Yann Collet, Facebook, Inc.
* Copyright (c) Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under both the BSD-style license (found in the
Expand Down Expand Up @@ -28,7 +28,7 @@ typedef struct {
/* These struct fields should be set only on creation and not changed afterwards */
POOL_ctx* threadPool;
int totalIoJobs;
FIO_prefs_t* prefs;
const FIO_prefs_t* prefs;
POOL_function poolFunction;

/* Controls the file we currently write to, make changes only by using provided utility functions */
Expand All @@ -39,8 +39,36 @@ typedef struct {
ZSTD_pthread_mutex_t ioJobsMutex;
void* availableJobs[MAX_IO_JOBS];
int availableJobsCount;
size_t jobBufferSize;
} IOPoolCtx_t;

typedef struct {
IOPoolCtx_t base;

/* State regarding the currently read file */
int reachedEof;
U64 nextReadOffset;
U64 waitingOnOffset;

/* We may hold an IOJob object as needed if we actively expose its buffer. */
void *currentJobHeld;

/* Coalesce buffer is used to join two buffers in case where we need to read more bytes than left in
* the first of them. Shouldn't be accessed from outside ot utility functions. */
U8 *coalesceBuffer;

/* Read buffer can be used by consumer code, take care when copying this pointer aside as it might
* change when consuming / refilling buffer. */
U8 *srcBuffer;
size_t srcBufferLoaded;

/* We need to know what tasks completed so we can use their buffers when their time comes.
* Should only be accessed after locking base.ioJobsMutex . */
void* completedJobs[MAX_IO_JOBS];
int completedJobsCount;
ZSTD_pthread_cond_t jobCompletedCond;
} ReadPoolCtx_t;

typedef struct {
IOPoolCtx_t base;
unsigned storedSkips;
Expand All @@ -59,15 +87,10 @@ typedef struct {
U64 offset;
} IOJob_t;

/** AIO_fwriteSparse() :
* @return : storedSkips,
* argument for next call to AIO_fwriteSparse() or AIO_fwriteSparseEnd() */
unsigned AIO_fwriteSparse(FILE* file,
const void* buffer, size_t bufferSize,
const FIO_prefs_t* const prefs,
unsigned storedSkips);
/* AIO_supported:
* Returns 1 if AsyncIO is supported on the system, 0 otherwise. */
int AIO_supported(void);

void AIO_fwriteSparseEnd(const FIO_prefs_t* const prefs, FILE* file, unsigned storedSkips);

/* AIO_WritePool_releaseIoJob:
* Releases an acquired job back to the pool. Doesn't execute the job. */
Expand Down Expand Up @@ -97,7 +120,7 @@ void AIO_WritePool_setFile(WritePoolCtx_t *ctx, FILE* file);

/* AIO_WritePool_getFile:
* Returns the file the writePool is currently set to write to. */
FILE* AIO_WritePool_getFile(WritePoolCtx_t *ctx);
FILE* AIO_WritePool_getFile(const WritePoolCtx_t* ctx);

/* AIO_WritePool_closeFile:
* Ends sparse write and closes the writePool's current file and sets the file to NULL.
Expand All @@ -107,12 +130,50 @@ int AIO_WritePool_closeFile(WritePoolCtx_t *ctx);
/* AIO_WritePool_create:
* Allocates and sets and a new write pool including its included jobs.
* bufferSize should be set to the maximal buffer we want to write to at a time. */
WritePoolCtx_t* AIO_WritePool_create(FIO_prefs_t* const prefs, size_t bufferSize);
WritePoolCtx_t* AIO_WritePool_create(const FIO_prefs_t* prefs, size_t bufferSize);

/* AIO_WritePool_free:
* Frees and releases a writePool and its resources. Closes destination file. */
void AIO_WritePool_free(WritePoolCtx_t* ctx);

/* AIO_ReadPool_create:
* Allocates and sets and a new readPool including its included jobs.
* bufferSize should be set to the maximal buffer we want to read at a time, will also be used
* as our basic read size. */
ReadPoolCtx_t* AIO_ReadPool_create(const FIO_prefs_t* prefs, size_t bufferSize);

/* AIO_ReadPool_free:
* Frees and releases a readPool and its resources. Closes source file. */
void AIO_ReadPool_free(ReadPoolCtx_t* ctx);

/* AIO_ReadPool_consumeBytes:
* Consumes byes from srcBuffer's beginning and updates srcBufferLoaded accordingly. */
void AIO_ReadPool_consumeBytes(ReadPoolCtx_t *ctx, size_t n);

/* AIO_ReadPool_fillBuffer:
* Makes sure buffer has at least n bytes loaded (as long as n is not bigger than the initalized bufferSize).
* Returns if srcBuffer has at least n bytes loaded or if we've reached the end of the file.
* Return value is the number of bytes added to the buffer.
* Note that srcBuffer might have up to 2 times bufferSize bytes. */
size_t AIO_ReadPool_fillBuffer(ReadPoolCtx_t *ctx, size_t n);

/* AIO_ReadPool_consumeAndRefill:
* Consumes the current buffer and refills it with bufferSize bytes. */
size_t AIO_ReadPool_consumeAndRefill(ReadPoolCtx_t *ctx);

/* AIO_ReadPool_setFile:
* Sets the source file for future read in the pool. Initiates reading immediately if file is not NULL.
* Waits for all current enqueued tasks to complete if a previous file was set. */
void AIO_ReadPool_setFile(ReadPoolCtx_t *ctx, FILE* file);

/* AIO_ReadPool_getFile:
* Returns the current file set for the read pool. */
FILE* AIO_ReadPool_getFile(const ReadPoolCtx_t *ctx);

/* AIO_ReadPool_closeFile:
* Closes the current set file. Waits for all current enqueued tasks to complete and resets state. */
int AIO_ReadPool_closeFile(ReadPoolCtx_t *ctx);

#if defined (__cplusplus)
}
#endif
Expand Down
2 changes: 1 addition & 1 deletion programs/fileio_common.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) Yann Collet, Facebook, Inc.
* Copyright (c) Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under both the BSD-style license (found in the
Expand Down
4 changes: 2 additions & 2 deletions programs/fileio_types.h
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) Yann Collet, Facebook, Inc.
* Copyright (c) Facebook, Inc.
* All rights reserved.
*
* This source code is licensed under both the BSD-style license (found in the
Expand Down Expand Up @@ -70,4 +70,4 @@ typedef struct FIO_prefs_s {
int allowBlockDevices;
} FIO_prefs_t;

#endif /* FILEIO_TYPES_HEADER */
#endif /* FILEIO_TYPES_HEADER */
8 changes: 4 additions & 4 deletions programs/zstdcli.c
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
# include "zstdcli_trace.h"
#endif
#include "../lib/zstd.h" /* ZSTD_VERSION_STRING, ZSTD_minCLevel, ZSTD_maxCLevel */
#include "fileio_asyncio.h"


/*-************************************
Expand Down Expand Up @@ -179,7 +180,8 @@ static void usage_advanced(const char* programName)
#ifdef UTIL_HAS_MIRRORFILELIST
DISPLAYOUT( "--output-dir-mirror DIR : processed files are stored into DIR respecting original directory structure \n");
#endif

if (AIO_supported())
DISPLAYOUT( "--[no-]asyncio : use asynchronous IO (default: enabled) \n");

#ifndef ZSTD_NOCOMPRESS
DISPLAYOUT( "--[no-]check : during compression, add XXH64 integrity checksum to frame (default: enabled)");
Expand Down Expand Up @@ -242,9 +244,6 @@ static void usage_advanced(const char* programName)
DISPLAYOUT( " -l : print information about zstd compressed files \n");
DISPLAYOUT( "--test : test compressed file integrity \n");
DISPLAYOUT( " -M# : Set a memory usage limit for decompression \n");
#ifdef ZSTD_MULTITHREAD
DISPLAYOUT( "--[no-]asyncio : use threaded asynchronous IO for output (default: disabled) \n");
#endif
# if ZSTD_SPARSE_DEFAULT
DISPLAYOUT( "--[no-]sparse : sparse mode (default: enabled on file, disabled on stdout) \n");
# else
Expand Down Expand Up @@ -1445,6 +1444,7 @@ int main(int argCount, const char* argv[])
FIO_setTargetCBlockSize(prefs, targetCBlockSize);
FIO_setSrcSizeHint(prefs, srcSizeHint);
FIO_setLiteralCompressionMode(prefs, literalCompressionMode);
FIO_setSparseWrite(prefs, 0);
if (adaptMin > cLevel) cLevel = adaptMin;
if (adaptMax < cLevel) cLevel = adaptMax;

Expand Down
15 changes: 9 additions & 6 deletions tests/playTests.sh
Original file line number Diff line number Diff line change
Expand Up @@ -259,10 +259,13 @@ zstd -dc - < tmp.zst > $INTOVOID
zstd -d < tmp.zst > $INTOVOID # implicit stdout when stdin is used
zstd -d - < tmp.zst > $INTOVOID
println "test : impose memory limitation (must fail)"
zstd -d -f tmp.zst -M2K -c > $INTOVOID && die "decompression needs more memory than allowed"
zstd -d -f tmp.zst --memlimit=2K -c > $INTOVOID && die "decompression needs more memory than allowed" # long command
zstd -d -f tmp.zst --memory=2K -c > $INTOVOID && die "decompression needs more memory than allowed" # long command
zstd -d -f tmp.zst --memlimit-decompress=2K -c > $INTOVOID && die "decompression needs more memory than allowed" # long command
datagen -g500K > tmplimit
zstd -f tmplimit
zstd -d -f tmplimit.zst -M2K -c > $INTOVOID && die "decompression needs more memory than allowed"
zstd -d -f tmplimit.zst --memlimit=2K -c > $INTOVOID && die "decompression needs more memory than allowed" # long command
zstd -d -f tmplimit.zst --memory=2K -c > $INTOVOID && die "decompression needs more memory than allowed" # long command
zstd -d -f tmplimit.zst --memlimit-decompress=2K -c > $INTOVOID && die "decompression needs more memory than allowed" # long command
rm -f tmplimit tmplimit.zst
println "test : overwrite protection"
zstd -q tmp && die "overwrite check failed!"
println "test : force overwrite"
Expand Down Expand Up @@ -1590,11 +1593,11 @@ elif [ "$longCSize19wlog23" -gt "$optCSize19wlog23" ]; then
exit 1
fi

println "\n===> zstd asyncio decompression tests "
println "\n===> zstd asyncio tests "

addFrame() {
datagen -g2M -s$2 >> tmp_uncompressed
datagen -g2M -s$2 | zstd --format=$1 >> tmp_compressed.zst
datagen -g2M -s$2 | zstd -1 --format=$1 >> tmp_compressed.zst
}

addTwoFrames() {
Expand Down