From 904af3059d8b666c1c124e942bfa80fdf695ea1f Mon Sep 17 00:00:00 2001
From: Alessandro Molina
Date: Wed, 18 Aug 2021 10:57:50 +0200
Subject: [PATCH 1/2] Switch ipc doc to use context managers

---
 docs/source/python/ipc.rst | 44 ++++++++++++++++++++------------------
 1 file changed, 23 insertions(+), 21 deletions(-)

diff --git a/docs/source/python/ipc.rst b/docs/source/python/ipc.rst
index 249780a8dcd..0ba557b6487 100644
--- a/docs/source/python/ipc.rst
+++ b/docs/source/python/ipc.rst
@@ -65,10 +65,13 @@ this one can be created with :func:`~pyarrow.ipc.new_stream`:
 .. ipython:: python

    sink = pa.BufferOutputStream()
-   writer = pa.ipc.new_stream(sink, batch.schema)
+
+   with pa.ipc.new_stream(sink, batch.schema) as writer:
+      for i in range(5):
+         writer.write_batch(batch)

-Here we used an in-memory Arrow buffer stream, but this could have been a
-socket or some other IO sink.
+Here we used an in-memory Arrow buffer stream (``sink``),
+but this could have been a socket or some other IO sink.

 When creating the ``StreamWriter``, we pass the schema, since the schema
 (column names and types) must be the same for all of the batches sent in this
@@ -76,10 +79,6 @@ particular stream. Now we can do:

 .. ipython:: python

-   for i in range(5):
-      writer.write_batch(batch)
-   writer.close()
-
    buf = sink.getvalue()
    buf.size

@@ -89,10 +88,11 @@ convenience function ``pyarrow.ipc.open_stream``:

 .. ipython:: python

-   reader = pa.ipc.open_stream(buf)
-   reader.schema
-
-   batches = [b for b in reader]
+   with pa.ipc.open_stream(buf) as reader:
+      schema = reader.schema
+      batches = [b for b in reader]
+
+   schema
    len(batches)

 We can check the returned batches are the same as the original input:
@@ -115,11 +115,10 @@ The :class:`~pyarrow.RecordBatchFileWriter` has the same API as
 .. ipython:: python

    sink = pa.BufferOutputStream()
-   writer = pa.ipc.new_file(sink, batch.schema)
-
-   for i in range(10):
-      writer.write_batch(batch)
-   writer.close()
+
+   with pa.ipc.new_file(sink, batch.schema) as writer:
+      for i in range(10):
+         writer.write_batch(batch)

    buf = sink.getvalue()
    buf.size
@@ -131,15 +130,16 @@ operations. We can also use the :func:`~pyarrow.ipc.open_file` method to open a

 .. ipython:: python

-   reader = pa.ipc.open_file(buf)
+   with pa.ipc.open_file(buf) as reader:
+      num_record_batches = reader.num_record_batches
+      b = reader.get_batch(3)

 Because we have access to the entire payload, we know the number of record
-batches in the file, and can read any at random:
+batches in the file, and can read any at random.

 .. ipython:: python

-   reader.num_record_batches
-   b = reader.get_batch(3)
+   num_record_batches
    b.equals(batch)

 Reading from Stream and File Format for pandas
@@ -151,7 +151,9 @@ DataFrame output:

 .. ipython:: python

-   df = pa.ipc.open_file(buf).read_pandas()
+   with pa.ipc.open_file(buf) as reader:
+      df = reader.read_pandas()
+
    df[:5]

 Efficiently Writing and Reading Arrow Data

From 9a37536cc8e77117b7a6c206259d688208598abb Mon Sep 17 00:00:00 2001
From: Alessandro Molina
Date: Wed, 18 Aug 2021 11:46:32 +0200
Subject: [PATCH 2/2] Migrate other examples to context managers

---
 docs/source/python/extending_types.rst |  9 ++++-----
 docs/source/python/parquet.rst         | 15 +++------------
 docs/source/python/plasma.rst          | 10 ++++------
 3 files changed, 11 insertions(+), 23 deletions(-)

diff --git a/docs/source/python/extending_types.rst b/docs/source/python/extending_types.rst
index ac2668f59da..689724a4abc 100644
--- a/docs/source/python/extending_types.rst
+++ b/docs/source/python/extending_types.rst
@@ -111,15 +111,14 @@ IPC protocol::

     >>> batch = pa.RecordBatch.from_arrays([arr], ["ext"])
     >>> sink = pa.BufferOutputStream()
-    >>> writer = pa.RecordBatchStreamWriter(sink, batch.schema)
-    >>> writer.write_batch(batch)
-    >>> writer.close()
+    >>> with pa.RecordBatchStreamWriter(sink, batch.schema) as writer:
+    ...     writer.write_batch(batch)
     >>> buf = sink.getvalue()

 and then reading it back yields the proper type::

-    >>> reader = pa.ipc.open_stream(buf)
-    >>> result = reader.read_all()
+    >>> with pa.ipc.open_stream(buf) as reader:
+    ...     result = reader.read_all()
     >>> result.column('ext').type
     UuidType(extension)

diff --git a/docs/source/python/parquet.rst b/docs/source/python/parquet.rst
index cab385b8b5d..7263ea98dc5 100644
--- a/docs/source/python/parquet.rst
+++ b/docs/source/python/parquet.rst
@@ -208,22 +208,13 @@ We can similarly write a Parquet file with multiple row groups by using

 .. ipython:: python

-   writer = pq.ParquetWriter('example2.parquet', table.schema)
-   for i in range(3):
-       writer.write_table(table)
-   writer.close()
+   with pq.ParquetWriter('example2.parquet', table.schema) as writer:
+      for i in range(3):
+         writer.write_table(table)

    pf2 = pq.ParquetFile('example2.parquet')
    pf2.num_row_groups

-Alternatively python ``with`` syntax can also be use:
-
-.. ipython:: python
-
-   with pq.ParquetWriter('example3.parquet', table.schema) as writer:
-       for i in range(3):
-           writer.write_table(table)
-
 Inspecting the Parquet File Metadata
 ------------------------------------

diff --git a/docs/source/python/plasma.rst b/docs/source/python/plasma.rst
index e373bd0a69d..51c7b6eafee 100644
--- a/docs/source/python/plasma.rst
+++ b/docs/source/python/plasma.rst
@@ -360,9 +360,8 @@ size of the Plasma object.
     # is done to determine the size of buffer to request from the object store.
     object_id = plasma.ObjectID(np.random.bytes(20))
     mock_sink = pa.MockOutputStream()
-    stream_writer = pa.RecordBatchStreamWriter(mock_sink, record_batch.schema)
-    stream_writer.write_batch(record_batch)
-    stream_writer.close()
+    with pa.RecordBatchStreamWriter(mock_sink, record_batch.schema) as stream_writer:
+        stream_writer.write_batch(record_batch)
     data_size = mock_sink.size()
     buf = client.create(object_id, data_size)

@@ -372,9 +371,8 @@ The DataFrame can now be written to the buffer as follows.

     # Write the PyArrow RecordBatch to Plasma
     stream = pa.FixedSizeBufferWriter(buf)
-    stream_writer = pa.RecordBatchStreamWriter(stream, record_batch.schema)
-    stream_writer.write_batch(record_batch)
-    stream_writer.close()
+    with pa.RecordBatchStreamWriter(stream, record_batch.schema) as stream_writer:
+        stream_writer.write_batch(record_batch)

 Finally, seal the finished object for use by all clients:
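
For reviewers who want to try the pattern these patches document, below is a minimal,
self-contained sketch (not part of the patches themselves) of writing and reading an
IPC stream with the context-manager style used above. It assumes a pyarrow version
whose IPC writer and reader classes support the ``with`` protocol, which is what the
updated docs rely on; the local names (``batch``, ``sink``, ``buf``) are illustrative
only::

    import pyarrow as pa

    # A small record batch to stream, mirroring the docs' example data.
    batch = pa.record_batch(
        [pa.array([1, 2, 3, 4]), pa.array(["foo", "bar", "baz", None])],
        names=["f0", "f1"],
    )

    # Write five copies of the batch; the writer is closed automatically
    # when the ``with`` block exits, so no explicit writer.close() is needed.
    sink = pa.BufferOutputStream()
    with pa.ipc.new_stream(sink, batch.schema) as writer:
        for _ in range(5):
            writer.write_batch(batch)

    buf = sink.getvalue()

    # Read the stream back; the reader is likewise closed on exit.
    with pa.ipc.open_stream(buf) as reader:
        schema = reader.schema
        batches = [b for b in reader]

    assert schema.equals(batch.schema)
    assert len(batches) == 5
    assert all(b.equals(batch) for b in batches)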