Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions ci/scripts/cpp_build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ if [ "${ARROW_USE_CCACHE}" == "ON" ]; then
ccache -s
fi

if [ "${ARROW_USE_TSAN}" == "ON" ] && [ ! -x "${ASAN_SYMBOLIZER_PATH}" ]; then
echo -e "Invalid value for \$ASAN_SYMBOLIZER_PATH: ${ASAN_SYMBOLIZER_PATH}"
exit 1
fi

mkdir -p ${build_dir}
pushd ${build_dir}

Expand Down
7 changes: 5 additions & 2 deletions cpp/cmake_modules/SetupCxxFlags.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,6 @@ if("${BUILD_WARNING_LEVEL}" STREQUAL "CHECKIN")
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wdocumentation")
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wno-missing-braces")
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wno-unused-parameter")
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wno-unknown-warning-option")
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wno-constant-logical-operand")
elseif(CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wall")
Expand Down Expand Up @@ -342,7 +341,11 @@ if(MSVC)
# Disable "switch statement contains 'default' but no 'case' labels" warning
# (required for protobuf, see https://github.com/protocolbuffers/protobuf/issues/6885)
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} /wd4065")

elseif(CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
# Avoid error when an unknown warning flag is passed
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wno-unknown-warning-option")

if(CMAKE_CXX_COMPILER_VERSION VERSION_EQUAL "7.0" OR CMAKE_CXX_COMPILER_VERSION
VERSION_GREATER "7.0")
# Without this, gcc >= 7 warns related to changes in C++17
Expand Down Expand Up @@ -383,7 +386,7 @@ elseif(CMAKE_CXX_COMPILER_ID STREQUAL "AppleClang" OR CMAKE_CXX_COMPILER_ID STRE
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Qunused-arguments")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Qunused-arguments")

# Avoid clang error when an unknown warning flag is passed
# Avoid error when an unknown warning flag is passed
set(CXX_COMMON_FLAGS "${CXX_COMMON_FLAGS} -Wno-unknown-warning-option")
# Add colors when paired with ninja
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fcolor-diagnostics")
Expand Down
13 changes: 8 additions & 5 deletions cpp/src/arrow/compute/exec/expression.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
#include "arrow/io/memory.h"
#include "arrow/ipc/reader.h"
#include "arrow/ipc/writer.h"
#include "arrow/util/atomic_shared_ptr.h"
#include "arrow/util/hash_util.h"
#include "arrow/util/key_value_metadata.h"
#include "arrow/util/logging.h"
Expand All @@ -43,11 +42,15 @@ using internal::checked_pointer_cast;

namespace compute {

Expression::Expression(Call call) {
call.hash = std::hash<std::string>{}(call.function_name);
for (const auto& arg : call.arguments) {
arrow::internal::hash_combine(call.hash, arg.hash());
void Expression::Call::ComputeHash() {
hash = std::hash<std::string>{}(function_name);
for (const auto& arg : arguments) {
arrow::internal::hash_combine(hash, arg.hash());
}
}

Expression::Expression(Call call) {
call.ComputeHash();
impl_ = std::make_shared<Impl>(std::move(call));
}

Expand Down
3 changes: 3 additions & 0 deletions cpp/src/arrow/compute/exec/expression.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,16 @@ class ARROW_EXPORT Expression {
std::string function_name;
std::vector<Expression> arguments;
std::shared_ptr<FunctionOptions> options;
// Cached hash value
size_t hash;

// post-Bind properties:
std::shared_ptr<Function> function;
const Kernel* kernel = NULLPTR;
std::shared_ptr<KernelState> kernel_state;
ValueDescr descr;

void ComputeHash();
};

std::string ToString() const;
Expand Down
31 changes: 26 additions & 5 deletions cpp/src/arrow/util/async_generator.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@

#pragma once

#include <atomic>
#include <cassert>
#include <cstring>
#include <deque>
#include <queue>
#include <thread>
Expand Down Expand Up @@ -1253,7 +1255,9 @@ class BackgroundGenerator {
it(std::move(it)),
reading(false),
finished(false),
should_shutdown(false) {}
should_shutdown(false) {
SetWorkerThreadId({}); // default-initialized thread id
}

void ClearQueue() {
while (!queue.empty()) {
Expand Down Expand Up @@ -1312,11 +1316,28 @@ class BackgroundGenerator {
return next;
}

void SetWorkerThreadId(const std::thread::id tid) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Of course, we could also not bother with this complication, since worker_thread_id is only used for debugging.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True, but the assert catches an otherwise potentially subtle issue in how BackgroundGenerator is meant to be used. It's not entirely obvious that you aren't allowed to dispose of it from the worker thread and it a fair amount of debugging to discover this root cause.

What was the issue? Is it because the worker thread id might be changing (from set to unset) when the assert in cleanup happens?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it because the worker thread id might be changing (from set to unset) when the assert in cleanup happens?

Exactly, yes.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this neat trick to turn a thread::id into an int64_t is indeed reliable it might be nice to add int64_t arrow::internal::GetCurrentThreadId(). Probably in thread_pool.h/cc.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will do.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See #10644

uint64_t equiv{0};
// std::thread::id is trivially copyable as per C++ spec,
// so type punning as a uint64_t should work
static_assert(sizeof(std::thread::id) <= sizeof(uint64_t),
"std::thread::id can't fit into uint64_t");
memcpy(&equiv, reinterpret_cast<const void*>(&tid), sizeof(tid));
worker_thread_id.store(equiv);
}

std::thread::id GetWorkerThreadId() {
const auto equiv = worker_thread_id.load();
std::thread::id tid;
memcpy(reinterpret_cast<void*>(&tid), &equiv, sizeof(tid));
return tid;
}

internal::Executor* io_executor;
const int max_q;
const int q_restart;
Iterator<T> it;
std::thread::id worker_thread_id;
std::atomic<uint64_t> worker_thread_id;

// If true, the task is actively pumping items from the queue and does not need a
// restart
Expand Down Expand Up @@ -1344,7 +1365,7 @@ class BackgroundGenerator {
///
/// It's a deadlock if we enter cleanup from
/// the worker thread but it can happen if the consumer doesn't transfer away
assert(state->worker_thread_id != std::this_thread::get_id());
assert(state->GetWorkerThreadId() != std::this_thread::get_id());
Future<> finish_fut;
{
auto lock = state->mutex.Lock();
Expand All @@ -1365,7 +1386,7 @@ class BackgroundGenerator {
static void WorkerTask(std::shared_ptr<State> state) {
// We need to capture the state to read while outside the mutex
bool reading = true;
state->worker_thread_id = std::this_thread::get_id();
state->SetWorkerThreadId(std::this_thread::get_id());
while (reading) {
auto next = state->it.Next();
// Need to capture state->waiting_future inside the mutex to mark finished outside
Expand Down Expand Up @@ -1417,7 +1438,7 @@ class BackgroundGenerator {
// reference it. We can safely transition to idle now.
task_finished = state->task_finished;
state->task_finished = Future<>();
state->worker_thread_id = std::thread::id();
state->SetWorkerThreadId({}); // default-initialized thread id
}
task_finished.MarkFinished();
}
Expand Down
5 changes: 3 additions & 2 deletions cpp/src/arrow/util/async_generator_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
// specific language governing permissions and limitations
// under the License.

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <mutex>
Expand Down Expand Up @@ -68,14 +69,14 @@ class TrackingGenerator {
return state_->source();
}

int num_read() { return state_->num_read; }
int num_read() { return state_->num_read.load(); }

private:
struct State {
explicit State(AsyncGenerator<T> source) : source(std::move(source)), num_read(0) {}

AsyncGenerator<T> source;
int num_read;
std::atomic<int> num_read;
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@westonpace I had to change this as well, got a sporadic race condition in MergedGeneratorTests/MergedGeneratorTestFixture.MergedLimitedSubscriptions/1.

};

std::shared_ptr<State> state_;
Expand Down
3 changes: 3 additions & 0 deletions dev/tasks/tasks.yml
Original file line number Diff line number Diff line change
Expand Up @@ -824,7 +824,10 @@ tasks:
template: docker-tests/github.linux.yml
params:
env:
# clang-tools and llvm version need to be synchronized so as
# to have the right llvm-symbolizer version
CLANG_TOOLS: 11
LLVM: 11
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@kszucs Does this seem ok?

UBUNTU: 20.04
image: ubuntu-cpp-thread-sanitizer

Expand Down
2 changes: 2 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,7 @@ services:
<<: *ccache
CC: clang-${CLANG_TOOLS}
CXX: clang++-${CLANG_TOOLS}
ARROW_BUILD_STATIC: "OFF"
ARROW_ENABLE_TIMING_TESTS: # inherit
ARROW_FUZZING: "ON" # Check fuzz regressions
ARROW_JEMALLOC: "OFF"
Expand Down Expand Up @@ -399,6 +400,7 @@ services:
<<: *ccache
CC: clang-${CLANG_TOOLS}
CXX: clang++-${CLANG_TOOLS}
ARROW_BUILD_STATIC: "OFF"
ARROW_ENABLE_TIMING_TESTS: # inherit
ARROW_DATASET: "ON"
ARROW_JEMALLOC: "OFF"
Expand Down