From e8cde5842288ac00ff850f2e17d22f96cb71d1c4 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Mon, 16 Sep 2019 16:22:38 -0500 Subject: [PATCH 1/4] Add option to use legacy / pre-0.15 IPC message format and to set the default value by environment variable --- python/pyarrow/includes/libarrow.pxd | 10 +++++---- python/pyarrow/ipc.pxi | 33 ++++++++++++++++++---------- python/pyarrow/ipc.py | 24 ++++++++++++++++---- python/pyarrow/tests/test_ipc.py | 31 ++++++++++++++++++++++++-- 4 files changed, 77 insertions(+), 21 deletions(-) diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index 33f92ca244f..8cef315ed0b 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -1029,14 +1029,16 @@ cdef extern from "arrow/ipc/api.h" namespace "arrow::ipc" nogil: cdef cppclass CRecordBatchStreamWriter \ " arrow::ipc::RecordBatchStreamWriter"(CRecordBatchWriter): @staticmethod - CStatus Open(OutputStream* sink, const shared_ptr[CSchema]& schema, - shared_ptr[CRecordBatchWriter]* out) + CResult[shared_ptr[CRecordBatchWriter]] Open( + OutputStream* sink, const shared_ptr[CSchema]& schema, + CIpcOptions& options) cdef cppclass CRecordBatchFileWriter \ " arrow::ipc::RecordBatchFileWriter"(CRecordBatchWriter): @staticmethod - CStatus Open(OutputStream* sink, const shared_ptr[CSchema]& schema, - shared_ptr[CRecordBatchWriter]* out) + CResult[shared_ptr[CRecordBatchWriter]] Open( + OutputStream* sink, const shared_ptr[CSchema]& schema, + CIpcOptions& options) cdef cppclass CRecordBatchFileReader \ " arrow::ipc::RecordBatchFileReader": diff --git a/python/pyarrow/ipc.pxi b/python/pyarrow/ipc.pxi index 6710f639b00..0e52c6f1ae3 100644 --- a/python/pyarrow/ipc.pxi +++ b/python/pyarrow/ipc.pxi @@ -248,6 +248,7 @@ cdef class _CRecordBatchWriter: cdef class _RecordBatchStreamWriter(_CRecordBatchWriter): cdef: shared_ptr[OutputStream] sink + CIpcOptions options bint closed def __cinit__(self): @@ -256,14 +257,20 @@ cdef 
class _RecordBatchStreamWriter(_CRecordBatchWriter): def __dealloc__(self): pass - def _open(self, sink, Schema schema): - get_writer(sink, &self.sink) + @property + def _use_legacy_format(self): + return self.options.write_legacy_ipc_format + + def _open(self, sink, Schema schema, use_legacy_format=False): + cdef: + CResult[shared_ptr[CRecordBatchWriter]] result + self.options.write_legacy_ipc_format = use_legacy_format + get_writer(sink, &self.sink) with nogil: - check_status( - CRecordBatchStreamWriter.Open(self.sink.get(), - schema.sp_schema, - &self.writer)) + result = CRecordBatchStreamWriter.Open( + self.sink.get(), schema.sp_schema, self.options) + self.writer = GetResultValue(result) cdef _get_input_stream(object source, shared_ptr[InputStream]* out): @@ -341,13 +348,17 @@ cdef class _RecordBatchStreamReader(_CRecordBatchReader): cdef class _RecordBatchFileWriter(_RecordBatchStreamWriter): - def _open(self, sink, Schema schema): - get_writer(sink, &self.sink) + def _open(self, sink, Schema schema, use_legacy_format=False): + cdef: + CResult[shared_ptr[CRecordBatchWriter]] result + self.options.write_legacy_ipc_format = use_legacy_format + get_writer(sink, &self.sink) with nogil: - check_status( - CRecordBatchFileWriter.Open(self.sink.get(), schema.sp_schema, - &self.writer)) + result = CRecordBatchFileWriter.Open(self.sink.get(), + schema.sp_schema, + self.options) + self.writer = GetResultValue(result) cdef class _RecordBatchFileReader: diff --git a/python/pyarrow/ipc.py b/python/pyarrow/ipc.py index 378a92b2fb2..4ca21788c0c 100644 --- a/python/pyarrow/ipc.py +++ b/python/pyarrow/ipc.py @@ -70,9 +70,13 @@ class RecordBatchStreamWriter(lib._RecordBatchStreamWriter): Either a file path, or a writable file object schema : pyarrow.Schema The Arrow schema for data to be written to the file + use_legacy_format : boolean, default None + If None, use True unless overridden by PYARROW_LEGACY_IPC_FORMAT=1 + environment variable """ - def __init__(self, sink, 
schema): - self._open(sink, schema) + def __init__(self, sink, schema, use_legacy_format=None): + use_legacy_format = _get_legacy_format_default(use_legacy_format) + self._open(sink, schema, use_legacy_format=use_legacy_format) class RecordBatchFileReader(lib._RecordBatchFileReader, _ReadPandasOption): @@ -101,9 +105,21 @@ class RecordBatchFileWriter(lib._RecordBatchFileWriter): Either a file path, or a writable file object schema : pyarrow.Schema The Arrow schema for data to be written to the file + use_legacy_format : boolean, default None + If None, use True unless overridden by PYARROW_LEGACY_IPC_FORMAT=1 + environment variable """ - def __init__(self, sink, schema): - self._open(sink, schema) + def __init__(self, sink, schema, use_legacy_format=None): + use_legacy_format = _get_legacy_format_default(use_legacy_format) + self._open(sink, schema, use_legacy_format=use_legacy_format) + + +def _get_legacy_format_default(use_legacy_format): + if use_legacy_format is None: + import os + return bool(int(os.environ.get('PYARROW_LEGACY_IPC_FORMAT', '0'))) + else: + return use_legacy_format def open_stream(source): diff --git a/python/pyarrow/tests/test_ipc.py b/python/pyarrow/tests/test_ipc.py index a94151fbeed..9e5d3e58cc5 100644 --- a/python/pyarrow/tests/test_ipc.py +++ b/python/pyarrow/tests/test_ipc.py @@ -102,8 +102,15 @@ def _check_roundtrip(self, as_table=False): class StreamFormatFixture(IpcFixture): + # ARROW-6474, for testing writing old IPC protocol with 4-byte prefix + use_legacy_ipc_format = False + def _get_writer(self, sink, schema): - return pa.RecordBatchStreamWriter(sink, schema) + return pa.RecordBatchStreamWriter( + sink, + schema, + use_legacy_format=self.use_legacy_ipc_format + ) class MessageFixture(IpcFixture): @@ -289,7 +296,9 @@ def test_stream_write_table_batches(stream_fixture): ignore_index=True)) -def test_stream_simple_roundtrip(stream_fixture): +@pytest.mark.parametrize('use_legacy_ipc_format', [False, True]) +def 
test_stream_simple_roundtrip(stream_fixture, use_legacy_ipc_format): + stream_fixture.use_legacy_ipc_format = use_legacy_ipc_format _, batches = stream_fixture.write_batches() file_contents = pa.BufferReader(stream_fixture.get_source()) reader = pa.ipc.open_stream(file_contents) @@ -307,6 +316,24 @@ def test_stream_simple_roundtrip(stream_fixture): reader.read_next_batch() +def test_envvar_set_legacy_ipc_format(): + schema = pa.schema([pa.field('foo', pa.int32())]) + + writer = pa.RecordBatchStreamWriter(pa.BufferOutputStream(), schema) + assert not writer._use_legacy_format + writer = pa.RecordBatchFileWriter(pa.BufferOutputStream(), schema) + assert not writer._use_legacy_format + + import os + os.environ['PYARROW_LEGACY_IPC_FORMAT'] = '1' + writer = pa.RecordBatchStreamWriter(pa.BufferOutputStream(), schema) + assert writer._use_legacy_format + writer = pa.RecordBatchFileWriter(pa.BufferOutputStream(), schema) + assert writer._use_legacy_format + + del os.environ['PYARROW_LEGACY_IPC_FORMAT'] + + def test_stream_read_all(stream_fixture): _, batches = stream_fixture.write_batches() file_contents = pa.BufferReader(stream_fixture.get_source()) From 0bb54d4cec84e7a5ba464fee5c38519ef7f5c4bb Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Tue, 17 Sep 2019 18:55:50 -0500 Subject: [PATCH 2/4] Rename environment variable per comments --- python/pyarrow/ipc.py | 2 +- python/pyarrow/tests/test_ipc.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/python/pyarrow/ipc.py b/python/pyarrow/ipc.py index 4ca21788c0c..992280018fe 100644 --- a/python/pyarrow/ipc.py +++ b/python/pyarrow/ipc.py @@ -117,7 +117,7 @@ def __init__(self, sink, schema, use_legacy_format=None): def _get_legacy_format_default(use_legacy_format): if use_legacy_format is None: import os - return bool(int(os.environ.get('PYARROW_LEGACY_IPC_FORMAT', '0'))) + return bool(int(os.environ.get('ARROW_PRE_0_15_IPC_FORMAT', '0'))) else: return use_legacy_format diff --git 
a/python/pyarrow/tests/test_ipc.py b/python/pyarrow/tests/test_ipc.py index 9e5d3e58cc5..5f1a9320e8d 100644 --- a/python/pyarrow/tests/test_ipc.py +++ b/python/pyarrow/tests/test_ipc.py @@ -325,13 +325,13 @@ def test_envvar_set_legacy_ipc_format(): assert not writer._use_legacy_format import os - os.environ['PYARROW_LEGACY_IPC_FORMAT'] = '1' + os.environ['ARROW_PRE_0_15_IPC_FORMAT'] = '1' writer = pa.RecordBatchStreamWriter(pa.BufferOutputStream(), schema) assert writer._use_legacy_format writer = pa.RecordBatchFileWriter(pa.BufferOutputStream(), schema) assert writer._use_legacy_format - del os.environ['PYARROW_LEGACY_IPC_FORMAT'] + del os.environ['ARROW_PRE_0_15_IPC_FORMAT'] def test_stream_read_all(stream_fixture): From a1768bd82ad21aa7bd1e9a8e66d30705475929b1 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Wed, 18 Sep 2019 10:06:43 -0500 Subject: [PATCH 3/4] Synchronize docstrings --- python/pyarrow/ipc.py | 41 +++++++++++++++++------------------------ 1 file changed, 17 insertions(+), 24 deletions(-) diff --git a/python/pyarrow/ipc.py b/python/pyarrow/ipc.py index 992280018fe..77518659c43 100644 --- a/python/pyarrow/ipc.py +++ b/python/pyarrow/ipc.py @@ -60,20 +60,22 @@ def __init__(self, source): self._open(source) +_ipc_writer_class_doc = """\ +Parameters +---------- +sink : str, pyarrow.NativeFile, or file-like Python object + Either a file path, or a writable file object +schema : pyarrow.Schema + The Arrow schema for data to be written to the file +use_legacy_format : boolean, default None + If None, use False unless overridden by ARROW_PRE_0_15_IPC_FORMAT=1 + environment variable""" + + class RecordBatchStreamWriter(lib._RecordBatchStreamWriter): - """ - Writer for the Arrow streaming binary format + __doc__ = """Writer for the Arrow streaming binary format +{}""".format(_ipc_writer_class_doc) - Parameters - ---------- - sink : str, pyarrow.NativeFile, or file-like Python object - Either a file path, or a writable file object - schema : pyarrow.Schema 
- The Arrow schema for data to be written to the file - use_legacy_format : boolean, default None - If None, use True unless overridden by PYARROW_LEGACY_IPC_FORMAT=1 - environment variable - """ def __init__(self, sink, schema, use_legacy_format=None): use_legacy_format = _get_legacy_format_default(use_legacy_format) self._open(sink, schema, use_legacy_format=use_legacy_format) @@ -96,19 +98,10 @@ def __init__(self, source, footer_offset=None): class RecordBatchFileWriter(lib._RecordBatchFileWriter): - """ - Writer to create the Arrow binary file format - Parameters - ---------- - sink : str, pyarrow.NativeFile, or file-like Python object - Either a file path, or a writable file object - schema : pyarrow.Schema - The Arrow schema for data to be written to the file - use_legacy_format : boolean, default None - If None, use True unless overridden by PYARROW_LEGACY_IPC_FORMAT=1 - environment variable - """ + __doc__ = """Writer to create the Arrow binary file format +{}""".format(_ipc_writer_class_doc) + def __init__(self, sink, schema, use_legacy_format=None): use_legacy_format = _get_legacy_format_default(use_legacy_format) self._open(sink, schema, use_legacy_format=use_legacy_format) From 52a966d86661a5fdcabc0b729fa2932f4991bc97 Mon Sep 17 00:00:00 2001 From: Wes McKinney Date: Wed, 18 Sep 2019 12:00:54 -0500 Subject: [PATCH 4/4] Fix sphinx warning --- python/pyarrow/ipc.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python/pyarrow/ipc.py b/python/pyarrow/ipc.py index 77518659c43..664f000977b 100644 --- a/python/pyarrow/ipc.py +++ b/python/pyarrow/ipc.py @@ -74,6 +74,7 @@ def __init__(self, source): class RecordBatchStreamWriter(lib._RecordBatchStreamWriter): __doc__ = """Writer for the Arrow streaming binary format + {}""".format(_ipc_writer_class_doc) def __init__(self, sink, schema, use_legacy_format=None): @@ -100,6 +101,7 @@ def __init__(self, source, footer_offset=None): class RecordBatchFileWriter(lib._RecordBatchFileWriter): __doc__ = """Writer to 
create the Arrow binary file format + {}""".format(_ipc_writer_class_doc) def __init__(self, sink, schema, use_legacy_format=None):