diff --git a/DESCRIPTION b/DESCRIPTION index 41be910570..d05f4ec762 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -43,6 +43,7 @@ Authors@R: c( Depends: R (>= 3.1.0) Imports: methods Suggests: bit64, curl, R.utils, knitr, xts, nanotime, zoo +SystemRequirements: zlib Description: Fast aggregation of large data (e.g. 100GB in RAM), fast ordered joins, fast add/modify/delete of columns by group using no copies at all, list columns, friendly and fast character-separated-value read/write. Offers a natural and flexible syntax, for faster development. License: MPL-2.0 | file LICENSE URL: http://r-datatable.com diff --git a/NEWS.md b/NEWS.md index 435568c110..75d23fddf4 100644 --- a/NEWS.md +++ b/NEWS.md @@ -4,13 +4,21 @@ #### NEW FEATURES -1. New option `options(datatable.quiet = TRUE)` turns off the package startup message, [#3489](https://github.com/Rdatatable/data.table/issues/3489). `suppressPackageStartupMessages()` continues to work too. Thanks to @leobarlach for the suggestion inspired by `options(tidyverse.quiet = TRUE)`. We don't know of a way to make a package respect the `quietly=` option of `library()` and `require()` because the `quietly=` isn't passed through for use by the package's own `.onAttach`. If you can see how to do that, please submit a patch to R. +1. `rleid()` functions now support long vectors (length > 2 billion). -2. `rleid()` functions now support long vectors (length > 2 billion). - -3. `fread()`: +2. `fread()`: * now skips embedded `NUL` (`\0`), [#3400](https://github.com/Rdatatable/data.table/issues/3400). Thanks to Marcus Davy for reporting with examples, and Roy Storey for the initial PR. +3. `fwrite()`: + * now writes compressed `.gz` files directly, [#2016](https://github.com/Rdatatable/data.table/issues/2016). Compression, like `fwrite()`, is multithreaded and compresses each chunk on-the-fly (a full size intermediate file is not created). Use a ".gz" extension, or the new `compress=` option. 
Many thanks to Philippe Chataignon for the significant PR. For example: + + ```R + DT = data.table(A=rep(1:2,each=100), B=rep(1:4,each=25)) + fwrite(DT, "data.csv") # 804 bytes + fwrite(DT, "data.csv.gz") # 74 bytes + identical(DT, fread("data.csv.gz")) + ``` + 4. Assigning to one item of a list column no longer requires the RHS to be wrapped with `list` or `.()`, [#950](https://github.com/Rdatatable/data.table/issues/950). ```R > DT = data.table(A=1:3, B=list(1:2,"foo",3:5)) @@ -45,6 +53,8 @@ 3. A missing item in `j` such as `j=.(colA, )` now gives a helpful error (`Item 2 of the .() or list() passed to j is missing`) rather than the unhelpful error `argument "this_jsub" is missing, with no default` (v1.12.2) or `argument 2 is empty` (v1.12.0 and before), [#3507](https://github.com/Rdatatable/data.table/issues/3507). Thanks to @eddelbuettel for the report. +4. `fwrite()` could crash when writing very long strings such as 30 million characters, [#2974](https://github.com/Rdatatable/data.table/issues/2974), and could be unstable in memory constrained environments, [#2612](https://github.com/Rdatatable/data.table/issues/2612). Thanks to @logworthy and @zachokeeffe for reporting and Philippe Chataignon for fixing in PR [#3288](https://github.com/Rdatatable/data.table/pull/3288). + #### NOTES 1. `rbindlist`'s `use.names="check"` now emits its message for automatic column names (`"V[0-9]+"`) too, [#3484](https://github.com/Rdatatable/data.table/pull/3484). See news item 5 of v1.12.2 below. @@ -58,6 +68,8 @@ 3. `setorder` on a superset of a keyed `data.table`'s key now retains its key, [#3456](https://github.com/Rdatatable/data.table/issues/3456). For example, if `a` is the key of `DT`, `setorder(DT, a, -v)` will leave `DT` keyed by `a`. +4. New option `options(datatable.quiet = TRUE)` turns off the package startup message, [#3489](https://github.com/Rdatatable/data.table/issues/3489). `suppressPackageStartupMessages()` continues to work too. 
Thanks to @leobarlach for the suggestion inspired by `options(tidyverse.quiet = TRUE)`. We don't know of a way to make a package respect the `quietly=` option of `library()` and `require()` because the `quietly=` isn't passed through for use by the package's own `.onAttach`. If you can see how to do that, please submit a patch to R. + ### Changes in [v1.12.2](https://github.com/Rdatatable/data.table/milestone/14?closed=1) (07 Apr 2019) diff --git a/R/fwrite.R b/R/fwrite.R index 13c7ed9f55..4e8c337e59 100644 --- a/R/fwrite.R +++ b/R/fwrite.R @@ -7,9 +7,11 @@ fwrite <- function(x, file="", append=FALSE, quote="auto", dateTimeAs = c("ISO","squash","epoch","write.csv"), buffMB=8, nThread=getDTthreads(verbose), showProgress=getOption("datatable.showProgress", interactive()), + compress = c("auto", "none", "gzip"), verbose=getOption("datatable.verbose", FALSE)) { na = as.character(na[1L]) # fix for #1725 if (missing(qmethod)) qmethod = qmethod[1L] + if (missing(compress)) compress = compress[1L] if (missing(dateTimeAs)) { dateTimeAs = dateTimeAs[1L] } else if (length(dateTimeAs)>1L) stop("dateTimeAs must be a single string") dateTimeAs = chmatch(dateTimeAs, c("ISO","squash","epoch","write.csv"))-1L @@ -37,6 +39,7 @@ fwrite <- function(x, file="", append=FALSE, quote="auto", dec != sep, # sep2!=dec and sep2!=sep checked at C level when we know if list columns are present is.character(eol) && length(eol)==1L, length(qmethod) == 1L && qmethod %chin% c("double", "escape"), + length(compress) == 1L && compress %chin% c("auto", "none", "gzip"), isTRUEorFALSE(col.names), isTRUEorFALSE(append), isTRUEorFALSE(row.names), isTRUEorFALSE(verbose), isTRUEorFALSE(showProgress), isTRUEorFALSE(logical01), length(na) == 1L, #1725, handles NULL or character(0) input @@ -44,6 +47,9 @@ fwrite <- function(x, file="", append=FALSE, quote="auto", length(buffMB)==1L && !is.na(buffMB) && 1L<=buffMB && buffMB<=1024, length(nThread)==1L && !is.na(nThread) && nThread>=1L ) + + is_gzip <- compress 
== "gzip" || (compress == "auto" && grepl("\\.gz$", file)) + file <- path.expand(file) # "~/foo/bar" if (append && missing(col.names) && (file=="" || file.exists(file))) col.names = FALSE # test 1658.16 checks this @@ -70,7 +76,6 @@ fwrite <- function(x, file="", append=FALSE, quote="auto", file <- enc2native(file) # CfwriteR cannot handle UTF-8 if that is not the native encoding, see #3078. .Call(CfwriteR, x, file, sep, sep2, eol, na, dec, quote, qmethod=="escape", append, row.names, col.names, logical01, dateTimeAs, buffMB, nThread, - showProgress, verbose) + showProgress, is_gzip, verbose) invisible() } - diff --git a/inst/tests/tests.Rraw b/inst/tests/tests.Rraw index 1e9ea9c12d..8ad591f4b2 100644 --- a/inst/tests/tests.Rraw +++ b/inst/tests/tests.Rraw @@ -9467,6 +9467,23 @@ test(1658.34, fwrite(matrix(1:4, nrow=2, ncol=2), quote = TRUE), output = '"V1", test(1658.35, fwrite(matrix(1:3, nrow=3, ncol=1), quote = TRUE), output = '"V1"\n.*1\n2\n3', message = "x being coerced from class: matrix to data.table") test(1658.36, fwrite(matrix(1:4, nrow=2, ncol=2, dimnames = list(c("ra","rb"),c("ca","cb"))), quote = TRUE), output = '"ca","cb"\n.*1,3\n2,4', message = "x being coerced from class: matrix to data.table") +# fwrite compress +test(1658.37, fwrite(data.table(a=c(1:3), b=c(1:3)), compress="gzip"), output='a,b\n1,1\n2,2\n3,3') # compress ignored on console +DT = data.table(a=rep(1:2,each=100), b=rep(1:4,each=25)) +fwrite(DT, file=f1<-tempfile(fileext=".gz")) +fwrite(DT, file=f2<-tempfile()) +test(1658.38, file.size(f1) // INT32_MIN #include // isfinite, isnan #include // abs -#include // strlen, strerror +#include // strnlen (n for codacy), strerror + #ifdef WIN32 #include #include @@ -17,6 +18,8 @@ #define WRITE write #define CLOSE close #endif + +#include "zlib.h" // for writing gzip file #include "myomp.h" #include "fwrite.h" @@ -33,6 +36,10 @@ static bool qmethodEscape=false; // when quoting fields, how to escape dou static bool squashDateTime=false; // 
0=ISO(yyyy-mm-dd) 1=squash(yyyymmdd) extern const char *getString(void *, int); +extern const int getStringLen(void *, int); +extern const int getMaxStringLen(void *, int64_t); +extern const int getMaxCategLen(void *); +extern const int getMaxListItemLen(void *, int64_t); extern const char *getCategString(void *, int); extern double wallclock(void); @@ -526,43 +533,47 @@ void writeCategString(void *col, int64_t row, char **pch) write_string(getCategString(col, row), pch); } +int compressbuff(void* dest, size_t *destLen, const void* source, size_t sourceLen) +{ + z_stream stream; + stream.zalloc = (alloc_func)0; + stream.zfree = (free_func)0; + stream.opaque = (voidpf)0; + + int err = deflateInit2(&stream, Z_DEFAULT_COMPRESSION, Z_DEFLATED, 31, 8, Z_DEFAULT_STRATEGY); + if (err != Z_OK) + return err; // # nocov + + stream.next_out = dest; + stream.avail_out = 0; + stream.next_in = (z_const Bytef *)source; + stream.avail_in = 0; + size_t left = *destLen; + const uInt uInt_max = (uInt)-1; // stream.avail_out is type uInt + do { + if (stream.avail_out == 0) { + stream.avail_out = left>uInt_max ? uInt_max : left; + left -= stream.avail_out; + } + if (stream.avail_in == 0) { + stream.avail_in = sourceLen>uInt_max ? uInt_max : sourceLen; + sourceLen -= stream.avail_in; + } + err = deflate(&stream, sourceLen ? Z_NO_FLUSH : Z_FINISH); + } while (err == Z_OK); + + *destLen = stream.total_out; + deflateEnd(&stream); + return err == Z_STREAM_END ? Z_OK : err; +} static int failed = 0; static int rowsPerBatch; -static inline void checkBuffer( - char **buffer, // this thread's buffer - size_t *myAlloc, // the size of this buffer - char **ch, // the end of the last line written to the buffer by this thread - size_t myMaxLineLen // the longest line seen so far by this thread - // Initial size for the thread's buffer is twice as big as needed for rowsPerBatch based on - // maxLineLen from the sample; i.e. only 50% of the buffer should be used. 
- // If we get to 75% used, we'll realloc. - // i.e. very cautious and grateful to the OS for not fetching untouched pages of buffer. - // Plus, more caution ... myMaxLineLine is tracked and if that grows we'll realloc too. - // Very long lines are caught up front and rowsPerBatch is set to 1 in that case. - // This checkBuffer() is called after every line. -) { - if (failed) return; // another thread already failed. Fall through and error(). - size_t thresh = 0.75*(*myAlloc); - if ((*ch > (*buffer)+thresh) || - (rowsPerBatch*myMaxLineLen > thresh )) { - size_t off = *ch-*buffer; - *myAlloc = 1.5*(*myAlloc); - *buffer = realloc(*buffer, *myAlloc); - if (*buffer==NULL) { - failed = -errno; // - for malloc/realloc errno, + for write errno - } else { - *ch = *buffer+off; // in case realloc moved the allocation - } - } -} - void fwriteMain(fwriteMainArgs args) { double startTime = wallclock(); double nextTime = startTime+2; // start printing progress meter in 2 sec if not completed by then - double t0 = startTime; na = args.na; sep = args.sep; @@ -577,32 +588,11 @@ void fwriteMain(fwriteMainArgs args) qmethodEscape = args.qmethodEscape; squashDateTime = args.squashDateTime; - // Estimate max line length of a 1000 row sample (100 rows in 10 places). - // 'Estimate' even of this sample because quote='auto' may add quotes and escape embedded quotes. - // Buffers will be resized later if there are too many line lengths outside the sample, anyway. - // maxLineLen is required to determine a reasonable rowsPerBatch. - - - // alloc one buffMB here. Keep rewriting each field to it, to sum up the size. Restriction: one field can't be - // greater that minimumum buffMB (1MB = 1 million characters). Otherwise unbounded overwrite. Possible with very - // very long single strings, or very long list column values. - // The caller guarantees no field with be longer than this. If so, it can set buffMB larger. 
It might know - // due to some stats it has maintained on each column or in the environment generally. - // However, a single field being longer than 1 million characters is considered a very reasonable restriction. - // Once we have a good line length estimate, we may increase the buffer size a lot anyway. - // The default buffMB is 8MB, so it's really 8 million character limit by default. 1MB is because user might set - // buffMB to 1, say if they have 512 CPUs or more, perhaps. - - // Cold section as only 1,000 rows. Speed not an issue issue here. - // Overestimating line length is ok. - int eolLen = strlen(args.eol); - if (eolLen<=0) STOP("eol must be 1 or more bytes (usually either \\n or \\r\\n) but is length %d", eolLen); + if (args.buffMB<1 || args.buffMB>1024) STOP("buffMB=%d outside [1,1024]", args.buffMB); + size_t buffSize = (size_t)1024*1024*args.buffMB; - int buffMB = args.buffMB; - if (buffMB<1 || buffMB>1024) STOP("buffMB=%d outside [1,1024]", buffMB); - size_t buffSize = (size_t)1024*1024*buffMB; - char *buff = malloc(buffSize); - if (!buff) STOP("Unable to allocate %dMB for line length estimation: %s", buffMB, strerror(errno)); + int eolLen=strnlen(args.eol, 1024), naLen=strnlen(args.na, 1024); // strnlen required by Codacy + if (eolLen<=0) STOP("eol must be 1 or more bytes (usually either \\n or \\r\\n) but is length %d", eolLen); if (args.verbose) { DTPRINT("Column writers: "); @@ -613,39 +603,51 @@ void fwriteMain(fwriteMainArgs args) DTPRINT("... "); for (int j=args.ncol-10; j 1 million bytes - args.funs[args.whichFun[j]]( args.columns[j], i, &ch ); - thisLineLen += (int)(ch-buff) + 1/*sep*/; // see comments above about restrictions/guarantees/contracts + // Calculate upper bound for line length. Numbers use a fixed maximum (e.g. 12 for integer) while strings find the longest + // string in each column. Upper bound is then the sum of the columns' max widths. + // This upper bound is required to determine a reasonable rowsPerBatch. 
It also saves needing to grow the buffers which + // is especially tricky when compressing, and saves needing to check/limit the buffer writing because we know + // up front the buffer does have sufficient capacity. + // A large overestimate (e.g. 2-5x too big) is ok, provided it is not so large that the buffers can't be allocated. + // Do this first so that, for example, any unsupported types in list columns happen first before opening file (which + // could be console output) and writing column names to it. + + double t0 = wallclock(); + size_t maxLineLen = eolLen + args.ncol*(2*(doQuote!=0) + 1/*sep*/); + if (args.doRowNames) { + maxLineLen += args.rowNames ? getMaxStringLen(args.rowNames, args.nrow)*2 : 1+(int)log10(args.nrow); // the width of the row number + maxLineLen += 2*(doQuote!=0/*NA('auto') or true*/) + 1/*sep*/; + } + for (int j=0; j maxLineLen) maxLineLen = thisLineLen; } + if (width> column name) + char *buff = malloc(headerLen); + if (!buff) STOP("Unable to allocate %d MiB for header: %s", headerLen / 1024 / 1024, strerror(errno)); char *ch = buff; if (args.doRowNames) { // Unusual: the extra blank column name when row_names are added as the first column @@ -683,28 +687,38 @@ void fwriteMain(fwriteMainArgs args) } for (int j=0; j 1 million bytes long - *ch++ = args.sep; // this sep after the last column name won't be written to the file + *ch++ = sep; } + ch--; // backup over the last sep + write_chars(args.eol, &ch); if (f==-1) { - DTPRINT(args.eol); - } else if (WRITE(f, args.eol, eolLen)==-1) { - int errwrite=errno; - close(f); + *ch = '\0'; + DTPRINT(buff); free(buff); - STOP("%s: '%s'", strerror(errwrite), args.filename); + } else { + int ret1=0, ret2=0; + if (args.is_gzip) { + size_t zbuffSize = headerLen + headerLen/10 + 16; + char *zbuff = malloc(zbuffSize); + if (!zbuff) {free(buff); STOP("Unable to allocate %d MiB for zbuffer: %s", zbuffSize / 1024 / 1024, strerror(errno));} + size_t zbuffUsed = zbuffSize; + ret1 = compressbuff(zbuff, 
&zbuffUsed, buff, (int)(ch-buff)); + if (ret1==0) ret2 = WRITE(f, zbuff, (int)zbuffUsed); + free(zbuff); + } else { + ret2 = WRITE(f, buff, (int)(ch-buff)); + } + free(buff); + if (ret1 || ret2==-1) { + // # nocov start + int errwrite = errno; // capture write errno now in case close fails with a different errno + CLOSE(f); + if (ret1) STOP("Compress gzip error: %d", ret1); + else STOP("%s: '%s'", strerror(errwrite), args.filename); + // # nocov end + } + } + } - free(buff); // TODO: also to be free'd in cleanup when there's an error opening file above if (args.verbose) DTPRINT("done in %.3fs\n", 1.0*(wallclock()-t0)); if (args.nrow == 0) { if (args.verbose) DTPRINT("No data rows present (nrow==0)\n"); @@ -713,18 +727,11 @@ } // Decide buffer size and rowsPerBatch for each thread - // Once rowsPerBatch is decided it can't be changed, but we can increase buffer size if the lines - // turn out to be longer than estimated from the sample. - // buffSize large enough to fit many lines to i) reduce calls to write() and ii) reduce thread sync points - // It doesn't need to be small in cache because it's written contiguously. - // If we don't use all the buffer for any reasons that's ok as OS will only getch the cache lines touched. - // So, generally the larger the better up to max filesize/nth to use all the threads. A few times - // smaller than that though, to achieve some load balancing across threads since schedule(dynamic). - if (maxLineLen > buffSize) buffSize=2*maxLineLen; // A very long line; at least 1,048,576 characters (since min(buffMB)==1) - rowsPerBatch = - (10*maxLineLen > buffSize) ? 1 : // very very long lines (100,000 characters+) each thread will just do one row at a time. - 0.5 * buffSize/maxLineLen; // Aim for 50% buffer usage. See checkBuffer for comments.
+ // Once rowsPerBatch is decided it can't be changed + if (maxLineLen*2>buffSize) { buffSize=2*maxLineLen; rowsPerBatch=2; } + else rowsPerBatch = buffSize / maxLineLen; if (rowsPerBatch > args.nrow) rowsPerBatch = args.nrow; + if (rowsPerBatch < 1) rowsPerBatch = 1; int numBatches = (args.nrow-1)/rowsPerBatch + 1; int nth = args.nth; if (numBatches < nth) nth = numBatches; @@ -736,27 +743,31 @@ void fwriteMain(fwriteMainArgs args) t0 = wallclock(); failed=0; // static global so checkBuffer can set it. -errno for malloc or realloc fails, +errno for write fail + bool hasPrinted=false; - bool anyBufferGrown=false; int maxBuffUsedPC=0; #pragma omp parallel num_threads(nth) { char *ch, *myBuff; // local to each thread ch = myBuff = malloc(buffSize); // each thread has its own buffer. malloc and errno are thread-safe. - if (myBuff==NULL) {failed=-errno;} + if (myBuff==NULL) failed=-errno; + + size_t myzbuffUsed = 0; + size_t myzbuffSize = 0; + void *myzBuff = NULL; + + if(args.is_gzip && !failed){ + myzbuffSize = buffSize + buffSize/10 + 16; + myzBuff = malloc(myzbuffSize); + if (myzBuff==NULL) failed=-errno; + } // Do not rely on availability of '#omp cancel' new in OpenMP v4.0 (July 2013). // OpenMP v4.0 is in gcc 4.9+ (https://gcc.gnu.org/wiki/openmp) but // not yet in clang as of v3.8 (http://openmp.llvm.org/) // If not-me failed, I'll see shared 'failed', fall through loop, free my buffer // and after parallel section, single thread will call STOP() safely. - size_t myAlloc = buffSize; - size_t myMaxLineLen = maxLineLen; - // so we can realloc(). Should only be needed if there are very long lines that are - // much longer than occurred in the sample for maxLineLen; e.g. unusally long string values - // that didn't occur in the sample, or list columns with some very long vectors in some cells. 
- #pragma omp single { nth = omp_get_num_threads(); // update nth with the actual nth (might be different than requested) @@ -768,7 +779,6 @@ void fwriteMain(fwriteMainArgs args) if (failed) continue; // Not break. See comments above about #omp cancel int64_t end = ((args.nrow - start)=1 because 0-columns was caught earlier. write_chars(args.eol, &ch); // overwrite last sep with eol instead - - // Track longest line seen so far. If we start to see longer lines than we saw in the - // sample, we'll realloc the buffer. The rowsPerBatch chosen based on the (very good) sample, - // must fit in the buffer. Can't early write and reset buffer because the - // file output would be out-of-order. Can't change rowsPerBatch after the 'parallel for' started. - size_t thisLineLen = ch-lineStart; - if (thisLineLen > myMaxLineLen) myMaxLineLen=thisLineLen; - checkBuffer(&myBuff, &myAlloc, &ch, myMaxLineLen); if (failed) break; // this thread stop writing rows; fall through to clear up and STOP() below } + // compress buffer if gzip + if (args.is_gzip && !failed) { + myzbuffUsed = myzbuffSize; + failed = compressbuff(myzBuff, &myzbuffUsed, myBuff, (int)(ch-myBuff)); + } #pragma omp ordered { if (!failed) { // a thread ahead of me could have failed below while I was working or waiting above if (f==-1) { *ch='\0'; // standard C string end marker so DTPRINT knows where to stop DTPRINT(myBuff); - // nth==1 at this point since when file=="" (f==-1 here) fwrite.R calls setDTthreads(1) - // Although this ordered section is one-at-a-time it seems that calling Rprintf() here, even with a - // R_FlushConsole() too, causes corruptions on Windows but not on Linux. At least, as observed so - // far using capture.output(). Perhaps Rprintf() updates some state or allocation that cannot be done - // by slave threads, even when one-at-a-time. Anyway, made this single-threaded when output to console - // to be safe (setDTthreads(1) in fwrite.R) since output to console doesn't need to be fast. 
- } else { - if (WRITE(f, myBuff, (int)(ch-myBuff)) == -1) { - failed=errno; + } else if ((args.is_gzip)) { + if (WRITE(f, myzBuff, (int)(myzbuffUsed)) == -1) { + failed=errno; // # nocov } - if (myAlloc > buffSize) anyBufferGrown = true; - int used = 100*((double)(ch-myBuff))/buffSize; // percentage of original buffMB - if (used > maxBuffUsedPC) maxBuffUsedPC = used; - double now; - if (me==0 && args.showProgress && (now=wallclock())>=nextTime && !failed) { - // See comments above inside the f==-1 clause. - // Not only is this ordered section one-at-a-time but we'll also Rprintf() here only from the - // master thread (me==0) and hopefully this will work on Windows. If not, user should set - // showProgress=FALSE until this can be fixed or removed. - int ETA = (int)((args.nrow-end)*((now-startTime)/end)); - if (hasPrinted || ETA >= 2) { - if (args.verbose && !hasPrinted) DTPRINT("\n"); - DTPRINT("\rWritten %.1f%% of %d rows in %d secs using %d thread%s. " - "anyBufferGrown=%s; maxBuffUsed=%d%%. ETA %d secs. ", - (100.0*end)/args.nrow, args.nrow, (int)(now-startTime), nth, nth==1?"":"s", - anyBufferGrown?"yes":"no", maxBuffUsedPC, ETA); - // TODO: use progress() as in fread - nextTime = now+1; - hasPrinted = true; - } + } else if (WRITE(f, myBuff, (int)(ch - myBuff)) == -1) { + failed=errno; // # nocov + } + + int used = 100*((double)(ch-myBuff))/buffSize; // percentage of original buffMB + if (used > maxBuffUsedPC) maxBuffUsedPC = used; + double now; + if (me==0 && args.showProgress && (now=wallclock())>=nextTime && !failed) { + // See comments above inside the f==-1 clause. + // Not only is this ordered section one-at-a-time but we'll also Rprintf() here only from the + // master thread (me==0) and hopefully this will work on Windows. If not, user should set + // showProgress=FALSE until this can be fixed or removed. 
+ // # nocov start + int ETA = (int)((args.nrow-end)*((now-startTime)/end)); + if (hasPrinted || ETA >= 2) { + if (args.verbose && !hasPrinted) DTPRINT("\n"); + DTPRINT("\rWritten %.1f%% of %d rows in %d secs using %d thread%s. " + "maxBuffUsed=%d%%. ETA %d secs. ", + (100.0*end)/args.nrow, args.nrow, (int)(now-startTime), nth, nth==1?"":"s", + maxBuffUsedPC, ETA); + // TODO: use progress() as in fread + nextTime = now+1; + hasPrinted = true; } - // May be possible for master thread (me==0) to call R_CheckUserInterrupt() here. - // Something like: - // if (me==0) { - // failed = TRUE; // inside ordered here; the slaves are before ordered and not looking at 'failed' - // R_CheckUserInterrupt(); - // failed = FALSE; // no user interrupt so return state - // } - // But I fear the slaves will hang waiting for the master (me==0) to complete the ordered - // section which may not happen if the master thread has been interrupted. Rather than - // seeing failed=TRUE and falling through to free() and close() as intended. - // Could register a finalizer to free() and close() perhaps : - // [r-devel] http://r.789695.n4.nabble.com/checking-user-interrupts-in-C-code-tp2717528p2717722.html - // Conclusion for now: do not provide ability to interrupt. - // write() errors and malloc() fails will be caught and cleaned up properly, however. + // # nocov end } + // May be possible for master thread (me==0) to call R_CheckUserInterrupt() here. + // Something like: + // if (me==0) { + // failed = TRUE; // inside ordered here; the slaves are before ordered and not looking at 'failed' + // R_CheckUserInterrupt(); + // failed = FALSE; // no user interrupt so return state + // } + // But I fear the slaves will hang waiting for the master (me==0) to complete the ordered + // section which may not happen if the master thread has been interrupted. Rather than + // seeing failed=TRUE and falling through to free() and close() as intended. 
+ // Could register a finalizer to free() and close() perhaps : + // [r-devel] http://r.789695.n4.nabble.com/checking-user-interrupts-in-C-code-tp2717528p2717722.html + // Conclusion for now: do not provide ability to interrupt. + // write() errors and malloc() fails will be caught and cleaned up properly, however. ch = myBuff; // back to the start of my buffer ready to fill it up again } } } - free(myBuff); // all threads will call this free on their buffer, even if one or more threads had malloc // or realloc fail. If the initial malloc failed, free(NULL) is ok and does nothing. + free(myBuff); + free(myzBuff); } + // Finished parallel region and can call R API safely now. if (hasPrinted) { - if (!failed) { - // clear the progress meter + // # nocov start + if (!failed) { // clear the progress meter DTPRINT("\r " " \r"); - } else { - // unless failed as we'd like to see anyBufferGrown and maxBuffUsedPC + } else { // don't clear any potentially helpful output before error DTPRINT("\n"); } + // # nocov end } + if (f!=-1 && CLOSE(f) && !failed) - STOP("%s: '%s'", strerror(errno), args.filename); + STOP("%s: '%s'", strerror(errno), args.filename); // # nocov // quoted '%s' in case of trailing spaces in the filename // If a write failed, the line above tries close() to clean up, but that might fail as well. So the // '&& !failed' is to not report the error as just 'closing file' but the next line for more detail // from the original error. if (failed<0) { - STOP("%s. One or more threads failed to malloc or realloc their private buffer. nThread=%d and initial buffMB per thread was %d.\n", - strerror(-failed), nth, args.buffMB); + STOP("Error %d: one or more threads failed to allocate buffers or there was a compression error." 
// # nocov + " Please try again with verbose=TRUE and try searching online for this error message.\n", failed); // # nocov } else if (failed>0) { - STOP("%s: '%s'", strerror(failed), args.filename); + STOP("%s: '%s'", strerror(failed), args.filename); // # nocov } - if (args.verbose) DTPRINT("done (actual nth=%d, anyBufferGrown=%s, maxBuffUsed=%d%%)\n", - nth, anyBufferGrown?"yes":"no", maxBuffUsedPC); return; } diff --git a/src/fwrite.h b/src/fwrite.h index 2a6933b785..406cdf9d7d 100644 --- a/src/fwrite.h +++ b/src/fwrite.h @@ -26,20 +26,50 @@ void writeList(); void write_chars(const char *source, char **dest); +typedef enum { // same order as fun[] above + WF_Bool8, + WF_Bool32, + WF_Bool32AsString, + WF_Int32, + WF_Int64, + WF_Float64, + WF_ITime, + WF_DateInt32, + WF_DateFloat64, + WF_POSIXct, + WF_Nanotime, + WF_String, + WF_CategString, + WF_List +} WFs; + +static const int writerMaxLen[] = { // same order as fun[] and WFs above; max field width used for calculating upper bound line length + 5, //&writeBool8 "false" + 5, //&writeBool32 "false" + 5, //&writeBool32AsString "false" + 11, //&writeInt32 "-2147483647" + 20, //&writeInt64 "-9223372036854775807" + 29, //&writeFloat64 "-3.141592653589793115998E-123" [max sf 22 consistent with options()$digits] + 32, //&writeITime + 16, //&writeDateInt32 + 16, //&writeDateFloat64 + 32, //&writePOSIXct + 48, //&writeNanotime + 0, //&writeString + 0, //&writeCategString + 0, //&writeList +}; + typedef struct fwriteMainArgs { // Name of the file to open (a \0-terminated C string). If the file name // contains non-ASCII characters, it should be UTF-8 encoded (however fread // will not validate the encoding). 
const char *filename; - int ncol; - int64_t nrow; - // a vector of pointers to all-same-length column vectors void **columns; - writer_fun_t *funs; // a vector of writer_fun_t function pointers // length ncol vector containing which fun[] to use for each column @@ -48,19 +78,12 @@ typedef struct fwriteMainArgs uint8_t *whichFun; void *colNames; // NULL means no header, otherwise ncol strings - bool doRowNames; // optional, likely false - void *rowNames; // if doRowNames is true and rowNames is not NULL then they're used, otherwise row numbers are output. - char sep; - char sep2; - char dec; - const char *eol; - const char *na; // The quote character is always " (ascii 34) and cannot be changed since nobody on Earth uses a different quoting character, surely @@ -69,19 +92,13 @@ typedef struct fwriteMainArgs int8_t doQuote; bool qmethodEscape; // true means escape quotes using backslash, else double-up double quotes. - bool squashDateTime; - bool append; - int buffMB; // [1-1024] default 8MB - int nth; - bool showProgress; - + bool is_gzip; bool verbose; - } fwriteMainArgs; void fwriteMain(fwriteMainArgs args); diff --git a/src/fwriteR.c b/src/fwriteR.c index e3affcc3dc..5f6995eeb6 100644 --- a/src/fwriteR.c +++ b/src/fwriteR.c @@ -1,4 +1,3 @@ - #include #include "data.table.h" #include "fwrite.h" @@ -20,6 +19,29 @@ const char *getString(SEXP *col, int64_t row) { // TODO: inline for use in fwr return x==NA_STRING ? 
NULL : CHAR(x); } +const int getStringLen(SEXP *col, int64_t row) { + return LENGTH(col[row]); // LENGTH of CHARSXP is nchar +} + +const int getMaxStringLen(const SEXP *col, const int64_t n) { + int max=0; + SEXP last=NULL; + for (int i=0; i<n; i++) { + SEXP this = col[i]; + if (this==last) continue; + const int thisnchar = LENGTH(this); + if (thisnchar>max) max=thisnchar; + last = this; + } + return max; +} + +const int getMaxCategLen(SEXP col) { + col = getAttrib(col, R_LevelsSymbol); + if (!isString(col)) error("Internal error: col passed to getMaxCategLen is missing levels"); + return getMaxStringLen( STRING_PTR(col), LENGTH(col) ); +} + const char *getCategString(SEXP col, int64_t row) { // the only writer that needs to have the header of the SEXP column, to get to the levels int x = INTEGER(col)[row]; @@ -43,31 +65,13 @@ writer_fun_t funs[] = { &writeList }; -typedef enum { // same order as fun[] above - WF_Bool8, - WF_Bool32, - WF_Bool32AsString, - WF_Int32, - WF_Int64, - WF_Float64, - WF_ITime, - WF_DateInt32, - WF_DateFloat64, - WF_POSIXct, - WF_Nanotime, - WF_String, - WF_CategString, - WF_List -} WFs; - static int32_t whichWriter(SEXP); void writeList(SEXP *col, int64_t row, char **pch) { SEXP v = col[row]; int32_t wf = whichWriter(v); - if (TYPEOF(v)==VECSXP || wf==INT32_MIN) { - error("Row %d of list column is type '%s' - not yet implemented.
fwrite() can write list columns containing atomic vectors of type logical, integer, integer64, double, character and factor, currently.", - row+1, type2char(TYPEOF(v))); + if (TYPEOF(v)==VECSXP || wf==INT32_MIN || isFactor(v)) { + error("Internal error: getMaxListItemLen should have caught this up front."); // # nocov } char *ch = *pch; write_chars(sep2start, &ch); @@ -82,6 +86,32 @@ void writeList(SEXP *col, int64_t row, char **pch) { *pch = ch; } +const int getMaxListItemLen(const SEXP *col, const int64_t n) { + int max=0; + SEXP last=NULL; + for (int i=0; imax) max=width; + last = this; + } + return max; +} + static int32_t whichWriter(SEXP column) { // int32_t is returned here just so the caller can output nice context-full error message should INT32_MIN be returned // the caller then passes uint8_t to fwriteMain @@ -128,10 +158,13 @@ SEXP fwriteR( SEXP buffMB_Arg, // [1-1024] default 8MB SEXP nThread_Arg, SEXP showProgress_Arg, - SEXP verbose_Arg) + SEXP is_gzip_Arg, + SEXP verbose_Arg + ) { if (!isNewList(DF)) error("fwrite must be passed an object of type list; e.g. data.frame, data.table"); fwriteMainArgs args; + args.is_gzip = LOGICAL(is_gzip_Arg)[0]; args.verbose = LOGICAL(verbose_Arg)[0]; args.filename = CHAR(STRING_ELT(filename_Arg, 0)); args.ncol = length(DF);