From 5dcb72a8ce37658e309934b5e815edd4bf97d552 Mon Sep 17 00:00:00 2001 From: Chengcheng Jin Date: Tue, 2 Jul 2024 10:34:55 +0000 Subject: [PATCH 1/6] support csv option --- java/dataset/src/main/cpp/jni_wrapper.cc | 116 ++++++++++++++++-- .../file/FileSystemDatasetFactory.java | 40 +++++- .../apache/arrow/dataset/file/JniWrapper.java | 12 +- .../apache/arrow/dataset/jni/JniWrapper.java | 4 + .../arrow/dataset/jni/NativeDataset.java | 11 +- .../dataset/scanner/FragmentScanOptions.java | 46 +++++++ .../arrow/dataset/scanner/ScanOptions.java | 21 ++++ .../scanner/csv/CsvConvertOptions.java | 48 ++++++++ .../scanner/csv/CsvFragmentScanOptions.java | 95 ++++++++++++++ .../dataset/TestFragmentScanOptions.java | 85 +++++++++++++ .../src/test/resources/data/student.csv | 4 + 11 files changed, 461 insertions(+), 21 deletions(-) create mode 100644 java/dataset/src/main/java/org/apache/arrow/dataset/scanner/FragmentScanOptions.java create mode 100644 java/dataset/src/main/java/org/apache/arrow/dataset/scanner/csv/CsvConvertOptions.java create mode 100644 java/dataset/src/main/java/org/apache/arrow/dataset/scanner/csv/CsvFragmentScanOptions.java create mode 100644 java/dataset/src/test/java/org/apache/arrow/dataset/TestFragmentScanOptions.java create mode 100644 java/dataset/src/test/resources/data/student.csv diff --git a/java/dataset/src/main/cpp/jni_wrapper.cc b/java/dataset/src/main/cpp/jni_wrapper.cc index 4ef2a2ffd92..ebb0969e540 100644 --- a/java/dataset/src/main/cpp/jni_wrapper.cc +++ b/java/dataset/src/main/cpp/jni_wrapper.cc @@ -25,6 +25,7 @@ #include "arrow/c/helpers.h" #include "arrow/dataset/api.h" #include "arrow/dataset/file_base.h" +#include "arrow/dataset/file_csv.h" #include "arrow/filesystem/api.h" #include "arrow/filesystem/path_util.h" #include "arrow/engine/substrait/util.h" @@ -363,6 +364,64 @@ std::shared_ptr LoadArrowBufferFromByteBuffer(JNIEnv* env, jobjec return buffer; } +inline bool ParseBool(const std::string& value) { return value == "true" ? 
true : false; }
+
+/// \brief Construct FragmentScanOptions from config map
+#ifdef ARROW_CSV
+arrow::Result<std::shared_ptr<arrow::dataset::CsvFragmentScanOptions>>
+ToCsvFragmentScanOptions(const std::unordered_map<std::string, std::string>& configs) {
+  std::shared_ptr<arrow::dataset::CsvFragmentScanOptions> options =
+      std::make_shared<arrow::dataset::CsvFragmentScanOptions>();
+  for (auto const& it : configs) {
+    auto& key = it.first;
+    auto& value = it.second;
+    if (key == "delimiter") {
+      options->parse_options.delimiter = value.data()[0];
+    } else if (key == "quoting") {
+      options->parse_options.quoting = ParseBool(value);
+    } else if (key == "column_types") {
+      int64_t schema_address = std::stol(value);
+      ArrowSchema* cSchema = reinterpret_cast<ArrowSchema*>(schema_address);
+      ARROW_ASSIGN_OR_RAISE(auto schema, arrow::ImportSchema(cSchema));
+      auto& column_types = options->convert_options.column_types;
+      for (auto field : schema->fields()) {
+        column_types[field->name()] = field->type();
+      }
+    } else if (key == "strings_can_be_null") {
+      options->convert_options.strings_can_be_null = ParseBool(value);
+    } else {
+      return arrow::Status::Invalid("Config " + it.first + " is not supported.");
+    }
+  }
+  return options;
+}
+#endif
+
+arrow::Result<std::shared_ptr<arrow::dataset::FragmentScanOptions>>
+GetFragmentScanOptions(jint file_format_id,
+                       const std::unordered_map<std::string, std::string>& configs) {
+  switch (file_format_id) {
+#ifdef ARROW_CSV
+    case 3:
+      return ToCsvFragmentScanOptions(configs);
+#endif
+    default:
+      return arrow::Status::Invalid("Illegal file format id: ", file_format_id);
+  }
+}
+
+std::unordered_map<std::string, std::string> ToStringMap(JNIEnv* env,
+                                                         jobjectArray& str_array) {
+  int length = env->GetArrayLength(str_array);
+  std::unordered_map<std::string, std::string> map;
+  for (int i = 0; i < length; i += 2) {
+    auto key = reinterpret_cast<jstring>(env->GetObjectArrayElement(str_array, i));
+    auto value = reinterpret_cast<jstring>(env->GetObjectArrayElement(str_array, i + 1));
+    map[JStringToCString(env, key)] = JStringToCString(env, value);
+  }
+  return map;
+}
+
 /*
  * Class: org_apache_arrow_dataset_jni_NativeMemoryPool
  * Method: getDefaultMemoryPool
@@ -501,12 +560,13 @@ JNIEXPORT void JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_closeDataset
 /*
  * Class: org_apache_arrow_dataset_jni_JniWrapper
  * Method: createScanner
- * Signature: (J[Ljava/lang/String;Ljava/nio/ByteBuffer;Ljava/nio/ByteBuffer;JJ)J
+ * Signature:
+ * (J[Ljava/lang/String;Ljava/nio/ByteBuffer;Ljava/nio/ByteBuffer;JJ;[Ljava/lang/String;J)J
  */
 JNIEXPORT jlong JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_createScanner(
     JNIEnv* env, jobject, jlong dataset_id, jobjectArray columns,
-    jobject substrait_projection, jobject substrait_filter,
-    jlong batch_size, jlong memory_pool_id) {
+    jobject substrait_projection, jobject substrait_filter, jlong batch_size,
+    jlong file_format_id, jobjectArray options, jlong memory_pool_id) {
   JNI_METHOD_START
   arrow::MemoryPool* pool = reinterpret_cast<arrow::MemoryPool*>(memory_pool_id);
   if (pool == nullptr) {
@@ -555,6 +615,12 @@ JNIEXPORT jlong JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_createScann
     }
     JniAssertOkOrThrow(scanner_builder->Filter(*filter_expr));
   }
+  if (file_format_id != -1 && options != nullptr) {
+    std::unordered_map<std::string, std::string> option_map = ToStringMap(env, options);
+    std::shared_ptr<arrow::dataset::FragmentScanOptions> scan_options =
+        JniGetOrThrow(GetFragmentScanOptions(file_format_id, option_map));
+    JniAssertOkOrThrow(scanner_builder->FragmentScanOptions(scan_options));
+  }
   JniAssertOkOrThrow(scanner_builder->BatchSize(batch_size));
 
   auto scanner = JniGetOrThrow(scanner_builder->Finish());
@@ -668,14 +734,29 @@ JNIEXPORT void JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_ensureS3Fina
 /*
  * Class: org_apache_arrow_dataset_file_JniWrapper
  * Method: makeFileSystemDatasetFactory
- * Signature: (Ljava/lang/String;II)J
+ * Signature: (Ljava/lang/String;II;Ljava/lang/String;Ljava/lang/String)J
  */
 JNIEXPORT jlong JNICALL
-Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory__Ljava_lang_String_2I(
-    JNIEnv* env, jobject, jstring uri, jint file_format_id) {
+Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory(
+    JNIEnv* env, jobject, jstring uri, jint file_format_id, jobjectArray options) {
   JNI_METHOD_START
   std::shared_ptr<arrow::dataset::FileFormat> file_format =
       JniGetOrThrow(GetFileFormat(file_format_id));
+  if (options != nullptr) {
+    std::unordered_map<std::string, std::string> option_map = ToStringMap(env, options);
+    std::shared_ptr<arrow::dataset::FragmentScanOptions> scan_options =
+        JniGetOrThrow(GetFragmentScanOptions(file_format_id, option_map));
+    file_format->default_fragment_scan_options = scan_options;
+#ifdef ARROW_CSV
+    if (file_format_id == 3) {
+      std::shared_ptr<arrow::dataset::CsvFileFormat> csv_file_format =
+          std::dynamic_pointer_cast<arrow::dataset::CsvFileFormat>(file_format);
+      csv_file_format->parse_options =
+          std::dynamic_pointer_cast<arrow::dataset::CsvFragmentScanOptions>(scan_options)
+              ->parse_options;
+    }
+#endif
+  }
   arrow::dataset::FileSystemFactoryOptions options;
   std::shared_ptr<arrow::dataset::DatasetFactory> d =
       JniGetOrThrow(arrow::dataset::FileSystemDatasetFactory::Make(
@@ -686,16 +767,31 @@ Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory__Ljav
 
 /*
  * Class: org_apache_arrow_dataset_file_JniWrapper
- * Method: makeFileSystemDatasetFactory
- * Signature: ([Ljava/lang/String;II)J
+ * Method: makeFileSystemDatasetFactoryWithFiles
+ * Signature: ([Ljava/lang/String;II;[Ljava/lang/String)J
  */
 JNIEXPORT jlong JNICALL
-Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory___3Ljava_lang_String_2I(
-    JNIEnv* env, jobject, jobjectArray uris, jint file_format_id) {
+Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactoryWithFiles(
+    JNIEnv* env, jobject, jobjectArray uris, jint file_format_id, jobjectArray options) {
   JNI_METHOD_START
   std::shared_ptr<arrow::dataset::FileFormat> file_format =
       JniGetOrThrow(GetFileFormat(file_format_id));
+  if (options != nullptr) {
+    std::unordered_map<std::string, std::string> option_map = ToStringMap(env, options);
+    std::shared_ptr<arrow::dataset::FragmentScanOptions> scan_options =
+        JniGetOrThrow(GetFragmentScanOptions(file_format_id, option_map));
+    file_format->default_fragment_scan_options = scan_options;
+#ifdef ARROW_CSV
+    if (file_format_id == 3) {
+      std::shared_ptr<arrow::dataset::CsvFileFormat> csv_file_format =
+          std::dynamic_pointer_cast<arrow::dataset::CsvFileFormat>(file_format);
+      csv_file_format->parse_options =
+          std::dynamic_pointer_cast<arrow::dataset::CsvFragmentScanOptions>(scan_options)
+              ->parse_options;
+    }
+#endif
+  }
   arrow::dataset::FileSystemFactoryOptions options;
   std::vector<std::string> uri_vec = ToStringVector(env, uris);
 
diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileSystemDatasetFactory.java b/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileSystemDatasetFactory.java
index 36ac6288af6..fcf124a61f8 100644
--- a/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileSystemDatasetFactory.java
+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileSystemDatasetFactory.java
@@ -16,8 +16,10 @@
  */
 package org.apache.arrow.dataset.file;
 
+import java.util.Optional;
 import org.apache.arrow.dataset.jni.NativeDatasetFactory;
 import org.apache.arrow.dataset.jni.NativeMemoryPool;
+import org.apache.arrow.dataset.scanner.FragmentScanOptions;
 import org.apache.arrow.memory.BufferAllocator;
 
 /** Java binding of the C++ FileSystemDatasetFactory. */
@@ -25,19 +27,45 @@ public class FileSystemDatasetFactory extends NativeDatasetFactory {
 
   public FileSystemDatasetFactory(
       BufferAllocator allocator, NativeMemoryPool memoryPool, FileFormat format, String uri) {
-    super(allocator, memoryPool, createNative(format, uri));
+    super(allocator, memoryPool, createNative(format, uri, Optional.empty()));
+  }
+
+  public FileSystemDatasetFactory(
+      BufferAllocator allocator,
+      NativeMemoryPool memoryPool,
+      FileFormat format,
+      String uri,
+      Optional<FragmentScanOptions> fragmentScanOptions) {
+    super(allocator, memoryPool, createNative(format, uri, fragmentScanOptions));
   }
 
   public FileSystemDatasetFactory(
       BufferAllocator allocator, NativeMemoryPool memoryPool, FileFormat format, String[] uris) {
-    super(allocator, memoryPool, createNative(format, uris));
+    super(allocator, memoryPool, createNative(format, uris, Optional.empty()));
+  }
+
+  public FileSystemDatasetFactory(
+      BufferAllocator allocator,
+      NativeMemoryPool memoryPool,
+      FileFormat format,
+      String[] uris,
+      Optional<FragmentScanOptions> fragmentScanOptions) {
+    super(allocator, memoryPool, createNative(format, uris, fragmentScanOptions));
   }
 
-  private static long createNative(FileFormat format, String uri) {
-    return JniWrapper.get().makeFileSystemDatasetFactory(uri, format.id());
+  private static long createNative(
+      FileFormat format, String uri, Optional<FragmentScanOptions> fragmentScanOptions) {
+    return JniWrapper.get()
+        .makeFileSystemDatasetFactory(
+            uri, format.id(), fragmentScanOptions.map(FragmentScanOptions::serialize).orElse(null));
   }
 
-  private static long createNative(FileFormat format, String[] uris) {
-    return JniWrapper.get().makeFileSystemDatasetFactory(uris, format.id());
+  private static long createNative(
+      FileFormat format, String[] uris, Optional<FragmentScanOptions> fragmentScanOptions) {
+    return JniWrapper.get()
+        .makeFileSystemDatasetFactoryWithFiles(
+            uris,
+            format.id(),
+            fragmentScanOptions.map(FragmentScanOptions::serialize).orElse(null));
   }
 }
diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java b/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java
index dfac293ccb5..d2f842f99e5 100644
--- a/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java
+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java
@@ -37,22 +37,26 @@ private JniWrapper() {}
    * intermediate shared_ptr of the factory instance.
    *
    * @param uri file uri to read, either a file or a directory
-   * @param fileFormat file format ID
+   * @param fileFormat file format ID.
+   * @param serializedFragmentScanOptions serialized FragmentScanOptions.
    * @return the native pointer of the arrow::dataset::FileSystemDatasetFactory instance.
    * @see FileFormat
    */
-  public native long makeFileSystemDatasetFactory(String uri, int fileFormat);
+  public native long makeFileSystemDatasetFactory(
+      String uri, int fileFormat, String[] serializedFragmentScanOptions);
 
   /**
    * Create FileSystemDatasetFactory and return its native pointer. The pointer is pointing to a
    * intermediate shared_ptr of the factory instance.
    *
    * @param uris List of file uris to read, each path pointing to an individual file
-   * @param fileFormat file format ID
+   * @param fileFormat file format ID.
+   * @param serializedFragmentScanOptions serialized FragmentScanOptions.
   * @return the native pointer of the arrow::dataset::FileSystemDatasetFactory instance.
* @see FileFormat */ - public native long makeFileSystemDatasetFactory(String[] uris, int fileFormat); + public native long makeFileSystemDatasetFactoryWithFiles( + String[] uris, int fileFormat, String[] serializedFragmentScanOptions); /** * Write the content in a {@link org.apache.arrow.c.ArrowArrayStream} into files. This internally diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java index b5aa3d918ac..e0d7dddee26 100644 --- a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java @@ -71,6 +71,8 @@ private JniWrapper() {} * @param substraitProjection substrait extended expression to evaluate for project new columns * @param substraitFilter substrait extended expression to evaluate for apply filter * @param batchSize batch size of scanned record batches. + * @param fileFormat file format ID. + * @param serializedFragmentScanOptions serialized FragmentScanOptions. * @param memoryPool identifier of memory pool used in the native scanner. * @return the native pointer of the arrow::dataset::Scanner instance. */ @@ -80,6 +82,8 @@ public native long createScanner( ByteBuffer substraitProjection, ByteBuffer substraitFilter, long batchSize, + long fileFormat, + String[] serializedFragmentScanOptions, long memoryPool); /** diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeDataset.java b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeDataset.java index 83a9ff1f322..2edb47ca1db 100644 --- a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeDataset.java +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeDataset.java @@ -16,6 +16,7 @@ */ package org.apache.arrow.dataset.jni; +import org.apache.arrow.dataset.scanner.FragmentScanOptions; import org.apache.arrow.dataset.scanner.ScanOptions; import org.apache.arrow.dataset.source.Dataset; @@ -37,7 +38,13 @@ public synchronized NativeScanner newScan(ScanOptions options) { if (closed) { throw new NativeInstanceReleasedException(); } - + int fileFormat = -1; + String[] serialized = null; + if (options.getFragmentScanOptions().isPresent()) { + FragmentScanOptions fragmentScanOptions = options.getFragmentScanOptions().get(); + fileFormat = fragmentScanOptions.fileFormatId(); + serialized = fragmentScanOptions.serialize(); + } long scannerId = JniWrapper.get() .createScanner( @@ -46,6 +53,8 @@ public synchronized NativeScanner newScan(ScanOptions options) { options.getSubstraitProjection().orElse(null), options.getSubstraitFilter().orElse(null), options.getBatchSize(), + fileFormat, + serialized, context.getMemoryPool().getNativeInstanceId()); return new NativeScanner(context, scannerId); diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/FragmentScanOptions.java b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/FragmentScanOptions.java new file mode 100644 index 00000000000..e5d05396316 --- /dev/null +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/FragmentScanOptions.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.dataset.scanner;
+
+import java.util.Map;
+
+public interface FragmentScanOptions {
+  String typeName();
+
+  int fileFormatId();
+
+  String[] serialize();
+
+  /**
+   * Serialize the map to string array.
+   *
+   * @param config config map
+   * @return string array for serialization
+   */
+  default String[] serializeMap(Map<String, String> config) {
+    if (config.isEmpty()) {
+      return null;
+    }
+    String[] configs = new String[config.size() * 2];
+    int i = 0;
+    for (Map.Entry<String, String> entry : config.entrySet()) {
+      configs[i++] = entry.getKey();
+      configs[i++] = entry.getValue();
+    }
+    return configs;
+  }
+}
diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanOptions.java b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanOptions.java
index 837016ad1e9..68fc3943b3e 100644
--- a/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanOptions.java
+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/ScanOptions.java
@@ -27,6 +27,8 @@ public class ScanOptions {
   private final Optional<ByteBuffer> substraitProjection;
   private final Optional<ByteBuffer> substraitFilter;
 
+  private final Optional<FragmentScanOptions> fragmentScanOptions;
+
   /**
    * Constructor.
    *
@@ -65,6 +67,7 @@ public ScanOptions(long batchSize, Optional<String[]> columns) {
     this.columns = columns;
     this.substraitProjection = Optional.empty();
     this.substraitFilter = Optional.empty();
+    this.fragmentScanOptions = Optional.empty();
   }
 
   public ScanOptions(long batchSize) {
@@ -87,12 +90,17 @@ public Optional<ByteBuffer> getSubstraitFilter() {
     return substraitFilter;
   }
 
+  public Optional<FragmentScanOptions> getFragmentScanOptions() {
+    return fragmentScanOptions;
+  }
+
   /** Builder for Options used during scanning. */
   public static class Builder {
     private final long batchSize;
     private Optional<String[]> columns;
     private ByteBuffer substraitProjection;
     private ByteBuffer substraitFilter;
+    private FragmentScanOptions fragmentScanOptions;
 
     /**
      * Constructor.
      *
@@ -140,6 +148,18 @@ public Builder substraitFilter(ByteBuffer substraitFilter) {
       return this;
     }
 
+    /**
+     * Set the FragmentScanOptions.
+     *
+     * @param fragmentScanOptions fragment scan options
+     * @return the ScanOptions configured.
+     */
+    public Builder fragmentScanOptions(FragmentScanOptions fragmentScanOptions) {
+      Preconditions.checkNotNull(fragmentScanOptions);
+      this.fragmentScanOptions = fragmentScanOptions;
+      return this;
+    }
+
     public ScanOptions build() {
       return new ScanOptions(this);
     }
@@ -150,5 +170,6 @@ private ScanOptions(Builder builder) {
     columns = builder.columns;
     substraitProjection = Optional.ofNullable(builder.substraitProjection);
     substraitFilter = Optional.ofNullable(builder.substraitFilter);
+    fragmentScanOptions = Optional.ofNullable(builder.fragmentScanOptions);
   }
 }
diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/csv/CsvConvertOptions.java b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/csv/CsvConvertOptions.java
new file mode 100644
index 00000000000..15e257896b8
--- /dev/null
+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/csv/CsvConvertOptions.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.dataset.scanner.csv;
+
+import java.util.Map;
+import java.util.Optional;
+import org.apache.arrow.c.ArrowSchema;
+
+public class CsvConvertOptions {
+
+  private final Map<String, String> configs;
+
+  private Optional<ArrowSchema> cSchema = Optional.empty();
+
+  public CsvConvertOptions(Map<String, String> configs) {
+    this.configs = configs;
+  }
+
+  public Optional<ArrowSchema> getArrowSchema() {
+    return cSchema;
+  }
+
+  public Map<String, String> getConfigs() {
+    return configs;
+  }
+
+  public void set(String key, String value) {
+    configs.put(key, value);
+  }
+
+  public void setArrowSchema(ArrowSchema cSchema) {
+    this.cSchema = Optional.of(cSchema);
+  }
+}
diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/csv/CsvFragmentScanOptions.java b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/csv/CsvFragmentScanOptions.java
new file mode 100644
index 00000000000..c6005b2a366
--- /dev/null
+++ b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/csv/CsvFragmentScanOptions.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.arrow.dataset.scanner.csv;
+
+import java.io.Serializable;
+import java.util.Locale;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+import org.apache.arrow.dataset.file.FileFormat;
+import org.apache.arrow.dataset.scanner.FragmentScanOptions;
+
+public class CsvFragmentScanOptions implements Serializable, FragmentScanOptions {
+  private final CsvConvertOptions convertOptions;
+  private final Map<String, String> readOptions;
+  private final Map<String, String> parseOptions;
+
+  /**
+   * csv scan options, map to CPP struct CsvFragmentScanOptions.
+   *
+   * @param convertOptions same struct in CPP
+   * @param readOptions same struct in CPP
+   * @param parseOptions same struct in CPP
+   */
+  public CsvFragmentScanOptions(
+      CsvConvertOptions convertOptions,
+      Map<String, String> readOptions,
+      Map<String, String> parseOptions) {
+    this.convertOptions = convertOptions;
+    this.readOptions = readOptions;
+    this.parseOptions = parseOptions;
+  }
+
+  public String typeName() {
+    return FileFormat.CSV.name().toLowerCase(Locale.ROOT);
+  }
+
+  /**
+   * File format id.
+   *
+   * @return id
+   */
+  public int fileFormatId() {
+    return FileFormat.CSV.id();
+  }
+
+  /**
+   * Serialize this class to ByteBuffer and then called by jni call.
+   *
+   * @return DirectByteBuffer
+   */
+  public String[] serialize() {
+    Map<String, String> options =
+        Stream.concat(
+                Stream.concat(readOptions.entrySet().stream(), parseOptions.entrySet().stream()),
+                convertOptions.getConfigs().entrySet().stream())
+            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
+
+    if (convertOptions.getArrowSchema().isPresent()) {
+      options.put(
+          "column_types", Long.toString(convertOptions.getArrowSchema().get().memoryAddress()));
+    }
+    return serializeMap(options);
+  }
+
+  public static CsvFragmentScanOptions deserialize(String serialized) {
+    throw new UnsupportedOperationException("Not implemented now");
+  }
+
+  public CsvConvertOptions getConvertOptions() {
+    return convertOptions;
+  }
+
+  public Map<String, String> getReadOptions() {
+    return readOptions;
+  }
+
+  public Map<String, String> getParseOptions() {
+    return parseOptions;
+  }
+}
diff --git a/java/dataset/src/test/java/org/apache/arrow/dataset/TestFragmentScanOptions.java b/java/dataset/src/test/java/org/apache/arrow/dataset/TestFragmentScanOptions.java
new file mode 100644
index 00000000000..8da80a6ff34
--- /dev/null
+++ b/java/dataset/src/test/java/org/apache/arrow/dataset/TestFragmentScanOptions.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.arrow.dataset; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import com.google.common.collect.ImmutableMap; +import java.util.Arrays; +import java.util.Optional; +import org.apache.arrow.c.ArrowSchema; +import org.apache.arrow.c.CDataDictionaryProvider; +import org.apache.arrow.c.Data; +import org.apache.arrow.dataset.file.FileFormat; +import org.apache.arrow.dataset.file.FileSystemDatasetFactory; +import org.apache.arrow.dataset.jni.NativeMemoryPool; +import org.apache.arrow.dataset.scanner.ScanOptions; +import org.apache.arrow.dataset.scanner.Scanner; +import org.apache.arrow.dataset.scanner.csv.CsvConvertOptions; +import org.apache.arrow.dataset.scanner.csv.CsvFragmentScanOptions; +import org.apache.arrow.dataset.source.Dataset; +import org.apache.arrow.dataset.source.DatasetFactory; +import org.apache.arrow.memory.BufferAllocator; +import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.ipc.ArrowReader; +import org.apache.arrow.vector.types.pojo.ArrowType; +import org.apache.arrow.vector.types.pojo.Field; +import org.apache.arrow.vector.types.pojo.Schema; +import org.junit.jupiter.api.Test; + +public class TestFragmentScanOptions { + + @Test + public void testCsvConvertOptions() throws Exception { + final Schema schema = + new Schema( + Arrays.asList( + Field.nullable("Id", new ArrowType.Int(32, true)), + Field.nullable("Name", new ArrowType.Utf8()), + Field.nullable("Language", new ArrowType.Utf8())), + null); + String path = "file://" + getClass().getResource("/").getPath() + "/data/student.csv"; + BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE); + try (ArrowSchema cSchema = ArrowSchema.allocateNew(allocator); + CDataDictionaryProvider provider = new CDataDictionaryProvider()) { + Data.exportSchema(allocator, schema, provider, cSchema); + CsvConvertOptions convertOptions = new CsvConvertOptions(ImmutableMap.of("delimiter", ";")); + convertOptions.setArrowSchema(cSchema); + CsvFragmentScanOptions fragmentScanOptions = + new CsvFragmentScanOptions(convertOptions, ImmutableMap.of(), ImmutableMap.of()); + ScanOptions options = + new ScanOptions.Builder(/*batchSize*/ 32768) + .columns(Optional.empty()) + .fragmentScanOptions(fragmentScanOptions) + .build(); + try (DatasetFactory datasetFactory = + new FileSystemDatasetFactory( + allocator, NativeMemoryPool.getDefault(), FileFormat.CSV, path); + Dataset dataset = datasetFactory.finish(); + Scanner scanner = dataset.newScan(options); + ArrowReader reader = scanner.scanBatches()) { + assertEquals(schema.getFields(), reader.getVectorSchemaRoot().getSchema().getFields()); + int rowCount = 0; + while (reader.loadNextBatch()) { + assertEquals("[1, 2, 3]", reader.getVectorSchemaRoot().getVector("Id").toString()); + rowCount += reader.getVectorSchemaRoot().getRowCount(); + } + assertEquals(3, rowCount); + } + } + } +} diff --git a/java/dataset/src/test/resources/data/student.csv b/java/dataset/src/test/resources/data/student.csv new file mode 100644 index 00000000000..32919460921 --- /dev/null +++ b/java/dataset/src/test/resources/data/student.csv @@ -0,0 +1,4 @@ +Id;Name;Language +1;Juno;Java +2;Peter;Python +3;Celin;C++ From 826d44247a7e8ad416d33329a62b2fb64aa2f3f0 Mon Sep 17 00:00:00 2001 From: Chengcheng Jin Date: Tue, 2 Jul 2024 11:21:18 +0000 Subject: [PATCH 2/6] address comments --- java/dataset/src/main/cpp/jni_wrapper.cc | 5 ++- .../arrow/dataset/jni/NativeDataset.java | 6 +-- .../dataset/scanner/FragmentScanOptions.java | 26 ++---------- 
.../apache/arrow/dataset/scanner/MapUtil.java | 41 +++++++++++++++++++ .../scanner/csv/CsvFragmentScanOptions.java | 37 ++++++++--------- .../dataset/TestFragmentScanOptions.java | 8 +++- 6 files changed, 74 insertions(+), 49 deletions(-) create mode 100644 java/dataset/src/main/java/org/apache/arrow/dataset/scanner/MapUtil.java diff --git a/java/dataset/src/main/cpp/jni_wrapper.cc b/java/dataset/src/main/cpp/jni_wrapper.cc index ebb0969e540..7f423b6cd1d 100644 --- a/java/dataset/src/main/cpp/jni_wrapper.cc +++ b/java/dataset/src/main/cpp/jni_wrapper.cc @@ -381,8 +381,8 @@ ToCsvFragmentScanOptions(const std::unordered_map& con options->parse_options.quoting = ParseBool(value); } else if (key == "column_types") { int64_t schema_address = std::stol(value); - ArrowSchema* cSchema = reinterpret_cast(schema_address); - ARROW_ASSIGN_OR_RAISE(auto schema, arrow::ImportSchema(cSchema)); + ArrowSchema* c_schema = reinterpret_cast(schema_address); + ARROW_ASSIGN_OR_RAISE(auto schema, arrow::ImportSchema(c_schema)); auto& column_types = options->convert_options.column_types; for (auto field : schema->fields()) { column_types[field->name()] = field->type(); @@ -414,6 +414,7 @@ std::unordered_map ToStringMap(JNIEnv* env, jobjectArray& str_array) { int length = env->GetArrayLength(str_array); std::unordered_map map; + map.reserve(length / 2); for (int i = 0; i < length; i += 2) { auto key = reinterpret_cast(env->GetObjectArrayElement(str_array, i)); auto value = reinterpret_cast(env->GetObjectArrayElement(str_array, i + 1)); diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeDataset.java b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeDataset.java index 2edb47ca1db..8f8cdc49d48 100644 --- a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeDataset.java +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/NativeDataset.java @@ -38,11 +38,11 @@ public synchronized NativeScanner newScan(ScanOptions options) { if (closed) { throw new NativeInstanceReleasedException(); } - int fileFormat = -1; + int fileFormatId = -1; String[] serialized = null; if (options.getFragmentScanOptions().isPresent()) { FragmentScanOptions fragmentScanOptions = options.getFragmentScanOptions().get(); - fileFormat = fragmentScanOptions.fileFormatId(); + fileFormatId = fragmentScanOptions.fileFormat().id(); serialized = fragmentScanOptions.serialize(); } long scannerId = @@ -53,7 +53,7 @@ public synchronized NativeScanner newScan(ScanOptions options) { options.getSubstraitProjection().orElse(null), options.getSubstraitFilter().orElse(null), options.getBatchSize(), - fileFormat, + fileFormatId, serialized, context.getMemoryPool().getNativeInstanceId()); diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/FragmentScanOptions.java b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/FragmentScanOptions.java index e5d05396316..d48d0bd2b76 100644 --- a/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/FragmentScanOptions.java +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/FragmentScanOptions.java @@ -16,31 +16,11 @@ */ package org.apache.arrow.dataset.scanner; -import java.util.Map; +import org.apache.arrow.dataset.file.FileFormat; +/** The file fragment scan options interface. It is used to transfer to JNI call. */ public interface FragmentScanOptions { - String typeName(); - - int fileFormatId(); + FileFormat fileFormat(); String[] serialize(); - - /** - * Serialize the map to string array. 
- * - * @param config config map - * @return string array for serialization - */ - default String[] serializeMap(Map config) { - if (config.isEmpty()) { - return null; - } - String[] configs = new String[config.size() * 2]; - int i = 0; - for (Map.Entry entry : config.entrySet()) { - configs[i++] = entry.getKey(); - configs[i++] = entry.getValue(); - } - return configs; - } } diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/MapUtil.java b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/MapUtil.java new file mode 100644 index 00000000000..792ea8e9454 --- /dev/null +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/MapUtil.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.arrow.dataset.scanner; + +import java.util.Map; + +/** The utility class for Map. */ +public class MapUtil { + /** + * Convert the map to string array as JNI bridge. + * + * @param config config map + * @return string array for serialization + */ + public static String[] convertMapToStringArray(Map config) { + if (config.isEmpty()) { + return null; + } + String[] configs = new String[config.size() * 2]; + int i = 0; + for (Map.Entry entry : config.entrySet()) { + configs[i++] = entry.getKey(); + configs[i++] = entry.getValue(); + } + return configs; + } +} diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/csv/CsvFragmentScanOptions.java b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/csv/CsvFragmentScanOptions.java index c6005b2a366..e76fe140857 100644 --- a/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/csv/CsvFragmentScanOptions.java +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/csv/CsvFragmentScanOptions.java @@ -16,25 +16,28 @@ */ package org.apache.arrow.dataset.scanner.csv; -import java.io.Serializable; -import java.util.Locale; import java.util.Map; import java.util.stream.Collectors; import java.util.stream.Stream; import org.apache.arrow.dataset.file.FileFormat; import org.apache.arrow.dataset.scanner.FragmentScanOptions; +import org.apache.arrow.dataset.scanner.MapUtil; -public class CsvFragmentScanOptions implements Serializable, FragmentScanOptions { +public class CsvFragmentScanOptions implements FragmentScanOptions { private final CsvConvertOptions convertOptions; private final Map readOptions; private final Map parseOptions; /** - * csv scan options, map to CPP struct CsvFragmentScanOptions. + * CSV scan options, map to CPP struct CsvFragmentScanOptions. 
The key in config map is the field + * name of mapping cpp struct * - * @param convertOptions same struct in CPP - * @param readOptions same struct in CPP - * @param parseOptions same struct in CPP + * @param convertOptions similar to CsvFragmentScanOptions#convert_options in CPP, the ArrowSchema + * represents column_types, convert data option such as null value recognition. + * @param readOptions similar to CsvFragmentScanOptions#read_options in CPP, specify how to read + * the file such as block_size + * @param parseOptions similar to CsvFragmentScanOptions#parse_options in CPP, parse file option + * such as delimiter */ public CsvFragmentScanOptions( CsvConvertOptions convertOptions, @@ -45,24 +48,22 @@ public CsvFragmentScanOptions( this.parseOptions = parseOptions; } - public String typeName() { - return FileFormat.CSV.name().toLowerCase(Locale.ROOT); - } - /** * File format id. * * @return id */ - public int fileFormatId() { - return FileFormat.CSV.id(); + @Override + public FileFormat fileFormat() { + return FileFormat.CSV; } /** - * Serialize this class to ByteBuffer and then called by jni call. + * Serialize this class to string array and then called by JNI call. * - * @return DirectByteBuffer + * @return string array as Map JNI bridge format. */ + @Override public String[] serialize() { Map options = Stream.concat( @@ -74,11 +75,7 @@ public String[] serialize() { options.put( "column_types", Long.toString(convertOptions.getArrowSchema().get().memoryAddress())); } - return serializeMap(options); - } - - public static CsvFragmentScanOptions deserialize(String serialized) { - throw new UnsupportedOperationException("Not implemented now"); + return MapUtil.convertMapToStringArray(options); } public CsvConvertOptions getConvertOptions() { diff --git a/java/dataset/src/test/java/org/apache/arrow/dataset/TestFragmentScanOptions.java b/java/dataset/src/test/java/org/apache/arrow/dataset/TestFragmentScanOptions.java index 8da80a6ff34..0c694864321 100644 --- a/java/dataset/src/test/java/org/apache/arrow/dataset/TestFragmentScanOptions.java +++ b/java/dataset/src/test/java/org/apache/arrow/dataset/TestFragmentScanOptions.java @@ -16,6 +16,7 @@ */ package org.apache.arrow.dataset; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; import com.google.common.collect.ImmutableMap; @@ -35,10 +36,12 @@ import org.apache.arrow.dataset.source.DatasetFactory; import org.apache.arrow.memory.BufferAllocator; import org.apache.arrow.memory.RootAllocator; +import org.apache.arrow.vector.ValueIterableVector; import org.apache.arrow.vector.ipc.ArrowReader; import org.apache.arrow.vector.types.pojo.ArrowType; import org.apache.arrow.vector.types.pojo.Field; import org.apache.arrow.vector.types.pojo.Schema; +import org.hamcrest.collection.IsIterableContainingInOrder; import org.junit.jupiter.api.Test; public class TestFragmentScanOptions { @@ -72,10 +75,13 @@ public void testCsvConvertOptions() throws Exception { Dataset dataset = datasetFactory.finish(); Scanner scanner = dataset.newScan(options); ArrowReader reader = scanner.scanBatches()) { + assertEquals(schema.getFields(), reader.getVectorSchemaRoot().getSchema().getFields()); int rowCount = 0; while (reader.loadNextBatch()) { - assertEquals("[1, 2, 3]", reader.getVectorSchemaRoot().getVector("Id").toString()); + final ValueIterableVector idVector = + (ValueIterableVector) reader.getVectorSchemaRoot().getVector("Id"); + assertThat(idVector.getValueIterable(), 
IsIterableContainingInOrder.contains(1, 2, 3)); rowCount += reader.getVectorSchemaRoot().getRowCount(); } assertEquals(3, rowCount); From e96a08730a99f17348e43286dd9c671a09e3f7af Mon Sep 17 00:00:00 2001 From: Chengcheng Jin Date: Tue, 2 Jul 2024 11:32:32 +0000 Subject: [PATCH 3/6] minor --- java/dataset/src/main/cpp/jni_wrapper.cc | 4 ++-- .../main/java/org/apache/arrow/dataset/jni/JniWrapper.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/java/dataset/src/main/cpp/jni_wrapper.cc b/java/dataset/src/main/cpp/jni_wrapper.cc index 7f423b6cd1d..1f3e5fb9d4b 100644 --- a/java/dataset/src/main/cpp/jni_wrapper.cc +++ b/java/dataset/src/main/cpp/jni_wrapper.cc @@ -562,12 +562,12 @@ JNIEXPORT void JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_closeDataset * Class: org_apache_arrow_dataset_jni_JniWrapper * Method: createScanner * Signature: - * (J[Ljava/lang/String;Ljava/nio/ByteBuffer;Ljava/nio/ByteBuffer;JJ;[Ljava/lang/String;J)J + * (J[Ljava/lang/String;Ljava/nio/ByteBuffer;Ljava/nio/ByteBuffer;JI;[Ljava/lang/String;J)J */ JNIEXPORT jlong JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_createScanner( JNIEnv* env, jobject, jlong dataset_id, jobjectArray columns, jobject substrait_projection, jobject substrait_filter, jlong batch_size, - jlong file_format_id, jobjectArray options, jlong memory_pool_id) { + jint file_format_id, jobjectArray options, jlong memory_pool_id) { JNI_METHOD_START arrow::MemoryPool* pool = reinterpret_cast(memory_pool_id); if (pool == nullptr) { diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java index e0d7dddee26..6637c113d9e 100644 --- a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java @@ -82,7 +82,7 @@ public native long createScanner( ByteBuffer substraitProjection, ByteBuffer substraitFilter, long batchSize, - long fileFormat, + int fileFormat, String[] serializedFragmentScanOptions, long memoryPool); From 78a27db99e967c2ddce54d9a302abdac20691c60 Mon Sep 17 00:00:00 2001 From: Chengcheng Jin Date: Wed, 3 Jul 2024 09:14:48 +0000 Subject: [PATCH 4/6] minor --- java/dataset/src/main/cpp/jni_wrapper.cc | 2 ++ 1 file changed, 2 insertions(+) diff --git a/java/dataset/src/main/cpp/jni_wrapper.cc b/java/dataset/src/main/cpp/jni_wrapper.cc index 1f3e5fb9d4b..985d09e50cf 100644 --- a/java/dataset/src/main/cpp/jni_wrapper.cc +++ b/java/dataset/src/main/cpp/jni_wrapper.cc @@ -25,7 +25,9 @@ #include "arrow/c/helpers.h" #include "arrow/dataset/api.h" #include "arrow/dataset/file_base.h" +#ifdef ARROW_CSV #include "arrow/dataset/file_csv.h" +#endif #include "arrow/filesystem/api.h" #include "arrow/filesystem/path_util.h" #include "arrow/engine/substrait/util.h" From a3f817af42076a8597fa5a4bab9c5e3017783947 Mon Sep 17 00:00:00 2001 From: Chengcheng Jin Date: Mon, 29 Jul 2024 08:55:28 +0000 Subject: [PATCH 5/6] address comments --- java/dataset/src/main/cpp/jni_wrapper.cc | 4 +- .../scanner/csv/CsvFragmentScanOptions.java | 9 ++- .../dataset/{scanner => utils}/MapUtil.java | 4 +- .../dataset/TestFragmentScanOptions.java | 77 +++++++++++++++++++ 4 files changed, 86 insertions(+), 8 deletions(-) rename java/dataset/src/main/java/org/apache/arrow/dataset/{scanner => utils}/MapUtil.java (95%) diff --git a/java/dataset/src/main/cpp/jni_wrapper.cc b/java/dataset/src/main/cpp/jni_wrapper.cc index 985d09e50cf..c835b3518c6 100644 --- 
a/java/dataset/src/main/cpp/jni_wrapper.cc +++ b/java/dataset/src/main/cpp/jni_wrapper.cc @@ -374,9 +374,7 @@ arrow::Result> ToCsvFragmentScanOptions(const std::unordered_map& configs) { std::shared_ptr options = std::make_shared(); - for (auto const& it : configs) { - auto& key = it.first; - auto& value = it.second; + for (auto const& [key, value] : configs) { if (key == "delimiter") { options->parse_options.delimiter = value.data()[0]; } else if (key == "quoting") { diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/csv/CsvFragmentScanOptions.java b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/csv/CsvFragmentScanOptions.java index e76fe140857..39271b5f063 100644 --- a/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/csv/CsvFragmentScanOptions.java +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/csv/CsvFragmentScanOptions.java @@ -21,7 +21,7 @@ import java.util.stream.Stream; import org.apache.arrow.dataset.file.FileFormat; import org.apache.arrow.dataset.scanner.FragmentScanOptions; -import org.apache.arrow.dataset.scanner.MapUtil; +import org.apache.arrow.dataset.utils.MapUtil; public class CsvFragmentScanOptions implements FragmentScanOptions { private final CsvConvertOptions convertOptions; @@ -49,9 +49,9 @@ public CsvFragmentScanOptions( } /** - * File format id. + * File format. * - * @return id + * @return file format. */ @Override public FileFormat fileFormat() { @@ -59,7 +59,8 @@ public FileFormat fileFormat() { } /** - * Serialize this class to string array and then called by JNI call. + * This is an internal function to invoke by serializer. Serialize this class to string array and + * then called by JNI call. * * @return string array as Map JNI bridge format. */ diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/MapUtil.java b/java/dataset/src/main/java/org/apache/arrow/dataset/utils/MapUtil.java similarity index 95% rename from java/dataset/src/main/java/org/apache/arrow/dataset/scanner/MapUtil.java rename to java/dataset/src/main/java/org/apache/arrow/dataset/utils/MapUtil.java index 792ea8e9454..4df6cf1e0e0 100644 --- a/java/dataset/src/main/java/org/apache/arrow/dataset/scanner/MapUtil.java +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/utils/MapUtil.java @@ -14,12 +14,14 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.arrow.dataset.scanner; +package org.apache.arrow.dataset.utils; import java.util.Map; /** The utility class for Map. */ public class MapUtil { + private MapUtil() {} + /** * Convert the map to string array as JNI bridge. 
* diff --git a/java/dataset/src/test/java/org/apache/arrow/dataset/TestFragmentScanOptions.java b/java/dataset/src/test/java/org/apache/arrow/dataset/TestFragmentScanOptions.java index 0c694864321..9787e8308e7 100644 --- a/java/dataset/src/test/java/org/apache/arrow/dataset/TestFragmentScanOptions.java +++ b/java/dataset/src/test/java/org/apache/arrow/dataset/TestFragmentScanOptions.java @@ -21,6 +21,7 @@ import com.google.common.collect.ImmutableMap; import java.util.Arrays; +import java.util.Collections; import java.util.Optional; import org.apache.arrow.c.ArrowSchema; import org.apache.arrow.c.CDataDictionaryProvider; @@ -88,4 +89,80 @@ public void testCsvConvertOptions() throws Exception { } } } + + @Test + public void testCsvConvertOptionsDelimiterNotSet() throws Exception { + final Schema schema = + new Schema( + Arrays.asList( + Field.nullable("Id", new ArrowType.Int(32, true)), + Field.nullable("Name", new ArrowType.Utf8()), + Field.nullable("Language", new ArrowType.Utf8())), + null); + String path = "file://" + getClass().getResource("/").getPath() + "/data/student.csv"; + BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE); + try (ArrowSchema cSchema = ArrowSchema.allocateNew(allocator); + CDataDictionaryProvider provider = new CDataDictionaryProvider()) { + Data.exportSchema(allocator, schema, provider, cSchema); + CsvConvertOptions convertOptions = new CsvConvertOptions(ImmutableMap.of()); + convertOptions.setArrowSchema(cSchema); + CsvFragmentScanOptions fragmentScanOptions = + new CsvFragmentScanOptions(convertOptions, ImmutableMap.of(), ImmutableMap.of()); + ScanOptions options = + new ScanOptions.Builder(/*batchSize*/ 32768) + .columns(Optional.empty()) + .fragmentScanOptions(fragmentScanOptions) + .build(); + try (DatasetFactory datasetFactory = + new FileSystemDatasetFactory( + allocator, NativeMemoryPool.getDefault(), FileFormat.CSV, path); + Dataset dataset = datasetFactory.finish(); + Scanner scanner = dataset.newScan(options); + ArrowReader reader = scanner.scanBatches()) { + + assertEquals(schema.getFields(), reader.getVectorSchemaRoot().getSchema().getFields()); + int rowCount = 0; + while (reader.loadNextBatch()) { + final ValueIterableVector idVector = + (ValueIterableVector) reader.getVectorSchemaRoot().getVector("Id"); + assertThat(idVector.getValueIterable(), IsIterableContainingInOrder.contains(1, 2, 3)); + rowCount += reader.getVectorSchemaRoot().getRowCount(); + } + assertEquals(3, rowCount); + } + } + } + + @Test + public void testCsvConvertOptionsNoOption() throws Exception { + final Schema schema = + new Schema( + Collections.singletonList(Field.nullable("Id;Name;Language", new ArrowType.Utf8())), + null); + String path = "file://" + getClass().getResource("/").getPath() + "/data/student.csv"; + BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE); + ScanOptions options = + new ScanOptions.Builder(/*batchSize*/ 32768).columns(Optional.empty()).build(); + try (DatasetFactory datasetFactory = + new FileSystemDatasetFactory( + allocator, NativeMemoryPool.getDefault(), FileFormat.CSV, path); + Dataset dataset = datasetFactory.finish(); + Scanner scanner = dataset.newScan(options); + ArrowReader reader = scanner.scanBatches()) { + + assertEquals(schema.getFields(), reader.getVectorSchemaRoot().getSchema().getFields()); + int rowCount = 0; + while (reader.loadNextBatch()) { + final ValueIterableVector idVector = + (ValueIterableVector) + reader.getVectorSchemaRoot().getVector("Id;Name;Language"); + assertThat( + idVector.getValueIterable(), 
+ IsIterableContainingInOrder.contains( + "1;Juno;Java\n" + "2;Peter;Python\n" + "3;Celin;C++")); + rowCount += reader.getVectorSchemaRoot().getRowCount(); + } + assertEquals(3, rowCount); + } + } } From 98a3beb7da91166fc362f7c32b984049f362e581 Mon Sep 17 00:00:00 2001 From: Chengcheng Jin Date: Mon, 29 Jul 2024 10:10:25 +0000 Subject: [PATCH 6/6] fix compile --- java/dataset/src/main/cpp/jni_wrapper.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/java/dataset/src/main/cpp/jni_wrapper.cc b/java/dataset/src/main/cpp/jni_wrapper.cc index c835b3518c6..f324f87d6c3 100644 --- a/java/dataset/src/main/cpp/jni_wrapper.cc +++ b/java/dataset/src/main/cpp/jni_wrapper.cc @@ -390,7 +390,7 @@ ToCsvFragmentScanOptions(const std::unordered_map& con } else if (key == "strings_can_be_null") { options->convert_options.strings_can_be_null = ParseBool(value); } else { - return arrow::Status::Invalid("Config " + it.first + " is not supported."); + return arrow::Status::Invalid("Config " + key + " is not supported."); } } return options;
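
Usage note (not part of the patches above): the sketch below shows how the options introduced by this series might be driven from application code, assuming the series is applied. The class name CsvScanExample and the path file:///tmp/student.csv are illustrative only; everything else uses the Java APIs added in the patches. The delimiter is supplied once and flows to both the factory (so schema inference splits on ';') and the scan (through ScanOptions.Builder#fragmentScanOptions).

import com.google.common.collect.ImmutableMap;
import java.util.Optional;
import org.apache.arrow.dataset.file.FileFormat;
import org.apache.arrow.dataset.file.FileSystemDatasetFactory;
import org.apache.arrow.dataset.jni.NativeMemoryPool;
import org.apache.arrow.dataset.scanner.FragmentScanOptions;
import org.apache.arrow.dataset.scanner.ScanOptions;
import org.apache.arrow.dataset.scanner.Scanner;
import org.apache.arrow.dataset.scanner.csv.CsvConvertOptions;
import org.apache.arrow.dataset.scanner.csv.CsvFragmentScanOptions;
import org.apache.arrow.dataset.source.Dataset;
import org.apache.arrow.dataset.source.DatasetFactory;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.ipc.ArrowReader;

public class CsvScanExample {
  public static void main(String[] args) throws Exception {
    String uri = "file:///tmp/student.csv"; // hypothetical path
    try (RootAllocator allocator = new RootAllocator(Long.MAX_VALUE)) {
      // ';' as the CSV field delimiter; read and convert options stay at their defaults.
      CsvFragmentScanOptions csvOptions =
          new CsvFragmentScanOptions(
              new CsvConvertOptions(ImmutableMap.of()),
              ImmutableMap.of(),
              ImmutableMap.of("delimiter", ";"));
      Optional<FragmentScanOptions> factoryOptions = Optional.of(csvOptions);
      ScanOptions scanOptions =
          new ScanOptions.Builder(/*batchSize*/ 32768)
              .columns(Optional.empty())
              .fragmentScanOptions(csvOptions)
              .build();
      try (DatasetFactory factory =
              new FileSystemDatasetFactory(
                  allocator, NativeMemoryPool.getDefault(), FileFormat.CSV, uri, factoryOptions);
          Dataset dataset = factory.finish();
          Scanner scanner = dataset.newScan(scanOptions);
          ArrowReader reader = scanner.scanBatches()) {
        while (reader.loadNextBatch()) {
          // Print each scanned batch.
          System.out.println(reader.getVectorSchemaRoot().contentToTSVString());
        }
      }
    }
  }
}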