Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions c_glib/arrow-glib/reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1591,8 +1591,7 @@ garrow_csv_reader_new(GArrowInputStream *input,
}

auto arrow_reader =
arrow::csv::TableReader::Make(arrow::default_memory_pool(),
arrow::io::AsyncContext(),
arrow::csv::TableReader::Make(arrow::io::default_io_context(),
arrow_input,
read_options,
parse_options,
Expand Down
2 changes: 1 addition & 1 deletion cpp/examples/minimal_build/example.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ Status RunMain(int argc, char** argv) {
ARROW_ASSIGN_OR_RAISE(
auto csv_reader,
arrow::csv::TableReader::Make(arrow::default_memory_pool(),
arrow::io::AsyncContext(),
arrow::io::default_io_context(),
input_file,
arrow::csv::ReadOptions::Defaults(),
arrow::csv::ParseOptions::Defaults(),
Expand Down
54 changes: 35 additions & 19 deletions cpp/src/arrow/csv/reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
#include "arrow/status.h"
#include "arrow/table.h"
#include "arrow/type.h"
#include "arrow/type_fwd.h"
#include "arrow/util/async_generator.h"
#include "arrow/util/future.h"
#include "arrow/util/iterator.h"
Expand All @@ -51,19 +52,12 @@
#include "arrow/util/utf8.h"

namespace arrow {

class MemoryPool;

namespace io {

class InputStream;

} // namespace io

namespace csv {

using internal::Executor;

namespace {

struct ConversionSchema {
struct Column {
std::string name;
Expand Down Expand Up @@ -154,6 +148,7 @@ struct CSVBlock {
std::function<Status(int64_t)> consume_bytes;
};

} // namespace
} // namespace csv

template <>
Expand All @@ -162,6 +157,7 @@ struct IterationTraits<csv::CSVBlock> {
};

namespace csv {
namespace {

// The == operator must be defined to be used as T in Iterator<T>
bool operator==(const CSVBlock& left, const CSVBlock& right) {
Expand Down Expand Up @@ -935,18 +931,17 @@ class AsyncThreadedTableReader
AsyncGenerator<std::shared_ptr<Buffer>> buffer_generator_;
};

/////////////////////////////////////////////////////////////////////////
// Factory functions

Result<std::shared_ptr<TableReader>> TableReader::Make(
MemoryPool* pool, io::AsyncContext async_context,
std::shared_ptr<io::InputStream> input, const ReadOptions& read_options,
const ParseOptions& parse_options, const ConvertOptions& convert_options) {
Result<std::shared_ptr<TableReader>> MakeTableReader(
MemoryPool* pool, io::IOContext io_context, std::shared_ptr<io::InputStream> input,
const ReadOptions& read_options, const ParseOptions& parse_options,
const ConvertOptions& convert_options) {
std::shared_ptr<BaseTableReader> reader;
if (read_options.use_threads) {
reader = std::make_shared<AsyncThreadedTableReader>(
pool, input, read_options, parse_options, convert_options, async_context.executor,
internal::GetCpuThreadPool());
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that the CPU and IO executors were passed in the wrong order here.

auto cpu_executor = internal::GetCpuThreadPool();
auto io_executor = io_context.executor();
reader = std::make_shared<AsyncThreadedTableReader>(pool, input, read_options,
parse_options, convert_options,
cpu_executor, io_executor);
} else {
reader = std::make_shared<SerialTableReader>(pool, input, read_options, parse_options,
convert_options);
Expand All @@ -955,6 +950,27 @@ Result<std::shared_ptr<TableReader>> TableReader::Make(
return reader;
}

} // namespace

/////////////////////////////////////////////////////////////////////////
// Factory functions

Result<std::shared_ptr<TableReader>> TableReader::Make(
    io::IOContext io_context, std::shared_ptr<io::InputStream> input,
    const ReadOptions& read_options, const ParseOptions& parse_options,
    const ConvertOptions& convert_options) {
  // No separate pool argument here: the reader uses the memory pool held by
  // the IOContext itself.
  MemoryPool* const context_pool = io_context.pool();
  return MakeTableReader(context_pool, io_context, std::move(input), read_options,
                         parse_options, convert_options);
}

Result<std::shared_ptr<TableReader>> TableReader::Make(
    MemoryPool* pool, io::IOContext io_context, std::shared_ptr<io::InputStream> input,
    const ReadOptions& read_options, const ParseOptions& parse_options,
    const ConvertOptions& convert_options) {
  // Explicit-pool overload: `pool` is forwarded as given instead of being
  // taken from `io_context`.
  auto maybe_reader = MakeTableReader(pool, io_context, std::move(input), read_options,
                                      parse_options, convert_options);
  return maybe_reader;
}

Result<std::shared_ptr<StreamingReader>> StreamingReader::Make(
MemoryPool* pool, std::shared_ptr<io::InputStream> input,
const ReadOptions& read_options, const ParseOptions& parse_options,
Expand Down
8 changes: 6 additions & 2 deletions cpp/src/arrow/csv/reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,16 @@ class ARROW_EXPORT TableReader {
virtual Future<std::shared_ptr<Table>> ReadAsync() = 0;

/// Create a TableReader instance
static Result<std::shared_ptr<TableReader>> Make(MemoryPool* pool,
io::AsyncContext async_context,
static Result<std::shared_ptr<TableReader>> Make(io::IOContext io_context,
std::shared_ptr<io::InputStream> input,
const ReadOptions&,
const ParseOptions&,
const ConvertOptions&);

ARROW_DEPRECATED("Use MemoryPool-less overload (the IOContext holds a pool already)")
static Result<std::shared_ptr<TableReader>> Make(
MemoryPool* pool, io::IOContext io_context, std::shared_ptr<io::InputStream> input,
const ReadOptions&, const ParseOptions&, const ConvertOptions&);
};

/// Experimental
Expand Down
11 changes: 5 additions & 6 deletions cpp/src/arrow/csv/reader_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,8 @@ TableReaderFactory MakeSerialFactory() {
auto read_options = ReadOptions::Defaults();
read_options.block_size = 1 << 10;
read_options.use_threads = false;
return TableReader::Make(default_memory_pool(), io::AsyncContext(), input_stream,
read_options, ParseOptions::Defaults(),
ConvertOptions::Defaults());
return TableReader::Make(io::default_io_context(), input_stream, read_options,
ParseOptions::Defaults(), ConvertOptions::Defaults());
};
}

Expand All @@ -131,9 +130,9 @@ Result<TableReaderFactory> MakeAsyncFactory(
ReadOptions read_options = ReadOptions::Defaults();
read_options.use_threads = true;
read_options.block_size = 1 << 10;
auto table_reader = TableReader::Make(
default_memory_pool(), io::AsyncContext(thread_pool.get()), input_stream,
read_options, ParseOptions::Defaults(), ConvertOptions::Defaults());
auto table_reader =
TableReader::Make(io::IOContext(thread_pool.get()), input_stream, read_options,
ParseOptions::Defaults(), ConvertOptions::Defaults());
return table_reader;
};
}
Expand Down
3 changes: 1 addition & 2 deletions cpp/src/arrow/dataset/discovery.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@
#include "arrow/dataset/partition.h"
#include "arrow/dataset/type_fwd.h"
#include "arrow/dataset/visibility.h"
#include "arrow/filesystem/filesystem.h"
#include "arrow/filesystem/path_forest.h"
#include "arrow/filesystem/type_fwd.h"
#include "arrow/result.h"
#include "arrow/util/macros.h"
#include "arrow/util/variant.h"
Expand Down
9 changes: 9 additions & 0 deletions cpp/src/arrow/dataset/file_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,13 @@ Status FileWriter::Write(RecordBatchReader* batches) {
return Status::OK();
}

Status FileWriter::Finish() {
RETURN_NOT_OK(FinishInternal());
return destination_->Close();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: explicitly closing output files is preferable, especially with remote filesystems where this might plausibly fail.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(and MockFileSystem will deliberately not write anything out if you don't close it explicitly)

}

namespace {

constexpr util::string_view kIntegerToken = "{i}";

Status ValidateBasenameTemplate(util::string_view basename_template) {
Expand Down Expand Up @@ -257,6 +264,8 @@ class WriteQueue {
std::shared_ptr<Schema> schema_;
};

} // namespace

Status FileSystemDataset::Write(const FileSystemDatasetWriteOptions& write_options,
std::shared_ptr<Scanner> scanner) {
RETURN_NOT_OK(ValidateBasenameTemplate(write_options.basename_template));
Expand Down
12 changes: 9 additions & 3 deletions cpp/src/arrow/dataset/file_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -268,18 +268,24 @@ class ARROW_DS_EXPORT FileWriter {

Status Write(RecordBatchReader* batches);

virtual Status Finish() = 0;
Status Finish();

const std::shared_ptr<FileFormat>& format() const { return options_->format(); }
const std::shared_ptr<Schema>& schema() const { return schema_; }
const std::shared_ptr<FileWriteOptions>& options() const { return options_; }

protected:
FileWriter(std::shared_ptr<Schema> schema, std::shared_ptr<FileWriteOptions> options)
: schema_(std::move(schema)), options_(std::move(options)) {}
/// \brief Initialize the state shared by all format-specific writers.
///
/// \param schema stored in schema_ and returned by schema()
/// \param options stored in options_ and returned by options()
/// \param destination output stream stored in destination_
FileWriter(std::shared_ptr<Schema> schema, std::shared_ptr<FileWriteOptions> options,
           std::shared_ptr<io::OutputStream> destination)
    : schema_(std::move(schema)),
      options_(std::move(options)),
      // `destination` is a by-value sink parameter like the other two: move it
      // into the member instead of copying the shared_ptr.
      destination_(std::move(destination)) {}

virtual Status FinishInternal() = 0;

std::shared_ptr<Schema> schema_;
std::shared_ptr<FileWriteOptions> options_;
std::shared_ptr<io::OutputStream> destination_;
};

struct ARROW_DS_EXPORT FileSystemDatasetWriteOptions {
Expand Down
10 changes: 6 additions & 4 deletions cpp/src/arrow/dataset/file_ipc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -193,20 +193,22 @@ Result<std::shared_ptr<FileWriter>> IpcFileFormat::MakeWriter(
ipc_options->metadata));

return std::shared_ptr<FileWriter>(
new IpcFileWriter(std::move(writer), std::move(schema), std::move(ipc_options)));
new IpcFileWriter(std::move(destination), std::move(writer), std::move(schema),
std::move(ipc_options)));
}

IpcFileWriter::IpcFileWriter(std::shared_ptr<ipc::RecordBatchWriter> writer,
IpcFileWriter::IpcFileWriter(std::shared_ptr<io::OutputStream> destination,
std::shared_ptr<ipc::RecordBatchWriter> writer,
std::shared_ptr<Schema> schema,
std::shared_ptr<IpcFileWriteOptions> options)
: FileWriter(std::move(schema), std::move(options)),
: FileWriter(std::move(schema), std::move(options), std::move(destination)),
batch_writer_(std::move(writer)) {}

Status IpcFileWriter::Write(const std::shared_ptr<RecordBatch>& batch) {
  // Hand the batch to the underlying IPC record batch writer.
  const RecordBatch& record_batch = *batch;
  return batch_writer_->WriteRecordBatch(record_batch);
}

Status IpcFileWriter::Finish() { return batch_writer_->Close(); }
Status IpcFileWriter::FinishInternal() {
  // Format-specific finalization: close the IPC record batch writer and
  // report its status to the caller.
  Status close_status = batch_writer_->Close();
  return close_status;
}

} // namespace dataset
} // namespace arrow
15 changes: 6 additions & 9 deletions cpp/src/arrow/dataset/file_ipc.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,15 +25,10 @@
#include "arrow/dataset/file_base.h"
#include "arrow/dataset/type_fwd.h"
#include "arrow/dataset/visibility.h"
#include "arrow/ipc/type_fwd.h"
#include "arrow/result.h"

namespace arrow {
namespace ipc {

class RecordBatchWriter;
struct IpcWriteOptions;

} // namespace ipc
namespace dataset {

/// \brief A FileFormat implementation that reads from and writes to Ipc files
Expand Down Expand Up @@ -82,13 +77,15 @@ class ARROW_DS_EXPORT IpcFileWriter : public FileWriter {
public:
Status Write(const std::shared_ptr<RecordBatch>& batch) override;

Status Finish() override;

private:
IpcFileWriter(std::shared_ptr<ipc::RecordBatchWriter> writer,
IpcFileWriter(std::shared_ptr<io::OutputStream> destination,
std::shared_ptr<ipc::RecordBatchWriter> writer,
std::shared_ptr<Schema> schema,
std::shared_ptr<IpcFileWriteOptions> options);

Status FinishInternal() override;

std::shared_ptr<io::OutputStream> destination_;
std::shared_ptr<ipc::RecordBatchWriter> batch_writer_;

friend class IpcFileFormat;
Expand Down
22 changes: 11 additions & 11 deletions cpp/src/arrow/dataset/file_parquet.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,8 +56,7 @@ class ParquetScanTask : public ScanTask {
ParquetScanTask(int row_group, std::vector<int> column_projection,
std::shared_ptr<parquet::arrow::FileReader> reader,
std::shared_ptr<std::once_flag> pre_buffer_once,
std::vector<int> pre_buffer_row_groups,
arrow::io::AsyncContext async_context,
std::vector<int> pre_buffer_row_groups, arrow::io::IOContext io_context,
arrow::io::CacheOptions cache_options,
std::shared_ptr<ScanOptions> options,
std::shared_ptr<ScanContext> context)
Expand All @@ -67,7 +66,7 @@ class ParquetScanTask : public ScanTask {
reader_(std::move(reader)),
pre_buffer_once_(std::move(pre_buffer_once)),
pre_buffer_row_groups_(std::move(pre_buffer_row_groups)),
async_context_(async_context),
io_context_(io_context),
cache_options_(cache_options) {}

Result<RecordBatchIterator> Execute() override {
Expand Down Expand Up @@ -106,7 +105,7 @@ class ParquetScanTask : public ScanTask {
BEGIN_PARQUET_CATCH_EXCEPTIONS
std::call_once(*pre_buffer_once_, [this]() {
reader_->parquet_reader()->PreBuffer(pre_buffer_row_groups_, column_projection_,
async_context_, cache_options_);
io_context_, cache_options_);
});
END_PARQUET_CATCH_EXCEPTIONS
}
Expand All @@ -121,7 +120,7 @@ class ParquetScanTask : public ScanTask {
// to be done. We assume all scan tasks have the same column projection.
std::shared_ptr<std::once_flag> pre_buffer_once_;
std::vector<int> pre_buffer_row_groups_;
arrow::io::AsyncContext async_context_;
arrow::io::IOContext io_context_;
arrow::io::CacheOptions cache_options_;
};

Expand Down Expand Up @@ -362,7 +361,7 @@ Result<ScanTaskIterator> ParquetFileFormat::ScanFile(std::shared_ptr<ScanOptions
for (size_t i = 0; i < row_groups.size(); ++i) {
tasks[i] = std::make_shared<ParquetScanTask>(
row_groups[i], column_projection, reader, pre_buffer_once, row_groups,
reader_options.async_context, reader_options.cache_options, options, context);
reader_options.io_context, reader_options.cache_options, options, context);
}

return MakeVectorIterator(std::move(tasks));
Expand Down Expand Up @@ -410,21 +409,22 @@ Result<std::shared_ptr<FileWriter>> ParquetFileFormat::MakeWriter(
*schema, default_memory_pool(), destination, parquet_options->writer_properties,
parquet_options->arrow_writer_properties, &parquet_writer));

return std::shared_ptr<FileWriter>(
new ParquetFileWriter(std::move(parquet_writer), std::move(parquet_options)));
return std::shared_ptr<FileWriter>(new ParquetFileWriter(
std::move(destination), std::move(parquet_writer), std::move(parquet_options)));
}

ParquetFileWriter::ParquetFileWriter(std::shared_ptr<parquet::arrow::FileWriter> writer,
ParquetFileWriter::ParquetFileWriter(std::shared_ptr<io::OutputStream> destination,
std::shared_ptr<parquet::arrow::FileWriter> writer,
std::shared_ptr<ParquetFileWriteOptions> options)
: FileWriter(writer->schema(), std::move(options)),
: FileWriter(writer->schema(), std::move(options), std::move(destination)),
parquet_writer_(std::move(writer)) {}

Status ParquetFileWriter::Write(const std::shared_ptr<RecordBatch>& batch) {
  // The Parquet writer is fed a Table, so wrap the single batch in one and
  // pass the batch's row count along with it.
  const auto row_count = batch->num_rows();
  ARROW_ASSIGN_OR_RAISE(auto single_batch_table,
                        Table::FromRecordBatches(batch->schema(), {batch}));
  return parquet_writer_->WriteTable(*single_batch_table, row_count);
}

Status ParquetFileWriter::Finish() { return parquet_writer_->Close(); }
Status ParquetFileWriter::FinishInternal() {
  // Format-specific finalization: close the Parquet writer and report its
  // status to the caller.
  Status close_status = parquet_writer_->Close();
  return close_status;
}

//
// ParquetFileFragment
Expand Down
9 changes: 5 additions & 4 deletions cpp/src/arrow/dataset/file_parquet.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ class ARROW_DS_EXPORT ParquetFileFormat : public FileFormat {
std::unordered_set<std::string> dict_columns;
bool pre_buffer = false;
arrow::io::CacheOptions cache_options = arrow::io::CacheOptions::Defaults();
arrow::io::AsyncContext async_context;
arrow::io::IOContext io_context;
/// @}

/// EXPERIMENTAL: Parallelize conversion across columns. This option is ignored if a
Expand Down Expand Up @@ -226,12 +226,13 @@ class ARROW_DS_EXPORT ParquetFileWriter : public FileWriter {

Status Write(const std::shared_ptr<RecordBatch>& batch) override;

Status Finish() override;

private:
ParquetFileWriter(std::shared_ptr<parquet::arrow::FileWriter> writer,
ParquetFileWriter(std::shared_ptr<io::OutputStream> destination,
std::shared_ptr<parquet::arrow::FileWriter> writer,
std::shared_ptr<ParquetFileWriteOptions> options);

Status FinishInternal() override;

std::shared_ptr<parquet::arrow::FileWriter> parquet_writer_;

friend class ParquetFileFormat;
Expand Down
Loading