9 changes: 4 additions & 5 deletions docs/source/python/extending_types.rst
@@ -111,15 +111,14 @@ IPC protocol::

>>> batch = pa.RecordBatch.from_arrays([arr], ["ext"])
>>> sink = pa.BufferOutputStream()
- >>> writer = pa.RecordBatchStreamWriter(sink, batch.schema)
- >>> writer.write_batch(batch)
- >>> writer.close()
+ >>> with pa.RecordBatchStreamWriter(sink, batch.schema) as writer:
+ ...     writer.write_batch(batch)
>>> buf = sink.getvalue()

and then reading it back yields the proper type::

- >>> reader = pa.ipc.open_stream(buf)
- >>> result = reader.read_all()
+ >>> with pa.ipc.open_stream(buf) as reader:
+ ...     result = reader.read_all()
>>> result.column('ext').type
UuidType(extension<arrow.py_extension_type>)

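For context, a self-contained sketch of the round trip this hunk documents. The ``UuidType`` below is an illustrative stand-in built on ``pa.ExtensionType`` with a made-up ``example.uuid`` name; the page's own definition (based on ``PyExtensionType``, as the printed type suggests) appears earlier and is not part of this hunk::

    import uuid

    import pyarrow as pa


    class UuidType(pa.ExtensionType):
        """Illustrative stand-in for the UuidType defined earlier on the page."""

        def __init__(self):
            pa.ExtensionType.__init__(self, pa.binary(16), "example.uuid")

        def __arrow_ext_serialize__(self):
            return b""  # no parameters to serialize

        @classmethod
        def __arrow_ext_deserialize__(cls, storage_type, serialized):
            return UuidType()


    pa.register_extension_type(UuidType())

    # Build an extension array on top of 16-byte binary storage.
    storage = pa.array([uuid.uuid4().bytes for _ in range(3)], pa.binary(16))
    arr = pa.ExtensionArray.from_storage(UuidType(), storage)

    # Write it through the IPC stream format using the context-manager style.
    batch = pa.RecordBatch.from_arrays([arr], ["ext"])
    sink = pa.BufferOutputStream()
    with pa.RecordBatchStreamWriter(sink, batch.schema) as writer:
        writer.write_batch(batch)
    buf = sink.getvalue()

    # Reading it back yields the extension type, not just the storage type.
    with pa.ipc.open_stream(buf) as reader:
        result = reader.read_all()
    print(result.column("ext").type)  # an instance of UuidType
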
44 changes: 23 additions & 21 deletions docs/source/python/ipc.rst
@@ -65,21 +65,20 @@ this one can be created with :func:`~pyarrow.ipc.new_stream`:
.. ipython:: python

sink = pa.BufferOutputStream()
- writer = pa.ipc.new_stream(sink, batch.schema)

+ with pa.ipc.new_stream(sink, batch.schema) as writer:
+     for i in range(5):
+         writer.write_batch(batch)

- Here we used an in-memory Arrow buffer stream, but this could have been a
- socket or some other IO sink.
+ Here we used an in-memory Arrow buffer stream (``sink``),
+ but this could have been a socket or some other IO sink.

When creating the ``StreamWriter``, we pass the schema, since the schema
(column names and types) must be the same for all of the batches sent in this
particular stream. Now we can do:

.. ipython:: python

- for i in range(5):
-     writer.write_batch(batch)
- writer.close()

buf = sink.getvalue()
buf.size
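
As the added prose notes, the sink does not have to be an in-memory buffer. A short sketch of the same pattern writing the stream to a file on disk via ``pa.OSFile`` (the path and batch contents are illustrative, not from the page)::

    import pyarrow as pa

    batch = pa.record_batch([pa.array([1, 2, 3])], names=["f0"])

    # Any writable IO sink works: a socket, an OSFile, a BufferOutputStream, ...
    with pa.OSFile("/tmp/example_stream.arrows", "wb") as sink:
        with pa.ipc.new_stream(sink, batch.schema) as writer:
            for i in range(5):
                writer.write_batch(batch)

    # Read the stream back from the file.
    with pa.OSFile("/tmp/example_stream.arrows", "rb") as source:
        with pa.ipc.open_stream(source) as reader:
            table = reader.read_all()
    table.num_rows  # 15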

@@ -89,10 +88,11 @@ convenience function ``pyarrow.ipc.open_stream``:

.. ipython:: python

- reader = pa.ipc.open_stream(buf)
- reader.schema

- batches = [b for b in reader]
+ with pa.ipc.open_stream(buf) as reader:
+       schema = reader.schema
+       batches = [b for b in reader]
@amol- (Member, Author) commented on Aug 18, 2021:

Took me a while to get around this one. If you wonder why this block is indented with more spaces than the other blocks, it's to work around what seemed like a bug in the ipython directive: when indented normally, the second line was parsed as outside of the ``with`` block:

In [0]: with pa.ipc.open_stream(buf) as reader:
   ...:    schema = reader.schema
In [1]: batches = [b for b in reader]

instead of

In [0]: with pa.ipc.open_stream(buf) as reader:
   ...:       schema = reader.schema
   ...:       batches = [b for b in reader]
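
(In the ``.rst`` source, the workaround amounts to indenting the body of the ``with`` block a few extra spaces under the ipython directive; the exact indentation below is illustrative.)

.. ipython:: python

   with pa.ipc.open_stream(buf) as reader:
         schema = reader.schema
         batches = [b for b in reader]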

A second commenter (Member) replied:

Uh.


+ schema
len(batches)
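
Putting the write and read halves together, a self-contained sketch of the stream round trip in the context-manager style the updated docs use (the batch contents are illustrative)::

    import pyarrow as pa

    batch = pa.record_batch([pa.array([1, 2, 3])], names=["f0"])

    # Write five copies of the batch to an in-memory stream.
    sink = pa.BufferOutputStream()
    with pa.ipc.new_stream(sink, batch.schema) as writer:
        for i in range(5):
            writer.write_batch(batch)
    buf = sink.getvalue()

    # Read them back; the schema and every batch round-trip unchanged.
    with pa.ipc.open_stream(buf) as reader:
        schema = reader.schema
        batches = [b for b in reader]

    assert schema.equals(batch.schema)
    assert len(batches) == 5 and all(b.equals(batch) for b in batches)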

We can check the returned batches are the same as the original input:
@@ -115,11 +115,10 @@ The :class:`~pyarrow.RecordBatchFileWriter` has the same API as
.. ipython:: python

sink = pa.BufferOutputStream()
- writer = pa.ipc.new_file(sink, batch.schema)

- for i in range(10):
-     writer.write_batch(batch)
- writer.close()

+ with pa.ipc.new_file(sink, batch.schema) as writer:
+     for i in range(10):
+         writer.write_batch(batch)

buf = sink.getvalue()
buf.size
@@ -131,15 +130,16 @@ operations. We can also use the :func:`~pyarrow.ipc.open_file` method to open a

.. ipython:: python

- reader = pa.ipc.open_file(buf)
+ with pa.ipc.open_file(buf) as reader:
+     num_record_batches = reader.num_record_batches
+     b = reader.get_batch(3)

Because we have access to the entire payload, we know the number of record
- batches in the file, and can read any at random:
+ batches in the file, and can read any at random.

.. ipython:: python

- reader.num_record_batches
- b = reader.get_batch(3)
+ num_record_batches
b.equals(batch)
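
For reference, a self-contained sketch of the file (random-access) format under the same context-manager style (the batch contents are illustrative)::

    import pyarrow as pa

    batch = pa.record_batch([pa.array([1, 2, 3])], names=["f0"])

    sink = pa.BufferOutputStream()
    with pa.ipc.new_file(sink, batch.schema) as writer:
        for i in range(10):
            writer.write_batch(batch)
    buf = sink.getvalue()

    # The file format records how many batches it contains,
    # so any of them can be read at random.
    with pa.ipc.open_file(buf) as reader:
        num_record_batches = reader.num_record_batches  # 10
        b = reader.get_batch(3)

    assert b.equals(batch)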

Reading from Stream and File Format for pandas
@@ -151,7 +151,9 @@ DataFrame output:

.. ipython:: python

- df = pa.ipc.open_file(buf).read_pandas()
+ with pa.ipc.open_file(buf) as reader:
+     df = reader.read_pandas()

df[:5]
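
A short sketch of the full pandas round trip with this pattern, assuming pandas is installed (the DataFrame is illustrative)::

    import pandas as pd
    import pyarrow as pa

    df_in = pd.DataFrame({"a": [1, 2, 3], "b": ["x", "y", "z"]})
    batch = pa.RecordBatch.from_pandas(df_in)

    sink = pa.BufferOutputStream()
    with pa.ipc.new_file(sink, batch.schema) as writer:
        writer.write_batch(batch)

    with pa.ipc.open_file(sink.getvalue()) as reader:
        df_out = reader.read_pandas()

    assert df_out.equals(df_in)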

Efficiently Writing and Reading Arrow Data
15 changes: 3 additions & 12 deletions docs/source/python/parquet.rst
@@ -208,22 +208,13 @@ We can similarly write a Parquet file with multiple row groups by using

.. ipython:: python

- writer = pq.ParquetWriter('example2.parquet', table.schema)
- for i in range(3):
-     writer.write_table(table)
- writer.close()
+ with pq.ParquetWriter('example2.parquet', table.schema) as writer:
+     for i in range(3):
+         writer.write_table(table)

pf2 = pq.ParquetFile('example2.parquet')
pf2.num_row_groups
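
A self-contained sketch of the multi-row-group pattern above (the file name and table contents are illustrative)::

    import pyarrow as pa
    import pyarrow.parquet as pq

    table = pa.table({"a": [1, 2, 3], "b": ["x", "y", "z"]})

    # Each write_table call adds at least one row group to the file.
    with pq.ParquetWriter("example2.parquet", table.schema) as writer:
        for i in range(3):
            writer.write_table(table)

    pf = pq.ParquetFile("example2.parquet")
    pf.num_row_groups              # 3
    pf.read_row_group(0).num_rows  # 3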

- Alternatively python ``with`` syntax can also be use:

- .. ipython:: python

-    with pq.ParquetWriter('example3.parquet', table.schema) as writer:
-        for i in range(3):
-            writer.write_table(table)

Inspecting the Parquet File Metadata
------------------------------------

10 changes: 4 additions & 6 deletions docs/source/python/plasma.rst
@@ -360,9 +360,8 @@ size of the Plasma object.
# is done to determine the size of buffer to request from the object store.
object_id = plasma.ObjectID(np.random.bytes(20))
mock_sink = pa.MockOutputStream()
- stream_writer = pa.RecordBatchStreamWriter(mock_sink, record_batch.schema)
- stream_writer.write_batch(record_batch)
- stream_writer.close()
+ with pa.RecordBatchStreamWriter(mock_sink, record_batch.schema) as stream_writer:
+     stream_writer.write_batch(record_batch)
data_size = mock_sink.size()
buf = client.create(object_id, data_size)

@@ -372,9 +371,8 @@ The DataFrame can now be written to the buffer as follows.

# Write the PyArrow RecordBatch to Plasma
stream = pa.FixedSizeBufferWriter(buf)
- stream_writer = pa.RecordBatchStreamWriter(stream, record_batch.schema)
- stream_writer.write_batch(record_batch)
- stream_writer.close()
+ with pa.RecordBatchStreamWriter(stream, record_batch.schema) as stream_writer:
+     stream_writer.write_batch(record_batch)

Finally, seal the finished object for use by all clients:

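For the two-step pattern above (measure the serialized size with a ``MockOutputStream``, then write into a fixed-size buffer), a sketch that runs without a Plasma store; here ``pa.allocate_buffer`` stands in for ``client.create``, which needs a running store::

    import pyarrow as pa

    record_batch = pa.record_batch([pa.array(range(5))], names=["a"])

    # Step 1: measure how many bytes the IPC stream will need.
    mock_sink = pa.MockOutputStream()
    with pa.RecordBatchStreamWriter(mock_sink, record_batch.schema) as stream_writer:
        stream_writer.write_batch(record_batch)
    data_size = mock_sink.size()

    # Step 2: write into a pre-allocated, fixed-size buffer.
    # (pa.allocate_buffer stands in for client.create here.)
    buf = pa.allocate_buffer(data_size)
    stream = pa.FixedSizeBufferWriter(buf)
    with pa.RecordBatchStreamWriter(stream, record_batch.schema) as stream_writer:
        stream_writer.write_batch(record_batch)

    # The buffer now holds a complete, readable IPC stream.
    with pa.ipc.open_stream(buf) as reader:
        assert reader.read_next_batch().equals(record_batch)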