Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
4c86734
init
nirandaperera Jul 20, 2021
29d4d1c
adding Grouper::Find
nirandaperera Jul 26, 2021
7499274
incomplete
nirandaperera Jul 28, 2021
1a6ae7a
mid way
nirandaperera Jul 29, 2021
5822c7b
untested
nirandaperera Jul 30, 2021
caae9db
code complete
nirandaperera Jul 30, 2021
1d6ec31
adding test case dummy
nirandaperera Jul 30, 2021
16c62cf
adding PR comments
nirandaperera Aug 3, 2021
e391dc6
adding serial test case
nirandaperera Aug 3, 2021
24990ac
passing test
nirandaperera Aug 3, 2021
50db0e2
refactoring files
nirandaperera Aug 3, 2021
c4e379e
adding right semi join test
nirandaperera Aug 4, 2021
842204e
using log instead of cout
nirandaperera Aug 4, 2021
2964cc2
minor changes
nirandaperera Aug 4, 2021
1fcbb1d
minor bug fix
nirandaperera Aug 4, 2021
614a9b9
adding empty tests
nirandaperera Aug 4, 2021
43115d3
lint changes
nirandaperera Aug 4, 2021
3fcfbb4
fixing c++/cli mutex import
nirandaperera Aug 5, 2021
bbdd30a
adding anti-joins
nirandaperera Aug 5, 2021
ac19785
attempting to solve the threading issue
nirandaperera Aug 9, 2021
c76df38
Revert "attempting to solve the threading issue"
nirandaperera Aug 9, 2021
5a89c17
ARROW-13482: [C++][Compute] Refactoring away from hard coded ExecNode…
bkietz Aug 9, 2021
0e8795a
refactoring to new API
nirandaperera Aug 10, 2021
7977837
porting tests
nirandaperera Aug 10, 2021
db5c724
extending the test cases
nirandaperera Aug 11, 2021
b250e99
Hash semi-join: debugging ThreadIndexer
michalursa Aug 17, 2021
83c7358
Merge branch 'master' into ARROW-13268-m
michalursa Aug 18, 2021
ec09202
Hash semi-join: fixing exec context for exec plan in unit tests
michalursa Aug 24, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cpp/src/arrow/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,7 @@ if(ARROW_COMPUTE)
compute/kernels/vector_replace.cc
compute/kernels/vector_selection.cc
compute/kernels/vector_sort.cc
compute/exec/hash_join_node.cc
compute/exec/key_hash.cc
compute/exec/key_map.cc
compute/exec/key_compare.cc
Expand Down
5 changes: 5 additions & 0 deletions cpp/src/arrow/compute/api_aggregate.h
Original file line number Diff line number Diff line change
Expand Up @@ -383,6 +383,11 @@ class ARROW_EXPORT Grouper {
/// be as wide as necessary.
virtual Result<Datum> Consume(const ExecBatch& batch) = 0;

/// Finds/ queries the group IDs for the given ExecBatch for every index. Returns the
/// group IDs as an integer array. If a group ID not found, a null will be added to that
/// index. This is a thread-safe lookup.
virtual Result<Datum> Find(const ExecBatch& batch) const = 0;

/// Get current unique keys. May be called multiple times.
virtual Result<ExecBatch> GetUniques() = 0;

Expand Down
2 changes: 2 additions & 0 deletions cpp/src/arrow/compute/exec/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -25,5 +25,7 @@ add_arrow_compute_test(expression_test
subtree_test.cc)

add_arrow_compute_test(plan_test PREFIX "arrow-compute")
add_arrow_compute_test(hash_join_node_test PREFIX "arrow-compute")

add_arrow_benchmark(expression_benchmark PREFIX "arrow-compute")
add_arrow_benchmark(hash_join_node_benchmark PREFIX "arrow-compute")
34 changes: 2 additions & 32 deletions cpp/src/arrow/compute/exec/aggregate_node.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,12 @@
// specific language governing permissions and limitations
// under the License.

#include "arrow/compute/exec/exec_plan.h"

#include <mutex>
#include <thread>
#include <unordered_map>

#include "arrow/compute/exec.h"
#include "arrow/compute/exec/exec_plan.h"
#include "arrow/compute/exec/options.h"
#include "arrow/compute/exec/util.h"
#include "arrow/compute/exec_internal.h"
Expand Down Expand Up @@ -59,34 +58,6 @@ Result<FieldVector> ResolveKernels(

namespace {

class ThreadIndexer {
public:
size_t operator()() {
auto id = std::this_thread::get_id();

std::unique_lock<std::mutex> lock(mutex_);
const auto& id_index = *id_to_index_.emplace(id, id_to_index_.size()).first;

return Check(id_index.second);
}

static size_t Capacity() {
static size_t max_size = arrow::internal::ThreadPool::DefaultCapacity();
return max_size;
}

private:
size_t Check(size_t thread_index) {
DCHECK_LT(thread_index, Capacity()) << "thread index " << thread_index
<< " is out of range [0, " << Capacity() << ")";

return thread_index;
}

std::mutex mutex_;
std::unordered_map<std::thread::id, size_t> id_to_index_;
};

class ScalarAggregateNode : public ExecNode {
public:
ScalarAggregateNode(ExecPlan* plan, std::vector<ExecNode*> inputs,
Expand Down Expand Up @@ -461,8 +432,7 @@ class GroupByNode : public ExecNode {
// bail if StopProducing was called
if (finished_.is_finished()) break;

auto plan = this->plan()->shared_from_this();
RETURN_NOT_OK(executor->Spawn([plan, this, i] { OutputNthBatch(i); }));
RETURN_NOT_OK(executor->Spawn([this, i] { OutputNthBatch(i); }));
} else {
OutputNthBatch(i);
}
Expand Down
3 changes: 3 additions & 0 deletions cpp/src/arrow/compute/exec/exec_plan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@

#include "arrow/compute/exec/exec_plan.h"

#include <iostream>
#include <mutex>
#include <thread>
#include <unordered_map>
#include <unordered_set>

Expand Down
Loading