16 changes: 16 additions & 0 deletions cpp/src/arrow/type-test.cc
@@ -152,6 +152,22 @@ TEST_F(TestSchema, GetFieldByName) {
ASSERT_TRUE(result == nullptr);
}

TEST_F(TestSchema, GetFieldIndex) {
auto f0 = field("f0", int32());
auto f1 = field("f1", uint8(), false);
auto f2 = field("f2", utf8());
auto f3 = field("f3", list(int16()));

vector<shared_ptr<Field>> fields = {f0, f1, f2, f3};
auto schema = std::make_shared<Schema>(fields);

ASSERT_EQ(0, schema->GetFieldIndex(fields[0]->name()));
ASSERT_EQ(1, schema->GetFieldIndex(fields[1]->name()));
ASSERT_EQ(2, schema->GetFieldIndex(fields[2]->name()));
ASSERT_EQ(3, schema->GetFieldIndex(fields[3]->name()));
ASSERT_EQ(-1, schema->GetFieldIndex("not-found"));
}

TEST_F(TestSchema, TestMetadataConstruction) {
auto f0 = field("f0", int32());
auto f1 = field("f1", uint8(), false);
11 changes: 8 additions & 3 deletions cpp/src/arrow/type.cc
@@ -265,7 +265,12 @@ bool Schema::Equals(const Schema& other) const {
return true;
}

std::shared_ptr<Field> Schema::GetFieldByName(const std::string& name) {
std::shared_ptr<Field> Schema::GetFieldByName(const std::string& name) const {
int64_t i = GetFieldIndex(name);
return i == -1 ? nullptr : fields_[i];
}

int64_t Schema::GetFieldIndex(const std::string& name) const {
if (fields_.size() > 0 && name_to_index_.size() == 0) {
for (size_t i = 0; i < fields_.size(); ++i) {
name_to_index_[fields_[i]->name()] = static_cast<int>(i);
@@ -274,9 +279,9 @@ std::shared_ptr<Field> Schema::GetFieldByName(const std::string& name) {

auto it = name_to_index_.find(name);
if (it == name_to_index_.end()) {
return nullptr;
return -1;
} else {
return fields_[it->second];
return it->second;
}
}

7 changes: 5 additions & 2 deletions cpp/src/arrow/type.h
@@ -699,7 +699,10 @@ class ARROW_EXPORT Schema {
std::shared_ptr<Field> field(int i) const { return fields_[i]; }

// Returns nullptr if name not found
std::shared_ptr<Field> GetFieldByName(const std::string& name);
std::shared_ptr<Field> GetFieldByName(const std::string& name) const;

// Returns -1 if name not found
int64_t GetFieldIndex(const std::string& name) const;

const std::vector<std::shared_ptr<Field>>& fields() const { return fields_; }
std::shared_ptr<const KeyValueMetadata> metadata() const { return metadata_; }
@@ -720,7 +723,7 @@

private:
std::vector<std::shared_ptr<Field>> fields_;
std::unordered_map<std::string, int> name_to_index_;
mutable std::unordered_map<std::string, int> name_to_index_;

std::shared_ptr<const KeyValueMetadata> metadata_;
};
3 changes: 2 additions & 1 deletion python/pyarrow/__init__.py
@@ -104,7 +104,8 @@ def jemalloc_memory_pool():
from pyarrow.ipc import (RecordBatchFileReader, RecordBatchFileWriter,
RecordBatchStreamReader, RecordBatchStreamWriter,
open_stream,
open_file)
open_file,
serialize_pandas, deserialize_pandas)


localfs = LocalFilesystem.get_instance()
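
These re-exports make both helpers reachable from the top-level pyarrow namespace. A minimal round-trip sketch (hypothetical usage, not part of this diff; it assumes serialize_pandas returns a buffer-like object that deserialize_pandas accepts):

import pandas as pd
import pyarrow as pa

# Serialize a DataFrame to an Arrow buffer and read it back.
df = pd.DataFrame({'a': [1, 2, 3], 'b': ['x', 'y', 'z']})
buf = pa.serialize_pandas(df)           # DataFrame -> Arrow stream buffer
restored = pa.deserialize_pandas(buf)   # buffer -> DataFrame
assert df.equals(restored)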
5 changes: 5 additions & 0 deletions python/pyarrow/_parquet.pxd
@@ -20,6 +20,7 @@
from pyarrow.includes.common cimport *
from pyarrow.includes.libarrow cimport (CArray, CSchema, CStatus,
CTable, CMemoryPool,
CKeyValueMetadata,
RandomAccessFile, OutputStream)


@@ -164,6 +165,7 @@ cdef extern from "parquet/api/reader.h" namespace "parquet" nogil:

unique_ptr[CRowGroupMetaData] RowGroup(int i)
const SchemaDescriptor* schema()
shared_ptr[const CKeyValueMetadata] key_value_metadata() const

cdef cppclass ReaderProperties:
pass
@@ -229,8 +231,11 @@ cdef extern from "parquet/arrow/reader.h" namespace "parquet::arrow" nogil:

cdef extern from "parquet/arrow/schema.h" namespace "parquet::arrow" nogil:
CStatus FromParquetSchema(const SchemaDescriptor* parquet_schema,
const shared_ptr[const CKeyValueMetadata]& key_value_metadata,
shared_ptr[CSchema]* out)

CStatus ToParquetSchema(const CSchema* arrow_schema,
const shared_ptr[const CKeyValueMetadata]& key_value_metadata,
shared_ptr[SchemaDescriptor]* out)


61 changes: 38 additions & 23 deletions python/pyarrow/_parquet.pyx
@@ -40,15 +40,15 @@ cdef class RowGroupMetaData:
cdef:
unique_ptr[CRowGroupMetaData] up_metadata
CRowGroupMetaData* metadata
object parent
FileMetaData parent

def __cinit__(self):
pass

cdef init_from_file(self, FileMetaData parent, int i):
cdef void init_from_file(self, FileMetaData parent, int i):
if i < 0 or i >= parent.num_row_groups:
raise IndexError('{0} out of bounds'.format(i))
self.up_metadata = parent.metadata.RowGroup(i)
self.up_metadata = parent._metadata.RowGroup(i)
self.metadata = self.up_metadata.get()
self.parent = parent

@@ -80,15 +80,15 @@
cdef class FileMetaData:
cdef:
shared_ptr[CFileMetaData] sp_metadata
CFileMetaData* metadata
object _schema
CFileMetaData* _metadata
ParquetSchema _schema

def __cinit__(self):
pass

cdef init(self, const shared_ptr[CFileMetaData]& metadata):
self.sp_metadata = metadata
self.metadata = metadata.get()
self._metadata = metadata.get()

def __repr__(self):
return """{0}
@@ -116,27 +116,27 @@
property serialized_size:

def __get__(self):
return self.metadata.size()
return self._metadata.size()

property num_columns:

def __get__(self):
return self.metadata.num_columns()
return self._metadata.num_columns()

property num_rows:

def __get__(self):
return self.metadata.num_rows()
return self._metadata.num_rows()

property num_row_groups:

def __get__(self):
return self.metadata.num_row_groups()
return self._metadata.num_row_groups()

property format_version:

def __get__(self):
cdef ParquetVersion version = self.metadata.version()
cdef ParquetVersion version = self._metadata.version()
if version == ParquetVersion_V1:
return '1.0'
if version == ParquetVersion_V2:
@@ -149,7 +149,7 @@
property created_by:

def __get__(self):
return frombytes(self.metadata.created_by())
return frombytes(self._metadata.created_by())

def row_group(self, int i):
"""
@@ -159,14 +159,26 @@ cdef class FileMetaData:
result.init_from_file(self, i)
return result

property metadata:
[Inline review thread on the metadata property]
Contributor Author: This needs a test
Member: is this cool now?
Contributor Author: Let me add a small test, doing it now.
Contributor Author: done

def __get__(self):
cdef:
unordered_map[c_string, c_string] metadata
const CKeyValueMetadata* underlying_metadata
underlying_metadata = self._metadata.key_value_metadata().get()
if underlying_metadata != NULL:
underlying_metadata.ToUnorderedMap(&metadata)
return metadata
else:
return None
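
The getter copies the C++ unordered_map into a plain Python dict of byte strings, or returns None when the file carries no key-value metadata. A usage sketch for the new property (hypothetical; the file name is illustrative, not from this diff):

import pyarrow.parquet as pq

pf = pq.ParquetFile('example.parquet')
kv = pf.metadata.metadata    # dict of bytes -> bytes, or None
if kv is not None:
    for key, value in kv.items():
        print(key, value)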


cdef class ParquetSchema:
cdef:
object parent # the FileMetaData owning the SchemaDescriptor
FileMetaData parent # the FileMetaData owning the SchemaDescriptor
const SchemaDescriptor* schema

def __cinit__(self):
self.parent = None
self.schema = NULL

def __repr__(self):
@@ -186,7 +198,7 @@ cdef class ParquetSchema:

cdef init_from_filemeta(self, FileMetaData container):
self.parent = container
self.schema = container.metadata.schema()
self.schema = container._metadata.schema()

def __len__(self):
return self.schema.num_columns()
@@ -211,7 +223,9 @@ cdef class ParquetSchema:
shared_ptr[CSchema] sp_arrow_schema

with nogil:
check_status(FromParquetSchema(self.schema, &sp_arrow_schema))
check_status(FromParquetSchema(
self.schema, self.parent._metadata.key_value_metadata(),
&sp_arrow_schema))

return pyarrow_wrap_schema(sp_arrow_schema)
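
Since FromParquetSchema now receives the file's key-value metadata, the converted Arrow schema should carry those pairs as well. A brief sketch (hypothetical; assumes the wrapped Arrow Schema exposes the pairs through a metadata accessor):

import pyarrow.parquet as pq

arrow_schema = pq.ParquetFile('example.parquet').schema.to_arrow_schema()
print(arrow_schema.metadata)  # key-value pairs carried over from the Parquet file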

@@ -232,7 +246,7 @@ cdef class ParquetSchema:

cdef class ColumnSchema:
cdef:
object parent
ParquetSchema parent
const ColumnDescriptor* descr

def __cinit__(self):
@@ -463,7 +477,7 @@ cdef class ParquetReader:
"""
cdef:
FileMetaData container = self.metadata
const CFileMetaData* metadata = container.metadata
const CFileMetaData* metadata = container._metadata
int i = 0

if self.column_idx_map is None:
@@ -488,12 +502,13 @@
return array


cdef check_compression_name(name):
cdef int check_compression_name(name) except -1:
if name.upper() not in ['NONE', 'SNAPPY', 'GZIP', 'LZO', 'BROTLI']:
raise ArrowException("Unsupported compression: " + name)
return 0


cdef ParquetCompression compression_from_name(object name):
cdef ParquetCompression compression_from_name(str name):
name = name.upper()
if name == "SNAPPY":
return ParquetCompression_SNAPPY
@@ -546,7 +561,7 @@ cdef class ParquetWriter:
maybe_unbox_memory_pool(memory_pool),
sink, properties, &self.writer))

cdef _set_version(self, WriterProperties.Builder* props):
cdef void _set_version(self, WriterProperties.Builder* props):
if self.version is not None:
if self.version == "1.0":
props.version(ParquetVersion_V1)
@@ -555,7 +570,7 @@
else:
raise ArrowException("Unsupported Parquet format version")

cdef _set_compression_props(self, WriterProperties.Builder* props):
cdef void _set_compression_props(self, WriterProperties.Builder* props):
if isinstance(self.compression, basestring):
check_compression_name(self.compression)
props.compression(compression_from_name(self.compression))
@@ -564,7 +579,7 @@
check_compression_name(codec)
props.compression(column, compression_from_name(codec))

cdef _set_dictionary_props(self, WriterProperties.Builder* props):
cdef void _set_dictionary_props(self, WriterProperties.Builder* props):
if isinstance(self.use_dictionary, bool):
if self.use_dictionary:
props.enable_dictionary()
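
Taken together, the writer accepts either a single codec name for the whole file or a per-column mapping, each name validated by check_compression_name. A sketch of the user-level effect (hypothetical; assumes write_table forwards the compression argument to ParquetWriter):

import pyarrow as pa
import pyarrow.parquet as pq

table = pa.Table.from_arrays([pa.array([1, 2, 3])], ['ints'])

# One codec for every column:
pq.write_table(table, 'one-codec.parquet', compression='snappy')

# Or a mapping from column name to codec:
pq.write_table(table, 'per-column.parquet', compression={'ints': 'gzip'})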