From 24c40cdc05476c5661fb376d4b2187be40bdd473 Mon Sep 17 00:00:00 2001 From: Sasha Krassovsky Date: Fri, 5 Nov 2021 22:28:01 -0700 Subject: [PATCH 01/16] Add benchmarking for hash join --- cpp/src/arrow/compute/exec/CMakeLists.txt | 1 + .../arrow/compute/exec/hash_join_benchmark.cc | 159 ++++++++++++++++++ 2 files changed, 160 insertions(+) create mode 100644 cpp/src/arrow/compute/exec/hash_join_benchmark.cc diff --git a/cpp/src/arrow/compute/exec/CMakeLists.txt b/cpp/src/arrow/compute/exec/CMakeLists.txt index 79ffd67b8fd..d055d42f7e1 100644 --- a/cpp/src/arrow/compute/exec/CMakeLists.txt +++ b/cpp/src/arrow/compute/exec/CMakeLists.txt @@ -31,6 +31,7 @@ add_arrow_compute_test(union_node_test PREFIX "arrow-compute") add_arrow_compute_test(util_test PREFIX "arrow-compute") add_arrow_benchmark(expression_benchmark PREFIX "arrow-compute") +add_arrow_benchmark(hash_join_benchmark PREFIX "arrow-compute") add_arrow_compute_test(ir_test PREFIX diff --git a/cpp/src/arrow/compute/exec/hash_join_benchmark.cc b/cpp/src/arrow/compute/exec/hash_join_benchmark.cc new file mode 100644 index 00000000000..185c426b174 --- /dev/null +++ b/cpp/src/arrow/compute/exec/hash_join_benchmark.cc @@ -0,0 +1,159 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "benchmark/benchmark.h" +#include "arrow/testing/gtest_util.h" + +#include "arrow/api.h" +#include "arrow/compute/exec/options.h" +#include "arrow/compute/exec/hash_join.h" +#include "arrow/compute/exec/test_util.h" +#include "arrow/compute/exec/util.h" +#include "arrow/compute/kernels/row_encoder.h" +#include "arrow/compute/kernels/test_util.h" +#include "arrow/testing/gtest_util.h" +#include "arrow/testing/matchers.h" +#include "arrow/testing/random.h" +#include "arrow/util/checked_cast.h" +#include "arrow/util/make_unique.h" +#include "arrow/util/pcg_random.h" +#include "arrow/util/thread_pool.h" + +#include +#include +#include + +#include + +namespace arrow +{ +namespace compute +{ + + class JoinImplFixture : public benchmark::Fixture + { + public: + JoinImplFixture() + { + printf("Constructor\n"); + } + + void SetUp(const benchmark::State &state) + { + bool parallel = false; + JoinType join_type = JoinType::INNER; + int num_batches = 10; + int batch_size = 1024; + + auto l_schema = schema({field("l1", int32())}); + auto r_schema = schema({field("r1", int32())}); + + l_batches_ = MakeRandomBatches(l_schema, num_batches, batch_size); + r_batches_ = MakeRandomBatches(r_schema, num_batches, batch_size); + + std::vector left_keys{"l1"}; + std::vector right_keys{"r1"}; + + ctx_ = arrow::internal::make_unique( + default_memory_pool(), parallel ? 
arrow::internal::GetCpuThreadPool() : nullptr); + + schema_mgr_ = arrow::internal::make_unique(); + DCHECK_OK(schema_mgr_->Init( + join_type, + *l_batches_.schema, + left_keys, + *r_batches_.schema, + right_keys, + "l_", + "r_")); + + join_ = *HashJoinImpl::MakeBasic(); + DCHECK_OK(join_->Init( + ctx_.get(), join_type, true, 1, + schema_mgr_.get(), {JoinKeyCmp::EQ}, + [](ExecBatch) {}, + [](int64_t) {}, + [this](std::function func) -> Status + { + auto executor = this->ctx_->executor(); + if (executor) + { + RETURN_NOT_OK(executor->Spawn([this, func] + { + size_t thread_index = thread_indexer_(); + Status status = func(thread_index); + if (!status.ok()) + { + ARROW_DCHECK(false); + return; + } + })); + } + else + { + // We should not get here in serial execution mode + ARROW_DCHECK(false); + } + return Status::OK(); + })); + } + + void TearDown(const benchmark::State &state) + { + } + + void RunJoin() + { + for(auto batch : r_batches_.batches) + DCHECK_OK(join_->InputReceived(0, 1, batch)); + DCHECK_OK(join_->InputFinished(0, 1)); + + for(auto batch : r_batches_.batches) + DCHECK_OK(join_->InputReceived(0, 0, batch)); + DCHECK_OK(join_->InputFinished(0, 0)); + } + + ~JoinImplFixture() + { + printf("Destructor\n"); + } + ThreadIndexer thread_indexer_; + BatchesWithSchema l_batches_; + BatchesWithSchema r_batches_; + std::unique_ptr schema_mgr_; + std::unique_ptr join_; + std::unique_ptr ctx_; + }; + + BENCHMARK_F(JoinImplFixture, Basic)(benchmark::State &st) + { + for(auto _ : st) + { + RunJoin(); + } + } + + BENCHMARK_F(JoinImplFixture, Basic2)(benchmark::State &st) + { + for(auto _ : st) + { + RunJoin(); + } + } + +} // namespace compute +} // namespace arrow From e1706c1c67e37af9c900b68fcc831a00118bb567 Mon Sep 17 00:00:00 2001 From: Sasha Krassovsky Date: Mon, 15 Nov 2021 19:06:44 -0800 Subject: [PATCH 02/16] Parameterize the fixture, including making it multithreaded with OMP --- cpp/src/arrow/compute/exec/CMakeLists.txt | 4 +- 
.../arrow/compute/exec/hash_join_benchmark.cc | 218 ++++++++++++------ 2 files changed, 151 insertions(+), 71 deletions(-) diff --git a/cpp/src/arrow/compute/exec/CMakeLists.txt b/cpp/src/arrow/compute/exec/CMakeLists.txt index d055d42f7e1..738655bbe6e 100644 --- a/cpp/src/arrow/compute/exec/CMakeLists.txt +++ b/cpp/src/arrow/compute/exec/CMakeLists.txt @@ -31,7 +31,9 @@ add_arrow_compute_test(union_node_test PREFIX "arrow-compute") add_arrow_compute_test(util_test PREFIX "arrow-compute") add_arrow_benchmark(expression_benchmark PREFIX "arrow-compute") -add_arrow_benchmark(hash_join_benchmark PREFIX "arrow-compute") + +find_package(OpenMP) +add_arrow_benchmark(hash_join_benchmark PREFIX "arrow-compute" EXTRA_LINK_LIBS OpenMP::OpenMP_CXX) add_arrow_compute_test(ir_test PREFIX diff --git a/cpp/src/arrow/compute/exec/hash_join_benchmark.cc b/cpp/src/arrow/compute/exec/hash_join_benchmark.cc index 185c426b174..65da3db39c0 100644 --- a/cpp/src/arrow/compute/exec/hash_join_benchmark.cc +++ b/cpp/src/arrow/compute/exec/hash_join_benchmark.cc @@ -37,43 +37,71 @@ #include #include -#include +#include namespace arrow { namespace compute { + struct BenchmarkSettings + { + int num_threads = 1; + JoinType join_type = JoinType::INNER; + double build_to_probe_proportion = 0.1; + int batch_size = 1024; + int num_build_batches = 10; + std::vector> key_types; + std::vector> build_payload_types; + std::vector> probe_payload_types; + }; - class JoinImplFixture : public benchmark::Fixture + class JoinBenchmark { public: - JoinImplFixture() + JoinBenchmark(BenchmarkSettings &settings) { - printf("Constructor\n"); - } - - void SetUp(const benchmark::State &state) - { - bool parallel = false; - JoinType join_type = JoinType::INNER; - int num_batches = 10; - int batch_size = 1024; - - auto l_schema = schema({field("l1", int32())}); - auto r_schema = schema({field("r1", int32())}); - - l_batches_ = MakeRandomBatches(l_schema, num_batches, batch_size); - r_batches_ = 
MakeRandomBatches(r_schema, num_batches, batch_size); - - std::vector left_keys{"l1"}; - std::vector right_keys{"r1"}; + bool is_parallel = settings.num_threads != 1; + + SchemaBuilder l_schema_builder, r_schema_builder; + std::vector left_keys, right_keys; + for(size_t i = 0; i < settings.key_types.size(); i++) + { + std::string l_name = "lk" + std::to_string(i); + std::string r_name = "rk" + std::to_string(i); + DCHECK_OK(l_schema_builder.AddField(field(l_name, settings.key_types[i]))); + DCHECK_OK(r_schema_builder.AddField(field(r_name, settings.key_types[i]))); + + left_keys.push_back(FieldRef(l_name)); + right_keys.push_back(FieldRef(r_name)); + } + + for(size_t i = 0; i < settings.build_payload_types.size(); i++) + { + std::string name = "lp" + std::to_string(i); + DCHECK_OK(l_schema_builder.AddField(field(name, settings.probe_payload_types[i]))); + } + + for(size_t i = 0; i < settings.build_payload_types.size(); i++) + { + std::string name = "rp" + std::to_string(i); + DCHECK_OK(r_schema_builder.AddField(field(name, settings.build_payload_types[i]))); + } + + auto l_schema = *l_schema_builder.Finish(); + auto r_schema = *r_schema_builder.Finish(); + + int num_probe_batches = static_cast(settings.num_build_batches / settings.build_to_probe_proportion); + l_batches_ = MakeRandomBatches(l_schema, num_probe_batches, settings.batch_size); + r_batches_ = MakeRandomBatches(r_schema, settings.num_build_batches, settings.batch_size); + + stats_.num_probe_rows = num_probe_batches * settings.batch_size; ctx_ = arrow::internal::make_unique( - default_memory_pool(), parallel ? arrow::internal::GetCpuThreadPool() : nullptr); + default_memory_pool(), is_parallel ? 
arrow::internal::GetCpuThreadPool() : nullptr); schema_mgr_ = arrow::internal::make_unique(); DCHECK_OK(schema_mgr_->Init( - join_type, + settings.join_type, *l_batches_.schema, left_keys, *r_batches_.schema, @@ -82,78 +110,128 @@ namespace compute "r_")); join_ = *HashJoinImpl::MakeBasic(); + + omp_set_num_threads(settings.num_threads); + auto schedule_callback = [](std::function func) -> Status + { + #pragma omp task + { DCHECK_OK(func(omp_get_thread_num())); } + return Status::OK(); + }; + + DCHECK_OK(join_->Init( - ctx_.get(), join_type, true, 1, + ctx_.get(), settings.join_type, !is_parallel /* use_sync_execution*/, settings.num_threads, schema_mgr_.get(), {JoinKeyCmp::EQ}, [](ExecBatch) {}, - [](int64_t) {}, - [this](std::function func) -> Status - { - auto executor = this->ctx_->executor(); - if (executor) - { - RETURN_NOT_OK(executor->Spawn([this, func] - { - size_t thread_index = thread_indexer_(); - Status status = func(thread_index); - if (!status.ok()) - { - ARROW_DCHECK(false); - return; - } - })); - } - else - { - // We should not get here in serial execution mode - ARROW_DCHECK(false); - } - return Status::OK(); - })); - } - - void TearDown(const benchmark::State &state) - { + [](int64_t x) {}, + schedule_callback)); + } void RunJoin() { - for(auto batch : r_batches_.batches) - DCHECK_OK(join_->InputReceived(0, 1, batch)); - DCHECK_OK(join_->InputFinished(0, 1)); - - for(auto batch : r_batches_.batches) - DCHECK_OK(join_->InputReceived(0, 0, batch)); - DCHECK_OK(join_->InputFinished(0, 0)); + double nanos = 0; + #pragma omp parallel reduction(+:nanos) + { + auto start = std::chrono::high_resolution_clock::now(); + int tid = omp_get_thread_num(); + #pragma omp for nowait + for(auto batch : r_batches_.batches) + DCHECK_OK(join_->InputReceived(tid, 1 /* side */, batch)); + #pragma omp for nowait + for(auto batch : l_batches_.batches) + DCHECK_OK(join_->InputReceived(tid, 0 /* side */, batch)); + + #pragma omp barrier + + #pragma omp single nowait + { 
DCHECK_OK(join_->InputFinished(tid, /* side */ 1)); } + + #pragma omp single nowait + { DCHECK_OK(join_->InputFinished(tid, /* side */ 0)); } + std::chrono::duration elapsed = std::chrono::high_resolution_clock::now() - start; + nanos += elapsed.count(); + } + stats_.total_nanoseconds = nanos; } - ~JoinImplFixture() - { - printf("Destructor\n"); - } ThreadIndexer thread_indexer_; BatchesWithSchema l_batches_; BatchesWithSchema r_batches_; std::unique_ptr schema_mgr_; std::unique_ptr join_; std::unique_ptr ctx_; + + struct + { + double total_nanoseconds; + uint64_t num_probe_rows; + } stats_; }; - BENCHMARK_F(JoinImplFixture, Basic)(benchmark::State &st) + static void HashJoinBasicBenchmarkImpl(benchmark::State &st, BenchmarkSettings &settings) { + JoinBenchmark bm(settings); + double total_nanos = 0; + uint64_t total_rows = 0; for(auto _ : st) { - RunJoin(); + bm.RunJoin(); + total_nanos += bm.stats_.total_nanoseconds; + total_rows += bm.stats_.num_probe_rows; } + st.counters["ns/row"] = total_nanos / total_rows; } - BENCHMARK_F(JoinImplFixture, Basic2)(benchmark::State &st) + static void BM_HashJoinBasic_Threads(benchmark::State &st) { - for(auto _ : st) - { - RunJoin(); - } + BenchmarkSettings settings; + settings.num_threads = static_cast(st.range(0)); + settings.join_type = JoinType::INNER; + settings.build_to_probe_proportion = 0.1; + settings.batch_size = 1024; + settings.num_build_batches = 32; + settings.key_types = { int32() }; + settings.build_payload_types = {}; + settings.probe_payload_types = {}; + + HashJoinBasicBenchmarkImpl(st, settings); + } + + static void BM_HashJoinBasic_RelativeBuildProbe(benchmark::State &st) + { + BenchmarkSettings settings; + settings.num_threads = 1; + settings.join_type = JoinType::INNER; + settings.build_to_probe_proportion = static_cast(st.range(0)) / 100.0; + settings.batch_size = 1024; + settings.num_build_batches = 32; + settings.key_types = { int32() }; + settings.build_payload_types = {}; + 
settings.probe_payload_types = {}; + + HashJoinBasicBenchmarkImpl(st, settings); } + static void BM_HashJoinBasic_NumKeyColumns(benchmark::State &st) + { + BenchmarkSettings settings; + settings.num_threads = 1; + settings.join_type = JoinType::INNER; + settings.build_to_probe_proportion = 0.1; + settings.batch_size = 1024; + settings.num_build_batches = 32; + for(int i = 0; i < st.range(0); i++) + settings.key_types.push_back(int32()); + settings.build_payload_types = {}; + settings.probe_payload_types = {}; + + HashJoinBasicBenchmarkImpl(st, settings); + } + + BENCHMARK(BM_HashJoinBasic_Threads)->ArgNames({"Threads"})->DenseRange(1, 16); + BENCHMARK(BM_HashJoinBasic_RelativeBuildProbe)->ArgNames({"RelativeBuildProbePercentage"})->DenseRange(1, 200, 20); + BENCHMARK(BM_HashJoinBasic_NumKeyColumns)->ArgNames({"NumKeyColumns"})->RangeMultiplier(2)->Range(1, 32); } // namespace compute } // namespace arrow From cd3c6cad2497e99b48f26185a12da0ab2361267a Mon Sep 17 00:00:00 2001 From: Sasha Krassovsky Date: Thu, 25 Nov 2021 00:38:59 -0800 Subject: [PATCH 03/16] Add some more benchmarks --- .../arrow/compute/exec/hash_join_benchmark.cc | 65 +++++++++++-------- 1 file changed, 38 insertions(+), 27 deletions(-) diff --git a/cpp/src/arrow/compute/exec/hash_join_benchmark.cc b/cpp/src/arrow/compute/exec/hash_join_benchmark.cc index 65da3db39c0..1a826b2cb09 100644 --- a/cpp/src/arrow/compute/exec/hash_join_benchmark.cc +++ b/cpp/src/arrow/compute/exec/hash_join_benchmark.cc @@ -49,10 +49,13 @@ namespace compute JoinType join_type = JoinType::INNER; double build_to_probe_proportion = 0.1; int batch_size = 1024; - int num_build_batches = 10; - std::vector> key_types; - std::vector> build_payload_types; - std::vector> probe_payload_types; + int num_build_batches = 32; + std::vector> key_types = { int32() }; + std::vector> build_payload_types = {}; + std::vector> probe_payload_types = {}; + + double null_percentage = 0.0; + double build_cardinality = 0.6; }; class JoinBenchmark 
@@ -68,8 +71,19 @@ namespace compute { std::string l_name = "lk" + std::to_string(i); std::string r_name = "rk" + std::to_string(i); - DCHECK_OK(l_schema_builder.AddField(field(l_name, settings.key_types[i]))); - DCHECK_OK(r_schema_builder.AddField(field(r_name, settings.key_types[i]))); + + uint64_t num_build_rows = settings.num_build_batches * settings.batch_size; + uint64_t max_int_value = static_cast(num_build_rows * settings.build_cardinality); + + std::unordered_map metadata; + metadata["null_probability"] = std::to_string(settings.null_percentage); + metadata["min"] = "0"; + metadata["max"] = std::to_string(max_int_value); + auto l_field = field(l_name, settings.key_types[i], key_value_metadata(metadata)); + auto r_field = field(r_name, settings.key_types[i], key_value_metadata(metadata)); + + DCHECK_OK(l_schema_builder.AddField(std::move(l_field))); + DCHECK_OK(r_schema_builder.AddField(std::move(r_field))); left_keys.push_back(FieldRef(l_name)); right_keys.push_back(FieldRef(r_name)); @@ -188,13 +202,6 @@ namespace compute { BenchmarkSettings settings; settings.num_threads = static_cast(st.range(0)); - settings.join_type = JoinType::INNER; - settings.build_to_probe_proportion = 0.1; - settings.batch_size = 1024; - settings.num_build_batches = 32; - settings.key_types = { int32() }; - settings.build_payload_types = {}; - settings.probe_payload_types = {}; HashJoinBasicBenchmarkImpl(st, settings); } @@ -202,14 +209,7 @@ namespace compute static void BM_HashJoinBasic_RelativeBuildProbe(benchmark::State &st) { BenchmarkSettings settings; - settings.num_threads = 1; - settings.join_type = JoinType::INNER; settings.build_to_probe_proportion = static_cast(st.range(0)) / 100.0; - settings.batch_size = 1024; - settings.num_build_batches = 32; - settings.key_types = { int32() }; - settings.build_payload_types = {}; - settings.probe_payload_types = {}; HashJoinBasicBenchmarkImpl(st, settings); } @@ -217,21 +217,32 @@ namespace compute static void 
BM_HashJoinBasic_NumKeyColumns(benchmark::State &st) { BenchmarkSettings settings; - settings.num_threads = 1; - settings.join_type = JoinType::INNER; - settings.build_to_probe_proportion = 0.1; - settings.batch_size = 1024; - settings.num_build_batches = 32; for(int i = 0; i < st.range(0); i++) settings.key_types.push_back(int32()); - settings.build_payload_types = {}; - settings.probe_payload_types = {}; + + HashJoinBasicBenchmarkImpl(st, settings); + } + + static void BM_HashJoinBasic_NullPercentage(benchmark::State &st) + { + BenchmarkSettings settings; + settings.null_percentage = static_cast(st.range(0)) / 100.0; HashJoinBasicBenchmarkImpl(st, settings); } + static void BM_HashJoinBasic_BuildCardinality(benchmark::State &st) + { + BenchmarkSettings settings; + settings.build_cardinality = static_cast(st.range(0)) / 100.0; + + HashJoinBasicBenchmarkImpl(st, settings); + } + BENCHMARK(BM_HashJoinBasic_Threads)->ArgNames({"Threads"})->DenseRange(1, 16); BENCHMARK(BM_HashJoinBasic_RelativeBuildProbe)->ArgNames({"RelativeBuildProbePercentage"})->DenseRange(1, 200, 20); BENCHMARK(BM_HashJoinBasic_NumKeyColumns)->ArgNames({"NumKeyColumns"})->RangeMultiplier(2)->Range(1, 32); + BENCHMARK(BM_HashJoinBasic_NullPercentage)->ArgNames({"NullPercentage"})->DenseRange(0, 100, 10); + BENCHMARK(BM_HashJoinBasic_BuildCardinality)->ArgNames({"BuildCardinality"})->DenseRange(10, 100, 10); } // namespace compute } // namespace arrow From f3c1fdc13b54e482f93281ca7a89f74b1eb40c6c Mon Sep 17 00:00:00 2001 From: Sasha Krassovsky Date: Wed, 1 Dec 2021 15:58:44 -0800 Subject: [PATCH 04/16] Add cardinality and selectivity --- .../arrow/compute/exec/hash_join_benchmark.cc | 92 +++++++++++++++---- 1 file changed, 76 insertions(+), 16 deletions(-) diff --git a/cpp/src/arrow/compute/exec/hash_join_benchmark.cc b/cpp/src/arrow/compute/exec/hash_join_benchmark.cc index 1a826b2cb09..d76158db753 100644 --- a/cpp/src/arrow/compute/exec/hash_join_benchmark.cc +++ 
b/cpp/src/arrow/compute/exec/hash_join_benchmark.cc @@ -47,15 +47,16 @@ namespace compute { int num_threads = 1; JoinType join_type = JoinType::INNER; - double build_to_probe_proportion = 0.1; int batch_size = 1024; int num_build_batches = 32; + int num_probe_batches = 32 * 16; std::vector> key_types = { int32() }; std::vector> build_payload_types = {}; std::vector> probe_payload_types = {}; double null_percentage = 0.0; - double build_cardinality = 0.6; + double cardinality = 1.0; // Proportion of distinct keys in build side + double selectivity = 1.0; // Probability of a match for a given row }; class JoinBenchmark @@ -72,15 +73,28 @@ namespace compute std::string l_name = "lk" + std::to_string(i); std::string r_name = "rk" + std::to_string(i); + // For integers, selectivity is the proportion of the build interval that overlaps with + // the probe interval uint64_t num_build_rows = settings.num_build_batches * settings.batch_size; - uint64_t max_int_value = static_cast(num_build_rows * settings.build_cardinality); - std::unordered_map metadata; - metadata["null_probability"] = std::to_string(settings.null_percentage); - metadata["min"] = "0"; - metadata["max"] = std::to_string(max_int_value); - auto l_field = field(l_name, settings.key_types[i], key_value_metadata(metadata)); - auto r_field = field(r_name, settings.key_types[i], key_value_metadata(metadata)); + uint64_t min_build_value = 0; + uint64_t max_build_value = static_cast(num_build_rows * settings.cardinality); + + uint64_t min_probe_value = static_cast((1.0 - settings.selectivity) * max_build_value); + uint64_t max_probe_value = 2 * max_build_value - min_probe_value; + + std::unordered_map build_metadata; + build_metadata["null_probability"] = std::to_string(settings.null_percentage); + build_metadata["min"] = std::to_string(min_build_value); + build_metadata["max"] = std::to_string(max_build_value); + + std::unordered_map probe_metadata; + probe_metadata["null_probability"] = 
std::to_string(settings.null_percentage); + probe_metadata["min"] = std::to_string(min_probe_value); + probe_metadata["max"] = std::to_string(max_probe_value); + + auto l_field = field(l_name, settings.key_types[i], key_value_metadata(probe_metadata)); + auto r_field = field(r_name, settings.key_types[i], key_value_metadata(build_metadata)); DCHECK_OK(l_schema_builder.AddField(std::move(l_field))); DCHECK_OK(r_schema_builder.AddField(std::move(r_field))); @@ -104,11 +118,10 @@ namespace compute auto l_schema = *l_schema_builder.Finish(); auto r_schema = *r_schema_builder.Finish(); - int num_probe_batches = static_cast(settings.num_build_batches / settings.build_to_probe_proportion); - l_batches_ = MakeRandomBatches(l_schema, num_probe_batches, settings.batch_size); + l_batches_ = MakeRandomBatches(l_schema, settings.num_probe_batches, settings.batch_size); r_batches_ = MakeRandomBatches(r_schema, settings.num_build_batches, settings.batch_size); - stats_.num_probe_rows = num_probe_batches * settings.batch_size; + stats_.num_probe_rows = settings.num_probe_batches * settings.batch_size; ctx_ = arrow::internal::make_unique( default_memory_pool(), is_parallel ? arrow::internal::GetCpuThreadPool() : nullptr); @@ -196,6 +209,38 @@ namespace compute total_rows += bm.stats_.num_probe_rows; } st.counters["ns/row"] = total_nanos / total_rows; + st.counters["HashTable krows"] = settings.num_build_batches; + } + + static void BM_HashJoinBasic_HashTableSize(benchmark::State &st) + { + BenchmarkSettings settings; + settings.num_build_batches = static_cast(st.range(0)); + settings.num_probe_batches = settings.num_build_batches; + + HashJoinBasicBenchmarkImpl(st, settings); + } + + template + static void BM_HashJoinBasic_KeyTypes(benchmark::State &st, std::vector> key_types, Args&& ...) 
+ { + BenchmarkSettings settings; + settings.num_build_batches = static_cast(st.range(0)); + settings.num_probe_batches = settings.num_probe_batches; + settings.key_types = key_types; + + HashJoinBasicBenchmarkImpl(st, settings); + } + + static void BM_HashJoinBasic_Selectivity(benchmark::State &st) + { + BenchmarkSettings settings; + settings.selectivity = static_cast(st.range(0)) + + settings.num_build_batches = static_cast(st.range(1)); + settings.num_probe_batches = settings.num_probe_batches; + + HashJoinBasicBenchmarkImpl(st, settings); } static void BM_HashJoinBasic_Threads(benchmark::State &st) @@ -209,7 +254,8 @@ namespace compute static void BM_HashJoinBasic_RelativeBuildProbe(benchmark::State &st) { BenchmarkSettings settings; - settings.build_to_probe_proportion = static_cast(st.range(0)) / 100.0; + double proportion = static_cast(st.range(0)) / 100.0; + settings.num_probe_batches = static_cast(settings.num_build_batches / proportion); HashJoinBasicBenchmarkImpl(st, settings); } @@ -231,18 +277,32 @@ namespace compute HashJoinBasicBenchmarkImpl(st, settings); } - static void BM_HashJoinBasic_BuildCardinality(benchmark::State &st) + static void BM_HashJoinBasic_Cardinality(benchmark::State &st) { BenchmarkSettings settings; - settings.build_cardinality = static_cast(st.range(0)) / 100.0; + settings.cardinality = static_cast(st.range(0)) / 100.0; HashJoinBasicBenchmarkImpl(st, settings); } + BENCHMARK_CAPTURE(BM_HashJoinBasic_KeyTypes, "{int32}", {int32()})->RangeMultiplier(4)->Range(1, 4 * 1024); + BENCHMARK_CAPTURE(BM_HashJoinBasic_KeyTypes, "{int64}", {int64()})->RangeMultiplier(4)->Range(1, 4 * 1024); + BENCHMARK_CAPTURE(BM_HashJoinBasic_KeyTypes, "{int64,int64}", {int64(), int64()})->RangeMultiplier(4)->Range(1, 4 * 1024); + BENCHMARK_CAPTURE(BM_HashJoinBasic_KeyTypes, "{int64,int64,int64}", {int64(), int64(), int64()})->RangeMultiplier(4)->Range(1, 4 * 1024); + BENCHMARK_CAPTURE(BM_HashJoinBasic_KeyTypes, "{int64,int64,int64,int64}", {int64(), 
int64(), int64(), int64()})->RangeMultiplier(4)->Range(1, 4 * 1024); + BENCHMARK_CAPTURE(BM_HashJoinBasic_KeyTypes, "{utf8}", {utf8()})->RangeMultiplier(4)->Range(1, 256); + BENCHMARK_CAPTURE(BM_HashJoinBasic_KeyTypes, "{fixed_size_binary(4)}", {fixed_size_binary(4)})->RangeMultiplier(4)->Range(1, 4 * 1024); + BENCHMARK_CAPTURE(BM_HashJoinBasic_KeyTypes, "{fixed_size_binary(8)}", {fixed_size_binary(8)})->RangeMultiplier(4)->Range(1, 4 * 1024); + BENCHMARK_CAPTURE(BM_HashJoinBasic_KeyTypes, "{fixed_size_binary(16)}", {fixed_size_binary(16)})->RangeMultiplier(4)->Range(1, 4 * 1024); + BENCHMARK_CAPTURE(BM_HashJoinBasic_KeyTypes, "{fixed_size_binary(24)}", {fixed_size_binary(24)})->RangeMultiplier(4)->Range(1, 4 * 1024); + BENCHMARK_CAPTURE(BM_HashJoinBasic_KeyTypes, "{fixed_size_binary(32)}", {fixed_size_binary(32)})->RangeMultiplier(4)->Range(1, 4 * 1024); + + BENCHMARK(BM_HashJoinBasic_Selectivity)->ArgNames({"Selectivity", "HashTable krows"})->ArgsProduct( + { benchmark::CreateRange(1, 4 * 1024, 2), benchmark::CreateDenseRange(0, 100, 0.0)}); BENCHMARK(BM_HashJoinBasic_Threads)->ArgNames({"Threads"})->DenseRange(1, 16); BENCHMARK(BM_HashJoinBasic_RelativeBuildProbe)->ArgNames({"RelativeBuildProbePercentage"})->DenseRange(1, 200, 20); BENCHMARK(BM_HashJoinBasic_NumKeyColumns)->ArgNames({"NumKeyColumns"})->RangeMultiplier(2)->Range(1, 32); BENCHMARK(BM_HashJoinBasic_NullPercentage)->ArgNames({"NullPercentage"})->DenseRange(0, 100, 10); - BENCHMARK(BM_HashJoinBasic_BuildCardinality)->ArgNames({"BuildCardinality"})->DenseRange(10, 100, 10); + BENCHMARK(BM_HashJoinBasic_Cardinality)->ArgNames({"Cardinality"})->DenseRange(10, 100, 10); } // namespace compute } // namespace arrow From 59e8cd996d255028a24c88b507091883dba61967 Mon Sep 17 00:00:00 2001 From: Sasha Krassovsky Date: Mon, 6 Dec 2021 19:32:05 -0800 Subject: [PATCH 05/16] Add python script --- .../arrow/compute/exec/hash_join_benchmark.cc | 139 ++++++++++++------ .../arrow/compute/exec/hash_join_graphs.py | 128 
++++++++++++++++ 2 files changed, 226 insertions(+), 41 deletions(-) create mode 100755 cpp/src/arrow/compute/exec/hash_join_graphs.py diff --git a/cpp/src/arrow/compute/exec/hash_join_benchmark.cc b/cpp/src/arrow/compute/exec/hash_join_benchmark.cc index d76158db753..52a5b327cea 100644 --- a/cpp/src/arrow/compute/exec/hash_join_benchmark.cc +++ b/cpp/src/arrow/compute/exec/hash_join_benchmark.cc @@ -81,7 +81,7 @@ namespace compute uint64_t max_build_value = static_cast(num_build_rows * settings.cardinality); uint64_t min_probe_value = static_cast((1.0 - settings.selectivity) * max_build_value); - uint64_t max_probe_value = 2 * max_build_value - min_probe_value; + uint64_t max_probe_value = min_probe_value + max_build_value; std::unordered_map build_metadata; build_metadata["null_probability"] = std::to_string(settings.null_percentage); @@ -127,12 +127,14 @@ namespace compute default_memory_pool(), is_parallel ? arrow::internal::GetCpuThreadPool() : nullptr); schema_mgr_ = arrow::internal::make_unique(); + Expression filter = literal(true); DCHECK_OK(schema_mgr_->Init( settings.join_type, *l_batches_.schema, left_keys, *r_batches_.schema, right_keys, + filter, "l_", "r_")); @@ -149,7 +151,7 @@ namespace compute DCHECK_OK(join_->Init( ctx_.get(), settings.join_type, !is_parallel /* use_sync_execution*/, settings.num_threads, - schema_mgr_.get(), {JoinKeyCmp::EQ}, + schema_mgr_.get(), {JoinKeyCmp::EQ}, std::move(filter), [](ExecBatch) {}, [](int64_t x) {}, schedule_callback)); @@ -209,100 +211,155 @@ namespace compute total_rows += bm.stats_.num_probe_rows; } st.counters["ns/row"] = total_nanos / total_rows; - st.counters["HashTable krows"] = settings.num_build_batches; } - static void BM_HashJoinBasic_HashTableSize(benchmark::State &st) + template + static void BM_HashJoinBasic_KeyTypes(benchmark::State &st, std::vector> key_types, Args&& ...) 
{ BenchmarkSettings settings; settings.num_build_batches = static_cast(st.range(0)); settings.num_probe_batches = settings.num_build_batches; + settings.key_types = key_types; HashJoinBasicBenchmarkImpl(st, settings); } template - static void BM_HashJoinBasic_KeyTypes(benchmark::State &st, std::vector> key_types, Args&& ...) + static void BM_HashJoinBasic_Selectivity(benchmark::State &st, std::vector> key_types, Args&& ...) { BenchmarkSettings settings; - settings.num_build_batches = static_cast(st.range(0)); - settings.num_probe_batches = settings.num_probe_batches; + settings.selectivity = static_cast(st.range(0)) / 100.0; + + settings.num_build_batches = static_cast(st.range(1)); + settings.num_probe_batches = settings.num_build_batches; settings.key_types = key_types; HashJoinBasicBenchmarkImpl(st, settings); } - static void BM_HashJoinBasic_Selectivity(benchmark::State &st) + template + static void BM_HashJoinBasic_JoinType(benchmark::State &st, JoinType join_type, Args&& ...) { BenchmarkSettings settings; - settings.selectivity = static_cast(st.range(0)) + settings.selectivity = static_cast(st.range(0)) / 100.0; settings.num_build_batches = static_cast(st.range(1)); - settings.num_probe_batches = settings.num_probe_batches; + settings.num_probe_batches = settings.num_build_batches; + settings.join_type = join_type; HashJoinBasicBenchmarkImpl(st, settings); } - static void BM_HashJoinBasic_Threads(benchmark::State &st) + static void BM_HashJoinBasic_MatchesPerRow(benchmark::State &st) { BenchmarkSettings settings; - settings.num_threads = static_cast(st.range(0)); + settings.cardinality = 1.0 / static_cast(st.range(0)); + + settings.num_build_batches = static_cast(st.range(1)); + settings.num_probe_batches = settings.num_probe_batches; HashJoinBasicBenchmarkImpl(st, settings); } - static void BM_HashJoinBasic_RelativeBuildProbe(benchmark::State &st) + static void BM_HashJoinBasic_PayloadSize(benchmark::State &st) { BenchmarkSettings settings; - double 
proportion = static_cast(st.range(0)) / 100.0; - settings.num_probe_batches = static_cast(settings.num_build_batches / proportion); + int32_t payload_size = static_cast(st.range(0)); + settings.probe_payload_types = { fixed_size_binary(payload_size) }; + settings.cardinality = 1.0 / static_cast(st.range(1)); + + settings.num_build_batches = static_cast(st.range(2)); + settings.num_probe_batches = settings.num_probe_batches; HashJoinBasicBenchmarkImpl(st, settings); } - static void BM_HashJoinBasic_NumKeyColumns(benchmark::State &st) + static void BM_HashJoinBasic_ProbeParallelism(benchmark::State &st) { BenchmarkSettings settings; - for(int i = 0; i < st.range(0); i++) - settings.key_types.push_back(int32()); + settings.num_threads = static_cast(st.range(0)); + settings.num_build_batches = static_cast(st.range(1)); + settings.num_probe_batches = settings.num_build_batches * 8; HashJoinBasicBenchmarkImpl(st, settings); } - static void BM_HashJoinBasic_NullPercentage(benchmark::State &st) + static void BM_HashJoinBasic_BuildParallelism(benchmark::State &st) { BenchmarkSettings settings; - settings.null_percentage = static_cast(st.range(0)) / 100.0; + settings.num_threads = static_cast(st.range(0)); + settings.num_build_batches = static_cast(st.range(1)); + settings.num_probe_batches = settings.num_threads; HashJoinBasicBenchmarkImpl(st, settings); } - - static void BM_HashJoinBasic_Cardinality(benchmark::State &st) + + static void BM_HashJoinBasic_NullPercentage(benchmark::State &st) { BenchmarkSettings settings; - settings.cardinality = static_cast(st.range(0)) / 100.0; + settings.null_percentage = static_cast(st.range(0)) / 100.0; HashJoinBasicBenchmarkImpl(st, settings); } - BENCHMARK_CAPTURE(BM_HashJoinBasic_KeyTypes, "{int32}", {int32()})->RangeMultiplier(4)->Range(1, 4 * 1024); - BENCHMARK_CAPTURE(BM_HashJoinBasic_KeyTypes, "{int64}", {int64()})->RangeMultiplier(4)->Range(1, 4 * 1024); - BENCHMARK_CAPTURE(BM_HashJoinBasic_KeyTypes, "{int64,int64}", {int64(), 
int64()})->RangeMultiplier(4)->Range(1, 4 * 1024); - BENCHMARK_CAPTURE(BM_HashJoinBasic_KeyTypes, "{int64,int64,int64}", {int64(), int64(), int64()})->RangeMultiplier(4)->Range(1, 4 * 1024); - BENCHMARK_CAPTURE(BM_HashJoinBasic_KeyTypes, "{int64,int64,int64,int64}", {int64(), int64(), int64(), int64()})->RangeMultiplier(4)->Range(1, 4 * 1024); - BENCHMARK_CAPTURE(BM_HashJoinBasic_KeyTypes, "{utf8}", {utf8()})->RangeMultiplier(4)->Range(1, 256); - BENCHMARK_CAPTURE(BM_HashJoinBasic_KeyTypes, "{fixed_size_binary(4)}", {fixed_size_binary(4)})->RangeMultiplier(4)->Range(1, 4 * 1024); - BENCHMARK_CAPTURE(BM_HashJoinBasic_KeyTypes, "{fixed_size_binary(8)}", {fixed_size_binary(8)})->RangeMultiplier(4)->Range(1, 4 * 1024); - BENCHMARK_CAPTURE(BM_HashJoinBasic_KeyTypes, "{fixed_size_binary(16)}", {fixed_size_binary(16)})->RangeMultiplier(4)->Range(1, 4 * 1024); - BENCHMARK_CAPTURE(BM_HashJoinBasic_KeyTypes, "{fixed_size_binary(24)}", {fixed_size_binary(24)})->RangeMultiplier(4)->Range(1, 4 * 1024); - BENCHMARK_CAPTURE(BM_HashJoinBasic_KeyTypes, "{fixed_size_binary(32)}", {fixed_size_binary(32)})->RangeMultiplier(4)->Range(1, 4 * 1024); - - BENCHMARK(BM_HashJoinBasic_Selectivity)->ArgNames({"Selectivity", "HashTable krows"})->ArgsProduct( - { benchmark::CreateRange(1, 4 * 1024, 2), benchmark::CreateDenseRange(0, 100, 0.0)}); - BENCHMARK(BM_HashJoinBasic_Threads)->ArgNames({"Threads"})->DenseRange(1, 16); - BENCHMARK(BM_HashJoinBasic_RelativeBuildProbe)->ArgNames({"RelativeBuildProbePercentage"})->DenseRange(1, 200, 20); - BENCHMARK(BM_HashJoinBasic_NumKeyColumns)->ArgNames({"NumKeyColumns"})->RangeMultiplier(2)->Range(1, 32); + std::vector keytypes_argnames = { "Hashtable krows" }; + BENCHMARK_CAPTURE(BM_HashJoinBasic_KeyTypes, "{int32}", {int32()})->ArgNames(keytypes_argnames)->RangeMultiplier(4)->Range(1, 4 * 1024); + BENCHMARK_CAPTURE(BM_HashJoinBasic_KeyTypes, "{int64}", {int64()})->ArgNames(keytypes_argnames)->RangeMultiplier(4)->Range(1, 4 * 1024); + 
BENCHMARK_CAPTURE(BM_HashJoinBasic_KeyTypes, "{int64,int64}", {int64(), int64()})->ArgNames(keytypes_argnames)->RangeMultiplier(4)->Range(1, 4 * 1024); + BENCHMARK_CAPTURE(BM_HashJoinBasic_KeyTypes, "{int64,int64,int64}", {int64(), int64(), int64()})->ArgNames(keytypes_argnames)->RangeMultiplier(4)->Range(1, 4 * 1024); + BENCHMARK_CAPTURE(BM_HashJoinBasic_KeyTypes, "{int64,int64,int64,int64}", {int64(), int64(), int64(), int64()})->ArgNames(keytypes_argnames)->RangeMultiplier(4)->Range(1, 4 * 1024); + BENCHMARK_CAPTURE(BM_HashJoinBasic_KeyTypes, "{utf8}", {utf8()})->ArgNames(keytypes_argnames)->RangeMultiplier(4)->Range(1, 256); + BENCHMARK_CAPTURE(BM_HashJoinBasic_KeyTypes, "{fixed_size_binary(4)}", {fixed_size_binary(4)})->ArgNames(keytypes_argnames)->RangeMultiplier(4)->Range(1, 4 * 1024); + BENCHMARK_CAPTURE(BM_HashJoinBasic_KeyTypes, "{fixed_size_binary(8)}", {fixed_size_binary(8)})->ArgNames(keytypes_argnames)->RangeMultiplier(4)->Range(1, 4 * 1024); + BENCHMARK_CAPTURE(BM_HashJoinBasic_KeyTypes, "{fixed_size_binary(16)}", {fixed_size_binary(16)})->ArgNames(keytypes_argnames)->RangeMultiplier(4)->Range(1, 4 * 1024); + BENCHMARK_CAPTURE(BM_HashJoinBasic_KeyTypes, "{fixed_size_binary(24)}", {fixed_size_binary(24)})->ArgNames(keytypes_argnames)->RangeMultiplier(4)->Range(1, 4 * 1024); + BENCHMARK_CAPTURE(BM_HashJoinBasic_KeyTypes, "{fixed_size_binary(32)}", {fixed_size_binary(32)})->ArgNames(keytypes_argnames)->RangeMultiplier(4)->Range(1, 4 * 1024); + + std::vector selectivity_argnames = { "Selectivity", "HashTable krows" }; + BENCHMARK_CAPTURE(BM_HashJoinBasic_Selectivity, "{int32}", {int32()})->ArgNames(selectivity_argnames)->ArgsProduct( + { benchmark::CreateDenseRange(0, 100, 10), benchmark::CreateRange(1, 4 * 1024, 8) }); + BENCHMARK_CAPTURE(BM_HashJoinBasic_Selectivity, "{int64}", {int64()})->ArgNames(selectivity_argnames)->ArgsProduct( + { benchmark::CreateDenseRange(0, 100, 10), benchmark::CreateRange(1, 4 * 1024, 8) }); + + // Joins on UTF8 are 
currently really slow, so anything above 512 doesn't finished within a reasonable amount of time. + BENCHMARK_CAPTURE(BM_HashJoinBasic_Selectivity, "{utf8}", {utf8()})->ArgNames(selectivity_argnames)->ArgsProduct( + { benchmark::CreateDenseRange(0, 100, 10), benchmark::CreateRange(1, 512, 8) }); + + BENCHMARK_CAPTURE(BM_HashJoinBasic_Selectivity, "{fixed_size_binary(16)}", {fixed_size_binary(16)})->ArgNames(selectivity_argnames)->ArgsProduct( + { benchmark::CreateDenseRange(0, 100, 10), benchmark::CreateRange(1, 4 * 1024, 8) }); + BENCHMARK_CAPTURE(BM_HashJoinBasic_Selectivity, "{fixed_size_binary(32)}", {fixed_size_binary(32)})->ArgNames(selectivity_argnames)->ArgsProduct( + { benchmark::CreateDenseRange(0, 100, 10), benchmark::CreateRange(1, 4 * 1024, 8) }); + + std::vector jointype_argnames = { "Selectivity", "HashTable krows" }; + BENCHMARK_CAPTURE(BM_HashJoinBasic_JoinType, "Inner", JoinType::INNER)->ArgNames(jointype_argnames)->ArgsProduct( + { benchmark::CreateDenseRange(0, 100, 10), benchmark::CreateRange(1, 4 * 1024, 8) }); + BENCHMARK_CAPTURE(BM_HashJoinBasic_JoinType, "Left Semi", JoinType::LEFT_SEMI)->ArgNames(jointype_argnames)->ArgsProduct( + { benchmark::CreateDenseRange(0, 100, 10), benchmark::CreateRange(1, 4 * 1024, 8) }); + BENCHMARK_CAPTURE(BM_HashJoinBasic_JoinType, "Right Semi", JoinType::RIGHT_SEMI)->ArgNames(jointype_argnames)->ArgsProduct( + { benchmark::CreateDenseRange(0, 100, 10), benchmark::CreateRange(1, 4 * 1024, 8) }); + BENCHMARK_CAPTURE(BM_HashJoinBasic_JoinType, "Left Anti", JoinType::LEFT_ANTI)->ArgNames(jointype_argnames)->ArgsProduct( + { benchmark::CreateDenseRange(0, 100, 10), benchmark::CreateRange(1, 4 * 1024, 8) }); + BENCHMARK_CAPTURE(BM_HashJoinBasic_JoinType, "Right Anti", JoinType::RIGHT_ANTI)->ArgNames(jointype_argnames)->ArgsProduct( + { benchmark::CreateDenseRange(0, 100, 10), benchmark::CreateRange(1, 4 * 1024, 8) }); + BENCHMARK_CAPTURE(BM_HashJoinBasic_JoinType, "Left Outer", 
JoinType::LEFT_OUTER)->ArgNames(jointype_argnames)->ArgsProduct( + { benchmark::CreateDenseRange(0, 100, 10), benchmark::CreateRange(1, 4 * 1024, 8) }); + BENCHMARK_CAPTURE(BM_HashJoinBasic_JoinType, "Right Outer", JoinType::RIGHT_OUTER)->ArgNames(jointype_argnames)->ArgsProduct( + { benchmark::CreateDenseRange(0, 100, 10), benchmark::CreateRange(1, 4 * 1024, 8) }); + BENCHMARK_CAPTURE(BM_HashJoinBasic_JoinType, "Full Outer", JoinType::FULL_OUTER)->ArgNames(jointype_argnames)->ArgsProduct( + { benchmark::CreateDenseRange(0, 100, 10), benchmark::CreateRange(1, 4 * 1024, 8) }); + + BENCHMARK(BM_HashJoinBasic_MatchesPerRow)->ArgNames({"MatchesPerRow", "HashTable krows"})->ArgsProduct( + { benchmark::CreateRange(1, 16, 2), benchmark::CreateRange(1, 4 * 1024, 8) }); + + BENCHMARK(BM_HashJoinBasic_PayloadSize)->ArgNames({"Payload Size", "MatchesPerRow", "HashTable krows"})->ArgsProduct( + { benchmark::CreateRange(1, 128, 4), benchmark::CreateRange(1, 16, 2), benchmark::CreateRange(1, 4 * 1024, 8) }); + + BENCHMARK(BM_HashJoinBasic_ProbeParallelism)->ArgNames({"Threads", "HashTable krows"})->ArgsProduct( + { benchmark::CreateDenseRange(1, 16, 1), benchmark::CreateRange(1, 4 * 1024, 8) }); + + BENCHMARK(BM_HashJoinBasic_BuildParallelism)->ArgNames({"Threads", "HashTable krows"})->ArgsProduct( + { benchmark::CreateDenseRange(1, 16, 1), benchmark::CreateRange(1, 4 * 1024, 8) }); + BENCHMARK(BM_HashJoinBasic_NullPercentage)->ArgNames({"NullPercentage"})->DenseRange(0, 100, 10); - BENCHMARK(BM_HashJoinBasic_Cardinality)->ArgNames({"Cardinality"})->DenseRange(10, 100, 10); } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/exec/hash_join_graphs.py b/cpp/src/arrow/compute/exec/hash_join_graphs.py new file mode 100755 index 00000000000..d103745cee2 --- /dev/null +++ b/cpp/src/arrow/compute/exec/hash_join_graphs.py @@ -0,0 +1,128 @@ +#!/bin/env python3 + +import math +import sys +import json +import matplotlib.pyplot as plt +import seaborn as sns + 
+sns.set() + +class Test: + def __init__(self): + self.times = [] + self.args = {} + + def get_argnames_by_cardinality_increasing(self): + key = lambda x: len(set(self.args[x])) + return sorted(self.args.keys(), key=key) + +def organize_tests(filename): + tests = {} + with open(filename) as f: + df = json.load(f) + for idx, row in enumerate(df['benchmarks']): + test_name = row['name'] + test_name_split = test_name.split('/') + base_name = test_name_split[0] + args = test_name_split[1:] + if base_name not in tests.keys(): + tests[base_name] = Test() + + tests[base_name].times.append(row['ns/row']) + + if len(args) > 3: + raise('Test can have at most 3 parameters! Found', len(args), 'in test', test_name) + + nonnamed_args = [x for x in args if ':' not in x] + if len(nonnamed_args) > 1: + raise('Test name must have only one non-named parameter! Found', len(nonnamed_args), 'in test', test_name) + + for arg in args: + arg_name = '' + arg_value = arg + if ':' in arg: + arg_split = arg.split(':') + arg_name = arg_split[0] + arg_value = arg_split[1] + if arg_name not in tests[base_name].args.keys(): + tests[base_name].args[arg_name] = [arg_value] + else: + tests[base_name].args[arg_name].append(arg_value) + return tests; + +def string_numeric_sort_key(val): + try: + return int(val) + except: + return str(val) + +def plot_1d(test, argname, ax, label=None): + x_axis = test.args[argname] + y_axis = test.times + ax.plot(x_axis, y_axis, label=label) + ax.legend() + ax.set_xlabel(argname) + ax.set_ylabel('ns/row') + +def plot_2d(test, sorted_argnames, ax, title): + assert len(sorted_argnames) == 2 + lines = set(test.args[sorted_argnames[0]]) + ax.set_title(title) + for line in sorted(lines, key=string_numeric_sort_key): + indices = range(len(test.times)) + indices = list(filter(lambda i: test.args[sorted_argnames[0]][i] == line, indices)) + filtered_test = Test() + filtered_test.times = [test.times[i] for i in indices] + filtered_test.args[sorted_argnames[1]] = 
[test.args[sorted_argnames[1]][i] for i in indices] + plot_1d(filtered_test, sorted_argnames[1], ax, '%s: %s' % (sorted_argnames[0], line)) + +def plot_3d(test, sorted_argnames): + assert len(sorted_argnames) == 3 + num_graphs = len(set(test.args[sorted_argnames[0]])) + num_rows = int(math.ceil(math.sqrt(num_graphs))) + num_cols = int(math.ceil(num_graphs / num_rows)) + graphs = set(test.args[sorted_argnames[0]]) + + for j, graph in enumerate(sorted(graphs, key=string_numeric_sort_key)): + ax = plt.subplot(num_rows, num_cols, j + 1) + filtered_test = Test() + indices = range(len(test.times)) + indices = list(filter(lambda i: test.args[sorted_argnames[0]][i] == graph, indices)) + filtered_test.times = [test.times[i] for i in indices] + filtered_test.args[sorted_argnames[1]] = [test.args[sorted_argnames[1]][i] for i in indices] + filtered_test.args[sorted_argnames[2]] = [test.args[sorted_argnames[2]][i] for i in indices] + plot_2d(filtered_test, sorted_argnames[1:], ax, '%s: %s' % (sorted_argnames[0], graph)) + +def main(): + if len(sys.argv) != 2: + print('Usage: hash_join_graphs.py .json') + print('This script expects there to be a counter called ns/row as a field of every test in the JSON file.') + return + + tests = organize_tests(sys.argv[1]) + + for i, test_name in enumerate(tests.keys()): + test = tests[test_name] + sorted_argnames = test.get_argnames_by_cardinality_increasing() + # Create a graph per lowest-cardinality arg + # Create a line per second-lowest-cardinality arg + # Use highest-cardinality arg as X axis + fig = plt.figure(i) + num_args = len(sorted_argnames) + if num_args == 3: + fig.suptitle(test_name) + plot_3d(test, sorted_argnames) + fig.subplots_adjust(hspace=0.4) + elif num_args == 2: + ax = plt.subplot() + plot_2d(test, sorted_argnames, ax, test_name) + else: + ax = plt.subplot() + plot_1d(test, sorted_argnames[0], ax) + fig.set_size_inches(16, 9) + fig.savefig('%s.svg' % test_name, dpi=fig.dpi, bbox_inches='tight') + plt.show() + +if 
__name__ == '__main__': + main() From d3ce1e367ac2bb9013b5da2dc92ed6bebf3ca5aa Mon Sep 17 00:00:00 2001 From: Sasha Krassovsky Date: Mon, 6 Dec 2021 21:27:12 -0800 Subject: [PATCH 06/16] cmake-format --- cpp/src/arrow/compute/exec/CMakeLists.txt | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/cpp/src/arrow/compute/exec/CMakeLists.txt b/cpp/src/arrow/compute/exec/CMakeLists.txt index 738655bbe6e..70ade8b9cbb 100644 --- a/cpp/src/arrow/compute/exec/CMakeLists.txt +++ b/cpp/src/arrow/compute/exec/CMakeLists.txt @@ -33,7 +33,11 @@ add_arrow_compute_test(util_test PREFIX "arrow-compute") add_arrow_benchmark(expression_benchmark PREFIX "arrow-compute") find_package(OpenMP) -add_arrow_benchmark(hash_join_benchmark PREFIX "arrow-compute" EXTRA_LINK_LIBS OpenMP::OpenMP_CXX) +add_arrow_benchmark(hash_join_benchmark + PREFIX + "arrow-compute" + EXTRA_LINK_LIBS + OpenMP::OpenMP_CXX) add_arrow_compute_test(ir_test PREFIX From ca30b92faac54695019d00eb2a34ce6632c1d70e Mon Sep 17 00:00:00 2001 From: Sasha Krassovsky Date: Tue, 7 Dec 2021 13:13:01 -0800 Subject: [PATCH 07/16] clang-format --- cpp/src/arrow/compute/exec/CMakeLists.txt | 12 +- .../arrow/compute/exec/hash_join_benchmark.cc | 683 ++++++++++-------- 2 files changed, 379 insertions(+), 316 deletions(-) diff --git a/cpp/src/arrow/compute/exec/CMakeLists.txt b/cpp/src/arrow/compute/exec/CMakeLists.txt index 70ade8b9cbb..6cb892f495c 100644 --- a/cpp/src/arrow/compute/exec/CMakeLists.txt +++ b/cpp/src/arrow/compute/exec/CMakeLists.txt @@ -33,11 +33,13 @@ add_arrow_compute_test(util_test PREFIX "arrow-compute") add_arrow_benchmark(expression_benchmark PREFIX "arrow-compute") find_package(OpenMP) -add_arrow_benchmark(hash_join_benchmark - PREFIX - "arrow-compute" - EXTRA_LINK_LIBS - OpenMP::OpenMP_CXX) +if(OpenMP_CXX_FOUND) + add_arrow_benchmark(hash_join_benchmark + PREFIX + "arrow-compute" + EXTRA_LINK_LIBS + OpenMP::OpenMP_CXX) +endif() add_arrow_compute_test(ir_test PREFIX diff --git 
a/cpp/src/arrow/compute/exec/hash_join_benchmark.cc b/cpp/src/arrow/compute/exec/hash_join_benchmark.cc index 52a5b327cea..7a971e3ecc4 100644 --- a/cpp/src/arrow/compute/exec/hash_join_benchmark.cc +++ b/cpp/src/arrow/compute/exec/hash_join_benchmark.cc @@ -15,12 +15,12 @@ // specific language governing permissions and limitations // under the License. -#include "benchmark/benchmark.h" #include "arrow/testing/gtest_util.h" +#include "benchmark/benchmark.h" #include "arrow/api.h" -#include "arrow/compute/exec/options.h" #include "arrow/compute/exec/hash_join.h" +#include "arrow/compute/exec/options.h" #include "arrow/compute/exec/test_util.h" #include "arrow/compute/exec/util.h" #include "arrow/compute/kernels/row_encoder.h" @@ -33,333 +33,394 @@ #include "arrow/util/pcg_random.h" #include "arrow/util/thread_pool.h" -#include #include +#include #include #include -namespace arrow -{ -namespace compute -{ - struct BenchmarkSettings - { - int num_threads = 1; - JoinType join_type = JoinType::INNER; - int batch_size = 1024; - int num_build_batches = 32; - int num_probe_batches = 32 * 16; - std::vector> key_types = { int32() }; - std::vector> build_payload_types = {}; - std::vector> probe_payload_types = {}; - - double null_percentage = 0.0; - double cardinality = 1.0; // Proportion of distinct keys in build side - double selectivity = 1.0; // Probability of a match for a given row - }; - - class JoinBenchmark - { - public: - JoinBenchmark(BenchmarkSettings &settings) - { - bool is_parallel = settings.num_threads != 1; - - SchemaBuilder l_schema_builder, r_schema_builder; - std::vector left_keys, right_keys; - for(size_t i = 0; i < settings.key_types.size(); i++) - { - std::string l_name = "lk" + std::to_string(i); - std::string r_name = "rk" + std::to_string(i); - - // For integers, selectivity is the proportion of the build interval that overlaps with - // the probe interval - uint64_t num_build_rows = settings.num_build_batches * settings.batch_size; - - uint64_t 
min_build_value = 0; - uint64_t max_build_value = static_cast(num_build_rows * settings.cardinality); - - uint64_t min_probe_value = static_cast((1.0 - settings.selectivity) * max_build_value); - uint64_t max_probe_value = min_probe_value + max_build_value; - - std::unordered_map build_metadata; - build_metadata["null_probability"] = std::to_string(settings.null_percentage); - build_metadata["min"] = std::to_string(min_build_value); - build_metadata["max"] = std::to_string(max_build_value); - - std::unordered_map probe_metadata; - probe_metadata["null_probability"] = std::to_string(settings.null_percentage); - probe_metadata["min"] = std::to_string(min_probe_value); - probe_metadata["max"] = std::to_string(max_probe_value); - - auto l_field = field(l_name, settings.key_types[i], key_value_metadata(probe_metadata)); - auto r_field = field(r_name, settings.key_types[i], key_value_metadata(build_metadata)); - - DCHECK_OK(l_schema_builder.AddField(std::move(l_field))); - DCHECK_OK(r_schema_builder.AddField(std::move(r_field))); - - left_keys.push_back(FieldRef(l_name)); - right_keys.push_back(FieldRef(r_name)); - } - - for(size_t i = 0; i < settings.build_payload_types.size(); i++) - { - std::string name = "lp" + std::to_string(i); - DCHECK_OK(l_schema_builder.AddField(field(name, settings.probe_payload_types[i]))); - } - - for(size_t i = 0; i < settings.build_payload_types.size(); i++) - { - std::string name = "rp" + std::to_string(i); - DCHECK_OK(r_schema_builder.AddField(field(name, settings.build_payload_types[i]))); - } - - auto l_schema = *l_schema_builder.Finish(); - auto r_schema = *r_schema_builder.Finish(); - - l_batches_ = MakeRandomBatches(l_schema, settings.num_probe_batches, settings.batch_size); - r_batches_ = MakeRandomBatches(r_schema, settings.num_build_batches, settings.batch_size); - - stats_.num_probe_rows = settings.num_probe_batches * settings.batch_size; - - ctx_ = arrow::internal::make_unique( - default_memory_pool(), is_parallel ? 
arrow::internal::GetCpuThreadPool() : nullptr); - - schema_mgr_ = arrow::internal::make_unique(); - Expression filter = literal(true); - DCHECK_OK(schema_mgr_->Init( - settings.join_type, - *l_batches_.schema, - left_keys, - *r_batches_.schema, - right_keys, - filter, - "l_", - "r_")); - - join_ = *HashJoinImpl::MakeBasic(); - - omp_set_num_threads(settings.num_threads); - auto schedule_callback = [](std::function func) -> Status - { - #pragma omp task - { DCHECK_OK(func(omp_get_thread_num())); } - return Status::OK(); - }; - - - DCHECK_OK(join_->Init( - ctx_.get(), settings.join_type, !is_parallel /* use_sync_execution*/, settings.num_threads, - schema_mgr_.get(), {JoinKeyCmp::EQ}, std::move(filter), - [](ExecBatch) {}, - [](int64_t x) {}, - schedule_callback)); - - } - - void RunJoin() - { - double nanos = 0; - #pragma omp parallel reduction(+:nanos) - { - auto start = std::chrono::high_resolution_clock::now(); - int tid = omp_get_thread_num(); - #pragma omp for nowait - for(auto batch : r_batches_.batches) - DCHECK_OK(join_->InputReceived(tid, 1 /* side */, batch)); - #pragma omp for nowait - for(auto batch : l_batches_.batches) - DCHECK_OK(join_->InputReceived(tid, 0 /* side */, batch)); - - #pragma omp barrier - - #pragma omp single nowait - { DCHECK_OK(join_->InputFinished(tid, /* side */ 1)); } - - #pragma omp single nowait - { DCHECK_OK(join_->InputFinished(tid, /* side */ 0)); } - std::chrono::duration elapsed = std::chrono::high_resolution_clock::now() - start; - nanos += elapsed.count(); - } - stats_.total_nanoseconds = nanos; - } - - ThreadIndexer thread_indexer_; - BatchesWithSchema l_batches_; - BatchesWithSchema r_batches_; - std::unique_ptr schema_mgr_; - std::unique_ptr join_; - std::unique_ptr ctx_; - - struct - { - double total_nanoseconds; - uint64_t num_probe_rows; - } stats_; - }; - - static void HashJoinBasicBenchmarkImpl(benchmark::State &st, BenchmarkSettings &settings) - { - JoinBenchmark bm(settings); - double total_nanos = 0; - uint64_t 
total_rows = 0; - for(auto _ : st) - { - bm.RunJoin(); - total_nanos += bm.stats_.total_nanoseconds; - total_rows += bm.stats_.num_probe_rows; - } - st.counters["ns/row"] = total_nanos / total_rows; +namespace arrow { +namespace compute { +struct BenchmarkSettings { + int num_threads = 1; + JoinType join_type = JoinType::INNER; + int batch_size = 1024; + int num_build_batches = 32; + int num_probe_batches = 32 * 16; + std::vector> key_types = {int32()}; + std::vector> build_payload_types = {}; + std::vector> probe_payload_types = {}; + + double null_percentage = 0.0; + double cardinality = 1.0; // Proportion of distinct keys in build side + double selectivity = 1.0; // Probability of a match for a given row +}; + +class JoinBenchmark { + public: + JoinBenchmark(BenchmarkSettings& settings) { + bool is_parallel = settings.num_threads != 1; + + SchemaBuilder l_schema_builder, r_schema_builder; + std::vector left_keys, right_keys; + for (size_t i = 0; i < settings.key_types.size(); i++) { + std::string l_name = "lk" + std::to_string(i); + std::string r_name = "rk" + std::to_string(i); + + // For integers, selectivity is the proportion of the build interval that overlaps + // with the probe interval + uint64_t num_build_rows = settings.num_build_batches * settings.batch_size; + + uint64_t min_build_value = 0; + uint64_t max_build_value = + static_cast(num_build_rows * settings.cardinality); + + uint64_t min_probe_value = + static_cast((1.0 - settings.selectivity) * max_build_value); + uint64_t max_probe_value = min_probe_value + max_build_value; + + std::unordered_map build_metadata; + build_metadata["null_probability"] = std::to_string(settings.null_percentage); + build_metadata["min"] = std::to_string(min_build_value); + build_metadata["max"] = std::to_string(max_build_value); + + std::unordered_map probe_metadata; + probe_metadata["null_probability"] = std::to_string(settings.null_percentage); + probe_metadata["min"] = std::to_string(min_probe_value); + 
probe_metadata["max"] = std::to_string(max_probe_value); + + auto l_field = + field(l_name, settings.key_types[i], key_value_metadata(probe_metadata)); + auto r_field = + field(r_name, settings.key_types[i], key_value_metadata(build_metadata)); + + DCHECK_OK(l_schema_builder.AddField(std::move(l_field))); + DCHECK_OK(r_schema_builder.AddField(std::move(r_field))); + + left_keys.push_back(FieldRef(l_name)); + right_keys.push_back(FieldRef(r_name)); } - template - static void BM_HashJoinBasic_KeyTypes(benchmark::State &st, std::vector> key_types, Args&& ...) - { - BenchmarkSettings settings; - settings.num_build_batches = static_cast(st.range(0)); - settings.num_probe_batches = settings.num_build_batches; - settings.key_types = key_types; - - HashJoinBasicBenchmarkImpl(st, settings); - } - - template - static void BM_HashJoinBasic_Selectivity(benchmark::State &st, std::vector> key_types, Args&& ...) - { - BenchmarkSettings settings; - settings.selectivity = static_cast(st.range(0)) / 100.0; - - settings.num_build_batches = static_cast(st.range(1)); - settings.num_probe_batches = settings.num_build_batches; - settings.key_types = key_types; - - HashJoinBasicBenchmarkImpl(st, settings); + for (size_t i = 0; i < settings.build_payload_types.size(); i++) { + std::string name = "lp" + std::to_string(i); + DCHECK_OK(l_schema_builder.AddField(field(name, settings.probe_payload_types[i]))); } - template - static void BM_HashJoinBasic_JoinType(benchmark::State &st, JoinType join_type, Args&& ...) 
- { - BenchmarkSettings settings; - settings.selectivity = static_cast(st.range(0)) / 100.0; - - settings.num_build_batches = static_cast(st.range(1)); - settings.num_probe_batches = settings.num_build_batches; - settings.join_type = join_type; - - HashJoinBasicBenchmarkImpl(st, settings); + for (size_t i = 0; i < settings.build_payload_types.size(); i++) { + std::string name = "rp" + std::to_string(i); + DCHECK_OK(r_schema_builder.AddField(field(name, settings.build_payload_types[i]))); } - static void BM_HashJoinBasic_MatchesPerRow(benchmark::State &st) - { - BenchmarkSettings settings; - settings.cardinality = 1.0 / static_cast(st.range(0)); - - settings.num_build_batches = static_cast(st.range(1)); - settings.num_probe_batches = settings.num_probe_batches; - - HashJoinBasicBenchmarkImpl(st, settings); - } + auto l_schema = *l_schema_builder.Finish(); + auto r_schema = *r_schema_builder.Finish(); - static void BM_HashJoinBasic_PayloadSize(benchmark::State &st) - { - BenchmarkSettings settings; - int32_t payload_size = static_cast(st.range(0)); - settings.probe_payload_types = { fixed_size_binary(payload_size) }; - settings.cardinality = 1.0 / static_cast(st.range(1)); + l_batches_ = + MakeRandomBatches(l_schema, settings.num_probe_batches, settings.batch_size); + r_batches_ = + MakeRandomBatches(r_schema, settings.num_build_batches, settings.batch_size); - settings.num_build_batches = static_cast(st.range(2)); - settings.num_probe_batches = settings.num_probe_batches; + stats_.num_probe_rows = settings.num_probe_batches * settings.batch_size; - HashJoinBasicBenchmarkImpl(st, settings); - } + ctx_ = arrow::internal::make_unique( + default_memory_pool(), + is_parallel ? 
arrow::internal::GetCpuThreadPool() : nullptr); - static void BM_HashJoinBasic_ProbeParallelism(benchmark::State &st) - { - BenchmarkSettings settings; - settings.num_threads = static_cast(st.range(0)); - settings.num_build_batches = static_cast(st.range(1)); - settings.num_probe_batches = settings.num_build_batches * 8; + schema_mgr_ = arrow::internal::make_unique(); + Expression filter = literal(true); + DCHECK_OK(schema_mgr_->Init(settings.join_type, *l_batches_.schema, left_keys, + *r_batches_.schema, right_keys, filter, "l_", "r_")); - HashJoinBasicBenchmarkImpl(st, settings); - } + join_ = *HashJoinImpl::MakeBasic(); - static void BM_HashJoinBasic_BuildParallelism(benchmark::State &st) - { - BenchmarkSettings settings; - settings.num_threads = static_cast(st.range(0)); - settings.num_build_batches = static_cast(st.range(1)); - settings.num_probe_batches = settings.num_threads; + omp_set_num_threads(settings.num_threads); + auto schedule_callback = [](std::function func) -> Status { +#pragma omp task + { DCHECK_OK(func(omp_get_thread_num())); } + return Status::OK(); + }; - HashJoinBasicBenchmarkImpl(st, settings); - } + DCHECK_OK(join_->Init( + ctx_.get(), settings.join_type, !is_parallel /* use_sync_execution*/, + settings.num_threads, schema_mgr_.get(), {JoinKeyCmp::EQ}, std::move(filter), + [](ExecBatch) {}, [](int64_t x) {}, schedule_callback)); + } - static void BM_HashJoinBasic_NullPercentage(benchmark::State &st) + void RunJoin() { + double nanos = 0; +#pragma omp parallel reduction(+ : nanos) { - BenchmarkSettings settings; - settings.null_percentage = static_cast(st.range(0)) / 100.0; - - HashJoinBasicBenchmarkImpl(st, settings); + auto start = std::chrono::high_resolution_clock::now(); + int tid = omp_get_thread_num(); +#pragma omp for nowait + for (auto batch : r_batches_.batches) + DCHECK_OK(join_->InputReceived(tid, 1 /* side */, batch)); +#pragma omp for nowait + for (auto batch : l_batches_.batches) + DCHECK_OK(join_->InputReceived(tid, 0 /* 
side */, batch)); + +#pragma omp barrier + +#pragma omp single nowait + { DCHECK_OK(join_->InputFinished(tid, /* side */ 1)); } + +#pragma omp single nowait + { DCHECK_OK(join_->InputFinished(tid, /* side */ 0)); } + std::chrono::duration elapsed = + std::chrono::high_resolution_clock::now() - start; + nanos += elapsed.count(); } - - std::vector keytypes_argnames = { "Hashtable krows" }; - BENCHMARK_CAPTURE(BM_HashJoinBasic_KeyTypes, "{int32}", {int32()})->ArgNames(keytypes_argnames)->RangeMultiplier(4)->Range(1, 4 * 1024); - BENCHMARK_CAPTURE(BM_HashJoinBasic_KeyTypes, "{int64}", {int64()})->ArgNames(keytypes_argnames)->RangeMultiplier(4)->Range(1, 4 * 1024); - BENCHMARK_CAPTURE(BM_HashJoinBasic_KeyTypes, "{int64,int64}", {int64(), int64()})->ArgNames(keytypes_argnames)->RangeMultiplier(4)->Range(1, 4 * 1024); - BENCHMARK_CAPTURE(BM_HashJoinBasic_KeyTypes, "{int64,int64,int64}", {int64(), int64(), int64()})->ArgNames(keytypes_argnames)->RangeMultiplier(4)->Range(1, 4 * 1024); - BENCHMARK_CAPTURE(BM_HashJoinBasic_KeyTypes, "{int64,int64,int64,int64}", {int64(), int64(), int64(), int64()})->ArgNames(keytypes_argnames)->RangeMultiplier(4)->Range(1, 4 * 1024); - BENCHMARK_CAPTURE(BM_HashJoinBasic_KeyTypes, "{utf8}", {utf8()})->ArgNames(keytypes_argnames)->RangeMultiplier(4)->Range(1, 256); - BENCHMARK_CAPTURE(BM_HashJoinBasic_KeyTypes, "{fixed_size_binary(4)}", {fixed_size_binary(4)})->ArgNames(keytypes_argnames)->RangeMultiplier(4)->Range(1, 4 * 1024); - BENCHMARK_CAPTURE(BM_HashJoinBasic_KeyTypes, "{fixed_size_binary(8)}", {fixed_size_binary(8)})->ArgNames(keytypes_argnames)->RangeMultiplier(4)->Range(1, 4 * 1024); - BENCHMARK_CAPTURE(BM_HashJoinBasic_KeyTypes, "{fixed_size_binary(16)}", {fixed_size_binary(16)})->ArgNames(keytypes_argnames)->RangeMultiplier(4)->Range(1, 4 * 1024); - BENCHMARK_CAPTURE(BM_HashJoinBasic_KeyTypes, "{fixed_size_binary(24)}", {fixed_size_binary(24)})->ArgNames(keytypes_argnames)->RangeMultiplier(4)->Range(1, 4 * 1024); - 
BENCHMARK_CAPTURE(BM_HashJoinBasic_KeyTypes, "{fixed_size_binary(32)}", {fixed_size_binary(32)})->ArgNames(keytypes_argnames)->RangeMultiplier(4)->Range(1, 4 * 1024); - - std::vector selectivity_argnames = { "Selectivity", "HashTable krows" }; - BENCHMARK_CAPTURE(BM_HashJoinBasic_Selectivity, "{int32}", {int32()})->ArgNames(selectivity_argnames)->ArgsProduct( - { benchmark::CreateDenseRange(0, 100, 10), benchmark::CreateRange(1, 4 * 1024, 8) }); - BENCHMARK_CAPTURE(BM_HashJoinBasic_Selectivity, "{int64}", {int64()})->ArgNames(selectivity_argnames)->ArgsProduct( - { benchmark::CreateDenseRange(0, 100, 10), benchmark::CreateRange(1, 4 * 1024, 8) }); - - // Joins on UTF8 are currently really slow, so anything above 512 doesn't finished within a reasonable amount of time. - BENCHMARK_CAPTURE(BM_HashJoinBasic_Selectivity, "{utf8}", {utf8()})->ArgNames(selectivity_argnames)->ArgsProduct( - { benchmark::CreateDenseRange(0, 100, 10), benchmark::CreateRange(1, 512, 8) }); - - BENCHMARK_CAPTURE(BM_HashJoinBasic_Selectivity, "{fixed_size_binary(16)}", {fixed_size_binary(16)})->ArgNames(selectivity_argnames)->ArgsProduct( - { benchmark::CreateDenseRange(0, 100, 10), benchmark::CreateRange(1, 4 * 1024, 8) }); - BENCHMARK_CAPTURE(BM_HashJoinBasic_Selectivity, "{fixed_size_binary(32)}", {fixed_size_binary(32)})->ArgNames(selectivity_argnames)->ArgsProduct( - { benchmark::CreateDenseRange(0, 100, 10), benchmark::CreateRange(1, 4 * 1024, 8) }); - - std::vector jointype_argnames = { "Selectivity", "HashTable krows" }; - BENCHMARK_CAPTURE(BM_HashJoinBasic_JoinType, "Inner", JoinType::INNER)->ArgNames(jointype_argnames)->ArgsProduct( - { benchmark::CreateDenseRange(0, 100, 10), benchmark::CreateRange(1, 4 * 1024, 8) }); - BENCHMARK_CAPTURE(BM_HashJoinBasic_JoinType, "Left Semi", JoinType::LEFT_SEMI)->ArgNames(jointype_argnames)->ArgsProduct( - { benchmark::CreateDenseRange(0, 100, 10), benchmark::CreateRange(1, 4 * 1024, 8) }); - BENCHMARK_CAPTURE(BM_HashJoinBasic_JoinType, "Right 
Semi", JoinType::RIGHT_SEMI)->ArgNames(jointype_argnames)->ArgsProduct( - { benchmark::CreateDenseRange(0, 100, 10), benchmark::CreateRange(1, 4 * 1024, 8) }); - BENCHMARK_CAPTURE(BM_HashJoinBasic_JoinType, "Left Anti", JoinType::LEFT_ANTI)->ArgNames(jointype_argnames)->ArgsProduct( - { benchmark::CreateDenseRange(0, 100, 10), benchmark::CreateRange(1, 4 * 1024, 8) }); - BENCHMARK_CAPTURE(BM_HashJoinBasic_JoinType, "Right Anti", JoinType::RIGHT_ANTI)->ArgNames(jointype_argnames)->ArgsProduct( - { benchmark::CreateDenseRange(0, 100, 10), benchmark::CreateRange(1, 4 * 1024, 8) }); - BENCHMARK_CAPTURE(BM_HashJoinBasic_JoinType, "Left Outer", JoinType::LEFT_OUTER)->ArgNames(jointype_argnames)->ArgsProduct( - { benchmark::CreateDenseRange(0, 100, 10), benchmark::CreateRange(1, 4 * 1024, 8) }); - BENCHMARK_CAPTURE(BM_HashJoinBasic_JoinType, "Right Outer", JoinType::RIGHT_OUTER)->ArgNames(jointype_argnames)->ArgsProduct( - { benchmark::CreateDenseRange(0, 100, 10), benchmark::CreateRange(1, 4 * 1024, 8) }); - BENCHMARK_CAPTURE(BM_HashJoinBasic_JoinType, "Full Outer", JoinType::FULL_OUTER)->ArgNames(jointype_argnames)->ArgsProduct( - { benchmark::CreateDenseRange(0, 100, 10), benchmark::CreateRange(1, 4 * 1024, 8) }); - - BENCHMARK(BM_HashJoinBasic_MatchesPerRow)->ArgNames({"MatchesPerRow", "HashTable krows"})->ArgsProduct( - { benchmark::CreateRange(1, 16, 2), benchmark::CreateRange(1, 4 * 1024, 8) }); - - BENCHMARK(BM_HashJoinBasic_PayloadSize)->ArgNames({"Payload Size", "MatchesPerRow", "HashTable krows"})->ArgsProduct( - { benchmark::CreateRange(1, 128, 4), benchmark::CreateRange(1, 16, 2), benchmark::CreateRange(1, 4 * 1024, 8) }); - - BENCHMARK(BM_HashJoinBasic_ProbeParallelism)->ArgNames({"Threads", "HashTable krows"})->ArgsProduct( - { benchmark::CreateDenseRange(1, 16, 1), benchmark::CreateRange(1, 4 * 1024, 8) }); - - BENCHMARK(BM_HashJoinBasic_BuildParallelism)->ArgNames({"Threads", "HashTable krows"})->ArgsProduct( - { benchmark::CreateDenseRange(1, 16, 1), 
benchmark::CreateRange(1, 4 * 1024, 8) }); - - BENCHMARK(BM_HashJoinBasic_NullPercentage)->ArgNames({"NullPercentage"})->DenseRange(0, 100, 10); -} // namespace compute -} // namespace arrow + stats_.total_nanoseconds = nanos; + } + + ThreadIndexer thread_indexer_; + BatchesWithSchema l_batches_; + BatchesWithSchema r_batches_; + std::unique_ptr schema_mgr_; + std::unique_ptr join_; + std::unique_ptr ctx_; + + struct { + double total_nanoseconds; + uint64_t num_probe_rows; + } stats_; +}; + +static void HashJoinBasicBenchmarkImpl(benchmark::State& st, + BenchmarkSettings& settings) { + JoinBenchmark bm(settings); + double total_nanos = 0; + uint64_t total_rows = 0; + for (auto _ : st) { + bm.RunJoin(); + total_nanos += bm.stats_.total_nanoseconds; + total_rows += bm.stats_.num_probe_rows; + } + st.counters["ns/row"] = total_nanos / total_rows; +} + +template +static void BM_HashJoinBasic_KeyTypes(benchmark::State& st, + std::vector> key_types, + Args&&...) { + BenchmarkSettings settings; + settings.num_build_batches = static_cast(st.range(0)); + settings.num_probe_batches = settings.num_build_batches; + settings.key_types = key_types; + + HashJoinBasicBenchmarkImpl(st, settings); +} + +template +static void BM_HashJoinBasic_Selectivity(benchmark::State& st, + std::vector> key_types, + Args&&...) { + BenchmarkSettings settings; + settings.selectivity = static_cast(st.range(0)) / 100.0; + + settings.num_build_batches = static_cast(st.range(1)); + settings.num_probe_batches = settings.num_build_batches; + settings.key_types = key_types; + + HashJoinBasicBenchmarkImpl(st, settings); +} + +template +static void BM_HashJoinBasic_JoinType(benchmark::State& st, JoinType join_type, + Args&&...) 
{ + BenchmarkSettings settings; + settings.selectivity = static_cast(st.range(0)) / 100.0; + + settings.num_build_batches = static_cast(st.range(1)); + settings.num_probe_batches = settings.num_build_batches; + settings.join_type = join_type; + + HashJoinBasicBenchmarkImpl(st, settings); +} + +static void BM_HashJoinBasic_MatchesPerRow(benchmark::State& st) { + BenchmarkSettings settings; + settings.cardinality = 1.0 / static_cast(st.range(0)); + + settings.num_build_batches = static_cast(st.range(1)); + settings.num_probe_batches = settings.num_probe_batches; + + HashJoinBasicBenchmarkImpl(st, settings); +} + +static void BM_HashJoinBasic_PayloadSize(benchmark::State& st) { + BenchmarkSettings settings; + int32_t payload_size = static_cast(st.range(0)); + settings.probe_payload_types = {fixed_size_binary(payload_size)}; + settings.cardinality = 1.0 / static_cast(st.range(1)); + + settings.num_build_batches = static_cast(st.range(2)); + settings.num_probe_batches = settings.num_probe_batches; + + HashJoinBasicBenchmarkImpl(st, settings); +} + +static void BM_HashJoinBasic_ProbeParallelism(benchmark::State& st) { + BenchmarkSettings settings; + settings.num_threads = static_cast(st.range(0)); + settings.num_build_batches = static_cast(st.range(1)); + settings.num_probe_batches = settings.num_build_batches * 8; + + HashJoinBasicBenchmarkImpl(st, settings); +} + +static void BM_HashJoinBasic_BuildParallelism(benchmark::State& st) { + BenchmarkSettings settings; + settings.num_threads = static_cast(st.range(0)); + settings.num_build_batches = static_cast(st.range(1)); + settings.num_probe_batches = settings.num_threads; + + HashJoinBasicBenchmarkImpl(st, settings); +} + +static void BM_HashJoinBasic_NullPercentage(benchmark::State& st) { + BenchmarkSettings settings; + settings.null_percentage = static_cast(st.range(0)) / 100.0; + + HashJoinBasicBenchmarkImpl(st, settings); +} + +std::vector keytypes_argnames = {"Hashtable krows"}; 
+BENCHMARK_CAPTURE(BM_HashJoinBasic_KeyTypes, "{int32}", {int32()}) + ->ArgNames(keytypes_argnames) + ->RangeMultiplier(4) + ->Range(1, 4 * 1024); +BENCHMARK_CAPTURE(BM_HashJoinBasic_KeyTypes, "{int64}", {int64()}) + ->ArgNames(keytypes_argnames) + ->RangeMultiplier(4) + ->Range(1, 4 * 1024); +BENCHMARK_CAPTURE(BM_HashJoinBasic_KeyTypes, "{int64,int64}", {int64(), int64()}) + ->ArgNames(keytypes_argnames) + ->RangeMultiplier(4) + ->Range(1, 4 * 1024); +BENCHMARK_CAPTURE(BM_HashJoinBasic_KeyTypes, "{int64,int64,int64}", + {int64(), int64(), int64()}) + ->ArgNames(keytypes_argnames) + ->RangeMultiplier(4) + ->Range(1, 4 * 1024); +BENCHMARK_CAPTURE(BM_HashJoinBasic_KeyTypes, "{int64,int64,int64,int64}", + {int64(), int64(), int64(), int64()}) + ->ArgNames(keytypes_argnames) + ->RangeMultiplier(4) + ->Range(1, 4 * 1024); +BENCHMARK_CAPTURE(BM_HashJoinBasic_KeyTypes, "{utf8}", {utf8()}) + ->ArgNames(keytypes_argnames) + ->RangeMultiplier(4) + ->Range(1, 256); +BENCHMARK_CAPTURE(BM_HashJoinBasic_KeyTypes, "{fixed_size_binary(4)}", + {fixed_size_binary(4)}) + ->ArgNames(keytypes_argnames) + ->RangeMultiplier(4) + ->Range(1, 4 * 1024); +BENCHMARK_CAPTURE(BM_HashJoinBasic_KeyTypes, "{fixed_size_binary(8)}", + {fixed_size_binary(8)}) + ->ArgNames(keytypes_argnames) + ->RangeMultiplier(4) + ->Range(1, 4 * 1024); +BENCHMARK_CAPTURE(BM_HashJoinBasic_KeyTypes, "{fixed_size_binary(16)}", + {fixed_size_binary(16)}) + ->ArgNames(keytypes_argnames) + ->RangeMultiplier(4) + ->Range(1, 4 * 1024); +BENCHMARK_CAPTURE(BM_HashJoinBasic_KeyTypes, "{fixed_size_binary(24)}", + {fixed_size_binary(24)}) + ->ArgNames(keytypes_argnames) + ->RangeMultiplier(4) + ->Range(1, 4 * 1024); +BENCHMARK_CAPTURE(BM_HashJoinBasic_KeyTypes, "{fixed_size_binary(32)}", + {fixed_size_binary(32)}) + ->ArgNames(keytypes_argnames) + ->RangeMultiplier(4) + ->Range(1, 4 * 1024); + +std::vector selectivity_argnames = {"Selectivity", "HashTable krows"}; +BENCHMARK_CAPTURE(BM_HashJoinBasic_Selectivity, "{int32}", 
{int32()}) + ->ArgNames(selectivity_argnames) + ->ArgsProduct({benchmark::CreateDenseRange(0, 100, 10), + benchmark::CreateRange(1, 4 * 1024, 8)}); +BENCHMARK_CAPTURE(BM_HashJoinBasic_Selectivity, "{int64}", {int64()}) + ->ArgNames(selectivity_argnames) + ->ArgsProduct({benchmark::CreateDenseRange(0, 100, 10), + benchmark::CreateRange(1, 4 * 1024, 8)}); + +// Joins on UTF8 are currently really slow, so anything above 512 doesn't finished within +// a reasonable amount of time. +BENCHMARK_CAPTURE(BM_HashJoinBasic_Selectivity, "{utf8}", {utf8()}) + ->ArgNames(selectivity_argnames) + ->ArgsProduct({benchmark::CreateDenseRange(0, 100, 10), + benchmark::CreateRange(1, 512, 8)}); + +BENCHMARK_CAPTURE(BM_HashJoinBasic_Selectivity, "{fixed_size_binary(16)}", + {fixed_size_binary(16)}) + ->ArgNames(selectivity_argnames) + ->ArgsProduct({benchmark::CreateDenseRange(0, 100, 10), + benchmark::CreateRange(1, 4 * 1024, 8)}); +BENCHMARK_CAPTURE(BM_HashJoinBasic_Selectivity, "{fixed_size_binary(32)}", + {fixed_size_binary(32)}) + ->ArgNames(selectivity_argnames) + ->ArgsProduct({benchmark::CreateDenseRange(0, 100, 10), + benchmark::CreateRange(1, 4 * 1024, 8)}); + +std::vector jointype_argnames = {"Selectivity", "HashTable krows"}; +BENCHMARK_CAPTURE(BM_HashJoinBasic_JoinType, "Inner", JoinType::INNER) + ->ArgNames(jointype_argnames) + ->ArgsProduct({benchmark::CreateDenseRange(0, 100, 10), + benchmark::CreateRange(1, 4 * 1024, 8)}); +BENCHMARK_CAPTURE(BM_HashJoinBasic_JoinType, "Left Semi", JoinType::LEFT_SEMI) + ->ArgNames(jointype_argnames) + ->ArgsProduct({benchmark::CreateDenseRange(0, 100, 10), + benchmark::CreateRange(1, 4 * 1024, 8)}); +BENCHMARK_CAPTURE(BM_HashJoinBasic_JoinType, "Right Semi", JoinType::RIGHT_SEMI) + ->ArgNames(jointype_argnames) + ->ArgsProduct({benchmark::CreateDenseRange(0, 100, 10), + benchmark::CreateRange(1, 4 * 1024, 8)}); +BENCHMARK_CAPTURE(BM_HashJoinBasic_JoinType, "Left Anti", JoinType::LEFT_ANTI) + ->ArgNames(jointype_argnames) + 
->ArgsProduct({benchmark::CreateDenseRange(0, 100, 10), + benchmark::CreateRange(1, 4 * 1024, 8)}); +BENCHMARK_CAPTURE(BM_HashJoinBasic_JoinType, "Right Anti", JoinType::RIGHT_ANTI) + ->ArgNames(jointype_argnames) + ->ArgsProduct({benchmark::CreateDenseRange(0, 100, 10), + benchmark::CreateRange(1, 4 * 1024, 8)}); +BENCHMARK_CAPTURE(BM_HashJoinBasic_JoinType, "Left Outer", JoinType::LEFT_OUTER) + ->ArgNames(jointype_argnames) + ->ArgsProduct({benchmark::CreateDenseRange(0, 100, 10), + benchmark::CreateRange(1, 4 * 1024, 8)}); +BENCHMARK_CAPTURE(BM_HashJoinBasic_JoinType, "Right Outer", JoinType::RIGHT_OUTER) + ->ArgNames(jointype_argnames) + ->ArgsProduct({benchmark::CreateDenseRange(0, 100, 10), + benchmark::CreateRange(1, 4 * 1024, 8)}); +BENCHMARK_CAPTURE(BM_HashJoinBasic_JoinType, "Full Outer", JoinType::FULL_OUTER) + ->ArgNames(jointype_argnames) + ->ArgsProduct({benchmark::CreateDenseRange(0, 100, 10), + benchmark::CreateRange(1, 4 * 1024, 8)}); + +BENCHMARK(BM_HashJoinBasic_MatchesPerRow) + ->ArgNames({"MatchesPerRow", "HashTable krows"}) + ->ArgsProduct({benchmark::CreateRange(1, 16, 2), + benchmark::CreateRange(1, 4 * 1024, 8)}); + +BENCHMARK(BM_HashJoinBasic_PayloadSize) + ->ArgNames({"Payload Size", "MatchesPerRow", "HashTable krows"}) + ->ArgsProduct({benchmark::CreateRange(1, 128, 4), benchmark::CreateRange(1, 16, 2), + benchmark::CreateRange(1, 4 * 1024, 8)}); + +BENCHMARK(BM_HashJoinBasic_ProbeParallelism) + ->ArgNames({"Threads", "HashTable krows"}) + ->ArgsProduct({benchmark::CreateDenseRange(1, 16, 1), + benchmark::CreateRange(1, 4 * 1024, 8)}); + +BENCHMARK(BM_HashJoinBasic_BuildParallelism) + ->ArgNames({"Threads", "HashTable krows"}) + ->ArgsProduct({benchmark::CreateDenseRange(1, 16, 1), + benchmark::CreateRange(1, 4 * 1024, 8)}); + +BENCHMARK(BM_HashJoinBasic_NullPercentage) + ->ArgNames({"NullPercentage"}) + ->DenseRange(0, 100, 10); +} // namespace compute +} // namespace arrow From 49f7d53a6968b5ab4c74985e23d4ead9b565bdaf Mon Sep 17 
00:00:00 2001 From: Sasha Krassovsky Date: Tue, 7 Dec 2021 13:24:14 -0800 Subject: [PATCH 08/16] Add license header --- cpp/src/arrow/compute/exec/hash_join_graphs.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) diff --git a/cpp/src/arrow/compute/exec/hash_join_graphs.py b/cpp/src/arrow/compute/exec/hash_join_graphs.py index d103745cee2..5b96b06f866 100755 --- a/cpp/src/arrow/compute/exec/hash_join_graphs.py +++ b/cpp/src/arrow/compute/exec/hash_join_graphs.py @@ -1,5 +1,22 @@ #!/bin/env python3 +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + import math import sys import json From 4b9977470986f0234347b1260415344c213391aa Mon Sep 17 00:00:00 2001 From: Sasha Krassovsky Date: Tue, 7 Dec 2021 13:55:02 -0800 Subject: [PATCH 09/16] Linter comments --- cpp/src/arrow/compute/exec/hash_join_benchmark.cc | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/cpp/src/arrow/compute/exec/hash_join_benchmark.cc b/cpp/src/arrow/compute/exec/hash_join_benchmark.cc index 7a971e3ecc4..9855c63124a 100644 --- a/cpp/src/arrow/compute/exec/hash_join_benchmark.cc +++ b/cpp/src/arrow/compute/exec/hash_join_benchmark.cc @@ -15,7 +15,6 @@ // specific language governing permissions and limitations // under the License. 
-#include "arrow/testing/gtest_util.h" #include "benchmark/benchmark.h" #include "arrow/api.h" @@ -58,7 +57,7 @@ struct BenchmarkSettings { class JoinBenchmark { public: - JoinBenchmark(BenchmarkSettings& settings) { + explicit JoinBenchmark(BenchmarkSettings& settings) { bool is_parallel = settings.num_threads != 1; SchemaBuilder l_schema_builder, r_schema_builder; From d81a94e44c9d7b63340f96804c5c48f9e9af6886 Mon Sep 17 00:00:00 2001 From: Sasha Krassovsky Date: Wed, 8 Dec 2021 20:58:02 -0800 Subject: [PATCH 10/16] Make openmp work on msvc --- cpp/src/arrow/compute/exec/CMakeLists.txt | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/cpp/src/arrow/compute/exec/CMakeLists.txt b/cpp/src/arrow/compute/exec/CMakeLists.txt index 6cb892f495c..6226507d17c 100644 --- a/cpp/src/arrow/compute/exec/CMakeLists.txt +++ b/cpp/src/arrow/compute/exec/CMakeLists.txt @@ -39,6 +39,10 @@ if(OpenMP_CXX_FOUND) "arrow-compute" EXTRA_LINK_LIBS OpenMP::OpenMP_CXX) + if(MSVC) + target_compile_options(arrow-compute-hash-join-benchmark + PRIVATE "-openmp:experimental -openmp:llvm") + endif() endif() add_arrow_compute_test(ir_test From 08b5234a4185238d98f5eee68a24e2b61b89c27c Mon Sep 17 00:00:00 2001 From: Sasha Krassovsky Date: Sun, 12 Dec 2021 21:04:56 -0800 Subject: [PATCH 11/16] Respond to comments --- cpp/src/arrow/compute/exec/CMakeLists.txt | 4 +- .../arrow/compute/exec/hash_join_benchmark.cc | 70 ++++++++----------- .../arrow/compute/exec/hash_join_graphs.py | 9 ++- 3 files changed, 38 insertions(+), 45 deletions(-) diff --git a/cpp/src/arrow/compute/exec/CMakeLists.txt b/cpp/src/arrow/compute/exec/CMakeLists.txt index 6226507d17c..3832273593d 100644 --- a/cpp/src/arrow/compute/exec/CMakeLists.txt +++ b/cpp/src/arrow/compute/exec/CMakeLists.txt @@ -32,8 +32,8 @@ add_arrow_compute_test(util_test PREFIX "arrow-compute") add_arrow_benchmark(expression_benchmark PREFIX "arrow-compute") -find_package(OpenMP) -if(OpenMP_CXX_FOUND) +if(ARROW_BUILD_OPENMP_BENCHMARKS) + 
find_package(OpenMP REQUIRED) add_arrow_benchmark(hash_join_benchmark PREFIX "arrow-compute" diff --git a/cpp/src/arrow/compute/exec/hash_join_benchmark.cc b/cpp/src/arrow/compute/exec/hash_join_benchmark.cc index 9855c63124a..e23bf2abfca 100644 --- a/cpp/src/arrow/compute/exec/hash_join_benchmark.cc +++ b/cpp/src/arrow/compute/exec/hash_join_benchmark.cc @@ -23,13 +23,8 @@ #include "arrow/compute/exec/test_util.h" #include "arrow/compute/exec/util.h" #include "arrow/compute/kernels/row_encoder.h" -#include "arrow/compute/kernels/test_util.h" -#include "arrow/testing/gtest_util.h" -#include "arrow/testing/matchers.h" #include "arrow/testing/random.h" -#include "arrow/util/checked_cast.h" #include "arrow/util/make_unique.h" -#include "arrow/util/pcg_random.h" #include "arrow/util/thread_pool.h" #include @@ -93,8 +88,8 @@ class JoinBenchmark { auto r_field = field(r_name, settings.key_types[i], key_value_metadata(build_metadata)); - DCHECK_OK(l_schema_builder.AddField(std::move(l_field))); - DCHECK_OK(r_schema_builder.AddField(std::move(r_field))); + DCHECK_OK(l_schema_builder.AddField(l_field)); + DCHECK_OK(r_schema_builder.AddField(r_field)); left_keys.push_back(FieldRef(l_name)); right_keys.push_back(FieldRef(r_name)); @@ -151,11 +146,11 @@ class JoinBenchmark { auto start = std::chrono::high_resolution_clock::now(); int tid = omp_get_thread_num(); #pragma omp for nowait - for (auto batch : r_batches_.batches) - DCHECK_OK(join_->InputReceived(tid, 1 /* side */, batch)); + for (auto it = r_batches_.batches.begin(); it != r_batches_.batches.end(); ++it) + DCHECK_OK(join_->InputReceived(tid, 1 /* side */, *it)); #pragma omp for nowait - for (auto batch : l_batches_.batches) - DCHECK_OK(join_->InputReceived(tid, 0 /* side */, batch)); + for (auto it = l_batches_.batches.begin(); it != l_batches_.batches.end(); ++it) + DCHECK_OK(join_->InputReceived(tid, 0 /* side */, *it)); #pragma omp barrier @@ -171,7 +166,6 @@ class JoinBenchmark { stats_.total_nanoseconds = nanos; 
} - ThreadIndexer thread_indexer_; BatchesWithSchema l_batches_; BatchesWithSchema r_batches_; std::unique_ptr schema_mgr_; @@ -204,7 +198,7 @@ static void BM_HashJoinBasic_KeyTypes(benchmark::State& st, BenchmarkSettings settings; settings.num_build_batches = static_cast(st.range(0)); settings.num_probe_batches = settings.num_build_batches; - settings.key_types = key_types; + settings.key_types = std::move(key_types); HashJoinBasicBenchmarkImpl(st, settings); } @@ -218,7 +212,7 @@ static void BM_HashJoinBasic_Selectivity(benchmark::State& st, settings.num_build_batches = static_cast(st.range(1)); settings.num_probe_batches = settings.num_build_batches; - settings.key_types = key_types; + settings.key_types = std::move(key_types); HashJoinBasicBenchmarkImpl(st, settings); } @@ -309,7 +303,7 @@ BENCHMARK_CAPTURE(BM_HashJoinBasic_KeyTypes, "{int64,int64,int64,int64}", BENCHMARK_CAPTURE(BM_HashJoinBasic_KeyTypes, "{utf8}", {utf8()}) ->ArgNames(keytypes_argnames) ->RangeMultiplier(4) - ->Range(1, 256); + ->Range(1, 64); BENCHMARK_CAPTURE(BM_HashJoinBasic_KeyTypes, "{fixed_size_binary(4)}", {fixed_size_binary(4)}) ->ArgNames(keytypes_argnames) @@ -337,66 +331,60 @@ BENCHMARK_CAPTURE(BM_HashJoinBasic_KeyTypes, "{fixed_size_binary(32)}", ->Range(1, 4 * 1024); std::vector selectivity_argnames = {"Selectivity", "HashTable krows"}; +std::vector> selectivity_args = { + benchmark::CreateDenseRange(0, 100, 20), benchmark::CreateRange(1, 4 * 1024, 8)}; + BENCHMARK_CAPTURE(BM_HashJoinBasic_Selectivity, "{int32}", {int32()}) ->ArgNames(selectivity_argnames) - ->ArgsProduct({benchmark::CreateDenseRange(0, 100, 10), - benchmark::CreateRange(1, 4 * 1024, 8)}); + ->ArgsProduct(selectivity_args); BENCHMARK_CAPTURE(BM_HashJoinBasic_Selectivity, "{int64}", {int64()}) ->ArgNames(selectivity_argnames) - ->ArgsProduct({benchmark::CreateDenseRange(0, 100, 10), - benchmark::CreateRange(1, 4 * 1024, 8)}); + ->ArgsProduct(selectivity_args); // Joins on UTF8 are currently really slow, so 
anything above 512 doesn't finished within // a reasonable amount of time. BENCHMARK_CAPTURE(BM_HashJoinBasic_Selectivity, "{utf8}", {utf8()}) ->ArgNames(selectivity_argnames) - ->ArgsProduct({benchmark::CreateDenseRange(0, 100, 10), - benchmark::CreateRange(1, 512, 8)}); + ->ArgsProduct({benchmark::CreateDenseRange(0, 100, 20), + benchmark::CreateRange(1, 64, 8)}); BENCHMARK_CAPTURE(BM_HashJoinBasic_Selectivity, "{fixed_size_binary(16)}", {fixed_size_binary(16)}) ->ArgNames(selectivity_argnames) - ->ArgsProduct({benchmark::CreateDenseRange(0, 100, 10), - benchmark::CreateRange(1, 4 * 1024, 8)}); + ->ArgsProduct(selectivity_args); BENCHMARK_CAPTURE(BM_HashJoinBasic_Selectivity, "{fixed_size_binary(32)}", {fixed_size_binary(32)}) ->ArgNames(selectivity_argnames) - ->ArgsProduct({benchmark::CreateDenseRange(0, 100, 10), - benchmark::CreateRange(1, 4 * 1024, 8)}); + ->ArgsProduct(selectivity_args); std::vector jointype_argnames = {"Selectivity", "HashTable krows"}; +std::vector> jointype_args = { + benchmark::CreateDenseRange(0, 100, 20), benchmark::CreateRange(1, 4 * 1024, 8)}; + BENCHMARK_CAPTURE(BM_HashJoinBasic_JoinType, "Inner", JoinType::INNER) ->ArgNames(jointype_argnames) - ->ArgsProduct({benchmark::CreateDenseRange(0, 100, 10), - benchmark::CreateRange(1, 4 * 1024, 8)}); + ->ArgsProduct(jointype_args); BENCHMARK_CAPTURE(BM_HashJoinBasic_JoinType, "Left Semi", JoinType::LEFT_SEMI) ->ArgNames(jointype_argnames) - ->ArgsProduct({benchmark::CreateDenseRange(0, 100, 10), - benchmark::CreateRange(1, 4 * 1024, 8)}); + ->ArgsProduct(jointype_args); BENCHMARK_CAPTURE(BM_HashJoinBasic_JoinType, "Right Semi", JoinType::RIGHT_SEMI) ->ArgNames(jointype_argnames) - ->ArgsProduct({benchmark::CreateDenseRange(0, 100, 10), - benchmark::CreateRange(1, 4 * 1024, 8)}); + ->ArgsProduct(jointype_args); BENCHMARK_CAPTURE(BM_HashJoinBasic_JoinType, "Left Anti", JoinType::LEFT_ANTI) ->ArgNames(jointype_argnames) - ->ArgsProduct({benchmark::CreateDenseRange(0, 100, 10), - 
benchmark::CreateRange(1, 4 * 1024, 8)}); + ->ArgsProduct(jointype_args); BENCHMARK_CAPTURE(BM_HashJoinBasic_JoinType, "Right Anti", JoinType::RIGHT_ANTI) ->ArgNames(jointype_argnames) - ->ArgsProduct({benchmark::CreateDenseRange(0, 100, 10), - benchmark::CreateRange(1, 4 * 1024, 8)}); + ->ArgsProduct(jointype_args); BENCHMARK_CAPTURE(BM_HashJoinBasic_JoinType, "Left Outer", JoinType::LEFT_OUTER) ->ArgNames(jointype_argnames) - ->ArgsProduct({benchmark::CreateDenseRange(0, 100, 10), - benchmark::CreateRange(1, 4 * 1024, 8)}); + ->ArgsProduct(jointype_args); BENCHMARK_CAPTURE(BM_HashJoinBasic_JoinType, "Right Outer", JoinType::RIGHT_OUTER) ->ArgNames(jointype_argnames) - ->ArgsProduct({benchmark::CreateDenseRange(0, 100, 10), - benchmark::CreateRange(1, 4 * 1024, 8)}); + ->ArgsProduct(jointype_args); BENCHMARK_CAPTURE(BM_HashJoinBasic_JoinType, "Full Outer", JoinType::FULL_OUTER) ->ArgNames(jointype_argnames) - ->ArgsProduct({benchmark::CreateDenseRange(0, 100, 10), - benchmark::CreateRange(1, 4 * 1024, 8)}); + ->ArgsProduct(jointype_args); BENCHMARK(BM_HashJoinBasic_MatchesPerRow) ->ArgNames({"MatchesPerRow", "HashTable krows"}) diff --git a/cpp/src/arrow/compute/exec/hash_join_graphs.py b/cpp/src/arrow/compute/exec/hash_join_graphs.py index 5b96b06f866..9e8486cd173 100755 --- a/cpp/src/arrow/compute/exec/hash_join_graphs.py +++ b/cpp/src/arrow/compute/exec/hash_join_graphs.py @@ -74,6 +74,11 @@ def string_numeric_sort_key(val): except: return str(val) +def construct_name(argname, argvalue): + if not argname: + return argvalue + return '%s: %s' % (argname, argvalue) + def plot_1d(test, argname, ax, label=None): x_axis = test.args[argname] y_axis = test.times @@ -92,7 +97,7 @@ def plot_2d(test, sorted_argnames, ax, title): filtered_test = Test() filtered_test.times = [test.times[i] for i in indices] filtered_test.args[sorted_argnames[1]] = [test.args[sorted_argnames[1]][i] for i in indices] - plot_1d(filtered_test, sorted_argnames[1], ax, '%s: %s' % 
(sorted_argnames[0], line)) + plot_1d(filtered_test, sorted_argnames[1], ax, construct_name(sorted_argnames[0], line)) def plot_3d(test, sorted_argnames): assert len(sorted_argnames) == 3 @@ -109,7 +114,7 @@ def plot_3d(test, sorted_argnames): filtered_test.times = [test.times[i] for i in indices] filtered_test.args[sorted_argnames[1]] = [test.args[sorted_argnames[1]][i] for i in indices] filtered_test.args[sorted_argnames[2]] = [test.args[sorted_argnames[2]][i] for i in indices] - plot_2d(filtered_test, sorted_argnames[1:], ax, '%s: %s' % (sorted_argnames[0], graph)) + plot_2d(filtered_test, sorted_argnames[1:], ax, construct_name(sorted_argnames[0], graph)) def main(): if len(sys.argv) != 2: From 5e1e329796542a369f193022ae4cccad8adbd991 Mon Sep 17 00:00:00 2001 From: Sasha Krassovsky Date: Tue, 14 Dec 2021 12:13:23 -0800 Subject: [PATCH 12/16] Switch to rows/sec, make script a bit more intelligent with categorical data --- cpp/CMakePresets.json | 1 + cpp/cmake_modules/DefineOptions.cmake | 3 +++ .../arrow/compute/exec/hash_join_benchmark.cc | 19 +++++---------- .../arrow/compute/exec/hash_join_graphs.py | 24 ++++++++++++++----- 4 files changed, 28 insertions(+), 19 deletions(-) diff --git a/cpp/CMakePresets.json b/cpp/CMakePresets.json index ebb3d86283b..39a27d6c7e0 100644 --- a/cpp/CMakePresets.json +++ b/cpp/CMakePresets.json @@ -40,6 +40,7 @@ "cacheVariables": { "ARROW_BUILD_BENCHMARKS": "ON", "ARROW_BUILD_BENCHMARKS_REFERENCE": "ON", + "ARROW_BUILD_OPENMP_BENCHMARKS": "ON", "CMAKE_BUILD_TYPE": "RelWithDebInfo" } }, diff --git a/cpp/cmake_modules/DefineOptions.cmake b/cpp/cmake_modules/DefineOptions.cmake index 2afbdab4a40..74a477cc064 100644 --- a/cpp/cmake_modules/DefineOptions.cmake +++ b/cpp/cmake_modules/DefineOptions.cmake @@ -168,6 +168,9 @@ if("${CMAKE_SOURCE_DIR}" STREQUAL "${CMAKE_CURRENT_SOURCE_DIR}") define_option(ARROW_BUILD_BENCHMARKS_REFERENCE "Build the Arrow micro reference benchmarks" OFF) + define_option(ARROW_BUILD_OPENMP_BENCHMARKS + 
"Build the Arrow benchmarks that rely on OpenMP" OFF) + if(ARROW_BUILD_SHARED) set(ARROW_TEST_LINKAGE_DEFAULT "shared") else() diff --git a/cpp/src/arrow/compute/exec/hash_join_benchmark.cc b/cpp/src/arrow/compute/exec/hash_join_benchmark.cc index e23bf2abfca..2a3546fca6b 100644 --- a/cpp/src/arrow/compute/exec/hash_join_benchmark.cc +++ b/cpp/src/arrow/compute/exec/hash_join_benchmark.cc @@ -140,10 +140,8 @@ class JoinBenchmark { } void RunJoin() { - double nanos = 0; -#pragma omp parallel reduction(+ : nanos) +#pragma omp parallel { - auto start = std::chrono::high_resolution_clock::now(); int tid = omp_get_thread_num(); #pragma omp for nowait for (auto it = r_batches_.batches.begin(); it != r_batches_.batches.end(); ++it) @@ -159,11 +157,7 @@ class JoinBenchmark { #pragma omp single nowait { DCHECK_OK(join_->InputFinished(tid, /* side */ 0)); } - std::chrono::duration elapsed = - std::chrono::high_resolution_clock::now() - start; - nanos += elapsed.count(); } - stats_.total_nanoseconds = nanos; } BatchesWithSchema l_batches_; @@ -173,7 +167,6 @@ class JoinBenchmark { std::unique_ptr ctx_; struct { - double total_nanoseconds; uint64_t num_probe_rows; } stats_; }; @@ -181,14 +174,12 @@ class JoinBenchmark { static void HashJoinBasicBenchmarkImpl(benchmark::State& st, BenchmarkSettings& settings) { JoinBenchmark bm(settings); - double total_nanos = 0; uint64_t total_rows = 0; for (auto _ : st) { bm.RunJoin(); - total_nanos += bm.stats_.total_nanoseconds; total_rows += bm.stats_.num_probe_rows; } - st.counters["ns/row"] = total_nanos / total_rows; + st.counters["rows/sec"] = benchmark::Counter(total_rows, benchmark::Counter::kIsRate); } template @@ -399,12 +390,14 @@ BENCHMARK(BM_HashJoinBasic_PayloadSize) BENCHMARK(BM_HashJoinBasic_ProbeParallelism) ->ArgNames({"Threads", "HashTable krows"}) ->ArgsProduct({benchmark::CreateDenseRange(1, 16, 1), - benchmark::CreateRange(1, 4 * 1024, 8)}); + benchmark::CreateRange(1, 4 * 1024, 8)}) + ->MeasureProcessCPUTime(); 
BENCHMARK(BM_HashJoinBasic_BuildParallelism) ->ArgNames({"Threads", "HashTable krows"}) ->ArgsProduct({benchmark::CreateDenseRange(1, 16, 1), - benchmark::CreateRange(1, 4 * 1024, 8)}); + benchmark::CreateRange(1, 4 * 1024, 8)}) + ->MeasureProcessCPUTime(); BENCHMARK(BM_HashJoinBasic_NullPercentage) ->ArgNames({"NullPercentage"}) diff --git a/cpp/src/arrow/compute/exec/hash_join_graphs.py b/cpp/src/arrow/compute/exec/hash_join_graphs.py index 9e8486cd173..11008e47568 100755 --- a/cpp/src/arrow/compute/exec/hash_join_graphs.py +++ b/cpp/src/arrow/compute/exec/hash_join_graphs.py @@ -31,8 +31,16 @@ def __init__(self): self.args = {} def get_argnames_by_cardinality_increasing(self): - key = lambda x: len(set(self.args[x])) - return sorted(self.args.keys(), key=key) + key_cardinality = lambda x: len(set(self.args[x])) + def key_strings_first(x): + try: + as_float = float(self.args[x][0]) + return True + except: + return False + by_cardinality = sorted(self.args.keys(), key=key_cardinality) + strings_first = sorted(by_cardinality, key=key_strings_first) + return strings_first def organize_tests(filename): tests = {} @@ -41,12 +49,15 @@ def organize_tests(filename): for idx, row in enumerate(df['benchmarks']): test_name = row['name'] test_name_split = test_name.split('/') + if test_name_split[-1] == 'process_time': + test_name_split = test_name_split[:-1] + base_name = test_name_split[0] args = test_name_split[1:] if base_name not in tests.keys(): tests[base_name] = Test() - tests[base_name].times.append(row['ns/row']) + tests[base_name].times.append(row['rows/sec']) if len(args) > 3: raise('Test can have at most 3 parameters! 
Found', len(args), 'in test', test_name) @@ -70,7 +81,7 @@ def organize_tests(filename): def string_numeric_sort_key(val): try: - return int(val) + return float(val) except: return str(val) @@ -85,7 +96,7 @@ def plot_1d(test, argname, ax, label=None): ax.plot(x_axis, y_axis, label=label) ax.legend() ax.set_xlabel(argname) - ax.set_ylabel('ns/row') + ax.set_ylabel('rows/sec') def plot_2d(test, sorted_argnames, ax, title): assert len(sorted_argnames) == 2 @@ -119,7 +130,7 @@ def plot_3d(test, sorted_argnames): def main(): if len(sys.argv) != 2: print('Usage: hash_join_graphs.py .json') - print('This script expects there to be a counter called ns/row as a field of every test in the JSON file.') + print('This script expects there to be a counter called rows/sec as a field of every test in the JSON file.') return tests = organize_tests(sys.argv[1]) @@ -140,6 +151,7 @@ def main(): ax = plt.subplot() plot_2d(test, sorted_argnames, ax, test_name) else: + fig.suptitle(test_name) ax = plt.subplot() plot_1d(test, sorted_argnames[0], ax) fig.set_size_inches(16, 9) From ce9cec0a7de8fbc78371b3b1064e4b574b83c1a0 Mon Sep 17 00:00:00 2001 From: Sasha Krassovsky Date: Wed, 15 Dec 2021 13:42:41 -0800 Subject: [PATCH 13/16] Respond to some more comments --- .../arrow/compute/exec/hash_join_benchmark.cc | 51 ++++++++----------- .../arrow/compute/exec/hash_join_graphs.py | 9 +++- 2 files changed, 29 insertions(+), 31 deletions(-) diff --git a/cpp/src/arrow/compute/exec/hash_join_benchmark.cc b/cpp/src/arrow/compute/exec/hash_join_benchmark.cc index 2a3546fca6b..a7668793e17 100644 --- a/cpp/src/arrow/compute/exec/hash_join_benchmark.cc +++ b/cpp/src/arrow/compute/exec/hash_join_benchmark.cc @@ -268,29 +268,26 @@ static void BM_HashJoinBasic_NullPercentage(benchmark::State& st) { HashJoinBasicBenchmarkImpl(st, settings); } +std::vector hashtable_krows = benchmark::CreateRange(1, 4096, 8); + std::vector keytypes_argnames = {"Hashtable krows"}; BENCHMARK_CAPTURE(BM_HashJoinBasic_KeyTypes, 
"{int32}", {int32()}) ->ArgNames(keytypes_argnames) - ->RangeMultiplier(4) - ->Range(1, 4 * 1024); + ->ArgsProduct({hashtable_krows}); BENCHMARK_CAPTURE(BM_HashJoinBasic_KeyTypes, "{int64}", {int64()}) ->ArgNames(keytypes_argnames) - ->RangeMultiplier(4) - ->Range(1, 4 * 1024); + ->ArgsProduct({hashtable_krows}); BENCHMARK_CAPTURE(BM_HashJoinBasic_KeyTypes, "{int64,int64}", {int64(), int64()}) ->ArgNames(keytypes_argnames) - ->RangeMultiplier(4) - ->Range(1, 4 * 1024); + ->ArgsProduct({hashtable_krows}); BENCHMARK_CAPTURE(BM_HashJoinBasic_KeyTypes, "{int64,int64,int64}", {int64(), int64(), int64()}) ->ArgNames(keytypes_argnames) - ->RangeMultiplier(4) - ->Range(1, 4 * 1024); + ->ArgsProduct({hashtable_krows}); BENCHMARK_CAPTURE(BM_HashJoinBasic_KeyTypes, "{int64,int64,int64,int64}", {int64(), int64(), int64(), int64()}) ->ArgNames(keytypes_argnames) - ->RangeMultiplier(4) - ->Range(1, 4 * 1024); + ->ArgsProduct({hashtable_krows}); BENCHMARK_CAPTURE(BM_HashJoinBasic_KeyTypes, "{utf8}", {utf8()}) ->ArgNames(keytypes_argnames) ->RangeMultiplier(4) @@ -298,32 +295,28 @@ BENCHMARK_CAPTURE(BM_HashJoinBasic_KeyTypes, "{utf8}", {utf8()}) BENCHMARK_CAPTURE(BM_HashJoinBasic_KeyTypes, "{fixed_size_binary(4)}", {fixed_size_binary(4)}) ->ArgNames(keytypes_argnames) - ->RangeMultiplier(4) - ->Range(1, 4 * 1024); + ->ArgsProduct({hashtable_krows}); BENCHMARK_CAPTURE(BM_HashJoinBasic_KeyTypes, "{fixed_size_binary(8)}", {fixed_size_binary(8)}) ->ArgNames(keytypes_argnames) ->RangeMultiplier(4) - ->Range(1, 4 * 1024); + ->ArgsProduct({hashtable_krows}); BENCHMARK_CAPTURE(BM_HashJoinBasic_KeyTypes, "{fixed_size_binary(16)}", {fixed_size_binary(16)}) ->ArgNames(keytypes_argnames) - ->RangeMultiplier(4) - ->Range(1, 4 * 1024); + ->ArgsProduct({hashtable_krows}); BENCHMARK_CAPTURE(BM_HashJoinBasic_KeyTypes, "{fixed_size_binary(24)}", {fixed_size_binary(24)}) ->ArgNames(keytypes_argnames) - ->RangeMultiplier(4) - ->Range(1, 4 * 1024); + ->ArgsProduct({hashtable_krows}); 
BENCHMARK_CAPTURE(BM_HashJoinBasic_KeyTypes, "{fixed_size_binary(32)}", {fixed_size_binary(32)}) ->ArgNames(keytypes_argnames) - ->RangeMultiplier(4) - ->Range(1, 4 * 1024); + ->ArgsProduct({hashtable_krows}); std::vector selectivity_argnames = {"Selectivity", "HashTable krows"}; std::vector> selectivity_args = { - benchmark::CreateDenseRange(0, 100, 20), benchmark::CreateRange(1, 4 * 1024, 8)}; + benchmark::CreateDenseRange(0, 100, 20), hashtable_krows}; BENCHMARK_CAPTURE(BM_HashJoinBasic_Selectivity, "{int32}", {int32()}) ->ArgNames(selectivity_argnames) @@ -332,7 +325,7 @@ BENCHMARK_CAPTURE(BM_HashJoinBasic_Selectivity, "{int64}", {int64()}) ->ArgNames(selectivity_argnames) ->ArgsProduct(selectivity_args); -// Joins on UTF8 are currently really slow, so anything above 512 doesn't finished within +// Joins on UTF8 are currently really slow, so anything above 64 doesn't finished within // a reasonable amount of time. BENCHMARK_CAPTURE(BM_HashJoinBasic_Selectivity, "{utf8}", {utf8()}) ->ArgNames(selectivity_argnames) @@ -350,7 +343,7 @@ BENCHMARK_CAPTURE(BM_HashJoinBasic_Selectivity, "{fixed_size_binary(32)}", std::vector jointype_argnames = {"Selectivity", "HashTable krows"}; std::vector> jointype_args = { - benchmark::CreateDenseRange(0, 100, 20), benchmark::CreateRange(1, 4 * 1024, 8)}; + benchmark::CreateDenseRange(0, 100, 20), hashtable_krows}; BENCHMARK_CAPTURE(BM_HashJoinBasic_JoinType, "Inner", JoinType::INNER) ->ArgNames(jointype_argnames) @@ -378,29 +371,29 @@ BENCHMARK_CAPTURE(BM_HashJoinBasic_JoinType, "Full Outer", JoinType::FULL_OUTER) ->ArgsProduct(jointype_args); BENCHMARK(BM_HashJoinBasic_MatchesPerRow) - ->ArgNames({"MatchesPerRow", "HashTable krows"}) + ->ArgNames({"Matches Per Row", "HashTable krows"}) ->ArgsProduct({benchmark::CreateRange(1, 16, 2), - benchmark::CreateRange(1, 4 * 1024, 8)}); + hashtable_krows}); BENCHMARK(BM_HashJoinBasic_PayloadSize) - ->ArgNames({"Payload Size", "MatchesPerRow", "HashTable krows"}) + ->ArgNames({"Payload 
Size", "Matches Per Row", "HashTable krows"}) ->ArgsProduct({benchmark::CreateRange(1, 128, 4), benchmark::CreateRange(1, 16, 2), - benchmark::CreateRange(1, 4 * 1024, 8)}); + hashtable_krows}); BENCHMARK(BM_HashJoinBasic_ProbeParallelism) ->ArgNames({"Threads", "HashTable krows"}) ->ArgsProduct({benchmark::CreateDenseRange(1, 16, 1), - benchmark::CreateRange(1, 4 * 1024, 8)}) + hashtable_krows}) ->MeasureProcessCPUTime(); BENCHMARK(BM_HashJoinBasic_BuildParallelism) ->ArgNames({"Threads", "HashTable krows"}) ->ArgsProduct({benchmark::CreateDenseRange(1, 16, 1), - benchmark::CreateRange(1, 4 * 1024, 8)}) + hashtable_krows}) ->MeasureProcessCPUTime(); BENCHMARK(BM_HashJoinBasic_NullPercentage) - ->ArgNames({"NullPercentage"}) + ->ArgNames({"Null Percentage"}) ->DenseRange(0, 100, 10); } // namespace compute } // namespace arrow diff --git a/cpp/src/arrow/compute/exec/hash_join_graphs.py b/cpp/src/arrow/compute/exec/hash_join_graphs.py index 11008e47568..69c6ffefb1f 100755 --- a/cpp/src/arrow/compute/exec/hash_join_graphs.py +++ b/cpp/src/arrow/compute/exec/hash_join_graphs.py @@ -17,6 +17,11 @@ # specific language governing permissions and limitations # under the License. +''' +This script takes a JSON file from a google benchmark run that measures rows/sec and generates graphs +for each benchmark. 
+''' + import math import sys import json @@ -68,11 +73,11 @@ def organize_tests(filename): for arg in args: arg_name = '' - arg_value = arg + arg_value = arg.strip('\"') if ':' in arg: arg_split = arg.split(':') arg_name = arg_split[0] - arg_value = arg_split[1] + arg_value = arg_split[1].strip('\"') if arg_name not in tests[base_name].args.keys(): tests[base_name].args[arg_name] = [arg_value] else: From 4f3bf2d7d310529c861a4fedf70c8296888dda62 Mon Sep 17 00:00:00 2001 From: Sasha Krassovsky Date: Wed, 15 Dec 2021 13:53:38 -0800 Subject: [PATCH 14/16] clang-format --- cpp/src/arrow/compute/exec/hash_join_benchmark.cc | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/cpp/src/arrow/compute/exec/hash_join_benchmark.cc b/cpp/src/arrow/compute/exec/hash_join_benchmark.cc index a7668793e17..63dd61b3b55 100644 --- a/cpp/src/arrow/compute/exec/hash_join_benchmark.cc +++ b/cpp/src/arrow/compute/exec/hash_join_benchmark.cc @@ -372,8 +372,7 @@ BENCHMARK_CAPTURE(BM_HashJoinBasic_JoinType, "Full Outer", JoinType::FULL_OUTER) BENCHMARK(BM_HashJoinBasic_MatchesPerRow) ->ArgNames({"Matches Per Row", "HashTable krows"}) - ->ArgsProduct({benchmark::CreateRange(1, 16, 2), - hashtable_krows}); + ->ArgsProduct({benchmark::CreateRange(1, 16, 2), hashtable_krows}); BENCHMARK(BM_HashJoinBasic_PayloadSize) ->ArgNames({"Payload Size", "Matches Per Row", "HashTable krows"}) @@ -382,14 +381,12 @@ BENCHMARK(BM_HashJoinBasic_PayloadSize) BENCHMARK(BM_HashJoinBasic_ProbeParallelism) ->ArgNames({"Threads", "HashTable krows"}) - ->ArgsProduct({benchmark::CreateDenseRange(1, 16, 1), - hashtable_krows}) + ->ArgsProduct({benchmark::CreateDenseRange(1, 16, 1), hashtable_krows}) ->MeasureProcessCPUTime(); BENCHMARK(BM_HashJoinBasic_BuildParallelism) ->ArgNames({"Threads", "HashTable krows"}) - ->ArgsProduct({benchmark::CreateDenseRange(1, 16, 1), - hashtable_krows}) + ->ArgsProduct({benchmark::CreateDenseRange(1, 16, 1), hashtable_krows}) ->MeasureProcessCPUTime(); 
BENCHMARK(BM_HashJoinBasic_NullPercentage) From f182ddf16643bc438278c284307119ebe00d9ee9 Mon Sep 17 00:00:00 2001 From: Sasha Krassovsky Date: Tue, 4 Jan 2022 17:11:59 -0800 Subject: [PATCH 15/16] Add ARROW_BUILD_DETAILED_BENCHMARKS flag --- cpp/CMakePresets.json | 1 + cpp/cmake_modules/BuildUtils.cmake | 4 ++ cpp/cmake_modules/DefineOptions.cmake | 3 ++ .../arrow/compute/exec/hash_join_benchmark.cc | 39 ++++++++++++++----- 4 files changed, 38 insertions(+), 9 deletions(-) diff --git a/cpp/CMakePresets.json b/cpp/CMakePresets.json index 39a27d6c7e0..fa04be9c3ad 100644 --- a/cpp/CMakePresets.json +++ b/cpp/CMakePresets.json @@ -41,6 +41,7 @@ "ARROW_BUILD_BENCHMARKS": "ON", "ARROW_BUILD_BENCHMARKS_REFERENCE": "ON", "ARROW_BUILD_OPENMP_BENCHMARKS": "ON", + "ARROW_BUILD_DETAILED_BENCHMARKS": "OFF", "CMAKE_BUILD_TYPE": "RelWithDebInfo" } }, diff --git a/cpp/cmake_modules/BuildUtils.cmake b/cpp/cmake_modules/BuildUtils.cmake index 294b6ae0ff8..391c43e0ac5 100644 --- a/cpp/cmake_modules/BuildUtils.cmake +++ b/cpp/cmake_modules/BuildUtils.cmake @@ -609,6 +609,10 @@ function(ADD_BENCHMARK REL_BENCHMARK_NAME) set(ARG_LABELS benchmark) endif() + if(ARROW_BUILD_DETAILED_BENCHMARKS) + target_compile_definitions(${BENCHMARK_NAME} PRIVATE ARROW_BUILD_DETAILED_BENCHMARKS) + endif() + add_test(${BENCHMARK_NAME} ${BUILD_SUPPORT_DIR}/run-test.sh ${CMAKE_BINARY_DIR} diff --git a/cpp/cmake_modules/DefineOptions.cmake b/cpp/cmake_modules/DefineOptions.cmake index 74a477cc064..0a43ec18f60 100644 --- a/cpp/cmake_modules/DefineOptions.cmake +++ b/cpp/cmake_modules/DefineOptions.cmake @@ -171,6 +171,9 @@ if("${CMAKE_SOURCE_DIR}" STREQUAL "${CMAKE_CURRENT_SOURCE_DIR}") define_option(ARROW_BUILD_OPENMP_BENCHMARKS "Build the Arrow benchmarks that rely on OpenMP" OFF) + define_option(ARROW_BUILD_DETAILED_BENCHMARKS + "Build benchmarks that do a longer exploration of performance" OFF) + if(ARROW_BUILD_SHARED) set(ARROW_TEST_LINKAGE_DEFAULT "shared") else() diff --git 
a/cpp/src/arrow/compute/exec/hash_join_benchmark.cc b/cpp/src/arrow/compute/exec/hash_join_benchmark.cc index 63dd61b3b55..5cfe47f6cf7 100644 --- a/cpp/src/arrow/compute/exec/hash_join_benchmark.cc +++ b/cpp/src/arrow/compute/exec/hash_join_benchmark.cc @@ -194,6 +194,16 @@ static void BM_HashJoinBasic_KeyTypes(benchmark::State& st, HashJoinBasicBenchmarkImpl(st, settings); } +static void BM_HashJoinBasic_ProbeParallelism(benchmark::State& st) { + BenchmarkSettings settings; + settings.num_threads = static_cast(st.range(0)); + settings.num_build_batches = static_cast(st.range(1)); + settings.num_probe_batches = settings.num_build_batches * 8; + + HashJoinBasicBenchmarkImpl(st, settings); +} + +#ifdef ARROW_BUILD_DETAILED_BENCHMARKS // Necessary to suppress warnings template static void BM_HashJoinBasic_Selectivity(benchmark::State& st, std::vector> key_types, @@ -243,15 +253,6 @@ static void BM_HashJoinBasic_PayloadSize(benchmark::State& st) { HashJoinBasicBenchmarkImpl(st, settings); } -static void BM_HashJoinBasic_ProbeParallelism(benchmark::State& st) { - BenchmarkSettings settings; - settings.num_threads = static_cast(st.range(0)); - settings.num_build_batches = static_cast(st.range(1)); - settings.num_probe_batches = settings.num_build_batches * 8; - - HashJoinBasicBenchmarkImpl(st, settings); -} - static void BM_HashJoinBasic_BuildParallelism(benchmark::State& st) { BenchmarkSettings settings; settings.num_threads = static_cast(st.range(0)); @@ -267,10 +268,12 @@ static void BM_HashJoinBasic_NullPercentage(benchmark::State& st) { HashJoinBasicBenchmarkImpl(st, settings); } +#endif std::vector hashtable_krows = benchmark::CreateRange(1, 4096, 8); std::vector keytypes_argnames = {"Hashtable krows"}; +#ifdef ARROW_BUILD_DETAILED_BENCHMARKS BENCHMARK_CAPTURE(BM_HashJoinBasic_KeyTypes, "{int32}", {int32()}) ->ArgNames(keytypes_argnames) ->ArgsProduct({hashtable_krows}); @@ -392,5 +395,23 @@ BENCHMARK(BM_HashJoinBasic_BuildParallelism) 
BENCHMARK(BM_HashJoinBasic_NullPercentage) ->ArgNames({"Null Percentage"}) ->DenseRange(0, 100, 10); +#else + +BENCHMARK_CAPTURE(BM_HashJoinBasic_KeyTypes, "{int32}", {int32()}) + ->ArgNames(keytypes_argnames) + ->ArgsProduct({hashtable_krows}); + +BENCHMARK_CAPTURE(BM_HashJoinBasic_KeyTypes, "{utf8}", {utf8()}) + ->ArgNames(keytypes_argnames) + ->RangeMultiplier(4) + ->Range(1, 64); + +BENCHMARK(BM_HashJoinBasic_ProbeParallelism) + ->ArgNames({"Threads", "HashTable krows"}) + ->ArgsProduct({benchmark::CreateDenseRange(1, 16, 1), hashtable_krows}) + ->MeasureProcessCPUTime(); + +#endif // ARROW_BUILD_DETAILED_BENCHMARKS + } // namespace compute } // namespace arrow From ddab7f5c56f356a5d711d474e0f8fc6b88148912 Mon Sep 17 00:00:00 2001 From: Sasha Krassovsky Date: Wed, 12 Jan 2022 12:32:42 -0800 Subject: [PATCH 16/16] Respond to Weston comments, switch to log scale when prudent --- .../arrow/compute/exec/hash_join_benchmark.cc | 14 +++---- .../arrow/compute/exec/hash_join_graphs.py | 40 +++++++++++++++---- 2 files changed, 40 insertions(+), 14 deletions(-) diff --git a/cpp/src/arrow/compute/exec/hash_join_benchmark.cc b/cpp/src/arrow/compute/exec/hash_join_benchmark.cc index 5cfe47f6cf7..3d4271b6cb9 100644 --- a/cpp/src/arrow/compute/exec/hash_join_benchmark.cc +++ b/cpp/src/arrow/compute/exec/hash_join_benchmark.cc @@ -134,9 +134,9 @@ class JoinBenchmark { }; DCHECK_OK(join_->Init( - ctx_.get(), settings.join_type, !is_parallel /* use_sync_execution*/, - settings.num_threads, schema_mgr_.get(), {JoinKeyCmp::EQ}, std::move(filter), - [](ExecBatch) {}, [](int64_t x) {}, schedule_callback)); + ctx_.get(), settings.join_type, !is_parallel, settings.num_threads, + schema_mgr_.get(), {JoinKeyCmp::EQ}, std::move(filter), [](ExecBatch) {}, + [](int64_t x) {}, schedule_callback)); } void RunJoin() { @@ -145,18 +145,18 @@ class JoinBenchmark { int tid = omp_get_thread_num(); #pragma omp for nowait for (auto it = r_batches_.batches.begin(); it != r_batches_.batches.end(); 
++it) - DCHECK_OK(join_->InputReceived(tid, 1 /* side */, *it)); + DCHECK_OK(join_->InputReceived(tid, /*side=*/1, *it)); #pragma omp for nowait for (auto it = l_batches_.batches.begin(); it != l_batches_.batches.end(); ++it) - DCHECK_OK(join_->InputReceived(tid, 0 /* side */, *it)); + DCHECK_OK(join_->InputReceived(tid, /*side=*/0, *it)); #pragma omp barrier #pragma omp single nowait - { DCHECK_OK(join_->InputFinished(tid, /* side */ 1)); } + { DCHECK_OK(join_->InputFinished(tid, /*side=*/1)); } #pragma omp single nowait - { DCHECK_OK(join_->InputFinished(tid, /* side */ 0)); } + { DCHECK_OK(join_->InputFinished(tid, /*side=*/0)); } } } diff --git a/cpp/src/arrow/compute/exec/hash_join_graphs.py b/cpp/src/arrow/compute/exec/hash_join_graphs.py index 69c6ffefb1f..ff7ab63187d 100755 --- a/cpp/src/arrow/compute/exec/hash_join_graphs.py +++ b/cpp/src/arrow/compute/exec/hash_join_graphs.py @@ -20,6 +20,17 @@ ''' This script takes a JSON file from a google benchmark run that measures rows/sec and generates graphs for each benchmark. + +Example usage: +1. Generate Benchmark Data: +release/arrow-compute-hash-join-benchmark \ + --benchmark_counters_tabular=true \ + --benchmark_format=console \ + --benchmark_out=benchmark_data.json \ + --benchmark_out_format=json + +2. 
Visualize: +../src/arrow/compute/exec/hash_join_graphs.py benchmark_data.json ''' import math @@ -30,6 +41,22 @@ sns.set() +def try_as_numeric(val): + try: + return float(val) + except: + return str(val) + +def is_multiplicative(lst): + if len(lst) < 3: + return False + + if (lst[2] - lst[1]) == (lst[1] - lst[0]): + return False + + assert (lst[2] / lst[1]) == (lst[1] / lst[0]) + return True + class Test: def __init__(self): self.times = [] @@ -78,18 +105,14 @@ def organize_tests(filename): arg_split = arg.split(':') arg_name = arg_split[0] arg_value = arg_split[1].strip('\"') + + arg_value = try_as_numeric(arg_value) if arg_name not in tests[base_name].args.keys(): tests[base_name].args[arg_name] = [arg_value] else: tests[base_name].args[arg_name].append(arg_value) return tests; -def string_numeric_sort_key(val): - try: - return float(val) - except: - return str(val) - def construct_name(argname, argvalue): if not argname: return argvalue @@ -99,6 +122,9 @@ def plot_1d(test, argname, ax, label=None): x_axis = test.args[argname] y_axis = test.times ax.plot(x_axis, y_axis, label=label) + if is_multiplicative(x_axis): + ax.set_xscale('log', base=(x_axis[1] / x_axis[0])) + ax.xaxis.set_major_formatter(plt.ScalarFormatter()) ax.legend() ax.set_xlabel(argname) ax.set_ylabel('rows/sec') @@ -107,7 +133,7 @@ def plot_2d(test, sorted_argnames, ax, title): assert len(sorted_argnames) == 2 lines = set(test.args[sorted_argnames[0]]) ax.set_title(title) - for line in sorted(lines, key=string_numeric_sort_key): + for line in sorted(lines, key=try_as_numeric): indices = range(len(test.times)) indices = list(filter(lambda i: test.args[sorted_argnames[0]][i] == line, indices)) filtered_test = Test()