Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

3. An error, again in `fwrite`'s compression, but only observed so far on Solaris 32bit has been fixed, [#3931](https://github.com/Rdatatable/data.table/issues/3931): `Error -2: one or more threads failed to allocate buffers or there was a compression error.` In case it happens again, this area has been made more robust and the error more detailed.

4. A leak could occur in the event of an unsupported column type error, or if working memory cannot all be allocated; [#3940](https://github.com/Rdatatable/data.table/issues/3940). Found thanks to `clang`'s Leak Sanitizer (prompted by CRAN's diligent use of latest tools), and two tests in the test suite which tested the unsupported type error.

## NOTES


Expand Down
87 changes: 48 additions & 39 deletions src/forder.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,14 +54,17 @@ static uint8_t **key = NULL;
static int *anso = NULL;
static bool notFirst=false;

#define Error(...) do {cleanup(); error(__VA_ARGS__);} while(0) // http://gcc.gnu.org/onlinedocs/cpp/Swallowing-the-Semicolon.html#Swallowing-the-Semicolon
static char msg[1001];
#define STOP(...) do {snprintf(msg, 1000, __VA_ARGS__); cleanup(); error(msg);} while(0) // http://gcc.gnu.org/onlinedocs/cpp/Swallowing-the-Semicolon.html#Swallowing-the-Semicolon
// use STOP in this file (not error()) to ensure cleanup() is called first
// snprintf to msg first in case nrow (just as an example) is provided in the message because cleanup() sets nrow to 0
#undef warning
#define warning(...) Do not use warning in this file // since it can be turned to error via warn=2
/* Using OS realloc() in this file to benefit from (often) in-place realloc() to save copy
* We have to trap on exit anyway to call savetl_end().
* NB: R_alloc() would be more convenient (fails within) and robust (auto free) but there is no R_realloc(). Implementing R_realloc() would be an alloc and copy, iiuc.
* Calloc/Realloc needs to be Free'd, even before error() [R-exts$6.1.2]. An oom within Calloc causes a previous Calloc to leak so Calloc would still needs to be trapped anyway.
* Therefore, using <<if (!malloc()) Error("helpful context msg")>> approach to cleanup() on error.
* Therefore, using <<if (!malloc()) STOP("helpful context msg")>> approach to cleanup() on error.
*/

static void free_ustr() {
Expand All @@ -88,7 +91,7 @@ static void cleanup() {
free(cradix_counts); cradix_counts=NULL;
free(cradix_xtmp); cradix_xtmp=NULL;
free_ustr();
if (key!=NULL) for (int i=0; i<nradix; i++) free(key[i]);
if (key!=NULL) { int i=0; while (key[i]!=NULL) free(key[i++]); } // ==nradix, other than rare cases e.g. tests 1844.5-6 (#3940), and if a calloc fails
free(key); key=NULL; nradix=0;
savetl_end(); // Restore R's own usage of tl. Must run after the for loop in free_ustr() since only CHARSXP which had tl>0 (R's usage) are stored there.
}
Expand All @@ -100,7 +103,7 @@ static void push(const int *x, const int n) {
if (gs_thread_alloc[me] < newn) {
gs_thread_alloc[me] = (newn < nrow/3) ? (1+(newn*2)/4096)*4096 : nrow; // [2|3] to not overflow and 3 not 2 to avoid allocating close to nrow (nrow groups occurs when all size 1 groups)
gs_thread[me] = realloc(gs_thread[me], gs_thread_alloc[me]*sizeof(int));
if (gs_thread[me]==NULL) Error("Failed to realloc thread private group size buffer to %d*4bytes", (int)gs_thread_alloc[me]);
if (gs_thread[me]==NULL) STOP("Failed to realloc thread private group size buffer to %d*4bytes", (int)gs_thread_alloc[me]);
}
memcpy(gs_thread[me]+gs_thread_n[me], x, n*sizeof(int));
gs_thread_n[me] += n;
Expand All @@ -114,7 +117,7 @@ static void flush() {
if (gs_alloc < newn) {
gs_alloc = (newn < nrow/3) ? (1+(newn*2)/4096)*4096 : nrow;
gs = realloc(gs, gs_alloc*sizeof(int));
if (gs==NULL) Error("Failed to realloc group size result to %d*4bytes", (int)gs_alloc);
if (gs==NULL) STOP("Failed to realloc group size result to %d*4bytes", (int)gs_alloc);
}
memcpy(gs+gs_n, gs_thread[me], n*sizeof(int));
gs_n += n;
Expand Down Expand Up @@ -257,7 +260,7 @@ static void cradix_r(SEXP *xsub, int n, int radix)
memset(thiscounts, 0, 256*sizeof(int));
return;
}
if (thiscounts[0] != 0) Error("Logical error. counts[0]=%d in cradix but should have been decremented to 0. radix=%d", thiscounts[0], radix);
if (thiscounts[0] != 0) STOP("Logical error. counts[0]=%d in cradix but should have been decremented to 0. radix=%d", thiscounts[0], radix);
itmp = 0;
for (int i=1; i<256; i++) {
if (thiscounts[i] == 0) continue;
Expand All @@ -272,9 +275,9 @@ static void cradix_r(SEXP *xsub, int n, int radix)
static void cradix(SEXP *x, int n)
{
cradix_counts = (int *)calloc(ustr_maxlen*256, sizeof(int)); // counts for the letters of left-aligned strings
if (!cradix_counts) Error("Failed to alloc cradix_counts");
if (!cradix_counts) STOP("Failed to alloc cradix_counts");
cradix_xtmp = (SEXP *)malloc(ustr_n*sizeof(SEXP));
if (!cradix_xtmp) Error("Failed to alloc cradix_tmp");
if (!cradix_xtmp) STOP("Failed to alloc cradix_tmp");
cradix_r(x, n, 0);
free(cradix_counts); cradix_counts=NULL;
free(cradix_xtmp); cradix_xtmp=NULL;
Expand All @@ -285,8 +288,8 @@ static void range_str(SEXP *x, int n, uint64_t *out_min, uint64_t *out_max, int
{
int na_count=0;
bool anyneedutf8=false;
if (ustr_n!=0) Error("Internal error: ustr isn't empty when starting range_str: ustr_n=%d, ustr_alloc=%d", ustr_n, ustr_alloc); // # nocov
if (ustr_maxlen!=0) Error("Internal error: ustr_maxlen isn't 0 when starting range_str"); // # nocov
if (ustr_n!=0) STOP("Internal error: ustr isn't empty when starting range_str: ustr_n=%d, ustr_alloc=%d", ustr_n, ustr_alloc); // # nocov
if (ustr_maxlen!=0) STOP("Internal error: ustr_maxlen isn't 0 when starting range_str"); // # nocov
// savetl_init() has already been called at the start of forder
#pragma omp parallel for num_threads(getDTthreads())
for(int i=0; i<n; i++) {
Expand All @@ -306,7 +309,7 @@ static void range_str(SEXP *x, int n, uint64_t *out_min, uint64_t *out_max, int
ustr_alloc = (ustr_alloc==0) ? 16384 : ustr_alloc*2; // small initial guess, negligible time to alloc 128KB (32 pages)
if (ustr_alloc>n) ustr_alloc = n; // clamp at n. Reaches n when fully unique (no dups)
ustr = realloc(ustr, ustr_alloc * sizeof(SEXP));
if (ustr==NULL) Error("Unable to realloc %d * %d bytes in range_str", ustr_alloc, sizeof(SEXP)); // # nocov
if (ustr==NULL) STOP("Unable to realloc %d * %d bytes in range_str", ustr_alloc, (int)sizeof(SEXP)); // # nocov
}
ustr[ustr_n++] = s;
SET_TRUELENGTH(s, -ustr_n); // unique in any order is fine. first-appearance order is achieved later in count_group
Expand All @@ -324,7 +327,7 @@ static void range_str(SEXP *x, int n, uint64_t *out_min, uint64_t *out_max, int
SEXP ustr2 = PROTECT(allocVector(STRSXP, ustr_n));
for (int i=0; i<ustr_n; i++) SET_STRING_ELT(ustr2, i, ENC2UTF8(ustr[i]));
SEXP *ustr3 = (SEXP *)malloc(ustr_n * sizeof(SEXP));
if (!ustr3) Error("Failed to alloc ustr3 when converting strings to UTF8"); // # nocov
if (!ustr3) STOP("Failed to alloc ustr3 when converting strings to UTF8"); // # nocov
memcpy(ustr3, STRING_PTR(ustr2), ustr_n*sizeof(SEXP));
// need to reset ustr_maxlen because we need ustr_maxlen for utf8 strings
ustr_maxlen = 0;
Expand All @@ -342,7 +345,7 @@ static void range_str(SEXP *x, int n, uint64_t *out_min, uint64_t *out_max, int
}
// now use the 1-1 mapping from ustr to ustr2 to get the ordering back into original ustr, being careful to reset tl to 0
int *tl = (int *)malloc(ustr_n * sizeof(int));
if (!tl) Error("Failed to alloc tl when converting strings to UTF8"); // # nocov
if (!tl) STOP("Failed to alloc tl when converting strings to UTF8"); // # nocov
SEXP *tt = STRING_PTR(ustr2);
for (int i=0; i<ustr_n; i++) tl[i] = TRUELENGTH(tt[i]); // fetches the o in ustr3 into tl which is ordered by ustr
for (int i=0; i<ustr_n; i++) SET_TRUELENGTH(ustr3[i], 0); // reset to 0 tl of the UTF8 (and possibly non-UTF in ustr too)
Expand Down Expand Up @@ -406,7 +409,7 @@ uint64_t dtwiddle(void *p, int i)
}
if (ISNAN(u.d)) return ISNA(u.d) ? 0 /*NA*/ : 1 /*NaN*/; // also normalises a difference between NA on 32bit R (bit 13 set) and 64bit R (bit 13 not set)
if (isinf(u.d)) return signbit(u.d) ? 2 /*-Inf*/ : (0xffffffffffffffff>>(dround*8)) /*+Inf*/;
Error("Unknown non-finite value; not NA, NaN, -Inf or +Inf"); // # nocov
STOP("Unknown non-finite value; not NA, NaN, -Inf or +Inf"); // # nocov
}

void radix_r(const int from, const int to, const int radix);
Expand All @@ -428,11 +431,11 @@ SEXP forder(SEXP DT, SEXP by, SEXP retGrpArg, SEXP sortGroupsArg, SEXP ascArg, S

if (!isNewList(DT)) {
if (!isVectorAtomic(DT))
error("Internal error: input is not either a list of columns, or an atomic vector."); // # nocov; caught by colnamesInt at R level, test 1962.0472
STOP("Internal error: input is not either a list of columns, or an atomic vector."); // # nocov; caught by colnamesInt at R level, test 1962.0472
if (!isNull(by))
error("Internal error: input is an atomic vector (not a list of columns) but by= is not NULL"); // # nocov; caught at R level, test 1962.043
STOP("Internal error: input is an atomic vector (not a list of columns) but by= is not NULL"); // # nocov; caught at R level, test 1962.043
if (!isInteger(ascArg) || LENGTH(ascArg)!=1)
error("Input is an atomic vector (not a list of columns) but order= is not a length 1 integer");
STOP("Input is an atomic vector (not a list of columns) but order= is not a length 1 integer");
if (verbose)
Rprintf("forder.c received a vector type '%s' length %d\n", type2char(TYPEOF(DT)), length(DT));
SEXP tt = PROTECT(allocVector(VECSXP, 1)); n_protect++;
Expand All @@ -445,31 +448,31 @@ SEXP forder(SEXP DT, SEXP by, SEXP retGrpArg, SEXP sortGroupsArg, SEXP ascArg, S
Rprintf("forder.c received %d rows and %d columns\n", length(VECTOR_ELT(DT,0)), length(DT));
}
if (!length(DT))
error("Internal error: DT is an empty list() of 0 columns"); // # nocov should have been caught be colnamesInt, test 2099.1
STOP("Internal error: DT is an empty list() of 0 columns"); // # nocov should have been caught be colnamesInt, test 2099.1
if (!isInteger(by) || !LENGTH(by))
error("Internal error: DT has %d columns but 'by' is either not integer or is length 0", length(DT)); // # nocov colnamesInt catches, 2099.2
STOP("Internal error: DT has %d columns but 'by' is either not integer or is length 0", length(DT)); // # nocov colnamesInt catches, 2099.2
if (!isInteger(ascArg) || LENGTH(ascArg)!=LENGTH(by))
error("Either order= is not integer or its length (%d) is different to by='s length (%d)", LENGTH(ascArg), LENGTH(by));
STOP("Either order= is not integer or its length (%d) is different to by='s length (%d)", LENGTH(ascArg), LENGTH(by));
nrow = length(VECTOR_ELT(DT,0));
int n_cplx = 0;
for (int i=0; i<LENGTH(by); i++) {
int by_i = INTEGER(by)[i];
if (by_i < 1 || by_i > length(DT))
error("internal error: 'by' value %d out of range [1,%d]", by_i, length(DT)); // # nocov # R forderv already catch that using C colnamesInt
STOP("internal error: 'by' value %d out of range [1,%d]", by_i, length(DT)); // # nocov # R forderv already catch that using C colnamesInt
if ( nrow != length(VECTOR_ELT(DT, by_i-1)) )
error("Column %d is length %d which differs from length of column 1 (%d)\n", INTEGER(by)[i], length(VECTOR_ELT(DT, INTEGER(by)[i]-1)), nrow);
STOP("Column %d is length %d which differs from length of column 1 (%d)\n", INTEGER(by)[i], length(VECTOR_ELT(DT, INTEGER(by)[i]-1)), nrow);
if (TYPEOF(VECTOR_ELT(DT, by_i-1)) == CPLXSXP) n_cplx++;
}
if (!isLogical(retGrpArg) || LENGTH(retGrpArg)!=1 || INTEGER(retGrpArg)[0]==NA_LOGICAL)
error("retGrp= must be TRUE or FALSE");
STOP("retGrp= must be TRUE or FALSE");
retgrp = LOGICAL(retGrpArg)[0]==TRUE;
if (!isLogical(sortGroupsArg) || LENGTH(sortGroupsArg)!=1 || INTEGER(sortGroupsArg)[0]==NA_LOGICAL )
error("sort= must be TRUE or FALSE");
STOP("sort= must be TRUE or FALSE");
sortType = LOGICAL(sortGroupsArg)[0]==TRUE; // if sortType is 1, it is later flipped between +1/-1 according to ascArg. Otherwise ascArg is ignored when sortType==0
if (!retgrp && !sortType)
error("At least one of retGrp= or sort= must be TRUE");
STOP("At least one of retGrp= or sort= must be TRUE");
if (!isLogical(naArg) || LENGTH(naArg) != 1)
error("na.last must be logical TRUE, FALSE or NA of length 1");
STOP("na.last must be logical TRUE, FALSE or NA of length 1");
nalast = (LOGICAL(naArg)[0] == NA_LOGICAL) ? -1 : LOGICAL(naArg)[0]; // 1=na last, 0=na first (default), -1=remove na

if (nrow==0) {
Expand All @@ -494,8 +497,10 @@ SEXP forder(SEXP DT, SEXP by, SEXP retGrpArg, SEXP sortGroupsArg, SEXP ascArg, S
savetl_init(); // from now on use Error not error

int ncol=length(by);
key = calloc((ncol+n_cplx)*8+1, sizeof(uint8_t *)); // needs to be before loop because part II relies on part I, column-by-column. +1 because we check NULL after last one
// TODO: if key==NULL Error
int keyAlloc = (ncol+n_cplx)*8 + 1; // +1 for NULL to mark end; calloc to initialize with NULLs
key = calloc(keyAlloc, sizeof(uint8_t *)); // needs to be before loop because part II relies on part I, column-by-column.
if (!key)
STOP("Unable to allocate %llu bytes of working memory", (unsigned long long)(keyAlloc*sizeof(uint8_t *))); // # nocov
nradix=0; // the current byte we're writing this column to; might be squashing into it (spare>0)
int spare=0; // the amount of bits remaining on the right of the current nradix byte
bool isReal=false;
Expand All @@ -511,7 +516,7 @@ SEXP forder(SEXP DT, SEXP by, SEXP retGrpArg, SEXP sortGroupsArg, SEXP ascArg, S
if (sortType) {
sortType=INTEGER(ascArg)[col]; // if sortType!=0 (not first-appearance) then +1/-1 comes from ascArg.
if (sortType!=1 && sortType!=-1)
Error("Item %d of order (ascending/descending) is %d. Must be +1 or -1.", col+1, sortType);
STOP("Item %d of order (ascending/descending) is %d. Must be +1 or -1.", col+1, sortType);
}
//Rprintf("sortType = %d\n", sortType);
switch(TYPEOF(x)) {
Expand Down Expand Up @@ -553,7 +558,7 @@ SEXP forder(SEXP DT, SEXP by, SEXP retGrpArg, SEXP sortGroupsArg, SEXP ascArg, S
range_str(STRING_PTR(x), nrow, &min, &max, &na_count);
break;
default:
Error("Column %d passed to [f]order is type '%s', not yet supported.", col+1, type2char(TYPEOF(x)));
STOP("Column %d passed to [f]order is type '%s', not yet supported.", col+1, type2char(TYPEOF(x)));
}
TEND(3);
if (na_count==nrow || (min>0 && min==max && na_count==0 && infnan_count==0)) {
Expand Down Expand Up @@ -596,8 +601,12 @@ SEXP forder(SEXP DT, SEXP by, SEXP retGrpArg, SEXP sortGroupsArg, SEXP ascArg, S
}

for (int b=0; b<nbyte; b++) {
if (key[nradix+b]==NULL)
key[nradix+b] = calloc(nrow, sizeof(uint8_t)); // 0 initialize so that NA's can just skip (NA is always the 0 offset)
if (key[nradix+b]==NULL) {
uint8_t *tt = calloc(nrow, sizeof(uint8_t)); // 0 initialize so that NA's can just skip (NA is always the 0 offset)
if (!tt)
STOP("Unable to allocate %llu bytes of working memory", (unsigned long long)(nrow * sizeof(uint8_t))); // # nocov
key[nradix+b] = tt;
}
}

const bool asc = (sortType>=0);
Expand Down Expand Up @@ -702,7 +711,7 @@ SEXP forder(SEXP DT, SEXP by, SEXP retGrpArg, SEXP sortGroupsArg, SEXP ascArg, S
free_ustr(); // ustr could be left allocated and reused, but free now in case large and we're tight on ram
break;
default:
Error("Internal error: column not supported not caught earlier"); // # nocov
STOP("Internal error: column not supported not caught earlier"); // # nocov
}
nradix += nbyte-1+(spare==0);
TEND(4)
Expand All @@ -716,12 +725,12 @@ SEXP forder(SEXP DT, SEXP by, SEXP retGrpArg, SEXP sortGroupsArg, SEXP ascArg, S
int nth = getDTthreads();
TMP = (int *)malloc(nth*UINT16_MAX*sizeof(int)); // used by counting sort (my_n<=65536) in radix_r()
UGRP = (uint8_t *)malloc(nth*256); // TODO: align TMP and UGRP to cache lines (and do the same for stack allocations too)
if (!TMP || !UGRP /*|| TMP%64 || UGRP%64*/) Error("Failed to allocate TMP or UGRP or they weren't cache line aligned: nth=%d", nth);
if (!TMP || !UGRP /*|| TMP%64 || UGRP%64*/) STOP("Failed to allocate TMP or UGRP or they weren't cache line aligned: nth=%d", nth);
if (retgrp) {
gs_thread = calloc(nth, sizeof(int *)); // thread private group size buffers
gs_thread_alloc = calloc(nth, sizeof(int));
gs_thread_n = calloc(nth, sizeof(int));
if (!gs_thread || !gs_thread_alloc || !gs_thread_n) Error("Could not allocate (very tiny) group size thread buffers");
if (!gs_thread || !gs_thread_alloc || !gs_thread_n) STOP("Could not allocate (very tiny) group size thread buffers");
}
if (nradix) {
radix_r(0, nrow-1, 0); // top level recursive call: (from, to, radix)
Expand Down Expand Up @@ -1041,7 +1050,7 @@ void radix_r(const int from, const int to, const int radix) {
uint16_t *counts = calloc(nBatch*256,sizeof(uint16_t));
uint8_t *ugrps = malloc(nBatch*256*sizeof(uint8_t));
int *ngrps = calloc(nBatch ,sizeof(int));
if (!counts || !ugrps || !ngrps) Error("Failed to allocate parallel counts. my_n=%d, nBatch=%d", my_n, nBatch);
if (!counts || !ugrps || !ngrps) STOP("Failed to allocate parallel counts. my_n=%d, nBatch=%d", my_n, nBatch);

bool skip=true;
const int n_rem = nradix-radix-1; // how many radix are remaining after this one
Expand Down Expand Up @@ -1150,7 +1159,7 @@ void radix_r(const int from, const int to, const int radix) {
TEND(18 + notFirst*3)
if (!skip) {
int *TMP = malloc(my_n * sizeof(int));
if (!TMP) Error("Unable to allocate TMP for my_n=%d items in parallel batch counting", my_n);
if (!TMP) STOP("Unable to allocate TMP for my_n=%d items in parallel batch counting", my_n);
#pragma omp parallel for num_threads(getDTthreads())
for (int batch=0; batch<nBatch; batch++) {
const int *restrict my_starts = starts + batch*256;
Expand Down Expand Up @@ -1257,7 +1266,7 @@ SEXP fsorted(SEXP x)
// These are all sequential access to x, so very quick and cache efficient. Could be parallel by checking continuity at batch boundaries.
const int n = length(x);
if (n <= 1) return(ScalarLogical(TRUE));
if (!isVectorAtomic(x)) Error("is.sorted (R level) and fsorted (C level) only to be used on vectors. If needed on a list/data.table, you'll need the order anyway if not sorted, so use if (length(o<-forder(...))) for efficiency in one step, or equivalent at C level");
if (!isVectorAtomic(x)) STOP("is.sorted (R level) and fsorted (C level) only to be used on vectors. If needed on a list/data.table, you'll need the order anyway if not sorted, so use if (length(o<-forder(...))) for efficiency in one step, or equivalent at C level");
int i=1;
switch(TYPEOF(x)) {
case INTSXP : case LGLSXP : {
Expand Down Expand Up @@ -1289,7 +1298,7 @@ SEXP fsorted(SEXP x)
}
} break;
default :
Error("type '%s' is not yet supported", type2char(TYPEOF(x)));
STOP("type '%s' is not yet supported", type2char(TYPEOF(x)));
}
return ScalarLogical(i==n);
}
Expand Down