Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions programs/fileio.c
Original file line number Diff line number Diff line change
Expand Up @@ -1758,6 +1758,7 @@ FIO_compressFilename_srcFile(FIO_ctx_t* const fCtx,
int result;
FILE* srcFile;
stat_t srcFileStat;
U64 fileSize = UTIL_FILESIZE_UNKNOWN;
DISPLAYLEVEL(6, "FIO_compressFilename_srcFile: %s \n", srcFileName);

if (strcmp(srcFileName, stdinmark)) {
Expand Down Expand Up @@ -1790,6 +1791,17 @@ FIO_compressFilename_srcFile(FIO_ctx_t* const fCtx,
srcFile = FIO_openSrcFile(prefs, srcFileName, &srcFileStat);
if (srcFile == NULL) return 1; /* srcFile could not be opened */

/* Don't use AsyncIO for small files */
if (strcmp(srcFileName, stdinmark)) /* Stdin doesn't have stats */
fileSize = UTIL_getFileSizeStat(&srcFileStat);
if(fileSize != UTIL_FILESIZE_UNKNOWN && fileSize < ZSTD_BLOCKSIZE_MAX * 3) {
AIO_ReadPool_setAsync(ress.readCtx, 0);
AIO_WritePool_setAsync(ress.writeCtx, 0);
} else {
AIO_ReadPool_setAsync(ress.readCtx, 1);
AIO_WritePool_setAsync(ress.writeCtx, 1);
}

AIO_ReadPool_setFile(ress.readCtx, srcFile);
result = FIO_compressFilename_dstFile(
fCtx, prefs, ress,
Expand Down Expand Up @@ -2586,6 +2598,7 @@ static int FIO_decompressSrcFile(FIO_ctx_t* const fCtx, FIO_prefs_t* const prefs
FILE* srcFile;
stat_t srcFileStat;
int result;
U64 fileSize = UTIL_FILESIZE_UNKNOWN;

if (UTIL_isDirectory(srcFileName)) {
DISPLAYLEVEL(1, "zstd: %s is a directory -- ignored \n", srcFileName);
Expand All @@ -2594,6 +2607,18 @@ static int FIO_decompressSrcFile(FIO_ctx_t* const fCtx, FIO_prefs_t* const prefs

srcFile = FIO_openSrcFile(prefs, srcFileName, &srcFileStat);
if (srcFile==NULL) return 1;

/* Don't use AsyncIO for small files */
if (strcmp(srcFileName, stdinmark)) /* Stdin doesn't have stats */
fileSize = UTIL_getFileSizeStat(&srcFileStat);
if(fileSize != UTIL_FILESIZE_UNKNOWN && fileSize < ZSTD_BLOCKSIZE_MAX * 3) {
AIO_ReadPool_setAsync(ress.readCtx, 0);
AIO_WritePool_setAsync(ress.writeCtx, 0);
} else {
AIO_ReadPool_setAsync(ress.readCtx, 1);
AIO_WritePool_setAsync(ress.writeCtx, 1);
}

AIO_ReadPool_setFile(ress.readCtx, srcFile);

result = FIO_decompressDstFile(fCtx, prefs, ress, dstFileName, srcFileName, &srcFileStat);
Expand Down
97 changes: 69 additions & 28 deletions programs/fileio_asyncio.c
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ int AIO_supported(void) {
}

/* ***********************************
* General IoPool implementation
* Generic IoPool implementation
*************************************/

static IOJob_t *AIO_IOPool_createIoJob(IOPoolCtx_t *ctx, size_t bufferSize) {
Expand All @@ -163,20 +163,22 @@ static IOJob_t *AIO_IOPool_createIoJob(IOPoolCtx_t *ctx, size_t bufferSize) {
* Displays warning if asyncio is requested but MT isn't available. */
static void AIO_IOPool_createThreadPool(IOPoolCtx_t* ctx, const FIO_prefs_t* prefs) {
ctx->threadPool = NULL;
ctx->threadPoolActive = 0;
if(prefs->asyncIO) {
if (ZSTD_pthread_mutex_init(&ctx->ioJobsMutex, NULL))
EXM_THROW(102,"Failed creating write availableJobs mutex");
EXM_THROW(102,"Failed creating ioJobsMutex mutex");
/* We want MAX_IO_JOBS-2 queue items because we need to always have 1 free buffer to
* decompress into and 1 buffer that's actively written to disk and owned by the writing thread. */
assert(MAX_IO_JOBS >= 2);
ctx->threadPool = POOL_create(1, MAX_IO_JOBS - 2);
ctx->threadPoolActive = 1;
if (!ctx->threadPool)
EXM_THROW(104, "Failed creating writer thread pool");
EXM_THROW(104, "Failed creating I/O thread pool");
}
}

/* AIO_IOPool_init:
* Allocates and sets and a new write pool including its included availableJobs. */
* Allocates and sets and a new I/O thread pool including its included availableJobs. */
static void AIO_IOPool_init(IOPoolCtx_t* ctx, const FIO_prefs_t* prefs, POOL_function poolFunction, size_t bufferSize) {
int i;
AIO_IOPool_createThreadPool(ctx, prefs);
Expand All @@ -192,27 +194,59 @@ static void AIO_IOPool_init(IOPoolCtx_t* ctx, const FIO_prefs_t* prefs, POOL_fun
}


/* AIO_IOPool_threadPoolActive:
* Check if current operation uses thread pool.
* Note that in some cases we have a thread pool initialized but choose not to use it. */
static int AIO_IOPool_threadPoolActive(IOPoolCtx_t* ctx) {
return ctx->threadPool && ctx->threadPoolActive;
}


/* AIO_IOPool_lockJobsMutex:
* Locks the IO jobs mutex if threading is active */
static void AIO_IOPool_lockJobsMutex(IOPoolCtx_t* ctx) {
if(AIO_IOPool_threadPoolActive(ctx))
ZSTD_pthread_mutex_lock(&ctx->ioJobsMutex);
}

/* AIO_IOPool_unlockJobsMutex:
* Unlocks the IO jobs mutex if threading is active */
static void AIO_IOPool_unlockJobsMutex(IOPoolCtx_t* ctx) {
if(AIO_IOPool_threadPoolActive(ctx))
ZSTD_pthread_mutex_unlock(&ctx->ioJobsMutex);
}

/* AIO_IOPool_releaseIoJob:
* Releases an acquired job back to the pool. Doesn't execute the job. */
static void AIO_IOPool_releaseIoJob(IOJob_t* job) {
IOPoolCtx_t* const ctx = (IOPoolCtx_t *) job->ctx;
if(ctx->threadPool)
ZSTD_pthread_mutex_lock(&ctx->ioJobsMutex);
AIO_IOPool_lockJobsMutex(ctx);
assert(ctx->availableJobsCount < ctx->totalIoJobs);
ctx->availableJobs[ctx->availableJobsCount++] = job;
if(ctx->threadPool)
ZSTD_pthread_mutex_unlock(&ctx->ioJobsMutex);
AIO_IOPool_unlockJobsMutex(ctx);
}

/* AIO_IOPool_join:
* Waits for all tasks in the pool to finish executing. */
static void AIO_IOPool_join(IOPoolCtx_t* ctx) {
if(ctx->threadPool)
if(AIO_IOPool_threadPoolActive(ctx))
POOL_joinJobs(ctx->threadPool);
}

/* AIO_IOPool_setThreaded:
* Allows (de)activating threaded mode, to be used when the expected overhead
* of threading costs more than the expected gains. */
static void AIO_IOPool_setThreaded(IOPoolCtx_t* ctx, int threaded) {
assert(threaded == 0 || threaded == 1);
assert(ctx != NULL);
if(ctx->threadPoolActive != threaded) {
AIO_IOPool_join(ctx);
ctx->threadPoolActive = threaded;
}
}

/* AIO_IOPool_free:
* Release a previously allocated write thread pool. Makes sure all takss are done and released. */
* Release a previously allocated IO thread pool. Makes sure all tasks are done and released. */
static void AIO_IOPool_destroy(IOPoolCtx_t* ctx) {
int i;
if(ctx->threadPool) {
Expand All @@ -236,12 +270,10 @@ static void AIO_IOPool_destroy(IOPoolCtx_t* ctx) {
static IOJob_t* AIO_IOPool_acquireJob(IOPoolCtx_t* ctx) {
IOJob_t *job;
assert(ctx->file != NULL || ctx->prefs->testMode);
if(ctx->threadPool)
ZSTD_pthread_mutex_lock(&ctx->ioJobsMutex);
AIO_IOPool_lockJobsMutex(ctx);
assert(ctx->availableJobsCount > 0);
job = (IOJob_t*) ctx->availableJobs[--ctx->availableJobsCount];
if(ctx->threadPool)
ZSTD_pthread_mutex_unlock(&ctx->ioJobsMutex);
AIO_IOPool_unlockJobsMutex(ctx);
job->usedBufferSize = 0;
job->file = ctx->file;
job->offset = 0;
Expand All @@ -251,8 +283,7 @@ static IOJob_t* AIO_IOPool_acquireJob(IOPoolCtx_t* ctx) {

/* AIO_IOPool_setFile:
* Sets the destination file for future files in the pool.
* Requires completion of all queues write jobs and release of all otherwise acquired jobs.
* Also requires ending of sparse write if a previous file was used in sparse mode. */
* Requires completion of all queued jobs and release of all otherwise acquired jobs. */
static void AIO_IOPool_setFile(IOPoolCtx_t* ctx, FILE* file) {
assert(ctx!=NULL);
AIO_IOPool_join(ctx);
Expand All @@ -269,7 +300,7 @@ static FILE* AIO_IOPool_getFile(const IOPoolCtx_t* ctx) {
* The queued job shouldn't be used directly after queueing it. */
static void AIO_IOPool_enqueueJob(IOJob_t* job) {
IOPoolCtx_t* const ctx = (IOPoolCtx_t *)job->ctx;
if(ctx->threadPool)
if(AIO_IOPool_threadPoolActive(ctx))
POOL_add(ctx->threadPool, ctx->poolFunction, job);
else
ctx->poolFunction(job);
Expand Down Expand Up @@ -300,8 +331,7 @@ void AIO_WritePool_enqueueAndReacquireWriteJob(IOJob_t **job) {
* Blocks on completion of all current write jobs before executing. */
void AIO_WritePool_sparseWriteEnd(WritePoolCtx_t* ctx) {
assert(ctx != NULL);
if(ctx->base.threadPool)
POOL_joinJobs(ctx->base.threadPool);
AIO_IOPool_join(&ctx->base);
AIO_fwriteSparseEnd(ctx->base.prefs, ctx->base.file, ctx->storedSkips);
ctx->storedSkips = 0;
}
Expand Down Expand Up @@ -368,6 +398,13 @@ void AIO_WritePool_free(WritePoolCtx_t* ctx) {
free(ctx);
}

/* AIO_WritePool_setAsync:
* Allows (de)activating async mode, to be used when the expected overhead
* of asyncio costs more than the expected gains. */
void AIO_WritePool_setAsync(WritePoolCtx_t* ctx, int async) {
AIO_IOPool_setThreaded(&ctx->base, async);
}


/* ***********************************
* ReadPool implementation
Expand All @@ -383,14 +420,13 @@ static void AIO_ReadPool_releaseAllCompletedJobs(ReadPoolCtx_t* ctx) {

static void AIO_ReadPool_addJobToCompleted(IOJob_t* job) {
ReadPoolCtx_t* const ctx = (ReadPoolCtx_t *)job->ctx;
if(ctx->base.threadPool)
ZSTD_pthread_mutex_lock(&ctx->base.ioJobsMutex);
AIO_IOPool_lockJobsMutex(&ctx->base);
assert(ctx->completedJobsCount < MAX_IO_JOBS);
ctx->completedJobs[ctx->completedJobsCount++] = job;
if(ctx->base.threadPool) {
if(AIO_IOPool_threadPoolActive(&ctx->base)) {
ZSTD_pthread_cond_signal(&ctx->jobCompletedCond);
ZSTD_pthread_mutex_unlock(&ctx->base.ioJobsMutex);
}
AIO_IOPool_unlockJobsMutex(&ctx->base);
}

/* AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked:
Expand Down Expand Up @@ -426,8 +462,7 @@ static size_t AIO_ReadPool_numReadsInFlight(ReadPoolCtx_t* ctx) {
* Would block. */
static IOJob_t* AIO_ReadPool_getNextCompletedJob(ReadPoolCtx_t* ctx) {
IOJob_t *job = NULL;
if (ctx->base.threadPool)
ZSTD_pthread_mutex_lock(&ctx->base.ioJobsMutex);
AIO_IOPool_lockJobsMutex(&ctx->base);

job = AIO_ReadPool_findNextWaitingOffsetCompletedJob_locked(ctx);

Expand All @@ -443,8 +478,7 @@ static IOJob_t* AIO_ReadPool_getNextCompletedJob(ReadPoolCtx_t* ctx) {
ctx->waitingOnOffset += job->usedBufferSize;
}

if (ctx->base.threadPool)
ZSTD_pthread_mutex_unlock(&ctx->base.ioJobsMutex);
AIO_IOPool_unlockJobsMutex(&ctx->base);
return job;
}

Expand Down Expand Up @@ -524,7 +558,7 @@ ReadPoolCtx_t* AIO_ReadPool_create(const FIO_prefs_t* prefs, size_t bufferSize)

if(ctx->base.threadPool)
if (ZSTD_pthread_cond_init(&ctx->jobCompletedCond, NULL))
EXM_THROW(103,"Failed creating write jobCompletedCond mutex");
EXM_THROW(103,"Failed creating jobCompletedCond cond");

return ctx;
}
Expand Down Expand Up @@ -620,3 +654,10 @@ int AIO_ReadPool_closeFile(ReadPoolCtx_t* ctx) {
AIO_ReadPool_setFile(ctx, NULL);
return fclose(file);
}

/* AIO_ReadPool_setAsync:
* Allows (de)activating async mode, to be used when the expected overhead
* of asyncio costs more than the expected gains. */
void AIO_ReadPool_setAsync(ReadPoolCtx_t* ctx, int async) {
AIO_IOPool_setThreaded(&ctx->base, async);
}
22 changes: 22 additions & 0 deletions programs/fileio_asyncio.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,17 @@
* You may select, at your option, one of the above-listed licenses.
*/

/*
* FileIO AsyncIO exposes read/write IO pools that allow doing IO asynchronously.
* Current implementation relies on having one thread that reads and one that
* writes.
* Each IO pool supports up to `MAX_IO_JOBS` that can be enqueued for work, but
* are performed serially by the appropriate worker thread.
* Most systems exposes better primitives to perform asynchronous IO, such as
* io_uring on newer linux systems. The API is built in such a way that in the
* future we could replace the threads with better solutions when available.
*/

#ifndef ZSTD_FILEIO_ASYNCIO_H
#define ZSTD_FILEIO_ASYNCIO_H

Expand All @@ -27,6 +38,7 @@ extern "C" {
typedef struct {
/* These struct fields should be set only on creation and not changed afterwards */
POOL_ctx* threadPool;
int threadPoolActive;
int totalIoJobs;
const FIO_prefs_t* prefs;
POOL_function poolFunction;
Expand Down Expand Up @@ -136,6 +148,11 @@ WritePoolCtx_t* AIO_WritePool_create(const FIO_prefs_t* prefs, size_t bufferSize
* Frees and releases a writePool and its resources. Closes destination file. */
void AIO_WritePool_free(WritePoolCtx_t* ctx);

/* AIO_WritePool_setAsync:
* Allows (de)activating async mode, to be used when the expected overhead
* of asyncio costs more than the expected gains. */
void AIO_WritePool_setAsync(WritePoolCtx_t* ctx, int async);

/* AIO_ReadPool_create:
* Allocates and sets and a new readPool including its included jobs.
* bufferSize should be set to the maximal buffer we want to read at a time, will also be used
Expand All @@ -146,6 +163,11 @@ ReadPoolCtx_t* AIO_ReadPool_create(const FIO_prefs_t* prefs, size_t bufferSize);
* Frees and releases a readPool and its resources. Closes source file. */
void AIO_ReadPool_free(ReadPoolCtx_t* ctx);

/* AIO_ReadPool_setAsync:
* Allows (de)activating async mode, to be used when the expected overhead
* of asyncio costs more than the expected gains. */
void AIO_ReadPool_setAsync(ReadPoolCtx_t* ctx, int async);

/* AIO_ReadPool_consumeBytes:
* Consumes byes from srcBuffer's beginning and updates srcBufferLoaded accordingly. */
void AIO_ReadPool_consumeBytes(ReadPoolCtx_t *ctx, size_t n);
Expand Down