diff --git a/cmake_modules/FindClangTools.cmake b/cmake_modules/FindClangTools.cmake index c07c7d24..e4ee984e 100644 --- a/cmake_modules/FindClangTools.cmake +++ b/cmake_modules/FindClangTools.cmake @@ -27,16 +27,16 @@ # This module defines # CLANG_TIDY_BIN, The path to the clang tidy binary # CLANG_TIDY_FOUND, Whether clang tidy was found -# CLANG_FORMAT_BIN, The path to the clang format binary +# CLANG_FORMAT_BIN, The path to the clang format binary # CLANG_TIDY_FOUND, Whether clang format was found -find_program(CLANG_TIDY_BIN - NAMES clang-tidy-3.8 clang-tidy-3.7 clang-tidy-3.6 clang-tidy - PATHS ${ClangTools_PATH} $ENV{CLANG_TOOLS_PATH} /usr/local/bin /usr/bin +find_program(CLANG_TIDY_BIN + NAMES clang-tidy-3.9 clang-tidy-3.8 clang-tidy-3.7 clang-tidy-3.6 clang-tidy + PATHS ${ClangTools_PATH} $ENV{CLANG_TOOLS_PATH} /usr/local/bin /usr/bin NO_DEFAULT_PATH ) -if ( "${CLANG_TIDY_BIN}" STREQUAL "CLANG_TIDY_BIN-NOTFOUND" ) +if ( "${CLANG_TIDY_BIN}" STREQUAL "CLANG_TIDY_BIN-NOTFOUND" ) set(CLANG_TIDY_FOUND 0) message("clang-tidy not found") else() @@ -44,17 +44,16 @@ else() message("clang-tidy found at ${CLANG_TIDY_BIN}") endif() -find_program(CLANG_FORMAT_BIN - NAMES clang-format-3.8 clang-format-3.7 clang-format-3.6 clang-format - PATHS ${ClangTools_PATH} $ENV{CLANG_TOOLS_PATH} /usr/local/bin /usr/bin +find_program(CLANG_FORMAT_BIN + NAMES clang-format-3.9 clang-format-3.8 clang-format-3.7 clang-format-3.6 clang-format + PATHS ${ClangTools_PATH} $ENV{CLANG_TOOLS_PATH} /usr/local/bin /usr/bin NO_DEFAULT_PATH ) -if ( "${CLANG_FORMAT_BIN}" STREQUAL "CLANG_FORMAT_BIN-NOTFOUND" ) +if ( "${CLANG_FORMAT_BIN}" STREQUAL "CLANG_FORMAT_BIN-NOTFOUND" ) set(CLANG_FORMAT_FOUND 0) message("clang-format not found") else() set(CLANG_FORMAT_FOUND 1) message("clang-format found at ${CLANG_FORMAT_BIN}") endif() - diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc index 3b232f91..dd468936 100644 --- a/src/parquet/arrow/arrow-reader-writer-test.cc +++ b/src/parquet/arrow/arrow-reader-writer-test.cc @@ -197,6 +197,36 @@ using ParquetDataType = DataType::parquet_enum>; template using ParquetWriter = TypedColumnWriter>; +void WriteTableToBuffer(const std::shared_ptr& table, int num_threads, + int64_t row_group_size, std::shared_ptr* out) { + auto sink = std::make_shared(); + + ASSERT_OK_NO_THROW(WriteTable(*table, ::arrow::default_memory_pool(), sink, + row_group_size, default_writer_properties())); + *out = sink->GetBuffer(); +} + +void DoSimpleRoundtrip(const std::shared_ptr
& table, int num_threads, + int64_t row_group_size, const std::vector& column_subset, + std::shared_ptr
* out) { + std::shared_ptr buffer; + WriteTableToBuffer(table, num_threads, row_group_size, &buffer); + + std::unique_ptr reader; + ASSERT_OK_NO_THROW( + OpenFile(std::make_shared(buffer), ::arrow::default_memory_pool(), + ::parquet::default_reader_properties(), nullptr, &reader)); + + reader->set_num_threads(num_threads); + + if (column_subset.size() > 0) { + ASSERT_OK_NO_THROW(reader->ReadTable(column_subset, out)); + } else { + // Read everything + ASSERT_OK_NO_THROW(reader->ReadTable(out)); + } +} + template class TestParquetIO : public ::testing::Test { public: @@ -248,19 +278,6 @@ class TestParquetIO : public ::testing::Test { ASSERT_NE(nullptr, out->get()); } - void ReadAndCheckSingleColumnTable(const std::shared_ptr<::arrow::Array>& values) { - std::shared_ptr<::arrow::Table> out; - std::unique_ptr reader; - ReaderFromSink(&reader); - ReadTableFromFile(std::move(reader), &out); - ASSERT_EQ(1, out->num_columns()); - ASSERT_EQ(values->length(), out->num_rows()); - - std::shared_ptr chunked_array = out->column(0)->data(); - ASSERT_EQ(1, chunked_array->num_chunks()); - ASSERT_TRUE(values->Equals(chunked_array->chunk(0))); - } - void PrepareListTable(int64_t size, bool nullable_lists, bool nullable_elements, int64_t null_count, std::shared_ptr
* out) { std::shared_ptr values; @@ -289,13 +306,23 @@ class TestParquetIO : public ::testing::Test { *out = MakeSimpleTable(parent_lists, nullable_parent_lists); } - void WriteReadAndCheckSingleColumnTable(const std::shared_ptr
& table) { - std::shared_ptr values = table->column(0)->data()->chunk(0); - this->sink_ = std::make_shared(); - ASSERT_OK_NO_THROW(WriteTable(*table, ::arrow::default_memory_pool(), this->sink_, - values->length(), default_writer_properties())); + void ReadAndCheckSingleColumnTable(const std::shared_ptr<::arrow::Array>& values) { + std::shared_ptr<::arrow::Table> out; + std::unique_ptr reader; + ReaderFromSink(&reader); + ReadTableFromFile(std::move(reader), &out); + ASSERT_EQ(1, out->num_columns()); + ASSERT_EQ(values->length(), out->num_rows()); + + std::shared_ptr chunked_array = out->column(0)->data(); + ASSERT_EQ(1, chunked_array->num_chunks()); + ASSERT_TRUE(values->Equals(chunked_array->chunk(0))); + } - this->ReadAndCheckSingleColumnTable(values); + void CheckRoundTrip(const std::shared_ptr
& table) { + std::shared_ptr
result; + DoSimpleRoundtrip(table, 1, table->num_rows(), {}, &result); + ASSERT_TRUE(table->Equals(*result)); } template @@ -401,37 +428,37 @@ TYPED_TEST(TestParquetIO, SingleColumnTableOptionalReadWrite) { ASSERT_OK(NullableArray(SMALL_SIZE, 10, kDefaultSeed, &values)); std::shared_ptr
table = MakeSimpleTable(values, true); - this->WriteReadAndCheckSingleColumnTable(table); + this->CheckRoundTrip(table); } TYPED_TEST(TestParquetIO, SingleNullableListNullableColumnReadWrite) { std::shared_ptr
table; this->PrepareListTable(SMALL_SIZE, true, true, 10, &table); - this->WriteReadAndCheckSingleColumnTable(table); + this->CheckRoundTrip(table); } TYPED_TEST(TestParquetIO, SingleRequiredListNullableColumnReadWrite) { std::shared_ptr
table; this->PrepareListTable(SMALL_SIZE, false, true, 10, &table); - this->WriteReadAndCheckSingleColumnTable(table); + this->CheckRoundTrip(table); } TYPED_TEST(TestParquetIO, SingleNullableListRequiredColumnReadWrite) { std::shared_ptr
table; this->PrepareListTable(SMALL_SIZE, true, false, 10, &table); - this->WriteReadAndCheckSingleColumnTable(table); + this->CheckRoundTrip(table); } TYPED_TEST(TestParquetIO, SingleRequiredListRequiredColumnReadWrite) { std::shared_ptr
table; this->PrepareListTable(SMALL_SIZE, false, false, 0, &table); - this->WriteReadAndCheckSingleColumnTable(table); + this->CheckRoundTrip(table); } TYPED_TEST(TestParquetIO, SingleNullableListRequiredListRequiredColumnReadWrite) { std::shared_ptr
table; this->PrepareListOfListTable(SMALL_SIZE, true, false, false, 0, &table); - this->WriteReadAndCheckSingleColumnTable(table); + this->CheckRoundTrip(table); } TYPED_TEST(TestParquetIO, SingleColumnRequiredChunkedWrite) { @@ -756,18 +783,24 @@ TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredChunkedTableRead) { this->CheckSingleColumnRequiredTableRead(4); } -void MakeDoubleTable(int num_columns, int num_rows, std::shared_ptr
* out) { +void MakeDoubleTable( + int num_columns, int num_rows, int nchunks, std::shared_ptr
* out) { std::shared_ptr<::arrow::Column> column; std::vector> columns(num_columns); std::vector> fields(num_columns); - std::shared_ptr values; for (int i = 0; i < num_columns; ++i) { + std::vector> arrays; + std::shared_ptr values; ASSERT_OK(NullableArray<::arrow::DoubleType>( num_rows, num_rows / 10, static_cast(i), &values)); std::stringstream ss; ss << "col" << i; - column = MakeColumn(ss.str(), values, true); + + for (int j = 0; j < nchunks; ++j) { + arrays.push_back(values); + } + column = MakeColumn(ss.str(), arrays, true); columns[i] = column; fields[i] = column->field(); @@ -776,41 +809,46 @@ void MakeDoubleTable(int num_columns, int num_rows, std::shared_ptr
* out) *out = std::make_shared
(schema, columns); } -void DoTableRoundtrip(const std::shared_ptr
& table, int num_threads, - const std::vector& column_subset, std::shared_ptr
* out) { - auto sink = std::make_shared(); - - ASSERT_OK_NO_THROW(WriteTable( - *table, ::arrow::default_memory_pool(), sink, (table->num_rows() + 1) / 2)); +TEST(TestArrowReadWrite, MultithreadedRead) { + const int num_columns = 20; + const int num_rows = 1000; + const int num_threads = 4; - std::shared_ptr buffer = sink->GetBuffer(); - std::unique_ptr reader; - ASSERT_OK_NO_THROW( - OpenFile(std::make_shared(buffer), ::arrow::default_memory_pool(), - ::parquet::default_reader_properties(), nullptr, &reader)); + std::shared_ptr
table; + MakeDoubleTable(num_columns, num_rows, 1, &table); - reader->set_num_threads(num_threads); + std::shared_ptr
result; + DoSimpleRoundtrip(table, num_threads, table->num_rows(), {}, &result); - if (column_subset.size() > 0) { - ASSERT_OK_NO_THROW(reader->ReadTable(column_subset, out)); - } else { - // Read everything - ASSERT_OK_NO_THROW(reader->ReadTable(out)); - } + ASSERT_TRUE(table->Equals(*result)); } -TEST(TestArrowReadWrite, MultithreadedRead) { +TEST(TestArrowReadWrite, ReadSingleRowGroup) { const int num_columns = 20; const int num_rows = 1000; - const int num_threads = 4; std::shared_ptr
table; - MakeDoubleTable(num_columns, num_rows, &table); + MakeDoubleTable(num_columns, num_rows, 1, &table); - std::shared_ptr
result; - DoTableRoundtrip(table, num_threads, {}, &result); + std::shared_ptr buffer; + WriteTableToBuffer(table, 1, num_rows / 2, &buffer); - ASSERT_TRUE(table->Equals(*result)); + std::unique_ptr reader; + ASSERT_OK_NO_THROW( + OpenFile(std::make_shared(buffer), ::arrow::default_memory_pool(), + ::parquet::default_reader_properties(), nullptr, &reader)); + + ASSERT_EQ(2, reader->num_row_groups()); + + std::shared_ptr
r1, r2; + // Read everything + ASSERT_OK_NO_THROW(reader->ReadRowGroup(0, &r1)); + ASSERT_OK_NO_THROW(reader->ReadRowGroup(1, &r2)); + + std::shared_ptr
concatenated; + ASSERT_OK(ConcatenateTables({r1, r2}, &concatenated)); + + ASSERT_TRUE(table->Equals(*concatenated)); } TEST(TestArrowReadWrite, ReadColumnSubset) { @@ -819,11 +857,11 @@ TEST(TestArrowReadWrite, ReadColumnSubset) { const int num_threads = 4; std::shared_ptr
table; - MakeDoubleTable(num_columns, num_rows, &table); + MakeDoubleTable(num_columns, num_rows, 1, &table); std::shared_ptr
result; std::vector column_subset = {0, 4, 8, 10}; - DoTableRoundtrip(table, num_threads, column_subset, &result); + DoSimpleRoundtrip(table, num_threads, table->num_rows(), column_subset, &result); std::vector> ex_columns; std::vector> ex_fields; diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc index a26c3ea7..823aea9c 100644 --- a/src/parquet/arrow/reader.cc +++ b/src/parquet/arrow/reader.cc @@ -60,19 +60,139 @@ static inline int64_t impala_timestamp_to_nanoseconds(const Int96& impala_timest template using ArrayType = typename ::arrow::TypeTraits::ArrayType; +// ---------------------------------------------------------------------- +// Helper for parallel for-loop + +template +Status ParallelFor(int nthreads, int num_tasks, FUNCTION&& func) { + std::vector thread_pool; + thread_pool.reserve(nthreads); + std::atomic task_counter(0); + + std::mutex error_mtx; + bool error_occurred = false; + Status error; + + for (int thread_id = 0; thread_id < nthreads; ++thread_id) { + thread_pool.emplace_back( + [&num_tasks, &task_counter, &error, &error_occurred, &error_mtx, &func]() { + int task_id; + while (!error_occurred) { + task_id = task_counter.fetch_add(1); + if (task_id >= num_tasks) { break; } + Status s = func(task_id); + if (!s.ok()) { + std::lock_guard lock(error_mtx); + error_occurred = true; + error = s; + break; + } + } + }); + } + for (auto&& thread : thread_pool) { + thread.join(); + } + if (error_occurred) { return error; } + return Status::OK(); +} + +// ---------------------------------------------------------------------- +// Iteration utilities + +// Abstraction to decouple row group iteration details from the ColumnReader, +// so we can read only a single row group if we want +class FileColumnIterator { + public: + explicit FileColumnIterator(int column_index, ParquetFileReader* reader) + : column_index_(column_index), + reader_(reader), + schema_(reader->metadata()->schema()) {} + + virtual ~FileColumnIterator() {} + + virtual std::shared_ptr<::parquet::ColumnReader> Next() = 0; + + const SchemaDescriptor* schema() const { return schema_; } + + const ColumnDescriptor* descr() const { return schema_->Column(column_index_); } + + int column_index() const { return column_index_; } + + protected: + int column_index_; + ParquetFileReader* reader_; + const SchemaDescriptor* schema_; +}; + +class AllRowGroupsIterator : public FileColumnIterator { + public: + explicit AllRowGroupsIterator(int column_index, ParquetFileReader* reader) + : FileColumnIterator(column_index, reader), next_row_group_(0) {} + + std::shared_ptr<::parquet::ColumnReader> Next() override { + std::shared_ptr<::parquet::ColumnReader> result; + if (next_row_group_ < reader_->metadata()->num_row_groups()) { + result = reader_->RowGroup(next_row_group_)->Column(column_index_); + next_row_group_++; + } else { + result = nullptr; + } + return result; + }; + + private: + int next_row_group_; +}; + +class SingleRowGroupIterator : public FileColumnIterator { + public: + explicit SingleRowGroupIterator( + int column_index, int row_group_number, ParquetFileReader* reader) + : FileColumnIterator(column_index, reader), + row_group_number_(row_group_number), + done_(false) {} + + std::shared_ptr<::parquet::ColumnReader> Next() override { + if (done_) { return nullptr; } + + auto result = reader_->RowGroup(row_group_number_)->Column(column_index_); + done_ = true; + return result; + }; + + private: + int row_group_number_; + bool done_; +}; + +// ---------------------------------------------------------------------- +// File reader implementation + class FileReader::Impl { public: - Impl(MemoryPool* pool, std::unique_ptr reader); + Impl(MemoryPool* pool, std::unique_ptr reader) + : pool_(pool), reader_(std::move(reader)), num_threads_(1) {} + virtual ~Impl() {} - bool CheckForFlatColumn(const ColumnDescriptor* descr); - bool CheckForFlatListColumn(const ColumnDescriptor* descr); Status GetColumn(int i, std::unique_ptr* out); Status ReadColumn(int i, std::shared_ptr* out); - Status ReadTable(std::shared_ptr
* out); - Status ReadTable(const std::vector& column_indices, std::shared_ptr
* out); + Status GetSchema( + const std::vector& indices, std::shared_ptr<::arrow::Schema>* out); + Status ReadRowGroup(int row_group_index, const std::vector& indices, + std::shared_ptr<::arrow::Table>* out); + Status ReadTable(const std::vector& indices, std::shared_ptr
* table); + Status ReadTable(std::shared_ptr
* table); + Status ReadRowGroup(int i, std::shared_ptr
* table); + + bool CheckForFlatColumn(const ColumnDescriptor* descr); + bool CheckForFlatListColumn(const ColumnDescriptor* descr); + const ParquetFileReader* parquet_reader() const { return reader_.get(); } + int num_row_groups() const { return reader_->metadata()->num_row_groups(); } + void set_num_threads(int num_threads) { num_threads_ = num_threads; } private: @@ -84,8 +204,17 @@ class FileReader::Impl { class ColumnReader::Impl { public: - Impl(MemoryPool* pool, const ColumnDescriptor* descr, ParquetFileReader* reader, - int column_index); + Impl(MemoryPool* pool, std::unique_ptr input) + : pool_(pool), + input_(std::move(input)), + descr_(input_->descr()), + values_buffer_(pool), + def_levels_buffer_(pool), + rep_levels_buffer_(pool) { + NodeToField(input_->descr()->schema_node(), &field_); + NextRowGroup(); + } + virtual ~Impl() {} Status NextBatch(int batch_size, std::shared_ptr* out); @@ -121,10 +250,9 @@ class ColumnReader::Impl { }; MemoryPool* pool_; + std::unique_ptr input_; const ColumnDescriptor* descr_; - ParquetFileReader* reader_; - int column_index_; - int next_row_group_; + std::shared_ptr<::parquet::ColumnReader> column_reader_; std::shared_ptr field_; @@ -139,14 +267,16 @@ class ColumnReader::Impl { int64_t null_count_; }; -FileReader::Impl::Impl(MemoryPool* pool, std::unique_ptr reader) - : pool_(pool), reader_(std::move(reader)), num_threads_(1) {} +FileReader::FileReader(MemoryPool* pool, std::unique_ptr reader) + : impl_(new FileReader::Impl(pool, std::move(reader))) {} + +FileReader::~FileReader() {} Status FileReader::Impl::GetColumn(int i, std::unique_ptr* out) { - const SchemaDescriptor* schema = reader_->metadata()->schema(); + std::unique_ptr input(new AllRowGroupsIterator(i, reader_.get())); std::unique_ptr impl( - new ColumnReader::Impl(pool_, schema->Column(i), reader_.get(), i)); + new ColumnReader::Impl(pool_, std::move(input))); *out = std::unique_ptr(new ColumnReader(std::move(impl))); return Status::OK(); } @@ -163,55 +293,59 @@ Status FileReader::Impl::ReadColumn(int i, std::shared_ptr* out) { return flat_column_reader->NextBatch(batch_size, out); } -Status FileReader::Impl::ReadTable(std::shared_ptr
* table) { - std::vector column_indices(reader_->metadata()->num_columns()); - - for (size_t i = 0; i < column_indices.size(); ++i) { - column_indices[i] = i; - } - return ReadTable(column_indices, table); +Status FileReader::Impl::GetSchema( + const std::vector& indices, std::shared_ptr<::arrow::Schema>* out) { + auto descr = reader_->metadata()->schema(); + return FromParquetSchema(descr, indices, out); } -template -Status ParallelFor(int nthreads, int num_tasks, FUNCTION&& func) { - std::vector thread_pool; - thread_pool.reserve(nthreads); - std::atomic task_counter(0); +Status FileReader::Impl::ReadRowGroup(int row_group_index, + const std::vector& indices, std::shared_ptr<::arrow::Table>* out) { + std::shared_ptr<::arrow::Schema> schema; + RETURN_NOT_OK(GetSchema(indices, &schema)); - std::mutex error_mtx; - bool error_occurred = false; - Status error; + auto rg_metadata = reader_->metadata()->RowGroup(row_group_index); - for (int thread_id = 0; thread_id < nthreads; ++thread_id) { - thread_pool.emplace_back( - [&num_tasks, &task_counter, &error, &error_occurred, &error_mtx, &func]() { - int task_id; - while (!error_occurred) { - task_id = task_counter.fetch_add(1); - if (task_id >= num_tasks) { break; } - Status s = func(task_id); - if (!s.ok()) { - std::lock_guard lock(error_mtx); - error_occurred = true; - error = s; - break; - } - } - }); - } - for (auto&& thread : thread_pool) { - thread.join(); + int num_columns = static_cast(indices.size()); + int nthreads = std::min(num_threads_, num_columns); + std::vector> columns(num_columns); + + // TODO(wesm): Refactor to share more code with ReadTable + + auto ReadColumnFunc = [&indices, &row_group_index, &schema, &columns, &rg_metadata, + this](int i) { + int column_index = indices[i]; + int64_t batch_size = rg_metadata->ColumnChunk(column_index)->num_values(); + + std::unique_ptr input( + new SingleRowGroupIterator(column_index, row_group_index, reader_.get())); + + std::unique_ptr impl( + new ColumnReader::Impl(pool_, std::move(input))); + ColumnReader flat_column_reader(std::move(impl)); + + std::shared_ptr array; + RETURN_NOT_OK(flat_column_reader.NextBatch(batch_size, &array)); + columns[i] = std::make_shared(schema->field(i), array); + return Status::OK(); + }; + + if (nthreads == 1) { + for (int i = 0; i < num_columns; i++) { + RETURN_NOT_OK(ReadColumnFunc(i)); + } + } else { + RETURN_NOT_OK(ParallelFor(nthreads, num_columns, ReadColumnFunc)); } - if (error_occurred) { return error; } + + *out = std::make_shared
(schema, columns); return Status::OK(); } Status FileReader::Impl::ReadTable( const std::vector& indices, std::shared_ptr
* table) { - auto descr = reader_->metadata()->schema(); - std::shared_ptr<::arrow::Schema> schema; - RETURN_NOT_OK(FromParquetSchema(descr, indices, &schema)); + RETURN_NOT_OK(GetSchema(indices, &schema)); int num_columns = static_cast(indices.size()); int nthreads = std::min(num_threads_, num_columns); @@ -236,10 +370,23 @@ Status FileReader::Impl::ReadTable( return Status::OK(); } -FileReader::FileReader(MemoryPool* pool, std::unique_ptr reader) - : impl_(new FileReader::Impl(pool, std::move(reader))) {} +Status FileReader::Impl::ReadTable(std::shared_ptr
* table) { + std::vector indices(reader_->metadata()->num_columns()); -FileReader::~FileReader() {} + for (size_t i = 0; i < indices.size(); ++i) { + indices[i] = i; + } + return ReadTable(indices, table); +} + +Status FileReader::Impl::ReadRowGroup(int i, std::shared_ptr
* table) { + std::vector indices(reader_->metadata()->num_columns()); + + for (size_t i = 0; i < indices.size(); ++i) { + indices[i] = i; + } + return ReadRowGroup(i, indices, table); +} // Static ctor Status OpenFile(const std::shared_ptr<::arrow::io::ReadableFileInterface>& file, @@ -280,14 +427,35 @@ Status FileReader::ReadTable(std::shared_ptr
* out) { } Status FileReader::ReadTable( - const std::vector& column_indices, std::shared_ptr
* out) { + const std::vector& indices, std::shared_ptr
* out) { + try { + return impl_->ReadTable(indices, out); + } catch (const ::parquet::ParquetException& e) { + return ::arrow::Status::IOError(e.what()); + } +} + +Status FileReader::ReadRowGroup(int i, std::shared_ptr
* out) { + try { + return impl_->ReadRowGroup(i, out); + } catch (const ::parquet::ParquetException& e) { + return ::arrow::Status::IOError(e.what()); + } +} + +Status FileReader::ReadRowGroup( + int i, const std::vector& indices, std::shared_ptr
* out) { try { - return impl_->ReadTable(column_indices, out); + return impl_->ReadRowGroup(i, indices, out); } catch (const ::parquet::ParquetException& e) { return ::arrow::Status::IOError(e.what()); } } +int FileReader::num_row_groups() const { + return impl_->num_row_groups(); +} + void FileReader::set_num_threads(int num_threads) { impl_->set_num_threads(num_threads); } @@ -296,20 +464,6 @@ const ParquetFileReader* FileReader::parquet_reader() const { return impl_->parquet_reader(); } -ColumnReader::Impl::Impl(MemoryPool* pool, const ColumnDescriptor* descr, - ParquetFileReader* reader, int column_index) - : pool_(pool), - descr_(descr), - reader_(reader), - column_index_(column_index), - next_row_group_(0), - values_buffer_(pool), - def_levels_buffer_(pool), - rep_levels_buffer_(pool) { - NodeToField(descr_->schema_node(), &field_); - NextRowGroup(); -} - template Status ColumnReader::Impl::ReadNonNullableBatch(TypedColumnReader* reader, int64_t values_to_read, int64_t* levels_read) { @@ -563,7 +717,7 @@ Status ColumnReader::Impl::WrapIntoListArray(const int16_t* def_levels, if (descr_->max_repetition_level() > 0) { std::shared_ptr<::arrow::Schema> arrow_schema; RETURN_NOT_OK( - FromParquetSchema(reader_->metadata()->schema(), {column_index_}, &arrow_schema)); + FromParquetSchema(input_->schema(), {input_->column_index()}, &arrow_schema)); // Walk downwards to extract nullability std::shared_ptr current_field = arrow_schema->field(0); @@ -912,12 +1066,7 @@ Status ColumnReader::Impl::NextBatch(int batch_size, std::shared_ptr* out } void ColumnReader::Impl::NextRowGroup() { - if (next_row_group_ < reader_->metadata()->num_row_groups()) { - column_reader_ = reader_->RowGroup(next_row_group_)->Column(column_index_); - next_row_group_++; - } else { - column_reader_ = nullptr; - } + column_reader_ = input_->Next(); } ColumnReader::ColumnReader(std::unique_ptr impl) : impl_(std::move(impl)) {} diff --git a/src/parquet/arrow/reader.h b/src/parquet/arrow/reader.h index 1aa9c3ef..f12acaf3 100644 --- a/src/parquet/arrow/reader.h +++ b/src/parquet/arrow/reader.h @@ -107,6 +107,13 @@ class PARQUET_EXPORT FileReader { ::arrow::Status ReadTable( const std::vector& column_indices, std::shared_ptr<::arrow::Table>* out); + ::arrow::Status ReadRowGroup(int i, const std::vector& column_indices, + std::shared_ptr<::arrow::Table>* out); + + ::arrow::Status ReadRowGroup(int i, std::shared_ptr<::arrow::Table>* out); + + int num_row_groups() const; + const ParquetFileReader* parquet_reader() const; /// Set the number of threads to use during reads of multiple columns. By diff --git a/src/parquet/arrow/test-util.h b/src/parquet/arrow/test-util.h index 2cfc60ab..bff952bb 100644 --- a/src/parquet/arrow/test-util.h +++ b/src/parquet/arrow/test-util.h @@ -260,12 +260,18 @@ Status MakeListArary(const std::shared_ptr& values, int64_t size, return Status::OK(); } -std::shared_ptr<::arrow::Column> MakeColumn( +static std::shared_ptr<::arrow::Column> MakeColumn( const std::string& name, const std::shared_ptr& array, bool nullable) { auto field = std::make_shared<::arrow::Field>(name, array->type(), nullable); return std::make_shared<::arrow::Column>(field, array); } +static std::shared_ptr<::arrow::Column> MakeColumn(const std::string& name, + const std::vector>& arrays, bool nullable) { + auto field = std::make_shared<::arrow::Field>(name, arrays[0]->type(), nullable); + return std::make_shared<::arrow::Column>(field, arrays); +} + std::shared_ptr<::arrow::Table> MakeSimpleTable( const std::shared_ptr& values, bool nullable) { std::shared_ptr<::arrow::Column> column = MakeColumn("col", values, nullable); diff --git a/src/parquet/column/writer.cc b/src/parquet/column/writer.cc index 2ba4162a..eb741475 100644 --- a/src/parquet/column/writer.cc +++ b/src/parquet/column/writer.cc @@ -213,7 +213,7 @@ TypedColumnWriter::TypedColumnWriter(ColumnChunkMetaDataBuilder* metadata, const WriterProperties* properties) : ColumnWriter(metadata, std::move(pager), expected_rows, (encoding == Encoding::PLAIN_DICTIONARY || - encoding == Encoding::RLE_DICTIONARY), + encoding == Encoding::RLE_DICTIONARY), encoding, properties) { switch (encoding) { case Encoding::PLAIN: