Skip to content
Closed
227 changes: 51 additions & 176 deletions cpp/src/arrow/csv/column_decoder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,91 +45,13 @@ using internal::TaskGroup;

class ConcreteColumnDecoder : public ColumnDecoder {
public:
explicit ConcreteColumnDecoder(MemoryPool* pool,
std::shared_ptr<internal::TaskGroup> task_group,
int32_t col_index = -1)
: ColumnDecoder(std::move(task_group)),
pool_(pool),
col_index_(col_index),
num_chunks_(-1),
next_chunk_(0) {}

void Append(const std::shared_ptr<BlockParser>& parser) override {
Insert(static_cast<int64_t>(chunks_.size()), parser);
}

void SetEOF(int64_t num_blocks) override {
std::lock_guard<std::mutex> lock(mutex_);

DCHECK_EQ(num_chunks_, -1) << "Cannot change EOF";
num_chunks_ = num_blocks;

// If further chunks have been requested in NextChunk(), arrange to return nullptr
for (int64_t i = num_chunks_; i < static_cast<int64_t>(chunks_.size()); ++i) {
auto* chunk = &chunks_[i];
if (chunk->is_valid()) {
DCHECK(!IsFutureFinished(chunk->state()));
chunk->MarkFinished(std::shared_ptr<Array>());
}
}
}

Result<std::shared_ptr<Array>> NextChunk() override {
std::unique_lock<std::mutex> lock(mutex_);

if (num_chunks_ > 0 && next_chunk_ >= num_chunks_) {
return nullptr; // EOF
}
PrepareChunkUnlocked(next_chunk_);
auto chunk_index = next_chunk_++;
WaitForChunkUnlocked(chunk_index);
// Move Future to avoid keeping chunk alive
return chunks_[chunk_index].MoveResult();
}
explicit ConcreteColumnDecoder(MemoryPool* pool, int32_t col_index = -1)
: ColumnDecoder(), pool_(pool), col_index_(col_index) {}

protected:
// XXX useful?
virtual std::shared_ptr<DataType> type() const = 0;

void WaitForChunkUnlocked(int64_t chunk_index) {
auto future = chunks_[chunk_index]; // Make copy because of resizes
mutex_.unlock();
future.Wait();
mutex_.lock();
}

void PrepareChunk(int64_t block_index) {
std::lock_guard<std::mutex> lock(mutex_);
PrepareChunkUnlocked(block_index);
}

void PrepareChunkUnlocked(int64_t block_index) {
size_t chunk_index = static_cast<size_t>(block_index);
if (chunks_.size() <= chunk_index) {
chunks_.resize(chunk_index + 1);
}
if (!chunks_[block_index].is_valid()) {
chunks_[block_index] = Future<std::shared_ptr<Array>>::Make();
}
}

void SetChunk(int64_t chunk_index, Result<std::shared_ptr<Array>> maybe_array) {
std::lock_guard<std::mutex> lock(mutex_);
SetChunkUnlocked(chunk_index, std::move(maybe_array));
}

void SetChunkUnlocked(int64_t chunk_index, Result<std::shared_ptr<Array>> maybe_array) {
auto* chunk = &chunks_[chunk_index];
DCHECK(chunk->is_valid());
DCHECK(!IsFutureFinished(chunk->state()));

if (maybe_array.ok()) {
chunk->MarkFinished(std::move(maybe_array));
} else {
chunk->MarkFinished(WrapConversionError(maybe_array.status()));
}
}

Status WrapConversionError(const Status& st) {
if (st.ok()) {
return st;
Expand All @@ -142,49 +64,30 @@ class ConcreteColumnDecoder : public ColumnDecoder {

MemoryPool* pool_;
int32_t col_index_;

std::vector<Future<std::shared_ptr<Array>>> chunks_;
int64_t num_chunks_;
int64_t next_chunk_;

std::mutex mutex_;
internal::Executor* executor_;
};

//////////////////////////////////////////////////////////////////////////
// Null column decoder implementation (for a column not in the CSV file)

class NullColumnDecoder : public ConcreteColumnDecoder {
public:
explicit NullColumnDecoder(const std::shared_ptr<DataType>& type, MemoryPool* pool,
const std::shared_ptr<internal::TaskGroup>& task_group)
: ConcreteColumnDecoder(pool, task_group), type_(type) {}
explicit NullColumnDecoder(const std::shared_ptr<DataType>& type, MemoryPool* pool)
: ConcreteColumnDecoder(pool), type_(type) {}

void Insert(int64_t block_index, const std::shared_ptr<BlockParser>& parser) override;
Future<std::shared_ptr<Array>> Decode(
const std::shared_ptr<BlockParser>& parser) override;

protected:
std::shared_ptr<DataType> type() const override { return type_; }

std::shared_ptr<DataType> type_;
};

void NullColumnDecoder::Insert(int64_t block_index,
const std::shared_ptr<BlockParser>& parser) {
PrepareChunk(block_index);

// Spawn a task that will build an array of nulls with the right DataType
const int32_t num_rows = parser->num_rows();
DCHECK_GE(num_rows, 0);

task_group_->Append([=]() -> Status {
std::unique_ptr<ArrayBuilder> builder;
RETURN_NOT_OK(MakeBuilder(pool_, type_, &builder));
std::shared_ptr<Array> array;
RETURN_NOT_OK(builder->AppendNulls(num_rows));
RETURN_NOT_OK(builder->Finish(&array));

SetChunk(block_index, array);
return Status::OK();
});
Future<std::shared_ptr<Array>> NullColumnDecoder::Decode(
const std::shared_ptr<BlockParser>& parser) {
DCHECK_GE(parser->num_rows(), 0);
return MakeArrayOfNull(type_, parser->num_rows(), pool_);
}

//////////////////////////////////////////////////////////////////////////
Expand All @@ -193,15 +96,13 @@ void NullColumnDecoder::Insert(int64_t block_index,
class TypedColumnDecoder : public ConcreteColumnDecoder {
public:
TypedColumnDecoder(const std::shared_ptr<DataType>& type, int32_t col_index,
const ConvertOptions& options, MemoryPool* pool,
const std::shared_ptr<internal::TaskGroup>& task_group)
: ConcreteColumnDecoder(pool, task_group, col_index),
type_(type),
options_(options) {}
const ConvertOptions& options, MemoryPool* pool)
: ConcreteColumnDecoder(pool, col_index), type_(type), options_(options) {}

Status Init();

void Insert(int64_t block_index, const std::shared_ptr<BlockParser>& parser) override;
Future<std::shared_ptr<Array>> Decode(
const std::shared_ptr<BlockParser>& parser) override;

protected:
std::shared_ptr<DataType> type() const override { return type_; }
Expand All @@ -219,17 +120,11 @@ Status TypedColumnDecoder::Init() {
return Status::OK();
}

void TypedColumnDecoder::Insert(int64_t block_index,
const std::shared_ptr<BlockParser>& parser) {
Future<std::shared_ptr<Array>> TypedColumnDecoder::Decode(
const std::shared_ptr<BlockParser>& parser) {
DCHECK_NE(converter_, nullptr);

PrepareChunk(block_index);

// We're careful that all references in the closure outlive the Append() call
task_group_->Append([=]() -> Status {
SetChunk(block_index, converter_->Convert(*parser, col_index_));
return Status::OK();
});
return Future<std::shared_ptr<Array>>::MakeFinished(
converter_->Convert(*parser, col_index_));
}

//////////////////////////////////////////////////////////////////////////
Expand All @@ -238,16 +133,19 @@ void TypedColumnDecoder::Insert(int64_t block_index,
class InferringColumnDecoder : public ConcreteColumnDecoder {
public:
InferringColumnDecoder(int32_t col_index, const ConvertOptions& options,
MemoryPool* pool,
const std::shared_ptr<internal::TaskGroup>& task_group)
: ConcreteColumnDecoder(pool, task_group, col_index),
MemoryPool* pool)
: ConcreteColumnDecoder(pool, col_index),
options_(options),
infer_status_(options),
type_frozen_(false) {}
type_frozen_(false) {
first_inference_run_ = Future<>::Make();
first_inferrer_ = 0;
}

Status Init();

void Insert(int64_t block_index, const std::shared_ptr<BlockParser>& parser) override;
Future<std::shared_ptr<Array>> Decode(
const std::shared_ptr<BlockParser>& parser) override;

protected:
std::shared_ptr<DataType> type() const override {
Expand All @@ -265,10 +163,9 @@ class InferringColumnDecoder : public ConcreteColumnDecoder {
// Current inference status
InferStatus infer_status_;
bool type_frozen_;
std::atomic<int> first_inferrer_;
Future<> first_inference_run_;
std::shared_ptr<Converter> converter_;

// The parsers corresponding to each chunk (for reconverting)
std::vector<std::shared_ptr<BlockParser>> parsers_;
};

Status InferringColumnDecoder::Init() { return UpdateType(); }
Expand All @@ -283,84 +180,62 @@ Result<std::shared_ptr<Array>> InferringColumnDecoder::RunInference(
// (no one else should be updating converter_ concurrently)
auto maybe_array = converter_->Convert(*parser, col_index_);

std::unique_lock<std::mutex> lock(mutex_);
if (maybe_array.ok() || !infer_status_.can_loosen_type()) {
// Conversion succeeded, or failed definitively
DCHECK(!type_frozen_);
type_frozen_ = true;
return maybe_array;
}
// Conversion failed temporarily, try another type
infer_status_.LoosenType(maybe_array.status());
RETURN_NOT_OK(UpdateType());
auto update_status = UpdateType();
if (!update_status.ok()) {
return update_status;
}
}
}

void InferringColumnDecoder::Insert(int64_t block_index,
const std::shared_ptr<BlockParser>& parser) {
PrepareChunk(block_index);

Future<std::shared_ptr<Array>> InferringColumnDecoder::Decode(
const std::shared_ptr<BlockParser>& parser) {
bool already_taken = first_inferrer_.fetch_or(1);
// First block: run inference
if (block_index == 0) {
task_group_->Append([=]() -> Status {
auto maybe_array = RunInference(parser);

std::unique_lock<std::mutex> lock(mutex_);
DCHECK(!type_frozen_);
type_frozen_ = true;
SetChunkUnlocked(block_index, std::move(maybe_array));
return Status::OK();
});
return;
if (!already_taken) {
auto maybe_array = RunInference(parser);
first_inference_run_.MarkFinished();
return Future<std::shared_ptr<Array>>::MakeFinished(std::move(maybe_array));
}

// Non-first block: wait for inference to finish on first block now,
// without blocking a TaskGroup thread.
{
std::unique_lock<std::mutex> lock(mutex_);
PrepareChunkUnlocked(0);
WaitForChunkUnlocked(0);
if (!chunks_[0].status().ok()) {
// Failed converting first chunk: bail out by marking EOF,
// because we can't decide a type for the other chunks.
SetChunkUnlocked(block_index, std::shared_ptr<Array>());
}
return first_inference_run_.Then([this, parser] {
DCHECK(type_frozen_);
}

// Then use the inferred type to convert this block.
task_group_->Append([=]() -> Status {
auto maybe_array = converter_->Convert(*parser, col_index_);

SetChunk(block_index, std::move(maybe_array));
return Status::OK();
return converter_->Convert(*parser, col_index_);
});
}

//////////////////////////////////////////////////////////////////////////
// Factory functions

Result<std::shared_ptr<ColumnDecoder>> ColumnDecoder::Make(
MemoryPool* pool, int32_t col_index, const ConvertOptions& options,
std::shared_ptr<TaskGroup> task_group) {
auto ptr = std::make_shared<InferringColumnDecoder>(col_index, options, pool,
std::move(task_group));
MemoryPool* pool, int32_t col_index, const ConvertOptions& options) {
auto ptr = std::make_shared<InferringColumnDecoder>(col_index, options, pool);
RETURN_NOT_OK(ptr->Init());
return ptr;
}

Result<std::shared_ptr<ColumnDecoder>> ColumnDecoder::Make(
MemoryPool* pool, std::shared_ptr<DataType> type, int32_t col_index,
const ConvertOptions& options, std::shared_ptr<TaskGroup> task_group) {
auto ptr = std::make_shared<TypedColumnDecoder>(std::move(type), col_index, options,
pool, std::move(task_group));
const ConvertOptions& options) {
auto ptr =
std::make_shared<TypedColumnDecoder>(std::move(type), col_index, options, pool);
RETURN_NOT_OK(ptr->Init());
return ptr;
}

Result<std::shared_ptr<ColumnDecoder>> ColumnDecoder::MakeNull(
MemoryPool* pool, std::shared_ptr<DataType> type,
std::shared_ptr<internal::TaskGroup> task_group) {
return std::make_shared<NullColumnDecoder>(std::move(type), pool,
std::move(task_group));
MemoryPool* pool, std::shared_ptr<DataType> type) {
return std::make_shared<NullColumnDecoder>(std::move(type), pool);
}

} // namespace csv
Expand Down
39 changes: 11 additions & 28 deletions cpp/src/arrow/csv/column_decoder.h
Original file line number Diff line number Diff line change
Expand Up @@ -36,45 +36,28 @@ class ARROW_EXPORT ColumnDecoder {
public:
virtual ~ColumnDecoder() = default;

/// Spawn a task that will try to convert and append the given CSV block.
/// All calls to Append() should happen on the same thread, otherwise
/// call Insert() instead.
virtual void Append(const std::shared_ptr<BlockParser>& parser) = 0;

/// Spawn a task that will try to convert and insert the given CSV block
virtual void Insert(int64_t block_index,
const std::shared_ptr<BlockParser>& parser) = 0;

/// Set EOF at the given number of blocks. Must only be called once.
virtual void SetEOF(int64_t num_blocks) = 0;

/// Fetch a chunk.
virtual Result<std::shared_ptr<Array>> NextChunk() = 0;

std::shared_ptr<internal::TaskGroup> task_group() { return task_group_; }
virtual Future<std::shared_ptr<Array>> Decode(
const std::shared_ptr<BlockParser>& parser) = 0;

/// Construct a strictly-typed ColumnDecoder.
static Result<std::shared_ptr<ColumnDecoder>> Make(
MemoryPool* pool, std::shared_ptr<DataType> type, int32_t col_index,
const ConvertOptions& options, std::shared_ptr<internal::TaskGroup> task_group);
static Result<std::shared_ptr<ColumnDecoder>> Make(MemoryPool* pool,
std::shared_ptr<DataType> type,
int32_t col_index,
const ConvertOptions& options);

/// Construct a type-inferring ColumnDecoder.
/// Inference will run only on the first block, the type will be frozen afterwards.
static Result<std::shared_ptr<ColumnDecoder>> Make(
MemoryPool* pool, int32_t col_index, const ConvertOptions& options,
std::shared_ptr<internal::TaskGroup> task_group);
static Result<std::shared_ptr<ColumnDecoder>> Make(MemoryPool* pool, int32_t col_index,
const ConvertOptions& options);

/// Construct a ColumnDecoder for a column of nulls
/// (i.e. not present in the CSV file).
static Result<std::shared_ptr<ColumnDecoder>> MakeNull(
MemoryPool* pool, std::shared_ptr<DataType> type,
std::shared_ptr<internal::TaskGroup> task_group);
static Result<std::shared_ptr<ColumnDecoder>> MakeNull(MemoryPool* pool,
std::shared_ptr<DataType> type);

protected:
explicit ColumnDecoder(std::shared_ptr<internal::TaskGroup> task_group)
: task_group_(std::move(task_group)) {}

std::shared_ptr<internal::TaskGroup> task_group_;
ColumnDecoder() = default;
};

} // namespace csv
Expand Down
Loading