Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
8fe9d7e
ARROW-12759: [C++][Compute] Add ExecNode for group by
michalursa Jul 6, 2021
7020192
ARROW-12759: [C++][Compute] Add basic unit test for group by ExecNode
michalursa Jul 7, 2021
eb79dd3
ARROW-12759: [C++][Compute] Add ExecNode for group by
michalursa Jul 6, 2021
7cbe2d6
ARROW-12759: [C++][Compute] Add basic unit test for group by ExecNode
michalursa Jul 7, 2021
9e0b35c
add support for merging grouped aggregation state
bkietz Jul 7, 2021
fa084cb
remove confusing Grouped*Impl::*Impl-based impls
bkietz Jul 8, 2021
2d0f174
Add merge for other hash agg kernels
bkietz Jul 8, 2021
e1efb70
add overload of Resize with default shrink_to_fit
bkietz Jul 8, 2021
f102acd
repair GroupByNode tests
bkietz Jul 9, 2021
4db5556
export MakeGroupByNode
bkietz Jul 9, 2021
8b31684
implement ExecNode::finished()
bkietz Jul 9, 2021
637643d
Merge branch 'ARROW-12759-execnode-for-groupby' of https://github.com…
michalursa Jul 9, 2021
6110546
GrouperFastImpl: fix wrong merge
michalursa Jul 9, 2021
831bfb3
ExecNode: fixing SinkNode calling Finish
michalursa Jul 12, 2021
3c4b9ba
GroupByNode: adding support for concurrent aggregation
michalursa Jul 13, 2021
a114a5c
Merge remote-tracking branch 'origin/master' into ARROW-12759-execnod…
michalursa Jul 14, 2021
b36348e
lint fixes
bkietz Jul 14, 2021
87cb50e
msvc: add explicit cast to size_t
bkietz Jul 14, 2021
d491654
relieve some constraints on MakeScanNode.options
bkietz Jul 14, 2021
f2da288
use ScalarAggregateNode in CountRows
bkietz Jul 14, 2021
336cb66
increment num_received_ inside the sync block with MaybeFinish()
bkietz Jul 14, 2021
9dd1751
add more EndToEnd tests
bkietz Jul 14, 2021
500ede5
GroupByNode: adding tests for parallel group by merge
michalursa Jul 16, 2021
083c13c
merge failure: double default move constructors
bkietz Jul 19, 2021
1a496dc
merge failure: make_struct
bkietz Jul 19, 2021
b63d868
Merge branch 'ARROW-12759-execnode-for-groupby' of github.com:michalu…
bkietz Jul 19, 2021
8108235
remove TestParallelSourceNode
bkietz Jul 20, 2021
e49c060
msvc: explicit int cast
bkietz Jul 20, 2021
a0c5d09
fix embarassing fork bomb
bkietz Jul 20, 2021
9c5d9d3
ensure ScalarAggregateNode::StopProducing prevents output from being …
bkietz Jul 20, 2021
b35e1fe
simplify GroupByNode state management
bkietz Jul 20, 2021
6f3b077
use a correct ExecContext in CountRows
bkietz Jul 20, 2021
a457e7a
wait for plan to finish before checking for errors
bkietz Jul 20, 2021
d26564d
extract some helpers for tracking input count/completion
bkietz Jul 21, 2021
0e26d6c
ensure we call Future<>::MarkFinished only once
bkietz Jul 22, 2021
92319f7
remove unnecessary locking
bkietz Jul 22, 2021
40223d8
guard against hanging 0-groups case
bkietz Jul 22, 2021
2b21880
paranoid assertions
bkietz Jul 22, 2021
6cc8ad6
msvc: explicit cast to int
bkietz Jul 22, 2021
24ba7c0
paranoid ~~assertions~~ IndexErrors
bkietz Jul 22, 2021
3c17e2a
Revert "try printing coredumps in python_test.sh"
bkietz Jul 22, 2021
6f6c324
Merge branch 'master' into ARROW-12759-execnode-for-groupby
bkietz Jul 22, 2021
64ca1f1
overallocate state to always accommodate multiple threads
bkietz Jul 22, 2021
8b22540
don't redundantly resolve the output type
bkietz Jul 22, 2021
a0e7364
Rely on FnOnce's releasing of resources in DoMarkFinishedOrFailed
bkietz Jul 22, 2021
5eaf8e9
ensure DoMarkFinishedOrFailed tolerates deletion of the called future
bkietz Jul 22, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions cpp/examples/arrow/compute_register_example.cc
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class ExampleFunctionOptionsType : public cp::FunctionOptionsType {
}
// optional: support for serialization
// Result<std::shared_ptr<Buffer>> Serialize(const FunctionOptions&) const override;
// Result<std::unique_ptr<FunctionOptions>> Deserialize(const Buffer& buffer) const override;
// Result<std::unique_ptr<FunctionOptions>> Deserialize(const Buffer&) const override;
};

cp::FunctionOptionsType* GetExampleFunctionOptionsType() {
Expand Down Expand Up @@ -74,8 +74,8 @@ const cp::FunctionDoc func_doc{
int main(int argc, char** argv) {
const std::string name = "compute_register_example";
auto func = std::make_shared<cp::ScalarFunction>(name, cp::Arity::Unary(), &func_doc);
func->AddKernel({cp::InputType::Array(arrow::int64())}, arrow::int64(),
ExampleFunctionImpl);
ABORT_ON_FAILURE(func->AddKernel({cp::InputType::Array(arrow::int64())}, arrow::int64(),
ExampleFunctionImpl));

auto registry = cp::GetFunctionRegistry();
ABORT_ON_FAILURE(registry->AddFunction(std::move(func)));
Expand All @@ -90,8 +90,8 @@ int main(int argc, char** argv) {

std::cout << maybe_result->make_array()->ToString() << std::endl;

// Expression serialization will raise NotImplemented if an expression includes FunctionOptions
// for which serialization is not supported.
// Expression serialization will raise NotImplemented if an expression includes
// FunctionOptions for which serialization is not supported.
auto expr = cp::call(name, {}, options);
auto maybe_serialized = cp::Serialize(expr);
std::cerr << maybe_serialized.status().ToString() << std::endl;
Expand Down
1 change: 1 addition & 0 deletions cpp/src/arrow/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -218,6 +218,7 @@ set(ARROW_SRCS
util/thread_pool.cc
util/time.cc
util/trie.cc
util/unreachable.cc
util/uri.cc
util/utf8.cc
util/value_parsing.cc
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/arrow/array/builder_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,10 @@ class ARROW_EXPORT ArrayBuilder {
public:
explicit ArrayBuilder(MemoryPool* pool) : pool_(pool), null_bitmap_builder_(pool) {}

virtual ~ArrayBuilder() = default;
ARROW_DEFAULT_MOVE_AND_ASSIGN(ArrayBuilder);

virtual ~ArrayBuilder() = default;

/// For nested types. Since the objects are owned by this class instance, we
/// skip shared pointers and just return a raw pointer
ArrayBuilder* child(int i) { return children_[i].get(); }
Expand Down
5 changes: 4 additions & 1 deletion cpp/src/arrow/buffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,10 @@ class ARROW_EXPORT ResizableBuffer : public MutableBuffer {
///
/// @param new_size The new size for the buffer.
/// @param shrink_to_fit Whether to shrink the capacity if new size < current size
virtual Status Resize(const int64_t new_size, bool shrink_to_fit = true) = 0;
virtual Status Resize(const int64_t new_size, bool shrink_to_fit) = 0;
Status Resize(const int64_t new_size) {
return Resize(new_size, /*shrink_to_fit=*/true);
}

/// Ensure that buffer has enough memory allocated to fit the indicated
/// capacity (and meets the 64 byte padding requirement in Layout.md).
Expand Down
2 changes: 1 addition & 1 deletion cpp/src/arrow/compute/api_aggregate.h
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,7 @@ struct ARROW_EXPORT Aggregate {
/// This will be replaced by streaming execution operators.
ARROW_EXPORT
Result<Datum> GroupBy(const std::vector<Datum>& arguments, const std::vector<Datum>& keys,
const std::vector<Aggregate>& aggregates,
const std::vector<Aggregate>& aggregates, bool use_threads = false,
ExecContext* ctx = default_exec_context());

} // namespace internal
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/arrow/compute/api_vector.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#pragma once

#include <memory>
#include <utility>

#include "arrow/compute/function.h"
#include "arrow/datum.h"
Expand Down Expand Up @@ -87,7 +88,7 @@ enum class SortOrder {
class ARROW_EXPORT SortKey : public util::EqualityComparable<SortKey> {
public:
explicit SortKey(std::string name, SortOrder order = SortOrder::Ascending)
: name(name), order(order) {}
: name(std::move(name)), order(order) {}

using util::EqualityComparable<SortKey>::Equals;
using util::EqualityComparable<SortKey>::operator==;
Expand Down
7 changes: 7 additions & 0 deletions cpp/src/arrow/compute/exec.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <cstddef>
#include <cstdint>
#include <memory>
#include <sstream>
#include <utility>
#include <vector>

Expand Down Expand Up @@ -102,6 +103,12 @@ void PrintTo(const ExecBatch& batch, std::ostream* os) {
}
}

std::string ExecBatch::ToString() const {
std::stringstream ss;
PrintTo(*this, &ss);
return ss.str();
}

ExecBatch ExecBatch::Slice(int64_t offset, int64_t length) const {
ExecBatch out = *this;
for (auto& value : out.values) {
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/arrow/compute/exec.h
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,8 @@ struct ARROW_EXPORT ExecBatch {
return result;
}

std::string ToString() const;

ARROW_EXPORT friend void PrintTo(const ExecBatch&, std::ostream*);
};

Expand Down
Loading