Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 87 additions & 0 deletions cpp/src/arrow/compute/kernels/filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include <utility>
#include <vector>

#include "arrow/array/concatenate.h"
#include "arrow/builder.h"
#include "arrow/compute/kernels/take_internal.h"
#include "arrow/record_batch.h"
Expand Down Expand Up @@ -163,5 +164,91 @@ Status Filter(FunctionContext* ctx, const RecordBatch& batch, const Array& filte
return Status::OK();
}

Status Filter(FunctionContext* ctx, const ChunkedArray& values, const Array& filter,
std::shared_ptr<ChunkedArray>* out) {
if (values.length() != filter.length()) {
return Status::Invalid("filter and value array must have identical lengths");
}
auto num_chunks = values.num_chunks();
std::vector<std::shared_ptr<Array>> new_chunks(num_chunks);
std::shared_ptr<Array> current_chunk;
int64_t offset = 0;
int64_t len;

for (int i = 0; i < num_chunks; i++) {
current_chunk = values.chunk(i);
len = current_chunk->length();
RETURN_NOT_OK(
Filter(ctx, *current_chunk, *filter.Slice(offset, len), &new_chunks[i]));
offset += len;
}

*out = std::make_shared<ChunkedArray>(std::move(new_chunks));
return Status::OK();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should create a function that accepts a lambda for chunked evaluation so this logic can be reused in other places. Does not have to happen in this patch

}

Status Filter(FunctionContext* ctx, const ChunkedArray& values,
const ChunkedArray& filter, std::shared_ptr<ChunkedArray>* out) {
if (values.length() != filter.length()) {
return Status::Invalid("filter and value array must have identical lengths");
}
auto num_chunks = values.num_chunks();
std::vector<std::shared_ptr<Array>> new_chunks(num_chunks);
std::shared_ptr<Array> current_chunk;
std::shared_ptr<ChunkedArray> current_chunked_filter;
std::shared_ptr<Array> current_filter;
int64_t offset = 0;
int64_t len;

for (int i = 0; i < num_chunks; i++) {
current_chunk = values.chunk(i);
len = current_chunk->length();
if (len > 0) {
current_chunked_filter = filter.Slice(offset, len);
if (current_chunked_filter->num_chunks() == 1) {
current_filter = current_chunked_filter->chunk(0);
} else {
// Concatenate the chunks of the filter so we have an Array
RETURN_NOT_OK(Concatenate(current_chunked_filter->chunks(), default_memory_pool(),
&current_filter));
}
RETURN_NOT_OK(Filter(ctx, *current_chunk, *current_filter, &new_chunks[i]));
offset += len;
} else {
// Put a zero length array there, which we know our current chunk to be
new_chunks[i] = current_chunk;
}
}

*out = std::make_shared<ChunkedArray>(std::move(new_chunks));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same with this logic. I'm not sure about the concatenation part, it seems like you would want to split larger chunks into smaller pieces, yielding an output that has more chunks than the input

e.g.

array chunks [10, 10, 10, 3]
filter chunks [5, 5, 5, 15, 3]

output chunks [5, 5, 5, 5, 10, 3]

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

return Status::OK();
}

Status Filter(FunctionContext* ctx, const Table& table, const Array& filter,
std::shared_ptr<Table>* out) {
auto ncols = table.num_columns();

std::vector<std::shared_ptr<ChunkedArray>> columns(ncols);

for (int j = 0; j < ncols; j++) {
RETURN_NOT_OK(Filter(ctx, *table.column(j), filter, &columns[j]));
}
*out = Table::Make(table.schema(), columns);
return Status::OK();
}

Status Filter(FunctionContext* ctx, const Table& table, const ChunkedArray& filter,
std::shared_ptr<Table>* out) {
auto ncols = table.num_columns();

std::vector<std::shared_ptr<ChunkedArray>> columns(ncols);

for (int j = 0; j < ncols; j++) {
RETURN_NOT_OK(Filter(ctx, *table.column(j), filter, &columns[j]));
}
*out = Table::Make(table.schema(), columns);
return Status::OK();
}

} // namespace compute
} // namespace arrow
69 changes: 69 additions & 0 deletions cpp/src/arrow/compute/kernels/filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,44 @@ ARROW_EXPORT
Status Filter(FunctionContext* ctx, const Array& values, const Array& filter,
std::shared_ptr<Array>* out);

/// \brief Filter a chunked array with a boolean selection filter
///
/// The output chunked array will be populated with values from the input at positions
/// where the selection filter is not 0. Nulls in the filter will result in nulls
/// in the output.
///
/// For example given values = ["a", "b", "c", null, "e", "f"] and
/// filter = [0, 1, 1, 0, null, 1], the output will be
/// = ["b", "c", null, "f"]
///
/// \param[in] ctx the FunctionContext
/// \param[in] values chunked array to filter
/// \param[in] filter indicates which values should be filtered out
/// \param[out] out resulting chunked array
/// NOTE: Experimental API
ARROW_EXPORT
Status Filter(FunctionContext* ctx, const ChunkedArray& values, const Array& filter,
std::shared_ptr<ChunkedArray>* out);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Per comments on Take below I think that using Datum and having fewer public APIs would be better. There are implicit ctors for Datum to make the usage easier

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I briefly looked into refactoring to use Datum but it didn't seem like a good use of my time right now to figure it out. https://issues.apache.org/jira/browse/ARROW-7009 is for someone else to pick that up.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No problem


/// \brief Filter a chunked array with a boolean selection filter
///
/// The output chunked array will be populated with values from the input at positions
/// where the selection filter is not 0. Nulls in the filter will result in nulls
/// in the output.
///
/// For example given values = ["a", "b", "c", null, "e", "f"] and
/// filter = [0, 1, 1, 0, null, 1], the output will be
/// = ["b", "c", null, "f"]
///
/// \param[in] ctx the FunctionContext
/// \param[in] values chunked array to filter
/// \param[in] filter indicates which values should be filtered out
/// \param[out] out resulting chunked array
/// NOTE: Experimental API
ARROW_EXPORT
Status Filter(FunctionContext* ctx, const ChunkedArray& values,
const ChunkedArray& filter, std::shared_ptr<ChunkedArray>* out);

/// \brief Filter a record batch with a boolean selection filter
///
/// The output record batch's columns will be populated with values from corresponding
Expand All @@ -60,10 +98,41 @@ Status Filter(FunctionContext* ctx, const Array& values, const Array& filter,
/// \param[in] batch record batch to filter
/// \param[in] filter indicates which values should be filtered out
/// \param[out] out resulting record batch
/// NOTE: Experimental API
ARROW_EXPORT
Status Filter(FunctionContext* ctx, const RecordBatch& batch, const Array& filter,
std::shared_ptr<RecordBatch>* out);

/// \brief Filter a table with a boolean selection filter
///
/// The output table's columns will be populated with values from corresponding
/// columns of the input at positions where the selection filter is not 0. Nulls in the
/// filter will result in nulls in each column of the output.
///
/// \param[in] ctx the FunctionContext
/// \param[in] table table to filter
/// \param[in] filter indicates which values should be filtered out
/// \param[out] out resulting table
/// NOTE: Experimental API
ARROW_EXPORT
Status Filter(FunctionContext* ctx, const Table& table, const Array& filter,
std::shared_ptr<Table>* out);

/// \brief Filter a table with a boolean selection filter
///
/// The output record batch's columns will be populated with values from corresponding
/// columns of the input at positions where the selection filter is not 0. Nulls in the
/// filter will result in nulls in the output.
///
/// \param[in] ctx the FunctionContext
/// \param[in] table record batch to filter
/// \param[in] filter indicates which values should be filtered out
/// \param[out] out resulting record batch
/// NOTE: Experimental API
ARROW_EXPORT
Status Filter(FunctionContext* ctx, const Table& table, const ChunkedArray& filter,
std::shared_ptr<Table>* out);

/// \brief Filter an array with a boolean selection filter
///
/// \param[in] ctx the FunctionContext
Expand Down
152 changes: 152 additions & 0 deletions cpp/src/arrow/compute/kernels/filter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -468,5 +468,157 @@ TEST_F(TestFilterKernelWithUnion, FilterUnion) {
}
}

class TestFilterKernelWithRecordBatch : public TestFilterKernel<RecordBatch> {
public:
void AssertFilter(const std::shared_ptr<Schema>& schm, const std::string& batch_json,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: not fond of condensed variable names like schm

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I don't do something like this, I collide with the function named schema:

/Users/enpiar/Documents/ursa/arrow/cpp/src/arrow/compute/kernels/filter_test.cc:492:17: error: variable 'schema' declared with deduced type 'auto' cannot appear in its own initializer
  auto schema = schema(fields);
                ^

const std::string& selection, const std::string& expected_batch) {
std::shared_ptr<RecordBatch> actual;

ASSERT_OK(this->Filter(schm, batch_json, selection, &actual));
ASSERT_OK(actual->Validate());
ASSERT_BATCHES_EQUAL(*RecordBatchFromJSON(schm, expected_batch), *actual);
}

Status Filter(const std::shared_ptr<Schema>& schm, const std::string& batch_json,
const std::string& selection, std::shared_ptr<RecordBatch>* out) {
auto batch = RecordBatchFromJSON(schm, batch_json);
return arrow::compute::Filter(&this->ctx_, *batch,
*ArrayFromJSON(boolean(), selection), out);
}
};

TEST_F(TestFilterKernelWithRecordBatch, FilterRecordBatch) {
std::vector<std::shared_ptr<Field>> fields = {field("a", int32()), field("b", utf8())};
auto schm = schema(fields);

auto struct_json = R"([
{"a": null, "b": "yo"},
{"a": 1, "b": ""},
{"a": 2, "b": "hello"},
{"a": 4, "b": "eh"}
])";
this->AssertFilter(schm, struct_json, "[0, 0, 0, 0]", "[]");
this->AssertFilter(schm, struct_json, "[0, 1, 1, null]", R"([
{"a": 1, "b": ""},
{"a": 2, "b": "hello"},
{"a": null, "b": null}
])");
this->AssertFilter(schm, struct_json, "[1, 1, 1, 1]", struct_json);
this->AssertFilter(schm, struct_json, "[1, 0, 1, 0]", R"([
{"a": null, "b": "yo"},
{"a": 2, "b": "hello"}
])");
}

class TestFilterKernelWithChunkedArray : public TestFilterKernel<ChunkedArray> {
public:
void AssertFilter(const std::shared_ptr<DataType>& type,
const std::vector<std::string>& values, const std::string& filter,
const std::vector<std::string>& expected) {
std::shared_ptr<ChunkedArray> actual;
ASSERT_OK(this->FilterWithArray(type, values, filter, &actual));
ASSERT_OK(actual->Validate());
AssertChunkedEqual(*ChunkedArrayFromJSON(type, expected), *actual);
}

void AssertChunkedFilter(const std::shared_ptr<DataType>& type,
const std::vector<std::string>& values,
const std::vector<std::string>& filter,
const std::vector<std::string>& expected) {
std::shared_ptr<ChunkedArray> actual;
ASSERT_OK(this->FilterWithChunkedArray(type, values, filter, &actual));
ASSERT_OK(actual->Validate());
AssertChunkedEqual(*ChunkedArrayFromJSON(type, expected), *actual);
}

Status FilterWithArray(const std::shared_ptr<DataType>& type,
const std::vector<std::string>& values,
const std::string& filter, std::shared_ptr<ChunkedArray>* out) {
return arrow::compute::Filter(&this->ctx_, *ChunkedArrayFromJSON(type, values),
*ArrayFromJSON(boolean(), filter), out);
}

Status FilterWithChunkedArray(const std::shared_ptr<DataType>& type,
const std::vector<std::string>& values,
const std::vector<std::string>& filter,
std::shared_ptr<ChunkedArray>* out) {
return arrow::compute::Filter(&this->ctx_, *ChunkedArrayFromJSON(type, values),
*ChunkedArrayFromJSON(boolean(), filter), out);
}
};

TEST_F(TestFilterKernelWithChunkedArray, FilterChunkedArray) {
this->AssertFilter(int8(), {"[]"}, "[]", {"[]"});
this->AssertChunkedFilter(int8(), {"[]"}, {"[]"}, {"[]"});

this->AssertFilter(int8(), {"[7]", "[8, 9]"}, "[0, 1, 0]", {"[]", "[8]"});
this->AssertChunkedFilter(int8(), {"[7]", "[8, 9]"}, {"[0]", "[1, 0]"}, {"[]", "[8]"});
this->AssertChunkedFilter(int8(), {"[7]", "[8, 9]"}, {"[0, 1]", "[0]"}, {"[8]", "[]"});

std::shared_ptr<ChunkedArray> arr;
ASSERT_RAISES(
Invalid, this->FilterWithArray(int8(), {"[7]", "[8, 9]"}, "[0, 1, 0, 1, 1]", &arr));
ASSERT_RAISES(Invalid, this->FilterWithChunkedArray(int8(), {"[7]", "[8, 9]"},
{"[0, 1, 0]", "[1, 1]"}, &arr));
}

class TestFilterKernelWithTable : public TestFilterKernel<Table> {
public:
void AssertFilter(const std::shared_ptr<Schema>& schm,
const std::vector<std::string>& table_json, const std::string& filter,
const std::vector<std::string>& expected_table) {
std::shared_ptr<Table> actual;

ASSERT_OK(this->FilterWithArray(schm, table_json, filter, &actual));
ASSERT_OK(actual->Validate());
ASSERT_TABLES_EQUAL(*TableFromJSON(schm, expected_table), *actual);
}

void AssertChunkedFilter(const std::shared_ptr<Schema>& schm,
const std::vector<std::string>& table_json,
const std::vector<std::string>& filter,
const std::vector<std::string>& expected_table) {
std::shared_ptr<Table> actual;

ASSERT_OK(this->FilterWithChunkedArray(schm, table_json, filter, &actual));
ASSERT_OK(actual->Validate());
ASSERT_TABLES_EQUAL(*TableFromJSON(schm, expected_table), *actual);
}

Status FilterWithArray(const std::shared_ptr<Schema>& schm,
const std::vector<std::string>& values,
const std::string& filter, std::shared_ptr<Table>* out) {
return arrow::compute::Filter(&this->ctx_, *TableFromJSON(schm, values),
*ArrayFromJSON(boolean(), filter), out);
}

Status FilterWithChunkedArray(const std::shared_ptr<Schema>& schm,
const std::vector<std::string>& values,
const std::vector<std::string>& filter,
std::shared_ptr<Table>* out) {
return arrow::compute::Filter(&this->ctx_, *TableFromJSON(schm, values),
*ChunkedArrayFromJSON(boolean(), filter), out);
}
};

TEST_F(TestFilterKernelWithTable, FilterTable) {
std::vector<std::shared_ptr<Field>> fields = {field("a", int32()), field("b", utf8())};
auto schm = schema(fields);

std::vector<std::string> table_json = {
"[{\"a\": null, \"b\": \"yo\"},{\"a\": 1, \"b\": \"\"}]",
"[{\"a\": 2, \"b\": \"hello\"},{\"a\": 4, \"b\": \"eh\"}]"};
this->AssertFilter(schm, table_json, "[0, 0, 0, 0]", {"[]", "[]"});
this->AssertChunkedFilter(schm, table_json, {"[0]", "[0, 0, 0]"}, {"[]", "[]"});

std::vector<std::string> expected2 = {
"[{\"a\": 1, \"b\": \"\"}]",
"[{\"a\": 2, \"b\": \"hello\"},{\"a\": null, \"b\": null}]"};
this->AssertFilter(schm, table_json, "[0, 1, 1, null]", expected2);
this->AssertChunkedFilter(schm, table_json, {"[0, 1, 1]", "[null]"}, expected2);
this->AssertFilter(schm, table_json, "[1, 1, 1, 1]", table_json);
this->AssertChunkedFilter(schm, table_json, {"[1]", "[1, 1, 1]"}, table_json);
}

} // namespace compute
} // namespace arrow
Loading