diff --git a/build/VS2008/zstd/zstd.vcproj b/build/VS2008/zstd/zstd.vcproj index c7eec577db3..91f2bda536c 100644 --- a/build/VS2008/zstd/zstd.vcproj +++ b/build/VS2008/zstd/zstd.vcproj @@ -384,6 +384,10 @@ RelativePath="..\..\..\programs\fileio.c" > + + diff --git a/build/VS2010/zstd/zstd.vcxproj b/build/VS2010/zstd/zstd.vcxproj index 46e22f42e9b..8ab239dd814 100644 --- a/build/VS2010/zstd/zstd.vcxproj +++ b/build/VS2010/zstd/zstd.vcxproj @@ -62,6 +62,7 @@ + diff --git a/build/cmake/programs/CMakeLists.txt b/build/cmake/programs/CMakeLists.txt index 490030783d3..28b1e1d166b 100644 --- a/build/cmake/programs/CMakeLists.txt +++ b/build/cmake/programs/CMakeLists.txt @@ -32,7 +32,7 @@ if (MSVC) set(PlatformDependResources ${MSVC_RESOURCE_DIR}/zstd.rc) endif () -add_executable(zstd ${PROGRAMS_DIR}/zstdcli.c ${PROGRAMS_DIR}/util.c ${PROGRAMS_DIR}/timefn.c ${PROGRAMS_DIR}/fileio.c ${PROGRAMS_DIR}/benchfn.c ${PROGRAMS_DIR}/benchzstd.c ${PROGRAMS_DIR}/datagen.c ${PROGRAMS_DIR}/dibio.c ${PROGRAMS_DIR}/zstdcli_trace.c ${PlatformDependResources}) +add_executable(zstd ${PROGRAMS_DIR}/zstdcli.c ${PROGRAMS_DIR}/util.c ${PROGRAMS_DIR}/timefn.c ${PROGRAMS_DIR}/fileio.c ${PROGRAMS_DIR}/fileio_asyncio.c ${PROGRAMS_DIR}/benchfn.c ${PROGRAMS_DIR}/benchzstd.c ${PROGRAMS_DIR}/datagen.c ${PROGRAMS_DIR}/dibio.c ${PROGRAMS_DIR}/zstdcli_trace.c ${PlatformDependResources}) target_link_libraries(zstd ${PROGRAMS_ZSTD_LINK_TARGET}) if (CMAKE_SYSTEM_NAME MATCHES "(Solaris|SunOS)") target_link_libraries(zstd rt) @@ -75,7 +75,7 @@ if (UNIX) ${CMAKE_CURRENT_BINARY_DIR}/zstdless.1 DESTINATION "${MAN_INSTALL_DIR}") - add_executable(zstd-frugal ${PROGRAMS_DIR}/zstdcli.c ${PROGRAMS_DIR}/util.c ${PROGRAMS_DIR}/timefn.c ${PROGRAMS_DIR}/fileio.c) + add_executable(zstd-frugal ${PROGRAMS_DIR}/zstdcli.c ${PROGRAMS_DIR}/util.c ${PROGRAMS_DIR}/timefn.c ${PROGRAMS_DIR}/fileio.c ${PROGRAMS_DIR}/fileio_asyncio.c) target_link_libraries(zstd-frugal ${PROGRAMS_ZSTD_LINK_TARGET}) set_property(TARGET zstd-frugal APPEND PROPERTY COMPILE_DEFINITIONS "ZSTD_NOBENCH;ZSTD_NODICT;ZSTD_NOTRACE") endif () diff --git a/build/meson/programs/meson.build b/build/meson/programs/meson.build index 4181030c2ee..5ccd679a167 100644 --- a/build/meson/programs/meson.build +++ b/build/meson/programs/meson.build @@ -14,20 +14,31 @@ zstd_programs_sources = [join_paths(zstd_rootdir, 'programs/zstdcli.c'), join_paths(zstd_rootdir, 'programs/util.c'), join_paths(zstd_rootdir, 'programs/timefn.c'), join_paths(zstd_rootdir, 'programs/fileio.c'), + join_paths(zstd_rootdir, 'programs/fileio_asyncio.c'), join_paths(zstd_rootdir, 'programs/benchfn.c'), join_paths(zstd_rootdir, 'programs/benchzstd.c'), join_paths(zstd_rootdir, 'programs/datagen.c'), join_paths(zstd_rootdir, 'programs/dibio.c'), join_paths(zstd_rootdir, 'programs/zstdcli_trace.c'), # needed due to use of private symbol + -fvisibility=hidden - join_paths(zstd_rootdir, 'lib/common/xxhash.c')] + join_paths(zstd_rootdir, 'lib/common/xxhash.c'), + join_paths(zstd_rootdir, 'lib/common/pool.c'), + join_paths(zstd_rootdir, 'lib/common/zstd_common.c'), + join_paths(zstd_rootdir, 'lib/common/error_private.c')] +zstd_deps = [ libzstd_dep ] zstd_c_args = libzstd_debug_cflags + +zstd_frugal_deps = [ libzstd_dep ] +zstd_frugal_c_args = [ '-DZSTD_NOBENCH', '-DZSTD_NODICT', '-DZSTD_NOTRACE' ] + if use_multi_thread + zstd_deps += [ thread_dep ] zstd_c_args += [ '-DZSTD_MULTITHREAD' ] + zstd_frugal_deps += [ thread_dep ] + zstd_frugal_c_args += [ '-DZSTD_MULTITHREAD' ] endif -zstd_deps = [ libzstd_dep ] if use_zlib zstd_deps += [ zlib_dep ] zstd_c_args += [ '-DZSTD_GZCOMPRESS', '-DZSTD_GZDECOMPRESS' ] @@ -69,14 +80,18 @@ zstd = executable('zstd', zstd_frugal_sources = [join_paths(zstd_rootdir, 'programs/zstdcli.c'), join_paths(zstd_rootdir, 'programs/timefn.c'), join_paths(zstd_rootdir, 'programs/util.c'), - join_paths(zstd_rootdir, 'programs/fileio.c')] + join_paths(zstd_rootdir, 'programs/fileio.c'), + join_paths(zstd_rootdir, 'programs/fileio_asyncio.c'), + join_paths(zstd_rootdir, 'lib/common/pool.c'), + join_paths(zstd_rootdir, 'lib/common/zstd_common.c'), + join_paths(zstd_rootdir, 'lib/common/error_private.c')] # Minimal target, with only zstd compression and decompression. # No bench. No legacy. executable('zstd-frugal', zstd_frugal_sources, - dependencies: libzstd_dep, - c_args: [ '-DZSTD_NOBENCH', '-DZSTD_NODICT', '-DZSTD_NOTRACE' ], + dependencies: zstd_frugal_deps, + c_args: zstd_frugal_c_args, install: true) install_data(join_paths(zstd_rootdir, 'programs/zstdgrep'), diff --git a/contrib/VS2005/zstd/zstd.vcproj b/contrib/VS2005/zstd/zstd.vcproj index 78645d18a36..e37ebee3911 100644 --- a/contrib/VS2005/zstd/zstd.vcproj +++ b/contrib/VS2005/zstd/zstd.vcproj @@ -363,6 +363,10 @@ RelativePath="..\..\..\programs\fileio.c" > + + diff --git a/lib/common/pool.c b/lib/common/pool.c index 2e37cdd73c8..5c1d07d356e 100644 --- a/lib/common/pool.c +++ b/lib/common/pool.c @@ -96,9 +96,7 @@ static void* POOL_thread(void* opaque) { /* If the intended queue size was 0, signal after finishing job */ ZSTD_pthread_mutex_lock(&ctx->queueMutex); ctx->numThreadsBusy--; - if (ctx->queueSize == 1) { - ZSTD_pthread_cond_signal(&ctx->queuePushCond); - } + ZSTD_pthread_cond_signal(&ctx->queuePushCond); ZSTD_pthread_mutex_unlock(&ctx->queueMutex); } } /* for (;;) */ @@ -190,6 +188,17 @@ void POOL_free(POOL_ctx *ctx) { ZSTD_customFree(ctx, ctx->customMem); } +/*! POOL_joinJobs() : + * Waits for all queued jobs to finish executing. + */ +void POOL_joinJobs(POOL_ctx* ctx) { + ZSTD_pthread_mutex_lock(&ctx->queueMutex); + while(!ctx->queueEmpty || ctx->numThreadsBusy > 0) { + ZSTD_pthread_cond_wait(&ctx->queuePushCond, &ctx->queueMutex); + } + ZSTD_pthread_mutex_unlock(&ctx->queueMutex); +} + void ZSTD_freeThreadPool (ZSTD_threadPool* pool) { POOL_free (pool); } @@ -330,6 +339,11 @@ void POOL_free(POOL_ctx* ctx) { (void)ctx; } +void POOL_joinJobs(POOL_ctx* ctx){ + assert(!ctx || ctx == &g_poolCtx); + (void)ctx; +} + int POOL_resize(POOL_ctx* ctx, size_t numThreads) { (void)ctx; (void)numThreads; return 0; diff --git a/lib/common/pool.h b/lib/common/pool.h index 0ebde1805db..b86a3452e5c 100644 --- a/lib/common/pool.h +++ b/lib/common/pool.h @@ -38,6 +38,12 @@ POOL_ctx* POOL_create_advanced(size_t numThreads, size_t queueSize, */ void POOL_free(POOL_ctx* ctx); + +/*! POOL_joinJobs() : + * Waits for all queued jobs to finish executing. + */ +void POOL_joinJobs(POOL_ctx* ctx); + /*! POOL_resize() : * Expands or shrinks pool's number of threads. * This is more efficient than releasing + creating a new context, diff --git a/programs/Makefile b/programs/Makefile index f77e1b7f10f..82aa498950f 100644 --- a/programs/Makefile +++ b/programs/Makefile @@ -243,17 +243,17 @@ zstd-pgo : ## zstd-small: minimal target, supporting only zstd compression and decompression. no bench. no legacy. no other format. zstd-small: CFLAGS = -Os -s -zstd-frugal zstd-small: $(ZSTDLIB_CORE_SRC) zstdcli.c util.c timefn.c fileio.c +zstd-frugal zstd-small: $(ZSTDLIB_CORE_SRC) zstdcli.c util.c timefn.c fileio.c fileio_asycio.c $(CC) $(FLAGS) -DZSTD_NOBENCH -DZSTD_NODICT -DZSTD_NOTRACE -UZSTD_LEGACY_SUPPORT -DZSTD_LEGACY_SUPPORT=0 $^ -o $@$(EXT) -zstd-decompress: $(ZSTDLIB_COMMON_SRC) $(ZSTDLIB_DECOMPRESS_SRC) zstdcli.c util.c timefn.c fileio.c +zstd-decompress: $(ZSTDLIB_COMMON_SRC) $(ZSTDLIB_DECOMPRESS_SRC) zstdcli.c util.c timefn.c fileio.c fileio_asycio.c $(CC) $(FLAGS) -DZSTD_NOBENCH -DZSTD_NODICT -DZSTD_NOCOMPRESS -DZSTD_NOTRACE -UZSTD_LEGACY_SUPPORT -DZSTD_LEGACY_SUPPORT=0 $^ -o $@$(EXT) -zstd-compress: $(ZSTDLIB_COMMON_SRC) $(ZSTDLIB_COMPRESS_SRC) zstdcli.c util.c timefn.c fileio.c +zstd-compress: $(ZSTDLIB_COMMON_SRC) $(ZSTDLIB_COMPRESS_SRC) zstdcli.c util.c timefn.c fileio.c fileio_asycio.c $(CC) $(FLAGS) -DZSTD_NOBENCH -DZSTD_NODICT -DZSTD_NODECOMPRESS -DZSTD_NOTRACE -UZSTD_LEGACY_SUPPORT -DZSTD_LEGACY_SUPPORT=0 $^ -o $@$(EXT) ## zstd-dictBuilder: executable supporting dictionary creation and compression (only) -zstd-dictBuilder: $(ZSTDLIB_COMMON_SRC) $(ZSTDLIB_COMPRESS_SRC) $(ZDICT_SRC) zstdcli.c util.c timefn.c fileio.c dibio.c +zstd-dictBuilder: $(ZSTDLIB_COMMON_SRC) $(ZSTDLIB_COMPRESS_SRC) $(ZDICT_SRC) zstdcli.c util.c timefn.c fileio.c fileio_asycio.c dibio.c $(CC) $(FLAGS) -DZSTD_NOBENCH -DZSTD_NODECOMPRESS -DZSTD_NOTRACE $^ -o $@$(EXT) zstdmt: zstd diff --git a/programs/fileio.c b/programs/fileio.c index 5338fa62955..5c07902ff29 100644 --- a/programs/fileio.c +++ b/programs/fileio.c @@ -40,8 +40,12 @@ # include #endif -#include "../lib/common/mem.h" /* U32, U64 */ #include "fileio.h" +#include "fileio_asycio.h" +#include "fileio_common.h" + +FIO_display_prefs_t g_display_prefs = {2, FIO_ps_auto}; +UTIL_time_t g_displayClock = UTIL_TIME_INITIALIZER; #define ZSTD_STATIC_LINKING_ONLY /* ZSTD_magicNumber, ZSTD_frameHeaderSize_max */ #include "../lib/zstd.h" @@ -82,62 +86,6 @@ #define DEFAULT_FILE_PERMISSIONS (0666) #endif -/*-************************************* -* Macros -***************************************/ -#define KB *(1 <<10) -#define MB *(1 <<20) -#define GB *(1U<<30) -#undef MAX -#define MAX(a,b) ((a)>(b) ? (a) : (b)) - -struct FIO_display_prefs_s { - int displayLevel; /* 0 : no display; 1: errors; 2: + result + interaction + warnings; 3: + progression; 4: + information */ - FIO_progressSetting_e progressSetting; -}; - -static FIO_display_prefs_t g_display_prefs = {2, FIO_ps_auto}; - -#define DISPLAY(...) fprintf(stderr, __VA_ARGS__) -#define DISPLAYOUT(...) fprintf(stdout, __VA_ARGS__) -#define DISPLAYLEVEL(l, ...) { if (g_display_prefs.displayLevel>=l) { DISPLAY(__VA_ARGS__); } } - -static const U64 g_refreshRate = SEC_TO_MICRO / 6; -static UTIL_time_t g_displayClock = UTIL_TIME_INITIALIZER; - -#define READY_FOR_UPDATE() ((g_display_prefs.progressSetting != FIO_ps_never) && UTIL_clockSpanMicro(g_displayClock) > g_refreshRate) -#define DELAY_NEXT_UPDATE() { g_displayClock = UTIL_getTime(); } -#define DISPLAYUPDATE(l, ...) { \ - if (g_display_prefs.displayLevel>=l && (g_display_prefs.progressSetting != FIO_ps_never)) { \ - if (READY_FOR_UPDATE() || (g_display_prefs.displayLevel>=4)) { \ - DELAY_NEXT_UPDATE(); \ - DISPLAY(__VA_ARGS__); \ - if (g_display_prefs.displayLevel>=4) fflush(stderr); \ - } } } - -#undef MIN /* in case it would be already defined */ -#define MIN(a,b) ((a) < (b) ? (a) : (b)) - - -#define EXM_THROW(error, ...) \ -{ \ - DISPLAYLEVEL(1, "zstd: "); \ - DISPLAYLEVEL(5, "Error defined at %s, line %i : \n", __FILE__, __LINE__); \ - DISPLAYLEVEL(1, "error %i : ", error); \ - DISPLAYLEVEL(1, __VA_ARGS__); \ - DISPLAYLEVEL(1, " \n"); \ - exit(error); \ -} - -#define CHECK_V(v, f) \ - v = f; \ - if (ZSTD_isError(v)) { \ - DISPLAYLEVEL(5, "%s \n", #f); \ - EXM_THROW(11, "%s", ZSTD_getErrorName(v)); \ - } -#define CHECK(f) { size_t err; CHECK_V(err, f); } - - /*-************************************ * Signal (Ctrl-C trapping) **************************************/ @@ -248,94 +196,6 @@ void FIO_addAbortHandler() #endif } - -/*-************************************************************ -* Avoid fseek()'s 2GiB barrier with MSVC, macOS, *BSD, MinGW -***************************************************************/ -#if defined(_MSC_VER) && _MSC_VER >= 1400 -# define LONG_SEEK _fseeki64 -# define LONG_TELL _ftelli64 -#elif !defined(__64BIT__) && (PLATFORM_POSIX_VERSION >= 200112L) /* No point defining Large file for 64 bit */ -# define LONG_SEEK fseeko -# define LONG_TELL ftello -#elif defined(__MINGW32__) && !defined(__STRICT_ANSI__) && !defined(__NO_MINGW_LFS) && defined(__MSVCRT__) -# define LONG_SEEK fseeko64 -# define LONG_TELL ftello64 -#elif defined(_WIN32) && !defined(__DJGPP__) -# include - static int LONG_SEEK(FILE* file, __int64 offset, int origin) { - LARGE_INTEGER off; - DWORD method; - off.QuadPart = offset; - if (origin == SEEK_END) - method = FILE_END; - else if (origin == SEEK_CUR) - method = FILE_CURRENT; - else - method = FILE_BEGIN; - - if (SetFilePointerEx((HANDLE) _get_osfhandle(_fileno(file)), off, NULL, method)) - return 0; - else - return -1; - } - static __int64 LONG_TELL(FILE* file) { - LARGE_INTEGER off, newOff; - off.QuadPart = 0; - newOff.QuadPart = 0; - SetFilePointerEx((HANDLE) _get_osfhandle(_fileno(file)), off, &newOff, FILE_CURRENT); - return newOff.QuadPart; - } -#else -# define LONG_SEEK fseek -# define LONG_TELL ftell -#endif - - -/*-************************************* -* Parameters: FIO_prefs_t -***************************************/ - -/* typedef'd to FIO_prefs_t within fileio.h */ -struct FIO_prefs_s { - - /* Algorithm preferences */ - FIO_compressionType_t compressionType; - U32 sparseFileSupport; /* 0: no sparse allowed; 1: auto (file yes, stdout no); 2: force sparse */ - int dictIDFlag; - int checksumFlag; - int blockSize; - int overlapLog; - U32 adaptiveMode; - U32 useRowMatchFinder; - int rsyncable; - int minAdaptLevel; - int maxAdaptLevel; - int ldmFlag; - int ldmHashLog; - int ldmMinMatch; - int ldmBucketSizeLog; - int ldmHashRateLog; - size_t streamSrcSize; - size_t targetCBlockSize; - int srcSizeHint; - int testMode; - ZSTD_paramSwitch_e literalCompressionMode; - - /* IO preferences */ - U32 removeSrcFile; - U32 overwrite; - - /* Computation resources preferences */ - unsigned memLimit; - int nbWorkers; - - int excludeCompressedFiles; - int patchFromMode; - int contentSize; - int allowBlockDevices; -}; - /*-************************************* * Parameters: FIO_ctx_t ***************************************/ @@ -395,6 +255,7 @@ FIO_prefs_t* FIO_createPreferences(void) ret->literalCompressionMode = ZSTD_ps_auto; ret->excludeCompressedFiles = 0; ret->allowBlockDevices = 0; + ret->asyncIO = 0; return ret; } @@ -558,6 +419,16 @@ void FIO_setContentSize(FIO_prefs_t* const prefs, int value) prefs->contentSize = value != 0; } +void FIO_setAsyncIOFlag(FIO_prefs_t* const prefs, unsigned value) { +#ifdef ZSTD_MULTITHREAD + prefs->asyncIO = value; +#else + (void) prefs; + (void) value; + DISPLAYLEVEL(2, "Note : asyncio is disabled (lack of multithreading support) \n"); +#endif +} + /* FIO_ctx_t functions */ void FIO_setHasStdoutOutput(FIO_ctx_t* const fCtx, int value) { @@ -932,16 +803,12 @@ static int FIO_removeMultiFilesWarning(FIO_ctx_t* const fCtx, const FIO_prefs_t* * Compression ************************************************************************/ typedef struct { - FILE* srcFile; - FILE* dstFile; - void* srcBuffer; - size_t srcBufferSize; - void* dstBuffer; - size_t dstBufferSize; void* dictBuffer; size_t dictBufferSize; const char* dictFileName; ZSTD_CStream* cctx; + write_pool_ctx_t *writeCtx; + read_pool_ctx_t *readCtx; } cRess_t; /** ZSTD_cycleLog() : @@ -990,9 +857,6 @@ static cRess_t FIO_createCResources(FIO_prefs_t* const prefs, if (ress.cctx == NULL) EXM_THROW(30, "allocation error (%s): can't create ZSTD_CCtx", strerror(errno)); - ress.srcBufferSize = ZSTD_CStreamInSize(); - ress.srcBuffer = malloc(ress.srcBufferSize); - ress.dstBufferSize = ZSTD_CStreamOutSize(); /* need to update memLimit before calling createDictBuffer * because of memLimit check inside it */ @@ -1000,10 +864,10 @@ static cRess_t FIO_createCResources(FIO_prefs_t* const prefs, unsigned long long const ssSize = (unsigned long long)prefs->streamSrcSize; FIO_adjustParamsForPatchFromMode(prefs, &comprParams, UTIL_getFileSize(dictFileName), ssSize > 0 ? ssSize : maxSrcFileSize, cLevel); } - ress.dstBuffer = malloc(ress.dstBufferSize); ress.dictBufferSize = FIO_createDictBuffer(&ress.dictBuffer, dictFileName, prefs); /* works with dictFileName==NULL */ - if (!ress.srcBuffer || !ress.dstBuffer) - EXM_THROW(31, "allocation error : not enough memory"); + + ress.writeCtx = WritePool_create(prefs, ZSTD_CStreamOutSize()); + ress.readCtx = ReadPool_create(prefs, ZSTD_CStreamInSize()); /* Advanced parameters, including dictionary */ if (dictFileName && (ress.dictBuffer==NULL)) @@ -1066,9 +930,9 @@ static cRess_t FIO_createCResources(FIO_prefs_t* const prefs, static void FIO_freeCResources(const cRess_t* const ress) { - free(ress->srcBuffer); - free(ress->dstBuffer); free(ress->dictBuffer); + WritePool_free(ress->writeCtx); + ReadPool_free(ress->readCtx); ZSTD_freeCStream(ress->cctx); /* never fails */ } @@ -1081,6 +945,7 @@ FIO_compressGzFrame(const cRess_t* ress, /* buffers & handlers are used, but no { unsigned long long inFileSize = 0, outFileSize = 0; z_stream strm; + io_job_t *writeJob = NULL; if (compressionLevel > Z_BEST_COMPRESSION) compressionLevel = Z_BEST_COMPRESSION; @@ -1096,30 +961,37 @@ FIO_compressGzFrame(const cRess_t* ress, /* buffers & handlers are used, but no EXM_THROW(71, "zstd: %s: deflateInit2 error %d \n", srcFileName, ret); } } + writeJob = WritePool_acquireJob(ress->writeCtx); strm.next_in = 0; strm.avail_in = 0; - strm.next_out = (Bytef*)ress->dstBuffer; - strm.avail_out = (uInt)ress->dstBufferSize; + strm.next_out = (Bytef*)writeJob->buffer; + strm.avail_out = (uInt)writeJob->bufferSize; while (1) { int ret; if (strm.avail_in == 0) { - size_t const inSize = fread(ress->srcBuffer, 1, ress->srcBufferSize, ress->srcFile); - if (inSize == 0) break; - inFileSize += inSize; - strm.next_in = (z_const unsigned char*)ress->srcBuffer; - strm.avail_in = (uInt)inSize; + ReadPool_fillBuffer(ress->readCtx, ZSTD_CStreamInSize()); + if (ress->readCtx->srcBufferLoaded == 0) break; + inFileSize += ress->readCtx->srcBufferLoaded; + strm.next_in = (z_const unsigned char*)ress->readCtx->srcBuffer; + strm.avail_in = (uInt)ress->readCtx->srcBufferLoaded; + } + + { + size_t const availBefore = strm.avail_in; + ret = deflate(&strm, Z_NO_FLUSH); + ReadPool_consumeBytes(ress->readCtx, availBefore - strm.avail_in); } - ret = deflate(&strm, Z_NO_FLUSH); + if (ret != Z_OK) EXM_THROW(72, "zstd: %s: deflate error %d \n", srcFileName, ret); - { size_t const cSize = ress->dstBufferSize - strm.avail_out; + { size_t const cSize = writeJob->bufferSize - strm.avail_out; if (cSize) { - if (fwrite(ress->dstBuffer, 1, cSize, ress->dstFile) != cSize) - EXM_THROW(73, "Write error : cannot write to output file : %s ", strerror(errno)); + writeJob->usedBufferSize = cSize; + WritePool_enqueueAndReacquireWriteJob(&writeJob); outFileSize += cSize; - strm.next_out = (Bytef*)ress->dstBuffer; - strm.avail_out = (uInt)ress->dstBufferSize; + strm.next_out = (Bytef*)writeJob->buffer; + strm.avail_out = (uInt)writeJob->bufferSize; } } if (srcFileSize == UTIL_FILESIZE_UNKNOWN) { DISPLAYUPDATE(2, "\rRead : %u MB ==> %.2f%% ", @@ -1133,13 +1005,13 @@ FIO_compressGzFrame(const cRess_t* ress, /* buffers & handlers are used, but no while (1) { int const ret = deflate(&strm, Z_FINISH); - { size_t const cSize = ress->dstBufferSize - strm.avail_out; + { size_t const cSize = writeJob->bufferSize - strm.avail_out; if (cSize) { - if (fwrite(ress->dstBuffer, 1, cSize, ress->dstFile) != cSize) - EXM_THROW(75, "Write error : %s ", strerror(errno)); + writeJob->usedBufferSize = cSize; + WritePool_enqueueAndReacquireWriteJob(&writeJob); outFileSize += cSize; - strm.next_out = (Bytef*)ress->dstBuffer; - strm.avail_out = (uInt)ress->dstBufferSize; + strm.next_out = (Bytef*)writeJob->buffer; + strm.avail_out = (uInt)writeJob->bufferSize; } } if (ret == Z_STREAM_END) break; if (ret != Z_BUF_ERROR) @@ -1151,6 +1023,8 @@ FIO_compressGzFrame(const cRess_t* ress, /* buffers & handlers are used, but no EXM_THROW(79, "zstd: %s: deflateEnd error %d \n", srcFileName, ret); } } *readsize = inFileSize; + WritePool_releaseIoJob(writeJob); + WritePool_sparseWriteEnd(ress->writeCtx); return outFileSize; } #endif @@ -1166,6 +1040,7 @@ FIO_compressLzmaFrame(cRess_t* ress, lzma_stream strm = LZMA_STREAM_INIT; lzma_action action = LZMA_RUN; lzma_ret ret; + io_job_t *writeJob = NULL; if (compressionLevel < 0) compressionLevel = 0; if (compressionLevel > 9) compressionLevel = 9; @@ -1183,31 +1058,37 @@ FIO_compressLzmaFrame(cRess_t* ress, EXM_THROW(83, "zstd: %s: lzma_easy_encoder error %d", srcFileName, ret); } + writeJob = WritePool_acquireJob(ress->writeCtx); + strm.next_out = (Bytef*)writeJob->buffer; + strm.avail_out = (uInt)writeJob->bufferSize; strm.next_in = 0; strm.avail_in = 0; - strm.next_out = (BYTE*)ress->dstBuffer; - strm.avail_out = ress->dstBufferSize; while (1) { if (strm.avail_in == 0) { - size_t const inSize = fread(ress->srcBuffer, 1, ress->srcBufferSize, ress->srcFile); - if (inSize == 0) action = LZMA_FINISH; + size_t const inSize = ReadPool_fillBuffer(ress->readCtx, ZSTD_CStreamInSize()); + if (ress->readCtx->srcBufferLoaded == 0) action = LZMA_FINISH; inFileSize += inSize; - strm.next_in = (BYTE const*)ress->srcBuffer; - strm.avail_in = inSize; + strm.next_in = (BYTE const*)ress->readCtx->srcBuffer; + strm.avail_in = ress->readCtx->srcBufferLoaded; + } + + { + size_t const availBefore = strm.avail_in; + ret = lzma_code(&strm, action); + ReadPool_consumeBytes(ress->readCtx, availBefore - strm.avail_in); } - ret = lzma_code(&strm, action); if (ret != LZMA_OK && ret != LZMA_STREAM_END) EXM_THROW(84, "zstd: %s: lzma_code encoding error %d", srcFileName, ret); - { size_t const compBytes = ress->dstBufferSize - strm.avail_out; + { size_t const compBytes = writeJob->bufferSize - strm.avail_out; if (compBytes) { - if (fwrite(ress->dstBuffer, 1, compBytes, ress->dstFile) != compBytes) - EXM_THROW(85, "Write error : %s", strerror(errno)); + writeJob->usedBufferSize = compBytes; + WritePool_enqueueAndReacquireWriteJob(&writeJob); outFileSize += compBytes; - strm.next_out = (BYTE*)ress->dstBuffer; - strm.avail_out = ress->dstBufferSize; + strm.next_out = (Bytef*)writeJob->buffer; + strm.avail_out = writeJob->bufferSize; } } if (srcFileSize == UTIL_FILESIZE_UNKNOWN) DISPLAYUPDATE(2, "\rRead : %u MB ==> %.2f%%", @@ -1223,6 +1104,9 @@ FIO_compressLzmaFrame(cRess_t* ress, lzma_end(&strm); *readsize = inFileSize; + WritePool_releaseIoJob(writeJob); + WritePool_sparseWriteEnd(ress->writeCtx); + return outFileSize; } #endif @@ -1248,13 +1132,15 @@ FIO_compressLz4Frame(cRess_t* ress, LZ4F_preferences_t prefs; LZ4F_compressionContext_t ctx; + io_job_t *writeJob = WritePool_acquireJob(ress->writeCtx); + LZ4F_errorCode_t const errorCode = LZ4F_createCompressionContext(&ctx, LZ4F_VERSION); if (LZ4F_isError(errorCode)) EXM_THROW(31, "zstd: failed to create lz4 compression context"); memset(&prefs, 0, sizeof(prefs)); - assert(blockSize <= ress->srcBufferSize); + assert(blockSize <= ress->readCtx->srcBufferBaseSize); prefs.autoFlush = 1; prefs.compressionLevel = compressionLevel; @@ -1264,27 +1150,26 @@ FIO_compressLz4Frame(cRess_t* ress, #if LZ4_VERSION_NUMBER >= 10600 prefs.frameInfo.contentSize = (srcFileSize==UTIL_FILESIZE_UNKNOWN) ? 0 : srcFileSize; #endif - assert(LZ4F_compressBound(blockSize, &prefs) <= ress->dstBufferSize); + assert(LZ4F_compressBound(blockSize, &prefs) <= writeJob->bufferSize); { size_t readSize; - size_t headerSize = LZ4F_compressBegin(ctx, ress->dstBuffer, ress->dstBufferSize, &prefs); + size_t headerSize = LZ4F_compressBegin(ctx, writeJob->buffer, writeJob->bufferSize, &prefs); if (LZ4F_isError(headerSize)) EXM_THROW(33, "File header generation failed : %s", LZ4F_getErrorName(headerSize)); - if (fwrite(ress->dstBuffer, 1, headerSize, ress->dstFile) != headerSize) - EXM_THROW(34, "Write error : %s (cannot write header)", strerror(errno)); + writeJob->usedBufferSize = headerSize; + WritePool_enqueueAndReacquireWriteJob(&writeJob); outFileSize += headerSize; /* Read first block */ - readSize = fread(ress->srcBuffer, (size_t)1, (size_t)blockSize, ress->srcFile); + readSize = ReadPool_fillBuffer(ress->readCtx, blockSize); inFileSize += readSize; /* Main Loop */ - while (readSize>0) { - size_t const outSize = LZ4F_compressUpdate(ctx, - ress->dstBuffer, ress->dstBufferSize, - ress->srcBuffer, readSize, NULL); + while (ress->readCtx->srcBufferLoaded) { + size_t const outSize = LZ4F_compressUpdate(ctx, writeJob->buffer, writeJob->bufferSize, + ress->readCtx->srcBuffer, ress->readCtx->srcBufferLoaded, NULL); if (LZ4F_isError(outSize)) EXM_THROW(35, "zstd: %s: lz4 compression failed : %s", srcFileName, LZ4F_getErrorName(outSize)); @@ -1300,33 +1185,30 @@ FIO_compressLz4Frame(cRess_t* ress, } /* Write Block */ - { size_t const sizeCheck = fwrite(ress->dstBuffer, 1, outSize, ress->dstFile); - if (sizeCheck != outSize) - EXM_THROW(36, "Write error : %s", strerror(errno)); - } + writeJob->usedBufferSize = outSize; + WritePool_enqueueAndReacquireWriteJob(&writeJob); /* Read next block */ - readSize = fread(ress->srcBuffer, (size_t)1, (size_t)blockSize, ress->srcFile); + ReadPool_consumeBytes(ress->readCtx, ress->readCtx->srcBufferLoaded); + readSize = ReadPool_fillBuffer(ress->readCtx, blockSize); inFileSize += readSize; } - if (ferror(ress->srcFile)) EXM_THROW(37, "Error reading %s ", srcFileName); /* End of Stream mark */ - headerSize = LZ4F_compressEnd(ctx, ress->dstBuffer, ress->dstBufferSize, NULL); + headerSize = LZ4F_compressEnd(ctx, writeJob->buffer, writeJob->bufferSize, NULL); if (LZ4F_isError(headerSize)) EXM_THROW(38, "zstd: %s: lz4 end of file generation failed : %s", srcFileName, LZ4F_getErrorName(headerSize)); - { size_t const sizeCheck = fwrite(ress->dstBuffer, 1, headerSize, ress->dstFile); - if (sizeCheck != headerSize) - EXM_THROW(39, "Write error : %s (cannot write end of stream)", - strerror(errno)); - } + writeJob->usedBufferSize = headerSize; + WritePool_enqueueAndReacquireWriteJob(&writeJob); outFileSize += headerSize; } *readsize = inFileSize; LZ4F_freeCompressionContext(ctx); + WritePool_releaseIoJob(writeJob); + WritePool_sparseWriteEnd(ress->writeCtx); return outFileSize; } @@ -1341,8 +1223,8 @@ FIO_compressZstdFrame(FIO_ctx_t* const fCtx, int compressionLevel, U64* readsize) { cRess_t const ress = *ressPtr; - FILE* const srcFile = ress.srcFile; - FILE* const dstFile = ress.dstFile; + io_job_t *writeJob = WritePool_acquireJob(ressPtr->writeCtx); + U64 compressedfilesize = 0; ZSTD_EndDirective directive = ZSTD_e_continue; U64 pledgedSrcSize = ZSTD_CONTENTSIZE_UNKNOWN; @@ -1387,12 +1269,12 @@ FIO_compressZstdFrame(FIO_ctx_t* const fCtx, do { size_t stillToFlush; /* Fill input Buffer */ - size_t const inSize = fread(ress.srcBuffer, (size_t)1, ress.srcBufferSize, srcFile); - ZSTD_inBuffer inBuff = { ress.srcBuffer, inSize, 0 }; + size_t const inSize = ReadPool_fillBuffer(ress.readCtx, ZSTD_CStreamInSize()); + ZSTD_inBuffer inBuff = { ress.readCtx->srcBuffer, ress.readCtx->srcBufferLoaded, 0 }; DISPLAYLEVEL(6, "fread %u bytes from source \n", (unsigned)inSize); *readsize += inSize; - if ((inSize == 0) || (*readsize == fileSize)) + if ((ress.readCtx->srcBufferLoaded == 0) || (*readsize == fileSize)) directive = ZSTD_e_end; stillToFlush = 1; @@ -1400,9 +1282,10 @@ FIO_compressZstdFrame(FIO_ctx_t* const fCtx, || (directive == ZSTD_e_end && stillToFlush != 0) ) { size_t const oldIPos = inBuff.pos; - ZSTD_outBuffer outBuff = { ress.dstBuffer, ress.dstBufferSize, 0 }; + ZSTD_outBuffer outBuff= { writeJob->buffer, writeJob->bufferSize, 0 }; size_t const toFlushNow = ZSTD_toFlushNow(ress.cctx); CHECK_V(stillToFlush, ZSTD_compressStream2(ress.cctx, &outBuff, &inBuff, directive)); + ReadPool_consumeBytes(ress.readCtx, inBuff.pos - oldIPos); /* count stats */ inputPresented++; @@ -1413,10 +1296,8 @@ FIO_compressZstdFrame(FIO_ctx_t* const fCtx, DISPLAYLEVEL(6, "ZSTD_compress_generic(end:%u) => input pos(%u)<=(%u)size ; output generated %u bytes \n", (unsigned)directive, (unsigned)inBuff.pos, (unsigned)inBuff.size, (unsigned)outBuff.pos); if (outBuff.pos) { - size_t const sizeCheck = fwrite(ress.dstBuffer, 1, outBuff.pos, dstFile); - if (sizeCheck != outBuff.pos) - EXM_THROW(25, "Write error : %s (cannot write compressed block)", - strerror(errno)); + writeJob->usedBufferSize = outBuff.pos; + WritePool_enqueueAndReacquireWriteJob(&writeJob); compressedfilesize += outBuff.pos; } @@ -1548,14 +1429,14 @@ FIO_compressZstdFrame(FIO_ctx_t* const fCtx, } /* while ((inBuff.pos != inBuff.size) */ } while (directive != ZSTD_e_end); - if (ferror(srcFile)) { - EXM_THROW(26, "Read error : I/O error"); - } if (fileSize != UTIL_FILESIZE_UNKNOWN && *readsize != fileSize) { EXM_THROW(27, "Read error : Incomplete read : %llu / %llu B", (unsigned long long)*readsize, (unsigned long long)fileSize); } + WritePool_releaseIoJob(writeJob); + WritePool_sparseWriteEnd(ressPtr->writeCtx); + return compressedfilesize; } @@ -1656,7 +1537,7 @@ FIO_compressFilename_internal(FIO_ctx_t* const fCtx, /*! FIO_compressFilename_dstFile() : - * open dstFileName, or pass-through if ress.dstFile != NULL, + * open dstFileName, or pass-through if ress.file != NULL, * then start compression with FIO_compressFilename_internal(). * Manages source removal (--rm) and file permissions transfer. * note : ress.srcFile must be != NULL, @@ -1675,8 +1556,9 @@ static int FIO_compressFilename_dstFile(FIO_ctx_t* const fCtx, int result; stat_t statbuf; int transferMTime = 0; - assert(ress.srcFile != NULL); - if (ress.dstFile == NULL) { + FILE *dstFile; + assert(ReadPool_getFile(ress.readCtx) != NULL); + if (WritePool_getFile(ress.writeCtx) == NULL) { int dstFilePermissions = DEFAULT_FILE_PERMISSIONS; if ( strcmp (srcFileName, stdinmark) && strcmp (dstFileName, stdoutmark) @@ -1688,8 +1570,9 @@ static int FIO_compressFilename_dstFile(FIO_ctx_t* const fCtx, closeDstFile = 1; DISPLAYLEVEL(6, "FIO_compressFilename_dstFile: opening dst: %s \n", dstFileName); - ress.dstFile = FIO_openDstFile(fCtx, prefs, srcFileName, dstFileName, dstFilePermissions); - if (ress.dstFile==NULL) return 1; /* could not open dstFileName */ + dstFile = FIO_openDstFile(fCtx, prefs, srcFileName, dstFileName, dstFilePermissions); + if (dstFile==NULL) return 1; /* could not open dstFileName */ + WritePool_setFile(ress.writeCtx, dstFile); /* Must only be added after FIO_openDstFile() succeeds. * Otherwise we may delete the destination file if it already exists, * and the user presses Ctrl-C when asked if they wish to overwrite. @@ -1700,13 +1583,10 @@ static int FIO_compressFilename_dstFile(FIO_ctx_t* const fCtx, result = FIO_compressFilename_internal(fCtx, prefs, ress, dstFileName, srcFileName, compressionLevel); if (closeDstFile) { - FILE* const dstFile = ress.dstFile; - ress.dstFile = NULL; - clearHandler(); DISPLAYLEVEL(6, "FIO_compressFilename_dstFile: closing dst: %s \n", dstFileName); - if (fclose(dstFile)) { /* error closing dstFile */ + if (WritePool_closeFile(ress.writeCtx)) { /* error closing file */ DISPLAYLEVEL(1, "zstd: %s: %s \n", dstFileName, strerror(errno)); result=1; } @@ -1752,6 +1632,7 @@ FIO_compressFilename_srcFile(FIO_ctx_t* const fCtx, int compressionLevel) { int result; + FILE* srcFile; DISPLAYLEVEL(6, "FIO_compressFilename_srcFile: %s \n", srcFileName); /* ensure src is not a directory */ @@ -1775,13 +1656,13 @@ FIO_compressFilename_srcFile(FIO_ctx_t* const fCtx, return 0; } - ress.srcFile = FIO_openSrcFile(prefs, srcFileName); - if (ress.srcFile == NULL) return 1; /* srcFile could not be opened */ + srcFile = FIO_openSrcFile(prefs, srcFileName); + if (srcFile == NULL) return 1; /* srcFile could not be opened */ + ReadPool_setFile(ress.readCtx, srcFile); result = FIO_compressFilename_dstFile(fCtx, prefs, ress, dstFileName, srcFileName, compressionLevel); + ReadPool_closeFile(ress.readCtx); - fclose(ress.srcFile); - ress.srcFile = NULL; if ( prefs->removeSrcFile /* --rm */ && result == 0 /* success */ && strcmp(srcFileName, stdinmark) /* exception : don't erase stdin */ @@ -1798,7 +1679,7 @@ FIO_compressFilename_srcFile(FIO_ctx_t* const fCtx, static const char* checked_index(const char* options[], size_t length, size_t index) { assert(index < length); - // Necessary to avoid warnings since -O3 will omit the above `assert` + /* Necessary to avoid warnings since -O3 will omit the above `assert` */ (void) length; return options[index]; } @@ -1928,23 +1809,24 @@ int FIO_compressMultipleFilenames(FIO_ctx_t* const fCtx, /* init */ assert(outFileName != NULL || suffix != NULL); if (outFileName != NULL) { /* output into a single destination (stdout typically) */ + FILE *dstFile; if (FIO_removeMultiFilesWarning(fCtx, prefs, outFileName, 1 /* displayLevelCutoff */)) { FIO_freeCResources(&ress); return 1; } - ress.dstFile = FIO_openDstFile(fCtx, prefs, NULL, outFileName, DEFAULT_FILE_PERMISSIONS); - if (ress.dstFile == NULL) { /* could not open outFileName */ + dstFile = FIO_openDstFile(fCtx, prefs, NULL, outFileName, DEFAULT_FILE_PERMISSIONS); + if (dstFile == NULL) { /* could not open outFileName */ error = 1; } else { + WritePool_setFile(ress.writeCtx, dstFile); for (; fCtx->currFileIdx < fCtx->nbFilesTotal; ++fCtx->currFileIdx) { status = FIO_compressFilename_srcFile(fCtx, prefs, ress, outFileName, inFileNamesTable[fCtx->currFileIdx], compressionLevel); if (!status) fCtx->nbFilesProcessed++; error |= status; } - if (fclose(ress.dstFile)) + if (WritePool_closeFile(ress.writeCtx)) EXM_THROW(29, "Write error (%s) : cannot properly close %s", strerror(errno), outFileName); - ress.dstFile = NULL; } } else { if (outMirroredRootDirName) @@ -2001,13 +1883,9 @@ int FIO_compressMultipleFilenames(FIO_ctx_t* const fCtx, * Decompression ***************************************************************************/ typedef struct { - void* srcBuffer; - size_t srcBufferSize; - size_t srcBufferLoaded; - void* dstBuffer; - size_t dstBufferSize; ZSTD_DStream* dctx; - FILE* dstFile; + write_pool_ctx_t *writeCtx; + read_pool_ctx_t *readCtx; } dRess_t; static dRess_t FIO_createDResources(FIO_prefs_t* const prefs, const char* dictFileName) @@ -2025,13 +1903,6 @@ static dRess_t FIO_createDResources(FIO_prefs_t* const prefs, const char* dictFi CHECK( ZSTD_DCtx_setMaxWindowSize(ress.dctx, prefs->memLimit) ); CHECK( ZSTD_DCtx_setParameter(ress.dctx, ZSTD_d_forceIgnoreChecksum, !prefs->checksumFlag)); - ress.srcBufferSize = ZSTD_DStreamInSize(); - ress.srcBuffer = malloc(ress.srcBufferSize); - ress.dstBufferSize = ZSTD_DStreamOutSize(); - ress.dstBuffer = malloc(ress.dstBufferSize); - if (!ress.srcBuffer || !ress.dstBuffer) - EXM_THROW(61, "Allocation error : not enough memory"); - /* dictionary */ { void* dictBuffer; size_t const dictBufferSize = FIO_createDictBuffer(&dictBuffer, dictFileName, prefs); @@ -2039,145 +1910,40 @@ static dRess_t FIO_createDResources(FIO_prefs_t* const prefs, const char* dictFi free(dictBuffer); } + ress.writeCtx = WritePool_create(prefs, ZSTD_DStreamOutSize()); + ress.readCtx = ReadPool_create(prefs, ZSTD_DStreamInSize()); + return ress; } static void FIO_freeDResources(dRess_t ress) { CHECK( ZSTD_freeDStream(ress.dctx) ); - free(ress.srcBuffer); - free(ress.dstBuffer); + WritePool_free(ress.writeCtx); + ReadPool_free(ress.readCtx); } - -/** FIO_fwriteSparse() : -* @return : storedSkips, -* argument for next call to FIO_fwriteSparse() or FIO_fwriteSparseEnd() */ -static unsigned -FIO_fwriteSparse(FILE* file, - const void* buffer, size_t bufferSize, - const FIO_prefs_t* const prefs, - unsigned storedSkips) -{ - const size_t* const bufferT = (const size_t*)buffer; /* Buffer is supposed malloc'ed, hence aligned on size_t */ - size_t bufferSizeT = bufferSize / sizeof(size_t); - const size_t* const bufferTEnd = bufferT + bufferSizeT; - const size_t* ptrT = bufferT; - static const size_t segmentSizeT = (32 KB) / sizeof(size_t); /* check every 32 KB */ - - if (prefs->testMode) return 0; /* do not output anything in test mode */ - - if (!prefs->sparseFileSupport) { /* normal write */ - size_t const sizeCheck = fwrite(buffer, 1, bufferSize, file); - if (sizeCheck != bufferSize) - EXM_THROW(70, "Write error : cannot write decoded block : %s", - strerror(errno)); - return 0; - } - - /* avoid int overflow */ - if (storedSkips > 1 GB) { - if (LONG_SEEK(file, 1 GB, SEEK_CUR) != 0) - EXM_THROW(91, "1 GB skip error (sparse file support)"); - storedSkips -= 1 GB; - } - - while (ptrT < bufferTEnd) { - size_t nb0T; - - /* adjust last segment if < 32 KB */ - size_t seg0SizeT = segmentSizeT; - if (seg0SizeT > bufferSizeT) seg0SizeT = bufferSizeT; - bufferSizeT -= seg0SizeT; - - /* count leading zeroes */ - for (nb0T=0; (nb0T < seg0SizeT) && (ptrT[nb0T] == 0); nb0T++) ; - storedSkips += (unsigned)(nb0T * sizeof(size_t)); - - if (nb0T != seg0SizeT) { /* not all 0s */ - size_t const nbNon0ST = seg0SizeT - nb0T; - /* skip leading zeros */ - if (LONG_SEEK(file, storedSkips, SEEK_CUR) != 0) - EXM_THROW(92, "Sparse skip error ; try --no-sparse"); - storedSkips = 0; - /* write the rest */ - if (fwrite(ptrT + nb0T, sizeof(size_t), nbNon0ST, file) != nbNon0ST) - EXM_THROW(93, "Write error : cannot write decoded block : %s", - strerror(errno)); - } - ptrT += seg0SizeT; - } - - { static size_t const maskT = sizeof(size_t)-1; - if (bufferSize & maskT) { - /* size not multiple of sizeof(size_t) : implies end of block */ - const char* const restStart = (const char*)bufferTEnd; - const char* restPtr = restStart; - const char* const restEnd = (const char*)buffer + bufferSize; - assert(restEnd > restStart && restEnd < restStart + sizeof(size_t)); - for ( ; (restPtr < restEnd) && (*restPtr == 0); restPtr++) ; - storedSkips += (unsigned) (restPtr - restStart); - if (restPtr != restEnd) { - /* not all remaining bytes are 0 */ - size_t const restSize = (size_t)(restEnd - restPtr); - if (LONG_SEEK(file, storedSkips, SEEK_CUR) != 0) - EXM_THROW(92, "Sparse skip error ; try --no-sparse"); - if (fwrite(restPtr, 1, restSize, file) != restSize) - EXM_THROW(95, "Write error : cannot write end of decoded block : %s", - strerror(errno)); - storedSkips = 0; - } } } - - return storedSkips; -} - -static void -FIO_fwriteSparseEnd(const FIO_prefs_t* const prefs, FILE* file, unsigned storedSkips) -{ - if (prefs->testMode) assert(storedSkips == 0); - if (storedSkips>0) { - assert(prefs->sparseFileSupport > 0); /* storedSkips>0 implies sparse support is enabled */ - (void)prefs; /* assert can be disabled, in which case prefs becomes unused */ - if (LONG_SEEK(file, storedSkips-1, SEEK_CUR) != 0) - EXM_THROW(69, "Final skip error (sparse file support)"); - /* last zero must be explicitly written, - * so that skipped ones get implicitly translated as zero by FS */ - { const char lastZeroByte[1] = { 0 }; - if (fwrite(lastZeroByte, 1, 1, file) != 1) - EXM_THROW(69, "Write error : cannot write last zero : %s", strerror(errno)); - } } -} - - /** FIO_passThrough() : just copy input into output, for compatibility with gzip -df mode @return : 0 (no error) */ -static int FIO_passThrough(const FIO_prefs_t* const prefs, - FILE* foutput, FILE* finput, - void* buffer, size_t bufferSize, - size_t alreadyLoaded) +static int FIO_passThrough(dRess_t *ress) { - size_t const blockSize = MIN(64 KB, bufferSize); - size_t readFromInput; - unsigned storedSkips = 0; - - /* assumption : ress->srcBufferLoaded bytes already loaded and stored within buffer */ - { size_t const sizeCheck = fwrite(buffer, 1, alreadyLoaded, foutput); - if (sizeCheck != alreadyLoaded) { - DISPLAYLEVEL(1, "Pass-through write error : %s\n", strerror(errno)); - return 1; - } } - - do { - readFromInput = fread(buffer, 1, blockSize, finput); - storedSkips = FIO_fwriteSparse(foutput, buffer, readFromInput, prefs, storedSkips); - } while (readFromInput == blockSize); - if (ferror(finput)) { - DISPLAYLEVEL(1, "Pass-through read error : %s\n", strerror(errno)); - return 1; - } - assert(feof(finput)); - - FIO_fwriteSparseEnd(prefs, foutput, storedSkips); + size_t const blockSize = MIN(MIN(64 KB, ZSTD_DStreamInSize()), ZSTD_DStreamOutSize()); + io_job_t *writeJob = WritePool_acquireJob(ress->writeCtx); + ReadPool_fillBuffer(ress->readCtx, blockSize); + + while(ress->readCtx->srcBufferLoaded) { + size_t writeSize; + ReadPool_fillBuffer(ress->readCtx, blockSize); + writeSize = MIN(blockSize, ress->readCtx->srcBufferLoaded); + assert(writeSize <= writeJob->bufferSize); + memcpy(writeJob->buffer, ress->readCtx->srcBuffer, writeSize); + writeJob->usedBufferSize = writeSize; + WritePool_enqueueAndReacquireWriteJob(&writeJob); + ReadPool_consumeBytes(ress->readCtx, writeSize); + } + assert(ress->readCtx->reachedEof); + WritePool_releaseIoJob(writeJob); + WritePool_sparseWriteEnd(ress->writeCtx); return 0; } @@ -2195,7 +1961,7 @@ FIO_zstdErrorHelp(const FIO_prefs_t* const prefs, return; /* Try to decode the frame header */ - err = ZSTD_getFrameHeader(&header, ress->srcBuffer, ress->srcBufferLoaded); + err = ZSTD_getFrameHeader(&header, ress->readCtx->srcBuffer, ress->readCtx->srcBufferLoaded); if (err == 0) { unsigned long long const windowSize = header.windowSize; unsigned const windowLog = FIO_highbit64(windowSize) + ((windowSize & (windowSize - 1)) != 0); @@ -2218,13 +1984,13 @@ FIO_zstdErrorHelp(const FIO_prefs_t* const prefs, */ #define FIO_ERROR_FRAME_DECODING ((unsigned long long)(-2)) static unsigned long long -FIO_decompressZstdFrame(FIO_ctx_t* const fCtx, dRess_t* ress, FILE* finput, +FIO_decompressZstdFrame(FIO_ctx_t* const fCtx, dRess_t* ress, const FIO_prefs_t* const prefs, const char* srcFileName, U64 alreadyDecoded) /* for multi-frames streams */ { U64 frameSize = 0; - U32 storedSkips = 0; + io_job_t *writeJob = WritePool_acquireJob(ress->writeCtx); /* display last 20 characters only */ { size_t const srcFileLength = strlen(srcFileName); @@ -2234,17 +2000,13 @@ FIO_decompressZstdFrame(FIO_ctx_t* const fCtx, dRess_t* ress, FILE* finput, ZSTD_DCtx_reset(ress->dctx, ZSTD_reset_session_only); /* Header loading : ensures ZSTD_getFrameHeader() will succeed */ - { size_t const toDecode = ZSTD_FRAMEHEADERSIZE_MAX; - if (ress->srcBufferLoaded < toDecode) { - size_t const toRead = toDecode - ress->srcBufferLoaded; - void* const startPosition = (char*)ress->srcBuffer + ress->srcBufferLoaded; - ress->srcBufferLoaded += fread(startPosition, 1, toRead, finput); - } } + if (ress->readCtx->srcBufferLoaded < ZSTD_FRAMEHEADERSIZE_MAX) + ReadPool_fillBuffer(ress->readCtx, ZSTD_FRAMEHEADERSIZE_MAX); /* Main decompression Loop */ while (1) { - ZSTD_inBuffer inBuff = { ress->srcBuffer, ress->srcBufferLoaded, 0 }; - ZSTD_outBuffer outBuff= { ress->dstBuffer, ress->dstBufferSize, 0 }; + ZSTD_inBuffer inBuff = { ress->readCtx->srcBuffer, ress->readCtx->srcBufferLoaded, 0 }; + ZSTD_outBuffer outBuff= { writeJob->buffer, writeJob->bufferSize, 0 }; size_t const readSizeHint = ZSTD_decompressStream(ress->dctx, &outBuff, &inBuff); const int displayLevel = (g_display_prefs.progressSetting == FIO_ps_always) ? 1 : 2; UTIL_HumanReadableSize_t const hrs = UTIL_makeHumanReadableSize(alreadyDecoded+frameSize); @@ -2252,11 +2014,13 @@ FIO_decompressZstdFrame(FIO_ctx_t* const fCtx, dRess_t* ress, FILE* finput, DISPLAYLEVEL(1, "%s : Decoding error (36) : %s \n", srcFileName, ZSTD_getErrorName(readSizeHint)); FIO_zstdErrorHelp(prefs, ress, readSizeHint, srcFileName); + WritePool_releaseIoJob(writeJob); return FIO_ERROR_FRAME_DECODING; } /* Write block */ - storedSkips = FIO_fwriteSparse(ress->dstFile, ress->dstBuffer, outBuff.pos, prefs, storedSkips); + writeJob->usedBufferSize = outBuff.pos; + WritePool_enqueueAndReacquireWriteJob(&writeJob); frameSize += outBuff.pos; if (fCtx->nbFilesTotal > 1) { size_t srcFileNameSize = strlen(srcFileName); @@ -2273,28 +2037,24 @@ FIO_decompressZstdFrame(FIO_ctx_t* const fCtx, dRess_t* ress, FILE* finput, srcFileName, hrs.precision, hrs.value, hrs.suffix); } - if (inBuff.pos > 0) { - memmove(ress->srcBuffer, (char*)ress->srcBuffer + inBuff.pos, inBuff.size - inBuff.pos); - ress->srcBufferLoaded -= inBuff.pos; - } + ReadPool_consumeBytes(ress->readCtx, inBuff.pos); if (readSizeHint == 0) break; /* end of frame */ /* Fill input buffer */ - { size_t const toDecode = MIN(readSizeHint, ress->srcBufferSize); /* support large skippable frames */ - if (ress->srcBufferLoaded < toDecode) { - size_t const toRead = toDecode - ress->srcBufferLoaded; /* > 0 */ - void* const startPosition = (char*)ress->srcBuffer + ress->srcBufferLoaded; - size_t const readSize = fread(startPosition, 1, toRead, finput); + { size_t const toDecode = MIN(readSizeHint, ZSTD_DStreamInSize()); /* support large skippable frames */ + if (ress->readCtx->srcBufferLoaded < toDecode) { + size_t const readSize = ReadPool_fillBuffer(ress->readCtx, toDecode); if (readSize==0) { DISPLAYLEVEL(1, "%s : Read error (39) : premature end \n", srcFileName); + WritePool_releaseIoJob(writeJob); return FIO_ERROR_FRAME_DECODING; } - ress->srcBufferLoaded += readSize; } } } - FIO_fwriteSparseEnd(prefs, ress->dstFile, storedSkips); + WritePool_releaseIoJob(writeJob); + WritePool_sparseWriteEnd(ress->writeCtx); return frameSize; } @@ -2302,15 +2062,13 @@ FIO_decompressZstdFrame(FIO_ctx_t* const fCtx, dRess_t* ress, FILE* finput, #ifdef ZSTD_GZDECOMPRESS static unsigned long long -FIO_decompressGzFrame(dRess_t* ress, FILE* srcFile, - const FIO_prefs_t* const prefs, - const char* srcFileName) +FIO_decompressGzFrame(dRess_t* ress, const char* srcFileName) { unsigned long long outFileSize = 0; z_stream strm; int flush = Z_NO_FLUSH; int decodingError = 0; - unsigned storedSkips = 0; + io_job_t *writeJob = NULL; strm.zalloc = Z_NULL; strm.zfree = Z_NULL; @@ -2321,18 +2079,19 @@ FIO_decompressGzFrame(dRess_t* ress, FILE* srcFile, if (inflateInit2(&strm, 15 /* maxWindowLogSize */ + 16 /* gzip only */) != Z_OK) return FIO_ERROR_FRAME_DECODING; - strm.next_out = (Bytef*)ress->dstBuffer; - strm.avail_out = (uInt)ress->dstBufferSize; - strm.avail_in = (uInt)ress->srcBufferLoaded; - strm.next_in = (z_const unsigned char*)ress->srcBuffer; + writeJob = WritePool_acquireJob(ress->writeCtx); + strm.next_out = (Bytef*)writeJob->buffer; + strm.avail_out = (uInt)writeJob->bufferSize; + strm.avail_in = (uInt)ress->readCtx->srcBufferLoaded; + strm.next_in = (z_const unsigned char*)ress->readCtx->srcBuffer; for ( ; ; ) { int ret; if (strm.avail_in == 0) { - ress->srcBufferLoaded = fread(ress->srcBuffer, 1, ress->srcBufferSize, srcFile); - if (ress->srcBufferLoaded == 0) flush = Z_FINISH; - strm.next_in = (z_const unsigned char*)ress->srcBuffer; - strm.avail_in = (uInt)ress->srcBufferLoaded; + ReadPool_consumeAndRefill(ress->readCtx); + if (ress->readCtx->srcBufferLoaded == 0) flush = Z_FINISH; + strm.next_in = (z_const unsigned char*)ress->readCtx->srcBuffer; + strm.avail_in = (uInt)ress->readCtx->srcBufferLoaded; } ret = inflate(&strm, flush); if (ret == Z_BUF_ERROR) { @@ -2343,35 +2102,34 @@ FIO_decompressGzFrame(dRess_t* ress, FILE* srcFile, DISPLAYLEVEL(1, "zstd: %s: inflate error %d \n", srcFileName, ret); decodingError = 1; break; } - { size_t const decompBytes = ress->dstBufferSize - strm.avail_out; + { size_t const decompBytes = writeJob->bufferSize - strm.avail_out; if (decompBytes) { - storedSkips = FIO_fwriteSparse(ress->dstFile, ress->dstBuffer, decompBytes, prefs, storedSkips); + writeJob->usedBufferSize = decompBytes; + WritePool_enqueueAndReacquireWriteJob(&writeJob); outFileSize += decompBytes; - strm.next_out = (Bytef*)ress->dstBuffer; - strm.avail_out = (uInt)ress->dstBufferSize; + strm.next_out = (Bytef*)writeJob->buffer; + strm.avail_out = (uInt)writeJob->bufferSize; } } if (ret == Z_STREAM_END) break; } - if (strm.avail_in > 0) - memmove(ress->srcBuffer, strm.next_in, strm.avail_in); - ress->srcBufferLoaded = strm.avail_in; + ReadPool_consumeBytes(ress->readCtx, ress->readCtx->srcBufferLoaded - strm.avail_in); + if ( (inflateEnd(&strm) != Z_OK) /* release resources ; error detected */ && (decodingError==0) ) { DISPLAYLEVEL(1, "zstd: %s: inflateEnd error \n", srcFileName); decodingError = 1; } - FIO_fwriteSparseEnd(prefs, ress->dstFile, storedSkips); + WritePool_releaseIoJob(writeJob); + WritePool_sparseWriteEnd(ress->writeCtx); return decodingError ? FIO_ERROR_FRAME_DECODING : outFileSize; } #endif - #ifdef ZSTD_LZMADECOMPRESS static unsigned long long -FIO_decompressLzmaFrame(dRess_t* ress, FILE* srcFile, - const FIO_prefs_t* const prefs, +FIO_decompressLzmaFrame(dRess_t* ress, const char* srcFileName, int plain_lzma) { unsigned long long outFileSize = 0; @@ -2379,7 +2137,7 @@ FIO_decompressLzmaFrame(dRess_t* ress, FILE* srcFile, lzma_action action = LZMA_RUN; lzma_ret initRet; int decodingError = 0; - unsigned storedSkips = 0; + io_job_t *writeJob = NULL; strm.next_in = 0; strm.avail_in = 0; @@ -2396,18 +2154,19 @@ FIO_decompressLzmaFrame(dRess_t* ress, FILE* srcFile, return FIO_ERROR_FRAME_DECODING; } - strm.next_out = (BYTE*)ress->dstBuffer; - strm.avail_out = ress->dstBufferSize; - strm.next_in = (BYTE const*)ress->srcBuffer; - strm.avail_in = ress->srcBufferLoaded; + writeJob = WritePool_acquireJob(ress->writeCtx); + strm.next_out = (Bytef*)writeJob->buffer; + strm.avail_out = (uInt)writeJob->bufferSize; + strm.next_in = (BYTE const*)ress->readCtx->srcBuffer; + strm.avail_in = ress->readCtx->srcBufferLoaded; for ( ; ; ) { lzma_ret ret; if (strm.avail_in == 0) { - ress->srcBufferLoaded = fread(ress->srcBuffer, 1, ress->srcBufferSize, srcFile); - if (ress->srcBufferLoaded == 0) action = LZMA_FINISH; - strm.next_in = (BYTE const*)ress->srcBuffer; - strm.avail_in = ress->srcBufferLoaded; + ReadPool_consumeAndRefill(ress->readCtx); + if (ress->readCtx->srcBufferLoaded == 0) action = LZMA_FINISH; + strm.next_in = (BYTE const*)ress->readCtx->srcBuffer; + strm.avail_in = ress->readCtx->srcBufferLoaded; } ret = lzma_code(&strm, action); @@ -2420,82 +2179,73 @@ FIO_decompressLzmaFrame(dRess_t* ress, FILE* srcFile, srcFileName, ret); decodingError = 1; break; } - { size_t const decompBytes = ress->dstBufferSize - strm.avail_out; + { size_t const decompBytes = writeJob->bufferSize - strm.avail_out; if (decompBytes) { - storedSkips = FIO_fwriteSparse(ress->dstFile, ress->dstBuffer, decompBytes, prefs, storedSkips); + writeJob->usedBufferSize = decompBytes; + WritePool_enqueueAndReacquireWriteJob(&writeJob); outFileSize += decompBytes; - strm.next_out = (BYTE*)ress->dstBuffer; - strm.avail_out = ress->dstBufferSize; + strm.next_out = (Bytef*)writeJob->buffer; + strm.avail_out = writeJob->bufferSize; } } if (ret == LZMA_STREAM_END) break; } - if (strm.avail_in > 0) - memmove(ress->srcBuffer, strm.next_in, strm.avail_in); - ress->srcBufferLoaded = strm.avail_in; + ReadPool_consumeBytes(ress->readCtx, ress->readCtx->srcBufferLoaded - strm.avail_in); lzma_end(&strm); - FIO_fwriteSparseEnd(prefs, ress->dstFile, storedSkips); + WritePool_releaseIoJob(writeJob); + WritePool_sparseWriteEnd(ress->writeCtx); return decodingError ? FIO_ERROR_FRAME_DECODING : outFileSize; } #endif #ifdef ZSTD_LZ4DECOMPRESS static unsigned long long -FIO_decompressLz4Frame(dRess_t* ress, FILE* srcFile, - const FIO_prefs_t* const prefs, - const char* srcFileName) +FIO_decompressLz4Frame(dRess_t* ress, const char* srcFileName) { unsigned long long filesize = 0; - LZ4F_errorCode_t nextToLoad; + LZ4F_errorCode_t nextToLoad = 4; LZ4F_decompressionContext_t dCtx; LZ4F_errorCode_t const errorCode = LZ4F_createDecompressionContext(&dCtx, LZ4F_VERSION); int decodingError = 0; - unsigned storedSkips = 0; + io_job_t *writeJob = NULL; if (LZ4F_isError(errorCode)) { DISPLAYLEVEL(1, "zstd: failed to create lz4 decompression context \n"); return FIO_ERROR_FRAME_DECODING; } - /* Init feed with magic number (already consumed from FILE* sFile) */ - { size_t inSize = 4; - size_t outSize= 0; - MEM_writeLE32(ress->srcBuffer, LZ4_MAGICNUMBER); - nextToLoad = LZ4F_decompress(dCtx, ress->dstBuffer, &outSize, ress->srcBuffer, &inSize, NULL); - if (LZ4F_isError(nextToLoad)) { - DISPLAYLEVEL(1, "zstd: %s: lz4 header error : %s \n", - srcFileName, LZ4F_getErrorName(nextToLoad)); - LZ4F_freeDecompressionContext(dCtx); - return FIO_ERROR_FRAME_DECODING; - } } + writeJob = WritePool_acquireJob(ress->writeCtx); /* Main Loop */ for (;nextToLoad;) { - size_t readSize; size_t pos = 0; - size_t decodedBytes = ress->dstBufferSize; + size_t decodedBytes = writeJob->bufferSize; + int fullBufferDecoded = 0; /* Read input */ - if (nextToLoad > ress->srcBufferSize) nextToLoad = ress->srcBufferSize; - readSize = fread(ress->srcBuffer, 1, nextToLoad, srcFile); - if (!readSize) break; /* reached end of file or stream */ + ReadPool_fillBuffer(ress->readCtx, nextToLoad); + if(!ress->readCtx->srcBufferLoaded) break; /* reached end of file */ - while ((pos < readSize) || (decodedBytes == ress->dstBufferSize)) { /* still to read, or still to flush */ + while ((pos < ress->readCtx->srcBufferLoaded) || fullBufferDecoded) { /* still to read, or still to flush */ /* Decode Input (at least partially) */ - size_t remaining = readSize - pos; - decodedBytes = ress->dstBufferSize; - nextToLoad = LZ4F_decompress(dCtx, ress->dstBuffer, &decodedBytes, (char*)(ress->srcBuffer)+pos, &remaining, NULL); + size_t remaining = ress->readCtx->srcBufferLoaded - pos; + decodedBytes = writeJob->bufferSize; + nextToLoad = LZ4F_decompress(dCtx, writeJob->buffer, &decodedBytes, (char*)(ress->readCtx->srcBuffer)+pos, + &remaining, NULL); if (LZ4F_isError(nextToLoad)) { DISPLAYLEVEL(1, "zstd: %s: lz4 decompression error : %s \n", srcFileName, LZ4F_getErrorName(nextToLoad)); decodingError = 1; nextToLoad = 0; break; } pos += remaining; + assert(pos <= ress->readCtx->srcBufferLoaded); + fullBufferDecoded = decodedBytes == writeJob->bufferSize; /* Write Block */ if (decodedBytes) { UTIL_HumanReadableSize_t hrs; - storedSkips = FIO_fwriteSparse(ress->dstFile, ress->dstBuffer, decodedBytes, prefs, storedSkips); + writeJob->usedBufferSize = decodedBytes; + WritePool_enqueueAndReacquireWriteJob(&writeJob); filesize += decodedBytes; hrs = UTIL_makeHumanReadableSize(filesize); DISPLAYUPDATE(2, "\rDecompressed : %.*f%s ", hrs.precision, hrs.value, hrs.suffix); @@ -2503,21 +2253,16 @@ FIO_decompressLz4Frame(dRess_t* ress, FILE* srcFile, if (!nextToLoad) break; } + ReadPool_consumeBytes(ress->readCtx, pos); } - /* can be out because readSize == 0, which could be an fread() error */ - if (ferror(srcFile)) { - DISPLAYLEVEL(1, "zstd: %s: read error \n", srcFileName); - decodingError=1; - } - if (nextToLoad!=0) { DISPLAYLEVEL(1, "zstd: %s: unfinished lz4 stream \n", srcFileName); decodingError=1; } LZ4F_freeDecompressionContext(dCtx); - ress->srcBufferLoaded = 0; /* LZ4F will reach exact frame boundary */ - FIO_fwriteSparseEnd(prefs, ress->dstFile, storedSkips); + WritePool_releaseIoJob(writeJob); + WritePool_sparseWriteEnd(ress->writeCtx); return decodingError ? FIO_ERROR_FRAME_DECODING : filesize; } @@ -2532,23 +2277,20 @@ FIO_decompressLz4Frame(dRess_t* ress, FILE* srcFile, * 1 : error */ static int FIO_decompressFrames(FIO_ctx_t* const fCtx, - dRess_t ress, FILE* srcFile, - const FIO_prefs_t* const prefs, + dRess_t ress, const FIO_prefs_t* const prefs, const char* dstFileName, const char* srcFileName) { unsigned readSomething = 0; unsigned long long filesize = 0; - assert(srcFile != NULL); /* for each frame */ for ( ; ; ) { /* check magic number -> version */ size_t const toRead = 4; - const BYTE* const buf = (const BYTE*)ress.srcBuffer; - if (ress.srcBufferLoaded < toRead) /* load up to 4 bytes for header */ - ress.srcBufferLoaded += fread((char*)ress.srcBuffer + ress.srcBufferLoaded, - (size_t)1, toRead - ress.srcBufferLoaded, srcFile); - if (ress.srcBufferLoaded==0) { + const BYTE* buf; + ReadPool_fillBuffer(ress.readCtx, toRead); + buf = (const BYTE*)ress.readCtx->srcBuffer; + if (ress.readCtx->srcBufferLoaded==0) { if (readSomething==0) { /* srcFile is empty (which is invalid) */ DISPLAYLEVEL(1, "zstd: %s: unexpected end of file \n", srcFileName); return 1; @@ -2556,17 +2298,17 @@ static int FIO_decompressFrames(FIO_ctx_t* const fCtx, break; /* no more input */ } readSomething = 1; /* there is at least 1 byte in srcFile */ - if (ress.srcBufferLoaded < toRead) { + if (ress.readCtx->srcBufferLoaded < toRead) { DISPLAYLEVEL(1, "zstd: %s: unknown header \n", srcFileName); return 1; } - if (ZSTD_isFrame(buf, ress.srcBufferLoaded)) { - unsigned long long const frameSize = FIO_decompressZstdFrame(fCtx, &ress, srcFile, prefs, srcFileName, filesize); + if (ZSTD_isFrame(buf, ress.readCtx->srcBufferLoaded)) { + unsigned long long const frameSize = FIO_decompressZstdFrame(fCtx, &ress, prefs, srcFileName, filesize); if (frameSize == FIO_ERROR_FRAME_DECODING) return 1; filesize += frameSize; } else if (buf[0] == 31 && buf[1] == 139) { /* gz magic number */ #ifdef ZSTD_GZDECOMPRESS - unsigned long long const frameSize = FIO_decompressGzFrame(&ress, srcFile, prefs, srcFileName); + unsigned long long const frameSize = FIO_decompressGzFrame(&ress, srcFileName); if (frameSize == FIO_ERROR_FRAME_DECODING) return 1; filesize += frameSize; #else @@ -2576,7 +2318,7 @@ static int FIO_decompressFrames(FIO_ctx_t* const fCtx, } else if ((buf[0] == 0xFD && buf[1] == 0x37) /* xz magic number */ || (buf[0] == 0x5D && buf[1] == 0x00)) { /* lzma header (no magic number) */ #ifdef ZSTD_LZMADECOMPRESS - unsigned long long const frameSize = FIO_decompressLzmaFrame(&ress, srcFile, prefs, srcFileName, buf[0] != 0xFD); + unsigned long long const frameSize = FIO_decompressLzmaFrame(&ress, srcFileName, buf[0] != 0xFD); if (frameSize == FIO_ERROR_FRAME_DECODING) return 1; filesize += frameSize; #else @@ -2585,7 +2327,7 @@ static int FIO_decompressFrames(FIO_ctx_t* const fCtx, #endif } else if (MEM_readLE32(buf) == LZ4_MAGICNUMBER) { #ifdef ZSTD_LZ4DECOMPRESS - unsigned long long const frameSize = FIO_decompressLz4Frame(&ress, srcFile, prefs, srcFileName); + unsigned long long const frameSize = FIO_decompressLz4Frame(&ress, srcFileName); if (frameSize == FIO_ERROR_FRAME_DECODING) return 1; filesize += frameSize; #else @@ -2593,10 +2335,7 @@ static int FIO_decompressFrames(FIO_ctx_t* const fCtx, return 1; #endif } else if ((prefs->overwrite) && !strcmp (dstFileName, stdoutmark)) { /* pass-through mode */ - return FIO_passThrough(prefs, - ress.dstFile, srcFile, - ress.srcBuffer, ress.srcBufferSize, - ress.srcBufferLoaded); + return FIO_passThrough(&ress); } else { DISPLAYLEVEL(1, "zstd: %s: unsupported format \n", srcFileName); return 1; @@ -2616,15 +2355,14 @@ static int FIO_decompressFrames(FIO_ctx_t* const fCtx, } /** FIO_decompressDstFile() : - open `dstFileName`, - or path-through if ress.dstFile is already != 0, + open `dstFileName`, or pass-through if writeCtx's file is already != 0, then start decompression process (FIO_decompressFrames()). @return : 0 : OK 1 : operation aborted */ static int FIO_decompressDstFile(FIO_ctx_t* const fCtx, FIO_prefs_t* const prefs, - dRess_t ress, FILE* srcFile, + dRess_t ress, const char* dstFileName, const char* srcFileName) { int result; @@ -2632,7 +2370,8 @@ static int FIO_decompressDstFile(FIO_ctx_t* const fCtx, int releaseDstFile = 0; int transferMTime = 0; - if ((ress.dstFile == NULL) && (prefs->testMode==0)) { + if ((WritePool_getFile(ress.writeCtx) == NULL) && (prefs->testMode == 0)) { + FILE *dstFile; int dstFilePermissions = DEFAULT_FILE_PERMISSIONS; if ( strcmp(srcFileName, stdinmark) /* special case : don't transfer permissions from stdin */ && strcmp(dstFileName, stdoutmark) @@ -2644,8 +2383,9 @@ static int FIO_decompressDstFile(FIO_ctx_t* const fCtx, releaseDstFile = 1; - ress.dstFile = FIO_openDstFile(fCtx, prefs, srcFileName, dstFileName, dstFilePermissions); - if (ress.dstFile==NULL) return 1; + dstFile = FIO_openDstFile(fCtx, prefs, srcFileName, dstFileName, dstFilePermissions); + if (dstFile==NULL) return 1; + WritePool_setFile(ress.writeCtx, dstFile); /* Must only be added after FIO_openDstFile() succeeds. * Otherwise we may delete the destination file if it already exists, @@ -2654,13 +2394,11 @@ static int FIO_decompressDstFile(FIO_ctx_t* const fCtx, addHandler(dstFileName); } - result = FIO_decompressFrames(fCtx, ress, srcFile, prefs, dstFileName, srcFileName); + result = FIO_decompressFrames(fCtx, ress, prefs, dstFileName, srcFileName); if (releaseDstFile) { - FILE* const dstFile = ress.dstFile; clearHandler(); - ress.dstFile = NULL; - if (fclose(dstFile)) { + if (WritePool_closeFile(ress.writeCtx)) { DISPLAYLEVEL(1, "zstd: %s: %s \n", dstFileName, strerror(errno)); result = 1; } @@ -2697,9 +2435,11 @@ static int FIO_decompressSrcFile(FIO_ctx_t* const fCtx, FIO_prefs_t* const prefs srcFile = FIO_openSrcFile(prefs, srcFileName); if (srcFile==NULL) return 1; - ress.srcBufferLoaded = 0; + ReadPool_setFile(ress.readCtx, srcFile); + + result = FIO_decompressDstFile(fCtx, prefs, ress, dstFileName, srcFileName); - result = FIO_decompressDstFile(fCtx, prefs, ress, srcFile, dstFileName, srcFileName); + ReadPool_setFile(ress.readCtx, NULL); /* Close file */ if (fclose(srcFile)) { @@ -2874,15 +2614,16 @@ FIO_decompressMultipleFilenames(FIO_ctx_t* const fCtx, return 1; } if (!prefs->testMode) { - ress.dstFile = FIO_openDstFile(fCtx, prefs, NULL, outFileName, DEFAULT_FILE_PERMISSIONS); - if (ress.dstFile == 0) EXM_THROW(19, "cannot open %s", outFileName); + FILE* dstFile = FIO_openDstFile(fCtx, prefs, NULL, outFileName, DEFAULT_FILE_PERMISSIONS); + if (dstFile == 0) EXM_THROW(19, "cannot open %s", outFileName); + WritePool_setFile(ress.writeCtx, dstFile); } for (; fCtx->currFileIdx < fCtx->nbFilesTotal; fCtx->currFileIdx++) { status = FIO_decompressSrcFile(fCtx, prefs, ress, outFileName, srcNamesTable[fCtx->currFileIdx]); if (!status) fCtx->nbFilesProcessed++; error |= status; } - if ((!prefs->testMode) && (fclose(ress.dstFile))) + if ((!prefs->testMode) && (WritePool_closeFile(ress.writeCtx))) EXM_THROW(72, "Write error : %s : cannot properly close output file", strerror(errno)); } else { diff --git a/programs/fileio.h b/programs/fileio.h index 61094db83cb..9d6ebb1accb 100644 --- a/programs/fileio.h +++ b/programs/fileio.h @@ -13,6 +13,7 @@ #define FILEIO_H_23981798732 #define ZSTD_STATIC_LINKING_ONLY /* ZSTD_compressionParameters */ +#include "fileio_types.h" #include "../lib/zstd.h" /* ZSTD_* */ #if defined (__cplusplus) @@ -53,10 +54,6 @@ extern "C" { /*-************************************* * Types ***************************************/ -typedef enum { FIO_zstdCompression, FIO_gzipCompression, FIO_xzCompression, FIO_lzmaCompression, FIO_lz4Compression } FIO_compressionType_t; - -typedef struct FIO_prefs_s FIO_prefs_t; - FIO_prefs_t* FIO_createPreferences(void); void FIO_freePreferences(FIO_prefs_t* const prefs); @@ -66,9 +63,6 @@ typedef struct FIO_ctx_s FIO_ctx_t; FIO_ctx_t* FIO_createContext(void); void FIO_freeContext(FIO_ctx_t* const fCtx); -typedef struct FIO_display_prefs_s FIO_display_prefs_t; - -typedef enum { FIO_ps_auto, FIO_ps_never, FIO_ps_always } FIO_progressSetting_e; /*-************************************* * Parameters @@ -109,6 +103,7 @@ void FIO_setAllowBlockDevices(FIO_prefs_t* const prefs, int allowBlockDevices); void FIO_setPatchFromMode(FIO_prefs_t* const prefs, int value); void FIO_setContentSize(FIO_prefs_t* const prefs, int value); void FIO_displayCompressionParameters(const FIO_prefs_t* prefs); +void FIO_setAsyncIOFlag(FIO_prefs_t* const prefs, unsigned value); /* FIO_ctx_t functions */ void FIO_setNbFilesTotal(FIO_ctx_t* const fCtx, int value); diff --git a/programs/fileio_asycio.c b/programs/fileio_asycio.c new file mode 100644 index 00000000000..86abb1bea39 --- /dev/null +++ b/programs/fileio_asycio.c @@ -0,0 +1,577 @@ +#include "platform.h" +#include /* fprintf, open, fdopen, fread, _fileno, stdin, stdout */ +#include /* malloc, free */ +#include +#include /* errno */ + +#if defined (_MSC_VER) +# include +# include +#endif + +#include "fileio_asycio.h" +#include "fileio_common.h" + +/* ********************************************************************** + * Sparse write + ************************************************************************/ + +/** FIO_fwriteSparse() : +* @return : storedSkips, +* argument for next call to FIO_fwriteSparse() or FIO_fwriteSparseEnd() */ +static unsigned +FIO_fwriteSparse(FILE* file, + const void* buffer, size_t bufferSize, + const FIO_prefs_t* const prefs, + unsigned storedSkips) +{ + const size_t* const bufferT = (const size_t*)buffer; /* Buffer is supposed malloc'ed, hence aligned on size_t */ + size_t bufferSizeT = bufferSize / sizeof(size_t); + const size_t* const bufferTEnd = bufferT + bufferSizeT; + const size_t* ptrT = bufferT; + static const size_t segmentSizeT = (32 KB) / sizeof(size_t); /* check every 32 KB */ + + if (prefs->testMode) return 0; /* do not output anything in test mode */ + + if (!prefs->sparseFileSupport) { /* normal write */ + size_t const sizeCheck = fwrite(buffer, 1, bufferSize, file); + if (sizeCheck != bufferSize) + EXM_THROW(70, "Write error : cannot write decoded block : %s", + strerror(errno)); + return 0; + } + + /* avoid int overflow */ + if (storedSkips > 1 GB) { + if (LONG_SEEK(file, 1 GB, SEEK_CUR) != 0) + EXM_THROW(91, "1 GB skip error (sparse file support)"); + storedSkips -= 1 GB; + } + + while (ptrT < bufferTEnd) { + size_t nb0T; + + /* adjust last segment if < 32 KB */ + size_t seg0SizeT = segmentSizeT; + if (seg0SizeT > bufferSizeT) seg0SizeT = bufferSizeT; + bufferSizeT -= seg0SizeT; + + /* count leading zeroes */ + for (nb0T=0; (nb0T < seg0SizeT) && (ptrT[nb0T] == 0); nb0T++) ; + storedSkips += (unsigned)(nb0T * sizeof(size_t)); + + if (nb0T != seg0SizeT) { /* not all 0s */ + size_t const nbNon0ST = seg0SizeT - nb0T; + /* skip leading zeros */ + if (LONG_SEEK(file, storedSkips, SEEK_CUR) != 0) + EXM_THROW(92, "Sparse skip error ; try --no-sparse"); + storedSkips = 0; + /* write the rest */ + if (fwrite(ptrT + nb0T, sizeof(size_t), nbNon0ST, file) != nbNon0ST) + EXM_THROW(93, "Write error : cannot write decoded block : %s", + strerror(errno)); + } + ptrT += seg0SizeT; + } + + { static size_t const maskT = sizeof(size_t)-1; + if (bufferSize & maskT) { + /* size not multiple of sizeof(size_t) : implies end of block */ + const char* const restStart = (const char*)bufferTEnd; + const char* restPtr = restStart; + const char* const restEnd = (const char*)buffer + bufferSize; + assert(restEnd > restStart && restEnd < restStart + sizeof(size_t)); + for ( ; (restPtr < restEnd) && (*restPtr == 0); restPtr++) ; + storedSkips += (unsigned) (restPtr - restStart); + if (restPtr != restEnd) { + /* not all remaining bytes are 0 */ + size_t const restSize = (size_t)(restEnd - restPtr); + if (LONG_SEEK(file, storedSkips, SEEK_CUR) != 0) + EXM_THROW(92, "Sparse skip error ; try --no-sparse"); + if (fwrite(restPtr, 1, restSize, file) != restSize) + EXM_THROW(95, "Write error : cannot write end of decoded block : %s", + strerror(errno)); + storedSkips = 0; + } } } + + return storedSkips; +} + +static void +FIO_fwriteSparseEnd(const FIO_prefs_t* const prefs, FILE* file, unsigned storedSkips) +{ + if (prefs->testMode) assert(storedSkips == 0); + if (storedSkips>0) { + assert(prefs->sparseFileSupport > 0); /* storedSkips>0 implies sparse support is enabled */ + (void)prefs; /* assert can be disabled, in which case prefs becomes unused */ + if (LONG_SEEK(file, storedSkips-1, SEEK_CUR) != 0) + EXM_THROW(69, "Final skip error (sparse file support)"); + /* last zero must be explicitly written, + * so that skipped ones get implicitly translated as zero by FS */ + { const char lastZeroByte[1] = { 0 }; + if (fwrite(lastZeroByte, 1, 1, file) != 1) + EXM_THROW(69, "Write error : cannot write last zero : %s", strerror(errno)); + } } +} + + +/* ********************************************************************** + * AsyncIO functionality + ************************************************************************/ + +/* *********************************** + * General IoPool implementation + *************************************/ + +static io_job_t *IoPool_createIoJob(io_pool_ctx_t *ctx, size_t bufferSize) { + void *buffer; + io_job_t *job; + job = (io_job_t*) malloc(sizeof(io_job_t)); + buffer = malloc(bufferSize); + if(!job || !buffer) + EXM_THROW(101, "Allocation error : not enough memory"); + job->buffer = buffer; + job->bufferSize = bufferSize; + job->usedBufferSize = 0; + job->file = NULL; + job->ctx = ctx; + job->offset = 0; + return job; +} + + +/* IoPool_createThreadPool: + * Creates a thread pool and a mutex for threaded IO pool. + * Displays warning if asyncio is requested but MT isn't available. */ +static void IoPool_createThreadPool(io_pool_ctx_t *ctx, const FIO_prefs_t *prefs) { + ctx->threadPool = NULL; + if(prefs->asyncIO) { + if (ZSTD_pthread_mutex_init(&ctx->ioJobsMutex, NULL)) + EXM_THROW(102,"Failed creating write availableJobs mutex"); + /* We want MAX_IO_JOBS-2 queue items because we need to always have 1 free buffer to + * decompress into and 1 buffer that's actively written to disk and owned by the writing thread. */ + assert(MAX_IO_JOBS >= 2); + ctx->threadPool = POOL_create(1, MAX_IO_JOBS - 2); + if (!ctx->threadPool) + EXM_THROW(104, "Failed creating writer thread pool"); + } +} + +/* IoPool_init: + * Allocates and sets and a new write pool including its included availableJobs. */ +static void IoPool_init(io_pool_ctx_t *ctx, FIO_prefs_t* const prefs, POOL_function poolFunction, size_t bufferSize) { + int i; + IoPool_createThreadPool(ctx, prefs); + ctx->prefs = prefs; + ctx->poolFunction = poolFunction; + ctx->totalIoJobs = ctx->threadPool ? MAX_IO_JOBS : 1; + ctx->availableJobsCount = ctx->totalIoJobs; + for(i=0; i < ctx->availableJobsCount; i++) { + ctx->availableJobs[i] = IoPool_createIoJob(ctx, bufferSize); + } + ctx->file = NULL; +} + + +/* IoPool_releaseIoJob: + * Releases an acquired job back to the pool. Doesn't execute the job. */ +static void IoPool_releaseIoJob(io_job_t *job) { + io_pool_ctx_t *ctx = (io_pool_ctx_t *) job->ctx; + if(ctx->threadPool) { + ZSTD_pthread_mutex_lock(&ctx->ioJobsMutex); + assert(ctx->availableJobsCount < MAX_IO_JOBS); + ctx->availableJobs[ctx->availableJobsCount++] = job; + ZSTD_pthread_mutex_unlock(&ctx->ioJobsMutex); + } else { + assert(ctx->availableJobsCount == 0); + ctx->availableJobsCount++; + } +} + +/* IoPool_join: + * Waits for all tasks in the pool to finish executing. */ +static void IoPool_join(io_pool_ctx_t* ctx) { + if(ctx->threadPool) + POOL_joinJobs(ctx->threadPool); +} + +/* IoPool_free: + * Release a previously allocated write thread pool. Makes sure all takss are done and released. */ +static void IoPool_destroy(io_pool_ctx_t* ctx) { + int i; + if(ctx->threadPool) { + /* Make sure we finish all tasks and then free the resources */ + IoPool_join(ctx); + /* Make sure we are not leaking availableJobs */ + assert(ctx->availableJobsCount == ctx->totalIoJobs); + POOL_free(ctx->threadPool); + ZSTD_pthread_mutex_destroy(&ctx->ioJobsMutex); + } + assert(ctx->file == NULL); + for(i=0; iavailableJobsCount; i++) { + io_job_t* job = (io_job_t*) ctx->availableJobs[i]; + free(job->buffer); + free(job); + } +} + +/* IoPool_acquireJob: + * Returns an available io job to be used for a future io. */ +static io_job_t* IoPool_acquireJob(io_pool_ctx_t *ctx) { + io_job_t *job; + assert(ctx->file != NULL || ctx->prefs->testMode); + if(ctx->threadPool) { + ZSTD_pthread_mutex_lock(&ctx->ioJobsMutex); + assert(ctx->availableJobsCount > 0); + job = (io_job_t*) ctx->availableJobs[--ctx->availableJobsCount]; + ZSTD_pthread_mutex_unlock(&ctx->ioJobsMutex); + } else { + assert(ctx->availableJobsCount == 1); + ctx->availableJobsCount--; + job = (io_job_t*)ctx->availableJobs[0]; + } + job->usedBufferSize = 0; + job->file = ctx->file; + job->offset = 0; + return job; +} + + +/* IoPool_setFile: + * Sets the destination file for future files in the pool. + * Requires completion of all queues write jobs and release of all otherwise acquired jobs. + * Also requires ending of sparse write if a previous file was used in sparse mode. */ +static void IoPool_setFile(io_pool_ctx_t *ctx, FILE* file) { + assert(ctx!=NULL); + IoPool_join(ctx); + assert(ctx->availableJobsCount == ctx->totalIoJobs); + ctx->file = file; +} + +static FILE* IoPool_getFile(io_pool_ctx_t *ctx) { + return ctx->file; +} + +/* IoPool_enqueueJob: + * Enqueues an io job for execution. + * The queued job shouldn't be used directly after queueing it. */ +static void IoPool_enqueueJob(io_job_t *job) { + io_pool_ctx_t* ctx = (io_pool_ctx_t *)job->ctx; + if(ctx->threadPool) + POOL_add(ctx->threadPool, ctx->poolFunction, job); + else + ctx->poolFunction(job); +} + +/* *********************************** + * WritePool implementation + *************************************/ + +/* WritePool_acquireJob: + * Returns an available write job to be used for a future write. */ +io_job_t* WritePool_acquireJob(write_pool_ctx_t *ctx) { + return IoPool_acquireJob(&ctx->base); +} + +/* WritePool_enqueueAndReacquireWriteJob: + * Queues a write job for execution and acquires a new one. + * After execution `job`'s pointed value would change to the newly acquired job. + * Make sure to set `usedBufferSize` to the wanted length before call. + * The queued job shouldn't be used directly after queueing it. */ +void WritePool_enqueueAndReacquireWriteJob(io_job_t **job) { + IoPool_enqueueJob(*job); + *job = IoPool_acquireJob((io_pool_ctx_t *)(*job)->ctx); +} + +/* WritePool_sparseWriteEnd: + * Ends sparse writes to the current file. + * Blocks on completion of all current write jobs before executing. */ +void WritePool_sparseWriteEnd(write_pool_ctx_t *ctx) { + assert(ctx != NULL); + if(ctx->base.threadPool) + POOL_joinJobs(ctx->base.threadPool); + FIO_fwriteSparseEnd(ctx->base.prefs, ctx->base.file, ctx->storedSkips); + ctx->storedSkips = 0; +} + +/* WritePool_setFile: + * Sets the destination file for future writes in the pool. + * Requires completion of all queues write jobs and release of all otherwise acquired jobs. + * Also requires ending of sparse write if a previous file was used in sparse mode. */ +void WritePool_setFile(write_pool_ctx_t *ctx, FILE* file) { + IoPool_setFile(&ctx->base, file); + assert(ctx->storedSkips == 0); +} + +/* WritePool_getFile: + * Returns the file the writePool is currently set to write to. */ +FILE* WritePool_getFile(write_pool_ctx_t *ctx) { + return IoPool_getFile(&ctx->base); +} + +/* WritePool_releaseIoJob: + * Releases an acquired job back to the pool. Doesn't execute the job. */ +void WritePool_releaseIoJob(io_job_t *job) { + IoPool_releaseIoJob(job); +} + +/* WritePool_closeFile: + * Ends sparse write and closes the writePool's current file and sets the file to NULL. + * Requires completion of all queues write jobs and release of all otherwise acquired jobs. */ +int WritePool_closeFile(write_pool_ctx_t *ctx) { + FILE *dstFile = ctx->base.file; + assert(dstFile!=NULL || ctx->base.prefs->testMode!=0); + WritePool_sparseWriteEnd(ctx); + IoPool_setFile(&ctx->base, NULL); + return fclose(dstFile); +} + +/* WritePool_executeWriteJob: + * Executes a write job synchronously. Can be used as a function for a thread pool. */ +static void WritePool_executeWriteJob(void* opaque){ + io_job_t* job = (io_job_t*) opaque; + write_pool_ctx_t* ctx = (write_pool_ctx_t*) job->ctx; + ctx->storedSkips = FIO_fwriteSparse(job->file, job->buffer, job->usedBufferSize, ctx->base.prefs, ctx->storedSkips); + IoPool_releaseIoJob(job); +} + +/* WritePool_create: + * Allocates and sets and a new write pool including its included jobs. */ +write_pool_ctx_t* WritePool_create(FIO_prefs_t* const prefs, size_t bufferSize) { + write_pool_ctx_t* ctx = (write_pool_ctx_t*) malloc(sizeof(write_pool_ctx_t)); + if(!ctx) EXM_THROW(100, "Allocation error : not enough memory"); + IoPool_init(&ctx->base, prefs, WritePool_executeWriteJob, bufferSize); + ctx->storedSkips = 0; + return ctx; +} + +/* WritePool_free: + * Frees and releases a writePool and its resources. Closes destination file if needs to. */ +void WritePool_free(write_pool_ctx_t* ctx) { + /* Make sure we finish all tasks and then free the resources */ + if(WritePool_getFile(ctx)) + WritePool_closeFile(ctx); + IoPool_destroy(&ctx->base); + assert(ctx->storedSkips==0); + free(ctx); +} + + +/* *********************************** + * ReadPool implementation + *************************************/ +static void ReadPool_releaseAllCompletedJobs(read_pool_ctx_t* ctx) { + int i; + for(i=0; icompletedJobsCount; i++) { + io_job_t* job = (io_job_t*) ctx->completedJobs[i]; + IoPool_releaseIoJob(job); + } + ctx->completedJobsCount = 0; +} + +static void ReadPool_addJobToCompleted(io_job_t *job) { + read_pool_ctx_t *ctx = (read_pool_ctx_t *)job->ctx; + if(ctx->base.threadPool) + ZSTD_pthread_mutex_lock(&ctx->base.ioJobsMutex); + assert(ctx->completedJobsCount < MAX_IO_JOBS); + ctx->completedJobs[ctx->completedJobsCount++] = job; + if(ctx->base.threadPool) { + ZSTD_pthread_cond_signal(&ctx->jobCompletedCond); + ZSTD_pthread_mutex_unlock(&ctx->base.ioJobsMutex); + } +} + +/* ReadPool_findNextWaitingOffsetCompletedJob: + * Looks through the completed jobs for a job matching the waitingOnOffset and returns it, + * if job wasn't found returns NULL. + * IMPORTANT: assumes ioJobsMutex is locked. */ +static io_job_t* ReadPool_findNextWaitingOffsetCompletedJob(read_pool_ctx_t *ctx) { + io_job_t *job = NULL; + int i; + for (i=0; icompletedJobsCount; i++) { + job = (io_job_t *) ctx->completedJobs[i]; + if (job->offset == ctx->waitingOnOffset) { + ctx->completedJobs[i] = ctx->completedJobs[--ctx->completedJobsCount]; + return job; + } + } + return NULL; +} + +/* ReadPool_getNextCompletedJob: + * Returns a completed io_job_t for the next read in line based on waitingOnOffset and advances waitingOnOffset. + * Would block. */ +static io_job_t* ReadPool_getNextCompletedJob(read_pool_ctx_t *ctx) { + io_job_t *job = NULL; + if(ctx->base.threadPool) + ZSTD_pthread_mutex_lock(&ctx->base.ioJobsMutex); + + job = ReadPool_findNextWaitingOffsetCompletedJob(ctx); + + /* As long as we didn't find the job matching the next read, and we have some reads in flight continue waiting */ + while (!job && (ctx->base.availableJobsCount + ctx->completedJobsCount < ctx->base.totalIoJobs)) { + assert(ctx->base.threadPool != NULL); /* we shouldn't be here if we work in sync mode */ + ZSTD_pthread_cond_wait(&ctx->jobCompletedCond, &ctx->base.ioJobsMutex); + job = ReadPool_findNextWaitingOffsetCompletedJob(ctx); + } + + if(job) { + assert(job->offset == ctx->waitingOnOffset); + ctx->waitingOnOffset += job->usedBufferSize; + } + + if(ctx->base.threadPool) + ZSTD_pthread_mutex_unlock(&ctx->base.ioJobsMutex); + return job; +} + + +/* ReadPool_executeReadJob: + * Executes a read job synchronously. Can be used as a function for a thread pool. */ +static void ReadPool_executeReadJob(void* opaque){ + io_job_t* job = (io_job_t*) opaque; + read_pool_ctx_t* ctx = (read_pool_ctx_t *)job->ctx; + if(ctx->reachedEof) { + job->usedBufferSize = 0; + ReadPool_addJobToCompleted(job); + return; + } + job->usedBufferSize = fread(job->buffer, 1, job->bufferSize, job->file); + if(job->usedBufferSize < job->bufferSize) { + if(ferror(job->file)) { + EXM_THROW(37, "Read error"); + } else if(feof(job->file)) { + ctx->reachedEof = 1; + } else + EXM_THROW(37, "Unexpected short read"); + } + ReadPool_addJobToCompleted(job); +} + +static void ReadPool_enqueueRead(read_pool_ctx_t *ctx) { + io_job_t *job = IoPool_acquireJob(&ctx->base); + job->offset = ctx->nextReadOffset; + ctx->nextReadOffset += job->bufferSize; + IoPool_enqueueJob(job); +} + +static void ReadPool_startReading(read_pool_ctx_t *ctx) { + int i; + for (i = 0; i < ctx->base.availableJobsCount; i++) { + ReadPool_enqueueRead(ctx); + } +} + +/* ReadPool_setFile: + * Sets the source file for future read in the pool. Initiates reading immediately if file is not NULL. + * Waits for all current enqueued tasks to complete if a previous file was set. */ +void ReadPool_setFile(read_pool_ctx_t *ctx, FILE* file) { + assert(ctx!=NULL); + IoPool_join(&ctx->base); + ReadPool_releaseAllCompletedJobs(ctx); + IoPool_setFile(&ctx->base, file); + ctx->nextReadOffset = 0; + ctx->waitingOnOffset = 0; + ctx->srcBuffer = ctx->srcBufferBase; + ctx->reachedEof = 0; + if(file != NULL) + ReadPool_startReading(ctx); +} + +/* ReadPool_create: + * Allocates and sets and a new readPool including its included jobs. + * bufferSize should be set to the maximal buffer we want to read at a time, will also be used + * as our basic read size. */ +read_pool_ctx_t* ReadPool_create(FIO_prefs_t* const prefs, size_t bufferSize) { + read_pool_ctx_t* ctx = (read_pool_ctx_t*) malloc(sizeof(read_pool_ctx_t)); + if(!ctx) EXM_THROW(100, "Allocation error : not enough memory"); + IoPool_init(&ctx->base, prefs, ReadPool_executeReadJob, bufferSize); + + ctx->srcBufferBaseSize = 2 * bufferSize; + ctx->srcBufferBase = (U8*) malloc(ctx->srcBufferBaseSize); + ctx->srcBuffer = ctx->srcBufferBase; + ctx->srcBufferLoaded = 0; + ctx->completedJobsCount = 0; + + if(ctx->base.threadPool) + if (ZSTD_pthread_cond_init(&ctx->jobCompletedCond, NULL)) + EXM_THROW(103,"Failed creating write jobCompletedCond mutex"); + + return ctx; +} + +/* ReadPool_free: + * Frees and releases a readPool and its resources. Closes source file. */ +void ReadPool_free(read_pool_ctx_t* ctx) { + if(ReadPool_getFile(ctx)) + ReadPool_closeFile(ctx); + if(ctx->base.threadPool) + ZSTD_pthread_cond_destroy(&ctx->jobCompletedCond); + IoPool_destroy(&ctx->base); + free(ctx->srcBufferBase); + free(ctx); +} + +/* ReadPool_consumeBytes: + * Consumes byes from srcBuffer's beginning and updates srcBufferLoaded accordingly. */ +void ReadPool_consumeBytes(read_pool_ctx_t *ctx, size_t n) { + assert(n <= ctx->srcBufferLoaded); + assert(ctx->srcBuffer + n <= ctx->srcBufferBase + ctx->srcBufferBaseSize); + ctx->srcBufferLoaded -= n; + ctx->srcBuffer += n; +} + +/* ReadPool_fillBuffer: + * Makes sure buffer has at least n bytes loaded (as long as n is not bigger than the initalized bufferSize). + * Returns if srcBuffer has at least n bytes loaded or if we've reached the end of the file. + * Return value is the number of bytes added to the buffer. + * Note that srcBuffer might have up to 2 times bufferSize bytes. */ +size_t ReadPool_fillBuffer(read_pool_ctx_t *ctx, size_t n) { + io_job_t *job; + size_t srcBufferOffsetFromBase; + size_t srcBufferRemainingSpace; + size_t bytesRead = 0; + assert(n <= ctx->srcBufferBaseSize/2); + while (ctx->srcBufferLoaded < n) { + job = ReadPool_getNextCompletedJob(ctx); + if(job == NULL) + break; + srcBufferOffsetFromBase = ctx->srcBuffer - ctx->srcBufferBase; + srcBufferRemainingSpace = ctx->srcBufferBaseSize - (srcBufferOffsetFromBase + ctx->srcBufferLoaded); + if(job->usedBufferSize > srcBufferRemainingSpace) { + memmove(ctx->srcBufferBase, ctx->srcBuffer, ctx->srcBufferLoaded); + ctx->srcBuffer = ctx->srcBufferBase; + } + memcpy(ctx->srcBuffer + ctx->srcBufferLoaded, job->buffer, job->usedBufferSize); + bytesRead += job->usedBufferSize; + ctx->srcBufferLoaded += job->usedBufferSize; + if(job->usedBufferSize < job->bufferSize) { + IoPool_releaseIoJob(job); + break; + } + IoPool_releaseIoJob(job); + ReadPool_enqueueRead(ctx); + } + return bytesRead; +} + +/* ReadPool_consumeAndRefill: + * Consumes the current buffer and refills it with bufferSize bytes. */ +size_t ReadPool_consumeAndRefill(read_pool_ctx_t *ctx) { + ReadPool_consumeBytes(ctx, ctx->srcBufferLoaded); + return ReadPool_fillBuffer(ctx, ctx->srcBufferBaseSize/2); +} + +/* ReadPool_getFile: + * Returns the current file set for the read pool. */ +FILE* ReadPool_getFile(read_pool_ctx_t *ctx) { + return IoPool_getFile(&ctx->base); +} + +/* ReadPool_closeFile: + * Closes the current set file. Waits for all current enqueued tasks to complete and resets state. */ +int ReadPool_closeFile(read_pool_ctx_t *ctx) { + FILE* file = ReadPool_getFile(ctx); + ReadPool_setFile(ctx, NULL); + return fclose(file); +} diff --git a/programs/fileio_asycio.h b/programs/fileio_asycio.h new file mode 100644 index 00000000000..88d55c0ffb9 --- /dev/null +++ b/programs/fileio_asycio.h @@ -0,0 +1,163 @@ +#ifndef ZSTD_FILEIO_ASYNCIO_H +#define ZSTD_FILEIO_ASYNCIO_H + +#if defined (__cplusplus) +extern "C" { +#endif + +#include "../lib/common/mem.h" /* U32, U64 */ +#include "fileio_types.h" +#include "platform.h" +#include "util.h" +#include "../lib/common/pool.h" +#include "../lib/common/threading.h" + +#define MAX_IO_JOBS (10) + +typedef struct { + /* These struct fields should be set only on creation and not changed afterwards */ + POOL_ctx* threadPool; + int totalIoJobs; + FIO_prefs_t* prefs; + POOL_function poolFunction; + + /* Controls the file we currently write to, make changes only by using provided utility functions */ + FILE* file; + + /* The jobs and availableJobsCount fields are accessed by both the main and worker threads and should + * only be mutated after locking the mutex */ + ZSTD_pthread_mutex_t ioJobsMutex; + void* availableJobs[MAX_IO_JOBS]; + int availableJobsCount; +} io_pool_ctx_t; + +typedef struct { + io_pool_ctx_t base; + + /* State regarding the currently read file */ + int reachedEof; + U64 nextReadOffset; + U64 waitingOnOffset; + + /* Bases buffer, shouldn't be accessed from outside ot utility functions. */ + U8 *srcBufferBase; + size_t srcBufferBaseSize; + + /* Read buffer can be used by consumer code, take care when copying this pointer aside as it might + * change when consuming / refilling buffer. */ + U8 *srcBuffer; + size_t srcBufferLoaded; + + /* We need to know what tasks completed so we can use their buffers when their time comes. + * Should only be accessed after locking base.ioJobsMutex . */ + void* completedJobs[MAX_IO_JOBS]; + int completedJobsCount; + ZSTD_pthread_cond_t jobCompletedCond; +} read_pool_ctx_t; + +typedef struct { + io_pool_ctx_t base; + unsigned storedSkips; +} write_pool_ctx_t; + +typedef struct { + /* These fields are automatically set and shouldn't be changed by non WritePool code. */ + void *ctx; + FILE* file; + void *buffer; + size_t bufferSize; + + /* This field should be changed before a job is queued for execution and should contain the number + * of bytes to write from the buffer. */ + size_t usedBufferSize; + U64 offset; +} io_job_t; + + +/* WritePool_releaseIoJob: + * Releases an acquired job back to the pool. Doesn't execute the job. */ +void WritePool_releaseIoJob(io_job_t *job); + +/* WritePool_acquireJob: + * Returns an available write job to be used for a future write. */ +io_job_t* WritePool_acquireJob(write_pool_ctx_t *ctx); + +/* WritePool_enqueueAndReacquireWriteJob: + * Enqueues a write job for execution and acquires a new one. + * After execution `job`'s pointed value would change to the newly acquired job. + * Make sure to set `usedBufferSize` to the wanted length before call. + * The queued job shouldn't be used directly after queueing it. */ +void WritePool_enqueueAndReacquireWriteJob(io_job_t **job); + +/* WritePool_sparseWriteEnd: + * Ends sparse writes to the current file. + * Blocks on completion of all current write jobs before executing. */ +void WritePool_sparseWriteEnd(write_pool_ctx_t *ctx); + +/* WritePool_setFile: + * Sets the destination file for future writes in the pool. + * Requires completion of all queues write jobs and release of all otherwise acquired jobs. + * Also requires ending of sparse write if a previous file was used in sparse mode. */ +void WritePool_setFile(write_pool_ctx_t *ctx, FILE* file); + +/* WritePool_getFile: + * Returns the file the writePool is currently set to write to. */ +FILE* WritePool_getFile(write_pool_ctx_t *ctx); + +/* WritePool_closeFile: + * Ends sparse write and closes the writePool's current file and sets the file to NULL. + * Requires completion of all queues write jobs and release of all otherwise acquired jobs. */ +int WritePool_closeFile(write_pool_ctx_t *ctx); + +/* WritePool_create: + * Allocates and sets and a new write pool including its included jobs. + * bufferSize should be set to the maximal buffer we want to write to at a time. */ +write_pool_ctx_t* WritePool_create(FIO_prefs_t* const prefs, size_t bufferSize); + +/* WritePool_free: + * Frees and releases a writePool and its resources. Closes destination file. */ +void WritePool_free(write_pool_ctx_t* ctx); + +/* ReadPool_create: + * Allocates and sets and a new readPool including its included jobs. + * bufferSize should be set to the maximal buffer we want to read at a time, will also be used + * as our basic read size. */ +read_pool_ctx_t* ReadPool_create(FIO_prefs_t* const prefs, size_t bufferSize); + +/* ReadPool_free: + * Frees and releases a readPool and its resources. Closes source file. */ +void ReadPool_free(read_pool_ctx_t* ctx); + +/* ReadPool_consumeBytes: + * Consumes byes from srcBuffer's beginning and updates srcBufferLoaded accordingly. */ +void ReadPool_consumeBytes(read_pool_ctx_t *ctx, size_t n); + +/* ReadPool_fillBuffer: + * Makes sure buffer has at least n bytes loaded (as long as n is not bigger than the initalized bufferSize). + * Returns if srcBuffer has at least n bytes loaded or if we've reached the end of the file. + * Return value is the number of bytes added to the buffer. + * Note that srcBuffer might have up to 2 times bufferSize bytes. */ +size_t ReadPool_fillBuffer(read_pool_ctx_t *ctx, size_t n); + +/* ReadPool_consumeAndRefill: + * Consumes the current buffer and refills it with bufferSize bytes. */ +size_t ReadPool_consumeAndRefill(read_pool_ctx_t *ctx); + +/* ReadPool_setFile: + * Sets the source file for future read in the pool. Initiates reading immediately if file is not NULL. + * Waits for all current enqueued tasks to complete if a previous file was set. */ +void ReadPool_setFile(read_pool_ctx_t *ctx, FILE* file); + +/* ReadPool_getFile: + * Returns the current file set for the read pool. */ +FILE* ReadPool_getFile(read_pool_ctx_t *ctx); + +/* ReadPool_closeFile: + * Closes the current set file. Waits for all current enqueued tasks to complete and resets state. */ +int ReadPool_closeFile(read_pool_ctx_t *ctx); + +#if defined (__cplusplus) +} +#endif + +#endif /* ZSTD_FILEIO_ASYNCIO_H */ diff --git a/programs/fileio_common.h b/programs/fileio_common.h new file mode 100644 index 00000000000..ba01eee5f8b --- /dev/null +++ b/programs/fileio_common.h @@ -0,0 +1,108 @@ +#ifndef ZSTD_FILEIO_COMMON_H +#define ZSTD_FILEIO_COMMON_H + +#if defined (__cplusplus) +extern "C" { +#endif + +#include "../lib/common/mem.h" /* U32, U64 */ +#include "timefn.h" +#include "fileio_types.h" +#include "platform.h" +#include "util.h" + +/*-************************************* +* Macros +***************************************/ +#define KB *(1 <<10) +#define MB *(1 <<20) +#define GB *(1U<<30) +#undef MAX +#define MAX(a,b) ((a)>(b) ? (a) : (b)) + +extern FIO_display_prefs_t g_display_prefs; + +#define DISPLAY(...) fprintf(stderr, __VA_ARGS__) +#define DISPLAYOUT(...) fprintf(stdout, __VA_ARGS__) +#define DISPLAYLEVEL(l, ...) { if (g_display_prefs.displayLevel>=l) { DISPLAY(__VA_ARGS__); } } + +static const U64 g_refreshRate = SEC_TO_MICRO / 6; +extern UTIL_time_t g_displayClock; + +#define READY_FOR_UPDATE() ((g_display_prefs.progressSetting != FIO_ps_never) && UTIL_clockSpanMicro(g_displayClock) > g_refreshRate) +#define DELAY_NEXT_UPDATE() { g_displayClock = UTIL_getTime(); } +#define DISPLAYUPDATE(l, ...) { \ + if (g_display_prefs.displayLevel>=l && (g_display_prefs.progressSetting != FIO_ps_never)) { \ + if (READY_FOR_UPDATE() || (g_display_prefs.displayLevel>=4)) { \ + DELAY_NEXT_UPDATE(); \ + DISPLAY(__VA_ARGS__); \ + if (g_display_prefs.displayLevel>=4) fflush(stderr); \ + } } } + +#undef MIN /* in case it would be already defined */ +#define MIN(a,b) ((a) < (b) ? (a) : (b)) + + +#define EXM_THROW(error, ...) \ +{ \ + DISPLAYLEVEL(1, "zstd: "); \ + DISPLAYLEVEL(5, "Error defined at %s, line %i : \n", __FILE__, __LINE__); \ + DISPLAYLEVEL(1, "error %i : ", error); \ + DISPLAYLEVEL(1, __VA_ARGS__); \ + DISPLAYLEVEL(1, " \n"); \ + exit(error); \ +} + +#define CHECK_V(v, f) \ + v = f; \ + if (ZSTD_isError(v)) { \ + DISPLAYLEVEL(5, "%s \n", #f); \ + EXM_THROW(11, "%s", ZSTD_getErrorName(v)); \ + } +#define CHECK(f) { size_t err; CHECK_V(err, f); } + + +/* Avoid fseek()'s 2GiB barrier with MSVC, macOS, *BSD, MinGW */ +#if defined(_MSC_VER) && _MSC_VER >= 1400 +# define LONG_SEEK _fseeki64 +# define LONG_TELL _ftelli64 +#elif !defined(__64BIT__) && (PLATFORM_POSIX_VERSION >= 200112L) /* No point defining Large file for 64 bit */ +# define LONG_SEEK fseeko +# define LONG_TELL ftello +#elif defined(__MINGW32__) && !defined(__STRICT_ANSI__) && !defined(__NO_MINGW_LFS) && defined(__MSVCRT__) +# define LONG_SEEK fseeko64 +# define LONG_TELL ftello64 +#elif defined(_WIN32) && !defined(__DJGPP__) +# include + static int LONG_SEEK(FILE* file, __int64 offset, int origin) { + LARGE_INTEGER off; + DWORD method; + off.QuadPart = offset; + if (origin == SEEK_END) + method = FILE_END; + else if (origin == SEEK_CUR) + method = FILE_CURRENT; + else + method = FILE_BEGIN; + + if (SetFilePointerEx((HANDLE) _get_osfhandle(_fileno(file)), off, NULL, method)) + return 0; + else + return -1; + } + static __int64 LONG_TELL(FILE* file) { + LARGE_INTEGER off, newOff; + off.QuadPart = 0; + newOff.QuadPart = 0; + SetFilePointerEx((HANDLE) _get_osfhandle(_fileno(file)), off, &newOff, FILE_CURRENT); + return newOff.QuadPart; + } +#else +# define LONG_SEEK fseek +# define LONG_TELL ftell +#endif + +#if defined (__cplusplus) +} +#endif +#endif //ZSTD_FILEIO_COMMON_H diff --git a/programs/fileio_types.h b/programs/fileio_types.h new file mode 100644 index 00000000000..80f55394fad --- /dev/null +++ b/programs/fileio_types.h @@ -0,0 +1,63 @@ +#ifndef FILEIO_TYPES_HEADER +#define FILEIO_TYPES_HEADER + +#define ZSTD_STATIC_LINKING_ONLY /* ZSTD_compressionParameters */ +#include "../lib/zstd.h" /* ZSTD_* */ + +/*-************************************* +* Parameters: FIO_prefs_t +***************************************/ + +typedef struct FIO_display_prefs_s FIO_display_prefs_t; + +typedef enum { FIO_ps_auto, FIO_ps_never, FIO_ps_always } FIO_progressSetting_e; + +struct FIO_display_prefs_s { + int displayLevel; /* 0 : no display; 1: errors; 2: + result + interaction + warnings; 3: + progression; 4: + information */ + FIO_progressSetting_e progressSetting; +}; + + +typedef enum { FIO_zstdCompression, FIO_gzipCompression, FIO_xzCompression, FIO_lzmaCompression, FIO_lz4Compression } FIO_compressionType_t; + +typedef struct FIO_prefs_s { + + /* Algorithm preferences */ + FIO_compressionType_t compressionType; + U32 sparseFileSupport; /* 0: no sparse allowed; 1: auto (file yes, stdout no); 2: force sparse */ + int dictIDFlag; + int checksumFlag; + int blockSize; + int overlapLog; + U32 adaptiveMode; + U32 useRowMatchFinder; + int rsyncable; + int minAdaptLevel; + int maxAdaptLevel; + int ldmFlag; + int ldmHashLog; + int ldmMinMatch; + int ldmBucketSizeLog; + int ldmHashRateLog; + size_t streamSrcSize; + size_t targetCBlockSize; + int srcSizeHint; + int testMode; + ZSTD_paramSwitch_e literalCompressionMode; + + /* IO preferences */ + U32 removeSrcFile; + U32 overwrite; + U32 asyncIO; + + /* Computation resources preferences */ + unsigned memLimit; + int nbWorkers; + + int excludeCompressedFiles; + int patchFromMode; + int contentSize; + int allowBlockDevices; +} FIO_prefs_t; + +#endif /* FILEIO_TYPES_HEADER */ \ No newline at end of file diff --git a/programs/zstdcli.c b/programs/zstdcli.c index bfe18c0c1ba..fd563e1c24d 100644 --- a/programs/zstdcli.c +++ b/programs/zstdcli.c @@ -239,9 +239,12 @@ static void usage_advanced(const char* programName) #ifndef ZSTD_NODECOMPRESS DISPLAYOUT( "\n"); DISPLAYOUT( "Advanced decompression arguments : \n"); - DISPLAYOUT( " -l : print information about zstd compressed files \n"); - DISPLAYOUT( "--test : test compressed file integrity \n"); - DISPLAYOUT( " -M# : Set a memory usage limit for decompression \n"); + DISPLAYOUT( " -l : print information about zstd compressed files \n"); + DISPLAYOUT( "--test : test compressed file integrity \n"); + DISPLAYOUT( " -M# : Set a memory usage limit for decompression \n"); +#ifdef ZSTD_MULTITHREAD + DISPLAYOUT( "--[no-]asyncio : use threaded asynchronous IO for output (default: disabled) \n"); +#endif # if ZSTD_SPARSE_DEFAULT DISPLAYOUT( "--[no-]sparse : sparse mode (default: enabled on file, disabled on stdout) \n"); # else @@ -912,6 +915,8 @@ int main(int argCount, const char* argv[]) if (!strcmp(argument, "--sparse")) { FIO_setSparseWrite(prefs, 2); continue; } if (!strcmp(argument, "--no-sparse")) { FIO_setSparseWrite(prefs, 0); continue; } if (!strcmp(argument, "--test")) { operation=zom_test; continue; } + if (!strcmp(argument, "--asyncio")) { FIO_setAsyncIOFlag(prefs, 1); continue;} + if (!strcmp(argument, "--no-asyncio")) { FIO_setAsyncIOFlag(prefs, 0); continue;} if (!strcmp(argument, "--train")) { operation=zom_train; if (outFileName==NULL) outFileName=g_defaultDictName; continue; } if (!strcmp(argument, "--no-dictID")) { FIO_setDictIDFlag(prefs, 0); continue; } if (!strcmp(argument, "--keep")) { FIO_setRemoveSrcFile(prefs, 0); continue; } diff --git a/tests/playTests.sh b/tests/playTests.sh index b7a3d88a817..b795823d782 100755 --- a/tests/playTests.sh +++ b/tests/playTests.sh @@ -185,7 +185,7 @@ fi println "\n===> simple tests " -datagen > tmp +datagen -g500K > tmp zstd -h zstd -H zstd -V @@ -1575,6 +1575,44 @@ elif [ "$longCSize19wlog23" -gt "$optCSize19wlog23" ]; then exit 1 fi +println "\n===> zstd asyncio tests " + +addFrame() { + datagen -g2M -s$2 >> tmp_uncompressed + datagen -g2M -s$2 | zstd --format=$1 >> tmp_compressed.zst +} + +addTwoFrames() { + addFrame $1 1 + addFrame $1 2 +} + +testAsyncIO() { + roundTripTest -g2M "3 --asyncio --format=$1" + roundTripTest -g2M "3 --no-asyncio --format=$1" +} + +rm -f tmp_compressed tmp_uncompressed +testAsyncIO zstd +addTwoFrames zstd +if [ $GZIPMODE -eq 1 ]; then + testAsyncIO gzip + addTwoFrames gzip +fi +if [ $LZMAMODE -eq 1 ]; then + testAsyncIO lzma + addTwoFrames lzma +fi +if [ $LZ4MODE -eq 1 ]; then + testAsyncIO lz4 + addTwoFrames lz4 +fi +cat tmp_uncompressed | $MD5SUM > tmp2 +zstd -d tmp_compressed.zst --asyncio -c | $MD5SUM > tmp1 +$DIFF -q tmp1 tmp2 +rm tmp1 +zstd -d tmp_compressed.zst --no-asyncio -c | $MD5SUM > tmp1 +$DIFF -q tmp1 tmp2 if [ "$1" != "--test-large-data" ]; then println "Skipping large data tests"