diff --git a/cpp/src/arrow/csv/options.cc b/cpp/src/arrow/csv/options.cc index 365b5646b66..ef6d46b944c 100644 --- a/cpp/src/arrow/csv/options.cc +++ b/cpp/src/arrow/csv/options.cc @@ -67,6 +67,8 @@ Status ReadOptions::Validate() const { "ReadOptions: autogenerate_column_names cannot be true when column_names are " "provided"); } + // Validating encoding == utf8 is not possible, because the input stream can be + // wrapped in a transcoder return Status::OK(); } diff --git a/cpp/src/arrow/csv/options.h b/cpp/src/arrow/csv/options.h index 7723dcedc61..1450385022a 100644 --- a/cpp/src/arrow/csv/options.h +++ b/cpp/src/arrow/csv/options.h @@ -39,6 +39,8 @@ namespace csv { // Silly workaround for https://github.com/michaeljones/breathe/issues/453 constexpr char kDefaultEscapeChar = '\\'; +constexpr char kCsvDefaultEncoding[] = "utf8"; + struct ARROW_EXPORT ParseOptions { // Parsing options @@ -163,6 +165,9 @@ struct ARROW_EXPORT ReadOptions { /// If false, column names will be read from the first CSV row after `skip_rows`. bool autogenerate_column_names = false; + /// Character encoding used. Only "utf8" is supported. + std::string encoding = kCsvDefaultEncoding; + /// Create read options with default values static ReadOptions Defaults(); diff --git a/cpp/src/arrow/dataset/file_csv.cc b/cpp/src/arrow/dataset/file_csv.cc index d4e0af7808c..c92baf713e2 100644 --- a/cpp/src/arrow/dataset/file_csv.cc +++ b/cpp/src/arrow/dataset/file_csv.cc @@ -183,9 +183,19 @@ static inline Future> OpenReaderAsync( auto tracer = arrow::internal::tracing::GetTracer(); auto span = tracer->StartSpan("arrow::dataset::CsvFileFormat::OpenReaderAsync"); #endif + ARROW_ASSIGN_OR_RAISE( + auto fragment_scan_options, + GetFragmentScanOptions( + kCsvTypeName, scan_options.get(), format.default_fragment_scan_options)); ARROW_ASSIGN_OR_RAISE(auto reader_options, GetReadOptions(format, scan_options)); - ARROW_ASSIGN_OR_RAISE(auto input, source.OpenCompressed()); + if (reader_options.encoding != csv::kCsvDefaultEncoding) { + if (fragment_scan_options->stream_transform_func) { + ARROW_ASSIGN_OR_RAISE(input, fragment_scan_options->stream_transform_func(input)); + } else { + return Status::Invalid("File encoding is not utf8, but no stream_transform_func has been provided"); + } + } const auto& path = source.path(); ARROW_ASSIGN_OR_RAISE( input, io::BufferedInputStream::Create(reader_options.block_size, @@ -289,8 +299,19 @@ Future> CsvFileFormat::CountRows( return Future>::MakeFinished(util::nullopt); } auto self = checked_pointer_cast(shared_from_this()); - ARROW_ASSIGN_OR_RAISE(auto input, file->source().OpenCompressed()); + ARROW_ASSIGN_OR_RAISE( + auto fragment_scan_options, + GetFragmentScanOptions( + kCsvTypeName, options.get(), self->default_fragment_scan_options)); ARROW_ASSIGN_OR_RAISE(auto read_options, GetReadOptions(*self, options)); + ARROW_ASSIGN_OR_RAISE(auto input, file->source().OpenCompressed()); + if (read_options.encoding != csv::kCsvDefaultEncoding) { + if (fragment_scan_options->stream_transform_func) { + ARROW_ASSIGN_OR_RAISE(input, fragment_scan_options->stream_transform_func(input)); + } else { + return Status::Invalid("File encoding is not utf8, but no stream_transform_func has been provided"); + } + } return csv::CountRowsAsync(options->io_context, std::move(input), ::arrow::internal::GetCpuThreadPool(), read_options, self->parse_options) diff --git a/cpp/src/arrow/dataset/file_csv.h b/cpp/src/arrow/dataset/file_csv.h index 83dbb88b85f..84bcf94abe3 100644 --- a/cpp/src/arrow/dataset/file_csv.h +++ b/cpp/src/arrow/dataset/file_csv.h @@ -73,6 +73,9 @@ class ARROW_DS_EXPORT CsvFileFormat : public FileFormat { struct ARROW_DS_EXPORT CsvFragmentScanOptions : public FragmentScanOptions { std::string type_name() const override { return kCsvTypeName; } + using StreamWrapFunc = std::function>( + std::shared_ptr)>; + /// CSV conversion options csv::ConvertOptions convert_options = csv::ConvertOptions::Defaults(); @@ -80,6 +83,13 @@ struct ARROW_DS_EXPORT CsvFragmentScanOptions : public FragmentScanOptions { /// /// Note that use_threads is always ignored. csv::ReadOptions read_options = csv::ReadOptions::Defaults(); + + /// Optional stream wrapping function + /// + /// If defined, all open dataset file fragments will be passed + /// through this function. One possible use case is to transparently + /// transcode all input files from a given character set to utf8. + StreamWrapFunc stream_transform_func{}; }; class ARROW_DS_EXPORT CsvFileWriteOptions : public FileWriteOptions { diff --git a/cpp/src/arrow/python/io.cc b/cpp/src/arrow/python/io.cc index 73525feed38..c621d16427b 100644 --- a/cpp/src/arrow/python/io.cc +++ b/cpp/src/arrow/python/io.cc @@ -370,5 +370,15 @@ std::shared_ptr<::arrow::io::InputStream> MakeTransformInputStream( return std::make_shared(std::move(wrapped), std::move(transform)); } +std::shared_ptr MakeStreamTransformFunc(TransformInputStreamVTable vtable, + PyObject* handler) { + TransformInputStream::TransformFunc transform( + TransformFunctionWrapper{std::move(vtable.transform), handler}); + StreamWrapFunc func = [transform](std::shared_ptr<::arrow::io::InputStream> wrapped) { + return std::make_shared(wrapped, transform); + }; + return std::make_shared(func); +} + } // namespace py } // namespace arrow diff --git a/cpp/src/arrow/python/io.h b/cpp/src/arrow/python/io.h index a38d0ca332c..f6c70ff50a4 100644 --- a/cpp/src/arrow/python/io.h +++ b/cpp/src/arrow/python/io.h @@ -112,5 +112,10 @@ std::shared_ptr<::arrow::io::InputStream> MakeTransformInputStream( std::shared_ptr<::arrow::io::InputStream> wrapped, TransformInputStreamVTable vtable, PyObject* arg); +using StreamWrapFunc = std::function>( + std::shared_ptr)>; +std::shared_ptr MakeStreamTransformFunc(TransformInputStreamVTable vtable, + PyObject* handler); + } // namespace py } // namespace arrow diff --git a/python/pyarrow/_csv.pxd b/python/pyarrow/_csv.pxd index dcc562a41c7..a7d63e548bf 100644 --- a/python/pyarrow/_csv.pxd +++ b/python/pyarrow/_csv.pxd @@ -41,7 +41,6 @@ cdef class ParseOptions(_Weakrefable): cdef class ReadOptions(_Weakrefable): cdef: unique_ptr[CCSVReadOptions] options - public object encoding @staticmethod cdef ReadOptions wrap(CCSVReadOptions options) diff --git a/python/pyarrow/_csv.pyx b/python/pyarrow/_csv.pyx index d1db03c75f1..5dd2181292b 100644 --- a/python/pyarrow/_csv.pyx +++ b/python/pyarrow/_csv.pyx @@ -201,7 +201,6 @@ cdef class ReadOptions(_Weakrefable): self.column_names = column_names if autogenerate_column_names is not None: self.autogenerate_column_names= autogenerate_column_names - # Python-specific option self.encoding = encoding if skip_rows_after_names is not None: self.skip_rows_after_names = skip_rows_after_names @@ -288,6 +287,17 @@ cdef class ReadOptions(_Weakrefable): def skip_rows_after_names(self, value): deref(self.options).skip_rows_after_names = value + @property + def encoding(self): + """ + Character encoding used for this input. Default 'utf8'. + """ + return frombytes(deref(self.options).encoding) + + @encoding.setter + def encoding(self, value): + deref(self.options).encoding = tobytes(value) + def validate(self): check_status(deref(self.options).Validate()) @@ -307,7 +317,6 @@ cdef class ReadOptions(_Weakrefable): cdef ReadOptions wrap(CCSVReadOptions options): out = ReadOptions() out.options.reset(new CCSVReadOptions(move(options))) - out.encoding = 'utf8' # No way to know this return out def __getstate__(self): @@ -1075,7 +1084,7 @@ cdef _get_reader(input_file, ReadOptions read_options, shared_ptr[CInputStream]* out): use_memory_map = False get_input_stream(input_file, use_memory_map, out) - if read_options is not None: + if read_options is not None and read_options.encoding != 'utf8': out[0] = native_transcoding_input_stream(out[0], read_options.encoding, 'utf8') diff --git a/python/pyarrow/_dataset.pyx b/python/pyarrow/_dataset.pyx index 68833a5350e..9c69c39de3e 100644 --- a/python/pyarrow/_dataset.pyx +++ b/python/pyarrow/_dataset.pyx @@ -21,6 +21,7 @@ from cython.operator cimport dereference as deref +import codecs import collections import os import warnings @@ -1198,6 +1199,9 @@ cdef class CsvFileFormat(FileFormat): raise TypeError('`default_fragment_scan_options` must be either ' 'a dictionary or an instance of ' 'CsvFragmentScanOptions') + else: + # default_fragment_scan_options is needed to add a transcoder + self.default_fragment_scan_options = CsvFragmentScanOptions() cdef void init(self, const shared_ptr[CFileFormat]& sp): FileFormat.init(self, sp) @@ -1283,6 +1287,9 @@ cdef class CsvFragmentScanOptions(FragmentScanOptions): @read_options.setter def read_options(self, ReadOptions read_options not None): self.csv_options.read_options = deref(read_options.options) + if read_options.encoding != 'utf8': + self.csv_options.stream_transform_func = deref( + make_streamwrap_func(read_options.encoding, 'utf8')) def equals(self, CsvFragmentScanOptions other): return ( diff --git a/python/pyarrow/includes/libarrow.pxd b/python/pyarrow/includes/libarrow.pxd index 4cbcef84e88..8f8bd5f531b 100644 --- a/python/pyarrow/includes/libarrow.pxd +++ b/python/pyarrow/includes/libarrow.pxd @@ -1208,6 +1208,9 @@ cdef extern from "arrow/builder.h" namespace "arrow" nogil: ctypedef void CallbackTransform(object, const shared_ptr[CBuffer]& src, shared_ptr[CBuffer]* dest) +ctypedef CResult[shared_ptr[CInputStream]] StreamWrapFunc( + shared_ptr[CInputStream]) + cdef extern from "arrow/util/cancel.h" namespace "arrow" nogil: cdef cppclass CStopToken "arrow::StopToken": @@ -1369,6 +1372,11 @@ cdef extern from "arrow/io/api.h" namespace "arrow::io" nogil: shared_ptr[CInputStream] wrapped, CTransformInputStreamVTable vtable, object method_arg) + shared_ptr[function[StreamWrapFunc]] MakeStreamTransformFunc \ + "arrow::py::MakeStreamTransformFunc"( + CTransformInputStreamVTable vtable, + object method_arg) + # ---------------------------------------------------------------------- # HDFS @@ -1723,6 +1731,7 @@ cdef extern from "arrow/csv/api.h" namespace "arrow::csv" nogil: int32_t skip_rows_after_names vector[c_string] column_names c_bool autogenerate_column_names + c_string encoding CCSVReadOptions() CCSVReadOptions(CCSVReadOptions&&) diff --git a/python/pyarrow/includes/libarrow_dataset.pxd b/python/pyarrow/includes/libarrow_dataset.pxd index bd8fbd1b56a..ad1bbbc5442 100644 --- a/python/pyarrow/includes/libarrow_dataset.pxd +++ b/python/pyarrow/includes/libarrow_dataset.pxd @@ -277,6 +277,7 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil: "arrow::dataset::CsvFragmentScanOptions"(CFragmentScanOptions): CCSVConvertOptions convert_options CCSVReadOptions read_options + function[StreamWrapFunc] stream_transform_func cdef cppclass CPartitioning "arrow::dataset::Partitioning": c_string type_name() const diff --git a/python/pyarrow/io.pxi b/python/pyarrow/io.pxi index d2e4f7062e6..716a23721c5 100644 --- a/python/pyarrow/io.pxi +++ b/python/pyarrow/io.pxi @@ -1547,6 +1547,33 @@ class Transcoder: return self._encoder.encode(self._decoder.decode(buf, final), final) +cdef shared_ptr[function[StreamWrapFunc]] make_streamwrap_func( + src_encoding, dest_encoding) except *: + """ + Create a function that will add a transcoding transformation to a stream. + Data from that stream will be decoded according to ``src_encoding`` and + then re-encoded according to ``dest_encoding``. + The created function can be used to wrap streams. + + Parameters + ---------- + src_encoding : str + The codec to use when reading data. + dest_encoding : str + The codec to use for emitted data. + """ + cdef: + shared_ptr[function[StreamWrapFunc]] empty_func + CTransformInputStreamVTable vtable + + vtable.transform = _cb_transform + src_codec = codecs.lookup(src_encoding) + dest_codec = codecs.lookup(dest_encoding) + return MakeStreamTransformFunc(move(vtable), + Transcoder(src_codec.incrementaldecoder(), + dest_codec.incrementalencoder())) + + def transcoding_input_stream(stream, src_encoding, dest_encoding): """ Add a transcoding transformation to the stream. @@ -1558,7 +1585,7 @@ def transcoding_input_stream(stream, src_encoding, dest_encoding): stream : NativeFile The stream to which the transformation should be applied. src_encoding : str - The codec to use when reading data data. + The codec to use when reading data. dest_encoding : str The codec to use for emitted data. """ diff --git a/python/pyarrow/lib.pxd b/python/pyarrow/lib.pxd index 953b0e7b518..67db3d2ffb8 100644 --- a/python/pyarrow/lib.pxd +++ b/python/pyarrow/lib.pxd @@ -536,6 +536,9 @@ cdef shared_ptr[CInputStream] native_transcoding_input_stream( shared_ptr[CInputStream] stream, src_encoding, dest_encoding) except * +cdef shared_ptr[function[StreamWrapFunc]] make_streamwrap_func( + src_encoding, dest_encoding) except * + # Default is allow_none=False cpdef DataType ensure_type(object type, bint allow_none=*)