Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
619fa13
ARROW-10322: [C++][Dataset] Minimize dataset::Expression
bkietz Oct 26, 2020
1152d2c
replace filtering with Expression2
bkietz Dec 1, 2020
d9c6ae5
remove ExpressionState
bkietz Dec 9, 2020
36f1bbb
repair implicit casts
bkietz Dec 10, 2020
b954ef8
delete Expression DSL operators and old Expression class
bkietz Dec 10, 2020
e179f42
rename Expression2 -> Expression
bkietz Dec 10, 2020
c3e6be3
first pass at repairing bindings
bkietz Dec 10, 2020
27ebb46
add more scalar cast implementations
bkietz Dec 11, 2020
9245f24
use dataset___expr__call to create Call expressions
bkietz Dec 14, 2020
7fc23dd
lint fixes
bkietz Dec 14, 2020
d9ec4bf
Refactor dataset expression code to follow array expression pattern
nealrichardson Dec 15, 2020
35b9e1f
A little more
nealrichardson Dec 15, 2020
ec6ee9f
revamp Expression::ToString, make cast-from-string error more informa…
bkietz Dec 15, 2020
c560cbb
Merge branch '10322-Minimize-Expression-to-a-' of github.com:bkietz/a…
bkietz Dec 15, 2020
8bb9b90
print and_kleene, or_kleene as binary ops
bkietz Dec 15, 2020
7c88e35
clean up Expression class, extract Parameter
bkietz Dec 16, 2020
dfad163
revert variant noexcept changes
bkietz Dec 16, 2020
b32f254
Start adding support for arithmetic functions in R
nealrichardson Dec 17, 2020
6b3e796
get python binding building
bkietz Dec 17, 2020
849308a
fix doc generation for struct function
bkietz Dec 17, 2020
9b8b2ab
review comments
bkietz Dec 18, 2020
5206104
repair linkage on Expression friends
bkietz Dec 18, 2020
7ea0664
centos-7 doesn't recognize this initializer list
bkietz Dec 18, 2020
1a7d572
get test_{dataset,parquet}.py passing
bkietz Dec 18, 2020
5de9db0
move Identical() impl to .cc
bkietz Dec 18, 2020
9718255
export ExecuteScalarExpression
bkietz Dec 18, 2020
2197c88
Adjust some of the casting, datasets not (yet) working
jonkeane Dec 23, 2020
0ac1858
expression-based casting for division
jonkeane Dec 23, 2020
1c8ab00
Merge remote-tracking branch 'bkietz/10322-Minimize-Expression-to-a-'…
jonkeane Dec 29, 2020
c0e7725
PR comments
jonkeane Dec 30, 2020
843ff2a
Move array/scalar autocasting to eval time
nealrichardson Dec 30, 2020
391061e
Add jira to skips
nealrichardson Dec 30, 2020
e3d3a6a
Remove datetime hackery
jonkeane Dec 31, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 5 additions & 8 deletions cpp/examples/arrow/dataset-parquet-scan-example.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
#include <arrow/api.h>
#include <arrow/dataset/dataset.h>
#include <arrow/dataset/discovery.h>
#include <arrow/dataset/expression.h>
#include <arrow/dataset/file_base.h>
#include <arrow/dataset/file_parquet.h>
#include <arrow/dataset/filter.h>
#include <arrow/dataset/scanner.h>
#include <arrow/filesystem/filesystem.h>
#include <arrow/filesystem/path_util.h>
Expand All @@ -37,8 +37,6 @@ namespace fs = arrow::fs;

namespace ds = arrow::dataset;

using ds::string_literals::operator"" _;

#define ABORT_ON_FAILURE(expr) \
do { \
arrow::Status status_ = (expr); \
Expand All @@ -62,7 +60,8 @@ struct Configuration {

// Indicates the filter by which rows will be filtered. This optimization can
// make use of partition information and/or file metadata if possible.
std::shared_ptr<ds::Expression> filter = ("total_amount"_ > 1000.0f).Copy();
ds::Expression filter =
ds::greater(ds::field_ref("total_amount"), ds::literal(1000.0f));

ds::InspectOptions inspect_options{};
ds::FinishOptions finish_options{};
Expand Down Expand Up @@ -147,17 +146,15 @@ std::shared_ptr<ds::Dataset> GetDatasetFromPath(

std::shared_ptr<ds::Scanner> GetScannerFromDataset(std::shared_ptr<ds::Dataset> dataset,
std::vector<std::string> columns,
std::shared_ptr<ds::Expression> filter,
ds::Expression filter,
bool use_threads) {
auto scanner_builder = dataset->NewScan().ValueOrDie();

if (!columns.empty()) {
ABORT_ON_FAILURE(scanner_builder->Project(columns));
}

if (filter != nullptr) {
ABORT_ON_FAILURE(scanner_builder->Filter(filter));
}
ABORT_ON_FAILURE(scanner_builder->Filter(filter));

ABORT_ON_FAILURE(scanner_builder->UseThreads(use_threads));

Expand Down
36 changes: 36 additions & 0 deletions cpp/src/arrow/array/array_struct_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

#include "arrow/array.h"
#include "arrow/array/builder_nested.h"
#include "arrow/chunked_array.h"
#include "arrow/status.h"
#include "arrow/testing/gtest_common.h"
#include "arrow/testing/gtest_util.h"
Expand Down Expand Up @@ -582,4 +583,39 @@ TEST_F(TestStructBuilder, TestSlice) {
ASSERT_EQ(list_field->null_count(), 1);
}

// Checks that FieldRef::GetOne extracts the child array "a" from a struct array, first
// for a flat struct and then for a struct nested inside another struct. For each input
// it also checks that chunking the struct array at different midpoints yields
// equivalent chunked arrays.
TEST(TestFieldRef, GetChildren) {
  auto struct_array = ArrayFromJSON(struct_({field("a", float64())}), R"([
    {"a": 6.125},
    {"a": 0.0},
    {"a": -1}
  ])");

  ASSERT_OK_AND_ASSIGN(auto a, FieldRef("a").GetOne(*struct_array));
  auto expected_a = ArrayFromJSON(float64(), "[6.125, 0.0, -1]");
  AssertArraysEqual(*a, *expected_a);

  // Capture struct_array by reference: it is reassigned below, and a by-copy capture
  // would keep slicing the first (flat) array, so the nested case would never be
  // exercised by the second AssertChunkedEquivalent call.
  auto ToChunked = [&struct_array](int64_t midpoint) {
    return ChunkedArray(
        ArrayVector{
            struct_array->Slice(0, midpoint),
            struct_array->Slice(midpoint),
        },
        struct_array->type());
  };
  AssertChunkedEquivalent(ToChunked(1), ToChunked(2));

  // more nested:
  struct_array =
      ArrayFromJSON(struct_({field("a", struct_({field("a", float64())}))}), R"([
    {"a": {"a": 6.125}},
    {"a": {"a": 0.0}},
    {"a": {"a": -1}}
  ])");

  ASSERT_OK_AND_ASSIGN(a, FieldRef("a", "a").GetOne(*struct_array));
  expected_a = ArrayFromJSON(float64(), "[6.125, 0.0, -1]");
  AssertArraysEqual(*a, *expected_a);
  AssertChunkedEquivalent(ToChunked(1), ToChunked(2));
}

} // namespace arrow
17 changes: 15 additions & 2 deletions cpp/src/arrow/array/util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -297,8 +297,9 @@ class RepeatedArrayFactory {
return out_;
}

Status Visit(const DataType& type) {
return Status::NotImplemented("construction from scalar of type ", *scalar_.type);
// Visitor overload for null-typed scalars. Asserts in debug builds because this
// overload is not expected to be reached; otherwise reports success.
Status Visit(const NullType&) {
  // already forwarded to MakeArrayOfNull
  DCHECK(false);
  return Status::OK();
}

Status Visit(const BooleanType&) {
Expand Down Expand Up @@ -403,6 +404,18 @@ class RepeatedArrayFactory {
return Status::OK();
}

// Visitor overload for extension-typed scalars: always fails with NotImplemented,
// naming the scalar's type in the message.
Status Visit(const ExtensionType&) {
  const auto& unsupported_type = *scalar_.type;
  return Status::NotImplemented("construction from scalar of type ", unsupported_type);
}

// Visitor overload for dense-union-typed scalars: always fails with NotImplemented,
// naming the scalar's type in the message.
Status Visit(const DenseUnionType&) {
  const auto& unsupported_type = *scalar_.type;
  return Status::NotImplemented("construction from scalar of type ", unsupported_type);
}

// Visitor overload for sparse-union-typed scalars: always fails with NotImplemented,
// naming the scalar's type in the message.
Status Visit(const SparseUnionType&) {
  const auto& unsupported_type = *scalar_.type;
  return Status::NotImplemented("construction from scalar of type ", unsupported_type);
}

template <typename OffsetType>
Status CreateOffsetsBuffer(OffsetType value_length, std::shared_ptr<Buffer>* out) {
TypedBufferBuilder<OffsetType> builder(pool_);
Expand Down
3 changes: 3 additions & 0 deletions cpp/src/arrow/chunked_array.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ class ARROW_EXPORT ChunkedArray {
/// data type.
explicit ChunkedArray(ArrayVector chunks);

ChunkedArray(ChunkedArray&&) = default;
ChunkedArray& operator=(ChunkedArray&&) = default;

/// \brief Construct a chunked array from a single Array
explicit ChunkedArray(std::shared_ptr<Array> chunk)
: ChunkedArray(ArrayVector{std::move(chunk)}) {}
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/arrow/compute/api_scalar.h
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ struct ARROW_EXPORT SplitPatternOptions : public SplitOptions {

/// Options for IsIn and IndexIn functions
struct ARROW_EXPORT SetLookupOptions : public FunctionOptions {
explicit SetLookupOptions(Datum value_set, bool skip_nulls)
explicit SetLookupOptions(Datum value_set, bool skip_nulls = false)
: value_set(std::move(value_set)), skip_nulls(skip_nulls) {}

/// The set of values to look up input values into.
Expand All @@ -86,7 +86,7 @@ struct ARROW_EXPORT SetLookupOptions : public FunctionOptions {

struct ARROW_EXPORT StrptimeOptions : public FunctionOptions {
explicit StrptimeOptions(std::string format, TimeUnit::type unit)
: format(format), unit(unit) {}
: format(std::move(format)), unit(unit) {}

std::string format;
TimeUnit::type unit;
Expand Down
80 changes: 79 additions & 1 deletion cpp/src/arrow/compute/cast.cc
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,86 @@ class CastMetaFunction : public MetaFunction {

} // namespace

// Documentation attached to the "struct" scalar function: summary, description,
// argument names and the name of its options class.
const FunctionDoc struct_doc{
    /*summary=*/"Wrap Arrays into a StructArray",
    /*description=*/
    "Names of the StructArray's fields are\n"
    "specified through StructOptions.",
    /*arg_names=*/{"*args"},
    /*options_class=*/"StructOptions"};

// Computes the output descriptor of the "struct" function.
//
// The output type is a struct with one field per argument, named after the
// corresponding entry of StructOptions::field_names. The output shape is SCALAR only
// if every argument is a scalar; otherwise it is ARRAY.
//
// Returns Invalid if the number of field names differs from the number of arguments,
// and NotImplemented for scalar arguments of extension or union type.
Result<ValueDescr> StructResolve(KernelContext* ctx,
                                 const std::vector<ValueDescr>& descrs) {
  const auto& names = OptionsWrapper<StructOptions>::Get(ctx).field_names;
  if (names.size() != descrs.size()) {
    return Status::Invalid("Struct() was passed ", names.size(), " field ", "names but ",
                           descrs.size(), " arguments");
  }

  FieldVector fields;
  fields.reserve(descrs.size());
  bool all_scalar = true;

  for (size_t index = 0; index < descrs.size(); ++index) {
    const ValueDescr& descr = descrs[index];

    if (descr.shape != ValueDescr::SCALAR) {
      all_scalar = false;
    } else {
      // Scalars of these types are rejected up front.
      const auto type_id = descr.type->id();
      if (type_id == Type::EXTENSION || type_id == Type::DENSE_UNION ||
          type_id == Type::SPARSE_UNION) {
        return Status::NotImplemented("Broadcasting scalars of type ", *descr.type);
      }
    }

    fields.push_back(field(names[index], descr.type));
  }

  const ValueDescr::Shape shape = all_scalar ? ValueDescr::SCALAR : ValueDescr::ARRAY;
  return ValueDescr{struct_(std::move(fields)), shape};
}

// Kernel body of the "struct" function: wraps the batch's values into a single struct.
//
// If the resolved output shape is SCALAR, the result is a StructScalar built from the
// argument scalars. Otherwise the result is a StructArray of length batch.length;
// array arguments are used as-is and scalar arguments are converted with
// MakeArrayFromScalar using batch.length and the context's memory pool.
void StructExec(KernelContext* ctx, const ExecBatch& batch, Datum* out) {
  KERNEL_ASSIGN_OR_RAISE(auto descr, ctx, StructResolve(ctx, batch.GetDescriptors()));

  if (descr.shape != ValueDescr::SCALAR) {
    ArrayVector columns(batch.num_values());
    for (int col = 0; col < batch.num_values(); ++col) {
      const Datum& value = batch[col];
      if (value.is_array()) {
        columns[col] = value.make_array();
      } else {
        KERNEL_ASSIGN_OR_RAISE(
            columns[col], ctx,
            MakeArrayFromScalar(*value.scalar(), batch.length, ctx->memory_pool()));
      }
    }

    *out = std::make_shared<StructArray>(descr.type, batch.length, std::move(columns));
    return;
  }

  // Every argument is a scalar: assemble a StructScalar directly.
  ScalarVector members(batch.num_values());
  for (int col = 0; col < batch.num_values(); ++col) {
    members[col] = batch[col].scalar();
  }

  *out = Datum(std::make_shared<StructScalar>(std::move(members), std::move(descr.type)));
}

// Adds the cast meta-function and the varargs "struct" function to `registry`.
void RegisterScalarCast(FunctionRegistry* registry) {
  DCHECK_OK(registry->AddFunction(std::make_shared<CastMetaFunction>()));

  // Single varargs kernel accepting any input type; its output type is resolved by
  // StructResolve and its options state is initialized from StructOptions.
  ScalarKernel struct_kernel{
      KernelSignature::Make({InputType{}}, OutputType{StructResolve},
                            /*is_varargs=*/true),
      StructExec, OptionsWrapper<StructOptions>::Init};
  struct_kernel.null_handling = NullHandling::OUTPUT_NOT_NULL;
  struct_kernel.mem_allocation = MemAllocation::NO_PREALLOCATE;

  auto struct_fn =
      std::make_shared<ScalarFunction>("struct", Arity::VarArgs(), &struct_doc);
  DCHECK_OK(struct_fn->AddKernel(std::move(struct_kernel)));
  DCHECK_OK(registry->AddFunction(std::move(struct_fn)));
}

} // namespace internal
Expand All @@ -135,7 +213,7 @@ CastFunction::CastFunction(std::string name, Type::type out_type)
impl_->out_type = out_type;
}

CastFunction::~CastFunction() {}
CastFunction::~CastFunction() = default;

// Returns the output type id stored in the implementation object.
Type::type CastFunction::out_type_id() const {
  return impl_->out_type;
}

Expand Down
36 changes: 24 additions & 12 deletions cpp/src/arrow/compute/cast.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,25 +42,25 @@ class ExecContext;
/// @{

struct ARROW_EXPORT CastOptions : public FunctionOptions {
CastOptions()
: allow_int_overflow(false),
allow_time_truncate(false),
allow_time_overflow(false),
allow_decimal_truncate(false),
allow_float_truncate(false),
allow_invalid_utf8(false) {}

explicit CastOptions(bool safe)
explicit CastOptions(bool safe = true)
: allow_int_overflow(!safe),
allow_time_truncate(!safe),
allow_time_overflow(!safe),
allow_decimal_truncate(!safe),
allow_float_truncate(!safe),
allow_invalid_utf8(!safe) {}

static CastOptions Safe() { return CastOptions(true); }
static CastOptions Safe(std::shared_ptr<DataType> to_type = NULLPTR) {
CastOptions safe(true);
safe.to_type = std::move(to_type);
return safe;
}

static CastOptions Unsafe() { return CastOptions(false); }
static CastOptions Unsafe(std::shared_ptr<DataType> to_type = NULLPTR) {
CastOptions unsafe(false);
unsafe.to_type = std::move(to_type);
return unsafe;
}

// Type being cast to. May be passed separately to the eager function
// compute::Cast
Expand All @@ -83,7 +83,7 @@ struct ARROW_EXPORT CastOptions : public FunctionOptions {
class CastFunction : public ScalarFunction {
public:
CastFunction(std::string name, Type::type out_type);
~CastFunction();
~CastFunction() override;

Type::type out_type_id() const;

Expand Down Expand Up @@ -157,5 +157,17 @@ Result<Datum> Cast(const Datum& value, std::shared_ptr<DataType> to_type,
const CastOptions& options = CastOptions::Safe(),
ExecContext* ctx = NULLPTR);

/// \addtogroup compute-concrete-options
/// @{

/// \brief Options carrying the field names used when wrapping columns into a struct
struct ARROW_EXPORT StructOptions : public FunctionOptions {
  /// Names for wrapped columns
  std::vector<std::string> field_names;

  /// \brief Construct from the list of field names (taken by value and moved in)
  explicit StructOptions(std::vector<std::string> names)
      : field_names(std::move(names)) {}
};

/// @}

} // namespace compute
} // namespace arrow
3 changes: 2 additions & 1 deletion cpp/src/arrow/compute/exec.cc
Original file line number Diff line number Diff line change
Expand Up @@ -648,7 +648,8 @@ class ScalarExecutor : public KernelExecutorImpl<ScalarKernel> {
// Decide if we need to preallocate memory for this kernel
validity_preallocated_ =
(kernel_->null_handling != NullHandling::COMPUTED_NO_PREALLOCATE &&
kernel_->null_handling != NullHandling::OUTPUT_NOT_NULL);
kernel_->null_handling != NullHandling::OUTPUT_NOT_NULL &&
output_descr_.type->id() != Type::NA);
if (kernel_->mem_allocation == MemAllocation::PREALLOCATE) {
ComputeDataPreallocate(*output_descr_.type, &data_preallocated_);
}
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/compute/exec.h
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ class ARROW_EXPORT SelectionVector {
/// than is desirable for this class. Microbenchmarks would help determine for
/// sure. See ARROW-8928.
struct ExecBatch {
ExecBatch() {}
ExecBatch() = default;
ExecBatch(std::vector<Datum> values, int64_t length)
: values(std::move(values)), length(length) {}

Expand Down
4 changes: 2 additions & 2 deletions cpp/src/arrow/compute/exec_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,14 @@ class ARROW_EXPORT ExecListener {

class DatumAccumulator : public ExecListener {
public:
DatumAccumulator() {}
DatumAccumulator() = default;

// Appends `value` to the accumulated results and reports success.
// `value` is received by value, so it is moved into storage rather than copied a
// second time.
Status OnResult(Datum value) override {
  values_.emplace_back(std::move(value));
  return Status::OK();
}

std::vector<Datum> values() const { return values_; }
std::vector<Datum> values() { return std::move(values_); }

private:
std::vector<Datum> values_;
Expand Down
11 changes: 8 additions & 3 deletions cpp/src/arrow/compute/function.cc
Original file line number Diff line number Diff line change
Expand Up @@ -150,10 +150,15 @@ Result<Datum> Function::Execute(const std::vector<Datum>& args,
Status Function::Validate() const {
if (!doc_->summary.empty()) {
// Documentation given, check its contents
if (static_cast<int>(doc_->arg_names.size()) != arity_.num_args) {
return Status::Invalid("In function '", name_,
"': ", "number of argument names != function arity");
int arg_count = static_cast<int>(doc_->arg_names.size());
if (arg_count == arity_.num_args) {
return Status::OK();
}
if (arity_.is_varargs && arg_count == arity_.num_args + 1) {
return Status::OK();
}
return Status::Invalid("In function '", name_,
"': ", "number of argument names != function arity");
}
return Status::OK();
}
Expand Down
6 changes: 4 additions & 2 deletions cpp/src/arrow/compute/function.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,9 @@ namespace compute {

/// \brief Base class for specifying options configuring a function's behavior,
/// such as error handling.
struct ARROW_EXPORT FunctionOptions {};
struct ARROW_EXPORT FunctionOptions {
// Virtual destructor: derived option structs can be safely destroyed through a
// FunctionOptions pointer or reference.
virtual ~FunctionOptions() = default;
};

/// \brief Contains the number of required arguments for the function.
///
Expand Down Expand Up @@ -96,7 +98,7 @@ struct ARROW_EXPORT FunctionDoc {
/// \brief Name of the options class, if any.
std::string options_class;

FunctionDoc() {}
FunctionDoc() = default;

FunctionDoc(std::string summary, std::string description,
std::vector<std::string> arg_names, std::string options_class = "")
Expand Down
Loading