3 changes: 3 additions & 0 deletions NEWS.md
@@ -8,6 +8,9 @@

1. `gforce()` now allocates the correct amount of memory for a data.table with more than 1e9 rows, [#4295](https://github.com/Rdatatable/data.table/issues/4295) and [#4818](https://github.com/Rdatatable/data.table/issues/4818). Before the fix, data.table could throw the error "Failed to allocate counts or TMP when assigning g in gforce" due to an integer overflow when allocating memory with `malloc()`. Thanks to @renkun-ken and @jangorecki for reporting and @shrektan for fixing.

2. `fwrite()`'s multithreaded `gzip` compression should now work on Solaris, [#4099](https://github.com/Rdatatable/data.table/issues/4099). Since this feature was released in Oct 2019 (see item 3 in v1.12.4 below in this news file) we have been adding successively more detailed tracing to the output in each release, culminating in tracing `zlib` internals at byte level by reading `zlib`'s source. The problem did not manifest itself on [R-hub](https://builder.r-hub.io/)'s Solaris instances, so we had to work via CRAN output. If `zlib`'s `z_stream` structure is declared inside a parallel region but before a parallel for, it appears that the particular OpenMP implementation used by CRAN's Solaris moves the structure to a new address on entering the parallel for. Ordinarily this memory move would not matter. However, `zlib`'s internals hold a self-reference pointer back to the parent structure, and `zlib` checks that the pointers match. This mismatch caused the -2 (`Z_STREAM_ERROR`). Allocating an array of structures, one for each thread, before the parallel region avoids the memory move at no cost other than less elegant internal code that needs a comment explaining why it is done that way.
Note that we cannot be sure the problem really is unique to CRAN's Solaris, even if it seems that way after one year of observations. For example, the cause could be compiler flags or particular memory circumstances, either of which could occur on other operating systems too. However, we are unaware of why it would make sense for the OpenMP implementation to move the structure at that point. Any optimization, such as aligning the set of structures to cache line boundaries, could be performed at the start of the parallel region rather than at the parallel for. If anyone reading this news item knows more, please let us know.
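
The failure mode described in this item can be illustrated with a toy model. The sketch below does not use `zlib`; the `toy_*` names and the `TOY_*` codes are hypothetical stand-ins, assuming only what the item states: the internal state keeps a pointer back to its parent structure, and a check compares that pointer with the address it is handed. Copying the parent to a new address then makes the check fail, while taking a pointer into an array that never moves does not.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Toy model only: these names are hypothetical and are NOT zlib's API. */
#define TOY_OK 0
#define TOY_STREAM_ERROR (-2)

typedef struct toy_stream toy_stream;
typedef struct { toy_stream *strm; } toy_state;   /* back-pointer to the parent */
struct toy_stream { toy_state *state; };

/* Allocate the internal state and record the parent's current address. */
int toy_init(toy_stream *s) {
    s->state = malloc(sizeof *s->state);
    if (s->state == NULL) return TOY_STREAM_ERROR;
    s->state->strm = s;
    return TOY_OK;
}

/* Fail if the parent is no longer at the address recorded by toy_init(). */
int toy_check(const toy_stream *s) {
    if (s == NULL || s->state == NULL || s->state->strm != s)
        return TOY_STREAM_ERROR;
    return TOY_OK;
}

void toy_end(toy_stream *s) { free(s->state); s->state = NULL; }

/* Byte-copy an initialised stream to a new address, then check the copy. */
int toy_check_after_move(void) {
    toy_stream original, moved;
    if (toy_init(&original) != TOY_OK) return TOY_OK;  /* allocation failed, nothing to show */
    assert(toy_check(&original) == TOY_OK);            /* valid at its original address */
    memcpy(&moved, &original, sizeof moved);           /* simulate the runtime moving the struct */
    int ret = toy_check(&moved);                       /* back-pointer still names &original */
    toy_end(&original);
    return ret;
}

/* Keep one stream per worker in an array and only ever hand out pointers into it. */
int toy_check_stable(int nth) {
    toy_stream *streams = calloc((size_t)nth, sizeof *streams);
    if (streams == NULL) return TOY_STREAM_ERROR;
    int ret = TOY_OK;
    for (int me = 0; me < nth; me++) {
        toy_stream *mystream = &streams[me];
        if (toy_init(mystream) != TOY_OK || toy_check(mystream) != TOY_OK)
            ret = TOY_STREAM_ERROR;
    }
    for (int me = 0; me < nth; me++) toy_end(&streams[me]);
    free(streams);
    return ret;
}
```

Under these assumptions, `toy_check_after_move()` reports `TOY_STREAM_ERROR` and `toy_check_stable(4)` reports `TOY_OK`, which matches the reasoning behind declaring the per-thread structures where they cannot be relocated.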

## NOTES


26 changes: 17 additions & 9 deletions src/fwrite.c
@@ -553,6 +553,7 @@ void writeCategString(const void *col, int64_t row, char **pch)
}

int init_stream(z_stream *stream) {
+ memset(stream, 0, sizeof(z_stream)); // shouldn't be needed, done as part of #4099 to be sure
stream->next_in = Z_NULL;
stream->zalloc = Z_NULL;
stream->zfree = Z_NULL;
@@ -857,6 +858,12 @@ void fwriteMain(fwriteMainArgs args)
int failed_write = 0; // same. could use +ve and -ve in the same code but separate it out to trace Solaris problem, #3931

if (nth>1) verbose=false; // printing isn't thread safe (there's a temporary print in compressbuff for tracing solaris; #4099)

+ z_stream thread_streams[nth];
+ // VLA on stack should be fine for nth structs; in zlib v1.2.11 sizeof(struct)==112 on 64bit
+ // not declared inside the parallel region because solaris appears to move the struct in
+ // memory when the #pragma omp for is entered, which causes zlib's internal self reference
+ // pointer to mismatch, #4099

#pragma omp parallel num_threads(nth)
{
@@ -867,19 +874,20 @@ void fwriteMain(fwriteMainArgs args)

void *myzBuff = NULL;
size_t myzbuffUsed = 0;
- z_stream mystream = {0};
+ z_stream *mystream = &thread_streams[me];
if (args.is_gzip) {
myzBuff = zbuffPool + me*zbuffSize;
- if (init_stream(&mystream)) { // this should be thread safe according to zlib documentation
+ if (init_stream(mystream)) { // this should be thread safe according to zlib documentation
failed = true; // # nocov
my_failed_compress = -998; // # nocov
}
- if (verbose) {DTPRINT(_("z_stream for data (%d): "), 1); print_z_stream(&mystream);}
+ if (verbose) {DTPRINT(_("z_stream for data (%d): "), 1); print_z_stream(mystream);}
}

#pragma omp for ordered schedule(dynamic)
for(int64_t start=0; start<args.nrow; start+=rowsPerBatch) {
if (failed) continue; // Not break. Because we don't use #omp cancel yet.
+ if (verbose && args.is_gzip) {DTPRINT(_("z_stream for data (%d): "), 2); print_z_stream(mystream);} // extra trace point referred to in #4099
int64_t end = ((args.nrow - start)<rowsPerBatch) ? args.nrow : start + rowsPerBatch;
for (int64_t i=start; i<end; i++) {
// Tepid starts here (once at beginning of each per line)
@@ -906,19 +914,19 @@ void fwriteMain(fwriteMainArgs args)
// compress buffer if gzip
if (args.is_gzip && !failed) {
myzbuffUsed = zbuffSize;
- if (verbose) {DTPRINT(_("z_stream for data (%d): "), 2); print_z_stream(&mystream);}
- int ret = compressbuff(&mystream, myzBuff, &myzbuffUsed, myBuff, (size_t)(ch-myBuff));
- if (verbose) {DTPRINT(_("z_stream for data (%d): "), 3); print_z_stream(&mystream);}
+ if (verbose) {DTPRINT(_("z_stream for data (%d): "), 3); print_z_stream(mystream);}
Member:
question: supposing this works, would you then aim to remove all the extra verbose printing for tracing in the next release? (less maintenance overhead / tidier code vs. ease of setting up the in-depth tracing should a similar error recur later on)

curious your thinking

Member Author:
yes will just remove the extra tracing. can always get it back from the history should we need it again

+ int ret = compressbuff(mystream, myzBuff, &myzbuffUsed, myBuff, (size_t)(ch-myBuff));
+ if (verbose) {DTPRINT(_("z_stream for data (%d): "), 4); print_z_stream(mystream);}
if (ret) { failed=true; my_failed_compress=ret; }
- else deflateReset(&mystream);
+ else deflateReset(mystream);
}
#pragma omp ordered
{
if (failed) {
// # nocov start
if (failed_compress==0 && my_failed_compress!=0) {
failed_compress = my_failed_compress;
- if (mystream.msg!=NULL) strncpy(failed_msg, mystream.msg, 1000); // copy zlib's msg for safe use after deflateEnd just in case zlib allocated the message
+ if (mystream->msg!=NULL) strncpy(failed_msg, mystream->msg, 1000); // copy zlib's msg for safe use after deflateEnd just in case zlib allocated the message
}
// else another thread could have failed below while I was working or waiting above; their reason got here first
// # nocov end
@@ -976,7 +984,7 @@ void fwriteMain(fwriteMainArgs args)
// all threads will call this free on their buffer, even if one or more threads had malloc
// or realloc fail. If the initial malloc failed, free(NULL) is ok and does nothing.
if (args.is_gzip) {
- deflateEnd(&mystream);
+ deflateEnd(mystream);
}
}
free(buffPool);
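
The layout this change moves to can be sketched in isolation: per-thread state lives in an array declared before the parallel region, and each thread takes a pointer to its own slot once inside. The example below is a minimal sketch, not the `fwrite.c` code. `worker_state`, `process_rows`, and `MAX_THREADS` are hypothetical names, a fixed-size array stands in for the VLA, and a row counter stands in for the `z_stream`. It also builds without OpenMP, in which case the pragmas are ignored and a single thread does all the work.

```c
#include <stddef.h>
#ifdef _OPENMP
#include <omp.h>
#endif

#define MAX_THREADS 64            /* hypothetical cap so the array size is fixed */

typedef struct {
    long rows_done;               /* stand-in for per-thread compression state */
} worker_state;

/* Process nrow rows in batches and return how many rows were handled in total. */
long process_rows(int nth, long nrow, long rows_per_batch) {
    if (nth < 1) nth = 1;
    if (nth > MAX_THREADS) nth = MAX_THREADS;
    if (rows_per_batch < 1) rows_per_batch = 1;

    /* Declared BEFORE the parallel region: every slot keeps one address throughout. */
    worker_state states[MAX_THREADS] = {{0}};

    #pragma omp parallel num_threads(nth)
    {
        int me = 0;
#ifdef _OPENMP
        me = omp_get_thread_num();
#endif
        worker_state *mine = &states[me];   /* pointer into the shared array */

        #pragma omp for schedule(dynamic)
        for (long start = 0; start < nrow; start += rows_per_batch) {
            long end = (nrow - start < rows_per_batch) ? nrow : start + rows_per_batch;
            mine->rows_done += end - start; /* only this thread writes to its own slot */
        }
    }

    long total = 0;
    for (int i = 0; i < nth; i++) total += states[i].rows_done;
    return total;
}
```

A call such as `process_rows(4, 1000, 64)` should account for all 1000 rows regardless of how the batches are distributed across threads, since each batch is handled by exactly one thread.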