From ffbe9336904bda5662762f83807a47480e070dd0 Mon Sep 17 00:00:00 2001
From: Joost Hoozemans
Date: Tue, 26 Jul 2022 12:18:12 +0200
Subject: [PATCH 01/31] Added field to CsvFragmentScanOptions that holds an
 optional transform function
---
 cpp/src/arrow/dataset/file_csv.cc | 10 ++++++++--
 cpp/src/arrow/dataset/file_csv.h | 10 ++++++++++
 python/pyarrow/includes/libarrow.pxd | 8 ++++++++
 python/pyarrow/includes/libarrow_dataset.pxd | 1 +
 python/pyarrow/src/io.cc | 10 ++++++++++
 python/pyarrow/src/io.h | 3 +++
 6 files changed, 40 insertions(+), 2 deletions(-)

diff --git a/cpp/src/arrow/dataset/file_csv.cc b/cpp/src/arrow/dataset/file_csv.cc
index d4e0af7808c..3b9200c003b 100644
--- a/cpp/src/arrow/dataset/file_csv.cc
+++ b/cpp/src/arrow/dataset/file_csv.cc
@@ -183,9 +183,15 @@ static inline Future<std::shared_ptr<csv::StreamingReader>> OpenReaderAsync(
   auto tracer = arrow::internal::tracing::GetTracer();
   auto span = tracer->StartSpan("arrow::dataset::CsvFileFormat::OpenReaderAsync");
 #endif
-  ARROW_ASSIGN_OR_RAISE(auto reader_options, GetReadOptions(format, scan_options));
-
+  ARROW_ASSIGN_OR_RAISE(
+      auto fragment_scan_options,
+      GetFragmentScanOptions<CsvFragmentScanOptions>(
+          kCsvTypeName, scan_options.get(), format.default_fragment_scan_options));
+  auto reader_options = fragment_scan_options->read_options;
   ARROW_ASSIGN_OR_RAISE(auto input, source.OpenCompressed());
+  if (fragment_scan_options->stream_transform_func) {
+    ARROW_ASSIGN_OR_RAISE(input, fragment_scan_options->stream_transform_func(input));
+  }
   const auto& path = source.path();
   ARROW_ASSIGN_OR_RAISE(
       input, io::BufferedInputStream::Create(reader_options.block_size,
diff --git a/cpp/src/arrow/dataset/file_csv.h b/cpp/src/arrow/dataset/file_csv.h
index 83dbb88b85f..84bcf94abe3 100644
--- a/cpp/src/arrow/dataset/file_csv.h
+++ b/cpp/src/arrow/dataset/file_csv.h
@@ -73,6 +73,9 @@ class ARROW_DS_EXPORT CsvFileFormat : public FileFormat {
 struct ARROW_DS_EXPORT CsvFragmentScanOptions : public FragmentScanOptions {
   std::string type_name() const override { return kCsvTypeName; }
 
+  using StreamWrapFunc = std::function<Result<std::shared_ptr<io::InputStream>>(
+      std::shared_ptr<io::InputStream>)>;
+
   /// CSV conversion options
   csv::ConvertOptions convert_options = csv::ConvertOptions::Defaults();
 
   /// CSV read options
   ///
   /// Note that use_threads is always ignored.
   csv::ReadOptions read_options = csv::ReadOptions::Defaults();
+
+  /// Optional stream wrapping function
+  ///
+  /// If defined, all open dataset file fragments will be passed
+  /// through this function. One possible use case is to transparently
+  /// transcode all input files from a given character set to utf8.
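+  ///
+  /// Illustrative sketch only (not part of this patch; `MyWrapStream` is a
+  /// hypothetical helper): a caller could install a wrapper like
+  ///
+  ///   CsvFragmentScanOptions scan_options;
+  ///   scan_options.stream_transform_func =
+  ///       [](std::shared_ptr<io::InputStream> wrapped)
+  ///           -> Result<std::shared_ptr<io::InputStream>> {
+  ///     return MyWrapStream(std::move(wrapped));
+  ///   };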
+  StreamWrapFunc stream_transform_func{};
 };
 
 class ARROW_DS_EXPORT CsvFileWriteOptions : public FileWriteOptions {
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index 781d2ce7ad6..cbf340864c5 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -1379,6 +1379,14 @@ cdef extern from "arrow/io/api.h" namespace "arrow::io" nogil:
             shared_ptr[CInputStream] wrapped, CTransformInputStreamVTable vtable,
             object method_arg)
 
+    ctypedef CResult[shared_ptr[CInputStream]] StreamWrapFunc(
+        shared_ptr[CInputStream])
+
+    function[StreamWrapFunc] makeStreamTransformFunc \
+        "arrow::py::makeStreamTransformFunc"(
+            CTransformInputStreamVTable vtable,
+            object method_arg)
+
 # ----------------------------------------------------------------------
 # HDFS
 
diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd
index bd8fbd1b56a..ad1bbbc5442 100644
--- a/python/pyarrow/includes/libarrow_dataset.pxd
+++ b/python/pyarrow/includes/libarrow_dataset.pxd
@@ -277,6 +277,7 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
             "arrow::dataset::CsvFragmentScanOptions"(CFragmentScanOptions):
         CCSVConvertOptions convert_options
         CCSVReadOptions read_options
+        function[StreamWrapFunc] stream_transform_func
 
     cdef cppclass CPartitioning "arrow::dataset::Partitioning":
         c_string type_name() const
diff --git a/python/pyarrow/src/io.cc b/python/pyarrow/src/io.cc
index 173d84ff567..74834aef3af 100644
--- a/python/pyarrow/src/io.cc
+++ b/python/pyarrow/src/io.cc
@@ -370,5 +370,15 @@ std::shared_ptr<::arrow::io::InputStream> MakeTransformInputStream(
   return std::make_shared<TransformInputStream>(std::move(wrapped), std::move(transform));
 }
 
+std::function<Result<std::shared_ptr<::arrow::io::InputStream>>(
+    std::shared_ptr<::arrow::io::InputStream>)>
+makeStreamTransformFunc(TransformInputStreamVTable vtable, PyObject* handler) {
+  std::function<Result<std::shared_ptr<::arrow::io::InputStream>>(
+      std::shared_ptr<::arrow::io::InputStream>)>
+      func = [=](std::shared_ptr<::arrow::io::InputStream> wrapped) {
+        return MakeTransformInputStream(wrapped, vtable, handler);
+      };
+  return func;
+}
+
 }  // namespace py
 }  // namespace arrow
diff --git a/python/pyarrow/src/io.h b/python/pyarrow/src/io.h
index 53b15434ea6..fb050af2db0 100644
--- a/python/pyarrow/src/io.h
+++ b/python/pyarrow/src/io.h
@@ -112,5 +112,8 @@ std::shared_ptr<::arrow::io::InputStream> MakeTransformInputStream(
     std::shared_ptr<::arrow::io::InputStream> wrapped, TransformInputStreamVTable vtable,
     PyObject* arg);
 
+std::function<Result<std::shared_ptr<::arrow::io::InputStream>>(
+    std::shared_ptr<::arrow::io::InputStream>)>
+makeStreamTransformFunc(TransformInputStreamVTable vtable, PyObject* handler);
+
 }  // namespace py
 }  // namespace arrow

From e7486c491cafecbeba9dd525b55bc69daf790640 Mon Sep 17 00:00:00 2001
From: Joost Hoozemans
Date: Tue, 26 Jul 2022 13:17:28 +0200
Subject: [PATCH 02/31] WIP wrapping a transcoder around all input streams of
 a dataset
---
 python/pyarrow/_dataset.pyx | 54 +++++++++++++++++++++++++++++++++++++
 python/pyarrow/dataset.py | 10 ++++---
 2 files changed, 61 insertions(+), 3 deletions(-)

diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index 68833a5350e..d505f638556 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -21,6 +21,7 @@
 
 from cython.operator cimport dereference as deref
 
+import codecs
 import collections
 import os
 import warnings
@@ -1237,6 +1238,40 @@ cdef class CsvFileFormat(FileFormat):
         return f"<CsvFileFormat parse_options={self.parse_options}>"
 
 
+# From io.pxi
+def py_buffer(object obj):
+    """
+    Construct an Arrow buffer from a Python bytes-like or buffer-like object
+
+    Parameters
+    ----------
+    obj : object
+        the object from which
the buffer should be constructed. + """ + cdef shared_ptr[CBuffer] buf + buf = GetResultValue(PyBuffer.FromPyObject(obj)) + return pyarrow_wrap_buffer(buf) + + +# From io.pxi +cdef void _cb_transform(transform_func, const shared_ptr[CBuffer]& src, + shared_ptr[CBuffer]* dest) except *: + py_dest = transform_func(pyarrow_wrap_buffer(src)) + dest[0] = pyarrow_unwrap_buffer(py_buffer(py_dest)) + + +# from io.pxi +class Transcoder: + + def __init__(self, decoder, encoder): + self._decoder = decoder + self._encoder = encoder + + def __call__(self, buf): + final = len(buf) == 0 + return self._encoder.encode(self._decoder.decode(buf, final), final) + + cdef class CsvFragmentScanOptions(FragmentScanOptions): """ Scan-specific options for CSV fragments. @@ -1284,6 +1319,25 @@ cdef class CsvFragmentScanOptions(FragmentScanOptions): def read_options(self, ReadOptions read_options not None): self.csv_options.read_options = deref(read_options.options) + cdef transform_encoding(self, src_encoding): + cdef: + CTransformInputStreamVTable vtable + + src_codec = codecs.lookup(src_encoding) + dest_codec = codecs.lookup("utf8") + if src_codec.name == dest_codec.name: + # Avoid losing performance on no-op transcoding + # (encoding errors won't be detected) + return + + vtable.transform = _cb_transform + self.csv_options.stream_transform_func = makeStreamTransformFunc(move(vtable), + Transcoder(src_codec.incrementaldecoder(), + dest_codec.incrementalencoder())) + + def stream_transform_func(self, src_encoding): + self.transform_encoding(src_encoding) + def equals(self, CsvFragmentScanOptions other): return ( other and diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py index 326b37ec6e1..44839a1e2f9 100644 --- a/python/pyarrow/dataset.py +++ b/python/pyarrow/dataset.py @@ -422,7 +422,7 @@ def _ensure_single_source(path, filesystem=None): def _filesystem_dataset(source, schema=None, filesystem=None, partitioning=None, format=None, partition_base_dir=None, exclude_invalid_files=None, - selector_ignore_prefixes=None): + selector_ignore_prefixes=None, encoding='utf8'): """ Create a FileSystemDataset which can be used to build a Dataset. @@ -433,6 +433,9 @@ def _filesystem_dataset(source, schema=None, filesystem=None, FileSystemDataset """ format = _ensure_format(format or 'parquet') + if encoding != 'utf8': + format.default_fragment_scan_options.stream_transform_func(encoding) + partitioning = _ensure_partitioning(partitioning) if isinstance(source, (list, tuple)): @@ -539,7 +542,7 @@ def parquet_dataset(metadata_path, schema=None, filesystem=None, format=None, def dataset(source, schema=None, format=None, filesystem=None, partitioning=None, partition_base_dir=None, - exclude_invalid_files=None, ignore_prefixes=None): + exclude_invalid_files=None, ignore_prefixes=None, encoding='utf8'): """ Open a dataset. 
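# (Illustrative sketch, not part of the diff: with the interim `encoding`
# keyword added above, the intended call shape is roughly the following;
# later commits in this series relocate the option onto CsvFileFormat.)
#
#   import pyarrow.dataset as ds
#
#   dataset = ds.dataset('data.csv', format='csv', encoding='latin-1')
#   table = dataset.to_table()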
@@ -742,7 +745,8 @@ def dataset(source, schema=None, format=None, filesystem=None,
         format=format,
         partition_base_dir=partition_base_dir,
         exclude_invalid_files=exclude_invalid_files,
-        selector_ignore_prefixes=ignore_prefixes
+        selector_ignore_prefixes=ignore_prefixes,
+        encoding=encoding
     )
 
     if _is_path_like(source):

From 2c11549faa4198efea61f88ce9731b3053af06d0 Mon Sep 17 00:00:00 2001
From: Joost Hoozemans
Date: Thu, 28 Jul 2022 15:32:52 +0200
Subject: [PATCH 03/31] Added input stream wrapping to CsvFileFormat::CountRows
 too
---
 cpp/src/arrow/dataset/file_csv.cc | 9 ++++++++-
 python/pyarrow/src/io.cc | 16 ++++++++--------
 python/pyarrow/src/io.h | 7 ++++---
 3 files changed, 20 insertions(+), 12 deletions(-)

diff --git a/cpp/src/arrow/dataset/file_csv.cc b/cpp/src/arrow/dataset/file_csv.cc
index 3b9200c003b..4f36a349552 100644
--- a/cpp/src/arrow/dataset/file_csv.cc
+++ b/cpp/src/arrow/dataset/file_csv.cc
@@ -295,8 +295,15 @@ Future<util::optional<int64_t>> CsvFileFormat::CountRows(
     return Future<util::optional<int64_t>>::MakeFinished(util::nullopt);
   }
   auto self = checked_pointer_cast<CsvFileFormat>(shared_from_this());
+  ARROW_ASSIGN_OR_RAISE(
+      auto fragment_scan_options,
+      GetFragmentScanOptions<CsvFragmentScanOptions>(
+          kCsvTypeName, options.get(), self->default_fragment_scan_options));
+  auto read_options = fragment_scan_options->read_options;
   ARROW_ASSIGN_OR_RAISE(auto input, file->source().OpenCompressed());
-  ARROW_ASSIGN_OR_RAISE(auto read_options, GetReadOptions(*self, options));
+  if (fragment_scan_options->stream_transform_func) {
+    ARROW_ASSIGN_OR_RAISE(input, fragment_scan_options->stream_transform_func(input));
+  }
   return csv::CountRowsAsync(options->io_context, std::move(input),
                              ::arrow::internal::GetCpuThreadPool(), read_options,
                              self->parse_options)
diff --git a/python/pyarrow/src/io.cc b/python/pyarrow/src/io.cc
index 74834aef3af..e81a52f92c6 100644
--- a/python/pyarrow/src/io.cc
+++ b/python/pyarrow/src/io.cc
@@ -370,14 +370,14 @@ std::shared_ptr<::arrow::io::InputStream> MakeTransformInputStream(
   return std::make_shared<TransformInputStream>(std::move(wrapped), std::move(transform));
 }
 
-std::function<Result<std::shared_ptr<::arrow::io::InputStream>>(
-    std::shared_ptr<::arrow::io::InputStream>)>
-makeStreamTransformFunc(TransformInputStreamVTable vtable, PyObject* handler) {
-  std::function<Result<std::shared_ptr<::arrow::io::InputStream>>(
-      std::shared_ptr<::arrow::io::InputStream>)>
-      func = [=](std::shared_ptr<::arrow::io::InputStream> wrapped) {
-        return MakeTransformInputStream(wrapped, vtable, handler);
-      };
-  return func;
+std::shared_ptr<StreamWrapFunc> makeStreamTransformFunc(TransformInputStreamVTable vtable,
+                                                        PyObject* handler) {
+  TransformInputStream::TransformFunc transform(
+      TransformFunctionWrapper{std::move(vtable.transform), handler});
+  StreamWrapFunc func = [transform](std::shared_ptr<::arrow::io::InputStream> wrapped) {
+    return std::make_shared<TransformInputStream>(wrapped, transform);
+  };
+  return std::make_shared<StreamWrapFunc>(func);
 }
 
 }  // namespace py
diff --git a/python/pyarrow/src/io.h b/python/pyarrow/src/io.h
index fb050af2db0..bd5a0f6066a 100644
--- a/python/pyarrow/src/io.h
+++ b/python/pyarrow/src/io.h
@@ -112,8 +112,9 @@ std::shared_ptr<::arrow::io::InputStream> MakeTransformInputStream(
     std::shared_ptr<::arrow::io::InputStream> wrapped, TransformInputStreamVTable vtable,
     PyObject* arg);
 
-std::function<Result<std::shared_ptr<::arrow::io::InputStream>>(
-    std::shared_ptr<::arrow::io::InputStream>)>
-makeStreamTransformFunc(TransformInputStreamVTable vtable, PyObject* handler);
-
+using StreamWrapFunc = std::function<Result<std::shared_ptr<::arrow::io::InputStream>>(
+    std::shared_ptr<::arrow::io::InputStream>)>;
+std::shared_ptr<StreamWrapFunc> makeStreamTransformFunc(TransformInputStreamVTable vtable,
+                                                        PyObject* handler);
 }  // namespace py
 }  // namespace arrow

From 1bb77bad8b97545a79e3b08442138d6ddd8b21a7 Mon Sep 17 00:00:00 2001
From: Joost Hoozemans
Date: Thu, 28 Jul 2022 21:41:08 +0200
Subject: [PATCH 04/31] Moved make_streamwrap_func into io.pxi, removed
 duplicated code
---
 python/pyarrow/_dataset.pyx | 54 +++------------------------
 python/pyarrow/dataset.py | 3 +-
 python/pyarrow/includes/libarrow.pxd | 8 ++---
 python/pyarrow/io.pxi | 27 ++++++++++++++
 python/pyarrow/lib.pxd | 3 ++
 5 files changed, 39 insertions(+), 56 deletions(-)

diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index d505f638556..9e471630548 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -1238,40 +1238,6 @@ cdef class CsvFileFormat(FileFormat):
         return f"<CsvFileFormat parse_options={self.parse_options}>"
 
 
-# From io.pxi
-def py_buffer(object obj):
-    """
-    Construct an Arrow buffer from a Python bytes-like or buffer-like object
-
-    Parameters
-    ----------
-    obj : object
-        the object from which the buffer should be constructed.
-    """
-    cdef shared_ptr[CBuffer] buf
-    buf = GetResultValue(PyBuffer.FromPyObject(obj))
-    return pyarrow_wrap_buffer(buf)
-
-
-# From io.pxi
-cdef void _cb_transform(transform_func, const shared_ptr[CBuffer]& src,
-                        shared_ptr[CBuffer]* dest) except *:
-    py_dest = transform_func(pyarrow_wrap_buffer(src))
-    dest[0] = pyarrow_unwrap_buffer(py_buffer(py_dest))
-
-
-# from io.pxi
-class Transcoder:
-
-    def __init__(self, decoder, encoder):
-        self._decoder = decoder
-        self._encoder = encoder
-
-    def __call__(self, buf):
-        final = len(buf) == 0
-        return self._encoder.encode(self._decoder.decode(buf, final), final)
-
-
 cdef class CsvFragmentScanOptions(FragmentScanOptions):
     """
     Scan-specific options for CSV fragments.
@@ -1319,24 +1285,12 @@ cdef class CsvFragmentScanOptions(FragmentScanOptions):
     def read_options(self, ReadOptions read_options not None):
         self.csv_options.read_options = deref(read_options.options)
 
-    cdef transform_encoding(self, src_encoding):
-        cdef:
-            CTransformInputStreamVTable vtable
-
-        src_codec = codecs.lookup(src_encoding)
-        dest_codec = codecs.lookup("utf8")
-        if src_codec.name == dest_codec.name:
-            # Avoid losing performance on no-op transcoding
-            # (encoding errors won't be detected)
+    def add_transcoder(self, src_encoding, dest_encoding):
+        if src_encoding == dest_encoding:
             return
 
-        vtable.transform = _cb_transform
-        self.csv_options.stream_transform_func = makeStreamTransformFunc(move(vtable),
-                                                                         Transcoder(src_codec.incrementaldecoder(),
-                                                                                    dest_codec.incrementalencoder()))
-
-    def stream_transform_func(self, src_encoding):
-        self.transform_encoding(src_encoding)
+        self.csv_options.stream_transform_func = deref(
+            make_streamwrap_func(src_encoding, dest_encoding))
 
     def equals(self, CsvFragmentScanOptions other):
         return (
diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py
index 44839a1e2f9..b283874cd9b 100644
--- a/python/pyarrow/dataset.py
+++ b/python/pyarrow/dataset.py
@@ -433,8 +433,7 @@ def _filesystem_dataset(source, schema=None, filesystem=None,
     FileSystemDataset
     """
     format = _ensure_format(format or 'parquet')
-    if encoding != 'utf8':
-        format.default_fragment_scan_options.stream_transform_func(encoding)
+    format.default_fragment_scan_options.add_transcoder(encoding, "utf8")
 
     partitioning = _ensure_partitioning(partitioning)
 
diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index cbf340864c5..3b0c9366042 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -1212,6 +1212,9 @@ cdef extern from "arrow/builder.h" namespace "arrow" nogil:
 
 ctypedef void CallbackTransform(object, const shared_ptr[CBuffer]& src,
                                 shared_ptr[CBuffer]* dest)
 
+ctypedef
CResult[shared_ptr[CInputStream]] StreamWrapFunc( + shared_ptr[CInputStream]) + cdef extern from "arrow/util/cancel.h" namespace "arrow" nogil: cdef cppclass CStopToken "arrow::StopToken": @@ -1379,10 +1382,7 @@ cdef extern from "arrow/io/api.h" namespace "arrow::io" nogil: shared_ptr[CInputStream] wrapped, CTransformInputStreamVTable vtable, object method_arg) - ctypedef CResult[shared_ptr[CInputStream]] StreamWrapFunc( - shared_ptr[CInputStream]) - - function[StreamWrapFunc] makeStreamTransformFunc \ + shared_ptr[function[StreamWrapFunc]] makeStreamTransformFunc \ "arrow::py::makeStreamTransformFunc"( CTransformInputStreamVTable vtable, object method_arg) diff --git a/python/pyarrow/io.pxi b/python/pyarrow/io.pxi index d1d3feb3c17..4aa1b884c31 100644 --- a/python/pyarrow/io.pxi +++ b/python/pyarrow/io.pxi @@ -1583,6 +1583,33 @@ class Transcoder: return self._encoder.encode(self._decoder.decode(buf, final), final) +cdef shared_ptr[function[StreamWrapFunc]] make_streamwrap_func( + src_encoding, dest_encoding) except *: + """ + Create a function that will add a transcoding transformation to a stream. + Data from that stream will be decoded according to ``src_encoding`` and + then re-encoded according to ``dest_encoding``. + The created function can be used to wrap streams once they are created. + + Parameters + ---------- + src_encoding : str + The codec to use when reading data data. + dest_encoding : str + The codec to use for emitted data. + """ + cdef: + shared_ptr[function[StreamWrapFunc]] empty_func + CTransformInputStreamVTable vtable + + vtable.transform = _cb_transform + src_codec = codecs.lookup(src_encoding) + dest_codec = codecs.lookup(dest_encoding) + return makeStreamTransformFunc(move(vtable), + Transcoder(src_codec.incrementaldecoder(), + dest_codec.incrementalencoder())) + + def transcoding_input_stream(stream, src_encoding, dest_encoding): """ Add a transcoding transformation to the stream. 
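# (Illustrative sketch, not part of the diff: the transform installed by
# make_streamwrap_func behaves roughly like this pure-Python pairing of an
# incremental decoder and encoder; a zero-length chunk signals end-of-stream
# so that both codecs flush any pending state, mirroring the Transcoder
# class used above.)
#
#   import codecs
#
#   decoder = codecs.lookup('latin-1').incrementaldecoder()
#   encoder = codecs.lookup('utf8').incrementalencoder()
#
#   def transform(chunk: bytes) -> bytes:
#       final = len(chunk) == 0
#       return encoder.encode(decoder.decode(chunk, final), final)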
diff --git a/python/pyarrow/lib.pxd b/python/pyarrow/lib.pxd
index 953b0e7b518..445e1132e73 100644
--- a/python/pyarrow/lib.pxd
+++ b/python/pyarrow/lib.pxd
@@ -536,6 +536,9 @@ cdef shared_ptr[CInputStream] native_transcoding_input_stream(
     shared_ptr[CInputStream] stream, src_encoding,
     dest_encoding) except *
 
+cdef shared_ptr[function[StreamWrapFunc]] make_streamwrap_func(
+    src_encoding, dest_encoding) except *
+
 # Default is allow_none=False
 cpdef DataType ensure_type(object type, bint allow_none=*)
 
From 4e511a60330e905239ea1a5b9470f7288dd40995 Mon Sep 17 00:00:00 2001
From: Joost Hoozemans
Date: Thu, 28 Jul 2022 21:54:25 +0200
Subject: [PATCH 05/31] Use UpperCamelCase in function name
---
 python/pyarrow/io.pxi | 2 +-
 python/pyarrow/src/io.cc | 2 +-
 python/pyarrow/src/io.h | 2 +-
 3 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/python/pyarrow/io.pxi b/python/pyarrow/io.pxi
index 4aa1b884c31..a06d90b49cf 100644
--- a/python/pyarrow/io.pxi
+++ b/python/pyarrow/io.pxi
@@ -1605,7 +1605,7 @@ cdef shared_ptr[function[StreamWrapFunc]] make_streamwrap_func(
     vtable.transform = _cb_transform
     src_codec = codecs.lookup(src_encoding)
     dest_codec = codecs.lookup(dest_encoding)
-    return makeStreamTransformFunc(move(vtable),
+    return MakeStreamTransformFunc(move(vtable),
                                    Transcoder(src_codec.incrementaldecoder(),
                                               dest_codec.incrementalencoder()))
 
diff --git a/python/pyarrow/src/io.cc b/python/pyarrow/src/io.cc
index e81a52f92c6..0aa2c85939f 100644
--- a/python/pyarrow/src/io.cc
+++ b/python/pyarrow/src/io.cc
@@ -370,7 +370,7 @@ std::shared_ptr<::arrow::io::InputStream> MakeTransformInputStream(
   return std::make_shared<TransformInputStream>(std::move(wrapped), std::move(transform));
 }
 
-std::shared_ptr<StreamWrapFunc> makeStreamTransformFunc(TransformInputStreamVTable vtable,
+std::shared_ptr<StreamWrapFunc> MakeStreamTransformFunc(TransformInputStreamVTable vtable,
                                                         PyObject* handler) {
   TransformInputStream::TransformFunc transform(
       TransformFunctionWrapper{std::move(vtable.transform), handler});
diff --git a/python/pyarrow/src/io.h b/python/pyarrow/src/io.h
index bd5a0f6066a..2ff1b8eddf3 100644
--- a/python/pyarrow/src/io.h
+++ b/python/pyarrow/src/io.h
@@ -114,7 +114,7 @@ std::shared_ptr<::arrow::io::InputStream> MakeTransformInputStream(
 
 using StreamWrapFunc = std::function<Result<std::shared_ptr<::arrow::io::InputStream>>(
     std::shared_ptr<::arrow::io::InputStream>)>;
-std::shared_ptr<StreamWrapFunc> makeStreamTransformFunc(TransformInputStreamVTable vtable,
+std::shared_ptr<StreamWrapFunc> MakeStreamTransformFunc(TransformInputStreamVTable vtable,
                                                         PyObject* handler);
 }  // namespace py
 }  // namespace arrow

From 4c3a87e020ea0cb824574c59cd63fc969205d7a6 Mon Sep 17 00:00:00 2001
From: Joost Hoozemans
Date: Thu, 28 Jul 2022 22:32:26 +0200
Subject: [PATCH 06/31] Additional name change, formatting fix
---
 python/pyarrow/includes/libarrow.pxd | 4 ++--
 python/pyarrow/lib.pxd | 2 +-
 2 files changed, 3 insertions(+), 3 deletions(-)

diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd
index 3b0c9366042..b3fada56680 100644
--- a/python/pyarrow/includes/libarrow.pxd
+++ b/python/pyarrow/includes/libarrow.pxd
@@ -1382,8 +1382,8 @@ cdef extern from "arrow/io/api.h" namespace "arrow::io" nogil:
             shared_ptr[CInputStream] wrapped, CTransformInputStreamVTable vtable,
             object method_arg)
 
-    shared_ptr[function[StreamWrapFunc]] makeStreamTransformFunc \
-        "arrow::py::makeStreamTransformFunc"(
+    shared_ptr[function[StreamWrapFunc]] MakeStreamTransformFunc \
+        "arrow::py::MakeStreamTransformFunc"(
             CTransformInputStreamVTable vtable,
             object method_arg)
 
diff --git a/python/pyarrow/lib.pxd b/python/pyarrow/lib.pxd
index 445e1132e73..67db3d2ffb8 100644
--- a/python/pyarrow/lib.pxd
+++ b/python/pyarrow/lib.pxd
@@ -537,7 +537,7 @@ cdef shared_ptr[CInputStream] native_transcoding_input_stream(
     dest_encoding) except *
 
 cdef shared_ptr[function[StreamWrapFunc]] make_streamwrap_func(
-    src_encoding, dest_encoding) except *
+        src_encoding, dest_encoding) except *
 
 # Default is allow_none=False
 cpdef DataType ensure_type(object type, bint allow_none=*)

From e16e9c1b78e72f249d18ecc5944e321fd1d6047c Mon Sep 17 00:00:00 2001
From: Joost Hoozemans
Date: Fri, 29 Jul 2022 14:25:45 +0200
Subject: [PATCH 07/31] Use GetReadOptions(), because it overrides the
 use_threads field
---
 cpp/src/arrow/dataset/file_csv.cc | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/cpp/src/arrow/dataset/file_csv.cc b/cpp/src/arrow/dataset/file_csv.cc
index 4f36a349552..780f845429b 100644
--- a/cpp/src/arrow/dataset/file_csv.cc
+++ b/cpp/src/arrow/dataset/file_csv.cc
@@ -187,7 +187,7 @@ static inline Future<std::shared_ptr<csv::StreamingReader>> OpenReaderAsync(
       auto fragment_scan_options,
       GetFragmentScanOptions<CsvFragmentScanOptions>(
          kCsvTypeName, scan_options.get(), format.default_fragment_scan_options));
-  auto reader_options = fragment_scan_options->read_options;
+  ARROW_ASSIGN_OR_RAISE(auto reader_options, GetReadOptions(format, scan_options));
   ARROW_ASSIGN_OR_RAISE(auto input, source.OpenCompressed());
   if (fragment_scan_options->stream_transform_func) {
     ARROW_ASSIGN_OR_RAISE(input, fragment_scan_options->stream_transform_func(input));
@@ -299,7 +299,7 @@ Future<util::optional<int64_t>> CsvFileFormat::CountRows(
       auto fragment_scan_options,
       GetFragmentScanOptions<CsvFragmentScanOptions>(
          kCsvTypeName, options.get(), self->default_fragment_scan_options));
-  auto read_options = fragment_scan_options->read_options;
+  ARROW_ASSIGN_OR_RAISE(auto read_options, GetReadOptions(*self, options));
   ARROW_ASSIGN_OR_RAISE(auto input, file->source().OpenCompressed());
   if (fragment_scan_options->stream_transform_func) {
     ARROW_ASSIGN_OR_RAISE(input, fragment_scan_options->stream_transform_func(input));

From 3fbe3594a70576b02c3e9d5039f09776c9404718 Mon Sep 17 00:00:00 2001
From: Joost Hoozemans
Date: Fri, 29 Jul 2022 14:31:07 +0200
Subject: [PATCH 08/31] Only add a transcoder for csv files
---
 python/pyarrow/dataset.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py
index b283874cd9b..8ceb7a912a6 100644
--- a/python/pyarrow/dataset.py
+++ b/python/pyarrow/dataset.py
@@ -433,7 +433,8 @@ def _filesystem_dataset(source, schema=None, filesystem=None,
     FileSystemDataset
     """
     format = _ensure_format(format or 'parquet')
-    format.default_fragment_scan_options.add_transcoder(encoding, "utf8")
+    if format.default_fragment_scan_options.type_name == 'csv':
+        format.default_fragment_scan_options.add_transcoder(encoding, "utf8")
 
     partitioning = _ensure_partitioning(partitioning)
 
From 6f59e67b11d049774f99127fb8f9df667661f6d8 Mon Sep 17 00:00:00 2001
From: Joost Hoozemans
Date: Fri, 29 Jul 2022 14:35:54 +0200
Subject: [PATCH 09/31] Processed some review comments regarding documentation
---
 python/pyarrow/io.pxi | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/python/pyarrow/io.pxi b/python/pyarrow/io.pxi
index a06d90b49cf..d7f5acfff76 100644
--- a/python/pyarrow/io.pxi
+++ b/python/pyarrow/io.pxi
@@ -1589,12 +1589,12 @@ cdef shared_ptr[function[StreamWrapFunc]] make_streamwrap_func(
     Create a function that will add a transcoding transformation to a stream.
Data from that stream will be decoded according to ``src_encoding`` and then re-encoded according to ``dest_encoding``. - The created function can be used to wrap streams once they are created. + The created function can be used to wrap streams. Parameters ---------- src_encoding : str - The codec to use when reading data data. + The codec to use when reading data. dest_encoding : str The codec to use for emitted data. """ @@ -1621,7 +1621,7 @@ def transcoding_input_stream(stream, src_encoding, dest_encoding): stream : NativeFile The stream to which the transformation should be applied. src_encoding : str - The codec to use when reading data data. + The codec to use when reading data. dest_encoding : str The codec to use for emitted data. """ From c5ac8bc8f85bf835720cb03e5f98a38bd9f42457 Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Fri, 29 Jul 2022 15:41:27 +0200 Subject: [PATCH 10/31] Moved encoding parameter from dataset() into CsvFileFormat --- python/pyarrow/_dataset.pyx | 8 ++++++-- python/pyarrow/dataset.py | 7 ++++--- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index 9e471630548..645a80aa3e9 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -1172,6 +1172,7 @@ cdef class CsvFileFormat(FileFormat): """ cdef: CCsvFileFormat* csv_format + public object encoding # Avoid mistakingly creating attributes __slots__ = () @@ -1179,7 +1180,7 @@ cdef class CsvFileFormat(FileFormat): def __init__(self, ParseOptions parse_options=None, default_fragment_scan_options=None, ConvertOptions convert_options=None, - ReadOptions read_options=None): + ReadOptions read_options=None, encoding=None): self.init(shared_ptr[CFileFormat](new CCsvFileFormat())) if parse_options is not None: self.parse_options = parse_options @@ -1199,6 +1200,8 @@ cdef class CsvFileFormat(FileFormat): raise TypeError('`default_fragment_scan_options` must be either ' 'a dictionary or an instance of ' 'CsvFragmentScanOptions') + # Python-specific option + self.encoding = encoding cdef void init(self, const shared_ptr[CFileFormat]& sp): FileFormat.init(self, sp) @@ -1228,7 +1231,8 @@ cdef class CsvFileFormat(FileFormat): return ( self.parse_options.equals(other.parse_options) and self.default_fragment_scan_options == - other.default_fragment_scan_options) + other.default_fragment_scan_options and + self.encoding == other.encoding) def __reduce__(self): return CsvFileFormat, (self.parse_options, diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py index 8ceb7a912a6..d92e5dad92f 100644 --- a/python/pyarrow/dataset.py +++ b/python/pyarrow/dataset.py @@ -422,7 +422,7 @@ def _ensure_single_source(path, filesystem=None): def _filesystem_dataset(source, schema=None, filesystem=None, partitioning=None, format=None, partition_base_dir=None, exclude_invalid_files=None, - selector_ignore_prefixes=None, encoding='utf8'): + selector_ignore_prefixes=None): """ Create a FileSystemDataset which can be used to build a Dataset. 
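# (Illustrative sketch, not part of the diff: after this commit the encoding
# is supplied on the format object rather than on dataset(); roughly:)
#
#   import pyarrow.dataset as ds
#
#   fmt = ds.CsvFileFormat(encoding='latin-1')
#   dataset = ds.dataset('data.csv', format=fmt)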
@@ -434,7 +434,8 @@ def _filesystem_dataset(source, schema=None, filesystem=None, """ format = _ensure_format(format or 'parquet') if format.default_fragment_scan_options.type_name == 'csv': - format.default_fragment_scan_options.add_transcoder(encoding, "utf8") + format.default_fragment_scan_options.add_transcoder(format.encoding, + "utf8") partitioning = _ensure_partitioning(partitioning) @@ -542,7 +543,7 @@ def parquet_dataset(metadata_path, schema=None, filesystem=None, format=None, def dataset(source, schema=None, format=None, filesystem=None, partitioning=None, partition_base_dir=None, - exclude_invalid_files=None, ignore_prefixes=None, encoding='utf8'): + exclude_invalid_files=None, ignore_prefixes=None): """ Open a dataset. From 01027f8947c095f6c2b34b6c9e6fad09bc940f08 Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Fri, 29 Jul 2022 15:43:49 +0200 Subject: [PATCH 11/31] Removed a left-over occurrence of the workaround encoding parameter --- python/pyarrow/dataset.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py index d92e5dad92f..5e916160a95 100644 --- a/python/pyarrow/dataset.py +++ b/python/pyarrow/dataset.py @@ -746,8 +746,7 @@ def dataset(source, schema=None, format=None, filesystem=None, format=format, partition_base_dir=partition_base_dir, exclude_invalid_files=exclude_invalid_files, - selector_ignore_prefixes=ignore_prefixes, - encoding=encoding + selector_ignore_prefixes=ignore_prefixes ) if _is_path_like(source): From b5848699412a0747b17cf5bae913b4617723c2a7 Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Fri, 29 Jul 2022 15:45:33 +0200 Subject: [PATCH 12/31] Setting default encoding to utf8 --- python/pyarrow/_dataset.pyx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index 645a80aa3e9..290520bbda2 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -1180,7 +1180,7 @@ cdef class CsvFileFormat(FileFormat): def __init__(self, ParseOptions parse_options=None, default_fragment_scan_options=None, ConvertOptions convert_options=None, - ReadOptions read_options=None, encoding=None): + ReadOptions read_options=None, encoding='utf8'): self.init(shared_ptr[CFileFormat](new CCsvFileFormat())) if parse_options is not None: self.parse_options = parse_options From d5dc249e406c1641961e1a12847d1d5187c752b7 Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Fri, 29 Jul 2022 16:17:20 +0200 Subject: [PATCH 13/31] Always creating a default_fragment_scan_options --- python/pyarrow/_dataset.pyx | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index 290520bbda2..8c59874ed20 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -1180,7 +1180,8 @@ cdef class CsvFileFormat(FileFormat): def __init__(self, ParseOptions parse_options=None, default_fragment_scan_options=None, ConvertOptions convert_options=None, - ReadOptions read_options=None, encoding='utf8'): + ReadOptions read_options=None, + encoding='utf8'): self.init(shared_ptr[CFileFormat](new CCsvFileFormat())) if parse_options is not None: self.parse_options = parse_options @@ -1200,6 +1201,9 @@ cdef class CsvFileFormat(FileFormat): raise TypeError('`default_fragment_scan_options` must be either ' 'a dictionary or an instance of ' 'CsvFragmentScanOptions') + else : + # default_fragment_scan_options is needed to add a transcoder + 
self.default_fragment_scan_options = CsvFragmentScanOptions()
         # Python-specific option
         self.encoding = encoding
 
From a3edfccd7bacb6fb5ebd067cc7802106dbc925d7 Mon Sep 17 00:00:00 2001
From: Joost Hoozemans
Date: Fri, 29 Jul 2022 16:18:01 +0200
Subject: [PATCH 14/31] Using a different way of checking the file format
---
 python/pyarrow/dataset.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py
index 5e916160a95..b3f4ba1454e 100644
--- a/python/pyarrow/dataset.py
+++ b/python/pyarrow/dataset.py
@@ -433,7 +433,7 @@ def _filesystem_dataset(source, schema=None, filesystem=None,
     FileSystemDataset
     """
     format = _ensure_format(format or 'parquet')
-    if format.default_fragment_scan_options.type_name == 'csv':
+    if isinstance(format, CsvFileFormat):
         format.default_fragment_scan_options.add_transcoder(format.encoding,
                                                             "utf8")
 
From ef19af09bddc7d6df0a0ff6ba9ed95e24565ea9f Mon Sep 17 00:00:00 2001
From: Joost Hoozemans
Date: Fri, 29 Jul 2022 16:23:34 +0200
Subject: [PATCH 15/31] Added documentation about added encoding parameter
---
 python/pyarrow/_dataset.pyx | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index 8c59874ed20..1839c4653e5 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -1169,6 +1169,9 @@ cdef class CsvFileFormat(FileFormat):
         General read options.
     default_fragment_scan_options : CsvFragmentScanOptions
         Default options for fragments scan.
+    encoding : str, optional (default 'utf8')
+        The character encoding of the CSV data. Columns that cannot
+        decode using this encoding can still be read as Binary.
     """
 
From c2244ab95c4b72836f0e747a7d36e73fba636a7b Mon Sep 17 00:00:00 2001
From: Joost Hoozemans
Date: Fri, 29 Jul 2022 17:06:58 +0200
Subject: [PATCH 16/31] Implemented an alternative way of passing the encoding.

Instead of duplicating the encoding field in the CsvFileFormat, we store
the encoding in a private field in the CsvFragmentScanOptions. In that
class, the read_options.encoding field gets lost when initializing it by
using the C struct (which doesn't have the encoding field). So when the
read_options are read, we restore it again.
---
 python/pyarrow/_dataset.pyx | 29 +++++++++++++----------------
 python/pyarrow/dataset.py | 3 ++-
 2 files changed, 15 insertions(+), 17 deletions(-)

diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index 1839c4653e5..010e479c3ec 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -1169,13 +1169,9 @@ cdef class CsvFileFormat(FileFormat):
         General read options.
     default_fragment_scan_options : CsvFragmentScanOptions
         Default options for fragments scan.
-    encoding : str, optional (default 'utf8')
-        The character encoding of the CSV data. Columns that cannot
-        decode using this encoding can still be read as Binary.
     """
     cdef:
         CCsvFileFormat* csv_format
-        public object encoding
 
     # Avoid mistakingly creating attributes
     __slots__ = ()
@@ -1183,8 +1179,7 @@ cdef class CsvFileFormat(FileFormat):
     def __init__(self, ParseOptions parse_options=None,
                  default_fragment_scan_options=None,
                  ConvertOptions convert_options=None,
-                 ReadOptions read_options=None,
-                 encoding='utf8'):
+                 ReadOptions read_options=None):
         self.init(shared_ptr[CFileFormat](new CCsvFileFormat()))
         if parse_options is not None:
             self.parse_options = parse_options
@@ -1207,8 +1202,6 @@ cdef class CsvFileFormat(FileFormat):
             raise TypeError('`default_fragment_scan_options` must be either '
                             'a dictionary or an instance of '
                             'CsvFragmentScanOptions')
-        else :
-            # default_fragment_scan_options is needed to add a transcoder
-            self.default_fragment_scan_options = CsvFragmentScanOptions()
-        # Python-specific option
-        self.encoding = encoding
 
     cdef void init(self, const shared_ptr[CFileFormat]& sp):
         FileFormat.init(self, sp)
@@ -1238,8 +1231,7 @@ cdef class CsvFileFormat(FileFormat):
         return (
             self.parse_options.equals(other.parse_options) and
             self.default_fragment_scan_options ==
-            other.default_fragment_scan_options and
-            self.encoding == other.encoding)
+            other.default_fragment_scan_options)
 
     def __reduce__(self):
         return CsvFileFormat, (self.parse_options,
@@ -1263,6 +1255,7 @@ cdef class CsvFragmentScanOptions(FragmentScanOptions):
 
     cdef:
         CCsvFragmentScanOptions* csv_options
+        object encoding
 
     # Avoid mistakingly creating attributes
     __slots__ = ()
@@ -1275,6 +1268,7 @@ cdef class CsvFragmentScanOptions(FragmentScanOptions):
             self.convert_options = convert_options
         if read_options is not None:
             self.read_options = read_options
+            self.encoding = read_options.encoding
 
     cdef void init(self, const shared_ptr[CFragmentScanOptions]& sp):
         FragmentScanOptions.init(self, sp)
@@ -1290,18 +1284,21 @@ cdef class CsvFragmentScanOptions(FragmentScanOptions):
 
     @property
     def read_options(self):
-        return ReadOptions.wrap(self.csv_options.read_options)
+        read_options = ReadOptions.wrap(self.csv_options.read_options)
+        if self.encoding is not None:
+            # The encoding field in ReadOptions does not exist
+            # in the C struct. Re-set it here.
+            read_options.encoding = self.encoding
+        return read_options
 
     @read_options.setter
     def read_options(self, ReadOptions read_options not None):
         self.csv_options.read_options = deref(read_options.options)
 
     def add_transcoder(self, src_encoding, dest_encoding):
-        if src_encoding == dest_encoding:
-            return
-
-        self.csv_options.stream_transform_func = deref(
-            make_streamwrap_func(src_encoding, dest_encoding))
+        if src_encoding != dest_encoding:
+            self.csv_options.stream_transform_func = deref(
+                make_streamwrap_func(src_encoding, dest_encoding))
 
     def equals(self, CsvFragmentScanOptions other):
         return (
diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py
index 45e31a4ccc3..c10a6c05724 100644
--- a/python/pyarrow/dataset.py
+++ b/python/pyarrow/dataset.py
@@ -434,7 +434,8 @@ def _filesystem_dataset(source, schema=None, filesystem=None,
     """
     format = _ensure_format(format or 'parquet')
     if isinstance(format, CsvFileFormat):
-        format.default_fragment_scan_options.add_transcoder(format.encoding,
-                                                            "utf8")
+        format.default_fragment_scan_options.add_transcoder(
+            format.default_fragment_scan_options.read_options.encoding,
+            "utf8")
 
     partitioning = _ensure_partitioning(partitioning)
 
From 86446cd4130a17a9c82c470e274349acd1130a5c Mon Sep 17 00:00:00 2001
From: Joost Hoozemans
Date: Mon, 1 Aug 2022 08:51:08 +0200
Subject: [PATCH 17/31] Added Python-specific encoding field to CsvFileFormat.

It needs to be stored in both CsvFileFormat and CsvFragmentScanOptions
because if the user has a reference to these separate objects, they would
otherwise become inconsistent. One would report the default 'utf8'
(forgetting the user's encoding choice), while the other would still
properly report the requested encoding. To the user it would be unclear
which of these values would eventually be used by the transcoding.
---
 python/pyarrow/_dataset.pyx | 36 +++++++++++++++++++++++++++---------
 python/pyarrow/dataset.py | 4 ++--
 2 files changed, 29 insertions(+), 11 deletions(-)

diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index 010e479c3ec..0b32f971a99 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -832,8 +832,13 @@ cdef class FileFormat(_Weakrefable):
 
     @property
     def default_fragment_scan_options(self):
-        return FragmentScanOptions.wrap(
+        dfso = FragmentScanOptions.wrap(
             self.wrapped.get().default_fragment_scan_options)
+        # CsvFileFormat stores a Python-specific encoding field that needs
+        # to be restored because it does not exist in the C struct
+        if self.encoding is not None:
+            dfso.encoding = self.encoding
+        return dfso
 
     @default_fragment_scan_options.setter
     def default_fragment_scan_options(self, FragmentScanOptions options):
@@ -1172,6 +1177,10 @@ cdef class CsvFileFormat(FileFormat):
     """
     cdef:
         CCsvFileFormat* csv_format
+        # The encoding field in ReadOptions does not exist
+        # in the C struct. We need to store it here and override it when
+        # reading read_options or default_fragment_scan_options
+        public object encoding
 
     # Avoid mistakingly creating attributes
     __slots__ = ()
@@ -1199,9 +1208,12 @@ cdef class CsvFileFormat(FileFormat):
             raise TypeError('`default_fragment_scan_options` must be either '
                             'a dictionary or an instance of '
                             'CsvFragmentScanOptions')
-        else :
+        else:
             # default_fragment_scan_options is needed to add a transcoder
             self.default_fragment_scan_options = CsvFragmentScanOptions()
+        if read_options is not None:
+            self.default_fragment_scan_options.encoding = read_options.encoding
+            self.encoding = read_options.encoding
 
     cdef void init(self, const shared_ptr[CFileFormat]& sp):
         FileFormat.init(self, sp)
@@ -1224,6 +1236,8 @@ cdef class CsvFileFormat(FileFormat):
     cdef _set_default_fragment_scan_options(self, FragmentScanOptions options):
         if options.type_name == 'csv':
             self.csv_format.default_fragment_scan_options = options.wrapped
+            self.default_fragment_scan_options.encoding = options.encoding
+            self.encoding = options.encoding
         else:
             super()._set_default_fragment_scan_options(options)
 
@@ -1255,17 +1269,21 @@ cdef class CsvFragmentScanOptions(FragmentScanOptions):
 
     cdef:
         CCsvFragmentScanOptions* csv_options
-        object encoding
+        # The encoding field in ReadOptions does not exist
+        # in the C struct. We need to store it here and override it when
+        # reading read_options
+        public object encoding
 
     # Avoid mistakingly creating attributes
     __slots__ = ()
 
     def __init__(self, ConvertOptions convert_options=None,
-                 ReadOptions read_options=None):
+                 ReadOptions read_options=None, encoding='utf8'):
         self.init(shared_ptr[CFragmentScanOptions](
             new CCsvFragmentScanOptions()))
         if convert_options is not None:
             self.convert_options = convert_options
+        self.encoding = encoding
         if read_options is not None:
             self.read_options = read_options
             self.encoding = read_options.encoding
 
     cdef void init(self, const shared_ptr[CFragmentScanOptions]& sp):
         FragmentScanOptions.init(self, sp)
@@ -1304,29 +1304,29 @@ cdef class CsvFragmentScanOptions(FragmentScanOptions):
     @property
     def read_options(self):
         read_options = ReadOptions.wrap(self.csv_options.read_options)
         if self.encoding is not None:
-            # The encoding field in ReadOptions does not exist
-            # in the C struct. Re-set it here.
             read_options.encoding = self.encoding
         return read_options
 
     @read_options.setter
     def read_options(self, ReadOptions read_options not None):
         self.csv_options.read_options = deref(read_options.options)
+        self.encoding = read_options.encoding
 
     def add_transcoder(self, src_encoding, dest_encoding):
         if src_encoding != dest_encoding:
             self.csv_options.stream_transform_func = deref(
-                make_streamwrap_func(src_encoding, dest_encoding))
+                    make_streamwrap_func(src_encoding, dest_encoding))
 
     def equals(self, CsvFragmentScanOptions other):
         return (
             other and
             self.convert_options.equals(other.convert_options) and
-            self.read_options.equals(other.read_options))
+            self.read_options.equals(other.read_options) and
+            self.encoding == other.encoding)
 
     def __reduce__(self):
         return CsvFragmentScanOptions, (self.convert_options,
-                                        self.read_options)
+                                        self.read_options, self.encoding)
 
diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py
index c10a6c05724..45e31a4ccc3 100644
--- a/python/pyarrow/dataset.py
+++ b/python/pyarrow/dataset.py
@@ -435,8 +435,8 @@ def _filesystem_dataset(source, schema=None, filesystem=None,
     format = _ensure_format(format or 'parquet')
     if isinstance(format, CsvFileFormat):
         format.default_fragment_scan_options.add_transcoder(
-        format.default_fragment_scan_options.read_options.encoding,
-        "utf8")
+            format.default_fragment_scan_options.read_options.encoding,
+            "utf8")
 
From 5d066c1f03fb670beec9e20889aa69bc3a388ef6 Mon Sep 17 00:00:00 2001
From: Joost Hoozemans
Date: Mon, 1 Aug 2022 14:59:46 +0200
Subject: [PATCH 18/31] Removed encoding from CsvFragmentScanOptions
---
 python/pyarrow/_dataset.pyx | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index 0b32f971a99..a8cf0c5e08a 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -1321,8 +1321,7 @@ cdef class CsvFragmentScanOptions(FragmentScanOptions):
         return (
             other and
             self.convert_options.equals(other.convert_options) and
-            self.read_options.equals(other.read_options) and
-            self.encoding == other.encoding)
+            self.read_options.equals(other.read_options))
 
     def __reduce__(self):
         return CsvFragmentScanOptions, (self.convert_options,

From e0943fa8a37319545af7f310bcb61a7224e1af9f Mon Sep 17 00:00:00 2001
From: Joost Hoozemans
Date: Tue, 9 Aug 2022 09:23:44 +0200
Subject: [PATCH 19/31] Instead of a Python encoding field, now passing around
 a Python ReadOptions object
---
 python/pyarrow/_dataset.pyx | 41 +++++++++++++++++++++--------
 python/pyarrow/dataset.py | 5 -----
 2 files changed, 19
insertions(+), 27 deletions(-) diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index a8cf0c5e08a..e5dd81b9de5 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -835,9 +835,10 @@ cdef class FileFormat(_Weakrefable): dfso = FragmentScanOptions.wrap( self.wrapped.get().default_fragment_scan_options) # CsvFileFormat stores a Python-specific encoding field that needs - # to be restored because it does not exist in the C struct - if self.encoding is not None: - dfso.encoding = self.encoding + # to be restored because it does not exist in the C++ struct + if isinstance(self, CsvFileFormat): + if self.read_options_py is not None: + dfso.read_options = self.read_options_py return dfso @default_fragment_scan_options.setter @@ -1178,9 +1179,9 @@ cdef class CsvFileFormat(FileFormat): cdef: CCsvFileFormat* csv_format # The encoding field in ReadOptions does not exist - # in the C struct. We need to store it here and override it when + # in the C++ struct. We need to store it here and override it when # reading read_options or default_fragment_scan_options - public object encoding + public ReadOptions read_options_py # Avoid mistakingly creating attributes __slots__ = () @@ -1212,8 +1213,7 @@ cdef class CsvFileFormat(FileFormat): # default_fragment_scan_options is needed to add a transcoder self.default_fragment_scan_options = CsvFragmentScanOptions() if read_options is not None: - self.default_fragment_scan_options.encoding = read_options.encoding - self.encoding = read_options.encoding + self.read_options_py = read_options cdef void init(self, const shared_ptr[CFileFormat]& sp): FileFormat.init(self, sp) @@ -1236,8 +1236,8 @@ cdef class CsvFileFormat(FileFormat): cdef _set_default_fragment_scan_options(self, FragmentScanOptions options): if options.type_name == 'csv': self.csv_format.default_fragment_scan_options = options.wrapped - self.default_fragment_scan_options.encoding = options.encoding - self.encoding = options.encoding + self.default_fragment_scan_options.read_options = options.read_options + self.read_options_py = options.read_options else: super()._set_default_fragment_scan_options(options) @@ -1270,23 +1270,22 @@ cdef class CsvFragmentScanOptions(FragmentScanOptions): cdef: CCsvFragmentScanOptions* csv_options # The encoding field in ReadOptions does not exist - # in the C struct. We need to store it here and override it when + # in the C++ struct. 
We need to store it here and override it when # reading read_options - public object encoding + ReadOptions read_options_py # Avoid mistakingly creating attributes __slots__ = () def __init__(self, ConvertOptions convert_options=None, - ReadOptions read_options=None, encoding='utf8'): + ReadOptions read_options=None): self.init(shared_ptr[CFragmentScanOptions]( new CCsvFragmentScanOptions())) if convert_options is not None: self.convert_options = convert_options - self.encoding = encoding if read_options is not None: self.read_options = read_options - self.encoding = read_options.encoding + self.read_options_py = read_options cdef void init(self, const shared_ptr[CFragmentScanOptions]& sp): FragmentScanOptions.init(self, sp) @@ -1303,19 +1302,17 @@ cdef class CsvFragmentScanOptions(FragmentScanOptions): @property def read_options(self): read_options = ReadOptions.wrap(self.csv_options.read_options) - if self.encoding is not None: - read_options.encoding = self.encoding + if self.read_options_py is not None: + read_options.encoding = self.read_options_py.encoding return read_options @read_options.setter def read_options(self, ReadOptions read_options not None): self.csv_options.read_options = deref(read_options.options) - self.encoding = read_options.encoding - - def add_transcoder(self, src_encoding, dest_encoding): - if src_encoding != dest_encoding: + self.read_options_py = read_options + if (read_options.encoding != 'utf8'): self.csv_options.stream_transform_func = deref( - make_streamwrap_func(src_encoding, dest_encoding)) + make_streamwrap_func(read_options.encoding, 'utf8')) def equals(self, CsvFragmentScanOptions other): return ( @@ -1325,7 +1322,7 @@ cdef class CsvFragmentScanOptions(FragmentScanOptions): def __reduce__(self): return CsvFragmentScanOptions, (self.convert_options, - self.read_options, self.encoding) + self.read_options) cdef class CsvFileWriteOptions(FileWriteOptions): diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py index c10a6c05724..326b37ec6e1 100644 --- a/python/pyarrow/dataset.py +++ b/python/pyarrow/dataset.py @@ -433,11 +433,6 @@ def _filesystem_dataset(source, schema=None, filesystem=None, FileSystemDataset """ format = _ensure_format(format or 'parquet') - if isinstance(format, CsvFileFormat): - format.default_fragment_scan_options.add_transcoder( - format.default_fragment_scan_options.read_options.encoding, - "utf8") - partitioning = _ensure_partitioning(partitioning) if isinstance(source, (list, tuple)): From ecee10f8a70aca8137315e416d4207124336edd4 Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Tue, 9 Aug 2022 16:52:30 +0200 Subject: [PATCH 20/31] Removed redundant parentheses --- python/pyarrow/_dataset.pyx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index e5dd81b9de5..7b26ac72470 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -1310,7 +1310,7 @@ cdef class CsvFragmentScanOptions(FragmentScanOptions): def read_options(self, ReadOptions read_options not None): self.csv_options.read_options = deref(read_options.options) self.read_options_py = read_options - if (read_options.encoding != 'utf8'): + if read_options.encoding != 'utf8': self.csv_options.stream_transform_func = deref( make_streamwrap_func(read_options.encoding, 'utf8')) From cd9ec336869a7ad7e81a1a1e564d4018e664a0e3 Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Tue, 9 Aug 2022 16:53:53 +0200 Subject: [PATCH 21/31] Added ARROW_PYTHON_EXPORT to new function 
---
 python/pyarrow/src/io.h | 1 +
 1 file changed, 1 insertion(+)

diff --git a/python/pyarrow/src/io.h b/python/pyarrow/src/io.h
index 2ff1b8eddf3..9d79d566efe 100644
--- a/python/pyarrow/src/io.h
+++ b/python/pyarrow/src/io.h
@@ -114,6 +114,7 @@ std::shared_ptr<::arrow::io::InputStream> MakeTransformInputStream(
 
 using StreamWrapFunc = std::function<Result<std::shared_ptr<::arrow::io::InputStream>>(
     std::shared_ptr<::arrow::io::InputStream>)>;
+ARROW_PYTHON_EXPORT
 std::shared_ptr<StreamWrapFunc> MakeStreamTransformFunc(TransformInputStreamVTable vtable,
                                                         PyObject* handler);
 }  // namespace py
 }  // namespace arrow

From c8d886a8238b9ed800aae619530560eabbfd909f Mon Sep 17 00:00:00 2001
From: Joost Hoozemans
Date: Tue, 9 Aug 2022 17:30:42 +0200
Subject: [PATCH 22/31] Always creating default_fragment_scan_options is not
 needed anymore; the transcoder is now set in the read_options setter
---
 python/pyarrow/_dataset.pyx | 14 +++++---------
 1 file changed, 5 insertions(+), 9 deletions(-)

diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index 7b26ac72470..2174486256d 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -1178,9 +1178,9 @@ cdef class CsvFileFormat(FileFormat):
     """
     cdef:
         CCsvFileFormat* csv_format
-        # The encoding field in ReadOptions does not exist
-        # in the C++ struct. We need to store it here and override it when
-        # reading read_options or default_fragment_scan_options
+        # The encoding field in ReadOptions does not exist in the C++ struct.
+        # We need to store it here and override it when reading
+        # default_fragment_scan_options.read_options
         public ReadOptions read_options_py
 
     # Avoid mistakingly creating attributes
@@ -1199,9 +1209,6 @@ cdef class CsvFileFormat(FileFormat):
             raise TypeError('`default_fragment_scan_options` must be either '
                             'a dictionary or an instance of '
                             'CsvFragmentScanOptions')
-        else:
-            # default_fragment_scan_options is needed to add a transcoder
-            self.default_fragment_scan_options = CsvFragmentScanOptions()
         if read_options is not None:
             self.read_options_py = read_options
 
     cdef void init(self, const shared_ptr[CFileFormat]& sp):
         FileFormat.init(self, sp)
@@ -1269,9 +1266,8 @@ cdef class CsvFragmentScanOptions(FragmentScanOptions):
 
     cdef:
         CCsvFragmentScanOptions* csv_options
-        # The encoding field in ReadOptions does not exist
-        # in the C++ struct. We need to store it here and override it when
-        # reading read_options
+        # The encoding field in ReadOptions does not exist in the C++ struct.
+        # We need to store it here and override it when reading read_options
         ReadOptions read_options_py
 
From bea7fe75c9af93066796179915966ce46ffe8871 Mon Sep 17 00:00:00 2001
From: Joost Hoozemans
Date: Wed, 10 Aug 2022 09:06:32 +0200
Subject: [PATCH 23/31] Formatting
---
 python/pyarrow/_dataset.pyx | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index 2174486256d..773eb17ffa4 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -1179,7 +1179,7 @@ cdef class CsvFileFormat(FileFormat):
     cdef:
         CCsvFileFormat* csv_format
         # The encoding field in ReadOptions does not exist in the C++ struct.
-        # We need to store it here and override it when reading 
+        # We need to store it here and override it when reading
         # default_fragment_scan_options.read_options
         public ReadOptions read_options_py
 
@@ -1266,7 +1266,7 @@ cdef class CsvFragmentScanOptions(FragmentScanOptions):
 
     cdef:
         CCsvFragmentScanOptions* csv_options
-        # The encoding field in ReadOptions does not exist in the C++ struct. 
# We need to store it here and override it when reading read_options ReadOptions read_options_py From d0110c06e35ad72405326153a740b5b1206b3d9e Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Mon, 15 Aug 2022 13:16:08 +0200 Subject: [PATCH 24/31] Added python tests for transcoding dataset. Schema detection does not seem to be working properly for UTF16. --- python/pyarrow/tests/test_dataset.py | 40 ++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index 3dc9c3beb6e..33c615bc886 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -3130,6 +3130,46 @@ def test_csv_fragment_options(tempdir, dataset_reader): pa.table({'col0': pa.array(['foo', 'spam', 'MYNULL'])})) +def test_encoding(tempdir, dataset_reader): + path = str(tempdir / 'test.csv') + + for encoding, input_rows, expected_table in [ + ('latin-1', b"a,b\nun,\xe9l\xe9phant", {'a': ["un"], + 'b': [b"\xe9l\xe9phant"]}), + ('utf16', b'\x00a\x00,\x00b\x00\n\x00u\x00n\x00,\x00\xe9\x00l\x00\xe9\x00p\x00h\x00a\x00n\x00t\x00', + {'a': [b"\x00u\x00n"], 'b': [b"\x00\xe9\x00l\x00\xe9\x00p\x00h\x00a\x00n\x00t\x00"]}) + ]: + + with open(path, 'wb') as sink: + sink.write(input_rows) + + # Interpret as binary data: + expected_binary_schema = pa.schema([('a', pa.string()), + ('b', pa.binary())]) + expected_binary_table = pa.table(expected_table, schema=expected_binary_schema) + + # Reading in binary should still work + dataset_binary = ds.dataset(path, format='csv') + assert dataset_binary.to_table().equals(expected_binary_table) + + # Interpret as utf8: + expected_schema = pa.schema([("a", pa.string()), ("b", pa.string())]) + expected_table = pa.table({'a': ["un"], + 'b': ["éléphant"]}, schema=expected_schema) + + # Reading as string without specifying encoding should produce an error + dataset = ds.dataset(path, format='csv', schema=expected_schema) + with pytest.raises(pyarrow.lib.ArrowInvalid, match="invalid UTF8"): + result = dataset_reader.to_table(dataset) + + # Setting the encoding in the read_options should transcode the data + read_options = pa.csv.ReadOptions(encoding=encoding) + file_format = ds.CsvFileFormat(read_options=read_options) + dataset_transcoded = ds.dataset(path, format=file_format) + assert dataset_transcoded.schema.equals(expected_schema) + assert dataset_transcoded.to_table().equals(expected_table) + + def test_feather_format(tempdir, dataset_reader): from pyarrow.feather import write_feather From 8d27c128b49fee3dae6a3dcc711a51cd9d6c5d27 Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Mon, 15 Aug 2022 13:18:02 +0200 Subject: [PATCH 25/31] Removed failing UTF16 test --- python/pyarrow/tests/test_dataset.py | 63 +++++++++++++--------------- 1 file changed, 29 insertions(+), 34 deletions(-) diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py index 33c615bc886..e0eff81dd90 100644 --- a/python/pyarrow/tests/test_dataset.py +++ b/python/pyarrow/tests/test_dataset.py @@ -3133,41 +3133,36 @@ def test_csv_fragment_options(tempdir, dataset_reader): def test_encoding(tempdir, dataset_reader): path = str(tempdir / 'test.csv') - for encoding, input_rows, expected_table in [ - ('latin-1', b"a,b\nun,\xe9l\xe9phant", {'a': ["un"], - 'b': [b"\xe9l\xe9phant"]}), - ('utf16', b'\x00a\x00,\x00b\x00\n\x00u\x00n\x00,\x00\xe9\x00l\x00\xe9\x00p\x00h\x00a\x00n\x00t\x00', - {'a': [b"\x00u\x00n"], 'b': [b"\x00\xe9\x00l\x00\xe9\x00p\x00h\x00a\x00n\x00t\x00"]}) - ]: + 
From 8d27c128b49fee3dae6a3dcc711a51cd9d6c5d27 Mon Sep 17 00:00:00 2001
From: Joost Hoozemans
Date: Mon, 15 Aug 2022 13:18:02 +0200
Subject: [PATCH 25/31] Removed failing UTF16 test

---
 python/pyarrow/tests/test_dataset.py | 63 +++++++++++++---------------
 1 file changed, 29 insertions(+), 34 deletions(-)

diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index 33c615bc886..e0eff81dd90 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -3133,41 +3133,36 @@ def test_csv_fragment_options(tempdir, dataset_reader):
 def test_encoding(tempdir, dataset_reader):
     path = str(tempdir / 'test.csv')
 
-    for encoding, input_rows, expected_table in [
-        ('latin-1', b"a,b\nun,\xe9l\xe9phant", {'a': ["un"],
-                                                'b': [b"\xe9l\xe9phant"]}),
-        ('utf16', b'\x00a\x00,\x00b\x00\n\x00u\x00n\x00,\x00\xe9\x00l\x00\xe9\x00p\x00h\x00a\x00n\x00t\x00',
-         {'a': [b"\x00u\x00n"], 'b': [b"\x00\xe9\x00l\x00\xe9\x00p\x00h\x00a\x00n\x00t\x00"]})
-    ]:
-
-        with open(path, 'wb') as sink:
-            sink.write(input_rows)
-
-        # Interpret as binary data:
-        expected_binary_schema = pa.schema([('a', pa.string()),
-                                            ('b', pa.binary())])
-        expected_binary_table = pa.table(expected_table, schema=expected_binary_schema)
-
-        # Reading in binary should still work
-        dataset_binary = ds.dataset(path, format='csv')
-        assert dataset_binary.to_table().equals(expected_binary_table)
-
-        # Interpret as utf8:
-        expected_schema = pa.schema([("a", pa.string()), ("b", pa.string())])
-        expected_table = pa.table({'a': ["un"],
-                                   'b': ["éléphant"]}, schema=expected_schema)
-
-        # Reading as string without specifying encoding should produce an error
-        dataset = ds.dataset(path, format='csv', schema=expected_schema)
-        with pytest.raises(pyarrow.lib.ArrowInvalid, match="invalid UTF8"):
-            result = dataset_reader.to_table(dataset)
-
-        # Setting the encoding in the read_options should transcode the data
-        read_options = pa.csv.ReadOptions(encoding=encoding)
-        file_format = ds.CsvFileFormat(read_options=read_options)
-        dataset_transcoded = ds.dataset(path, format=file_format)
-        assert dataset_transcoded.schema.equals(expected_schema)
-        assert dataset_transcoded.to_table().equals(expected_table)
+    with open(path, 'wb') as sink:
+        sink.write(b"a,b\nun,\xe9l\xe9phant")
+
+    # Interpret as binary data:
+    expected_binary_schema = pa.schema([('a', pa.string()),
+                                        ('b', pa.binary())])
+    expected_binary_table = pa.table({'a': ["un"],
+                                      'b': [b"\xe9l\xe9phant"]},
+                                     schema=expected_binary_schema)
+
+    # Reading in binary should still work
+    dataset_binary = ds.dataset(path, format='csv')
+    assert dataset_binary.to_table().equals(expected_binary_table)
+
+    # Interpret as utf8:
+    expected_schema = pa.schema([("a", pa.string()), ("b", pa.string())])
+    expected_table = pa.table({'a': ["un"],
+                               'b': ["éléphant"]}, schema=expected_schema)
+
+    # Reading as string without specifying encoding should produce an error
+    dataset = ds.dataset(path, format='csv', schema=expected_schema)
+    with pytest.raises(pyarrow.lib.ArrowInvalid, match="invalid UTF8"):
+        dataset_reader.to_table(dataset)
 
+    # Setting the encoding in the read_options should transcode the data
+    read_options = pa.csv.ReadOptions(encoding='latin-1')
+    file_format = ds.CsvFileFormat(read_options=read_options)
+    dataset_transcoded = ds.dataset(path, format=file_format)
+    assert dataset_transcoded.schema.equals(expected_schema)
+    assert dataset_transcoded.to_table().equals(expected_table)
 
 
 def test_feather_format(tempdir, dataset_reader):
     from pyarrow.feather import write_feather
 
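Note: the commit message above points at the underlying problem. Without a
byte-order mark, a UTF-16 file is, byte for byte, also valid UTF-8 (NUL is a
legal code point), so schema inference neither raises "invalid UTF8" nor
recovers the intended column names. A plain-CPython illustration of the
ambiguity, not part of the patch:

    # 'a,b' header encoded as UTF-16 big-endian, no BOM
    data = b'\x00a\x00,\x00b'
    # Decoding the same bytes as UTF-8 does NOT raise; the header just
    # parses into garbage names such as '\x00a'
    assert data.decode('utf-8') == '\x00a\x00,\x00b'
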
From 154ed23e773910fa9f7bafc0df8aa66f133ed560 Mon Sep 17 00:00:00 2001
From: Joost Hoozemans
Date: Tue, 16 Aug 2022 13:23:43 +0200
Subject: [PATCH 26/31] Revert "Removed failing UTF16 test"

This reverts commit 47a3462b756cf92594470cedcd0f56eaf6248016.

---
 python/pyarrow/tests/test_dataset.py | 63 +++++++++++++++-------------
 1 file changed, 34 insertions(+), 29 deletions(-)

diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index e0eff81dd90..33c615bc886 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -3133,36 +3133,41 @@ def test_csv_fragment_options(tempdir, dataset_reader):
 def test_encoding(tempdir, dataset_reader):
     path = str(tempdir / 'test.csv')
 
-    with open(path, 'wb') as sink:
-        sink.write(b"a,b\nun,\xe9l\xe9phant")
-
-    # Interpret as binary data:
-    expected_binary_schema = pa.schema([('a', pa.string()),
-                                        ('b', pa.binary())])
-    expected_binary_table = pa.table({'a': ["un"],
-                                      'b': [b"\xe9l\xe9phant"]},
-                                     schema=expected_binary_schema)
-
-    # Reading in binary should still work
-    dataset_binary = ds.dataset(path, format='csv')
-    assert dataset_binary.to_table().equals(expected_binary_table)
-
-    # Interpret as utf8:
-    expected_schema = pa.schema([("a", pa.string()), ("b", pa.string())])
-    expected_table = pa.table({'a': ["un"],
-                               'b': ["éléphant"]}, schema=expected_schema)
-
-    # Reading as string without specifying encoding should produce an error
-    dataset = ds.dataset(path, format='csv', schema=expected_schema)
-    with pytest.raises(pyarrow.lib.ArrowInvalid, match="invalid UTF8"):
-        dataset_reader.to_table(dataset)
+    for encoding, input_rows, expected_table in [
+        ('latin-1', b"a,b\nun,\xe9l\xe9phant", {'a': ["un"],
+                                                'b': [b"\xe9l\xe9phant"]}),
+        ('utf16', b'\x00a\x00,\x00b\x00\n\x00u\x00n\x00,\x00\xe9\x00l\x00\xe9\x00p\x00h\x00a\x00n\x00t\x00',
+         {'a': [b"\x00u\x00n"], 'b': [b"\x00\xe9\x00l\x00\xe9\x00p\x00h\x00a\x00n\x00t\x00"]})
+    ]:
 
-    # Setting the encoding in the read_options should transcode the data
-    read_options = pa.csv.ReadOptions(encoding='latin-1')
-    file_format = ds.CsvFileFormat(read_options=read_options)
-    dataset_transcoded = ds.dataset(path, format=file_format)
-    assert dataset_transcoded.schema.equals(expected_schema)
-    assert dataset_transcoded.to_table().equals(expected_table)
+        with open(path, 'wb') as sink:
+            sink.write(input_rows)
+
+        # Interpret as binary data:
+        expected_binary_schema = pa.schema([('a', pa.string()),
+                                            ('b', pa.binary())])
+        expected_binary_table = pa.table(expected_table, schema=expected_binary_schema)
+
+        # Reading in binary should still work
+        dataset_binary = ds.dataset(path, format='csv')
+        assert dataset_binary.to_table().equals(expected_binary_table)
+
+        # Interpret as utf8:
+        expected_schema = pa.schema([("a", pa.string()), ("b", pa.string())])
+        expected_table = pa.table({'a': ["un"],
+                                   'b': ["éléphant"]}, schema=expected_schema)
+
+        # Reading as string without specifying encoding should produce an error
+        dataset = ds.dataset(path, format='csv', schema=expected_schema)
+        with pytest.raises(pyarrow.lib.ArrowInvalid, match="invalid UTF8"):
+            result = dataset_reader.to_table(dataset)
+
+        # Setting the encoding in the read_options should transcode the data
+        read_options = pa.csv.ReadOptions(encoding=encoding)
+        file_format = ds.CsvFileFormat(read_options=read_options)
+        dataset_transcoded = ds.dataset(path, format=file_format)
+        assert dataset_transcoded.schema.equals(expected_schema)
+        assert dataset_transcoded.to_table().equals(expected_table)
 
 
 def test_feather_format(tempdir, dataset_reader):
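Note: the revert restores the parametrized UTF16 case so it can be fixed
rather than dropped. The fixtures in the loop are just the test rows run
through a codec; a standard-library sketch (not part of the patch) of how
the latin-1 fixture maps to its bytes:

    import codecs

    row = "a,b\nun,éléphant"
    # 'é' becomes the single byte 0xe9 in latin-1
    assert codecs.encode(row, 'latin-1') == b"a,b\nun,\xe9l\xe9phant"
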
From 2c2776b024a5424d3128cf74aeac24f1b8853df1 Mon Sep 17 00:00:00 2001
From: Joost Hoozemans
Date: Tue, 16 Aug 2022 13:40:12 +0200
Subject: [PATCH 27/31] Added a test for a csv file with non-utf8 characters
 in the column names

---
 python/pyarrow/tests/test_dataset.py | 25 +++++++++++++++++++++++++
 1 file changed, 25 insertions(+)

diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index 33c615bc886..323eda7b33e 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -3170,6 +3170,31 @@ def test_encoding(tempdir, dataset_reader):
         assert dataset_transcoded.to_table().equals(expected_table)
 
 
+# Test if a dataset with non-utf8 chars in the column names is properly handled
+def test_column_names_encoding(tempdir, dataset_reader):
+    path = str(tempdir / 'test.csv')
+
+    with open(path, 'wb') as sink:
+        sink.write(b"\xe9,b\nun,\xe9l\xe9phant")
+
+    # Interpret as utf8:
+    expected_schema = pa.schema([("é", pa.string()), ("b", pa.string())])
+    expected_table = pa.table({'é': ["un"],
+                               'b': ["éléphant"]}, schema=expected_schema)
+
+    # Reading as string without specifying encoding should produce an error
+    dataset = ds.dataset(path, format='csv', schema=expected_schema)
+    with pytest.raises(pyarrow.lib.ArrowInvalid, match="invalid UTF8"):
+        result = dataset_reader.to_table(dataset)
+
+    # Setting the encoding in the read_options should transcode the data
+    read_options = pa.csv.ReadOptions(encoding='latin-1')
+    file_format = ds.CsvFileFormat(read_options=read_options)
+    dataset_transcoded = ds.dataset(path, format=file_format)
+    assert dataset_transcoded.schema.equals(expected_schema)
+    assert dataset_transcoded.to_table().equals(expected_table)
+
+
 def test_feather_format(tempdir, dataset_reader):
     from pyarrow.feather import write_feather
 
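Note: column names take the same transcoding path as the data, because the
header row passes through the stream wrapper before the CSV reader infers
the schema. A plain-Python sketch of what the wrapper effectively does to
the header bytes from this test:

    header = b"\xe9,b"                        # latin-1 header row
    assert header.decode('latin-1') == 'é,b'  # what the transcoder yields
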
From 93ec2a42f808ad82d0dc283ebd7acb9dea8bf155 Mon Sep 17 00:00:00 2001
From: Joost Hoozemans
Date: Tue, 16 Aug 2022 15:55:38 +0200
Subject: [PATCH 28/31] Removed subchecks in encoding tests. Reading a utf16
 file as binary fails, because the column names are not valid utf8, which
 breaks schema parsing. Expecting an error when reading a utf16 file without
 a transcoder also does not work, because the bytes happen to be valid utf8
 (so no error is triggered).

---
 python/pyarrow/tests/test_dataset.py | 31 ++++++++--------------------
 1 file changed, 9 insertions(+), 22 deletions(-)

diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index 323eda7b33e..767f17d4a02 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -3134,35 +3134,22 @@ def test_encoding(tempdir, dataset_reader):
     path = str(tempdir / 'test.csv')
 
     for encoding, input_rows, expected_table in [
-        ('latin-1', b"a,b\nun,\xe9l\xe9phant", {'a': ["un"],
-                                                'b': [b"\xe9l\xe9phant"]}),
-        ('utf16', b'\x00a\x00,\x00b\x00\n\x00u\x00n\x00,\x00\xe9\x00l\x00\xe9\x00p\x00h\x00a\x00n\x00t\x00',
-         {'a': [b"\x00u\x00n"], 'b': [b"\x00\xe9\x00l\x00\xe9\x00p\x00h\x00a\x00n\x00t\x00"]})
+        ('latin-1', b"a,b\nun,\xe9l\xe9phant",
+         {'a': ["un"], 'b': [b"\xe9l\xe9phant"]}),
+        ('utf16', b'\xff\xfea\x00,\x00b\x00\n\x00u\x00n\x00,'
+         b'\x00\xe9\x00l\x00\xe9\x00p\x00h\x00a\x00n\x00t\x00',
+         {'a': [b"\x00u\x00n"],
+          'b': [b"\x00\xe9\x00l\x00\xe9\x00p\x00h\x00a\x00n\x00t\x00"]})
     ]:
 
         with open(path, 'wb') as sink:
            sink.write(input_rows)
 
-        # Interpret as binary data:
-        expected_binary_schema = pa.schema([('a', pa.string()),
-                                            ('b', pa.binary())])
-        expected_binary_table = pa.table(expected_table, schema=expected_binary_schema)
-
-        # Reading in binary should still work
-        dataset_binary = ds.dataset(path, format='csv')
-        assert dataset_binary.to_table().equals(expected_binary_table)
-
         # Interpret as utf8:
         expected_schema = pa.schema([("a", pa.string()), ("b", pa.string())])
         expected_table = pa.table({'a': ["un"],
-                                   'b': ["éléphant"]}, schema=expected_schema)
-
-        # Reading as string without specifying encoding should produce an error
-        dataset = ds.dataset(path, format='csv', schema=expected_schema)
-        with pytest.raises(pyarrow.lib.ArrowInvalid, match="invalid UTF8"):
-            result = dataset_reader.to_table(dataset)
+                                   'b': ["éléphant"]}, schema=expected_schema)
 
-        # Setting the encoding in the read_options should transcode the data
         read_options = pa.csv.ReadOptions(encoding=encoding)
         file_format = ds.CsvFileFormat(read_options=read_options)
         dataset_transcoded = ds.dataset(path, format=file_format)
@@ -3180,12 +3167,12 @@ def test_column_names_encoding(tempdir, dataset_reader):
     # Interpret as utf8:
     expected_schema = pa.schema([("é", pa.string()), ("b", pa.string())])
     expected_table = pa.table({'é': ["un"],
-                               'b': ["éléphant"]}, schema=expected_schema)
+                               'b': ["éléphant"]}, schema=expected_schema)
 
     # Reading as string without specifying encoding should produce an error
     dataset = ds.dataset(path, format='csv', schema=expected_schema)
     with pytest.raises(pyarrow.lib.ArrowInvalid, match="invalid UTF8"):
-        result = dataset_reader.to_table(dataset)
+        dataset_reader.to_table(dataset)
 
     # Setting the encoding in the read_options should transcode the data
     read_options = pa.csv.ReadOptions(encoding='latin-1')
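Note: the utf16 fixture now leads with the byte-order mark b'\xff\xfe',
which is what lets the transcoder pick the right byte order. The new
fixture is exactly what Python's own codec produces; a sketch (not part of
the patch) of how it can be regenerated:

    # encode('utf-16') prepends a BOM (b'\xff\xfe' on little-endian
    # platforms) followed by the UTF-16-LE code units
    payload = "a,b\nun,éléphant".encode('utf-16')
    assert payload.startswith(b'\xff\xfe')
    assert payload == (b'\xff\xfea\x00,\x00b\x00\n\x00u\x00n\x00,'
                       b'\x00\xe9\x00l\x00\xe9\x00p\x00h\x00a\x00n\x00t\x00')
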
From b9982c8d28d898bbf506135bf53bad629728afef Mon Sep 17 00:00:00 2001
From: Joost Hoozemans
Date: Fri, 26 Aug 2022 15:13:44 +0200
Subject: [PATCH 29/31] Renamed read_options_py to _read_options_py

---
 python/pyarrow/_dataset.pyx | 20 ++++++++++----------
 1 file changed, 10 insertions(+), 10 deletions(-)

diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index 773eb17ffa4..976f999bd0f 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -837,8 +837,8 @@ cdef class FileFormat(_Weakrefable):
         # CsvFileFormat stores a Python-specific encoding field that needs
         # to be restored because it does not exist in the C++ struct
         if isinstance(self, CsvFileFormat):
-            if self.read_options_py is not None:
-                dfso.read_options = self.read_options_py
+            if self._read_options_py is not None:
+                dfso.read_options = self._read_options_py
         return dfso
 
     @default_fragment_scan_options.setter
@@ -1181,7 +1181,7 @@ cdef class CsvFileFormat(FileFormat):
     # The encoding field in ReadOptions does not exist in the C++ struct.
     # We need to store it here and override it when reading
     # default_fragment_scan_options.read_options
-    public ReadOptions read_options_py
+    public ReadOptions _read_options_py
 
     # Avoid mistakingly creating attributes
     __slots__ = ()
@@ -1210,7 +1210,7 @@ cdef class CsvFileFormat(FileFormat):
                              'a dictionary or an instance of '
                              'CsvFragmentScanOptions')
         if read_options is not None:
-            self.read_options_py = read_options
+            self._read_options_py = read_options
 
     cdef void init(self, const shared_ptr[CFileFormat]& sp):
         FileFormat.init(self, sp)
@@ -1234,7 +1234,7 @@ cdef class CsvFileFormat(FileFormat):
         if options.type_name == 'csv':
             self.csv_format.default_fragment_scan_options = options.wrapped
             self.default_fragment_scan_options.read_options = options.read_options
-            self.read_options_py = options.read_options
+            self._read_options_py = options.read_options
         else:
             super()._set_default_fragment_scan_options(options)
 
@@ -1268,7 +1268,7 @@ cdef class CsvFragmentScanOptions(FragmentScanOptions):
         CCsvFragmentScanOptions* csv_options
         # The encoding field in ReadOptions does not exist in the C++ struct.
         # We need to store it here and override it when reading read_options
-        ReadOptions read_options_py
+        ReadOptions _read_options_py
 
     # Avoid mistakingly creating attributes
     __slots__ = ()
@@ -1281,7 +1281,7 @@ cdef class CsvFragmentScanOptions(FragmentScanOptions):
             self.convert_options = convert_options
         if read_options is not None:
             self.read_options = read_options
-            self.read_options_py = read_options
+            self._read_options_py = read_options
 
     cdef void init(self, const shared_ptr[CFragmentScanOptions]& sp):
         FragmentScanOptions.init(self, sp)
@@ -1298,14 +1298,14 @@ cdef class CsvFragmentScanOptions(FragmentScanOptions):
     @property
     def read_options(self):
         read_options = ReadOptions.wrap(self.csv_options.read_options)
-        if self.read_options_py is not None:
-            read_options.encoding = self.read_options_py.encoding
+        if self._read_options_py is not None:
+            read_options.encoding = self._read_options_py.encoding
         return read_options
 
     @read_options.setter
     def read_options(self, ReadOptions read_options not None):
         self.csv_options.read_options = deref(read_options.options)
-        self.read_options_py = read_options
+        self._read_options_py = read_options
         if read_options.encoding != 'utf8':
             self.csv_options.stream_transform_func = deref(
                 make_streamwrap_func(read_options.encoding, 'utf8'))

From b3ac6979b0e78c96401dfe03298003287d1f9b7e Mon Sep 17 00:00:00 2001
From: Joost Hoozemans
Date: Fri, 26 Aug 2022 15:26:36 +0200
Subject: [PATCH 30/31] Removed unused test data

---
 python/pyarrow/tests/test_dataset.py | 9 +++------
 1 file changed, 3 insertions(+), 6 deletions(-)

diff --git a/python/pyarrow/tests/test_dataset.py b/python/pyarrow/tests/test_dataset.py
index 767f17d4a02..e6aa789e792 100644
--- a/python/pyarrow/tests/test_dataset.py
+++ b/python/pyarrow/tests/test_dataset.py
@@ -3133,13 +3133,10 @@ def test_csv_fragment_options(tempdir, dataset_reader):
 def test_encoding(tempdir, dataset_reader):
     path = str(tempdir / 'test.csv')
 
-    for encoding, input_rows, expected_table in [
-        ('latin-1', b"a,b\nun,\xe9l\xe9phant",
-         {'a': ["un"], 'b': [b"\xe9l\xe9phant"]}),
+    for encoding, input_rows in [
+        ('latin-1', b"a,b\nun,\xe9l\xe9phant"),
         ('utf16', b'\xff\xfea\x00,\x00b\x00\n\x00u\x00n\x00,'
-         b'\x00\xe9\x00l\x00\xe9\x00p\x00h\x00a\x00n\x00t\x00',
-         {'a': [b"\x00u\x00n"],
-          'b': [b"\x00\xe9\x00l\x00\xe9\x00p\x00h\x00a\x00n\x00t\x00"]})
+         b'\x00\xe9\x00l\x00\xe9\x00p\x00h\x00a\x00n\x00t\x00'),
     ]:
 
         with open(path, 'wb') as sink:
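Note: the final patch below hardens the "is this already utf8?" check.
Users may spell the same codec many ways ('utf8', 'UTF-8', 'u8'), and
comparing raw strings would wrap a pointless transcoder around streams that
are already UTF-8. Normalizing through codecs.lookup avoids that; a quick
standard-library illustration:

    import codecs

    # All of these aliases normalize to the canonical name 'utf-8'
    for alias in ('utf8', 'UTF-8', 'u8', 'utf_8'):
        assert codecs.lookup(alias).name == 'utf-8'
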
From 1e621fa0c12493798c98447043eafde2ca250dac Mon Sep 17 00:00:00 2001
From: Joost Hoozemans
Date: Fri, 26 Aug 2022 15:37:50 +0200
Subject: [PATCH 31/31] Handling codec aliases by using codecs.lookup

---
 python/pyarrow/_dataset.pyx | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx
index 976f999bd0f..c22d992ff18 100644
--- a/python/pyarrow/_dataset.pyx
+++ b/python/pyarrow/_dataset.pyx
@@ -1306,9 +1306,9 @@ cdef class CsvFragmentScanOptions(FragmentScanOptions):
     def read_options(self, ReadOptions read_options not None):
         self.csv_options.read_options = deref(read_options.options)
         self._read_options_py = read_options
-        if read_options.encoding != 'utf8':
+        if codecs.lookup(read_options.encoding).name != 'utf-8':
             self.csv_options.stream_transform_func = deref(
-                make_streamwrap_func(read_options.encoding, 'utf8'))
+                make_streamwrap_func(read_options.encoding, 'utf-8'))
 
     def equals(self, CsvFragmentScanOptions other):
         return (