Closed
Changes from all commits (27 commits)
5d7c2bd
Added field to CsvFragmentScanOptions that holds an optional transfor…
joosthooz Jul 26, 2022
a82239d
WIP wrapping a transcoder around all input streams of a dataset
joosthooz Jul 26, 2022
f67590c
Added input stream wrapping to CsvFileFormat::CountRows too
joosthooz Jul 28, 2022
a717ddc
Moved make_streamwrap_func into io.pxi, removed duplicated code
joosthooz Jul 28, 2022
9284eb6
Use UpperCamelCase in function name
joosthooz Jul 28, 2022
b2061bf
Additional name change, formatting fix
joosthooz Jul 28, 2022
bbdaa07
Use GetReadOptions(), because it overrides the use_threads field
joosthooz Jul 29, 2022
c959112
Only add a transcoder for csv files
joosthooz Jul 29, 2022
7fceb84
Processed some review comments regarding documentation
joosthooz Jul 29, 2022
401f67d
Moved encoding parameter from dataset() into CsvFileFormat
joosthooz Jul 29, 2022
9816e46
Removed a left-over occurrence of the workaround encoding parameter
joosthooz Jul 29, 2022
888b3e8
Setting default encoding to utf8
joosthooz Jul 29, 2022
0f67e8c
Always creating a default_fragment_scan_options
joosthooz Jul 29, 2022
47c536d
Using a different way of checking the file format
joosthooz Jul 29, 2022
a539dd0
Added documentation about added encoding parameter
joosthooz Jul 29, 2022
22eff73
Implemented an alternative way of passing the encoding.
joosthooz Jul 29, 2022
4d9802b
Added python-specific encoding field to CsvFileFormat.
joosthooz Aug 1, 2022
4d819aa
Removed encoding from CsvFragmentScanOptions
joosthooz Aug 1, 2022
1534bd1
Added transcoding functionality by dlopening libiconv
joosthooz Aug 1, 2022
d2c9a06
Removed encoding library wrapper code, now returning error if no tran…
joosthooz Aug 2, 2022
a82a32a
Changed default for csv encoding to a constant
joosthooz Aug 3, 2022
21f1202
Generating an error in C++ when the encoding is not UTF-8
joosthooz Aug 3, 2022
e06f03f
In python, setting the encoding back to utf8 after creating a transco…
joosthooz Aug 3, 2022
24b1cc5
'UTF-8' -> 'utf8' to make it consistent with python
joosthooz Aug 3, 2022
c2c4b22
Disabled ReadOptions encoding validation
joosthooz Aug 3, 2022
30124c2
Formatting
joosthooz Aug 8, 2022
9a5bf20
Moved the creation of the stream wrapper function into the readoption…
joosthooz Aug 8, 2022
2 changes: 2 additions & 0 deletions cpp/src/arrow/csv/options.cc
@@ -67,6 +67,8 @@ Status ReadOptions::Validate() const {
"ReadOptions: autogenerate_column_names cannot be true when column_names are "
"provided");
}
// Validating encoding == utf8 is not possible, because the input stream can be
// wrapped in a transcoder
return Status::OK();
}

5 changes: 5 additions & 0 deletions cpp/src/arrow/csv/options.h
@@ -39,6 +39,8 @@ namespace csv {
// Silly workaround for https://github.com/michaeljones/breathe/issues/453
constexpr char kDefaultEscapeChar = '\\';

constexpr char kCsvDefaultEncoding[] = "utf8";

struct ARROW_EXPORT ParseOptions {
// Parsing options

@@ -163,6 +165,9 @@ struct ARROW_EXPORT ReadOptions {
/// If false, column names will be read from the first CSV row after `skip_rows`.
bool autogenerate_column_names = false;

/// Character encoding used. Only "utf8" is supported.
std::string encoding = kCsvDefaultEncoding;

/// Create read options with default values
static ReadOptions Defaults();

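The new ReadOptions::encoding field surfaces in Python as a ReadOptions.encoding property (see the _csv.pyx changes further down). A minimal sketch of the expected behaviour, assuming a pyarrow build that includes this change:

import pyarrow.csv as pv

opts = pv.ReadOptions()
print(opts.encoding)       # "utf8", i.e. kCsvDefaultEncoding
opts.encoding = "latin-1"  # any codec name; only looked up when a transcoder is built
print(opts.encoding)       # "latin-1"
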
25 changes: 23 additions & 2 deletions cpp/src/arrow/dataset/file_csv.cc
@@ -183,9 +183,19 @@ static inline Future<std::shared_ptr<csv::StreamingReader>> OpenReaderAsync(
auto tracer = arrow::internal::tracing::GetTracer();
auto span = tracer->StartSpan("arrow::dataset::CsvFileFormat::OpenReaderAsync");
#endif
ARROW_ASSIGN_OR_RAISE(
auto fragment_scan_options,
GetFragmentScanOptions<CsvFragmentScanOptions>(
kCsvTypeName, scan_options.get(), format.default_fragment_scan_options));
ARROW_ASSIGN_OR_RAISE(auto reader_options, GetReadOptions(format, scan_options));

ARROW_ASSIGN_OR_RAISE(auto input, source.OpenCompressed());
if (reader_options.encoding != csv::kCsvDefaultEncoding) {
if (fragment_scan_options->stream_transform_func) {
ARROW_ASSIGN_OR_RAISE(input, fragment_scan_options->stream_transform_func(input));
} else {
return Status::Invalid("File encoding is not utf8, but no stream_transform_func has been provided");
}
}
const auto& path = source.path();
ARROW_ASSIGN_OR_RAISE(
input, io::BufferedInputStream::Create(reader_options.block_size,
@@ -289,8 +299,19 @@ Future<util::optional<int64_t>> CsvFileFormat::CountRows(
return Future<util::optional<int64_t>>::MakeFinished(util::nullopt);
}
auto self = checked_pointer_cast<CsvFileFormat>(shared_from_this());
ARROW_ASSIGN_OR_RAISE(auto input, file->source().OpenCompressed());
ARROW_ASSIGN_OR_RAISE(
auto fragment_scan_options,
GetFragmentScanOptions<CsvFragmentScanOptions>(
kCsvTypeName, options.get(), self->default_fragment_scan_options));
ARROW_ASSIGN_OR_RAISE(auto read_options, GetReadOptions(*self, options));
ARROW_ASSIGN_OR_RAISE(auto input, file->source().OpenCompressed());
if (read_options.encoding != csv::kCsvDefaultEncoding) {
if (fragment_scan_options->stream_transform_func) {
ARROW_ASSIGN_OR_RAISE(input, fragment_scan_options->stream_transform_func(input));
} else {
return Status::Invalid("File encoding is not utf8, but no stream_transform_func has been provided");
}
}
return csv::CountRowsAsync(options->io_context, std::move(input),
::arrow::internal::GetCpuThreadPool(), read_options,
self->parse_options)
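Both OpenReaderAsync and CountRows apply the same gate: wrap the stream if a transform function is available, otherwise fail for non-utf8 input. A hedged Python sketch of that control flow; open_input and latin1_to_utf8 are illustrative stand-ins, not Arrow APIs:

import codecs
import io

DEFAULT_ENCODING = "utf8"

def open_input(raw, encoding, stream_transform_func=None):
    # Mirror of the C++ gate above: wrap when the declared encoding is not
    # utf8, error out when no stream_transform_func has been provided.
    if encoding != DEFAULT_ENCODING:
        if stream_transform_func is None:
            raise ValueError("File encoding is not utf8, but no "
                             "stream_transform_func has been provided")
        return stream_transform_func(raw)
    return raw

def latin1_to_utf8(stream):
    # codecs.EncodedFile re-encodes latin-1 bytes to utf-8 on read.
    return codecs.EncodedFile(stream, "utf-8", "latin-1")

wrapped = open_input(io.BytesIO("café".encode("latin-1")), "latin-1", latin1_to_utf8)
print(wrapped.read())  # b'caf\xc3\xa9'
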
10 changes: 10 additions & 0 deletions cpp/src/arrow/dataset/file_csv.h
@@ -73,13 +73,23 @@ class ARROW_DS_EXPORT CsvFileFormat : public FileFormat {
struct ARROW_DS_EXPORT CsvFragmentScanOptions : public FragmentScanOptions {
std::string type_name() const override { return kCsvTypeName; }

using StreamWrapFunc = std::function<Result<std::shared_ptr<io::InputStream>>(
std::shared_ptr<io::InputStream>)>;

/// CSV conversion options
csv::ConvertOptions convert_options = csv::ConvertOptions::Defaults();

/// CSV reading options
///
/// Note that use_threads is always ignored.
csv::ReadOptions read_options = csv::ReadOptions::Defaults();

/// Optional stream wrapping function
///
/// If defined, all open dataset file fragments will be passed
/// through this function. One possible use case is to transparently
/// transcode all input files from a given character set to utf8.
StreamWrapFunc stream_transform_func{};
};

class ARROW_DS_EXPORT CsvFileWriteOptions : public FileWriteOptions {
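stream_transform_func is simply a callable taking an InputStream and returning a Result<InputStream>. The PR builds it from ReadOptions.encoding on the Python side, but any wrapper fits; a sketch using pyarrow.transcoding_input_stream, assumed here to be the public transcoding helper:

import pyarrow as pa

def wrap_latin1(stream):
    # A stream wrapper of the kind stream_transform_func is meant to hold:
    # the returned stream yields the same data transcoded latin-1 -> utf8.
    return pa.transcoding_input_stream(stream, "latin-1", "utf8")

raw = pa.BufferReader("col\nvalé\n".encode("latin-1"))
print(wrap_latin1(raw).read())  # b'col\nval\xc3\xa9\n'
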
10 changes: 10 additions & 0 deletions cpp/src/arrow/python/io.cc
@@ -370,5 +370,15 @@ std::shared_ptr<::arrow::io::InputStream> MakeTransformInputStream(
return std::make_shared<TransformInputStream>(std::move(wrapped), std::move(transform));
}

std::shared_ptr<StreamWrapFunc> MakeStreamTransformFunc(TransformInputStreamVTable vtable,
PyObject* handler) {
TransformInputStream::TransformFunc transform(
TransformFunctionWrapper{std::move(vtable.transform), handler});
StreamWrapFunc func = [transform](std::shared_ptr<::arrow::io::InputStream> wrapped) {
return std::make_shared<TransformInputStream>(wrapped, transform);
};
return std::make_shared<StreamWrapFunc>(func);
}

} // namespace py
} // namespace arrow
5 changes: 5 additions & 0 deletions cpp/src/arrow/python/io.h
@@ -112,5 +112,10 @@ std::shared_ptr<::arrow::io::InputStream> MakeTransformInputStream(
std::shared_ptr<::arrow::io::InputStream> wrapped, TransformInputStreamVTable vtable,
PyObject* arg);

using StreamWrapFunc = std::function<Result<std::shared_ptr<io::InputStream>>(
std::shared_ptr<io::InputStream>)>;
std::shared_ptr<StreamWrapFunc> MakeStreamTransformFunc(TransformInputStreamVTable vtable,
PyObject* handler);

} // namespace py
} // namespace arrow
1 change: 0 additions & 1 deletion python/pyarrow/_csv.pxd
@@ -41,7 +41,6 @@ cdef class ParseOptions(_Weakrefable):
cdef class ReadOptions(_Weakrefable):
cdef:
unique_ptr[CCSVReadOptions] options
public object encoding

@staticmethod
cdef ReadOptions wrap(CCSVReadOptions options)
15 changes: 12 additions & 3 deletions python/pyarrow/_csv.pyx
@@ -201,7 +201,6 @@ cdef class ReadOptions(_Weakrefable):
self.column_names = column_names
if autogenerate_column_names is not None:
self.autogenerate_column_names= autogenerate_column_names
# Python-specific option
self.encoding = encoding
if skip_rows_after_names is not None:
self.skip_rows_after_names = skip_rows_after_names
@@ -288,6 +287,17 @@
def skip_rows_after_names(self, value):
deref(self.options).skip_rows_after_names = value

@property
def encoding(self):
"""
Character encoding used for this input. Default 'utf8'.
"""
return frombytes(deref(self.options).encoding)

@encoding.setter
def encoding(self, value):
deref(self.options).encoding = tobytes(value)

def validate(self):
check_status(deref(self.options).Validate())

@@ -307,7 +317,6 @@
cdef ReadOptions wrap(CCSVReadOptions options):
out = ReadOptions()
out.options.reset(new CCSVReadOptions(move(options)))
out.encoding = 'utf8' # No way to know this
return out

def __getstate__(self):
@@ -1075,7 +1084,7 @@ cdef _get_reader(input_file, ReadOptions read_options,
shared_ptr[CInputStream]* out):
use_memory_map = False
get_input_stream(input_file, use_memory_map, out)
if read_options is not None:
if read_options is not None and read_options.encoding != 'utf8':
out[0] = native_transcoding_input_stream(out[0],
read_options.encoding,
'utf8')
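Taken together, ReadOptions.encoding now drives _get_reader to transcode the input only when it is not already utf8. A small usage sketch (the CSV payload is illustrative):

import io
import pyarrow.csv as pv

data = "name,city\nJosé,Zürich\n".encode("latin-1")
table = pv.read_csv(io.BytesIO(data),
                    read_options=pv.ReadOptions(encoding="latin-1"))
print(table.column("name")[0])  # José
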
7 changes: 7 additions & 0 deletions python/pyarrow/_dataset.pyx
@@ -21,6 +21,7 @@

from cython.operator cimport dereference as deref

import codecs
import collections
import os
import warnings
@@ -1198,6 +1199,9 @@ cdef class CsvFileFormat(FileFormat):
raise TypeError('`default_fragment_scan_options` must be either '
'a dictionary or an instance of '
'CsvFragmentScanOptions')
else:
# default_fragment_scan_options is needed to add a transcoder
self.default_fragment_scan_options = CsvFragmentScanOptions()

cdef void init(self, const shared_ptr[CFileFormat]& sp):
FileFormat.init(self, sp)
@@ -1283,6 +1287,9 @@ cdef class CsvFragmentScanOptions(FragmentScanOptions):
@read_options.setter
def read_options(self, ReadOptions read_options not None):
self.csv_options.read_options = deref(read_options.options)
if read_options.encoding != 'utf8':
self.csv_options.stream_transform_func = deref(
make_streamwrap_func(read_options.encoding, 'utf8'))

def equals(self, CsvFragmentScanOptions other):
return (
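At the dataset level, passing a ReadOptions with a non-utf8 encoding into CsvFileFormat is what installs the transcoding stream_transform_func shown above. A hedged usage sketch; the directory path is hypothetical:

import pyarrow.dataset as ds
import pyarrow.csv as pv

fmt = ds.CsvFileFormat(
    read_options=pv.ReadOptions(encoding="iso8859_1"))  # non-utf8 source files
dataset = ds.dataset("data/latin1_csvs/", format=fmt)   # hypothetical path
table = dataset.to_table()                              # data arrives as utf8
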
9 changes: 9 additions & 0 deletions python/pyarrow/includes/libarrow.pxd
@@ -1208,6 +1208,9 @@ cdef extern from "arrow/builder.h" namespace "arrow" nogil:
ctypedef void CallbackTransform(object, const shared_ptr[CBuffer]& src,
shared_ptr[CBuffer]* dest)

ctypedef CResult[shared_ptr[CInputStream]] StreamWrapFunc(
shared_ptr[CInputStream])


cdef extern from "arrow/util/cancel.h" namespace "arrow" nogil:
cdef cppclass CStopToken "arrow::StopToken":
@@ -1369,6 +1372,11 @@ cdef extern from "arrow/io/api.h" namespace "arrow::io" nogil:
shared_ptr[CInputStream] wrapped, CTransformInputStreamVTable vtable,
object method_arg)

shared_ptr[function[StreamWrapFunc]] MakeStreamTransformFunc \
"arrow::py::MakeStreamTransformFunc"(
CTransformInputStreamVTable vtable,
object method_arg)

# ----------------------------------------------------------------------
# HDFS

@@ -1723,6 +1731,7 @@ cdef extern from "arrow/csv/api.h" namespace "arrow::csv" nogil:
int32_t skip_rows_after_names
vector[c_string] column_names
c_bool autogenerate_column_names
c_string encoding

CCSVReadOptions()
CCSVReadOptions(CCSVReadOptions&&)
1 change: 1 addition & 0 deletions python/pyarrow/includes/libarrow_dataset.pxd
@@ -277,6 +277,7 @@ cdef extern from "arrow/dataset/api.h" namespace "arrow::dataset" nogil:
"arrow::dataset::CsvFragmentScanOptions"(CFragmentScanOptions):
CCSVConvertOptions convert_options
CCSVReadOptions read_options
function[StreamWrapFunc] stream_transform_func

cdef cppclass CPartitioning "arrow::dataset::Partitioning":
c_string type_name() const
29 changes: 28 additions & 1 deletion python/pyarrow/io.pxi
@@ -1547,6 +1547,33 @@ class Transcoder:
return self._encoder.encode(self._decoder.decode(buf, final), final)


cdef shared_ptr[function[StreamWrapFunc]] make_streamwrap_func(
src_encoding, dest_encoding) except *:
"""
Create a function that will add a transcoding transformation to a stream.
Data from that stream will be decoded according to ``src_encoding`` and
then re-encoded according to ``dest_encoding``.
The created function can be used to wrap streams.

Parameters
----------
src_encoding : str
The codec to use when reading data.
dest_encoding : str
The codec to use for emitted data.
"""
cdef:
shared_ptr[function[StreamWrapFunc]] empty_func
CTransformInputStreamVTable vtable

vtable.transform = _cb_transform
src_codec = codecs.lookup(src_encoding)
dest_codec = codecs.lookup(dest_encoding)
return MakeStreamTransformFunc(move(vtable),
Transcoder(src_codec.incrementaldecoder(),
dest_codec.incrementalencoder()))


def transcoding_input_stream(stream, src_encoding, dest_encoding):
"""
Add a transcoding transformation to the stream.
@@ -1558,7 +1585,7 @@ def transcoding_input_stream(stream, src_encoding, dest_encoding):
stream : NativeFile
The stream to which the transformation should be applied.
src_encoding : str
The codec to use when reading data data.
The codec to use when reading data.
dest_encoding : str
The codec to use for emitted data.
"""
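make_streamwrap_func reuses the existing Transcoder class, which chains an incremental decoder and encoder so that multibyte characters split across chunk boundaries are handled correctly. A standalone Python equivalent of that logic, independent of the pyarrow internals:

import codecs

class SimpleTranscoder:
    """Incrementally re-encode byte chunks from src_encoding to dest_encoding."""

    def __init__(self, src_encoding, dest_encoding):
        self._decoder = codecs.lookup(src_encoding).incrementaldecoder()
        self._encoder = codecs.lookup(dest_encoding).incrementalencoder()

    def __call__(self, buf, final=False):
        return self._encoder.encode(self._decoder.decode(buf, final), final)

t = SimpleTranscoder("utf-16-le", "utf-8")
raw = "hé".encode("utf-16-le")
# Feed the bytes in two chunks that deliberately split a UTF-16 code unit.
out = t(raw[:1]) + t(raw[1:]) + t(b"", final=True)
print(out.decode("utf-8"))  # hé
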
3 changes: 3 additions & 0 deletions python/pyarrow/lib.pxd
@@ -536,6 +536,9 @@ cdef shared_ptr[CInputStream] native_transcoding_input_stream(
shared_ptr[CInputStream] stream, src_encoding,
dest_encoding) except *

cdef shared_ptr[function[StreamWrapFunc]] make_streamwrap_func(
src_encoding, dest_encoding) except *

# Default is allow_none=False
cpdef DataType ensure_type(object type, bint allow_none=*)
