From 6de51fd97f527398f6071ffa80c94371db2e57e9 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Fri, 4 Feb 2022 06:34:13 -0800 Subject: [PATCH 1/8] Add failing test --- r/tests/testthat/test-dplyr-join.R | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/r/tests/testthat/test-dplyr-join.R b/r/tests/testthat/test-dplyr-join.R index 5f4dbfbbd72..dd6f3c78809 100644 --- a/r/tests/testthat/test-dplyr-join.R +++ b/r/tests/testthat/test-dplyr-join.R @@ -249,3 +249,17 @@ test_that("arrow dplyr query correctly filters then joins", { ) ) }) + + +test_that("arrow dplyr query can join with tibble", { + # ARROW-14908 + dir_out <- tempdir() + + write_dataset(iris, file.path(dir_out, "iris")) + species_codes <- data.frame(Species = c("setosa", "versicolor", "virginica"), + code = c("SET", "VER", "VIR")) + + iris <- open_dataset(file.path(dir_out, "iris")) + + left_join(iris, species_codes) %>% collect() +}) \ No newline at end of file From 507050b7c26b82ff65b66af30191c450546bdcf6 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Wed, 9 Feb 2022 14:21:23 -0800 Subject: [PATCH 2/8] Add failing DCHECK --- cpp/src/arrow/compute/exec/hash_join.cc | 1 + 1 file changed, 1 insertion(+) diff --git a/cpp/src/arrow/compute/exec/hash_join.cc b/cpp/src/arrow/compute/exec/hash_join.cc index c1e587598fa..1834307516e 100644 --- a/cpp/src/arrow/compute/exec/hash_join.cc +++ b/cpp/src/arrow/compute/exec/hash_join.cc @@ -151,6 +151,7 @@ class HashJoinBasicImpl : public HashJoinImpl { } void InitLocalStateIfNeeded(size_t thread_index) { + DCHECK_LT(thread_index, local_states_.size()); ThreadLocalState& local_state = local_states_[thread_index]; if (!local_state.is_initialized) { InitEncoder(0, HashJoinProjection::KEY, &local_state.exec_batch_keys); From 763d1d731c147f1bfc1dbfd51bae4246e65f41db Mon Sep 17 00:00:00 2001 From: Will Jones Date: Wed, 9 Feb 2022 15:06:39 -0800 Subject: [PATCH 3/8] Create test that should fail across platforms --- r/tests/testthat/test-dplyr-join.R | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/r/tests/testthat/test-dplyr-join.R b/r/tests/testthat/test-dplyr-join.R index dd6f3c78809..f1241943146 100644 --- a/r/tests/testthat/test-dplyr-join.R +++ b/r/tests/testthat/test-dplyr-join.R @@ -253,8 +253,11 @@ test_that("arrow dplyr query correctly filters then joins", { test_that("arrow dplyr query can join with tibble", { # ARROW-14908 + existing_use_threads <- getOption("arrow.use_threads") + options(arrow.use_threads = FALSE) dir_out <- tempdir() + # Note: Species is a DictionaryArray, but this still fails even if we convert to StringArray. write_dataset(iris, file.path(dir_out, "iris")) species_codes <- data.frame(Species = c("setosa", "versicolor", "virginica"), code = c("SET", "VER", "VIR")) @@ -262,4 +265,7 @@ test_that("arrow dplyr query can join with tibble", { iris <- open_dataset(file.path(dir_out, "iris")) left_join(iris, species_codes) %>% collect() + + # Reset + options(arrow.use_threads = existing_use_threads) }) \ No newline at end of file From a61b7a482e97c1ded3896e50ce62ea8f60d4826a Mon Sep 17 00:00:00 2001 From: Will Jones Date: Fri, 11 Feb 2022 09:38:32 -0800 Subject: [PATCH 4/8] Add one to thread count The main thread touches the thread_indexer before it's used by the worker thread, so we need to make sure it has capacity for both threads. --- cpp/src/arrow/compute/exec/hash_join.cc | 2 +- cpp/src/arrow/compute/exec/hash_join_dict.cc | 2 +- r/tests/testthat/test-dplyr-join.R | 3 ++- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/cpp/src/arrow/compute/exec/hash_join.cc b/cpp/src/arrow/compute/exec/hash_join.cc index 1834307516e..54ddc778caf 100644 --- a/cpp/src/arrow/compute/exec/hash_join.cc +++ b/cpp/src/arrow/compute/exec/hash_join.cc @@ -103,7 +103,7 @@ class HashJoinBasicImpl : public HashJoinImpl { filter_ = std::move(filter); output_batch_callback_ = std::move(output_batch_callback); finished_callback_ = std::move(finished_callback); - local_states_.resize(num_threads); + local_states_.resize(num_threads + 1); // +1 for calling thread + worker thread for (size_t i = 0; i < local_states_.size(); ++i) { local_states_[i].is_initialized = false; local_states_[i].is_has_match_initialized = false; diff --git a/cpp/src/arrow/compute/exec/hash_join_dict.cc b/cpp/src/arrow/compute/exec/hash_join_dict.cc index b923433b493..35bdc9f7bfc 100644 --- a/cpp/src/arrow/compute/exec/hash_join_dict.cc +++ b/cpp/src/arrow/compute/exec/hash_join_dict.cc @@ -566,7 +566,7 @@ Status HashJoinDictBuildMulti::PostDecode( } void HashJoinDictProbeMulti::Init(size_t num_threads) { - local_states_.resize(num_threads); + local_states_.resize(num_threads + 1); // +1 for calling thread + worker thread for (size_t i = 0; i < local_states_.size(); ++i) { local_states_[i].is_initialized = false; } diff --git a/r/tests/testthat/test-dplyr-join.R b/r/tests/testthat/test-dplyr-join.R index f1241943146..0d7a7679815 100644 --- a/r/tests/testthat/test-dplyr-join.R +++ b/r/tests/testthat/test-dplyr-join.R @@ -264,7 +264,8 @@ test_that("arrow dplyr query can join with tibble", { iris <- open_dataset(file.path(dir_out, "iris")) - left_join(iris, species_codes) %>% collect() + res <- left_join(iris, species_codes) %>% collect() # We should not segfault here. + expect_equal(nrow(res), 150) # Reset options(arrow.use_threads = existing_use_threads) From 0418acf7f3631295f2b7fc69005ae0a1f0098c96 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Fri, 11 Feb 2022 09:49:18 -0800 Subject: [PATCH 5/8] Fix formatting --- cpp/src/arrow/compute/exec/hash_join_dict.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cpp/src/arrow/compute/exec/hash_join_dict.cc b/cpp/src/arrow/compute/exec/hash_join_dict.cc index 35bdc9f7bfc..2ea4c662a18 100644 --- a/cpp/src/arrow/compute/exec/hash_join_dict.cc +++ b/cpp/src/arrow/compute/exec/hash_join_dict.cc @@ -566,7 +566,7 @@ Status HashJoinDictBuildMulti::PostDecode( } void HashJoinDictProbeMulti::Init(size_t num_threads) { - local_states_.resize(num_threads + 1); // +1 for calling thread + worker thread + local_states_.resize(num_threads + 1); // +1 for calling thread + worker thread for (size_t i = 0; i < local_states_.size(); ++i) { local_states_[i].is_initialized = false; } From 8da2b1cbaad32727987fd9d525003b26907fbed1 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Fri, 11 Feb 2022 10:50:56 -0800 Subject: [PATCH 6/8] Apply suggestions from code review Co-authored-by: Jonathan Keane --- r/tests/testthat/test-dplyr-join.R | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/r/tests/testthat/test-dplyr-join.R b/r/tests/testthat/test-dplyr-join.R index 0d7a7679815..3035ba429ac 100644 --- a/r/tests/testthat/test-dplyr-join.R +++ b/r/tests/testthat/test-dplyr-join.R @@ -265,8 +265,8 @@ test_that("arrow dplyr query can join with tibble", { iris <- open_dataset(file.path(dir_out, "iris")) res <- left_join(iris, species_codes) %>% collect() # We should not segfault here. - expect_equal(nrow(res), 150) + expect_equal(nrow(res), 150) # Reset options(arrow.use_threads = existing_use_threads) -}) \ No newline at end of file +}) From 3ce1e7f603143b19a39c82b72163e797fe8448da Mon Sep 17 00:00:00 2001 From: Will Jones Date: Fri, 11 Feb 2022 10:59:35 -0800 Subject: [PATCH 7/8] Use withr to change options --- r/tests/testthat/test-dplyr-join.R | 25 ++++++++++++------------- 1 file changed, 12 insertions(+), 13 deletions(-) diff --git a/r/tests/testthat/test-dplyr-join.R b/r/tests/testthat/test-dplyr-join.R index 3035ba429ac..319aa8cf98d 100644 --- a/r/tests/testthat/test-dplyr-join.R +++ b/r/tests/testthat/test-dplyr-join.R @@ -253,20 +253,19 @@ test_that("arrow dplyr query correctly filters then joins", { test_that("arrow dplyr query can join with tibble", { # ARROW-14908 - existing_use_threads <- getOption("arrow.use_threads") - options(arrow.use_threads = FALSE) dir_out <- tempdir() - - # Note: Species is a DictionaryArray, but this still fails even if we convert to StringArray. write_dataset(iris, file.path(dir_out, "iris")) - species_codes <- data.frame(Species = c("setosa", "versicolor", "virginica"), - code = c("SET", "VER", "VIR")) - - iris <- open_dataset(file.path(dir_out, "iris")) - - res <- left_join(iris, species_codes) %>% collect() # We should not segfault here. - expect_equal(nrow(res), 150) + species_codes <- data.frame( + Species = c("setosa", "versicolor", "virginica"), + code = c("SET", "VER", "VIR") + ) - # Reset - options(arrow.use_threads = existing_use_threads) + withr::with_options( + list(arrow.use_threads = FALSE), + { + iris <- open_dataset(file.path(dir_out, "iris")) + res <- left_join(iris, species_codes) %>% collect() # We should not segfault here. + expect_equal(nrow(res), 150) + } + ) }) From 20c38d6da9a9f6ced8c61c9e82205db33a0377b9 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Mon, 14 Feb 2022 13:21:04 -0800 Subject: [PATCH 8/8] Apply suggestions from code review Co-authored-by: Weston Pace --- cpp/src/arrow/compute/exec/hash_join.cc | 2 +- cpp/src/arrow/compute/exec/hash_join_dict.cc | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/compute/exec/hash_join.cc b/cpp/src/arrow/compute/exec/hash_join.cc index 54ddc778caf..5a9afaa5bdf 100644 --- a/cpp/src/arrow/compute/exec/hash_join.cc +++ b/cpp/src/arrow/compute/exec/hash_join.cc @@ -103,7 +103,7 @@ class HashJoinBasicImpl : public HashJoinImpl { filter_ = std::move(filter); output_batch_callback_ = std::move(output_batch_callback); finished_callback_ = std::move(finished_callback); - local_states_.resize(num_threads + 1); // +1 for calling thread + worker thread + local_states_.resize(num_threads + 1); // +1 for calling thread + worker threads for (size_t i = 0; i < local_states_.size(); ++i) { local_states_[i].is_initialized = false; local_states_[i].is_has_match_initialized = false; diff --git a/cpp/src/arrow/compute/exec/hash_join_dict.cc b/cpp/src/arrow/compute/exec/hash_join_dict.cc index 2ea4c662a18..ac1fbbaa3df 100644 --- a/cpp/src/arrow/compute/exec/hash_join_dict.cc +++ b/cpp/src/arrow/compute/exec/hash_join_dict.cc @@ -566,7 +566,7 @@ Status HashJoinDictBuildMulti::PostDecode( } void HashJoinDictProbeMulti::Init(size_t num_threads) { - local_states_.resize(num_threads + 1); // +1 for calling thread + worker thread + local_states_.resize(num_threads + 1); // +1 for calling thread + worker threads for (size_t i = 0; i < local_states_.size(); ++i) { local_states_[i].is_initialized = false; }