From 77fc0149e6860fa5bf3ec20fc382a89b363d69ee Mon Sep 17 00:00:00 2001 From: Will Jones Date: Fri, 8 Apr 2022 11:16:38 -0700 Subject: [PATCH 1/6] Increase thread limit --- cpp/src/arrow/compute/exec/hash_join.cc | 3 ++- cpp/src/arrow/compute/exec/util.cc | 2 +- r/tests/testthat/test-dplyr-join.R | 28 +++++++++++++++---------- 3 files changed, 20 insertions(+), 13 deletions(-) diff --git a/cpp/src/arrow/compute/exec/hash_join.cc b/cpp/src/arrow/compute/exec/hash_join.cc index 5a9afaa5bdf..c87c0969e0a 100644 --- a/cpp/src/arrow/compute/exec/hash_join.cc +++ b/cpp/src/arrow/compute/exec/hash_join.cc @@ -103,7 +103,8 @@ class HashJoinBasicImpl : public HashJoinImpl { filter_ = std::move(filter); output_batch_callback_ = std::move(output_batch_callback); finished_callback_ = std::move(finished_callback); - local_states_.resize(num_threads + 1); // +1 for calling thread + worker threads + // Adding ten, since each side of join might have an IO thread being called from. + local_states_.resize(num_threads + 10); for (size_t i = 0; i < local_states_.size(); ++i) { local_states_[i].is_initialized = false; local_states_[i].is_has_match_initialized = false; diff --git a/cpp/src/arrow/compute/exec/util.cc b/cpp/src/arrow/compute/exec/util.cc index 6e26927e40c..ce66f255f68 100644 --- a/cpp/src/arrow/compute/exec/util.cc +++ b/cpp/src/arrow/compute/exec/util.cc @@ -321,7 +321,7 @@ size_t ThreadIndexer::operator()() { } size_t ThreadIndexer::Capacity() { - static size_t max_size = arrow::internal::ThreadPool::DefaultCapacity() + 1; + static size_t max_size = arrow::internal::ThreadPool::DefaultCapacity() + 10; return max_size; } diff --git a/r/tests/testthat/test-dplyr-join.R b/r/tests/testthat/test-dplyr-join.R index 6f4adcb58ac..876edad0c08 100644 --- a/r/tests/testthat/test-dplyr-join.R +++ b/r/tests/testthat/test-dplyr-join.R @@ -310,27 +310,33 @@ test_that("summarize and join", { expect_equal(expected_col_names, res_col_names) }) -test_that("arrow dplyr query can join with tibble", { - # ARROW-14908 +test_that("arrow dplyr query can join two datasets", { + # ARROW-14908 and ARROW-15718 + skip_if_not_available("dataset") # By default, snappy encoding will be used, and # Snappy has a UBSan issue: https://github.com/google/snappy/pull/148 skip_on_linux_devel() - skip_if_not_available("dataset") dir_out <- tempdir() - write_dataset(iris, file.path(dir_out, "iris")) - species_codes <- data.frame( - Species = c("setosa", "versicolor", "virginica"), - code = c("SET", "VER", "VIR") - ) + + quakes %>% + select(stations, lat, long) %>% + group_by(stations) %>% + write_dataset(file.path(dir_out, "ds1")) + + quakes %>% + select(stations, mag, depth) %>% + group_by(stations) %>% + write_dataset(file.path(dir_out, "ds2")) withr::with_options( list(arrow.use_threads = FALSE), { - iris <- open_dataset(file.path(dir_out, "iris")) - res <- left_join(iris, species_codes) %>% collect() # We should not segfault here. - expect_equal(nrow(res), 150) + res <- open_dataset(file.path(dir_out, "ds1")) %>% + left_join(open_dataset(file.path(dir_out, "ds2")), by = "stations") %>% + collect() # We should not segfault here. + expect_equal(nrow(res), 21872) } ) }) From 33e35670abecf3045b5ed698e1255cd766b5299a Mon Sep 17 00:00:00 2001 From: Will Jones Date: Mon, 11 Apr 2022 11:51:27 -0700 Subject: [PATCH 2/6] use less arbitrary value --- cpp/src/arrow/compute/exec/hash_join.cc | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/compute/exec/hash_join.cc b/cpp/src/arrow/compute/exec/hash_join.cc index c87c0969e0a..83669f8a258 100644 --- a/cpp/src/arrow/compute/exec/hash_join.cc +++ b/cpp/src/arrow/compute/exec/hash_join.cc @@ -103,8 +103,9 @@ class HashJoinBasicImpl : public HashJoinImpl { filter_ = std::move(filter); output_batch_callback_ = std::move(output_batch_callback); finished_callback_ = std::move(finished_callback); - // Adding ten, since each side of join might have an IO thread being called from. - local_states_.resize(num_threads + 10); + // TODO(ARROW-15732) + // Each side of join might have an IO thread being called from. + local_states_.resize(GetCpuThreadPoolCapacity() + io::GetIOThreadPoolCapacity() + 1); for (size_t i = 0; i < local_states_.size(); ++i) { local_states_[i].is_initialized = false; local_states_[i].is_has_match_initialized = false; From 40158b599cc1731dda1155140fd1701147352feb Mon Sep 17 00:00:00 2001 From: Will Jones Date: Tue, 12 Apr 2022 14:22:19 -0700 Subject: [PATCH 3/6] Format --- r/tests/testthat/test-dplyr-join.R | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/r/tests/testthat/test-dplyr-join.R b/r/tests/testthat/test-dplyr-join.R index 876edad0c08..9d8e22596a6 100644 --- a/r/tests/testthat/test-dplyr-join.R +++ b/r/tests/testthat/test-dplyr-join.R @@ -320,15 +320,15 @@ test_that("arrow dplyr query can join two datasets", { dir_out <- tempdir() - quakes %>% - select(stations, lat, long) %>% - group_by(stations) %>% - write_dataset(file.path(dir_out, "ds1")) - - quakes %>% - select(stations, mag, depth) %>% - group_by(stations) %>% - write_dataset(file.path(dir_out, "ds2")) + quakes %>% + select(stations, lat, long) %>% + group_by(stations) %>% + write_dataset(file.path(dir_out, "ds1")) + + quakes %>% + select(stations, mag, depth) %>% + group_by(stations) %>% + write_dataset(file.path(dir_out, "ds2")) withr::with_options( list(arrow.use_threads = FALSE), From 8e46cb1d7b48dababdf41924b238a532a12cffc1 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Wed, 13 Apr 2022 14:17:34 -0700 Subject: [PATCH 4/6] Modify other point --- cpp/src/arrow/compute/exec/util.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/compute/exec/util.cc b/cpp/src/arrow/compute/exec/util.cc index ce66f255f68..ef56e6128a3 100644 --- a/cpp/src/arrow/compute/exec/util.cc +++ b/cpp/src/arrow/compute/exec/util.cc @@ -321,7 +321,7 @@ size_t ThreadIndexer::operator()() { } size_t ThreadIndexer::Capacity() { - static size_t max_size = arrow::internal::ThreadPool::DefaultCapacity() + 10; + static size_t max_size = GetCpuThreadPoolCapacity() + io::GetIOThreadPoolCapacity() + 1; return max_size; } From 6d97b8d02d2802f14fef4d3c0f490f764206f094 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Fri, 15 Apr 2022 14:38:09 -0700 Subject: [PATCH 5/6] Also increaes capacity of dict probe --- cpp/src/arrow/compute/exec/hash_join.cc | 4 ++-- cpp/src/arrow/compute/exec/hash_join_dict.cc | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/cpp/src/arrow/compute/exec/hash_join.cc b/cpp/src/arrow/compute/exec/hash_join.cc index 83669f8a258..2612c78ed6f 100644 --- a/cpp/src/arrow/compute/exec/hash_join.cc +++ b/cpp/src/arrow/compute/exec/hash_join.cc @@ -100,7 +100,7 @@ class HashJoinBasicImpl : public HashJoinImpl { num_threads_ = num_threads; schema_mgr_ = schema_mgr; key_cmp_ = std::move(key_cmp); - filter_ = std::move(filter); + filter_ = std::move(filter); output_batch_callback_ = std::move(output_batch_callback); finished_callback_ = std::move(finished_callback); // TODO(ARROW-15732) @@ -110,7 +110,7 @@ class HashJoinBasicImpl : public HashJoinImpl { local_states_[i].is_initialized = false; local_states_[i].is_has_match_initialized = false; } - dict_probe_.Init(num_threads); + dict_probe_.Init(GetCpuThreadPoolCapacity() + io::GetIOThreadPoolCapacity() + 1); has_hash_table_ = false; num_batches_produced_.store(0); diff --git a/cpp/src/arrow/compute/exec/hash_join_dict.cc b/cpp/src/arrow/compute/exec/hash_join_dict.cc index ac1fbbaa3df..63d7d1143c9 100644 --- a/cpp/src/arrow/compute/exec/hash_join_dict.cc +++ b/cpp/src/arrow/compute/exec/hash_join_dict.cc @@ -566,7 +566,7 @@ Status HashJoinDictBuildMulti::PostDecode( } void HashJoinDictProbeMulti::Init(size_t num_threads) { - local_states_.resize(num_threads + 1); // +1 for calling thread + worker threads + local_states_.resize(num_threads); for (size_t i = 0; i < local_states_.size(); ++i) { local_states_[i].is_initialized = false; } @@ -576,6 +576,7 @@ bool HashJoinDictProbeMulti::BatchRemapNeeded( size_t thread_index, const SchemaProjectionMaps& proj_map_probe, const SchemaProjectionMaps& proj_map_build, ExecContext* ctx) { InitLocalStateIfNeeded(thread_index, proj_map_probe, proj_map_build, ctx); + DCHECK_LT(thread_index, local_states_.size()); return local_states_[thread_index].any_needs_remap; } From a5293950669fb201252e5c7a6bbb3fbed6f4a956 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Fri, 15 Apr 2022 15:21:20 -0700 Subject: [PATCH 6/6] Format --- cpp/src/arrow/compute/exec/hash_join.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/compute/exec/hash_join.cc b/cpp/src/arrow/compute/exec/hash_join.cc index 2612c78ed6f..56d02dd3b11 100644 --- a/cpp/src/arrow/compute/exec/hash_join.cc +++ b/cpp/src/arrow/compute/exec/hash_join.cc @@ -100,7 +100,7 @@ class HashJoinBasicImpl : public HashJoinImpl { num_threads_ = num_threads; schema_mgr_ = schema_mgr; key_cmp_ = std::move(key_cmp); - filter_ = std::move(filter); + filter_ = std::move(filter); output_batch_callback_ = std::move(output_batch_callback); finished_callback_ = std::move(finished_callback); // TODO(ARROW-15732)