Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cpp/src/arrow/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ set(ARROW_SRCS
util/bitmap_builders.cc
util/bitmap_ops.cc
util/bpacking.cc
util/cancel.cc
util/compression.cc
util/cpu_info.cc
util/decimal.cc
Expand Down
64 changes: 45 additions & 19 deletions cpp/src/arrow/csv/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -313,12 +313,13 @@ class ReaderMixin {
public:
ReaderMixin(MemoryPool* pool, std::shared_ptr<io::InputStream> input,
const ReadOptions& read_options, const ParseOptions& parse_options,
const ConvertOptions& convert_options)
const ConvertOptions& convert_options, StopToken stop_token)
: pool_(pool),
read_options_(read_options),
parse_options_(parse_options),
convert_options_(convert_options),
input_(std::move(input)) {}
input_(std::move(input)),
stop_token_(std::move(stop_token)) {}

protected:
// Read header and column names from buffer, create column builders
Expand Down Expand Up @@ -500,6 +501,7 @@ class ReaderMixin {

std::shared_ptr<io::InputStream> input_;
std::shared_ptr<internal::TaskGroup> task_group_;
StopToken stop_token_;
};

/////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -697,7 +699,7 @@ class SerialStreamingReader : public BaseStreamingReader {
ARROW_ASSIGN_OR_RAISE(auto rh_it,
MakeReadaheadIterator(std::move(istream_it), block_queue_size));
buffer_iterator_ = CSVBufferIterator::Make(std::move(rh_it));
task_group_ = internal::TaskGroup::MakeSerial();
task_group_ = internal::TaskGroup::MakeSerial(stop_token_);

// Read schema from first batch
ARROW_ASSIGN_OR_RAISE(pending_batch_, ReadNext());
Expand All @@ -710,6 +712,10 @@ class SerialStreamingReader : public BaseStreamingReader {
if (eof_) {
return nullptr;
}
if (stop_token_.IsStopRequested()) {
eof_ = true;
return stop_token_.Poll();
}
if (!block_iterator_) {
Status st = SetupReader();
if (!st.ok()) {
Expand Down Expand Up @@ -790,7 +796,7 @@ class SerialTableReader : public BaseTableReader {
}

Result<std::shared_ptr<Table>> Read() override {
task_group_ = internal::TaskGroup::MakeSerial();
task_group_ = internal::TaskGroup::MakeSerial(stop_token_);

// First block
ARROW_ASSIGN_OR_RAISE(auto first_buffer, buffer_iterator_.Next());
Expand All @@ -804,6 +810,8 @@ class SerialTableReader : public BaseTableReader {
MakeChunker(parse_options_),
std::move(first_buffer));
while (true) {
RETURN_NOT_OK(stop_token_.Poll());

ARROW_ASSIGN_OR_RAISE(auto maybe_block, block_iterator.Next());
if (maybe_block == IterationTraits<CSVBlock>::End()) {
// EOF
Expand Down Expand Up @@ -833,9 +841,10 @@ class AsyncThreadedTableReader
AsyncThreadedTableReader(MemoryPool* pool, std::shared_ptr<io::InputStream> input,
const ReadOptions& read_options,
const ParseOptions& parse_options,
const ConvertOptions& convert_options, Executor* cpu_executor,
Executor* io_executor)
: BaseTableReader(pool, input, read_options, parse_options, convert_options),
const ConvertOptions& convert_options, StopToken stop_token,
Executor* cpu_executor, Executor* io_executor)
Comment on lines +844 to +845
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems we might prefer to rewrite this constructor to take an IOContext

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We must support the legacy TableReader::Make taking both a MemoryPool* and an IOContext.

: BaseTableReader(pool, input, read_options, parse_options, convert_options,
std::move(stop_token)),
cpu_executor_(cpu_executor),
io_executor_(io_executor) {}

Expand Down Expand Up @@ -870,7 +879,7 @@ class AsyncThreadedTableReader
Result<std::shared_ptr<Table>> Read() override { return ReadAsync().result(); }

Future<std::shared_ptr<Table>> ReadAsync() override {
task_group_ = internal::TaskGroup::MakeThreaded(cpu_executor_);
task_group_ = internal::TaskGroup::MakeThreaded(cpu_executor_, stop_token_);

auto self = shared_from_this();
return ProcessFirstBuffer().Then([self](std::shared_ptr<Buffer> first_buffer) {
Expand Down Expand Up @@ -939,17 +948,30 @@ Result<std::shared_ptr<TableReader>> MakeTableReader(
if (read_options.use_threads) {
auto cpu_executor = internal::GetCpuThreadPool();
auto io_executor = io_context.executor();
reader = std::make_shared<AsyncThreadedTableReader>(pool, input, read_options,
parse_options, convert_options,
cpu_executor, io_executor);
reader = std::make_shared<AsyncThreadedTableReader>(
pool, input, read_options, parse_options, convert_options,
io_context.stop_token(), cpu_executor, io_executor);
} else {
reader = std::make_shared<SerialTableReader>(pool, input, read_options, parse_options,
convert_options);
reader =
std::make_shared<SerialTableReader>(pool, input, read_options, parse_options,
convert_options, io_context.stop_token());
}
RETURN_NOT_OK(reader->Init());
return reader;
}

Result<std::shared_ptr<StreamingReader>> MakeStreamingReader(
io::IOContext io_context, std::shared_ptr<io::InputStream> input,
const ReadOptions& read_options, const ParseOptions& parse_options,
const ConvertOptions& convert_options) {
std::shared_ptr<BaseStreamingReader> reader;
reader = std::make_shared<SerialStreamingReader>(io_context.pool(), input, read_options,
parse_options, convert_options,
io_context.stop_token());
RETURN_NOT_OK(reader->Init());
return reader;
}

} // namespace

/////////////////////////////////////////////////////////////////////////
Expand All @@ -975,13 +997,17 @@ Result<std::shared_ptr<StreamingReader>> StreamingReader::Make(
MemoryPool* pool, std::shared_ptr<io::InputStream> input,
const ReadOptions& read_options, const ParseOptions& parse_options,
const ConvertOptions& convert_options) {
std::shared_ptr<BaseStreamingReader> reader;
reader = std::make_shared<SerialStreamingReader>(pool, input, read_options,
parse_options, convert_options);
RETURN_NOT_OK(reader->Init());
return reader;
return MakeStreamingReader(io::IOContext(pool), std::move(input), read_options,
parse_options, convert_options);
}

} // namespace csv
Result<std::shared_ptr<StreamingReader>> StreamingReader::Make(
io::IOContext io_context, std::shared_ptr<io::InputStream> input,
const ReadOptions& read_options, const ParseOptions& parse_options,
const ConvertOptions& convert_options) {
return MakeStreamingReader(io_context, std::move(input), read_options, parse_options,
convert_options);
}

} // namespace csv
} // namespace arrow
7 changes: 6 additions & 1 deletion cpp/src/arrow/csv/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class ARROW_EXPORT TableReader {
const ParseOptions&,
const ConvertOptions&);

ARROW_DEPRECATED("Use MemoryPool-less overload (the IOContext holds a pool already)")
ARROW_DEPRECATED("Use MemoryPool-less variant (the IOContext holds a pool already)")
static Result<std::shared_ptr<TableReader>> Make(
MemoryPool* pool, io::IOContext io_context, std::shared_ptr<io::InputStream> input,
const ReadOptions&, const ParseOptions&, const ConvertOptions&);
Expand All @@ -67,6 +67,11 @@ class ARROW_EXPORT StreamingReader : public RecordBatchReader {
///
/// Currently, the StreamingReader is always single-threaded (parallel
/// readahead is not supported).
static Result<std::shared_ptr<StreamingReader>> Make(
io::IOContext io_context, std::shared_ptr<io::InputStream> input,
const ReadOptions&, const ParseOptions&, const ConvertOptions&);

ARROW_DEPRECATED("Use IOContext-based overload")
static Result<std::shared_ptr<StreamingReader>> Make(
MemoryPool* pool, std::shared_ptr<io::InputStream> input, const ReadOptions&,
const ParseOptions&, const ConvertOptions&);
Expand Down
5 changes: 3 additions & 2 deletions cpp/src/arrow/dataset/file_csv.cc
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,9 @@ static inline Result<std::shared_ptr<csv::StreamingReader>> OpenReader(
GetConvertOptions(format, scan_options, *first_block, pool));
}

auto maybe_reader = csv::StreamingReader::Make(pool, std::move(input), reader_options,
parse_options, convert_options);
auto maybe_reader =
csv::StreamingReader::Make(io::IOContext(pool), std::move(input), reader_options,
parse_options, convert_options);
if (!maybe_reader.ok()) {
return maybe_reader.status().WithMessage("Could not open CSV input source '",
source.path(), "': ", maybe_reader.status());
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/arrow/io/interfaces.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ namespace io {

static IOContext g_default_io_context{};

IOContext::IOContext(MemoryPool* pool) : IOContext(pool, internal::GetIOThreadPool()) {}
IOContext::IOContext(MemoryPool* pool, StopToken stop_token)
: IOContext(pool, internal::GetIOThreadPool(), std::move(stop_token)) {}

const IOContext& default_io_context() { return g_default_io_context; }

Expand Down
27 changes: 21 additions & 6 deletions cpp/src/arrow/io/interfaces.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

#include "arrow/io/type_fwd.h"
#include "arrow/type_fwd.h"
#include "arrow/util/cancel.h"
#include "arrow/util/macros.h"
#include "arrow/util/string_view.h"
#include "arrow/util/type_fwd.h"
Expand Down Expand Up @@ -56,17 +57,28 @@ struct ReadRange {
/// multiple sources and must distinguish tasks associated with this IOContext).
struct ARROW_EXPORT IOContext {
// No specified executor: will use a global IO thread pool
IOContext() : IOContext(default_memory_pool()) {}
IOContext() : IOContext(default_memory_pool(), StopToken::Unstoppable()) {}

// No specified executor: will use a global IO thread pool
explicit IOContext(MemoryPool* pool);
explicit IOContext(StopToken stop_token)
: IOContext(default_memory_pool(), std::move(stop_token)) {}

explicit IOContext(MemoryPool* pool, StopToken stop_token = StopToken::Unstoppable());

explicit IOContext(MemoryPool* pool, ::arrow::internal::Executor* executor,
StopToken stop_token = StopToken::Unstoppable(),
int64_t external_id = -1)
: pool_(pool), executor_(executor), external_id_(external_id) {}
: pool_(pool),
executor_(executor),
external_id_(external_id),
stop_token_(std::move(stop_token)) {}

explicit IOContext(::arrow::internal::Executor* executor, int64_t external_id = -1)
: pool_(default_memory_pool()), executor_(executor), external_id_(external_id) {}
explicit IOContext(::arrow::internal::Executor* executor,
StopToken stop_token = StopToken::Unstoppable(),
int64_t external_id = -1)
: pool_(default_memory_pool()),
executor_(executor),
external_id_(external_id),
stop_token_(std::move(stop_token)) {}

MemoryPool* pool() const { return pool_; }

Expand All @@ -75,10 +87,13 @@ struct ARROW_EXPORT IOContext {
// An application-specific ID, forwarded to executor task submissions
int64_t external_id() const { return external_id_; }

StopToken stop_token() const { return stop_token_; }

private:
MemoryPool* pool_;
::arrow::internal::Executor* executor_;
int64_t external_id_;
StopToken stop_token_;
};

struct ARROW_DEPRECATED("renamed to IOContext in 4.0.0") AsyncContext : public IOContext {
Expand Down
3 changes: 3 additions & 0 deletions cpp/src/arrow/status.cc
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ std::string Status::CodeAsString(StatusCode code) {
case StatusCode::Invalid:
type = "Invalid";
break;
case StatusCode::Cancelled:
type = "Cancelled";
break;
case StatusCode::IOError:
type = "IOError";
break;
Expand Down
9 changes: 9 additions & 0 deletions cpp/src/arrow/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ enum class StatusCode : char {
IOError = 5,
CapacityError = 6,
IndexError = 7,
Cancelled = 8,
UnknownError = 9,
NotImplemented = 10,
SerializationError = 11,
Expand Down Expand Up @@ -204,6 +205,12 @@ class ARROW_MUST_USE_TYPE ARROW_EXPORT Status : public util::EqualityComparable<
return Status::FromArgs(StatusCode::Invalid, std::forward<Args>(args)...);
}

/// Return an error status for cancelled operation
template <typename... Args>
static Status Cancelled(Args&&... args) {
return Status::FromArgs(StatusCode::Cancelled, std::forward<Args>(args)...);
}

/// Return an error status when an index is out of bounds
template <typename... Args>
static Status IndexError(Args&&... args) {
Expand Down Expand Up @@ -263,6 +270,8 @@ class ARROW_MUST_USE_TYPE ARROW_EXPORT Status : public util::EqualityComparable<
bool IsKeyError() const { return code() == StatusCode::KeyError; }
/// Return true iff the status indicates invalid data.
bool IsInvalid() const { return code() == StatusCode::Invalid; }
/// Return true iff the status indicates a cancelled operation.
bool IsCancelled() const { return code() == StatusCode::Cancelled; }
/// Return true iff the status indicates an IO-related failure.
bool IsIOError() const { return code() == StatusCode::IOError; }
/// Return true iff the status indicates a container reaching capacity limits.
Expand Down
25 changes: 25 additions & 0 deletions cpp/src/arrow/testing/gtest_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -539,6 +539,24 @@ EnvVarGuard::~EnvVarGuard() {
}
}

struct SignalHandlerGuard::Impl {
int signum_;
internal::SignalHandler old_handler_;

Impl(int signum, const internal::SignalHandler& handler)
: signum_(signum), old_handler_(*internal::SetSignalHandler(signum, handler)) {}

~Impl() { ARROW_EXPECT_OK(internal::SetSignalHandler(signum_, old_handler_)); }
};

SignalHandlerGuard::SignalHandlerGuard(int signum, Callback cb)
: SignalHandlerGuard(signum, internal::SignalHandler(cb)) {}

SignalHandlerGuard::SignalHandlerGuard(int signum, const internal::SignalHandler& handler)
: impl_(new Impl{signum, handler}) {}

SignalHandlerGuard::~SignalHandlerGuard() = default;

namespace {

// Used to prevent compiler optimizing away side-effect-less statements
Expand Down Expand Up @@ -576,6 +594,13 @@ void SleepFor(double seconds) {
std::chrono::nanoseconds(static_cast<int64_t>(seconds * 1e9)));
}

void BusyWait(double seconds, std::function<bool()> predicate) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice. I was just thinking of writing something like this.

const double period = 0.001;
for (int i = 0; !predicate() && i * period < seconds; ++i) {
SleepFor(period);
}
}

///////////////////////////////////////////////////////////////////////////
// Extension types

Expand Down
22 changes: 22 additions & 0 deletions cpp/src/arrow/testing/gtest_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include <cstdint>
#include <cstdlib>
#include <cstring>
#include <functional>
#include <memory>
#include <string>
#include <type_traits>
Expand Down Expand Up @@ -472,6 +473,10 @@ inline void BitmapFromVector(const std::vector<T>& is_valid,
ARROW_TESTING_EXPORT
void SleepFor(double seconds);

// Wait until predicate is true or timeout in seconds expires.
ARROW_TESTING_EXPORT
void BusyWait(double seconds, std::function<bool()> predicate);

template <typename T>
std::vector<T> IteratorToVector(Iterator<T> iterator) {
EXPECT_OK_AND_ASSIGN(auto out, iterator.ToVector());
Expand Down Expand Up @@ -504,6 +509,23 @@ class ARROW_TESTING_EXPORT EnvVarGuard {
bool was_set_;
};

namespace internal {
class SignalHandler;
}

class ARROW_TESTING_EXPORT SignalHandlerGuard {
public:
typedef void (*Callback)(int);

SignalHandlerGuard(int signum, Callback cb);
SignalHandlerGuard(int signum, const internal::SignalHandler& handler);
~SignalHandlerGuard();

protected:
struct Impl;
std::unique_ptr<Impl> impl_;
};

#ifndef ARROW_LARGE_MEMORY_TESTS
#define LARGE_MEMORY_TEST(name) DISABLED_##name
#else
Expand Down
7 changes: 4 additions & 3 deletions cpp/src/arrow/util/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,10 @@ add_arrow_test(utility-test

add_arrow_test(threading-utility-test
SOURCES
future_test
task_group_test
thread_pool_test)
cancel_test.cc
future_test.cc
task_group_test.cc
thread_pool_test.cc)

add_arrow_benchmark(bit_block_counter_benchmark)
add_arrow_benchmark(bit_util_benchmark)
Expand Down
Loading