1 change: 1 addition & 0 deletions cpp/src/arrow/io/interfaces.h
@@ -19,6 +19,7 @@
 #define ARROW_IO_INTERFACES_H
 
 #include <cstdint>
+#include <memory>
 
 namespace arrow {
 
19 changes: 15 additions & 4 deletions cpp/src/arrow/parquet/io.cc
@@ -55,12 +55,23 @@ void ParquetAllocator::Free(uint8_t* buffer, int64_t size) {
 // ----------------------------------------------------------------------
 // ParquetReadSource
 
-ParquetReadSource::ParquetReadSource(
-    const std::shared_ptr<ArrowROFile>& file, ParquetAllocator* allocator)
-    : file_(file), allocator_(allocator) {}
+ParquetReadSource::ParquetReadSource(ParquetAllocator* allocator)
+    : file_(nullptr), allocator_(allocator) {}
+
+Status ParquetReadSource::Open(const std::shared_ptr<io::RandomAccessFile>& file) {
+  int64_t file_size;
+  RETURN_NOT_OK(file->GetSize(&file_size));
+
+  file_ = file;
+  size_ = file_size;
+  return Status::OK();
+}
 
 void ParquetReadSource::Close() {
-  PARQUET_THROW_NOT_OK(file_->Close());
+  // TODO(wesm): Make this a no-op for now. This leaves Python wrappers for
+  // these classes in a borked state. Probably better to explicitly close.
+
+  // PARQUET_THROW_NOT_OK(file_->Close());
 }
 
 int64_t ParquetReadSource::Tell() const {
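A note on the pattern here: the fallible GetSize() call moves out of the constructor and into Open(), which reports failure through a Status, since a C++ constructor cannot return an error without throwing. A minimal sketch of the resulting construct-then-Open idiom, assuming the headers touched in this diff (MakeReadSource is a hypothetical helper, not part of the change):

// Hypothetical helper showing the two-phase initialization introduced above
#include <memory>
#include <utility>

#include "arrow/io/interfaces.h"
#include "arrow/parquet/io.h"
#include "arrow/util/status.h"

arrow::Status MakeReadSource(
    const std::shared_ptr<arrow::io::RandomAccessFile>& file,
    arrow::parquet::ParquetAllocator* allocator,
    std::unique_ptr<arrow::parquet::ParquetReadSource>* out) {
  std::unique_ptr<arrow::parquet::ParquetReadSource> source(
      new arrow::parquet::ParquetReadSource(allocator));
  // Open() is where the fallible GetSize() lookup happens
  RETURN_NOT_OK(source->Open(file));
  *out = std::move(source);
  return arrow::Status::OK();
}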
10 changes: 7 additions & 3 deletions cpp/src/arrow/parquet/io.h
@@ -49,16 +49,20 @@ class ARROW_EXPORT ParquetAllocator : public ::parquet::MemoryAllocator {
   uint8_t* Malloc(int64_t size) override;
   void Free(uint8_t* buffer, int64_t size) override;
 
-  MemoryPool* pool() { return pool_; }
+  void set_pool(MemoryPool* pool) { pool_ = pool; }
+
+  MemoryPool* pool() const { return pool_; }
 
  private:
   MemoryPool* pool_;
 };
 
 class ARROW_EXPORT ParquetReadSource : public ::parquet::RandomAccessSource {
  public:
-  ParquetReadSource(
-      const std::shared_ptr<io::RandomAccessFile>& file, ParquetAllocator* allocator);
+  explicit ParquetReadSource(ParquetAllocator* allocator);
+
+  // We need to ask for the file size on opening the file, and this can fail
+  Status Open(const std::shared_ptr<io::RandomAccessFile>& file);
 
   void Close() override;
   int64_t Tell() const override;
8 changes: 6 additions & 2 deletions cpp/src/arrow/parquet/parquet-io-test.cc
@@ -23,6 +23,7 @@
 #include "gtest/gtest.h"
 
 #include "arrow/parquet/io.h"
+#include "arrow/test-util.h"
 #include "arrow/util/memory-pool.h"
 #include "arrow/util/status.h"
 
@@ -147,9 +148,12 @@ TEST(TestParquetReadSource, Basics) {
   std::string data = "this is the data";
   auto data_buffer = reinterpret_cast<const uint8_t*>(data.c_str());
 
-  ParquetAllocator allocator;
+  ParquetAllocator allocator(default_memory_pool());
 
   auto file = std::make_shared<BufferReader>(data_buffer, data.size());
-  auto source = std::make_shared<ParquetReadSource>(file, &allocator);
+  auto source = std::make_shared<ParquetReadSource>(&allocator);
+
+  ASSERT_OK(source->Open(file));
+
   ASSERT_EQ(0, source->Tell());
   ASSERT_NO_THROW(source->Seek(5));
20 changes: 20 additions & 0 deletions cpp/src/arrow/parquet/reader.cc
@@ -23,6 +23,7 @@
 #include <vector>
 
 #include "arrow/column.h"
+#include "arrow/parquet/io.h"
 #include "arrow/parquet/schema.h"
 #include "arrow/parquet/utils.h"
 #include "arrow/schema.h"
@@ -35,6 +36,10 @@ using parquet::ColumnReader;
 using parquet::Repetition;
 using parquet::TypedColumnReader;
 
+// Help reduce verbosity
+using ParquetRAS = parquet::RandomAccessSource;
+using ParquetReader = parquet::ParquetFileReader;
+
 namespace arrow {
 namespace parquet {
 
@@ -181,6 +186,21 @@ FileReader::FileReader(
 
 FileReader::~FileReader() {}
 
+// Static ctor
+Status OpenFile(const std::shared_ptr<io::RandomAccessFile>& file,
+    ParquetAllocator* allocator, std::unique_ptr<FileReader>* reader) {
+  std::unique_ptr<ParquetReadSource> source(new ParquetReadSource(allocator));
+  RETURN_NOT_OK(source->Open(file));
+
+  // TODO(wesm): reader properties
+  std::unique_ptr<ParquetReader> pq_reader;
+  PARQUET_CATCH_NOT_OK(pq_reader = ParquetReader::Open(std::move(source)));
+
+  // Use the same memory pool as the ParquetAllocator
+  reader->reset(new FileReader(allocator->pool(), std::move(pq_reader)));
+  return Status::OK();
+}
+
 Status FileReader::GetFlatColumn(int i, std::unique_ptr<FlatColumnReader>* out) {
   return impl_->GetFlatColumn(i, out);
 }
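Together with ParquetReadSource::Open, the new OpenFile function gives callers a single entry point from any arrow::io::RandomAccessFile to a FileReader. A sketch of end-to-end use, mirroring the unit test above; ReadTableFromBuffer is a hypothetical helper, and the header locations for BufferReader and arrow::Table are assumptions not shown in this diff:

// Hypothetical end-to-end use of the new OpenFile entry point
#include <cstdint>
#include <memory>

#include "arrow/parquet/reader.h"
#include "arrow/table.h"
#include "arrow/util/memory-pool.h"
#include "arrow/util/status.h"

arrow::Status ReadTableFromBuffer(const uint8_t* data, int64_t length,
    std::shared_ptr<arrow::Table>* out) {
  // BufferReader implements arrow::io::RandomAccessFile over an in-memory
  // buffer (its include is omitted; the header path is not shown in this diff)
  auto file = std::make_shared<arrow::io::BufferReader>(data, length);

  arrow::parquet::ParquetAllocator allocator(arrow::default_memory_pool());
  std::unique_ptr<arrow::parquet::FileReader> reader;
  RETURN_NOT_OK(arrow::parquet::OpenFile(file, &allocator, &reader));

  // The reader shares the allocator's pool, as wired up inside OpenFile
  return reader->ReadFlatTable(out);
}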
13 changes: 10 additions & 3 deletions cpp/src/arrow/parquet/reader.h
@@ -23,6 +23,8 @@
 #include "parquet/api/reader.h"
 #include "parquet/api/schema.h"
 
+#include "arrow/io/interfaces.h"
+#include "arrow/parquet/io.h"
 #include "arrow/util/visibility.h"
 
 namespace arrow {
@@ -99,7 +101,7 @@ class ARROW_EXPORT FileReader {
   virtual ~FileReader();
 
  private:
-  class Impl;
+  class ARROW_NO_EXPORT Impl;
   std::unique_ptr<Impl> impl_;
 };
 
@@ -125,15 +127,20 @@ class ARROW_EXPORT FlatColumnReader {
   Status NextBatch(int batch_size, std::shared_ptr<Array>* out);
 
  private:
-  class Impl;
+  class ARROW_NO_EXPORT Impl;
   std::unique_ptr<Impl> impl_;
   explicit FlatColumnReader(std::unique_ptr<Impl> impl);
 
   friend class FileReader;
 };
 
-}  // namespace parquet
+// Helper function to create a file reader from an implementation of an Arrow
+// readable file
+ARROW_EXPORT
+Status OpenFile(const std::shared_ptr<io::RandomAccessFile>& file,
+    ParquetAllocator* allocator, std::unique_ptr<FileReader>* reader);
+
+}  // namespace parquet
 }  // namespace arrow
 
 #endif  // ARROW_PARQUET_READER_H
1 change: 0 additions & 1 deletion cpp/src/arrow/parquet/writer.cc
@@ -35,7 +35,6 @@ using parquet::ParquetVersion;
 using parquet::schema::GroupNode;
 
 namespace arrow {
-
 namespace parquet {
 
 class FileWriter::Impl {
2 changes: 1 addition & 1 deletion cpp/src/arrow/parquet/writer.h
@@ -55,7 +55,7 @@ class ARROW_EXPORT FileWriter {
   MemoryPool* memory_pool() const;
 
  private:
-  class Impl;
+  class ARROW_NO_EXPORT Impl;
   std::unique_ptr<Impl> impl_;
 };
 
49 changes: 31 additions & 18 deletions python/pyarrow/includes/libarrow_io.pxd
@@ -19,11 +19,37 @@
 
 from pyarrow.includes.common cimport *
 
-cdef extern from "arrow/io/interfaces.h" nogil:
+cdef extern from "arrow/io/interfaces.h" namespace "arrow::io" nogil:
     enum FileMode" arrow::io::FileMode::type":
         FileMode_READ" arrow::io::FileMode::READ"
         FileMode_WRITE" arrow::io::FileMode::WRITE"
         FileMode_READWRITE" arrow::io::FileMode::READWRITE"
 
+    enum ObjectType" arrow::io::ObjectType::type":
+        ObjectType_FILE" arrow::io::ObjectType::FILE"
+        ObjectType_DIRECTORY" arrow::io::ObjectType::DIRECTORY"
+
+    cdef cppclass FileBase:
+        CStatus Close()
+        CStatus Tell(int64_t* position)
+
+    cdef cppclass ReadableFile(FileBase):
+        CStatus GetSize(int64_t* size)
+        CStatus Read(int64_t nbytes, int64_t* bytes_read,
+                     uint8_t* buffer)
+
+        CStatus ReadAt(int64_t position, int64_t nbytes,
+                       int64_t* bytes_read, uint8_t* buffer)
+
+    cdef cppclass RandomAccessFile(ReadableFile):
+        CStatus Seek(int64_t position)
+
+    cdef cppclass WriteableFile(FileBase):
+        CStatus Write(const uint8_t* buffer, int64_t nbytes)
+        # CStatus Write(const uint8_t* buffer, int64_t nbytes,
+        #               int64_t* bytes_written)
+
 
 cdef extern from "arrow/io/hdfs.h" namespace "arrow::io" nogil:
     CStatus ConnectLibHdfs()

@@ -44,24 +70,11 @@ cdef extern from "arrow/io/hdfs.h" namespace "arrow::io" nogil:
         int64_t block_size
         int16_t permissions
 
-    cdef cppclass CHdfsFile:
-        CStatus Close()
-        CStatus Seek(int64_t position)
-        CStatus Tell(int64_t* position)
-
-    cdef cppclass HdfsReadableFile(CHdfsFile):
-        CStatus GetSize(int64_t* size)
-        CStatus Read(int64_t nbytes, int64_t* bytes_read,
-                     uint8_t* buffer)
-
-        CStatus ReadAt(int64_t position, int64_t nbytes,
-                       int64_t* bytes_read, uint8_t* buffer)
-
-    cdef cppclass HdfsWriteableFile(CHdfsFile):
-        CStatus Write(const uint8_t* buffer, int64_t nbytes)
-        CStatus Write(const uint8_t* buffer, int64_t nbytes,
-                      int64_t* bytes_written)
+    cdef cppclass HdfsReadableFile(RandomAccessFile):
+        pass
+
+    cdef cppclass HdfsWriteableFile(WriteableFile):
+        pass
 
     cdef cppclass CHdfsClient" arrow::io::HdfsClient":
         @staticmethod
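The effect of this refactor: the shared file interfaces (FileBase, ReadableFile, RandomAccessFile, WriteableFile) are declared once against arrow/io/interfaces.h, and the HDFS classes simply inherit them. Since the Cython hierarchy mirrors the C++ one, code written against the abstract interface serves every implementation; a sketch in C++ terms (ReadPrefix is a hypothetical helper using only the methods declared above):

// Works for BufferReader, HdfsReadableFile, or any other RandomAccessFile
#include <cstdint>

#include "arrow/io/interfaces.h"
#include "arrow/util/status.h"

arrow::Status ReadPrefix(arrow::io::RandomAccessFile* file, int64_t nbytes,
    uint8_t* out) {
  int64_t bytes_read = 0;
  RETURN_NOT_OK(file->Seek(0));                 // from RandomAccessFile
  return file->Read(nbytes, &bytes_read, out);  // from ReadableFile
}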
24 changes: 21 additions & 3 deletions python/pyarrow/includes/parquet.pxd
@@ -19,6 +19,7 @@
 
 from pyarrow.includes.common cimport *
 from pyarrow.includes.libarrow cimport CSchema, CStatus, CTable, MemoryPool
+from pyarrow.includes.libarrow_io cimport RandomAccessFile
 
 
 cdef extern from "parquet/api/schema.h" namespace "parquet::schema" nogil:
@@ -90,19 +91,36 @@ cdef extern from "parquet/api/writer.h" namespace "parquet" nogil:
         shared_ptr[WriterProperties] build()
 
 
+cdef extern from "arrow/parquet/io.h" namespace "arrow::parquet" nogil:
+    cdef cppclass ParquetAllocator:
+        ParquetAllocator()
+        ParquetAllocator(MemoryPool* pool)
+        MemoryPool* pool()
+        void set_pool(MemoryPool* pool)
+
+    cdef cppclass ParquetReadSource:
+        ParquetReadSource(ParquetAllocator* allocator)
+        Open(const shared_ptr[RandomAccessFile]& file)
+
+
 cdef extern from "arrow/parquet/reader.h" namespace "arrow::parquet" nogil:
+    CStatus OpenFile(const shared_ptr[RandomAccessFile]& file,
+                     ParquetAllocator* allocator,
+                     unique_ptr[FileReader]* reader)
+
     cdef cppclass FileReader:
         FileReader(MemoryPool* pool, unique_ptr[ParquetFileReader] reader)
         CStatus ReadFlatTable(shared_ptr[CTable]* out);
 
 
 cdef extern from "arrow/parquet/schema.h" namespace "arrow::parquet" nogil:
-    CStatus FromParquetSchema(const SchemaDescriptor* parquet_schema, shared_ptr[CSchema]* out)
-    CStatus ToParquetSchema(const CSchema* arrow_schema, shared_ptr[SchemaDescriptor]* out)
+    CStatus FromParquetSchema(const SchemaDescriptor* parquet_schema,
+                              shared_ptr[CSchema]* out)
+    CStatus ToParquetSchema(const CSchema* arrow_schema,
+                            shared_ptr[SchemaDescriptor]* out)
 
 
 cdef extern from "arrow/parquet/writer.h" namespace "arrow::parquet" nogil:
     cdef CStatus WriteFlatTable(const CTable* table, MemoryPool* pool,
                                 const shared_ptr[OutputStream]& sink, int64_t chunk_size,
                                 const shared_ptr[WriterProperties]& properties)

32 changes: 32 additions & 0 deletions python/pyarrow/io.pxd
@@ -0,0 +1,32 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# distutils: language = c++
+
+from pyarrow.includes.common cimport *
+from pyarrow.includes.libarrow cimport *
+from pyarrow.includes.libarrow_io cimport RandomAccessFile, WriteableFile
+
+
+cdef class NativeFileInterface:
+
+    # By implementing these "virtual" functions (all functions in Cython
+    # extension classes are technically virtual in the C++ sense), we can
+    # expose the arrow::io abstract file interfaces to other components
+    # throughout the suite of Arrow C++ libraries
+    cdef read_handle(self, shared_ptr[RandomAccessFile]* file)
+    cdef write_handle(self, shared_ptr[WriteableFile]* file)
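The comment above is the crux of the design: cdef methods on extension classes dispatch virtually, so holding a NativeFileInterface and calling read_handle runs the subclass override, letting any Python-visible file object hand its underlying C++ file to other Arrow components. A rough C++ analog of the pattern, purely illustrative (this is not the code Cython generates):

// Virtual out-parameter methods that subclasses override to expose handles
#include <memory>
#include <stdexcept>

#include "arrow/io/hdfs.h"
#include "arrow/io/interfaces.h"

class NativeFileAnalog {
 public:
  virtual ~NativeFileAnalog() = default;
  virtual void read_handle(std::shared_ptr<arrow::io::RandomAccessFile>* file) {
    throw std::runtime_error("not implemented");
  }
};

class HdfsFileAnalog : public NativeFileAnalog {
 public:
  void read_handle(std::shared_ptr<arrow::io::RandomAccessFile>* file) override {
    // Implicit upcast: HdfsReadableFile derives from RandomAccessFile,
    // mirroring the cast in HdfsFile.read_handle below
    *file = rd_file_;
  }

 private:
  std::shared_ptr<arrow::io::HdfsReadableFile> rd_file_;
};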
19 changes: 17 additions & 2 deletions python/pyarrow/io.pyx
@@ -164,7 +164,7 @@ cdef class HdfsClient:
                           .ListDirectory(c_path, &listing))
 
         cdef const HdfsPathInfo* info
-        for i in range(listing.size()):
+        for i in range(<int> listing.size()):
             info = &listing[i]
 
             # Try to trim off the hdfs://HOST:PORT piece
@@ -314,8 +314,15 @@ cdef class HdfsClient:
         f = self.open(path, 'rb', buffer_size=buffer_size)
         f.download(stream)
 
+cdef class NativeFileInterface:
+
+    cdef read_handle(self, shared_ptr[RandomAccessFile]* file):
+        raise NotImplementedError
+
+    cdef write_handle(self, shared_ptr[WriteableFile]* file):
+        raise NotImplementedError
+
-cdef class HdfsFile:
+cdef class HdfsFile(NativeFileInterface):
     cdef:
         shared_ptr[HdfsReadableFile] rd_file
         shared_ptr[HdfsWriteableFile] wr_file
@@ -357,6 +364,14 @@ cdef class HdfsFile:
         if self.is_readonly:
             raise IOError("only valid on writeonly files")
 
+    cdef read_handle(self, shared_ptr[RandomAccessFile]* file):
+        self._assert_readable()
+        file[0] = <shared_ptr[RandomAccessFile]> self.rd_file
+
+    cdef write_handle(self, shared_ptr[WriteableFile]* file):
+        self._assert_writeable()
+        file[0] = <shared_ptr[WriteableFile]> self.wr_file
+
     def size(self):
         cdef int64_t size
         self._assert_readable()