17 changes: 12 additions & 5 deletions python/pyarrow/_parquet.pxd
@@ -179,7 +179,7 @@ cdef extern from "parquet/api/reader.h" namespace "parquet" nogil:
 
         @staticmethod
         unique_ptr[ParquetFileReader] OpenFile(const c_string& path)
-        shared_ptr[CFileMetaData] metadata();
+        shared_ptr[CFileMetaData] metadata()
 
 
 cdef extern from "parquet/api/writer.h" namespace "parquet" nogil:
@@ -211,11 +211,18 @@ cdef extern from "parquet/arrow/reader.h" namespace "parquet::arrow" nogil:
 
     cdef cppclass FileReader:
         FileReader(CMemoryPool* pool, unique_ptr[ParquetFileReader] reader)
-        CStatus ReadColumn(int i, shared_ptr[CArray]* out);
-        CStatus ReadTable(shared_ptr[CTable]* out);
+        CStatus ReadColumn(int i, shared_ptr[CArray]* out)
+
+        int num_row_groups()
+        CStatus ReadRowGroup(int i, shared_ptr[CTable]* out)
+        CStatus ReadRowGroup(int i, const vector[int]& column_indices,
+                             shared_ptr[CTable]* out)
+
+        CStatus ReadTable(shared_ptr[CTable]* out)
         CStatus ReadTable(const vector[int]& column_indices,
-                          shared_ptr[CTable]* out);
-        const ParquetFileReader* parquet_reader();
+                          shared_ptr[CTable]* out)
+
+        const ParquetFileReader* parquet_reader()
 
         void set_num_threads(int num_threads)
 
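The two `ReadRowGroup` overloads mirror the existing `ReadTable` pair: one reads every column of a single row group, the other only the requested column indices, and `num_row_groups()` exposes the group count. A minimal sketch of the invariant this API is built around, written against the high-level Python wrappers added later in this diff (the file name is an assumed placeholder):

```python
import pyarrow as pa
import pyarrow.parquet as pq

# 'example.parquet' stands in for any Parquet file with several row groups
pf = pq.ParquetFile('example.parquet')

# Reading the groups one by one and concatenating them should yield the
# same data as a single read over the whole file
pieces = [pf.read_row_group(i) for i in range(pf.num_row_groups)]
assert pa.concat_tables(pieces).to_pandas().equals(pf.read().to_pandas())
```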
37 changes: 29 additions & 8 deletions python/pyarrow/_parquet.pyx
@@ -31,7 +31,7 @@ from pyarrow.error import ArrowException
 from pyarrow.error cimport check_status
 from pyarrow.io import NativeFile
 from pyarrow.memory cimport MemoryPool, maybe_unbox_memory_pool
-from pyarrow.table cimport Table
+from pyarrow.table cimport Table, table_from_ctable
 
 from pyarrow.io cimport NativeFile, get_reader, get_writer
 
@@ -381,16 +381,39 @@ cdef class ParquetReader:
         result.init(metadata)
         return result
 
-    def read(self, column_indices=None, nthreads=1):
+    property num_row_groups:
+
+        def __get__(self):
+            return self.reader.get().num_row_groups()
+
+    def set_num_threads(self, int nthreads):
+        self.reader.get().set_num_threads(nthreads)
+
+    def read_row_group(self, int i, column_indices=None):
         cdef:
-            Table table = Table()
             shared_ptr[CTable] ctable
             vector[int] c_column_indices
 
-        self.reader.get().set_num_threads(nthreads)
+        if column_indices is not None:
+            for index in column_indices:
+                c_column_indices.push_back(index)
+
+            with nogil:
+                check_status(self.reader.get()
+                             .ReadRowGroup(i, c_column_indices, &ctable))
+        else:
+            # Read all columns
+            with nogil:
+                check_status(self.reader.get()
+                             .ReadRowGroup(i, &ctable))
+        return table_from_ctable(ctable)
+
+    def read_all(self, column_indices=None):
+        cdef:
+            shared_ptr[CTable] ctable
+            vector[int] c_column_indices
 
         if column_indices is not None:
             # Read only desired column indices
            for index in column_indices:
                 c_column_indices.push_back(index)
-
@@ -402,9 +425,7 @@ cdef class ParquetReader:
             with nogil:
                 check_status(self.reader.get()
                              .ReadTable(&ctable))
-
-        table.init(ctable)
-        return table
+        return table_from_ctable(ctable)
 
     def column_name_idx(self, column_name):
         """
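`read` is split into `read_row_group` and `read_all`, both returning through the new `table_from_ctable` helper and releasing the GIL around the C++ call; the thread count moves from a `read` argument to an explicit `set_num_threads` method. A sketch of driving this low-level reader directly through the `ParquetFile.reader` attribute (normally you would stay on the high-level API; the path and column name are assumed):

```python
import pyarrow.parquet as pq

pf = pq.ParquetFile('example.parquet')    # assumed path
low = pf.reader                           # the Cython ParquetReader

low.set_num_threads(4)                    # previously an nthreads= argument to read()
idx = low.column_name_idx('some_column')  # resolve an assumed column name to its index
table = low.read_row_group(0, column_indices=[idx])
```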
53 changes: 39 additions & 14 deletions python/pyarrow/parquet.py
@@ -50,7 +50,32 @@ def metadata(self):
     def schema(self):
         return self.metadata.schema
 
-    def read(self, nrows=None, columns=None, nthreads=1):
+    @property
+    def num_row_groups(self):
+        return self.reader.num_row_groups
+
+    def read_row_group(self, i, columns=None, nthreads=1):
+        """
+        Read a single row group from a Parquet file
+
+        Parameters
+        ----------
+        columns : list
+            If not None, only these columns will be read from the row group.
+        nthreads : int, default 1
+            Number of columns to read in parallel. If > 1, requires that the
+            underlying file source is threadsafe
+
+        Returns
+        -------
+        pyarrow.table.Table
+            Content of the row group as a table (of columns)
+        """
+        column_indices = self._get_column_indices(columns)
+        self.reader.set_num_threads(nthreads)
+        return self.reader.read_row_group(i, column_indices=column_indices)
+
+    def read(self, columns=None, nthreads=1):
         """
         Read a Table from Parquet format
 
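Reading row group by row group keeps peak memory bounded by the largest group rather than the whole file. A sketch of the streaming pattern this method enables (the path, column names, and `process` are assumed placeholders):

```python
import pyarrow.parquet as pq

pf = pq.ParquetFile('big.parquet')
for i in range(pf.num_row_groups):
    # Decode just two columns of one row group, reading them in parallel
    piece = pf.read_row_group(i, columns=['a', 'b'], nthreads=2)
    process(piece.to_pandas())  # placeholder for per-chunk work
```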
@@ -67,17 +92,16 @@ def read(self, nrows=None, columns=None, nthreads=1):
         pyarrow.table.Table
             Content of the file as a table (of columns)
         """
-        if nrows is not None:
-            raise NotImplementedError("nrows argument")
+        column_indices = self._get_column_indices(columns)
+        self.reader.set_num_threads(nthreads)
+        return self.reader.read_all(column_indices=column_indices)
 
-        if columns is None:
-            column_indices = None
+    def _get_column_indices(self, column_names):
+        if column_names is None:
+            return None
         else:
-            column_indices = [self.reader.column_name_idx(column)
-                              for column in columns]
-
-        return self.reader.read(column_indices=column_indices,
-                                nthreads=nthreads)
+            return [self.reader.column_name_idx(column)
+                    for column in column_names]
 
 
 def read_table(source, columns=None, nthreads=1, metadata=None):
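`_get_column_indices` centralizes the name-to-index lookup that `read` used to do inline, so `read` and `read_row_group` now resolve column names identically. Roughly, under assumed names, the call below and the expansion after it do the same work:

```python
import pyarrow.parquet as pq

pf = pq.ParquetFile('example.parquet')          # assumed path
table = pf.read(columns=['a', 'b'], nthreads=2)

# ...which is shorthand for the steps the helper now factors out
indices = pf._get_column_indices(['a', 'b'])    # names to indices, e.g. [0, 1]
pf.reader.set_num_threads(2)
same = pf.reader.read_all(column_indices=indices)
```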
@@ -178,16 +202,16 @@ def open_file(path, meta=None):
     return all_data
 
 
-def write_table(table, sink, chunk_size=None, version='1.0',
-                use_dictionary=True, compression='snappy'):
+def write_table(table, sink, row_group_size=None, version='1.0',
+                use_dictionary=True, compression='snappy', **kwargs):
     """
     Write a Table to Parquet format
 
     Parameters
     ----------
     table : pyarrow.Table
     sink: string or pyarrow.io.NativeFile
-    chunk_size : int, default None
+    row_group_size : int, default None
         The maximum number of rows in each Parquet RowGroup. As a default,
         we will write a single RowGroup per file.
     version : {"1.0", "2.0"}, default "1.0"
@@ -198,7 +222,8 @@ def write_table(table, sink, chunk_size=None, version='1.0',
     compression : str or dict
         Specify the compression codec, either on a general basis or per-column.
     """
+    row_group_size = kwargs.get('chunk_size', row_group_size)
     writer = ParquetWriter(sink, use_dictionary=use_dictionary,
                            compression=compression,
                            version=version)
-    writer.write_table(table, row_group_size=chunk_size)
+    writer.write_table(table, row_group_size=row_group_size)
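The rename of `chunk_size` to `row_group_size` stays backwards compatible: `**kwargs` catches the old keyword and `kwargs.get` folds it back in (taking precedence over an explicit `row_group_size` if both are passed). A sketch with assumed data:

```python
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

table = pa.Table.from_pandas(pd.DataFrame({'x': range(10000)}))

# New spelling: four row groups of 2500 rows each
pq.write_table(table, 'out.parquet', row_group_size=2500)

# Old spelling still works through **kwargs
pq.write_table(table, 'out.parquet', chunk_size=2500)
```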
29 changes: 29 additions & 0 deletions python/pyarrow/tests/test_parquet.py
@@ -402,6 +402,35 @@ def test_pass_separate_metadata():
     pdt.assert_frame_equal(df, fileh.read().to_pandas())
 
 
+@parquet
+def test_read_single_row_group():
+    # ARROW-471
+    N, K = 10000, 4
+    df = alltypes_sample(size=N)
+
+    a_table = pa.Table.from_pandas(df, timestamps_to_ms=True)
+
+    buf = io.BytesIO()
+    pq.write_table(a_table, buf, row_group_size=N // K,
+                   compression='snappy', version='2.0')
+
+    buf.seek(0)
+
+    pf = pq.ParquetFile(buf)
+
+    assert pf.num_row_groups == K
+
+    row_groups = [pf.read_row_group(i) for i in range(K)]
+    result = pa.concat_tables(row_groups)
+    pdt.assert_frame_equal(df, result.to_pandas())
+
+    cols = df.columns[:2]
+    row_groups = [pf.read_row_group(i, columns=cols)
+                  for i in range(K)]
+    result = pa.concat_tables(row_groups)
+    pdt.assert_frame_equal(df[cols], result.to_pandas())
+
+
 @parquet
 def test_read_multiple_files(tmpdir):
     nfiles = 10