8 changes: 7 additions & 1 deletion cpp/src/arrow/ipc/writer.cc
@@ -651,9 +651,13 @@ Status GetTensorSize(const Tensor& tensor, int64_t* size) {

RecordBatchWriter::~RecordBatchWriter() {}

Status RecordBatchWriter::WriteTable(const Table& table) {
Status RecordBatchWriter::WriteTable(const Table& table, int64_t max_chunksize) {
TableBatchReader reader(table);

if (max_chunksize > 0) {
reader.set_chunksize(max_chunksize);
}

std::shared_ptr<RecordBatch> batch;
while (true) {
RETURN_NOT_OK(reader.ReadNext(&batch));
@@ -666,6 +670,8 @@ Status RecordBatchWriter::WriteTable(const Table& table) {
return Status::OK();
}

Status RecordBatchWriter::WriteTable(const Table& table) { return WriteTable(table, -1); }

// ----------------------------------------------------------------------
// Stream writer implementation

6 changes: 6 additions & 0 deletions cpp/src/arrow/ipc/writer.h
@@ -65,6 +65,12 @@ class ARROW_EXPORT RecordBatchWriter {
/// \return Status
Status WriteTable(const Table& table);

/// \brief Write Table with a particular chunksize
/// \param[in] table table to write
/// \param[in] max_chunksize maximum number of rows per written RecordBatch;
/// values of 0 or less leave the table's existing chunking unchanged
/// \return Status
Status WriteTable(const Table& table, int64_t max_chunksize);

/// \brief Perform any logic necessary to finish the stream
///
/// \return Status
33 changes: 33 additions & 0 deletions cpp/src/arrow/table-test.cc
@@ -586,4 +586,37 @@ TEST_F(TestTableBatchReader, ReadNext) {
ASSERT_EQ(nullptr, batch);
}

TEST_F(TestTableBatchReader, Chunksize) {
auto a1 = MakeRandomArray<Int32Array>(10);
auto a2 = MakeRandomArray<Int32Array>(20);
auto a3 = MakeRandomArray<Int32Array>(10);

auto sch1 = arrow::schema({field("f1", int32())});
auto t1 = Table::Make(sch1, {column(sch1->field(0), {a1, a2, a3})});

TableBatchReader i1(*t1);

i1.set_chunksize(15);
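// The column's chunks have 10, 20, and 10 rows; with max_chunksize 15 the
// reader should yield batches of 10, 15, 5, and 10 rows.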

std::shared_ptr<RecordBatch> batch;
ASSERT_OK(i1.ReadNext(&batch));
ASSERT_OK(batch->Validate());
ASSERT_EQ(10, batch->num_rows());

ASSERT_OK(i1.ReadNext(&batch));
ASSERT_OK(batch->Validate());
ASSERT_EQ(15, batch->num_rows());

ASSERT_OK(i1.ReadNext(&batch));
ASSERT_OK(batch->Validate());
ASSERT_EQ(5, batch->num_rows());

ASSERT_OK(i1.ReadNext(&batch));
ASSERT_OK(batch->Validate());
ASSERT_EQ(10, batch->num_rows());

ASSERT_OK(i1.ReadNext(&batch));
ASSERT_EQ(nullptr, batch);
}

} // namespace arrow
21 changes: 15 additions & 6 deletions cpp/src/arrow/table.cc
@@ -19,6 +19,7 @@

#include <algorithm>
#include <cstdlib>
#include <limits>
#include <memory>
#include <sstream>

@@ -403,7 +404,8 @@ class TableBatchReader::TableBatchReaderImpl {
column_data_(table.num_columns()),
chunk_numbers_(table.num_columns(), 0),
chunk_offsets_(table.num_columns(), 0),
absolute_row_position_(0) {
absolute_row_position_(0),
max_chunksize_(std::numeric_limits<int64_t>::max()) {
for (int i = 0; i < table.num_columns(); ++i) {
column_data_[i] = table.column(i)->data().get();
}
@@ -416,7 +418,7 @@ class TableBatchReader::TableBatchReaderImpl {
}

// Determine the minimum contiguous slice across all columns, capped at max_chunksize_
int64_t chunksize = table_.num_rows();
int64_t chunksize = std::min(table_.num_rows(), max_chunksize_);
std::vector<const Array*> chunks(table_.num_columns());
for (int i = 0; i < table_.num_columns(); ++i) {
auto chunk = column_data_[i]->chunk(chunk_numbers_[i]).get();
@@ -430,8 +432,7 @@ class TableBatchReader::TableBatchReaderImpl {
}

// Slice chunks and advance chunk index as appropriate
std::vector<std::shared_ptr<ArrayData>> batch_data;
batch_data.reserve(table_.num_columns());
std::vector<std::shared_ptr<ArrayData>> batch_data(table_.num_columns());

for (int i = 0; i < table_.num_columns(); ++i) {
// Exhausted chunk
@@ -441,17 +442,18 @@ class TableBatchReader::TableBatchReaderImpl {
if ((chunk->length() - offset) == chunksize) {
++chunk_numbers_[i];
chunk_offsets_[i] = 0;
if (chunk_offsets_[i] > 0) {
if (offset > 0) {
// Need to slice
slice_data = chunk->Slice(offset, chunksize)->data();
} else {
// No slice
slice_data = chunk->data();
}
} else {
chunk_offsets_[i] += chunksize;
slice_data = chunk->Slice(offset, chunksize)->data();
}
batch_data.emplace_back(std::move(slice_data));
batch_data[i] = std::move(slice_data);
}

absolute_row_position_ += chunksize;
@@ -462,12 +464,15 @@ class TableBatchReader::TableBatchReaderImpl {

std::shared_ptr<Schema> schema() const { return table_.schema(); }

void set_chunksize(int64_t chunksize) { max_chunksize_ = chunksize; }

private:
const Table& table_;
std::vector<ChunkedArray*> column_data_;
std::vector<int> chunk_numbers_;
std::vector<int64_t> chunk_offsets_;
int64_t absolute_row_position_;
int64_t max_chunksize_;
};

TableBatchReader::TableBatchReader(const Table& table) {
@@ -478,6 +483,10 @@ TableBatchReader::~TableBatchReader() {}

std::shared_ptr<Schema> TableBatchReader::schema() const { return impl_->schema(); }

void TableBatchReader::set_chunksize(int64_t chunksize) {
impl_->set_chunksize(chunksize);
}

Status TableBatchReader::ReadNext(std::shared_ptr<RecordBatch>* out) {
return impl_->ReadNext(out);
}
2 changes: 2 additions & 0 deletions cpp/src/arrow/table.h
@@ -197,6 +197,8 @@ class ARROW_EXPORT TableBatchReader : public RecordBatchReader {

Status ReadNext(std::shared_ptr<RecordBatch>* out) override;

void set_chunksize(int64_t chunksize);

private:
class TableBatchReaderImpl;
std::unique_ptr<TableBatchReaderImpl> impl_;
9 changes: 8 additions & 1 deletion python/pyarrow/includes/libarrow.pxd
@@ -456,6 +456,13 @@ cdef extern from "arrow/api.h" namespace "arrow" nogil:
shared_ptr[CTable] ReplaceSchemaMetadata(
const shared_ptr[CKeyValueMetadata]& metadata)

cdef cppclass RecordBatchReader:
CStatus ReadNext(shared_ptr[CRecordBatch]* out)

cdef cppclass TableBatchReader(RecordBatchReader):
TableBatchReader(const CTable& table)
void set_chunksize(int64_t chunksize)

cdef cppclass CTensor" arrow::Tensor":
shared_ptr[CDataType] type()
shared_ptr[CBuffer] data()
@@ -692,7 +699,7 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil:
CStatus Close()
CStatus WriteRecordBatch(const CRecordBatch& batch,
c_bool allow_64bit)
CStatus WriteTable(const CTable& table)
CStatus WriteTable(const CTable& table, int64_t max_chunksize)

cdef cppclass CRecordBatchReader" arrow::ipc::RecordBatchReader":
shared_ptr[CSchema] schema()
12 changes: 10 additions & 2 deletions python/pyarrow/ipc.pxi
@@ -202,16 +202,24 @@ cdef class _RecordBatchWriter:
check_status(self.writer.get()
.WriteRecordBatch(deref(batch.batch), 1))

def write_table(self, Table table):
def write_table(self, Table table, chunksize=None):
"""
Write Table to stream in (contiguous) RecordBatch objects, with an
optional maximum chunk size

Parameters
----------
table : Table
chunksize : int, default None
Maximum size for RecordBatch chunks. Individual chunks may be
smaller depending on the chunk layout of individual columns
"""
cdef:
# Chunksize must be > 0 to have any impact
int64_t c_chunksize = -1

if chunksize is not None:
c_chunksize = chunksize

with nogil:
check_status(self.writer.get().WriteTable(table.table[0]))
check_status(self.writer.get().WriteTable(table.table[0],
c_chunksize))

def close(self):
"""
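A rough end-to-end sketch of the new chunksize argument from Python. The write_table call, pa.open_stream, pa.BufferReader, and the batch helpers come from this patch and its tests; the sink and writer construction (pa.BufferOutputStream, pa.RecordBatchStreamWriter, get_result) are assumed here and may need adjusting to the local stream-writer API.

import pandas as pd
import pyarrow as pa

df = pd.DataFrame({'one': list(range(30))})
b1 = pa.RecordBatch.from_pandas(df[:10], preserve_index=False)
b2 = pa.RecordBatch.from_pandas(df[10:], preserve_index=False)
table = pa.Table.from_batches([b1, b2])  # column chunks of 10 and 20 rows

sink = pa.BufferOutputStream()  # assumed in-memory sink
writer = pa.RecordBatchStreamWriter(sink, table.schema)  # assumed writer class
writer.write_table(table, chunksize=15)  # cap each written batch at 15 rows
writer.close()

batches = list(pa.open_stream(pa.BufferReader(sink.get_result())))
print([len(b) for b in batches])  # expected: [10, 15, 5]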
38 changes: 38 additions & 0 deletions python/pyarrow/table.pxi
@@ -971,6 +971,44 @@ cdef class Table:

return pyarrow_wrap_table(c_table)

def to_batches(self, chunksize=None):
"""
Convert Table to list of (contiguous) RecordBatch objects, with an
optional maximum chunk size

Parameters
----------
chunksize : int, default None
Maximum size for RecordBatch chunks. Individual chunks may be
smaller depending on the chunk layout of individual columns

Returns
-------
batches : list of RecordBatch
"""
cdef:
unique_ptr[TableBatchReader] reader
int64_t c_chunksize
list result = []
shared_ptr[CRecordBatch] batch

reader.reset(new TableBatchReader(deref(self.table)))

if chunksize is not None:
c_chunksize = chunksize
reader.get().set_chunksize(c_chunksize)

while True:
with nogil:
check_status(reader.get().ReadNext(&batch))

if batch.get() == NULL:
break

result.append(pyarrow_wrap_batch(batch))

return result

def to_pandas(self, nthreads=None, strings_to_categorical=False,
memory_pool=None, zero_copy_only=False):
"""
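A short sketch of the new Table.to_batches method, mirroring test_table_to_batches below: without a chunksize the batches follow the table's own chunk layout; with chunksize=15 any longer chunk is split.

import pandas as pd
import pyarrow as pa

df1 = pd.DataFrame({'a': list(range(10))})
df2 = pd.DataFrame({'a': list(range(10, 30))})
b1 = pa.RecordBatch.from_pandas(df1, preserve_index=False)
b2 = pa.RecordBatch.from_pandas(df2, preserve_index=False)
table = pa.Table.from_batches([b1, b2])  # column chunks of 10 and 20 rows

print([len(b) for b in table.to_batches()])              # [10, 20]
print([len(b) for b in table.to_batches(chunksize=15)])  # [10, 15, 5]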
23 changes: 23 additions & 0 deletions python/pyarrow/tests/test_ipc.py
@@ -168,6 +168,29 @@ def test_stream_write_dispatch(self):
assert_frame_equal(table.to_pandas(),
pd.concat([df, df], ignore_index=True))

def test_stream_write_table_batches(self):
# ARROW-504
df = pd.DataFrame({
'one': np.random.randn(20),
})

b1 = pa.RecordBatch.from_pandas(df[:10], preserve_index=False)
b2 = pa.RecordBatch.from_pandas(df, preserve_index=False)

table = pa.Table.from_batches([b1, b2, b1])

writer = self._get_writer(self.sink, table.schema)
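# Table chunks are 10, 20 and 10 rows; chunksize=15 splits the 20-row chunk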
writer.write_table(table, chunksize=15)
writer.close()

batches = list(pa.open_stream(pa.BufferReader(self._get_source())))

assert list(map(len, batches)) == [10, 15, 5, 10]
result_table = pa.Table.from_batches(batches)
assert_frame_equal(result_table.to_pandas(),
pd.concat([df[:10], df, df[:10]],
ignore_index=True))

def test_simple_roundtrip(self):
_, batches = self.write_batches()
file_contents = pa.BufferReader(self._get_source())
25 changes: 25 additions & 0 deletions python/pyarrow/tests/test_table.py
@@ -213,6 +213,31 @@ def test_recordbatchlist_schema_equals():
pa.Table.from_batches([batch1, batch2])


def test_table_to_batches():
df1 = pd.DataFrame({'a': list(range(10))})
df2 = pd.DataFrame({'a': list(range(10, 30))})

batch1 = pa.RecordBatch.from_pandas(df1, preserve_index=False)
batch2 = pa.RecordBatch.from_pandas(df2, preserve_index=False)

table = pa.Table.from_batches([batch1, batch2, batch1])

expected_df = pd.concat([df1, df2, df1], ignore_index=True)

batches = table.to_batches()
assert len(batches) == 3

assert_frame_equal(pa.Table.from_batches(batches).to_pandas(),
expected_df)

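# Chunks of 10, 20 and 10 rows with chunksize=15 should split into 10, 15, 5, 10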
batches = table.to_batches(chunksize=15)
assert list(map(len, batches)) == [10, 15, 5, 10]

assert_frame_equal(table.to_pandas(), expected_df)
assert_frame_equal(pa.Table.from_batches(batches).to_pandas(),
expected_df)


def test_table_basics():
data = [
pa.array(range(5)),