From 5d7c2bdb2db6bdb88fc3e8f6067abf63a7afff84 Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Tue, 26 Jul 2022 12:18:12 +0200 Subject: [PATCH 01/27] Added field to CsvFragmentScanOptions that holds an optional transform function --- cpp/src/arrow/dataset/file_csv.cc | 10 ++++++++-- cpp/src/arrow/dataset/file_csv.h | 10 ++++++++++ cpp/src/arrow/python/io.cc | 10 ++++++++++ cpp/src/arrow/python/io.h | 3 +++ python/pyarrow/includes/libarrow.pxd | 8 ++++++++ python/pyarrow/includes/libarrow_dataset.pxd | 1 + 6 files changed, 40 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/dataset/file_csv.cc b/cpp/src/arrow/dataset/file_csv.cc index d4e0af7808c..3b9200c003b 100644 --- a/cpp/src/arrow/dataset/file_csv.cc +++ b/cpp/src/arrow/dataset/file_csv.cc @@ -183,9 +183,15 @@ static inline Future> OpenReaderAsync( auto tracer = arrow::internal::tracing::GetTracer(); auto span = tracer->StartSpan("arrow::dataset::CsvFileFormat::OpenReaderAsync"); #endif - ARROW_ASSIGN_OR_RAISE(auto reader_options, GetReadOptions(format, scan_options)); - + ARROW_ASSIGN_OR_RAISE( + auto fragment_scan_options, + GetFragmentScanOptions( + kCsvTypeName, scan_options.get(), format.default_fragment_scan_options)); + auto reader_options = fragment_scan_options->read_options; ARROW_ASSIGN_OR_RAISE(auto input, source.OpenCompressed()); + if (fragment_scan_options->stream_transform_func) { + ARROW_ASSIGN_OR_RAISE(input, fragment_scan_options->stream_transform_func(input)); + } const auto& path = source.path(); ARROW_ASSIGN_OR_RAISE( input, io::BufferedInputStream::Create(reader_options.block_size, diff --git a/cpp/src/arrow/dataset/file_csv.h b/cpp/src/arrow/dataset/file_csv.h index 83dbb88b85f..84bcf94abe3 100644 --- a/cpp/src/arrow/dataset/file_csv.h +++ b/cpp/src/arrow/dataset/file_csv.h @@ -73,6 +73,9 @@ class ARROW_DS_EXPORT CsvFileFormat : public FileFormat { struct ARROW_DS_EXPORT CsvFragmentScanOptions : public FragmentScanOptions { std::string type_name() const override { return 
kCsvTypeName; } + using StreamWrapFunc = std::function>( + std::shared_ptr)>; + /// CSV conversion options csv::ConvertOptions convert_options = csv::ConvertOptions::Defaults(); @@ -80,6 +83,13 @@ struct ARROW_DS_EXPORT CsvFragmentScanOptions : public FragmentScanOptions { /// /// Note that use_threads is always ignored. csv::ReadOptions read_options = csv::ReadOptions::Defaults(); + + /// Optional stream wrapping function + /// + /// If defined, all open dataset file fragments will be passed + /// through this function. One possible use case is to transparently + /// transcode all input files from a given character set to utf8. + StreamWrapFunc stream_transform_func{}; }; class ARROW_DS_EXPORT CsvFileWriteOptions : public FileWriteOptions { diff --git a/cpp/src/arrow/python/io.cc b/cpp/src/arrow/python/io.cc index 73525feed38..e80a803ba21 100644 --- a/cpp/src/arrow/python/io.cc +++ b/cpp/src/arrow/python/io.cc @@ -370,5 +370,15 @@ std::shared_ptr<::arrow::io::InputStream> MakeTransformInputStream( return std::make_shared(std::move(wrapped), std::move(transform)); } +std::function>(std::shared_ptr)> +makeStreamTransformFunc(TransformInputStreamVTable vtable, PyObject* handler) { + std::function>( + std::shared_ptr)> + func = [=](std::shared_ptr<::arrow::io::InputStream> wrapped) { + return MakeTransformInputStream(wrapped, vtable, handler); + }; + return func; +} + } // namespace py } // namespace arrow diff --git a/cpp/src/arrow/python/io.h b/cpp/src/arrow/python/io.h index a38d0ca332c..41563677a17 100644 --- a/cpp/src/arrow/python/io.h +++ b/cpp/src/arrow/python/io.h @@ -112,5 +112,8 @@ std::shared_ptr<::arrow::io::InputStream> MakeTransformInputStream( std::shared_ptr<::arrow::io::InputStream> wrapped, TransformInputStreamVTable vtable, PyObject* arg); +std::function>(std::shared_ptr)> +makeStreamTransformFunc(TransformInputStreamVTable vtable, PyObject* handler); + } // namespace py } // namespace arrow diff --git a/python/pyarrow/includes/libarrow.pxd 
b/python/pyarrow/includes/libarrow.pxd index 4cbcef84e88..0b1303aa80c 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -1369,6 +1369,14 @@ cdef extern from "arrow/io/api.h" namespace "arrow::io" nogil: shared_ptr[CInputStream] wrapped, CTransformInputStreamVTable vtable, object method_arg) + ctypedef CResult[shared_ptr[CInputStream]] StreamWrapFunc( + shared_ptr[CInputStream]) + + function[StreamWrapFunc] makeStreamTransformFunc \ + "arrow::py::makeStreamTransformFunc"( + CTransformInputStreamVTable vtable, + object method_arg) + # ---------------------------------------------------------------------- # HDFS diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd index bd8fbd1b56a..ad1bbbc5442 100644 --- a/python/pyarrow/includes/libarrow_dataset.pxd +++ b/python/pyarrow/includes/libarrow_dataset.pxd @@ -277,6 +277,7 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: "arrow::dataset::CsvFragmentScanOptions"(CFragmentScanOptions): CCSVConvertOptions convert_options CCSVReadOptions read_options + function[StreamWrapFunc] stream_transform_func cdef cppclass CPartitioning "arrow::dataset::Partitioning": c_string type_name() const From a82239d87bcab7fa2205cb424212b413d70359f7 Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Tue, 26 Jul 2022 13:17:28 +0200 Subject: [PATCH 02/27] WIP wrapping a trancoder around all input streams of a dataset --- python/pyarrow/_dataset.pyx | 54 +++++++++++++++++++++++++++++++++++++ python/pyarrow/dataset.py | 10 ++++--- 2 files changed, 61 insertions(+), 3 deletions(-) diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index 68833a5350e..d505f638556 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -21,6 +21,7 @@ from cython.operator cimport dereference as deref +import codecs import collections import os import warnings @@ -1237,6 +1238,40 @@ cdef class 
CsvFileFormat(FileFormat): return f"" +# From io.pxi +def py_buffer(object obj): + """ + Construct an Arrow buffer from a Python bytes-like or buffer-like object + + Parameters + ---------- + obj : object + the object from which the buffer should be constructed. + """ + cdef shared_ptr[CBuffer] buf + buf = GetResultValue(PyBuffer.FromPyObject(obj)) + return pyarrow_wrap_buffer(buf) + + +# From io.pxi +cdef void _cb_transform(transform_func, const shared_ptr[CBuffer]& src, + shared_ptr[CBuffer]* dest) except *: + py_dest = transform_func(pyarrow_wrap_buffer(src)) + dest[0] = pyarrow_unwrap_buffer(py_buffer(py_dest)) + + +# from io.pxi +class Transcoder: + + def __init__(self, decoder, encoder): + self._decoder = decoder + self._encoder = encoder + + def __call__(self, buf): + final = len(buf) == 0 + return self._encoder.encode(self._decoder.decode(buf, final), final) + + cdef class CsvFragmentScanOptions(FragmentScanOptions): """ Scan-specific options for CSV fragments. @@ -1284,6 +1319,25 @@ cdef class CsvFragmentScanOptions(FragmentScanOptions): def read_options(self, ReadOptions read_options not None): self.csv_options.read_options = deref(read_options.options) + cdef transform_encoding(self, src_encoding): + cdef: + CTransformInputStreamVTable vtable + + src_codec = codecs.lookup(src_encoding) + dest_codec = codecs.lookup("utf8") + if src_codec.name == dest_codec.name: + # Avoid losing performance on no-op transcoding + # (encoding errors won't be detected) + return + + vtable.transform = _cb_transform + self.csv_options.stream_transform_func = makeStreamTransformFunc(move(vtable), + Transcoder(src_codec.incrementaldecoder(), + dest_codec.incrementalencoder())) + + def stream_transform_func(self, src_encoding): + self.transform_encoding(src_encoding) + def equals(self, CsvFragmentScanOptions other): return ( other and diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py index 2518e37ec6f..91c871b5c2c 100644 --- a/python/pyarrow/dataset.py +++ 
b/python/pyarrow/dataset.py @@ -422,7 +422,7 @@ def _ensure_single_source(path, filesystem=None): def _filesystem_dataset(source, schema=None, filesystem=None, partitioning=None, format=None, partition_base_dir=None, exclude_invalid_files=None, - selector_ignore_prefixes=None): + selector_ignore_prefixes=None, encoding='utf8'): """ Create a FileSystemDataset which can be used to build a Dataset. @@ -433,6 +433,9 @@ def _filesystem_dataset(source, schema=None, filesystem=None, FileSystemDataset """ format = _ensure_format(format or 'parquet') + if encoding != 'utf8': + format.default_fragment_scan_options.stream_transform_func(encoding) + partitioning = _ensure_partitioning(partitioning) if isinstance(source, (list, tuple)): @@ -539,7 +542,7 @@ def parquet_dataset(metadata_path, schema=None, filesystem=None, format=None, def dataset(source, schema=None, format=None, filesystem=None, partitioning=None, partition_base_dir=None, - exclude_invalid_files=None, ignore_prefixes=None): + exclude_invalid_files=None, ignore_prefixes=None, encoding='utf8'): """ Open a dataset. 
@@ -742,7 +745,8 @@ def dataset(source, schema=None, format=None, filesystem=None, format=format, partition_base_dir=partition_base_dir, exclude_invalid_files=exclude_invalid_files, - selector_ignore_prefixes=ignore_prefixes + selector_ignore_prefixes=ignore_prefixes, + encoding=encoding ) if _is_path_like(source): From f67590c87c7de7f647df832db83cde122b9afb12 Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Thu, 28 Jul 2022 15:32:52 +0200 Subject: [PATCH 03/27] Added input stream wrapping to CsvFileFormat::CountRows too --- cpp/src/arrow/dataset/file_csv.cc | 9 ++++++++- cpp/src/arrow/python/io.cc | 16 ++++++++-------- cpp/src/arrow/python/io.h | 7 ++++--- 3 files changed, 20 insertions(+), 12 deletions(-) diff --git a/cpp/src/arrow/dataset/file_csv.cc b/cpp/src/arrow/dataset/file_csv.cc index 3b9200c003b..4f36a349552 100644 --- a/cpp/src/arrow/dataset/file_csv.cc +++ b/cpp/src/arrow/dataset/file_csv.cc @@ -295,8 +295,15 @@ Future> CsvFileFormat::CountRows( return Future>::MakeFinished(util::nullopt); } auto self = checked_pointer_cast(shared_from_this()); + ARROW_ASSIGN_OR_RAISE( + auto fragment_scan_options, + GetFragmentScanOptions( + kCsvTypeName, options.get(), self->default_fragment_scan_options)); + auto read_options = fragment_scan_options->read_options; ARROW_ASSIGN_OR_RAISE(auto input, file->source().OpenCompressed()); - ARROW_ASSIGN_OR_RAISE(auto read_options, GetReadOptions(*self, options)); + if (fragment_scan_options->stream_transform_func) { + ARROW_ASSIGN_OR_RAISE(input, fragment_scan_options->stream_transform_func(input)); + } return csv::CountRowsAsync(options->io_context, std::move(input), ::arrow::internal::GetCpuThreadPool(), read_options, self->parse_options) diff --git a/cpp/src/arrow/python/io.cc b/cpp/src/arrow/python/io.cc index e80a803ba21..5937d8b3cfd 100644 --- a/cpp/src/arrow/python/io.cc +++ b/cpp/src/arrow/python/io.cc @@ -370,14 +370,14 @@ std::shared_ptr<::arrow::io::InputStream> MakeTransformInputStream( return 
std::make_shared(std::move(wrapped), std::move(transform)); } -std::function>(std::shared_ptr)> -makeStreamTransformFunc(TransformInputStreamVTable vtable, PyObject* handler) { - std::function>( - std::shared_ptr)> - func = [=](std::shared_ptr<::arrow::io::InputStream> wrapped) { - return MakeTransformInputStream(wrapped, vtable, handler); - }; - return func; +std::shared_ptr makeStreamTransformFunc(TransformInputStreamVTable vtable, + PyObject* handler) { + TransformInputStream::TransformFunc transform( + TransformFunctionWrapper{std::move(vtable.transform), handler}); + StreamWrapFunc func = [transform](std::shared_ptr<::arrow::io::InputStream> wrapped) { + return std::make_shared(wrapped, transform); + }; + return std::make_shared(func); } } // namespace py diff --git a/cpp/src/arrow/python/io.h b/cpp/src/arrow/python/io.h index 41563677a17..3afad7ec6ca 100644 --- a/cpp/src/arrow/python/io.h +++ b/cpp/src/arrow/python/io.h @@ -112,8 +112,9 @@ std::shared_ptr<::arrow::io::InputStream> MakeTransformInputStream( std::shared_ptr<::arrow::io::InputStream> wrapped, TransformInputStreamVTable vtable, PyObject* arg); -std::function>(std::shared_ptr)> -makeStreamTransformFunc(TransformInputStreamVTable vtable, PyObject* handler); - +using StreamWrapFunc = std::function>( + std::shared_ptr)>; +std::shared_ptr makeStreamTransformFunc(TransformInputStreamVTable vtable, + PyObject* handler); } // namespace py } // namespace arrow From a717ddc7dfcb509bb0cc6bbccfcd46ea156aa7fe Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Thu, 28 Jul 2022 21:41:08 +0200 Subject: [PATCH 04/27] Moved make_streamwrap_func into io.pxi, removed duplicated code --- python/pyarrow/_dataset.pyx | 54 +++------------------------- python/pyarrow/dataset.py | 3 +- python/pyarrow/includes/libarrow.pxd | 8 ++--- python/pyarrow/io.pxi | 27 ++++++++++++++ python/pyarrow/lib.pxd | 3 ++ 5 files changed, 39 insertions(+), 56 deletions(-) diff --git a/python/pyarrow/_dataset.pyx 
b/python/pyarrow/_dataset.pyx index d505f638556..9e471630548 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -1238,40 +1238,6 @@ cdef class CsvFileFormat(FileFormat): return f"" -# From io.pxi -def py_buffer(object obj): - """ - Construct an Arrow buffer from a Python bytes-like or buffer-like object - - Parameters - ---------- - obj : object - the object from which the buffer should be constructed. - """ - cdef shared_ptr[CBuffer] buf - buf = GetResultValue(PyBuffer.FromPyObject(obj)) - return pyarrow_wrap_buffer(buf) - - -# From io.pxi -cdef void _cb_transform(transform_func, const shared_ptr[CBuffer]& src, - shared_ptr[CBuffer]* dest) except *: - py_dest = transform_func(pyarrow_wrap_buffer(src)) - dest[0] = pyarrow_unwrap_buffer(py_buffer(py_dest)) - - -# from io.pxi -class Transcoder: - - def __init__(self, decoder, encoder): - self._decoder = decoder - self._encoder = encoder - - def __call__(self, buf): - final = len(buf) == 0 - return self._encoder.encode(self._decoder.decode(buf, final), final) - - cdef class CsvFragmentScanOptions(FragmentScanOptions): """ Scan-specific options for CSV fragments. 
@@ -1319,24 +1285,12 @@ cdef class CsvFragmentScanOptions(FragmentScanOptions): def read_options(self, ReadOptions read_options not None): self.csv_options.read_options = deref(read_options.options) - cdef transform_encoding(self, src_encoding): - cdef: - CTransformInputStreamVTable vtable - - src_codec = codecs.lookup(src_encoding) - dest_codec = codecs.lookup("utf8") - if src_codec.name == dest_codec.name: - # Avoid losing performance on no-op transcoding - # (encoding errors won't be detected) + def add_transcoder(self, src_encoding, dest_encoding): + if src_encoding == dest_encoding: return - vtable.transform = _cb_transform - self.csv_options.stream_transform_func = makeStreamTransformFunc(move(vtable), - Transcoder(src_codec.incrementaldecoder(), - dest_codec.incrementalencoder())) - - def stream_transform_func(self, src_encoding): - self.transform_encoding(src_encoding) + self.csv_options.stream_transform_func = deref( + make_streamwrap_func(src_encoding, dest_encoding)) def equals(self, CsvFragmentScanOptions other): return ( diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py index 91c871b5c2c..f0afa4aca73 100644 --- a/python/pyarrow/dataset.py +++ b/python/pyarrow/dataset.py @@ -433,8 +433,7 @@ def _filesystem_dataset(source, schema=None, filesystem=None, FileSystemDataset """ format = _ensure_format(format or 'parquet') - if encoding != 'utf8': - format.default_fragment_scan_options.stream_transform_func(encoding) + format.default_fragment_scan_options.add_transcoder(encoding, "utf8") partitioning = _ensure_partitioning(partitioning) diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index 0b1303aa80c..2875ec1f83b 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -1208,6 +1208,9 @@ cdef extern from "arrow/builder.h" namespace "arrow" nogil: ctypedef void CallbackTransform(object, const shared_ptr[CBuffer]& src, shared_ptr[CBuffer]* dest) +ctypedef 
CResult[shared_ptr[CInputStream]] StreamWrapFunc( + shared_ptr[CInputStream]) + cdef extern from "arrow/util/cancel.h" namespace "arrow" nogil: cdef cppclass CStopToken "arrow::StopToken": @@ -1369,10 +1372,7 @@ cdef extern from "arrow/io/api.h" namespace "arrow::io" nogil: shared_ptr[CInputStream] wrapped, CTransformInputStreamVTable vtable, object method_arg) - ctypedef CResult[shared_ptr[CInputStream]] StreamWrapFunc( - shared_ptr[CInputStream]) - - function[StreamWrapFunc] makeStreamTransformFunc \ + shared_ptr[function[StreamWrapFunc]] makeStreamTransformFunc \ "arrow::py::makeStreamTransformFunc"( CTransformInputStreamVTable vtable, object method_arg) diff --git a/python/pyarrow/io.pxi b/python/pyarrow/io.pxi index d2e4f7062e6..5069e3b7375 100644 --- a/python/pyarrow/io.pxi +++ b/python/pyarrow/io.pxi @@ -1547,6 +1547,33 @@ class Transcoder: return self._encoder.encode(self._decoder.decode(buf, final), final) +cdef shared_ptr[function[StreamWrapFunc]] make_streamwrap_func( + src_encoding, dest_encoding) except *: + """ + Create a function that will add a transcoding transformation to a stream. + Data from that stream will be decoded according to ``src_encoding`` and + then re-encoded according to ``dest_encoding``. + The created function can be used to wrap streams once they are created. + + Parameters + ---------- + src_encoding : str + The codec to use when reading data data. + dest_encoding : str + The codec to use for emitted data. + """ + cdef: + shared_ptr[function[StreamWrapFunc]] empty_func + CTransformInputStreamVTable vtable + + vtable.transform = _cb_transform + src_codec = codecs.lookup(src_encoding) + dest_codec = codecs.lookup(dest_encoding) + return makeStreamTransformFunc(move(vtable), + Transcoder(src_codec.incrementaldecoder(), + dest_codec.incrementalencoder())) + + def transcoding_input_stream(stream, src_encoding, dest_encoding): """ Add a transcoding transformation to the stream. 
diff --git a/python/pyarrow/lib.pxd b/python/pyarrow/lib.pxd index 953b0e7b518..445e1132e73 100644 --- a/python/pyarrow/lib.pxd +++ b/python/pyarrow/lib.pxd @@ -536,6 +536,9 @@ cdef shared_ptr[CInputStream] native_transcoding_input_stream( shared_ptr[CInputStream] stream, src_encoding, dest_encoding) except * +cdef shared_ptr[function[StreamWrapFunc]] make_streamwrap_func( + src_encoding, dest_encoding) except * + # Default is allow_none=False cpdef DataType ensure_type(object type, bint allow_none=*) From 9284eb6b89d7f2d7ed86c2d909c0948ba4805213 Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Thu, 28 Jul 2022 21:54:25 +0200 Subject: [PATCH 05/27] Use UpperCamelCase in function name --- cpp/src/arrow/python/io.cc | 2 +- cpp/src/arrow/python/io.h | 2 +- python/pyarrow/io.pxi | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/cpp/src/arrow/python/io.cc b/cpp/src/arrow/python/io.cc index 5937d8b3cfd..c621d16427b 100644 --- a/cpp/src/arrow/python/io.cc +++ b/cpp/src/arrow/python/io.cc @@ -370,7 +370,7 @@ std::shared_ptr<::arrow::io::InputStream> MakeTransformInputStream( return std::make_shared(std::move(wrapped), std::move(transform)); } -std::shared_ptr makeStreamTransformFunc(TransformInputStreamVTable vtable, +std::shared_ptr MakeStreamTransformFunc(TransformInputStreamVTable vtable, PyObject* handler) { TransformInputStream::TransformFunc transform( TransformFunctionWrapper{std::move(vtable.transform), handler}); diff --git a/cpp/src/arrow/python/io.h b/cpp/src/arrow/python/io.h index 3afad7ec6ca..13f9650f9a7 100644 --- a/cpp/src/arrow/python/io.h +++ b/cpp/src/arrow/python/io.h @@ -114,7 +114,7 @@ std::shared_ptr<::arrow::io::InputStream> MakeTransformInputStream( using StreamWrapFunc = std::function>( std::shared_ptr)>; -std::shared_ptr makeStreamTransformFunc(TransformInputStreamVTable vtable, +std::shared_ptr MakeStreamTransformFunc(TransformInputStreamVTable vtable, PyObject* handler); } // namespace py } // namespace arrow diff --git 
a/python/pyarrow/io.pxi b/python/pyarrow/io.pxi index 5069e3b7375..609a4b0b679 100644 --- a/python/pyarrow/io.pxi +++ b/python/pyarrow/io.pxi @@ -1569,7 +1569,7 @@ cdef shared_ptr[function[StreamWrapFunc]] make_streamwrap_func( vtable.transform = _cb_transform src_codec = codecs.lookup(src_encoding) dest_codec = codecs.lookup(dest_encoding) - return makeStreamTransformFunc(move(vtable), + return MakeStreamTransformFunc(move(vtable), Transcoder(src_codec.incrementaldecoder(), dest_codec.incrementalencoder())) From b2061bff300e0e5c39a6b0da29a77f8fef466386 Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Thu, 28 Jul 2022 22:32:26 +0200 Subject: [PATCH 06/27] Additional name change, formatting fix --- python/pyarrow/includes/libarrow.pxd | 4 ++-- python/pyarrow/lib.pxd | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index 2875ec1f83b..6a83f8516e5 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -1372,8 +1372,8 @@ cdef extern from "arrow/io/api.h" namespace "arrow::io" nogil: shared_ptr[CInputStream] wrapped, CTransformInputStreamVTable vtable, object method_arg) - shared_ptr[function[StreamWrapFunc]] makeStreamTransformFunc \ - "arrow::py::makeStreamTransformFunc"( + shared_ptr[function[StreamWrapFunc]] MakeStreamTransformFunc \ + "arrow::py::MakeStreamTransformFunc"( CTransformInputStreamVTable vtable, object method_arg) diff --git a/python/pyarrow/lib.pxd b/python/pyarrow/lib.pxd index 445e1132e73..67db3d2ffb8 100644 --- a/python/pyarrow/lib.pxd +++ b/python/pyarrow/lib.pxd @@ -537,7 +537,7 @@ cdef shared_ptr[CInputStream] native_transcoding_input_stream( dest_encoding) except * cdef shared_ptr[function[StreamWrapFunc]] make_streamwrap_func( - src_encoding, dest_encoding) except * + src_encoding, dest_encoding) except * # Default is allow_none=False cpdef DataType ensure_type(object type, bint allow_none=*) From 
bbdaa07161bc81ccf9105399fd99414dd6bda301 Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Fri, 29 Jul 2022 14:25:45 +0200 Subject: [PATCH 07/27] Use GetReadOptions(), because it overrides the use_threads field --- cpp/src/arrow/dataset/file_csv.cc | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cpp/src/arrow/dataset/file_csv.cc b/cpp/src/arrow/dataset/file_csv.cc index 4f36a349552..780f845429b 100644 --- a/cpp/src/arrow/dataset/file_csv.cc +++ b/cpp/src/arrow/dataset/file_csv.cc @@ -187,7 +187,7 @@ static inline Future> OpenReaderAsync( auto fragment_scan_options, GetFragmentScanOptions( kCsvTypeName, scan_options.get(), format.default_fragment_scan_options)); - auto reader_options = fragment_scan_options->read_options; + ARROW_ASSIGN_OR_RAISE(auto reader_options, GetReadOptions(format, scan_options)); ARROW_ASSIGN_OR_RAISE(auto input, source.OpenCompressed()); if (fragment_scan_options->stream_transform_func) { ARROW_ASSIGN_OR_RAISE(input, fragment_scan_options->stream_transform_func(input)); @@ -299,7 +299,7 @@ Future> CsvFileFormat::CountRows( auto fragment_scan_options, GetFragmentScanOptions( kCsvTypeName, options.get(), self->default_fragment_scan_options)); - auto read_options = fragment_scan_options->read_options; + ARROW_ASSIGN_OR_RAISE(auto read_options, GetReadOptions(*self, options)); ARROW_ASSIGN_OR_RAISE(auto input, file->source().OpenCompressed()); if (fragment_scan_options->stream_transform_func) { ARROW_ASSIGN_OR_RAISE(input, fragment_scan_options->stream_transform_func(input)); From c9591127ff208f3bcd8106fdd89f9c9cb01a9c9d Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Fri, 29 Jul 2022 14:31:07 +0200 Subject: [PATCH 08/27] Only add a transcoder for csv files --- python/pyarrow/dataset.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py index f0afa4aca73..ea24b79b516 100644 --- a/python/pyarrow/dataset.py +++ b/python/pyarrow/dataset.py @@ 
-433,7 +433,8 @@ def _filesystem_dataset(source, schema=None, filesystem=None, FileSystemDataset """ format = _ensure_format(format or 'parquet') - format.default_fragment_scan_options.add_transcoder(encoding, "utf8") + if format.default_fragment_scan_options.type_name == 'csv': + format.default_fragment_scan_options.add_transcoder(encoding, "utf8") partitioning = _ensure_partitioning(partitioning) From 7fceb84d7f2125a858f25c8346719ac2324be855 Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Fri, 29 Jul 2022 14:35:54 +0200 Subject: [PATCH 09/27] Processed some review comments regarding documentation --- python/pyarrow/io.pxi | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/python/pyarrow/io.pxi b/python/pyarrow/io.pxi index 609a4b0b679..716a23721c5 100644 --- a/python/pyarrow/io.pxi +++ b/python/pyarrow/io.pxi @@ -1553,12 +1553,12 @@ cdef shared_ptr[function[StreamWrapFunc]] make_streamwrap_func( Create a function that will add a transcoding transformation to a stream. Data from that stream will be decoded according to ``src_encoding`` and then re-encoded according to ``dest_encoding``. - The created function can be used to wrap streams once they are created. + The created function can be used to wrap streams. Parameters ---------- src_encoding : str - The codec to use when reading data data. + The codec to use when reading data. dest_encoding : str The codec to use for emitted data. """ @@ -1585,7 +1585,7 @@ def transcoding_input_stream(stream, src_encoding, dest_encoding): stream : NativeFile The stream to which the transformation should be applied. src_encoding : str - The codec to use when reading data data. + The codec to use when reading data. dest_encoding : str The codec to use for emitted data. 
""" From 401f67d819cbcc309144e21d96934011792f1bfa Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Fri, 29 Jul 2022 15:41:27 +0200 Subject: [PATCH 10/27] Moved encoding parameter from dataset() into CsvFileFormat --- python/pyarrow/_dataset.pyx | 8 ++++++-- python/pyarrow/dataset.py | 7 ++++--- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index 9e471630548..645a80aa3e9 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -1172,6 +1172,7 @@ cdef class CsvFileFormat(FileFormat): """ cdef: CCsvFileFormat* csv_format + public object encoding # Avoid mistakingly creating attributes __slots__ = () @@ -1179,7 +1180,7 @@ cdef class CsvFileFormat(FileFormat): def __init__(self, ParseOptions parse_options=None, default_fragment_scan_options=None, ConvertOptions convert_options=None, - ReadOptions read_options=None): + ReadOptions read_options=None, encoding=None): self.init(shared_ptr[CFileFormat](new CCsvFileFormat())) if parse_options is not None: self.parse_options = parse_options @@ -1199,6 +1200,8 @@ cdef class CsvFileFormat(FileFormat): raise TypeError('`default_fragment_scan_options` must be either ' 'a dictionary or an instance of ' 'CsvFragmentScanOptions') + # Python-specific option + self.encoding = encoding cdef void init(self, const shared_ptr[CFileFormat]& sp): FileFormat.init(self, sp) @@ -1228,7 +1231,8 @@ cdef class CsvFileFormat(FileFormat): return ( self.parse_options.equals(other.parse_options) and self.default_fragment_scan_options == - other.default_fragment_scan_options) + other.default_fragment_scan_options and + self.encoding == other.encoding) def __reduce__(self): return CsvFileFormat, (self.parse_options, diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py index ea24b79b516..4959cff2881 100644 --- a/python/pyarrow/dataset.py +++ b/python/pyarrow/dataset.py @@ -422,7 +422,7 @@ def _ensure_single_source(path, filesystem=None): def 
_filesystem_dataset(source, schema=None, filesystem=None, partitioning=None, format=None, partition_base_dir=None, exclude_invalid_files=None, - selector_ignore_prefixes=None, encoding='utf8'): + selector_ignore_prefixes=None): """ Create a FileSystemDataset which can be used to build a Dataset. @@ -434,7 +434,8 @@ def _filesystem_dataset(source, schema=None, filesystem=None, """ format = _ensure_format(format or 'parquet') if format.default_fragment_scan_options.type_name == 'csv': - format.default_fragment_scan_options.add_transcoder(encoding, "utf8") + format.default_fragment_scan_options.add_transcoder(format.encoding, + "utf8") partitioning = _ensure_partitioning(partitioning) @@ -542,7 +543,7 @@ def parquet_dataset(metadata_path, schema=None, filesystem=None, format=None, def dataset(source, schema=None, format=None, filesystem=None, partitioning=None, partition_base_dir=None, - exclude_invalid_files=None, ignore_prefixes=None, encoding='utf8'): + exclude_invalid_files=None, ignore_prefixes=None): """ Open a dataset. 
From 9816e46690ee87a2c46a66eb279908eb125d9a5b Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Fri, 29 Jul 2022 15:43:49 +0200 Subject: [PATCH 11/27] Removed a left-over occurrence of the workaround encoding parameter --- python/pyarrow/dataset.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py index 4959cff2881..973bfbe7c4f 100644 --- a/python/pyarrow/dataset.py +++ b/python/pyarrow/dataset.py @@ -746,8 +746,7 @@ def dataset(source, schema=None, format=None, filesystem=None, format=format, partition_base_dir=partition_base_dir, exclude_invalid_files=exclude_invalid_files, - selector_ignore_prefixes=ignore_prefixes, - encoding=encoding + selector_ignore_prefixes=ignore_prefixes ) if _is_path_like(source): From 888b3e8a4127b4e4362cf78083064624c5879eaf Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Fri, 29 Jul 2022 15:45:33 +0200 Subject: [PATCH 12/27] Setting default encoding to utf8 --- python/pyarrow/_dataset.pyx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index 645a80aa3e9..290520bbda2 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -1180,7 +1180,7 @@ cdef class CsvFileFormat(FileFormat): def __init__(self, ParseOptions parse_options=None, default_fragment_scan_options=None, ConvertOptions convert_options=None, - ReadOptions read_options=None, encoding=None): + ReadOptions read_options=None, encoding='utf8'): self.init(shared_ptr[CFileFormat](new CCsvFileFormat())) if parse_options is not None: self.parse_options = parse_options From 0f67e8cf4b6b77bcfc9db878ee9574f4a816697e Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Fri, 29 Jul 2022 16:17:20 +0200 Subject: [PATCH 13/27] Always creating a default_fragment_scan_options --- python/pyarrow/_dataset.pyx | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/python/pyarrow/_dataset.pyx 
b/python/pyarrow/_dataset.pyx index 290520bbda2..8c59874ed20 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -1180,7 +1180,8 @@ cdef class CsvFileFormat(FileFormat): def __init__(self, ParseOptions parse_options=None, default_fragment_scan_options=None, ConvertOptions convert_options=None, - ReadOptions read_options=None, encoding='utf8'): + ReadOptions read_options=None, + encoding='utf8'): self.init(shared_ptr[CFileFormat](new CCsvFileFormat())) if parse_options is not None: self.parse_options = parse_options @@ -1200,6 +1201,9 @@ cdef class CsvFileFormat(FileFormat): raise TypeError('`default_fragment_scan_options` must be either ' 'a dictionary or an instance of ' 'CsvFragmentScanOptions') + else : + # default_fragment_scan_options is needed to add a transcoder + self.default_fragment_scan_options = CsvFragmentScanOptions() # Python-specific option self.encoding = encoding From 47c536d8aeeaec97ce816734801cb154693cf00e Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Fri, 29 Jul 2022 16:18:01 +0200 Subject: [PATCH 14/27] Using a different way of checking the file format --- python/pyarrow/dataset.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py index 973bfbe7c4f..346b0f65db2 100644 --- a/python/pyarrow/dataset.py +++ b/python/pyarrow/dataset.py @@ -433,7 +433,7 @@ def _filesystem_dataset(source, schema=None, filesystem=None, FileSystemDataset """ format = _ensure_format(format or 'parquet') - if format.default_fragment_scan_options.type_name == 'csv': + if isinstance(format, CsvFileFormat): format.default_fragment_scan_options.add_transcoder(format.encoding, "utf8") From a539dd09746fd30f690d967c5a566758f1562633 Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Fri, 29 Jul 2022 16:23:34 +0200 Subject: [PATCH 15/27] Added documentation about added encoding parameter --- python/pyarrow/_dataset.pyx | 3 +++ 1 file changed, 3 insertions(+) diff --git 
a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index 8c59874ed20..1839c4653e5 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -1169,6 +1169,9 @@ cdef class CsvFileFormat(FileFormat): General read options. default_fragment_scan_options : CsvFragmentScanOptions Default options for fragments scan. + encoding : str, optional (default 'utf8') + The character encoding of the CSV data. Columns that cannot + decode using this encoding can still be read as Binary. """ cdef: CCsvFileFormat* csv_format From 22eff73806c8511288c16d87cbf64da26a7d941e Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Fri, 29 Jul 2022 17:06:58 +0200 Subject: [PATCH 16/27] Implemented an alternative way of passing the encoding. Instead of duplicating the encoding field in the CsvFileFormat, we store the encoding in a private field in the CsvFragmentScanOptions. In that class, the read_options.encoding field gets lost when initializing it by using the C struct (which doesn't have the encoding field). So when the read_options are read, we restore it again. --- python/pyarrow/_dataset.pyx | 29 +++++++++++++---------------- python/pyarrow/dataset.py | 3 ++- 2 files changed, 15 insertions(+), 17 deletions(-) diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index 1839c4653e5..010e479c3ec 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -1169,13 +1169,9 @@ cdef class CsvFileFormat(FileFormat): General read options. default_fragment_scan_options : CsvFragmentScanOptions Default options for fragments scan. - encoding : str, optional (default 'utf8') - The character encoding of the CSV data. Columns that cannot - decode using this encoding can still be read as Binary. 
""" cdef: CCsvFileFormat* csv_format - public object encoding # Avoid mistakingly creating attributes __slots__ = () @@ -1183,8 +1179,7 @@ cdef class CsvFileFormat(FileFormat): def __init__(self, ParseOptions parse_options=None, default_fragment_scan_options=None, ConvertOptions convert_options=None, - ReadOptions read_options=None, - encoding='utf8'): + ReadOptions read_options=None): self.init(shared_ptr[CFileFormat](new CCsvFileFormat())) if parse_options is not None: self.parse_options = parse_options @@ -1207,8 +1202,6 @@ cdef class CsvFileFormat(FileFormat): else : # default_fragment_scan_options is needed to add a transcoder self.default_fragment_scan_options = CsvFragmentScanOptions() - # Python-specific option - self.encoding = encoding cdef void init(self, const shared_ptr[CFileFormat]& sp): FileFormat.init(self, sp) @@ -1238,8 +1231,7 @@ cdef class CsvFileFormat(FileFormat): return ( self.parse_options.equals(other.parse_options) and self.default_fragment_scan_options == - other.default_fragment_scan_options and - self.encoding == other.encoding) + other.default_fragment_scan_options) def __reduce__(self): return CsvFileFormat, (self.parse_options, @@ -1263,6 +1255,7 @@ cdef class CsvFragmentScanOptions(FragmentScanOptions): cdef: CCsvFragmentScanOptions* csv_options + object encoding # Avoid mistakingly creating attributes __slots__ = () @@ -1275,6 +1268,7 @@ cdef class CsvFragmentScanOptions(FragmentScanOptions): self.convert_options = convert_options if read_options is not None: self.read_options = read_options + self.encoding = read_options.encoding cdef void init(self, const shared_ptr[CFragmentScanOptions]& sp): FragmentScanOptions.init(self, sp) @@ -1290,18 +1284,21 @@ cdef class CsvFragmentScanOptions(FragmentScanOptions): @property def read_options(self): - return ReadOptions.wrap(self.csv_options.read_options) + read_options = ReadOptions.wrap(self.csv_options.read_options) + if self.encoding is not None: + # The encoding field in ReadOptions 
does not exist + # in the C struct. Re-set it here. + read_options.encoding = self.encoding + return read_options @read_options.setter def read_options(self, ReadOptions read_options not None): self.csv_options.read_options = deref(read_options.options) def add_transcoder(self, src_encoding, dest_encoding): - if src_encoding == dest_encoding: - return - - self.csv_options.stream_transform_func = deref( - make_streamwrap_func(src_encoding, dest_encoding)) + if src_encoding != dest_encoding: + self.csv_options.stream_transform_func = deref( + make_streamwrap_func(src_encoding, dest_encoding)) def equals(self, CsvFragmentScanOptions other): return ( diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py index 346b0f65db2..3ed38d59e2c 100644 --- a/python/pyarrow/dataset.py +++ b/python/pyarrow/dataset.py @@ -434,7 +434,8 @@ def _filesystem_dataset(source, schema=None, filesystem=None, """ format = _ensure_format(format or 'parquet') if isinstance(format, CsvFileFormat): - format.default_fragment_scan_options.add_transcoder(format.encoding, + format.default_fragment_scan_options.add_transcoder( + format.default_fragment_scan_options.read_options.encoding, "utf8") partitioning = _ensure_partitioning(partitioning) From 4d9802b60480180e8deab6dd162a72d837cccdeb Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Mon, 1 Aug 2022 08:51:08 +0200 Subject: [PATCH 17/27] Added python-specific encoding field to CsvFileFormat. It needs to be stored in both CsvFileFormat and CsvFragmentScanOptions because if the user has a reference to these separate objects, they would otherwise become inconsistent. One would report the default 'utf8' (forgetting the user's encoding choice), while the other would still properly report the requested encoding. To the user it would be unclear which of these values would be eventually used by the transcoding. 
--- python/pyarrow/_dataset.pyx | 36 +++++++++++++++++++++++++++--------- python/pyarrow/dataset.py | 4 ++-- 2 files changed, 29 insertions(+), 11 deletions(-) diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index 010e479c3ec..0b32f971a99 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -832,8 +832,13 @@ cdef class FileFormat(_Weakrefable): @property def default_fragment_scan_options(self): - return FragmentScanOptions.wrap( + dfso = FragmentScanOptions.wrap( self.wrapped.get().default_fragment_scan_options) + # CsvFileFormat stores a Python-specific encoding field that needs + # to be restored because it does not exist in the C struct + if self.encoding is not None: + dfso.encoding = self.encoding + return dfso @default_fragment_scan_options.setter def default_fragment_scan_options(self, FragmentScanOptions options): @@ -1172,6 +1177,10 @@ cdef class CsvFileFormat(FileFormat): """ cdef: CCsvFileFormat* csv_format + # The encoding field in ReadOptions does not exist + # in the C struct. 
We need to store it here and override it when + # reading read_options or default_fragment_scan_options + public object encoding # Avoid mistakingly creating attributes __slots__ = () @@ -1199,9 +1208,12 @@ cdef class CsvFileFormat(FileFormat): raise TypeError('`default_fragment_scan_options` must be either ' 'a dictionary or an instance of ' 'CsvFragmentScanOptions') - else : + else: # default_fragment_scan_options is needed to add a transcoder self.default_fragment_scan_options = CsvFragmentScanOptions() + if read_options is not None: + self.default_fragment_scan_options.encoding = read_options.encoding + self.encoding = read_options.encoding cdef void init(self, const shared_ptr[CFileFormat]& sp): FileFormat.init(self, sp) @@ -1224,6 +1236,8 @@ cdef class CsvFileFormat(FileFormat): cdef _set_default_fragment_scan_options(self, FragmentScanOptions options): if options.type_name == 'csv': self.csv_format.default_fragment_scan_options = options.wrapped + self.default_fragment_scan_options.encoding = options.encoding + self.encoding = options.encoding else: super()._set_default_fragment_scan_options(options) @@ -1255,17 +1269,21 @@ cdef class CsvFragmentScanOptions(FragmentScanOptions): cdef: CCsvFragmentScanOptions* csv_options - object encoding + # The encoding field in ReadOptions does not exist + # in the C struct. 
We need to store it here and override it when + # reading read_options + public object encoding # Avoid mistakingly creating attributes __slots__ = () def __init__(self, ConvertOptions convert_options=None, - ReadOptions read_options=None): + ReadOptions read_options=None, encoding='utf8'): self.init(shared_ptr[CFragmentScanOptions]( new CCsvFragmentScanOptions())) if convert_options is not None: self.convert_options = convert_options + self.encoding = encoding if read_options is not None: self.read_options = read_options self.encoding = read_options.encoding @@ -1286,29 +1304,29 @@ cdef class CsvFragmentScanOptions(FragmentScanOptions): def read_options(self): read_options = ReadOptions.wrap(self.csv_options.read_options) if self.encoding is not None: - # The encoding field in ReadOptions does not exist - # in the C struct. Re-set it here. read_options.encoding = self.encoding return read_options @read_options.setter def read_options(self, ReadOptions read_options not None): self.csv_options.read_options = deref(read_options.options) + self.encoding = read_options.encoding def add_transcoder(self, src_encoding, dest_encoding): if src_encoding != dest_encoding: self.csv_options.stream_transform_func = deref( - make_streamwrap_func(src_encoding, dest_encoding)) + make_streamwrap_func(src_encoding, dest_encoding)) def equals(self, CsvFragmentScanOptions other): return ( other and self.convert_options.equals(other.convert_options) and - self.read_options.equals(other.read_options)) + self.read_options.equals(other.read_options) and + self.encoding == other.encoding) def __reduce__(self): return CsvFragmentScanOptions, (self.convert_options, - self.read_options) + self.read_options, self.encoding) cdef class CsvFileWriteOptions(FileWriteOptions): diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py index 3ed38d59e2c..1833f45bcfd 100644 --- a/python/pyarrow/dataset.py +++ b/python/pyarrow/dataset.py @@ -435,8 +435,8 @@ def _filesystem_dataset(source, 
schema=None, filesystem=None, format = _ensure_format(format or 'parquet') if isinstance(format, CsvFileFormat): format.default_fragment_scan_options.add_transcoder( - format.default_fragment_scan_options.read_options.encoding, - "utf8") + format.default_fragment_scan_options.read_options.encoding, + "utf8") partitioning = _ensure_partitioning(partitioning) From 4d819aa9e01520911b250a6159e8f1815dc47839 Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Mon, 1 Aug 2022 14:59:46 +0200 Subject: [PATCH 18/27] Removed encoding from CsvFragmentScanOptions --- python/pyarrow/_dataset.pyx | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index 0b32f971a99..a8cf0c5e08a 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -1321,8 +1321,7 @@ cdef class CsvFragmentScanOptions(FragmentScanOptions): return ( other and self.convert_options.equals(other.convert_options) and - self.read_options.equals(other.read_options) and - self.encoding == other.encoding) + self.read_options.equals(other.read_options)) def __reduce__(self): return CsvFragmentScanOptions, (self.convert_options, From 1534bd1ff75e89dcf4c7f7e9eb2a3e09dd2ffd73 Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Mon, 1 Aug 2022 16:24:28 +0200 Subject: [PATCH 19/27] Added transcoding functionality by dlopening libiconv --- cpp/src/arrow/csv/options.h | 3 ++ cpp/src/arrow/python/io.cc | 47 ++++++++++++++++++++++++++++ cpp/src/arrow/python/io.h | 3 ++ python/pyarrow/_csv.pxd | 1 - python/pyarrow/_csv.pyx | 13 ++++++-- python/pyarrow/_dataset.pyx | 37 +++++----------------- python/pyarrow/dataset.py | 2 +- python/pyarrow/includes/libarrow.pxd | 6 ++++ 8 files changed, 79 insertions(+), 33 deletions(-) diff --git a/cpp/src/arrow/csv/options.h b/cpp/src/arrow/csv/options.h index 7723dcedc61..c5fd2c65c1a 100644 --- a/cpp/src/arrow/csv/options.h +++ b/cpp/src/arrow/csv/options.h @@ -163,6 +163,9 @@ struct ARROW_EXPORT 
ReadOptions { /// If false, column names will be read from the first CSV row after `skip_rows`. bool autogenerate_column_names = false; + /// Character encoding used + std::string encoding = "UTF-8"; + /// Create read options with default values static ReadOptions Defaults(); diff --git a/cpp/src/arrow/python/io.cc b/cpp/src/arrow/python/io.cc index c621d16427b..eebe672b414 100644 --- a/cpp/src/arrow/python/io.cc +++ b/cpp/src/arrow/python/io.cc @@ -19,6 +19,7 @@ #include #include +#include #include #include #include @@ -380,5 +381,51 @@ std::shared_ptr MakeStreamTransformFunc(TransformInputStreamVTab return std::make_shared(func); } +#if defined(__MACH__) +#define ICONV_LIB ".dylib" +#else +#define ICONV_LIB ".so" +#endif +typedef void *iconv_t; +iconv_t (*iconv_open) (const char*, const char*); +size_t (*iconv) (iconv_t, const char* *, size_t *, char* *, size_t *); + +std::shared_ptr MakeStreamTransformLibFunc(std::string encoding, std::string lib_name) { + auto empty = std::make_shared(); + + if (lib_name == "libiconv") { + printf("Adding Transcoder using libiconv\n"); + auto handle = dlopen(ICONV_LIB, RTLD_NOW); + if (handle == nullptr) { + return empty; + } + *reinterpret_cast((&iconv_open)) = dlsym(handle, "iconv_open"); + *reinterpret_cast((&iconv)) = dlsym(handle, "iconv"); + + // This lambda will be called to wrap each input stream when it is created + StreamWrapFunc func = [=](std::shared_ptr<::arrow::io::InputStream> wrapped) { + // Open a transcoder + auto conv_state = iconv_open("UTF-8", encoding.c_str()); + + // This lambda will be called each time data is read from a stream + TransformInputStream::TransformFunc transform = [=](const std::shared_ptr& in_buf) -> Result>{ + const char *in_data_ptr = reinterpret_cast(in_buf->data()); + size_t in_size = in_buf->size(); + ARROW_ASSIGN_OR_RAISE(auto out_buf, AllocateResizableBuffer(in_buf->capacity())); + char *out_data_ptr = reinterpret_cast(out_buf->mutable_data()); + size_t out_size = 
out_buf->capacity(); + iconv(conv_state, &in_data_ptr, &in_size, &out_data_ptr, &out_size); + return std::shared_ptr(std::move(out_buf)); + }; + + return std::make_shared(wrapped, transform); + }; + return std::make_shared(func); + + } else { + return empty; + } +} + } // namespace py } // namespace arrow diff --git a/cpp/src/arrow/python/io.h b/cpp/src/arrow/python/io.h index 13f9650f9a7..4733aa1b1ed 100644 --- a/cpp/src/arrow/python/io.h +++ b/cpp/src/arrow/python/io.h @@ -116,5 +116,8 @@ using StreamWrapFunc = std::function>( std::shared_ptr)>; std::shared_ptr MakeStreamTransformFunc(TransformInputStreamVTable vtable, PyObject* handler); + +std::shared_ptr MakeStreamTransformLibFunc(std::string encoding, std::string lib_name); + } // namespace py } // namespace arrow diff --git a/python/pyarrow/_csv.pxd b/python/pyarrow/_csv.pxd index dcc562a41c7..a7d63e548bf 100644 --- a/python/pyarrow/_csv.pxd +++ b/python/pyarrow/_csv.pxd @@ -41,7 +41,6 @@ cdef class ParseOptions(_Weakrefable): cdef class ReadOptions(_Weakrefable): cdef: unique_ptr[CCSVReadOptions] options - public object encoding @staticmethod cdef ReadOptions wrap(CCSVReadOptions options) diff --git a/python/pyarrow/_csv.pyx b/python/pyarrow/_csv.pyx index d1db03c75f1..d943cce137f 100644 --- a/python/pyarrow/_csv.pyx +++ b/python/pyarrow/_csv.pyx @@ -201,7 +201,6 @@ cdef class ReadOptions(_Weakrefable): self.column_names = column_names if autogenerate_column_names is not None: self.autogenerate_column_names= autogenerate_column_names - # Python-specific option self.encoding = encoding if skip_rows_after_names is not None: self.skip_rows_after_names = skip_rows_after_names @@ -287,6 +286,17 @@ cdef class ReadOptions(_Weakrefable): @skip_rows_after_names.setter def skip_rows_after_names(self, value): deref(self.options).skip_rows_after_names = value + + @property + def encoding(self): + """ + Character encoding used for this input. Default UTF-8". 
+ """ + return deref(self.options).encoding + + @encoding.setter + def encoding(self, value): + deref(self.options).encoding = tobytes(value) def validate(self): check_status(deref(self.options).Validate()) @@ -307,7 +317,6 @@ cdef class ReadOptions(_Weakrefable): cdef ReadOptions wrap(CCSVReadOptions options): out = ReadOptions() out.options.reset(new CCSVReadOptions(move(options))) - out.encoding = 'utf8' # No way to know this return out def __getstate__(self): diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index a8cf0c5e08a..afe6993b85d 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -832,13 +832,8 @@ cdef class FileFormat(_Weakrefable): @property def default_fragment_scan_options(self): - dfso = FragmentScanOptions.wrap( + return FragmentScanOptions.wrap( self.wrapped.get().default_fragment_scan_options) - # CsvFileFormat stores a Python-specific encoding field that needs - # to be restored because it does not exist in the C struct - if self.encoding is not None: - dfso.encoding = self.encoding - return dfso @default_fragment_scan_options.setter def default_fragment_scan_options(self, FragmentScanOptions options): @@ -1177,10 +1172,6 @@ cdef class CsvFileFormat(FileFormat): """ cdef: CCsvFileFormat* csv_format - # The encoding field in ReadOptions does not exist - # in the C struct. 
We need to store it here and override it when - # reading read_options or default_fragment_scan_options - public object encoding # Avoid mistakingly creating attributes __slots__ = () @@ -1211,9 +1202,6 @@ cdef class CsvFileFormat(FileFormat): else: # default_fragment_scan_options is needed to add a transcoder self.default_fragment_scan_options = CsvFragmentScanOptions() - if read_options is not None: - self.default_fragment_scan_options.encoding = read_options.encoding - self.encoding = read_options.encoding cdef void init(self, const shared_ptr[CFileFormat]& sp): FileFormat.init(self, sp) @@ -1236,8 +1224,6 @@ cdef class CsvFileFormat(FileFormat): cdef _set_default_fragment_scan_options(self, FragmentScanOptions options): if options.type_name == 'csv': self.csv_format.default_fragment_scan_options = options.wrapped - self.default_fragment_scan_options.encoding = options.encoding - self.encoding = options.encoding else: super()._set_default_fragment_scan_options(options) @@ -1269,24 +1255,18 @@ cdef class CsvFragmentScanOptions(FragmentScanOptions): cdef: CCsvFragmentScanOptions* csv_options - # The encoding field in ReadOptions does not exist - # in the C struct. 
We need to store it here and override it when - # reading read_options - public object encoding # Avoid mistakingly creating attributes __slots__ = () def __init__(self, ConvertOptions convert_options=None, - ReadOptions read_options=None, encoding='utf8'): + ReadOptions read_options=None): self.init(shared_ptr[CFragmentScanOptions]( new CCsvFragmentScanOptions())) if convert_options is not None: self.convert_options = convert_options - self.encoding = encoding if read_options is not None: self.read_options = read_options - self.encoding = read_options.encoding cdef void init(self, const shared_ptr[CFragmentScanOptions]& sp): FragmentScanOptions.init(self, sp) @@ -1303,19 +1283,18 @@ cdef class CsvFragmentScanOptions(FragmentScanOptions): @property def read_options(self): read_options = ReadOptions.wrap(self.csv_options.read_options) - if self.encoding is not None: - read_options.encoding = self.encoding return read_options @read_options.setter def read_options(self, ReadOptions read_options not None): self.csv_options.read_options = deref(read_options.options) - self.encoding = read_options.encoding - def add_transcoder(self, src_encoding, dest_encoding): - if src_encoding != dest_encoding: + def add_transcoder(self, src_encoding, transcoding_library): + if src_encoding != 'utf8': self.csv_options.stream_transform_func = deref( - make_streamwrap_func(src_encoding, dest_encoding)) + #make_streamwrap_func(src_encoding, dest_encoding)) + MakeStreamTransformLibFunc(tobytes(src_encoding), + tobytes(transcoding_library))) def equals(self, CsvFragmentScanOptions other): return ( @@ -1325,7 +1304,7 @@ cdef class CsvFragmentScanOptions(FragmentScanOptions): def __reduce__(self): return CsvFragmentScanOptions, (self.convert_options, - self.read_options, self.encoding) + self.read_options) cdef class CsvFileWriteOptions(FileWriteOptions): diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py index 1833f45bcfd..d0040f50f5f 100644 --- a/python/pyarrow/dataset.py 
+++ b/python/pyarrow/dataset.py @@ -436,7 +436,7 @@ def _filesystem_dataset(source, schema=None, filesystem=None, if isinstance(format, CsvFileFormat): format.default_fragment_scan_options.add_transcoder( format.default_fragment_scan_options.read_options.encoding, - "utf8") + "libiconv") partitioning = _ensure_partitioning(partitioning) diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index 6a83f8516e5..819209bb583 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -1377,6 +1377,11 @@ cdef extern from "arrow/io/api.h" namespace "arrow::io" nogil: CTransformInputStreamVTable vtable, object method_arg) + shared_ptr[function[StreamWrapFunc]] MakeStreamTransformLibFunc \ + "arrow::py::MakeStreamTransformLibFunc"( + c_string encoding, + c_string lib_name) + # ---------------------------------------------------------------------- # HDFS @@ -1731,6 +1736,7 @@ cdef extern from "arrow/csv/api.h" namespace "arrow::csv" nogil: int32_t skip_rows_after_names vector[c_string] column_names c_bool autogenerate_column_names + c_string encoding CCSVReadOptions() CCSVReadOptions(CCSVReadOptions&&) From d2c9a0678a7d334cfc711c4a3b7d5572ad18a6d6 Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Tue, 2 Aug 2022 21:03:21 +0200 Subject: [PATCH 20/27] Removed encoding library wrapper code, now returning error if no transcoder was supplied --- cpp/src/arrow/dataset/file_csv.cc | 16 +++++++--- cpp/src/arrow/python/io.cc | 47 ---------------------------- cpp/src/arrow/python/io.h | 2 -- python/pyarrow/_csv.pyx | 4 +-- python/pyarrow/_dataset.pyx | 9 ++---- python/pyarrow/dataset.py | 5 ++- python/pyarrow/includes/libarrow.pxd | 5 --- 7 files changed, 19 insertions(+), 69 deletions(-) diff --git a/cpp/src/arrow/dataset/file_csv.cc b/cpp/src/arrow/dataset/file_csv.cc index 780f845429b..4fa2dc63e59 100644 --- a/cpp/src/arrow/dataset/file_csv.cc +++ b/cpp/src/arrow/dataset/file_csv.cc @@ -189,8 +189,12 @@ 
static inline Future> OpenReaderAsync( kCsvTypeName, scan_options.get(), format.default_fragment_scan_options)); ARROW_ASSIGN_OR_RAISE(auto reader_options, GetReadOptions(format, scan_options)); ARROW_ASSIGN_OR_RAISE(auto input, source.OpenCompressed()); - if (fragment_scan_options->stream_transform_func) { - ARROW_ASSIGN_OR_RAISE(input, fragment_scan_options->stream_transform_func(input)); + if (reader_options.encoding != "UTF-8") { + if (fragment_scan_options->stream_transform_func) { + ARROW_ASSIGN_OR_RAISE(input, fragment_scan_options->stream_transform_func(input)); + } else { + return Status::Invalid("File encoding is not UTF-8, but no stream_transform_func has been provided."); + } } const auto& path = source.path(); ARROW_ASSIGN_OR_RAISE( @@ -301,8 +305,12 @@ Future> CsvFileFormat::CountRows( kCsvTypeName, options.get(), self->default_fragment_scan_options)); ARROW_ASSIGN_OR_RAISE(auto read_options, GetReadOptions(*self, options)); ARROW_ASSIGN_OR_RAISE(auto input, file->source().OpenCompressed()); - if (fragment_scan_options->stream_transform_func) { - ARROW_ASSIGN_OR_RAISE(input, fragment_scan_options->stream_transform_func(input)); + if (read_options.encoding != "UTF-8") { + if (fragment_scan_options->stream_transform_func) { + ARROW_ASSIGN_OR_RAISE(input, fragment_scan_options->stream_transform_func(input)); + } else { + return Status::Invalid("File encoding is not UTF-8, but no stream_transform_func has been provided."); + } } return csv::CountRowsAsync(options->io_context, std::move(input), ::arrow::internal::GetCpuThreadPool(), read_options, diff --git a/cpp/src/arrow/python/io.cc b/cpp/src/arrow/python/io.cc index eebe672b414..c621d16427b 100644 --- a/cpp/src/arrow/python/io.cc +++ b/cpp/src/arrow/python/io.cc @@ -19,7 +19,6 @@ #include #include -#include #include #include #include @@ -381,51 +380,5 @@ std::shared_ptr MakeStreamTransformFunc(TransformInputStreamVTab return std::make_shared(func); } -#if defined(__MACH__) -#define ICONV_LIB ".dylib" 
-#else -#define ICONV_LIB ".so" -#endif -typedef void *iconv_t; -iconv_t (*iconv_open) (const char*, const char*); -size_t (*iconv) (iconv_t, const char* *, size_t *, char* *, size_t *); - -std::shared_ptr MakeStreamTransformLibFunc(std::string encoding, std::string lib_name) { - auto empty = std::make_shared(); - - if (lib_name == "libiconv") { - printf("Adding Transcoder using libiconv\n"); - auto handle = dlopen(ICONV_LIB, RTLD_NOW); - if (handle == nullptr) { - return empty; - } - *reinterpret_cast((&iconv_open)) = dlsym(handle, "iconv_open"); - *reinterpret_cast((&iconv)) = dlsym(handle, "iconv"); - - // This lambda will be called to wrap each input stream when it is created - StreamWrapFunc func = [=](std::shared_ptr<::arrow::io::InputStream> wrapped) { - // Open a transcoder - auto conv_state = iconv_open("UTF-8", encoding.c_str()); - - // This lambda will be called each time data is read from a stream - TransformInputStream::TransformFunc transform = [=](const std::shared_ptr& in_buf) -> Result>{ - const char *in_data_ptr = reinterpret_cast(in_buf->data()); - size_t in_size = in_buf->size(); - ARROW_ASSIGN_OR_RAISE(auto out_buf, AllocateResizableBuffer(in_buf->capacity())); - char *out_data_ptr = reinterpret_cast(out_buf->mutable_data()); - size_t out_size = out_buf->capacity(); - iconv(conv_state, &in_data_ptr, &in_size, &out_data_ptr, &out_size); - return std::shared_ptr(std::move(out_buf)); - }; - - return std::make_shared(wrapped, transform); - }; - return std::make_shared(func); - - } else { - return empty; - } -} - } // namespace py } // namespace arrow diff --git a/cpp/src/arrow/python/io.h b/cpp/src/arrow/python/io.h index 4733aa1b1ed..f6c70ff50a4 100644 --- a/cpp/src/arrow/python/io.h +++ b/cpp/src/arrow/python/io.h @@ -117,7 +117,5 @@ using StreamWrapFunc = std::function>( std::shared_ptr MakeStreamTransformFunc(TransformInputStreamVTable vtable, PyObject* handler); -std::shared_ptr MakeStreamTransformLibFunc(std::string encoding, std::string 
lib_name); - } // namespace py } // namespace arrow diff --git a/python/pyarrow/_csv.pyx b/python/pyarrow/_csv.pyx index d943cce137f..734147c9cc7 100644 --- a/python/pyarrow/_csv.pyx +++ b/python/pyarrow/_csv.pyx @@ -290,9 +290,9 @@ cdef class ReadOptions(_Weakrefable): @property def encoding(self): """ - Character encoding used for this input. Default UTF-8". + Character encoding used for this input. Default UTF-8. """ - return deref(self.options).encoding + return frombytes(deref(self.options).encoding) @encoding.setter def encoding(self, value): diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index afe6993b85d..7f535e47877 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -1282,19 +1282,16 @@ cdef class CsvFragmentScanOptions(FragmentScanOptions): @property def read_options(self): - read_options = ReadOptions.wrap(self.csv_options.read_options) - return read_options + return ReadOptions.wrap(self.csv_options.read_options) @read_options.setter def read_options(self, ReadOptions read_options not None): self.csv_options.read_options = deref(read_options.options) - def add_transcoder(self, src_encoding, transcoding_library): + def set_transcoder(self, src_encoding): if src_encoding != 'utf8': self.csv_options.stream_transform_func = deref( - #make_streamwrap_func(src_encoding, dest_encoding)) - MakeStreamTransformLibFunc(tobytes(src_encoding), - tobytes(transcoding_library))) + make_streamwrap_func(src_encoding, 'utf8')) def equals(self, CsvFragmentScanOptions other): return ( diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py index d0040f50f5f..606c134ddeb 100644 --- a/python/pyarrow/dataset.py +++ b/python/pyarrow/dataset.py @@ -434,9 +434,8 @@ def _filesystem_dataset(source, schema=None, filesystem=None, """ format = _ensure_format(format or 'parquet') if isinstance(format, CsvFileFormat): - format.default_fragment_scan_options.add_transcoder( - 
format.default_fragment_scan_options.read_options.encoding, - "libiconv") + format.default_fragment_scan_options.set_transcoder( + format.default_fragment_scan_options.read_options.encoding) partitioning = _ensure_partitioning(partitioning) diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index 819209bb583..8f8bd5f531b 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -1377,11 +1377,6 @@ cdef extern from "arrow/io/api.h" namespace "arrow::io" nogil: CTransformInputStreamVTable vtable, object method_arg) - shared_ptr[function[StreamWrapFunc]] MakeStreamTransformLibFunc \ - "arrow::py::MakeStreamTransformLibFunc"( - c_string encoding, - c_string lib_name) - # ---------------------------------------------------------------------- # HDFS From a82a32acfdce5fb15a405be978988e6e741a2b2d Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Wed, 3 Aug 2022 11:53:55 +0200 Subject: [PATCH 21/27] Changed default for csv encoding to a constant --- cpp/src/arrow/csv/options.h | 6 ++++-- cpp/src/arrow/dataset/file_csv.cc | 8 ++++---- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/cpp/src/arrow/csv/options.h b/cpp/src/arrow/csv/options.h index c5fd2c65c1a..5092bf97339 100644 --- a/cpp/src/arrow/csv/options.h +++ b/cpp/src/arrow/csv/options.h @@ -39,6 +39,8 @@ namespace csv { // Silly workaround for https://github.com/michaeljones/breathe/issues/453 constexpr char kDefaultEscapeChar = '\\'; +constexpr char kCsvDefaultEncoding[] = "UTF-8"; + struct ARROW_EXPORT ParseOptions { // Parsing options @@ -163,8 +165,8 @@ struct ARROW_EXPORT ReadOptions { /// If false, column names will be read from the first CSV row after `skip_rows`. bool autogenerate_column_names = false; - /// Character encoding used - std::string encoding = "UTF-8"; + /// Character encoding used. Only "UTF-8" is supported. 
+ std::string encoding = kCsvDefaultEncoding; /// Create read options with default values static ReadOptions Defaults(); diff --git a/cpp/src/arrow/dataset/file_csv.cc b/cpp/src/arrow/dataset/file_csv.cc index 4fa2dc63e59..e9a0aba2e09 100644 --- a/cpp/src/arrow/dataset/file_csv.cc +++ b/cpp/src/arrow/dataset/file_csv.cc @@ -189,11 +189,11 @@ static inline Future> OpenReaderAsync( kCsvTypeName, scan_options.get(), format.default_fragment_scan_options)); ARROW_ASSIGN_OR_RAISE(auto reader_options, GetReadOptions(format, scan_options)); ARROW_ASSIGN_OR_RAISE(auto input, source.OpenCompressed()); - if (reader_options.encoding != "UTF-8") { + if (reader_options.encoding != csv::kCsvDefaultEncoding) { if (fragment_scan_options->stream_transform_func) { ARROW_ASSIGN_OR_RAISE(input, fragment_scan_options->stream_transform_func(input)); } else { - return Status::Invalid("File encoding is not UTF-8, but no stream_transform_func has been provided."); + return Status::Invalid("File encoding is not UTF-8, but no stream_transform_func has been provided"); } } const auto& path = source.path(); @@ -305,11 +305,11 @@ Future> CsvFileFormat::CountRows( kCsvTypeName, options.get(), self->default_fragment_scan_options)); ARROW_ASSIGN_OR_RAISE(auto read_options, GetReadOptions(*self, options)); ARROW_ASSIGN_OR_RAISE(auto input, file->source().OpenCompressed()); - if (read_options.encoding != "UTF-8") { + if (read_options.encoding != csv::kCsvDefaultEncoding) { if (fragment_scan_options->stream_transform_func) { ARROW_ASSIGN_OR_RAISE(input, fragment_scan_options->stream_transform_func(input)); } else { - return Status::Invalid("File encoding is not UTF-8, but no stream_transform_func has been provided."); + return Status::Invalid("File encoding is not UTF-8, but no stream_transform_func has been provided"); } } return csv::CountRowsAsync(options->io_context, std::move(input), From 21f120245de7e97433d046fe48b92e730e86a5ef Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Wed, 3 Aug 2022 
12:00:14 +0200 Subject: [PATCH 22/27] Generating an error in C++ when the encoding is not UTF-8 --- cpp/src/arrow/csv/options.cc | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/cpp/src/arrow/csv/options.cc b/cpp/src/arrow/csv/options.cc index 365b5646b66..d73f284362e 100644 --- a/cpp/src/arrow/csv/options.cc +++ b/cpp/src/arrow/csv/options.cc @@ -67,6 +67,10 @@ Status ReadOptions::Validate() const { "ReadOptions: autogenerate_column_names cannot be true when column_names are " "provided"); } + if (ARROW_PREDICT_FALSE(encoding != kCsvDefaultEncoding)) { + return Status::Invalid( + "ReadOptions: only UTF-8 is supported: ", encoding); + } return Status::OK(); } From e06f03fdfddc1ebf2845d9d4a70c779e7e8ea24b Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Wed, 3 Aug 2022 12:08:46 +0200 Subject: [PATCH 23/27] In python, setting the encoding back to utf8 after creating a transcoding wrapper --- python/pyarrow/_csv.pyx | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/python/pyarrow/_csv.pyx b/python/pyarrow/_csv.pyx index 734147c9cc7..9471e2317b6 100644 --- a/python/pyarrow/_csv.pyx +++ b/python/pyarrow/_csv.pyx @@ -290,7 +290,7 @@ cdef class ReadOptions(_Weakrefable): @property def encoding(self): """ - Character encoding used for this input. Default UTF-8. + Character encoding used for this input. Default 'utf8'. 
""" return frombytes(deref(self.options).encoding) @@ -1084,10 +1084,12 @@ cdef _get_reader(input_file, ReadOptions read_options, shared_ptr[CInputStream]* out): use_memory_map = False get_input_stream(input_file, use_memory_map, out) - if read_options is not None: + if read_options is not None and read_options.encoding != 'utf8': out[0] = native_transcoding_input_stream(out[0], read_options.encoding, 'utf8') + # Set encoding to utf8 because we are transcoding to that + read_options.encoding = 'utf8' cdef _get_read_options(ReadOptions read_options, CCSVReadOptions* out): From 24b1cc5c7c792707227193963e2275ad7c5c1484 Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Wed, 3 Aug 2022 14:55:36 +0200 Subject: [PATCH 24/27] 'UTF-8' -> 'utf8' to make it consistent with python --- cpp/src/arrow/csv/options.cc | 2 +- cpp/src/arrow/csv/options.h | 4 ++-- cpp/src/arrow/dataset/file_csv.cc | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/cpp/src/arrow/csv/options.cc b/cpp/src/arrow/csv/options.cc index d73f284362e..54750bbaf56 100644 --- a/cpp/src/arrow/csv/options.cc +++ b/cpp/src/arrow/csv/options.cc @@ -69,7 +69,7 @@ Status ReadOptions::Validate() const { } if (ARROW_PREDICT_FALSE(encoding != kCsvDefaultEncoding)) { return Status::Invalid( - "ReadOptions: only UTF-8 is supported: ", encoding); + "ReadOptions: only 'utf8' encoding is supported: ", encoding); } return Status::OK(); } diff --git a/cpp/src/arrow/csv/options.h b/cpp/src/arrow/csv/options.h index 5092bf97339..1450385022a 100644 --- a/cpp/src/arrow/csv/options.h +++ b/cpp/src/arrow/csv/options.h @@ -39,7 +39,7 @@ namespace csv { // Silly workaround for https://github.com/michaeljones/breathe/issues/453 constexpr char kDefaultEscapeChar = '\\'; -constexpr char kCsvDefaultEncoding[] = "UTF-8"; +constexpr char kCsvDefaultEncoding[] = "utf8"; struct ARROW_EXPORT ParseOptions { // Parsing options @@ -165,7 +165,7 @@ struct ARROW_EXPORT ReadOptions { /// If false, column names will be read from 
the first CSV row after `skip_rows`. bool autogenerate_column_names = false; - /// Character encoding used. Only "UTF-8" is supported. + /// Character encoding used. Only "utf8" is supported. std::string encoding = kCsvDefaultEncoding; /// Create read options with default values diff --git a/cpp/src/arrow/dataset/file_csv.cc b/cpp/src/arrow/dataset/file_csv.cc index e9a0aba2e09..c92baf713e2 100644 --- a/cpp/src/arrow/dataset/file_csv.cc +++ b/cpp/src/arrow/dataset/file_csv.cc @@ -193,7 +193,7 @@ static inline Future> OpenReaderAsync( if (fragment_scan_options->stream_transform_func) { ARROW_ASSIGN_OR_RAISE(input, fragment_scan_options->stream_transform_func(input)); } else { - return Status::Invalid("File encoding is not UTF-8, but no stream_transform_func has been provided"); + return Status::Invalid("File encoding is not utf8, but no stream_transform_func has been provided"); } } const auto& path = source.path(); @@ -309,7 +309,7 @@ Future> CsvFileFormat::CountRows( if (fragment_scan_options->stream_transform_func) { ARROW_ASSIGN_OR_RAISE(input, fragment_scan_options->stream_transform_func(input)); } else { - return Status::Invalid("File encoding is not UTF-8, but no stream_transform_func has been provided"); + return Status::Invalid("File encoding is not utf8, but no stream_transform_func has been provided"); } } return csv::CountRowsAsync(options->io_context, std::move(input), From c2c4b2260bf01f9336908b9b5198fdc9c58d0d68 Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Wed, 3 Aug 2022 16:45:55 +0200 Subject: [PATCH 25/27] Disabled ReadOptions encoding validation --- cpp/src/arrow/csv/options.cc | 6 ++---- python/pyarrow/_csv.pyx | 2 -- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/cpp/src/arrow/csv/options.cc b/cpp/src/arrow/csv/options.cc index 54750bbaf56..ef6d46b944c 100644 --- a/cpp/src/arrow/csv/options.cc +++ b/cpp/src/arrow/csv/options.cc @@ -67,10 +67,8 @@ Status ReadOptions::Validate() const { "ReadOptions: 
autogenerate_column_names cannot be true when column_names are " "provided"); } - if (ARROW_PREDICT_FALSE(encoding != kCsvDefaultEncoding)) { - return Status::Invalid( - "ReadOptions: only 'utf8' encoding is supported: ", encoding); - } + // Validating encoding == utf8 is not possible, because the input stream can be + // wrapped in a transcoder return Status::OK(); } diff --git a/python/pyarrow/_csv.pyx b/python/pyarrow/_csv.pyx index 9471e2317b6..51792468d36 100644 --- a/python/pyarrow/_csv.pyx +++ b/python/pyarrow/_csv.pyx @@ -1088,8 +1088,6 @@ cdef _get_reader(input_file, ReadOptions read_options, out[0] = native_transcoding_input_stream(out[0], read_options.encoding, 'utf8') - # Set encoding to utf8 because we are transcoding to that - read_options.encoding = 'utf8' cdef _get_read_options(ReadOptions read_options, CCSVReadOptions* out): From 30124c2aba889cfab229d55d092bfbfe69880cdb Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Mon, 8 Aug 2022 09:52:02 +0200 Subject: [PATCH 26/27] Formatting --- python/pyarrow/_csv.pyx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyarrow/_csv.pyx b/python/pyarrow/_csv.pyx index 51792468d36..5dd2181292b 100644 --- a/python/pyarrow/_csv.pyx +++ b/python/pyarrow/_csv.pyx @@ -286,7 +286,7 @@ cdef class ReadOptions(_Weakrefable): @skip_rows_after_names.setter def skip_rows_after_names(self, value): deref(self.options).skip_rows_after_names = value - + @property def encoding(self): """ From 9a5bf209a859fd05d5d219421a7ec6cbc205f28f Mon Sep 17 00:00:00 2001 From: Joost Hoozemans Date: Mon, 8 Aug 2022 12:55:57 +0200 Subject: [PATCH 27/27] Moved the creation of the stream wrapper function into the readoption setter --- python/pyarrow/_dataset.pyx | 6 ++---- python/pyarrow/dataset.py | 4 ---- 2 files changed, 2 insertions(+), 8 deletions(-) diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index 7f535e47877..9c69c39de3e 100644 --- a/python/pyarrow/_dataset.pyx +++ 
b/python/pyarrow/_dataset.pyx @@ -1287,11 +1287,9 @@ cdef class CsvFragmentScanOptions(FragmentScanOptions): @read_options.setter def read_options(self, ReadOptions read_options not None): self.csv_options.read_options = deref(read_options.options) - - def set_transcoder(self, src_encoding): - if src_encoding != 'utf8': + if read_options.encoding != 'utf8': self.csv_options.stream_transform_func = deref( - make_streamwrap_func(src_encoding, 'utf8')) + make_streamwrap_func(read_options.encoding, 'utf8')) def equals(self, CsvFragmentScanOptions other): return ( diff --git a/python/pyarrow/dataset.py b/python/pyarrow/dataset.py index 606c134ddeb..2518e37ec6f 100644 --- a/python/pyarrow/dataset.py +++ b/python/pyarrow/dataset.py @@ -433,10 +433,6 @@ def _filesystem_dataset(source, schema=None, filesystem=None, FileSystemDataset """ format = _ensure_format(format or 'parquet') - if isinstance(format, CsvFileFormat): - format.default_fragment_scan_options.set_transcoder( - format.default_fragment_scan_options.read_options.encoding) - partitioning = _ensure_partitioning(partitioning) if isinstance(source, (list, tuple)):