Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
f8e258b
ARROW-10322: [C++][Dataset] Minimize dataset::Expression
bkietz Oct 26, 2020
c31069e
replace filtering with Expression2
bkietz Dec 1, 2020
67dcd0e
remove ExpressionState
bkietz Dec 9, 2020
e9dffce
repair implicit casts
bkietz Dec 10, 2020
0e6da5b
delete Expression DSL operators and old Expression class
bkietz Dec 10, 2020
ec6d9a0
rename Expression2 -> Expression
bkietz Dec 10, 2020
9f05186
first pass at repairing bindings
bkietz Dec 10, 2020
c691f6b
add more scalar cast implementations
bkietz Dec 11, 2020
1083e08
use dataset___expr__call to create Call expressions
bkietz Dec 14, 2020
557ca31
lint fixes
bkietz Dec 14, 2020
9d5a57d
Refactor dataset expression code to follow array expression pattern
nealrichardson Dec 15, 2020
a04c7d9
A little more
nealrichardson Dec 15, 2020
2470187
revamp Expression::ToString, make cast-from-string error more informa…
bkietz Dec 15, 2020
d2ccb27
print and_kleene, or_kleene as binary ops
bkietz Dec 15, 2020
c149640
clean up Expression class, extract Parameter
bkietz Dec 16, 2020
cc1fd25
revert variant noexcept changes
bkietz Dec 16, 2020
6f2cc2c
get python binding building
bkietz Dec 17, 2020
44017b9
fix doc generation for struct function
bkietz Dec 17, 2020
13aca81
review comments
bkietz Dec 18, 2020
a88685e
repair linkage on Expression friends
bkietz Dec 18, 2020
9be418b
centos-7 doesn't recognize this initializer list
bkietz Dec 18, 2020
82c32de
get test_{dataset,parquet}.py passing
bkietz Dec 18, 2020
615efdb
move Identical() impl to .cc
bkietz Dec 18, 2020
c220823
export ExecuteScalarExpression
bkietz Dec 18, 2020
38389e7
remove test_parquet.py
bkietz Jan 4, 2021
fbe89d1
rename struct->project, move out of cast's test
bkietz Jan 4, 2021
f03953b
add some docstrings
bkietz Jan 4, 2021
63f1464
extract BindNonRecursive, filter2->filter, comments
bkietz Jan 5, 2021
607ed96
move more things to namespace{}, docstring for Modify
bkietz Jan 5, 2021
f952400
ensure field_ref into list safely errors
bkietz Jan 5, 2021
5f29f52
debug prints
bkietz Jan 5, 2021
0dffb56
remove unused functions
bkietz Jan 6, 2021
24c9277
take ownership of buffer to preserve deserialized arrays' storage
bkietz Jan 6, 2021
1292076
cleanup, FieldPath::operator bool -> empty
bkietz Jan 6, 2021
a072952
move project to scalar_nested.cc, add test for different chunking
bkietz Jan 6, 2021
9ce0adb
remove AddSimpleArrayOnlyCast
bkietz Jan 6, 2021
b14a456
clang-format
bkietz Jan 6, 2021
e8080da
incorrect projection in CsvFileFormat
bkietz Jan 6, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 5 additions & 8 deletions cpp/examples/arrow/dataset-parquet-scan-example.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
#include <arrow/api.h>
#include <arrow/dataset/dataset.h>
#include <arrow/dataset/discovery.h>
#include <arrow/dataset/expression.h>
#include <arrow/dataset/file_base.h>
#include <arrow/dataset/file_parquet.h>
#include <arrow/dataset/filter.h>
#include <arrow/dataset/scanner.h>
#include <arrow/filesystem/filesystem.h>
#include <arrow/filesystem/path_util.h>
Expand All @@ -37,8 +37,6 @@ namespace fs = arrow::fs;

namespace ds = arrow::dataset;

using ds::string_literals::operator"" _;

#define ABORT_ON_FAILURE(expr) \
do { \
arrow::Status status_ = (expr); \
Expand All @@ -62,7 +60,8 @@ struct Configuration {

// Indicates the filter by which rows will be filtered. This optimization can
// make use of partition information and/or file metadata if possible.
std::shared_ptr<ds::Expression> filter = ("total_amount"_ > 1000.0f).Copy();
ds::Expression filter =
ds::greater(ds::field_ref("total_amount"), ds::literal(1000.0f));

ds::InspectOptions inspect_options{};
ds::FinishOptions finish_options{};
Expand Down Expand Up @@ -147,17 +146,15 @@ std::shared_ptr<ds::Dataset> GetDatasetFromPath(

std::shared_ptr<ds::Scanner> GetScannerFromDataset(std::shared_ptr<ds::Dataset> dataset,
std::vector<std::string> columns,
std::shared_ptr<ds::Expression> filter,
ds::Expression filter,
bool use_threads) {
auto scanner_builder = dataset->NewScan().ValueOrDie();

if (!columns.empty()) {
ABORT_ON_FAILURE(scanner_builder->Project(columns));
}

if (filter != nullptr) {
ABORT_ON_FAILURE(scanner_builder->Filter(filter));
}
ABORT_ON_FAILURE(scanner_builder->Filter(filter));

ABORT_ON_FAILURE(scanner_builder->UseThreads(use_threads));

Expand Down
25 changes: 25 additions & 0 deletions cpp/src/arrow/array/array_struct_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

#include "arrow/array.h"
#include "arrow/array/builder_nested.h"
#include "arrow/chunked_array.h"
#include "arrow/status.h"
#include "arrow/testing/gtest_common.h"
#include "arrow/testing/gtest_util.h"
Expand Down Expand Up @@ -582,4 +583,28 @@ TEST_F(TestStructBuilder, TestSlice) {
ASSERT_EQ(list_field->null_count(), 1);
}

TEST(TestFieldRef, GetChildren) {
auto struct_array = ArrayFromJSON(struct_({field("a", float64())}), R"([
{"a": 6.125},
{"a": 0.0},
{"a": -1}
])");

ASSERT_OK_AND_ASSIGN(auto a, FieldRef("a").GetOne(*struct_array));
auto expected_a = ArrayFromJSON(float64(), "[6.125, 0.0, -1]");
AssertArraysEqual(*a, *expected_a);

// more nested:
struct_array =
ArrayFromJSON(struct_({field("a", struct_({field("a", float64())}))}), R"([
{"a": {"a": 6.125}},
{"a": {"a": 0.0}},
{"a": {"a": -1}}
])");

ASSERT_OK_AND_ASSIGN(a, FieldRef("a", "a").GetOne(*struct_array));
expected_a = ArrayFromJSON(float64(), "[6.125, 0.0, -1]");
AssertArraysEqual(*a, *expected_a);
}

} // namespace arrow
17 changes: 15 additions & 2 deletions cpp/src/arrow/array/util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -297,8 +297,9 @@ class RepeatedArrayFactory {
return out_;
}

Status Visit(const DataType& type) {
return Status::NotImplemented("construction from scalar of type ", *scalar_.type);
Status Visit(const NullType& type) {
DCHECK(false); // already forwarded to MakeArrayOfNull
return Status::OK();
}

Status Visit(const BooleanType&) {
Expand Down Expand Up @@ -403,6 +404,18 @@ class RepeatedArrayFactory {
return Status::OK();
}

Status Visit(const ExtensionType& type) {
return Status::NotImplemented("construction from scalar of type ", *scalar_.type);
}

Status Visit(const DenseUnionType& type) {
return Status::NotImplemented("construction from scalar of type ", *scalar_.type);
}

Status Visit(const SparseUnionType& type) {
return Status::NotImplemented("construction from scalar of type ", *scalar_.type);
}

template <typename OffsetType>
Status CreateOffsetsBuffer(OffsetType value_length, std::shared_ptr<Buffer>* out) {
TypedBufferBuilder<OffsetType> builder(pool_);
Expand Down
3 changes: 3 additions & 0 deletions cpp/src/arrow/chunked_array.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ class ARROW_EXPORT ChunkedArray {
/// data type.
explicit ChunkedArray(ArrayVector chunks);

ChunkedArray(ChunkedArray&&) = default;
ChunkedArray& operator=(ChunkedArray&&) = default;

/// \brief Construct a chunked array from a single Array
explicit ChunkedArray(std::shared_ptr<Array> chunk)
: ChunkedArray(ArrayVector{std::move(chunk)}) {}
Expand Down
11 changes: 9 additions & 2 deletions cpp/src/arrow/compute/api_scalar.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ struct ARROW_EXPORT SplitPatternOptions : public SplitOptions {

/// Options for IsIn and IndexIn functions
struct ARROW_EXPORT SetLookupOptions : public FunctionOptions {
explicit SetLookupOptions(Datum value_set, bool skip_nulls)
explicit SetLookupOptions(Datum value_set, bool skip_nulls = false)
: value_set(std::move(value_set)), skip_nulls(skip_nulls) {}

/// The set of values to look up input values into.
Expand All @@ -86,7 +86,7 @@ struct ARROW_EXPORT SetLookupOptions : public FunctionOptions {

struct ARROW_EXPORT StrptimeOptions : public FunctionOptions {
explicit StrptimeOptions(std::string format, TimeUnit::type unit)
: format(format), unit(unit) {}
: format(std::move(format)), unit(unit) {}

std::string format;
TimeUnit::type unit;
Expand All @@ -107,6 +107,13 @@ struct CompareOptions : public FunctionOptions {
enum CompareOperator op;
};

struct ARROW_EXPORT ProjectOptions : public FunctionOptions {
explicit ProjectOptions(std::vector<std::string> n) : field_names(std::move(n)) {}

/// Names for wrapped columns
std::vector<std::string> field_names;
};

/// @}

/// \brief Add two values together. Array values must be the same length. If
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/compute/cast.cc
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ CastFunction::CastFunction(std::string name, Type::type out_type)
impl_->out_type = out_type;
}

CastFunction::~CastFunction() {}
CastFunction::~CastFunction() = default;

Type::type CastFunction::out_type_id() const { return impl_->out_type; }

Expand Down
24 changes: 12 additions & 12 deletions cpp/src/arrow/compute/cast.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,25 +42,25 @@ class ExecContext;
/// @{

struct ARROW_EXPORT CastOptions : public FunctionOptions {
CastOptions()
: allow_int_overflow(false),
allow_time_truncate(false),
allow_time_overflow(false),
allow_decimal_truncate(false),
allow_float_truncate(false),
allow_invalid_utf8(false) {}

explicit CastOptions(bool safe)
explicit CastOptions(bool safe = true)
: allow_int_overflow(!safe),
allow_time_truncate(!safe),
allow_time_overflow(!safe),
allow_decimal_truncate(!safe),
allow_float_truncate(!safe),
allow_invalid_utf8(!safe) {}

static CastOptions Safe() { return CastOptions(true); }
static CastOptions Safe(std::shared_ptr<DataType> to_type = NULLPTR) {
CastOptions safe(true);
safe.to_type = std::move(to_type);
return safe;
}

static CastOptions Unsafe() { return CastOptions(false); }
static CastOptions Unsafe(std::shared_ptr<DataType> to_type = NULLPTR) {
CastOptions unsafe(false);
unsafe.to_type = std::move(to_type);
return unsafe;
}

// Type being casted to. May be passed separate to eager function
// compute::Cast
Expand All @@ -83,7 +83,7 @@ struct ARROW_EXPORT CastOptions : public FunctionOptions {
class CastFunction : public ScalarFunction {
public:
CastFunction(std::string name, Type::type out_type);
~CastFunction();
~CastFunction() override;

Type::type out_type_id() const;

Expand Down
3 changes: 2 additions & 1 deletion cpp/src/arrow/compute/exec.cc
Original file line number Diff line number Diff line change
Expand Up @@ -648,7 +648,8 @@ class ScalarExecutor : public KernelExecutorImpl<ScalarKernel> {
// Decide if we need to preallocate memory for this kernel
validity_preallocated_ =
(kernel_->null_handling != NullHandling::COMPUTED_NO_PREALLOCATE &&
kernel_->null_handling != NullHandling::OUTPUT_NOT_NULL);
kernel_->null_handling != NullHandling::OUTPUT_NOT_NULL &&
output_descr_.type->id() != Type::NA);
if (kernel_->mem_allocation == MemAllocation::PREALLOCATE) {
ComputeDataPreallocate(*output_descr_.type, &data_preallocated_);
}
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/compute/exec.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ class ARROW_EXPORT SelectionVector {
/// than is desirable for this class. Microbenchmarks would help determine for
/// sure. See ARROW-8928.
struct ExecBatch {
ExecBatch() {}
ExecBatch() = default;
ExecBatch(std::vector<Datum> values, int64_t length)
: values(std::move(values)), length(length) {}

Expand Down
4 changes: 2 additions & 2 deletions cpp/src/arrow/compute/exec_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,14 @@ class ARROW_EXPORT ExecListener {

class DatumAccumulator : public ExecListener {
public:
DatumAccumulator() {}
DatumAccumulator() = default;

Status OnResult(Datum value) override {
values_.emplace_back(value);
return Status::OK();
}

std::vector<Datum> values() const { return values_; }
std::vector<Datum> values() { return std::move(values_); }

private:
std::vector<Datum> values_;
Expand Down
11 changes: 8 additions & 3 deletions cpp/src/arrow/compute/function.cc
Original file line number Diff line number Diff line change
Expand Up @@ -152,10 +152,15 @@ Result<Datum> Function::Execute(const std::vector<Datum>& args,
Status Function::Validate() const {
if (!doc_->summary.empty()) {
// Documentation given, check its contents
if (static_cast<int>(doc_->arg_names.size()) != arity_.num_args) {
return Status::Invalid("In function '", name_,
"': ", "number of argument names != function arity");
int arg_count = static_cast<int>(doc_->arg_names.size());
if (arg_count == arity_.num_args) {
return Status::OK();
}
if (arity_.is_varargs && arg_count == arity_.num_args + 1) {
return Status::OK();
}
return Status::Invalid("In function '", name_,
"': ", "number of argument names != function arity");
}
return Status::OK();
}
Expand Down
6 changes: 4 additions & 2 deletions cpp/src/arrow/compute/function.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ namespace compute {

/// \brief Base class for specifying options configuring a function's behavior,
/// such as error handling.
struct ARROW_EXPORT FunctionOptions {};
struct ARROW_EXPORT FunctionOptions {
virtual ~FunctionOptions() = default;
};

/// \brief Contains the number of required arguments for the function.
///
Expand Down Expand Up @@ -96,7 +98,7 @@ struct ARROW_EXPORT FunctionDoc {
/// \brief Name of the options class, if any.
std::string options_class;

FunctionDoc() {}
FunctionDoc() = default;

FunctionDoc(std::string summary, std::string description,
std::vector<std::string> arg_names, std::string options_class = "")
Expand Down
30 changes: 19 additions & 11 deletions cpp/src/arrow/compute/kernels/scalar_cast_internal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,12 @@ void CastNumberToNumberUnsafe(Type::type in_type, Type::type out_type, const Dat
// ----------------------------------------------------------------------

void UnpackDictionary(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
if (out->is_scalar()) {
KERNEL_ASSIGN_OR_RAISE(*out, ctx,
batch[0].scalar_as<DictionaryScalar>().GetEncodedValue());
return;
}

DictionaryArray dict_arr(batch[0].array());
const CastOptions& options = checked_cast<const CastState&>(*ctx->state()).options;

Expand All @@ -160,16 +166,16 @@ void UnpackDictionary(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
return;
}

Result<Datum> result = Take(Datum(dict_arr.dictionary()), Datum(dict_arr.indices()),
/*options=*/TakeOptions::Defaults(), ctx->exec_context());
if (!result.ok()) {
ctx->SetStatus(result.status());
return;
}
*out = *result;
KERNEL_ASSIGN_OR_RAISE(*out, ctx,
Take(Datum(dict_arr.dictionary()), Datum(dict_arr.indices()),
TakeOptions::Defaults(), ctx->exec_context()));
}

void OutputAllNull(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
if (out->is_scalar()) {
out->scalar()->is_valid = false;
return;
}
ArrayData* output = out->mutable_array();
output->buffers = {nullptr};
output->null_count = batch.length;
Expand All @@ -191,6 +197,8 @@ void CastFromExtension(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
}

void CastFromNull(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
if (batch[0].is_scalar()) return;

ArrayData* output = out->mutable_array();
std::shared_ptr<Array> nulls;
Status s = MakeArrayOfNull(output->type, batch.length).Value(&nulls);
Expand Down Expand Up @@ -251,17 +259,17 @@ static bool CanCastFromDictionary(Type::type type_id) {

void AddCommonCasts(Type::type out_type_id, OutputType out_ty, CastFunction* func) {
// From null to this type
DCHECK_OK(func->AddKernel(Type::NA, {InputType::Array(null())}, out_ty, CastFromNull));
DCHECK_OK(func->AddKernel(Type::NA, {null()}, out_ty, CastFromNull));

// From dictionary to this type
if (CanCastFromDictionary(out_type_id)) {
// Dictionary unpacking not implemented for boolean or nested types.
//
// XXX: Uses Take and does its own memory allocation for the moment. We can
// fix this later.
DCHECK_OK(func->AddKernel(
Type::DICTIONARY, {InputType::Array(Type::DICTIONARY)}, out_ty, UnpackDictionary,
NullHandling::COMPUTED_NO_PREALLOCATE, MemAllocation::NO_PREALLOCATE));
DCHECK_OK(func->AddKernel(Type::DICTIONARY, {InputType(Type::DICTIONARY)}, out_ty,
UnpackDictionary, NullHandling::COMPUTED_NO_PREALLOCATE,
MemAllocation::NO_PREALLOCATE));
}

// From extension type to this type
Expand Down
11 changes: 7 additions & 4 deletions cpp/src/arrow/compute/kernels/scalar_cast_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "arrow/compute/cast.h" // IWYU pragma: export
#include "arrow/compute/cast_internal.h" // IWYU pragma: export
#include "arrow/compute/kernels/common.h"
#include "arrow/compute/kernels/util_internal.h"

namespace arrow {

Expand Down Expand Up @@ -54,12 +55,14 @@ void OutputAllNull(KernelContext* ctx, const ExecBatch& batch, Datum* out);

void CastFromNull(KernelContext* ctx, const ExecBatch& batch, Datum* out);

// Adds a cast function where the functor is defined and the input and output
// types have a type_singleton
// Adds a cast function where CastFunctor is specialized and the input and output
// types are parameter free (have a type_singleton). Scalar inputs are handled by
// wrapping with TrivialScalarUnaryAsArraysExec.
template <typename InType, typename OutType>
void AddSimpleCast(InputType in_ty, OutputType out_ty, CastFunction* func) {
DCHECK_OK(func->AddKernel(InType::type_id, {in_ty}, out_ty,
CastFunctor<OutType, InType>::Exec));
DCHECK_OK(func->AddKernel(
InType::type_id, {in_ty}, out_ty,
TrivialScalarUnaryAsArraysExec(CastFunctor<OutType, InType>::Exec)));
}

void ZeroCopyCastExec(KernelContext* ctx, const ExecBatch& batch, Datum* out);
Expand Down
Loading