Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
13a1969
initial mask kernel impl
bkietz May 21, 2019
db44424
remove empty MaskOptions
bkietz May 22, 2019
c953dca
remove empty TakeOptions
bkietz May 22, 2019
223a860
fix typo
bkietz May 22, 2019
d5c9c14
use checked_cast
bkietz May 22, 2019
a54741e
add some tests with empty masks/take indices
bkietz May 22, 2019
4c8ce6d
revert submodule
bkietz May 22, 2019
4b24ca3
revert removal of TakeOptions
bkietz May 22, 2019
edf2eb1
rename FilterFunction -> CompareFunction
bkietz Jun 10, 2019
0f29ab2
rename Mask -> Filter
bkietz Jun 10, 2019
6efc4f5
add filter tests with large, random arrays
bkietz Jun 10, 2019
7c50027
add a test integrating with arrow::compute::Compare
bkietz Jun 11, 2019
8a9f379
add a test integrating with arrow::compute::Compare (array-array)
bkietz Jun 11, 2019
ccd32a5
add a basic filter benchmark
bkietz Jun 11, 2019
a216388
add explicit qualification for MSVC
bkietz Jun 11, 2019
e3b4022
add filter impls for nested types
bkietz Jun 13, 2019
a8cb993
fix lint error
bkietz Jun 13, 2019
495e521
Add support for filtering MapArray
bkietz Jun 13, 2019
3387f21
use new path for concatenate.h
bkietz Jun 13, 2019
f424f34
fix nits and typos
bkietz Jun 14, 2019
f833e02
add benchmark for fixed_size_list(int64(), 1)
bkietz Jun 14, 2019
e4d9d85
refactor FilterKernel::Make to use a switch
bkietz Jun 14, 2019
24f2e85
add larger benchmarks to test for O(N^2) perf
bkietz Jun 14, 2019
060313c
refactor FilterImpl<StructType> to own child kernels
bkietz Jun 14, 2019
7702055
use expanded bitmap for FixedSizeList and List
bkietz Jun 14, 2019
030ac57
filter benchmarks += MinTime(1.0) nanoseconds
bkietz Jun 14, 2019
e8465e5
iwyu: vector
bkietz Jun 15, 2019
3d92b6e
Make FilterKernel public
bkietz Jun 19, 2019
032d341
fix doc error
bkietz Jun 19, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cpp/src/arrow/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -172,8 +172,8 @@ if(ARROW_COMPUTE)
compute/kernels/cast.cc
compute/kernels/compare.cc
compute/kernels/count.cc
compute/kernels/filter.cc
compute/kernels/hash.cc
compute/kernels/filter.cc
compute/kernels/mean.cc
compute/kernels/sum.cc
compute/kernels/take.cc
Expand Down
169 changes: 169 additions & 0 deletions cpp/src/arrow/array.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1369,6 +1369,175 @@ std::shared_ptr<Array> MakeArray(const std::shared_ptr<ArrayData>& data) {

namespace internal {

// get the maximum buffer length required, then allocate a single zeroed buffer
// to use anywhere a buffer is required
class NullArrayFactory {
 public:
  // Computes the size in bytes of the largest single buffer needed anywhere in
  // an all-null array of the given type and length, so that one allocation of
  // that size can be reused for every buffer. Converting an rvalue of this
  // struct to int64_t runs the computation.
  struct GetBufferLength {
    // Start from the size of the validity bitmap; the visitors only raise it.
    GetBufferLength(const std::shared_ptr<DataType>& type, int64_t length)
        : type_(*type), length_(length), buffer_length_(BitUtil::BytesForBits(length)) {}

    // The result is a plain integer, so a failing visit cannot be returned
    // from here; it is only checked through DCHECK_OK.
    operator int64_t() && {
      DCHECK_OK(VisitTypeInline(type_, this));
      return buffer_length_;
    }

    // Types whose TypeTraits expose bytes_required(): ask the traits directly.
    template <typename T, typename = decltype(TypeTraits<T>::bytes_required(0))>
    Status Visit(const T&) {
      return MaxOf(TypeTraits<T>::bytes_required(length_));
    }

    Status Visit(const BinaryType&) {
      // The offsets buffer holds one int32 per slot plus the final end offset.
      return MaxOf(static_cast<int64_t>(sizeof(int32_t)) * (length_ + 1));
    }

    Status Visit(const ListType& type) {
      // The offsets buffer holds one int32 per slot plus the final end offset.
      RETURN_NOT_OK(MaxOf(static_cast<int64_t>(sizeof(int32_t)) * (length_ + 1)));
      // NullArrayFactory builds the values child with length_ elements on top
      // of the same shared buffer, so the child's needs must be covered too.
      return MaxOf(GetBufferLength(type.value_type(), length_));
    }

    Status Visit(const FixedSizeListType& type) {
      // The values child holds list_size elements for each of the length_ slots.
      return MaxOf(GetBufferLength(type.value_type(), type.list_size() * length_));
    }

    Status Visit(const StructType& type) {
      for (const auto& child : type.children()) {
        RETURN_NOT_OK(MaxOf(GetBufferLength(child->type(), length_)));
      }
      return Status::OK();
    }

    Status Visit(const UnionType& type) {
      // type codes
      RETURN_NOT_OK(MaxOf(length_));
      if (type.mode() == UnionMode::DENSE) {
        // offsets
        RETURN_NOT_OK(MaxOf(sizeof(int32_t) * length_));
      }
      for (const auto& child : type.children()) {
        RETURN_NOT_OK(MaxOf(GetBufferLength(child->type(), length_)));
      }
      return Status::OK();
    }

    Status Visit(const DictionaryType& type) {
      RETURN_NOT_OK(MaxOf(GetBufferLength(type.value_type(), length_)));
      return MaxOf(GetBufferLength(type.index_type(), length_));
    }

    Status Visit(const ExtensionType& type) {
      // XXX is an extension array's length always == storage length
      return MaxOf(GetBufferLength(type.storage_type(), length_));
    }

    Status Visit(const DataType& type) {
      return Status::NotImplemented("construction of all-null ", type);
    }

   private:
    // Raise the running maximum to buffer_length if it is larger.
    Status MaxOf(int64_t buffer_length) {
      if (buffer_length > buffer_length_) {
        buffer_length_ = buffer_length;
      }
      return Status::OK();
    }

    const DataType& type_;
    int64_t length_, buffer_length_;
  };

  // \param[in] type the type of the array to build
  // \param[in] length the number of (null) slots
  // \param[out] out receives the resulting ArrayData when Create() is called
  NullArrayFactory(const std::shared_ptr<DataType>& type, int64_t length,
                   std::shared_ptr<ArrayData>* out)
      : type_(type), length_(length), out_(out) {}

  // Allocate the shared buffer and fill it with zeros.
  Status CreateBuffer() {
    int64_t buffer_length = GetBufferLength(type_, length_);
    RETURN_NOT_OK(AllocateBuffer(buffer_length, &buffer_));
    std::memset(buffer_->mutable_data(), 0, buffer_->size());
    return Status::OK();
  }

  // Build *out_. The shared buffer is allocated here unless one was already
  // handed over by a parent factory (see CreateChild). The data starts with a
  // single buffer; the type-specific visitors below add further references to
  // the same buffer and fill in children where the type needs them.
  Status Create() {
    if (buffer_ == nullptr) {
      RETURN_NOT_OK(CreateBuffer());
    }
    std::vector<std::shared_ptr<ArrayData>> child_data(type_->num_children());
    *out_ = ArrayData::Make(type_, length_, {buffer_}, child_data, length_, 0);
    return VisitTypeInline(*type_, this);
  }

  Status Visit(const NullType&) { return Status::OK(); }

  Status Visit(const FixedWidthType&) {
    (*out_)->buffers.resize(2, buffer_);
    return Status::OK();
  }

  Status Visit(const BinaryType&) {
    (*out_)->buffers.resize(3, buffer_);
    return Status::OK();
  }

  Status Visit(const ListType&) {
    (*out_)->buffers.resize(2, buffer_);
    return CreateChild(0, length_, &(*out_)->child_data[0]);
  }

  Status Visit(const FixedSizeListType& type) {
    return CreateChild(0, length_ * type.list_size(), &(*out_)->child_data[0]);
  }

  Status Visit(const StructType&) {
    for (int i = 0; i < type_->num_children(); ++i) {
      // Return child failures to the caller; DCHECK_OK would not.
      RETURN_NOT_OK(CreateChild(i, length_, &(*out_)->child_data[i]));
    }
    return Status::OK();
  }

  Status Visit(const UnionType& type) {
    if (type.mode() == UnionMode::DENSE) {
      (*out_)->buffers.resize(3, buffer_);
    } else {
      (*out_)->buffers.resize(2, buffer_);
    }

    for (int i = 0; i < type_->num_children(); ++i) {
      // Return child failures to the caller; DCHECK_OK would not.
      RETURN_NOT_OK(CreateChild(i, length_, &(*out_)->child_data[i]));
    }
    return Status::OK();
  }

  Status Visit(const DictionaryType& type) {
    (*out_)->buffers.resize(2, buffer_);
    // The dictionary itself is built as a separate, empty array.
    return MakeArrayOfNull(type.value_type(), 0, &(*out_)->dictionary);
  }

  Status Visit(const DataType& type) {
    return Status::NotImplemented("construction of all-null ", type);
  }

  // Build the i-th child with the given length into *out, reusing this
  // factory's buffer instead of allocating a new one.
  Status CreateChild(int i, int64_t length, std::shared_ptr<ArrayData>* out) {
    NullArrayFactory child_factory(type_->child(i)->type(), length, out);
    child_factory.buffer_ = buffer_;
    return child_factory.Create();
  }

  std::shared_ptr<DataType> type_;
  int64_t length_;
  std::shared_ptr<ArrayData>* out_;
  std::shared_ptr<Buffer> buffer_;
};

} // namespace internal

// Build an Array of the requested type and length in which every slot is null.
// The ArrayData is assembled by internal::NullArrayFactory and then wrapped
// with MakeArray; *out is only written when the factory succeeds.
Status MakeArrayOfNull(const std::shared_ptr<DataType>& type, int64_t length,
                       std::shared_ptr<Array>* out) {
  std::shared_ptr<ArrayData> null_data;
  internal::NullArrayFactory factory(type, length, &null_data);
  RETURN_NOT_OK(factory.Create());

  *out = MakeArray(null_data);
  return Status::OK();
}

namespace internal {

std::vector<ArrayVector> RechunkArraysConsistently(
const std::vector<ArrayVector>& groups) {
if (groups.size() <= 1) {
Expand Down
18 changes: 16 additions & 2 deletions cpp/src/arrow/array.h
Original file line number Diff line number Diff line change
Expand Up @@ -220,6 +220,14 @@ struct ARROW_EXPORT ArrayData {
ARROW_EXPORT
std::shared_ptr<Array> MakeArray(const std::shared_ptr<ArrayData>& data);

/// \brief Create a strongly-typed Array instance with all elements null
/// \param[in] type the array type
/// \param[in] length the array length
/// \param[out] out resulting Array instance
/// \return Status, which is NotImplemented when an all-null array of the
/// requested type cannot be constructed
ARROW_EXPORT
Status MakeArrayOfNull(const std::shared_ptr<DataType>& type, int64_t length,
std::shared_ptr<Array>* out);

// ----------------------------------------------------------------------
// User array accessor types

Expand Down Expand Up @@ -521,12 +529,15 @@ class ARROW_EXPORT ListArray : public Array {
/// Return pointer to raw value offsets accounting for any slice offset
///
/// The returned pointer is the stored offsets pointer advanced by the
/// array's slice offset (data_->offset).
const int32_t* raw_value_offsets() const { return raw_value_offsets_ + data_->offset; }

// Neither of these functions will perform boundschecking
// The following functions will not perform bounds checking
/// Offset stored for list slot i, where i is relative to this (possibly sliced) array
int32_t value_offset(int64_t i) const {
  const int64_t absolute_index = i + data_->offset;
  return raw_value_offsets_[absolute_index];
}
/// Difference between the offsets stored after and at list slot i,
/// where i is relative to this (possibly sliced) array
int32_t value_length(int64_t i) const {
  const int32_t* slot = raw_value_offsets_ + (i + data_->offset);
  return slot[1] - slot[0];
}
/// Slice of the values array starting at value_offset(i) and spanning
/// value_length(i) elements
std::shared_ptr<Array> value_slice(int64_t i) const {
  const int32_t start = value_offset(i);
  const int32_t count = value_length(i);
  return values_->Slice(start, count);
}

protected:
// This constructor defers SetData to a derived array class
Expand Down Expand Up @@ -596,12 +607,15 @@ class ARROW_EXPORT FixedSizeListArray : public Array {

std::shared_ptr<DataType> value_type() const;

// Neither of these functions will perform boundschecking
// The following functions will not perform bounds checking
/// Position in the values array where list slot i begins: the slot index
/// (shifted by the array's slice offset) multiplied by the fixed list size
int32_t value_offset(int64_t i) const {
  const int64_t absolute_index = i + data_->offset;
  return static_cast<int32_t>(list_size_ * absolute_index);
}
/// Length of list slot i. The same list_size_ is returned for every slot,
/// so the argument is ignored.
int32_t value_length(int64_t i = 0) const { return list_size_; }
/// Slice of the values array starting at value_offset(i) and spanning
/// value_length(i) elements
std::shared_ptr<Array> value_slice(int64_t i) const {
  const int32_t start = value_offset(i);
  const int32_t count = value_length(i);
  return values_->Slice(start, count);
}

protected:
void SetData(const std::shared_ptr<ArrayData>& data);
Expand Down
7 changes: 7 additions & 0 deletions cpp/src/arrow/array/builder_primitive.cc
Original file line number Diff line number Diff line change
Expand Up @@ -128,4 +128,11 @@ Status BooleanBuilder::AppendValues(const std::vector<bool>& values) {
return Status::OK();
}

// Append `length` elements in one call, all marked as not null.
// Returns the error from Reserve() if capacity cannot be reserved.
Status BooleanBuilder::AppendValues(int64_t length, bool value) {
// Reserve capacity up front so the Unsafe* calls below can be used.
RETURN_NOT_OK(Reserve(length));
data_builder_.UnsafeAppend(length, value);
// Mark all of the appended slots as valid.
ArrayBuilder::UnsafeSetNotNull(length);
return Status::OK();
}

} // namespace arrow
2 changes: 2 additions & 0 deletions cpp/src/arrow/array/builder_primitive.h
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,8 @@ class ARROW_EXPORT BooleanBuilder : public ArrayBuilder {
return Status::OK();
}

/// \brief Append a run of non-null elements with a single call
/// \param[in] length the number of elements to append
/// \param[in] value the boolean passed to the data builder for the whole run
/// \return Status
Status AppendValues(int64_t length, bool value);

Status FinishInternal(std::shared_ptr<ArrayData>* out) override;

/// \cond FALSE
Expand Down
23 changes: 23 additions & 0 deletions cpp/src/arrow/compute/benchmark-util.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,5 +70,28 @@ void RegressionSetArgs(benchmark::internal::Benchmark* bench) {
BenchmarkSetArgsWithSizes(bench, {kL1Size});
}

// RAII struct to handle some of the boilerplate in regression benchmarks
struct RegressionArgs {
  /// Bytes of memory tested per iteration; read from state.range(0)
  const int64_t size;

  /// Proportion of nulls in generated arrays; state.range(1) is a percentage
  /// and is divided by 100 here
  const double null_proportion;

  /// Read the benchmark arguments from `state` and keep a reference to it so
  /// the destructor can report on it.
  explicit RegressionArgs(benchmark::State& state)
      : size(state.range(0)),
        null_proportion(static_cast<double>(state.range(1)) / 100.0),
        state_(state) {}

  /// Publish the "size" and "null_percent" counters and the number of bytes
  /// processed (iterations * size) when the object goes out of scope.
  ~RegressionArgs() {
    const double size_counter = static_cast<double>(size);
    const double null_percent_counter = static_cast<double>(state_.range(1));

    state_.counters["size"] = size_counter;
    state_.counters["null_percent"] = null_percent_counter;
    state_.SetBytesProcessed(state_.iterations() * size);
  }

 private:
  benchmark::State& state_;
};

} // namespace compute
} // namespace arrow
8 changes: 6 additions & 2 deletions cpp/src/arrow/compute/kernels/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,17 @@ arrow_install_all_headers("arrow/compute/kernels")
add_arrow_test(boolean-test PREFIX "arrow-compute")
add_arrow_test(cast-test PREFIX "arrow-compute")
add_arrow_test(hash-test PREFIX "arrow-compute")
add_arrow_test(take-test PREFIX "arrow-compute")
add_arrow_test(util-internal-test PREFIX "arrow-compute")

# Aggregates
add_arrow_test(aggregate-test PREFIX "arrow-compute")
add_arrow_benchmark(aggregate-benchmark PREFIX "arrow-compute")

# Filters
# Comparison
add_arrow_test(compare-test PREFIX "arrow-compute")
add_arrow_benchmark(compare-benchmark PREFIX "arrow-compute")

# Selection
add_arrow_test(take-test PREFIX "arrow-compute")
add_arrow_test(filter-test PREFIX "arrow-compute")
add_arrow_benchmark(filter-benchmark PREFIX "arrow-compute")
84 changes: 84 additions & 0 deletions cpp/src/arrow/compute/kernels/compare-benchmark.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "benchmark/benchmark.h"

#include <vector>

#include "arrow/compute/benchmark-util.h"
#include "arrow/compute/kernel.h"
#include "arrow/compute/kernels/compare.h"
#include "arrow/compute/test-util.h"
#include "arrow/testing/gtest_util.h"
#include "arrow/testing/random.h"

namespace arrow {
namespace compute {

// Seed handed to random::RandomArrayGenerator by the benchmarks in this file
constexpr auto kSeed = 0x94378165;

// Benchmark Compare(array >= scalar 0) on a randomly generated Int64 array.
// state.range(0) is the memory size of the array in bytes and state.range(1)
// is a percentage that is divided by 100 before being passed to the generator.
static void CompareArrayScalarKernel(benchmark::State& state) {
  const int64_t bytes_requested = state.range(0);
  const int64_t num_values = bytes_requested / sizeof(int64_t);
  const double null_proportion = static_cast<double>(state.range(1)) / 100.0;

  random::RandomArrayGenerator generator(kSeed);
  auto values = std::static_pointer_cast<NumericArray<Int64Type>>(
      generator.Int64(num_values, -100, 100, null_proportion));

  CompareOptions greater_equal{GREATER_EQUAL};
  FunctionContext ctx;

  for (auto _ : state) {
    Datum result;
    ABORT_NOT_OK(
        Compare(&ctx, Datum(values), Datum(int64_t(0)), greater_equal, &result));
    benchmark::DoNotOptimize(result);
  }

  // Report the requested size and null percentage alongside the throughput.
  state.counters["size"] = static_cast<double>(bytes_requested);
  state.counters["null_percent"] = static_cast<double>(state.range(1));
  state.SetBytesProcessed(state.iterations() * num_values * sizeof(int64_t));
}

// Benchmark Compare(lhs >= rhs) on two randomly generated Int64 arrays of the
// same length. state.range(0) is the memory size of one array in bytes and
// state.range(1) is a percentage that is divided by 100 before being passed to
// the generator.
static void CompareArrayArrayKernel(benchmark::State& state) {
  const int64_t bytes_requested = state.range(0);
  const int64_t num_values = bytes_requested / sizeof(int64_t);
  const double null_proportion = static_cast<double>(state.range(1)) / 100.0;

  random::RandomArrayGenerator generator(kSeed);
  auto left = std::static_pointer_cast<NumericArray<Int64Type>>(
      generator.Int64(num_values, -100, 100, null_proportion));
  auto right = std::static_pointer_cast<NumericArray<Int64Type>>(
      generator.Int64(num_values, -100, 100, null_proportion));

  CompareOptions greater_equal(GREATER_EQUAL);
  FunctionContext ctx;

  for (auto _ : state) {
    Datum result;
    ABORT_NOT_OK(Compare(&ctx, Datum(left), Datum(right), greater_equal, &result));
    benchmark::DoNotOptimize(result);
  }

  // Both inputs are read on every iteration, hence the factor of two.
  state.counters["size"] = static_cast<double>(bytes_requested);
  state.counters["null_percent"] = static_cast<double>(state.range(1));
  state.SetBytesProcessed(state.iterations() * num_values * sizeof(int64_t) * 2);
}

BENCHMARK(CompareArrayScalarKernel)->Apply(RegressionSetArgs);
BENCHMARK(CompareArrayArrayKernel)->Apply(RegressionSetArgs);

} // namespace compute
} // namespace arrow
Loading