Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
7cbafc8
Adding Bloom filter implementation
michalursa Jan 3, 2022
012175a
Add Bloom Filter Pushdown
save-buffer Jan 25, 2022
e529280
Fix after rebase
save-buffer Apr 12, 2022
3b75380
clang-format
save-buffer Apr 12, 2022
0e8a770
Standardize on int64_t, make rngs thread-local
save-buffer Apr 13, 2022
244aaf1
Delete some random unnecessary code, fix rand_int
save-buffer Apr 14, 2022
a3dfc3b
Make thread sanitizer happy
save-buffer Apr 14, 2022
df5f8cd
Make Bloom filter test multithreaded. This upsets tsan.
save-buffer Apr 14, 2022
eb8117e
Dumb bug
save-buffer Apr 20, 2022
48204fd
Remove my thread fences
save-buffer Apr 20, 2022
6839829
Rebase
save-buffer Apr 21, 2022
05b935c
Switch to int64_t again
save-buffer Apr 21, 2022
7e594b2
Switch to unique_ptr
save-buffer Apr 21, 2022
5ec4e0c
Make bloom filter blocks atomics
save-buffer Apr 21, 2022
4f5dbcd
Switch back to non-atomics, but silence tsan for these functions
save-buffer Apr 22, 2022
c7e250b
clang-format
save-buffer Apr 22, 2022
3af8ce4
clang-format
save-buffer Apr 22, 2022
21eb53a
static_cast
save-buffer Apr 23, 2022
7c58dd1
Use condition variable in test
save-buffer Apr 23, 2022
dbb773f
Make it green
save-buffer Apr 25, 2022
80301a4
Respond to Weston comments
save-buffer Apr 26, 2022
9f67171
ARROW_EXPORT
save-buffer Apr 26, 2022
7aa7458
More ARROW_EXPORT
save-buffer Apr 26, 2022
a10a6a8
Remove line
save-buffer Apr 27, 2022
9f23c88
Pray to the mighty barney that his condition variable may work
save-buffer Apr 29, 2022
54d5022
Add an underscore
save-buffer Apr 29, 2022
f1335e1
Document parameter, explicitly initialize variable in constructor
save-buffer Apr 29, 2022
9929b84
Document another thing
save-buffer Apr 29, 2022
f68ea78
Fix dumb bug
save-buffer Apr 30, 2022
75bf765
Run fewer tests
save-buffer Apr 30, 2022
1aaeb6f
Respond to michal comments
save-buffer May 5, 2022
643eeb5
Fix CV and fix TSAN BloomFilterBuilder (thanks Michal and Weston)
save-buffer May 9, 2022
b28f4d5
Fix on big endian
save-buffer May 9, 2022
b18a7d3
Run fewer tests with ASAN
save-buffer May 9, 2022
caa3193
Fix weston comments
save-buffer May 9, 2022
16e7d76
clang-format
save-buffer May 9, 2022
68c3ba6
Make windows happy again
save-buffer May 9, 2022
bdbcb01
clang-format
save-buffer May 9, 2022
36a229b
Try fixing on big endian again
save-buffer May 16, 2022
6e5bf4c
clang-format
save-buffer May 16, 2022
a3b2f93
Disable bloom filter on big endian
save-buffer May 17, 2022
1a4ae69
Hopefully fix timeout
save-buffer May 17, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion cpp/src/arrow/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -393,7 +393,6 @@ if(ARROW_COMPUTE)
compute/exec/key_encode.cc
compute/exec/key_hash.cc
compute/exec/key_map.cc
compute/exec/options.cc
compute/exec/order_by_impl.cc
compute/exec/partition_util.cc
compute/exec/options.cc
Expand Down
37 changes: 23 additions & 14 deletions cpp/src/arrow/compute/exec/bloom_filter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -311,21 +311,22 @@ Status BloomFilterBuilder_SingleThreaded::Begin(size_t /*num_threads*/,
}

Status BloomFilterBuilder_SingleThreaded::PushNextBatch(size_t /*thread_index*/,
int num_rows,
int64_t num_rows,
const uint32_t* hashes) {
PushNextBatchImp(num_rows, hashes);
return Status::OK();
}

Status BloomFilterBuilder_SingleThreaded::PushNextBatch(size_t /*thread_index*/,
int num_rows,
int64_t num_rows,
const uint64_t* hashes) {
PushNextBatchImp(num_rows, hashes);
return Status::OK();
}

template <typename T>
void BloomFilterBuilder_SingleThreaded::PushNextBatchImp(int num_rows, const T* hashes) {
void BloomFilterBuilder_SingleThreaded::PushNextBatchImp(int64_t num_rows,
const T* hashes) {
build_target_->Insert(hardware_flags_, num_rows, hashes);
}

Expand All @@ -340,29 +341,40 @@ Status BloomFilterBuilder_Parallel::Begin(size_t num_threads, int64_t hardware_f
log_num_prtns_ = std::min(kMaxLogNumPrtns, bit_util::Log2(num_threads));

thread_local_states_.resize(num_threads);
prtn_locks_.Init(1 << log_num_prtns_);
prtn_locks_.Init(num_threads, 1 << log_num_prtns_);

RETURN_NOT_OK(build_target->CreateEmpty(num_rows, pool));

return Status::OK();
}

Status BloomFilterBuilder_Parallel::PushNextBatch(size_t thread_id, int num_rows,
Status BloomFilterBuilder_Parallel::PushNextBatch(size_t thread_id, int64_t num_rows,
const uint32_t* hashes) {
PushNextBatchImp(thread_id, num_rows, hashes);
return Status::OK();
}

Status BloomFilterBuilder_Parallel::PushNextBatch(size_t thread_id, int num_rows,
Status BloomFilterBuilder_Parallel::PushNextBatch(size_t thread_id, int64_t num_rows,
const uint64_t* hashes) {
PushNextBatchImp(thread_id, num_rows, hashes);
return Status::OK();
}

template <typename T>
void BloomFilterBuilder_Parallel::PushNextBatchImp(size_t thread_id, int num_rows,
void BloomFilterBuilder_Parallel::PushNextBatchImp(size_t thread_id, int64_t num_rows,
const T* hashes) {
int num_prtns = 1 << log_num_prtns_;
// Partition IDs are calculated using the higher bits of the block ID. This
// ensures that each block is contained entirely within a partition and prevents
// concurrent access to a block.
constexpr int kLogBlocksKeptTogether = 7;
constexpr int kPrtnIdBitOffset =
BloomFilterMasks::kLogNumMasks + 6 + kLogBlocksKeptTogether;

const int log_num_prtns_max =
std::max(0, build_target_->log_num_blocks() - kLogBlocksKeptTogether);
const int log_num_prtns_mod = std::min(log_num_prtns_, log_num_prtns_max);
int num_prtns = 1 << log_num_prtns_mod;

ThreadLocalState& local_state = thread_local_states_[thread_id];
local_state.partition_ranges.resize(num_prtns + 1);
local_state.partitioned_hashes_64.resize(num_rows);
Expand All @@ -373,13 +385,10 @@ void BloomFilterBuilder_Parallel::PushNextBatchImp(size_t thread_id, int num_row

PartitionSort::Eval(
num_rows, num_prtns, partition_ranges,
[hashes, num_prtns](int row_id) {
constexpr int kLogBlocksKeptTogether = 7;
constexpr int kPrtnIdBitOffset =
BloomFilterMasks::kLogNumMasks + 6 + kLogBlocksKeptTogether;
[=](int64_t row_id) {
return (hashes[row_id] >> (kPrtnIdBitOffset)) & (num_prtns - 1);
},
[hashes, partitioned_hashes](int row_id, int output_pos) {
[=](int64_t row_id, int output_pos) {
partitioned_hashes[output_pos] = hashes[row_id];
});

Expand All @@ -393,7 +402,7 @@ void BloomFilterBuilder_Parallel::PushNextBatchImp(size_t thread_id, int num_row
while (num_unprocessed_partitions > 0) {
int locked_prtn_id;
int locked_prtn_id_pos;
prtn_locks_.AcquirePartitionLock(num_unprocessed_partitions,
prtn_locks_.AcquirePartitionLock(thread_id, num_unprocessed_partitions,
unprocessed_partition_ids,
/*limit_retries=*/false, /*max_retries=*/-1,
&locked_prtn_id, &locked_prtn_id_pos);
Expand Down
22 changes: 12 additions & 10 deletions cpp/src/arrow/compute/exec/bloom_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -261,49 +261,51 @@ class ARROW_EXPORT BloomFilterBuilder {
int64_t num_rows, int64_t num_batches,
BlockedBloomFilter* build_target) = 0;
virtual int64_t num_tasks() const { return 0; }
virtual Status PushNextBatch(size_t thread_index, int num_rows,
virtual Status PushNextBatch(size_t thread_index, int64_t num_rows,
const uint32_t* hashes) = 0;
virtual Status PushNextBatch(size_t thread_index, int num_rows,
virtual Status PushNextBatch(size_t thread_index, int64_t num_rows,
const uint64_t* hashes) = 0;
virtual void CleanUp() {}
static std::unique_ptr<BloomFilterBuilder> Make(BloomFilterBuildStrategy strategy);
};

class BloomFilterBuilder_SingleThreaded : public BloomFilterBuilder {
class ARROW_EXPORT BloomFilterBuilder_SingleThreaded : public BloomFilterBuilder {
public:
Status Begin(size_t num_threads, int64_t hardware_flags, MemoryPool* pool,
int64_t num_rows, int64_t num_batches,
BlockedBloomFilter* build_target) override;

Status PushNextBatch(size_t /*thread_index*/, int num_rows,
Status PushNextBatch(size_t /*thread_index*/, int64_t num_rows,
const uint32_t* hashes) override;

Status PushNextBatch(size_t /*thread_index*/, int num_rows,
Status PushNextBatch(size_t /*thread_index*/, int64_t num_rows,
const uint64_t* hashes) override;

private:
template <typename T>
void PushNextBatchImp(int num_rows, const T* hashes);
void PushNextBatchImp(int64_t num_rows, const T* hashes);

int64_t hardware_flags_;
BlockedBloomFilter* build_target_;
};

class BloomFilterBuilder_Parallel : public BloomFilterBuilder {
class ARROW_EXPORT BloomFilterBuilder_Parallel : public BloomFilterBuilder {
public:
Status Begin(size_t num_threads, int64_t hardware_flags, MemoryPool* pool,
int64_t num_rows, int64_t num_batches,
BlockedBloomFilter* build_target) override;

Status PushNextBatch(size_t thread_id, int num_rows, const uint32_t* hashes) override;
Status PushNextBatch(size_t thread_id, int64_t num_rows,
const uint32_t* hashes) override;

Status PushNextBatch(size_t thread_id, int num_rows, const uint64_t* hashes) override;
Status PushNextBatch(size_t thread_id, int64_t num_rows,
const uint64_t* hashes) override;

void CleanUp() override;

private:
template <typename T>
void PushNextBatchImp(size_t thread_id, int num_rows, const T* hashes);
void PushNextBatchImp(size_t thread_id, int64_t num_rows, const T* hashes);

int64_t hardware_flags_;
BlockedBloomFilter* build_target_;
Expand Down
116 changes: 92 additions & 24 deletions cpp/src/arrow/compute/exec/bloom_filter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <unordered_set>
#include "arrow/compute/exec/bloom_filter.h"
#include "arrow/compute/exec/key_hash.h"
#include "arrow/compute/exec/task_util.h"
#include "arrow/compute/exec/test_util.h"
#include "arrow/compute/exec/util.h"
#include "arrow/util/bitmap_ops.h"
Expand All @@ -32,39 +33,106 @@
namespace arrow {
namespace compute {

Status BuildBloomFilter(BloomFilterBuildStrategy strategy, int64_t hardware_flags,
MemoryPool* pool, int64_t num_rows,
std::function<void(int64_t, int, uint32_t*)> get_hash32_impl,
std::function<void(int64_t, int, uint64_t*)> get_hash64_impl,
BlockedBloomFilter* target) {
constexpr int batch_size_max = 32 * 1024;
int64_t num_batches = bit_util::CeilDiv(num_rows, batch_size_max);

auto builder = BloomFilterBuilder::Make(strategy);

std::vector<uint32_t> thread_local_hashes32;
std::vector<uint64_t> thread_local_hashes64;
thread_local_hashes32.resize(batch_size_max);
thread_local_hashes64.resize(batch_size_max);

RETURN_NOT_OK(builder->Begin(/*num_threads=*/1, hardware_flags, pool, num_rows,
bit_util::CeilDiv(num_rows, batch_size_max), target));

for (int64_t i = 0; i < num_batches; ++i) {
constexpr int kBatchSizeMax = 32 * 1024;
Status BuildBloomFilter_Serial(
std::unique_ptr<BloomFilterBuilder>& builder, int64_t num_rows, int64_t num_batches,
std::function<void(int64_t, int, uint32_t*)> get_hash32_impl,
std::function<void(int64_t, int, uint64_t*)> get_hash64_impl,
BlockedBloomFilter* target) {
std::vector<uint32_t> hashes32(kBatchSizeMax);
std::vector<uint64_t> hashes64(kBatchSizeMax);
for (int64_t i = 0; i < num_batches; i++) {
size_t thread_index = 0;
int batch_size = static_cast<int>(
std::min(num_rows - i * batch_size_max, static_cast<int64_t>(batch_size_max)));
std::min(num_rows - i * kBatchSizeMax, static_cast<int64_t>(kBatchSizeMax)));
if (target->NumHashBitsUsed() > 32) {
uint64_t* hashes = thread_local_hashes64.data();
get_hash64_impl(i * batch_size_max, batch_size, hashes);
uint64_t* hashes = hashes64.data();
get_hash64_impl(i * kBatchSizeMax, batch_size, hashes);
RETURN_NOT_OK(builder->PushNextBatch(thread_index, batch_size, hashes));
} else {
uint32_t* hashes = thread_local_hashes32.data();
get_hash32_impl(i * batch_size_max, batch_size, hashes);
uint32_t* hashes = hashes32.data();
get_hash32_impl(i * kBatchSizeMax, batch_size, hashes);
RETURN_NOT_OK(builder->PushNextBatch(thread_index, batch_size, hashes));
}
}
return Status::OK();
}

/// \brief Build a Bloom filter by pushing hash batches from multiple threads.
///
/// Registers one task group with a TaskScheduler in which every task hashes one
/// batch of at most kBatchSizeMax rows and pushes it into `builder`. Tasks are
/// spawned on the CPU thread pool, and this function blocks until the task
/// group's completion callback has run.
///
/// \param builder builder that receives the hashes; Begin() is not called here,
///        so presumably the caller has already done so (TODO confirm)
/// \param num_threads number of per-thread hash buffers to allocate; also passed
///        to the scheduler as its thread count
/// \param num_rows total number of rows to hash
/// \param num_batches number of tasks to run (one per batch)
/// \param get_hash32_impl fills 32-bit hashes, called as (first_row, batch_size, out)
/// \param get_hash64_impl fills 64-bit hashes, called as (first_row, batch_size, out)
/// \param target filter being built; its NumHashBitsUsed() selects 32- vs 64-bit hashes
/// \return the first non-OK Status from starting the scheduler or the task group,
///         otherwise Status::OK()
Status BuildBloomFilter_Parallel(
    std::unique_ptr<BloomFilterBuilder>& builder, size_t num_threads, int64_t num_rows,
    int64_t num_batches, std::function<void(int64_t, int, uint32_t*)> get_hash32_impl,
    std::function<void(int64_t, int, uint64_t*)> get_hash64_impl,
    BlockedBloomFilter* target) {
  ThreadIndexer thread_indexer;
  std::unique_ptr<TaskScheduler> scheduler = TaskScheduler::Make();

  // One scratch buffer per thread so concurrent tasks never share hash storage.
  std::vector<std::vector<uint32_t>> thread_local_hashes32(num_threads);
  std::vector<std::vector<uint64_t>> thread_local_hashes64(num_threads);
  for (std::vector<uint32_t>& h : thread_local_hashes32) h.resize(kBatchSizeMax);
  for (std::vector<uint64_t>& h : thread_local_hashes64) h.resize(kBatchSizeMax);

  // `finished` is the condition guarded by `mutex`. Waiting on the flag instead
  // of on a bare notification makes the wait immune to spurious wakeups and to
  // the notification being issued before this thread starts waiting.
  std::condition_variable cv;
  std::mutex mutex;
  bool finished = false;

  auto group = scheduler->RegisterTaskGroup(
      [&](size_t thread_index, int64_t task_id) -> Status {
        // The last batch may be shorter than kBatchSizeMax.
        int batch_size = static_cast<int>(std::min(num_rows - task_id * kBatchSizeMax,
                                                   static_cast<int64_t>(kBatchSizeMax)));
        if (target->NumHashBitsUsed() > 32) {
          uint64_t* hashes = thread_local_hashes64[thread_index].data();
          get_hash64_impl(task_id * kBatchSizeMax, batch_size, hashes);
          RETURN_NOT_OK(builder->PushNextBatch(thread_index, batch_size, hashes));
        } else {
          uint32_t* hashes = thread_local_hashes32[thread_index].data();
          get_hash32_impl(task_id * kBatchSizeMax, batch_size, hashes);
          RETURN_NOT_OK(builder->PushNextBatch(thread_index, batch_size, hashes));
        }
        return Status::OK();
      },
      [&](size_t /*thread_index*/) -> Status {
        // Publish completion under the lock, then wake the waiting thread.
        std::lock_guard<std::mutex> lk(mutex);
        finished = true;
        cv.notify_all();
        return Status::OK();
      });
  scheduler->RegisterEnd();

  auto tp = arrow::internal::GetCpuThreadPool();
  RETURN_NOT_OK(scheduler->StartScheduling(
      0,
      [&](std::function<Status(size_t)> func) -> Status {
        return tp->Spawn([&, func]() {
          size_t tid = thread_indexer();
          ARROW_DCHECK_OK(func(tid));
        });
      },
      static_cast<int>(num_threads), false));

  // The mutex is deliberately NOT held while starting the task group: should the
  // scheduler ever run the completion callback inline on this thread, locking a
  // std::mutex this thread already owns would be undefined behaviour.
  RETURN_NOT_OK(scheduler->StartTaskGroup(0, group, num_batches));
  {
    std::unique_lock<std::mutex> lk(mutex);
    cv.wait(lk, [&finished] { return finished; });
  }
  return Status::OK();
}

Status BuildBloomFilter(BloomFilterBuildStrategy strategy, int64_t hardware_flags,
MemoryPool* pool, int64_t num_rows,
std::function<void(int64_t, int, uint32_t*)> get_hash32_impl,
std::function<void(int64_t, int, uint64_t*)> get_hash64_impl,
BlockedBloomFilter* target) {
int64_t num_batches = bit_util::CeilDiv(num_rows, kBatchSizeMax);

bool is_serial = strategy == BloomFilterBuildStrategy::SINGLE_THREADED;
auto builder = BloomFilterBuilder::Make(strategy);

size_t num_threads = is_serial ? 1 : arrow::GetCpuThreadPoolCapacity();
RETURN_NOT_OK(builder->Begin(num_threads, hardware_flags, pool, num_rows,
bit_util::CeilDiv(num_rows, kBatchSizeMax), target));

if (is_serial)
RETURN_NOT_OK(BuildBloomFilter_Serial(builder, num_rows, num_batches,
std::move(get_hash32_impl),
std::move(get_hash64_impl), target));
else
RETURN_NOT_OK(BuildBloomFilter_Parallel(builder, num_threads, num_rows, num_batches,
std::move(get_hash32_impl),
std::move(get_hash64_impl), target));
builder->CleanUp();

return Status::OK();
Expand Down
3 changes: 3 additions & 0 deletions cpp/src/arrow/compute/exec/exec_plan.cc
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,9 @@ struct ExecPlanImpl : public ExecPlan {

// producers precede consumers
sorted_nodes_ = TopoSort();
for (ExecNode* node : sorted_nodes_) {
RETURN_NOT_OK(node->PrepareToProduce());
}

std::vector<Future<>> futures;

Expand Down
10 changes: 10 additions & 0 deletions cpp/src/arrow/compute/exec/exec_plan.h
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,16 @@ class ARROW_EXPORT ExecNode {
// A node with multiple outputs will also need to ensure it is applying backpressure if
// any of its outputs is asking to pause

/// \brief Perform any needed initialization
///
/// This hook runs after the ExecPlan has been created and before the call to
/// StartProducing. An example use is Bloom filter pushdown. The order in which
/// this method is invoked on the ExecNodes is undefined, but the calls are made
/// synchronously.
///
/// At this point a node can rely on all inputs & outputs (and the input schemas)
/// being well defined.
///
/// The default implementation does nothing and returns Status::OK().
virtual Status PrepareToProduce() { return Status::OK(); }

/// \brief Start producing
///
/// This must only be called once. If this fails, then other lifecycle
Expand Down
Loading