Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@

2. Compilation failed on CRAN's MacOS due to an older version of `zlib.h/zconf.h` which did not have `z_const` defined, [#3939](https://github.com/Rdatatable/data.table/issues/3939). Other open-source projects unrelated to R have experienced this problem on MacOS too. We have followed the common practice of removing `z_const` to support the older `zlib` versions, and data.table's release procedures have gained a `grep` to ensure `z_const` isn't used again by accident in future. The library `zlib` is used for `fwrite`'s new feature of multithreaded compression on-the-fly; see item 3 of 1.12.4 below.

3. An error, again in `fwrite`'s compression but so far only observed on 32-bit Solaris, has been fixed, [#3931](https://github.com/Rdatatable/data.table/issues/3931): `Error -2: one or more threads failed to allocate buffers or there was a compression error.` In case it happens again, this area has been made more robust and the error message more detailed.

## NOTES


Expand Down
105 changes: 58 additions & 47 deletions src/fwrite.c
Original file line number Diff line number Diff line change
Expand Up @@ -578,9 +578,6 @@ int compressbuff(z_stream *stream, void* dest, size_t *destLen, const void* sour
return err == Z_STREAM_END ? Z_OK : err;
}

static int failed = 0;
static int rowsPerBatch;

void fwriteMain(fwriteMainArgs args)
{
double startTime = wallclock();
Expand Down Expand Up @@ -766,6 +763,7 @@ void fwriteMain(fwriteMainArgs args)

// Decide buffer size and rowsPerBatch for each thread
// Once rowsPerBatch is decided it can't be changed
int rowsPerBatch=0;
if (maxLineLen*2>buffSize) { buffSize=2*maxLineLen; rowsPerBatch=2; }
else rowsPerBatch = buffSize / maxLineLen;
if (rowsPerBatch > args.nrow) rowsPerBatch = args.nrow;
Expand All @@ -780,8 +778,6 @@ void fwriteMain(fwriteMainArgs args)
}
t0 = wallclock();

failed=0; // static global so checkBuffer can set it. -errno for malloc or realloc fails, +errno for write fail

bool hasPrinted = false;
int maxBuffUsedPC = 0;

Expand All @@ -795,41 +791,51 @@ void fwriteMain(fwriteMainArgs args)
deflateEnd(&stream);
}

// Allocate the per-thread output buffers as one contiguous pool up front (single-threaded),
// so an allocation failure can be reported via STOP() before the parallel region starts.
// Each thread later takes the slice at offset me*buffSize (and me*zbuffSize for gzip).
errno=0;
char *buffPool = malloc(nth*(size_t)buffSize);
if (!buffPool) {
  // # nocov start
  // Bytes -> MB uses 1024*1024; '^' is bitwise XOR in C, so 1024^2 would be 1026.
  // The quotient is cast to int so the argument type matches the %d conversion.
  STOP("Unable to allocate %d MB * %d thread buffers; '%d: %s'. Please read ?fwrite for nThread, buffMB and verbose options.",
       (int)((size_t)buffSize/(1024*1024)), nth, errno, strerror(errno));
  // # nocov end
}
char *zbuffPool = NULL;
if (args.is_gzip) {
  // Compressed-output pool: only needed when writing gzip.
  zbuffPool = malloc(nth*(size_t)zbuffSize);
  if (!zbuffPool) {
    // # nocov start
    const int allocErrno = errno; // capture before free(), which is permitted to modify errno
    free(buffPool);               // release the first pool; presumably STOP() does not return - confirm
    STOP("Unable to allocate %d MB * %d thread compressed buffers; '%d: %s'. Please read ?fwrite for nThread, buffMB and verbose options.",
         (int)((size_t)zbuffSize/(1024*1024)), nth, allocErrno, strerror(allocErrno));
    // # nocov end
  }
}

bool failed = false; // naked (unprotected by atomic) write to bool ok because only ever write true in this special paradigm
int failed_compress = 0; // the first thread to fail writes their reason here when they first get to ordered section
int failed_write = 0; // same. could use +ve and -ve in the same code but separate it out to trace Solaris problem, #3931

#pragma omp parallel num_threads(nth)
{
// local to each thread
int me = omp_get_thread_num();
int my_failed_compress = 0;
char *ch, *myBuff;
ch = myBuff = buffPool + me*buffSize;

void *myzBuff = NULL;
size_t myzbuffUsed = 0;

// each thread has its own buffer. malloc and errno are thread-safe.
ch = myBuff = malloc(buffSize);
if (myBuff==NULL) failed=-errno;
// each thread has its own zbuffer
z_stream mystream;
if (args.is_gzip && !failed) {
myzBuff = malloc(zbuffSize);
if (myzBuff==NULL) failed=-errno;
if(init_stream(&mystream)) {
failed = -998; // # nocov
if (args.is_gzip) {
myzBuff = zbuffPool + me*zbuffSize;
if (init_stream(&mystream)) {
failed = true; // # nocov
my_failed_compress = -998; // # nocov
}
}

// Do not rely on availability of '#omp cancel' new in OpenMP v4.0 (July 2013).
// OpenMP v4.0 is in gcc 4.9+ (https://gcc.gnu.org/wiki/openmp) but
// not yet in clang as of v3.8 (http://openmp.llvm.org/)
// If not-me failed, I'll see shared 'failed', fall through loop, free my buffer
// and after parallel section, single thread will call STOP() safely.

#pragma omp single
{
nth = omp_get_num_threads(); // update nth with the actual nth (might be different than requested)
}
int me = omp_get_thread_num();

#pragma omp for ordered schedule(dynamic)
for(int64_t start=0; start<args.nrow; start+=rowsPerBatch) {
if (failed) continue; // Not break. See comments above about #omp cancel
if (failed) continue; // Not break. Because we don't use #omp cancel yet.
int64_t end = ((args.nrow - start)<rowsPerBatch) ? args.nrow : start + rowsPerBatch;
for (int64_t i=start; i<end; i++) {
// Tepid starts here (once at beginning of each per line)
Expand All @@ -852,28 +858,32 @@ void fwriteMain(fwriteMainArgs args)
// Tepid again (once at the end of each line)
ch--; // backup onto the last sep after the last column. ncol>=1 because 0-columns was caught earlier.
write_chars(args.eol, &ch); // overwrite last sep with eol instead
if (failed) break; // this thread stop writing rows; fall through to clear up and STOP() below
}
// compress buffer if gzip
if (args.is_gzip && !failed) {
if (!failed) {
myzbuffUsed = zbuffSize;
failed = compressbuff(&mystream, myzBuff, &myzbuffUsed, myBuff, (int)(ch-myBuff));
}
deflateReset(&mystream);
myzbuffUsed = zbuffSize;
int ret = compressbuff(&mystream, myzBuff, &myzbuffUsed, myBuff, (int)(ch-myBuff));
if (ret) { failed=true; my_failed_compress=ret; }
else deflateReset(&mystream);
}
#pragma omp ordered
{
if (!failed) { // a thread ahead of me could have failed below while I was working or waiting above
if (failed) {
if (failed_compress==0 && my_failed_compress!=0) failed_compress = my_failed_compress; // # nocov
// else another thread could have failed below while I was working or waiting above; their reason got here first
} else {
errno=0;
if (f==-1) {
*ch='\0'; // standard C string end marker so DTPRINT knows where to stop
DTPRINT(myBuff);
} else if ((args.is_gzip)) {
} else if (args.is_gzip) {
if (WRITE(f, myzBuff, (int)(myzbuffUsed)) == -1) {
failed=errno; // # nocov
failed=true; // # nocov
failed_write=errno; // # nocov
}
} else if (WRITE(f, myBuff, (int)(ch - myBuff)) == -1) {
failed=errno; // # nocov
failed=true; // # nocov
failed_write=errno; // # nocov
}

int used = 100*((double)(ch-myBuff))/buffSize; // percentage of original buffMB
Expand Down Expand Up @@ -921,9 +931,9 @@ void fwriteMain(fwriteMainArgs args)
if (args.is_gzip) {
deflateEnd(&mystream);
}
free(myBuff);
free(myzBuff);
}
free(buffPool);
free(zbuffPool);

// Finished parallel region and can call R API safely now.
if (hasPrinted) {
Expand All @@ -943,12 +953,13 @@ void fwriteMain(fwriteMainArgs args)
// If a write failed, the line above tries close() to clean up, but that might fail as well. So the
// '&& !failed' is to not report the error as just 'closing file' but the next line for more detail
// from the original error.
if (failed<0) {
STOP("Error %d: one or more threads failed to allocate buffers or there was a compression error." // # nocov
" Please try again with verbose=TRUE and try searching online for this error message.\n", failed); // # nocov
} else if (failed>0) {
STOP("%s: '%s'", strerror(failed), args.filename); // # nocov
// Report the first failure recorded by the parallel region, now that R API calls are safe.
// failed_compress / failed_write are only filled in inside the ordered section; a thread that
// fails before reaching it (e.g. init_stream() returning non-zero) sets 'failed' but every
// iteration is then skipped, so both reason codes can still be 0 here. The final branch makes
// sure that case is reported rather than returning as if the write had succeeded.
if (failed) {
  // # nocov start
  if (failed_compress) {
    STOP("Error %d: compression error. Please retry with verbose=TRUE and search online for this error message.\n", failed_compress);
  } else if (failed_write) {
    STOP("%s: '%s'", strerror(failed_write), args.filename);
  } else {
    STOP("Error: one or more threads failed before a compression or write error code could be recorded (for example the compression stream failed to initialize). Please retry with verbose=TRUE and search online for this error message.\n");
  }
  // # nocov end
}
return;
}