From 0c3b6bca45d16407a614e955b00947118067e782 Mon Sep 17 00:00:00 2001 From: mattdowle Date: Wed, 9 Oct 2019 09:16:49 -0700 Subject: [PATCH 1/5] malloc before parallel region with its own error, and naked bool write true only with distinct compress/write error codes. To trace Solaris. --- src/fwrite.c | 105 ++++++++++++++++++++++++++++----------------------- 1 file changed, 58 insertions(+), 47 deletions(-) diff --git a/src/fwrite.c b/src/fwrite.c index e5a7b68190..efaf64ce73 100644 --- a/src/fwrite.c +++ b/src/fwrite.c @@ -578,9 +578,6 @@ int compressbuff(z_stream *stream, void* dest, size_t *destLen, const void* sour return err == Z_STREAM_END ? Z_OK : err; } -static int failed = 0; -static int rowsPerBatch; - void fwriteMain(fwriteMainArgs args) { double startTime = wallclock(); @@ -766,6 +763,7 @@ void fwriteMain(fwriteMainArgs args) // Decide buffer size and rowsPerBatch for each thread // Once rowsPerBatch is decided it can't be changed + int rowsPerBatch=0; if (maxLineLen*2>buffSize) { buffSize=2*maxLineLen; rowsPerBatch=2; } else rowsPerBatch = buffSize / maxLineLen; if (rowsPerBatch > args.nrow) rowsPerBatch = args.nrow; @@ -780,8 +778,6 @@ void fwriteMain(fwriteMainArgs args) } t0 = wallclock(); - failed=0; // static global so checkBuffer can set it. -errno for malloc or realloc fails, +errno for write fail - bool hasPrinted = false; int maxBuffUsedPC = 0; @@ -795,41 +791,51 @@ void fwriteMain(fwriteMainArgs args) deflateEnd(&stream); } + errno=0; + char *buffPool = malloc(nth*(size_t)buffSize); + if (!buffPool) { + STOP("Unable to allocate %d MB * %d thread buffers; '%d: %s'. Please read ?fwrite for nThread, buffMB and verbose options.", + (size_t)buffSize/(1024^2), nth, errno, strerror(errno)); // # nocov + } + char *zbuffPool = NULL; + if (args.is_gzip) { + zbuffPool = malloc(nth*(size_t)zbuffSize); + if (!zbuffPool) { + free(buffPool); // # nocov + STOP("Unable to allocate %d MB * %d thread compressed buffers; '%d: %s'. 
Please read ?fwrite for nThread, buffMB and verbose options.", + (size_t)zbuffSize/(1024^2), nth, errno, strerror(errno)); // # nocov + } + } + + bool failed = false; // naked (unprotected by atomic) write to bool ok because only ever write true in this special paradigm + int failed_compress = 0; // the first thread to fail writes their reason here when they first get to ordered section + int failed_write = 0; // same. could use +ve and -ve in the same code but separate it out to trace Solaris problem, #3931 + #pragma omp parallel num_threads(nth) { - // local to each thread + #pragma omp single + { + nth = omp_get_num_threads(); // update nth with the actual nth (might be less than requested) + } + int me = omp_get_thread_num(); + int my_failed_compress = 0; char *ch, *myBuff; + ch = myBuff = buffPool + me*buffSize; + void *myzBuff = NULL; size_t myzbuffUsed = 0; - - // each thread has its own buffer. malloc and errno are thread-safe. - ch = myBuff = malloc(buffSize); - if (myBuff==NULL) failed=-errno; - // each thread has its own zbuffer z_stream mystream; - if (args.is_gzip && !failed) { - myzBuff = malloc(zbuffSize); - if (myzBuff==NULL) failed=-errno; - if(init_stream(&mystream)) { - failed = -998; // # nocov + if (args.is_gzip) { + myzBuff = zbuffPool + me*zbuffSize; + if (init_stream(&mystream)) { + failed = true; // # nocov + my_failed_compress = -998; // # nocov } } - // Do not rely on availability of '#omp cancel' new in OpenMP v4.0 (July 2013). - // OpenMP v4.0 is in gcc 4.9+ (https://gcc.gnu.org/wiki/openmp) but - // not yet in clang as of v3.8 (http://openmp.llvm.org/) - // If not-me failed, I'll see shared 'failed', fall through loop, free my buffer - // and after parallel section, single thread will call STOP() safely. 
- - #pragma omp single - { - nth = omp_get_num_threads(); // update nth with the actual nth (might be different than requested) - } - int me = omp_get_thread_num(); - #pragma omp for ordered schedule(dynamic) for(int64_t start=0; start=1 because 0-columns was caught earlier. write_chars(args.eol, &ch); // overwrite last sep with eol instead - if (failed) break; // this thread stop writing rows; fall through to clear up and STOP() below } // compress buffer if gzip if (args.is_gzip && !failed) { - if (!failed) { - myzbuffUsed = zbuffSize; - failed = compressbuff(&mystream, myzBuff, &myzbuffUsed, myBuff, (int)(ch-myBuff)); - } - deflateReset(&mystream); + myzbuffUsed = zbuffSize; + int ret = compressbuff(&mystream, myzBuff, &myzbuffUsed, myBuff, (int)(ch-myBuff)); + if (ret) { failed=true; my_failed_compress=ret; } + else deflateReset(&mystream); } #pragma omp ordered { - if (!failed) { // a thread ahead of me could have failed below while I was working or waiting above + if (failed) { + if (failed_compress==0 && my_failed_compress!=0) failed_compress = my_failed_compress; // # nocov + // else another thread could have failed below while I was working or waiting above; their reason got here first + } else { + errno=0; if (f==-1) { *ch='\0'; // standard C string end marker so DTPRINT knows where to stop DTPRINT(myBuff); - } else if ((args.is_gzip)) { + } else if (args.is_gzip) { if (WRITE(f, myzBuff, (int)(myzbuffUsed)) == -1) { - failed=errno; // # nocov + failed=true; // # nocov + failed_write=errno; // # nocov } } else if (WRITE(f, myBuff, (int)(ch - myBuff)) == -1) { - failed=errno; // # nocov + failed=true; // # nocov + failed_write=errno; // # nocov } int used = 100*((double)(ch-myBuff))/buffSize; // percentage of original buffMB @@ -921,9 +931,9 @@ void fwriteMain(fwriteMainArgs args) if (args.is_gzip) { deflateEnd(&mystream); } - free(myBuff); - free(myzBuff); } + free(buffPool); + free(zbuffPool); // Finished parallel region and can call R API safely now. 
if (hasPrinted) { @@ -943,12 +953,13 @@ void fwriteMain(fwriteMainArgs args) // If a write failed, the line above tries close() to clean up, but that might fail as well. So the // '&& !failed' is to not report the error as just 'closing file' but the next line for more detail // from the original error. - if (failed<0) { - STOP("Error %d: one or more threads failed to allocate buffers or there was a compression error." // # nocov - " Please try again with verbose=TRUE and try searching online for this error message.\n", failed); // # nocov - } else if (failed>0) { - STOP("%s: '%s'", strerror(failed), args.filename); // # nocov + if (failed) { + // nocov start + if (failed_compress) + STOP("Error %d: compression error. Please retry with verbose=TRUE and search online for this error message.\n", failed_compress); + if (failed_write) + STOP("%s: '%s'", strerror(failed_write), args.filename); + // nocov end } - return; } From 35b97b8255309df75818667208401ddd8ecfc90d Mon Sep 17 00:00:00 2001 From: mattdowle Date: Wed, 9 Oct 2019 10:12:34 -0700 Subject: [PATCH 2/5] verbose added just after test 1658.41 to trace on Solaris, and added news item --- NEWS.md | 2 ++ inst/tests/tests.Rraw | 4 ++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/NEWS.md b/NEWS.md index fb10f4f81a..21d52722a6 100644 --- a/NEWS.md +++ b/NEWS.md @@ -12,6 +12,8 @@ 2. Compilation failed on CRAN's MacOS due to an older version of `zlib.h/zconf.h` which did not have `z_const` defined, [#3939](https://github.com/Rdatatable/data.table/issues/3939). Other open-source projects unrelated to R have experienced this problem on MacOS too. We have followed the common practice of removing `z_const` to support the older `zlib` versions, and data.table's release procedures have gained a `grep` to ensure `z_const` isn't used again by accident in future. The library `zlib` is used for `fwrite`'s new feature of multithreaded compression on-the-fly; see item 3 of 1.12.4 below. +3. 
An error, again in `fwrite`'s compression but so far observed only on 32-bit Solaris, has been fixed, [#3931](https://github.com/Rdatatable/data.table/issues/3931): `Error -2: one or more threads failed to allocate buffers or there was a compression error.` In case it happens again, this area has been made more robust and the error message more detailed. + ## NOTES diff --git a/inst/tests/tests.Rraw b/inst/tests/tests.Rraw index 8e471a2d3f..a93346cdf1 100644 --- a/inst/tests/tests.Rraw +++ b/inst/tests/tests.Rraw @@ -9639,8 +9639,8 @@ test(1658.40, fwrite(matrix(1:4, nrow=2, ncol=2, dimnames = list(c("ra","rb"),c( # fwrite compress test(1658.41, fwrite(data.table(a=c(1:3), b=c(1:3)), compress="gzip"), output='a,b\n1,1\n2,2\n3,3') # compress ignored on console DT = data.table(a=rep(1:2,each=100), b=rep(1:4,each=25)) -fwrite(DT, file=f1<-tempfile(fileext=".gz")) -fwrite(DT, file=f2<-tempfile()) +fwrite(DT, file=f1<-tempfile(fileext=".gz"), verbose=TRUE) # verbose temporary to trace #3931 on Solaris +fwrite(DT, file=f2<-tempfile(), verbose=TRUE) test(1658.42, file.info(f1)$size < file.info(f2)$size) # 74 < 804 (file.size() isn't available in R 3.1.0) if (test_R.utils) test(1658.43, fread(f1), DT) # use fread to decompress gz (works cross-platform) fwrite(DT, file=f3<-tempfile(), compress="gzip") # compress to filename not ending .gz From 2022457ddb0ff4a00b308572fafbd706fee6790f Mon Sep 17 00:00:00 2001 From: mattdowle Date: Wed, 9 Oct 2019 10:17:22 -0700 Subject: [PATCH 3/5] coverage --- src/fwrite.c | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/src/fwrite.c b/src/fwrite.c index efaf64ce73..56b13ce70b 100644 --- a/src/fwrite.c +++ b/src/fwrite.c @@ -794,16 +794,20 @@ void fwriteMain(fwriteMainArgs args) errno=0; char *buffPool = malloc(nth*(size_t)buffSize); if (!buffPool) { + // # nocov start STOP("Unable to allocate %d MB * %d thread buffers; '%d: %s'. 
Please read ?fwrite for nThread, buffMB and verbose options.", - (size_t)buffSize/(1024^2), nth, errno, strerror(errno)); // # nocov + (size_t)buffSize/(1024^2), nth, errno, strerror(errno)); + // # nocov end } char *zbuffPool = NULL; if (args.is_gzip) { zbuffPool = malloc(nth*(size_t)zbuffSize); if (!zbuffPool) { - free(buffPool); // # nocov + // # nocov start + free(buffPool); STOP("Unable to allocate %d MB * %d thread compressed buffers; '%d: %s'. Please read ?fwrite for nThread, buffMB and verbose options.", - (size_t)zbuffSize/(1024^2), nth, errno, strerror(errno)); // # nocov + (size_t)zbuffSize/(1024^2), nth, errno, strerror(errno)); + // # nocov end } } @@ -954,12 +958,12 @@ void fwriteMain(fwriteMainArgs args) // '&& !failed' is to not report the error as just 'closing file' but the next line for more detail // from the original error. if (failed) { - // nocov start + // # nocov start if (failed_compress) STOP("Error %d: compression error. Please retry with verbose=TRUE and search online for this error message.\n", failed_compress); if (failed_write) STOP("%s: '%s'", strerror(failed_write), args.filename); - // nocov end + // # nocov end } } From 6c3f98de832073d5c71e9cb10a81aa0d3692324b Mon Sep 17 00:00:00 2001 From: mattdowle Date: Wed, 9 Oct 2019 10:30:09 -0700 Subject: [PATCH 4/5] removed superfluous check on nth since the malloc is up front now --- src/fwrite.c | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/fwrite.c b/src/fwrite.c index 56b13ce70b..8c366630e5 100644 --- a/src/fwrite.c +++ b/src/fwrite.c @@ -817,10 +817,6 @@ void fwriteMain(fwriteMainArgs args) #pragma omp parallel num_threads(nth) { - #pragma omp single - { - nth = omp_get_num_threads(); // update nth with the actual nth (might be less than requested) - } int me = omp_get_thread_num(); int my_failed_compress = 0; char *ch, *myBuff; From e6761273cd1bfa4b9bf803e4a049e1eca96f897f Mon Sep 17 00:00:00 2001 From: mattdowle Date: Wed, 9 Oct 2019 11:27:11 -0700 Subject: [PATCH 5/5] 
temporary verbose removed just after test 1658.41. It was just in ad hoc version sent to Prof Ripley, just in case it still fails. --- inst/tests/tests.Rraw | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/inst/tests/tests.Rraw b/inst/tests/tests.Rraw index a93346cdf1..8e471a2d3f 100644 --- a/inst/tests/tests.Rraw +++ b/inst/tests/tests.Rraw @@ -9639,8 +9639,8 @@ test(1658.40, fwrite(matrix(1:4, nrow=2, ncol=2, dimnames = list(c("ra","rb"),c( # fwrite compress test(1658.41, fwrite(data.table(a=c(1:3), b=c(1:3)), compress="gzip"), output='a,b\n1,1\n2,2\n3,3') # compress ignored on console DT = data.table(a=rep(1:2,each=100), b=rep(1:4,each=25)) -fwrite(DT, file=f1<-tempfile(fileext=".gz"), verbose=TRUE) # verbose temporary to trace #3931 on Solaris -fwrite(DT, file=f2<-tempfile(), verbose=TRUE) +fwrite(DT, file=f1<-tempfile(fileext=".gz")) +fwrite(DT, file=f2<-tempfile()) test(1658.42, file.info(f1)$size < file.info(f2)$size) # 74 < 804 (file.size() isn't available in R 3.1.0) if (test_R.utils) test(1658.43, fread(f1), DT) # use fread to decompress gz (works cross-platform) fwrite(DT, file=f3<-tempfile(), compress="gzip") # compress to filename not ending .gz