37 changes: 16 additions & 21 deletions cpp/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -383,8 +383,7 @@ void DoConfiguredRoundtrip(

std::unique_ptr<FileReader> reader;
ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer),
- ::arrow::default_memory_pool(),
- ::parquet::default_reader_properties(), nullptr, &reader));
+ ::arrow::default_memory_pool(), &reader));
ASSERT_OK_NO_THROW(reader->ReadTable(out));
}

@@ -421,8 +420,7 @@ void DoSimpleRoundtrip(const std::shared_ptr<Table>& table, bool use_threads,

std::unique_ptr<FileReader> reader;
ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer),
- ::arrow::default_memory_pool(),
- ::parquet::default_reader_properties(), nullptr, &reader));
+ ::arrow::default_memory_pool(), &reader));

reader->set_use_threads(use_threads);

@@ -499,8 +497,7 @@ class TestParquetIO : public ::testing::Test {
std::shared_ptr<Buffer> buffer;
ASSERT_OK_NO_THROW(sink_->Finish(&buffer));
ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer),
- ::arrow::default_memory_pool(),
- ::parquet::default_reader_properties(), nullptr, out));
+ ::arrow::default_memory_pool(), out));
}

void ReadSingleColumnFile(std::unique_ptr<FileReader> file_reader,
@@ -1869,8 +1866,7 @@ TEST(TestArrowReadWrite, ReadSingleRowGroup) {

std::unique_ptr<FileReader> reader;
ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer),
- ::arrow::default_memory_pool(),
- ::parquet::default_reader_properties(), nullptr, &reader));
+ ::arrow::default_memory_pool(), &reader));

ASSERT_EQ(2, reader->num_row_groups());

@@ -1907,8 +1903,9 @@ TEST(TestArrowReadWrite, GetRecordBatchReader) {
properties.set_batch_size(100);

std::unique_ptr<FileReader> reader;
- ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer),
- ::arrow::default_memory_pool(), properties, &reader));
+ FileReaderBuilder builder;
+ ASSERT_OK(builder.Open(std::make_shared<BufferReader>(buffer)));
+ ASSERT_OK(builder.properties(properties)->Build(&reader));

std::shared_ptr<::arrow::RecordBatchReader> rb_reader;
ASSERT_OK_NO_THROW(reader->GetRecordBatchReader({0, 1}, &rb_reader));
@@ -1938,8 +1935,7 @@ TEST(TestArrowReadWrite, ScanContents) {

std::unique_ptr<FileReader> reader;
ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer),
- ::arrow::default_memory_pool(),
- ::parquet::default_reader_properties(), nullptr, &reader));
+ ::arrow::default_memory_pool(), &reader));

int64_t num_rows_returned = 0;
ASSERT_OK_NO_THROW(reader->ScanContents({}, 256, &num_rows_returned));
@@ -1994,8 +1990,7 @@ TEST(TestArrowReadWrite, ListLargeRecords) {

std::unique_ptr<FileReader> reader;
ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer),
- ::arrow::default_memory_pool(),
- ::parquet::default_reader_properties(), nullptr, &reader));
+ ::arrow::default_memory_pool(), &reader));

// Read everything
std::shared_ptr<Table> result;
@@ -2004,8 +1999,7 @@

// Read 1 record at a time
ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer),
- ::arrow::default_memory_pool(),
- ::parquet::default_reader_properties(), nullptr, &reader));
+ ::arrow::default_memory_pool(), &reader));

std::unique_ptr<ColumnReader> col_reader;
ASSERT_OK(reader->GetColumn(0, &col_reader));
@@ -2216,9 +2210,8 @@ class TestNestedSchemaRead : public ::testing::TestWithParam<Repetition::type> {
void InitReader() {
std::shared_ptr<Buffer> buffer;
ASSERT_OK_NO_THROW(nested_parquet_->Finish(&buffer));
- ASSERT_OK_NO_THROW(
- OpenFile(std::make_shared<BufferReader>(buffer), ::arrow::default_memory_pool(),
- ::parquet::default_reader_properties(), nullptr, &reader_));
+ ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer),
+ ::arrow::default_memory_pool(), &reader_));
}

void InitNewParquetFile(const std::shared_ptr<GroupNode>& schema, int num_rows) {
@@ -2780,8 +2773,10 @@ class TestArrowReadDictionary : public ::testing::TestWithParam<double> {

void CheckReadWholeFile(const Table& expected) {
std::unique_ptr<FileReader> reader;
- ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer_),
- ::arrow::default_memory_pool(), properties_, &reader));

+ FileReaderBuilder builder;
+ ASSERT_OK_NO_THROW(builder.Open(std::make_shared<BufferReader>(buffer_)));
+ ASSERT_OK(builder.properties(properties_)->Build(&reader));

std::shared_ptr<Table> actual;
ASSERT_OK_NO_THROW(reader->ReadTable(&actual));
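The pattern across these tests: the five-argument `OpenFile` overload gives way to the two-argument form, or to `FileReaderBuilder` wherever custom `ArrowReaderProperties` are needed. A self-contained sketch of the builder pattern distilled from the `GetRecordBatchReader` test above (assumptions: `buffer` holds a complete serialized Parquet file, and the helper name `ReadInBatches` is ours, not the PR's):

```cpp
// Illustrative sketch only -- not code from the PR.
#include "arrow/io/memory.h"
#include "arrow/record_batch.h"
#include "arrow/status.h"
#include "parquet/arrow/reader.h"

arrow::Status ReadInBatches(const std::shared_ptr<arrow::Buffer>& buffer) {
  parquet::ArrowReaderProperties properties = parquet::default_arrow_reader_properties();
  properties.set_batch_size(100);  // batches of at most 100 rows

  parquet::arrow::FileReaderBuilder builder;
  ARROW_RETURN_NOT_OK(builder.Open(std::make_shared<arrow::io::BufferReader>(buffer)));

  std::unique_ptr<parquet::arrow::FileReader> reader;
  ARROW_RETURN_NOT_OK(builder.properties(properties)->Build(&reader));

  // Stream row groups 0 and 1 as record batches.
  std::shared_ptr<arrow::RecordBatchReader> rb_reader;
  ARROW_RETURN_NOT_OK(reader->GetRecordBatchReader({0, 1}, &rb_reader));

  std::shared_ptr<arrow::RecordBatch> batch;
  do {
    ARROW_RETURN_NOT_OK(rb_reader->ReadNext(&batch));
  } while (batch != nullptr);  // a null batch signals end of stream
  return arrow::Status::OK();
}
```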
56 changes: 42 additions & 14 deletions cpp/src/parquet/arrow/reader.cc
@@ -814,28 +814,56 @@ Status FileReader::Make(::arrow::MemoryPool* pool,
return Make(pool, std::move(reader), default_arrow_reader_properties(), out);
}

- Status OpenFile(const std::shared_ptr<::arrow::io::RandomAccessFile>& file,
- MemoryPool* pool, const ReaderProperties& props,
- const std::shared_ptr<FileMetaData>& metadata,
- std::unique_ptr<FileReader>* reader) {
- std::unique_ptr<ParquetReader> pq_reader;
- PARQUET_CATCH_NOT_OK(pq_reader = ParquetReader::Open(file, props, metadata));
- return FileReader::Make(pool, std::move(pq_reader), default_arrow_reader_properties(),
- reader);
FileReaderBuilder::FileReaderBuilder()
: pool_(::arrow::default_memory_pool()),
properties_(default_arrow_reader_properties()) {}

Status FileReaderBuilder::Open(const std::shared_ptr<::arrow::io::RandomAccessFile>& file,
const ReaderProperties& properties,
const std::shared_ptr<FileMetaData>& metadata) {
PARQUET_CATCH_NOT_OK(raw_reader_ = ParquetReader::Open(file, properties, metadata));
return Status::OK();
}

FileReaderBuilder* FileReaderBuilder::memory_pool(::arrow::MemoryPool* pool) {
pool_ = pool;
return this;
}

FileReaderBuilder* FileReaderBuilder::properties(
const ArrowReaderProperties& arg_properties) {
properties_ = arg_properties;
return this;
}

Status FileReaderBuilder::Build(std::unique_ptr<FileReader>* out) {
return FileReader::Make(pool_, std::move(raw_reader_), properties_, out);
}

Status OpenFile(const std::shared_ptr<::arrow::io::RandomAccessFile>& file,
MemoryPool* pool, std::unique_ptr<FileReader>* reader) {
- return OpenFile(file, pool, ::parquet::default_reader_properties(), nullptr, reader);
+ FileReaderBuilder builder;
+ RETURN_NOT_OK(builder.Open(file));
+ return builder.memory_pool(pool)->Build(reader);
}

Status OpenFile(const std::shared_ptr<::arrow::io::RandomAccessFile>& file,
MemoryPool* pool, const ReaderProperties& props,
const std::shared_ptr<FileMetaData>& metadata,
std::unique_ptr<FileReader>* reader) {
// Deprecated since 0.15.0
FileReaderBuilder builder;
RETURN_NOT_OK(builder.Open(file, props, metadata));
return builder.memory_pool(pool)->Build(reader);
}

Status OpenFile(const std::shared_ptr<::arrow::io::RandomAccessFile>& file,
- ::arrow::MemoryPool* pool, const ArrowReaderProperties& properties,
+ MemoryPool* pool, const ArrowReaderProperties& properties,
std::unique_ptr<FileReader>* reader) {
- std::unique_ptr<ParquetReader> pq_reader;
- PARQUET_CATCH_NOT_OK(pq_reader = ParquetReader::Open(
- file, ::parquet::default_reader_properties(), nullptr));
- return FileReader::Make(pool, std::move(pq_reader), properties, reader);
+ // Deprecated since 0.15.0
+ FileReaderBuilder builder;
+ RETURN_NOT_OK(builder.Open(file));
+ return builder.memory_pool(pool)->properties(properties)->Build(reader);
}

} // namespace arrow
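Both legacy overloads now reduce to the same three builder steps, which makes migrating callers mechanical. A hedged before/after sketch (the wrapper `MakeReader` and its parameters are illustrative, not from the PR):

```cpp
// Illustrative migration sketch -- not code from the PR.
#include "arrow/io/interfaces.h"
#include "arrow/memory_pool.h"
#include "arrow/status.h"
#include "parquet/arrow/reader.h"

arrow::Status MakeReader(const std::shared_ptr<arrow::io::RandomAccessFile>& file,
                         const parquet::ReaderProperties& props,
                         const std::shared_ptr<parquet::FileMetaData>& metadata,
                         std::unique_ptr<parquet::arrow::FileReader>* out) {
  // Old (deprecated since 0.15.0):
  //   OpenFile(file, ::arrow::default_memory_pool(), props, metadata, out);
  // New: the same steps, spelled out on the builder.
  parquet::arrow::FileReaderBuilder builder;
  ARROW_RETURN_NOT_OK(builder.Open(file, props, metadata));
  return builder.memory_pool(arrow::default_memory_pool())->Build(out);
}
```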
33 changes: 27 additions & 6 deletions cpp/src/parquet/arrow/reader.h
@@ -24,6 +24,7 @@
#include <vector>

#include "parquet/platform.h"
#include "parquet/properties.h"

namespace arrow {

@@ -286,22 +287,42 @@ class PARQUET_EXPORT ColumnReader {
std::shared_ptr<::arrow::ChunkedArray>* out) = 0;
};

- // Helper function to create a file reader from an implementation of an Arrow
- // random access file
- //
- // metadata : separately-computed file metadata, can be nullptr
/// \brief Experimental helper class for bindings (like Python) that struggle
/// either with std::move or C++ exceptions
class PARQUET_EXPORT FileReaderBuilder {
public:
FileReaderBuilder();

::arrow::Status Open(const std::shared_ptr<::arrow::io::RandomAccessFile>& file,
const ReaderProperties& properties = default_reader_properties(),
const std::shared_ptr<FileMetaData>& metadata = NULLPTR);

ParquetFileReader* raw_reader() { return raw_reader_.get(); }

FileReaderBuilder* memory_pool(::arrow::MemoryPool* pool);
FileReaderBuilder* properties(const ArrowReaderProperties& arg_properties);
::arrow::Status Build(std::unique_ptr<FileReader>* out);

private:
::arrow::MemoryPool* pool_;
ArrowReaderProperties properties_;
std::unique_ptr<ParquetFileReader> raw_reader_;
};

- PARQUET_EXPORT
- ::arrow::Status OpenFile(const std::shared_ptr<::arrow::io::RandomAccessFile>& file,
- ::arrow::MemoryPool* allocator,
- const ReaderProperties& properties,
- const std::shared_ptr<FileMetaData>& metadata,
- std::unique_ptr<FileReader>* reader);

+ ARROW_DEPRECATED("Deprecated since 0.15.0. Use FileReaderBuilder")
+ PARQUET_EXPORT
+ ::arrow::Status OpenFile(const std::shared_ptr<::arrow::io::RandomAccessFile>& file,
+ ::arrow::MemoryPool* allocator,
+ const ReaderProperties& properties,
+ const std::shared_ptr<FileMetaData>& metadata,
+ std::unique_ptr<FileReader>* reader);

+ ARROW_DEPRECATED("Deprecated since 0.15.0. Use FileReaderBuilder")
PARQUET_EXPORT
::arrow::Status OpenFile(const std::shared_ptr<::arrow::io::RandomAccessFile>& file,
::arrow::MemoryPool* allocator,
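The doc comment states the motivation: bindings want two-phase construction where every step reports through `Status` instead of exceptions, and `raw_reader()` lets them consult file metadata before committing to an Arrow-level reader. A sketch under those assumptions (`OpenWithDictionary` and the column index are invented for illustration):

```cpp
// Illustrative sketch -- not code from the PR.
#include "arrow/io/interfaces.h"
#include "arrow/status.h"
#include "parquet/arrow/reader.h"
#include "parquet/file_reader.h"
#include "parquet/metadata.h"

arrow::Status OpenWithDictionary(
    const std::shared_ptr<arrow::io::RandomAccessFile>& file,
    std::unique_ptr<parquet::arrow::FileReader>* out) {
  parquet::arrow::FileReaderBuilder builder;
  // Open() traps Parquet exceptions and reports them as a Status.
  ARROW_RETURN_NOT_OK(builder.Open(file));

  // raw_reader() exposes the low-level reader before Build(), e.g. to
  // consult the file metadata.
  if (builder.raw_reader()->metadata()->num_columns() == 0) {
    return arrow::Status::Invalid("empty schema");
  }

  parquet::ArrowReaderProperties properties = parquet::default_arrow_reader_properties();
  properties.set_read_dictionary(0, /*read_dict=*/true);  // index 0 is made up
  return builder.properties(properties)->Build(out);
}
```

The fluent setters return `FileReaderBuilder*` rather than references, presumably to keep the chaining straightforward to drive from Cython, as the `_parquet.pxd` declarations below suggest.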
15 changes: 14 additions & 1 deletion docs/source/python/parquet.rst
@@ -210,6 +210,19 @@ Alternatively python ``with`` syntax can also be used:
Data Type Handling
------------------

Reading types as DictionaryArray
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

The ``read_dictionary`` option in ``read_table`` and ``ParquetDataset`` will
cause columns to be read as ``DictionaryArray``, which will become
``pandas.Categorical`` when converted to pandas. This option is only valid for
string and binary column types, and it can yield significantly lower memory use
and improved performance for columns with many repeated string values.

.. code-block:: python

   pq.read_table(where, read_dictionary=['binary_c0', 'stringb_c2'])

Review discussion on this limitation:

Member: Is the limitation intended, or simply because we only have it implemented for binary columns?

Member (Author): It's only implemented for BYTE_ARRAY columns at the moment. We could expand that, but there is little benefit from a performance/memory use point of view for the primitive types.

Member: I've also used this (through pandas.Categorical) in the past on date and float types (e.g. in some datasets you can have 1000s of products that only have one of 5 prices). This often gave a 4-6x improvement in memory usage for these columns. (Just dropping it here as FYI.)

Member (Author): I see. I'll open a JIRA as a follow-up.

Storing timestamps
~~~~~~~~~~~~~~~~~~

@@ -305,7 +318,7 @@ A dataset partitioned by year and month may look like on disk:
...

Writing to Partitioned Datasets
- ------------------------------------------------
+ -------------------------------

You can write a partitioned dataset for any ``pyarrow`` file system that is a
file-store (e.g. local, HDFS, S3). The default behaviour when no filesystem is
31 changes: 16 additions & 15 deletions python/pyarrow/_parquet.pxd
@@ -328,14 +328,6 @@ cdef extern from "parquet/api/reader.h" namespace "parquet" nogil:
ReaderProperties default_reader_properties()

cdef cppclass ParquetFileReader:
- @staticmethod
- unique_ptr[ParquetFileReader] Open(
- const shared_ptr[RandomAccessFile]& file,
- const ReaderProperties& props,
- const shared_ptr[CFileMetaData]& metadata)
-
- @staticmethod
- unique_ptr[ParquetFileReader] OpenFile(const c_string& path)
shared_ptr[CFileMetaData] metadata()


Expand All @@ -359,16 +351,14 @@ cdef extern from "parquet/api/writer.h" namespace "parquet" nogil:

cdef extern from "parquet/arrow/reader.h" namespace "parquet::arrow" nogil:
cdef cppclass ArrowReaderProperties:
- pass
+ ArrowReaderProperties()
+ void set_read_dictionary(int column_index, c_bool read_dict)
+ c_bool read_dictionary(int column_index)
+ void set_batch_size(int64_t batch_size)
+ int64_t batch_size()

ArrowReaderProperties default_arrow_reader_properties()

- CStatus OpenFile(const shared_ptr[RandomAccessFile]& file,
- CMemoryPool* allocator,
- const ReaderProperties& properties,
- const shared_ptr[CFileMetaData]& metadata,
- unique_ptr[FileReader]* reader)

cdef cppclass FileReader:
FileReader(CMemoryPool* pool, unique_ptr[ParquetFileReader] reader)
CStatus ReadColumn(int i, shared_ptr[CChunkedArray]* out)
@@ -390,6 +380,17 @@

void set_use_threads(c_bool use_threads)

cdef cppclass FileReaderBuilder:
FileReaderBuilder()
CStatus Open(const shared_ptr[RandomAccessFile]& file,
const ReaderProperties& properties,
const shared_ptr[CFileMetaData]& metadata)

ParquetFileReader* raw_reader()
FileReaderBuilder* memory_pool(CMemoryPool*)
FileReaderBuilder* properties(const ArrowReaderProperties&)
CStatus Build(unique_ptr[FileReader]* out)

CStatus FromParquetSchema(
const SchemaDescriptor* parquet_schema,
const ArrowReaderProperties& properties,
Expand Down