Skip to content
6 changes: 6 additions & 0 deletions java/dataset/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,12 @@
<version>2.8.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.4</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<resources>
Expand Down
64 changes: 63 additions & 1 deletion java/dataset/src/main/cpp/jni_wrapper.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

#include "arrow/array.h"
#include "arrow/array/concatenate.h"
#include "arrow/c/bridge.h"
#include "arrow/c/helpers.h"
#include "arrow/dataset/api.h"
#include "arrow/dataset/file_base.h"
#include "arrow/filesystem/localfs.h"
Expand Down Expand Up @@ -176,6 +178,21 @@ class DisposableScannerAdaptor {
}
};

arrow::Result<std::shared_ptr<arrow::Schema>> SchemaFromColumnNames(
const std::shared_ptr<arrow::Schema>& input,
const std::vector<std::string>& column_names) {
std::vector<std::shared_ptr<arrow::Field>> columns;
for (arrow::FieldRef ref : column_names) {
auto maybe_field = ref.GetOne(*input);
if (maybe_field.ok()) {
columns.push_back(std::move(maybe_field).ValueOrDie());
} else {
return arrow::Status::Invalid("Partition column '", ref.ToString(), "' is not in dataset schema");
}
}

return schema(std::move(columns))->WithMetadata(input->metadata());
}
} // namespace

using arrow::dataset::jni::CreateGlobalClassReference;
Expand Down Expand Up @@ -229,7 +246,6 @@ jint JNI_OnLoad(JavaVM* vm, void* reserved) {
GetMethodID(env, java_reservation_listener_class, "unreserve", "(J)V"));

default_memory_pool_id = reinterpret_cast<jlong>(arrow::default_memory_pool());

return JNI_VERSION;
JNI_METHOD_END(JNI_ERR)
}
Expand Down Expand Up @@ -516,3 +532,49 @@ Java_org_apache_arrow_dataset_file_JniWrapper_makeFileSystemDatasetFactory(
return CreateNativeRef(d);
JNI_METHOD_END(-1L)
}

/*
* Class: org_apache_arrow_dataset_file_JniWrapper
* Method: writeFromScannerToFile
* Signature:
* (JJJLjava/lang/String;[Ljava/lang/String;ILjava/lang/String;)V
*/
JNIEXPORT void JNICALL
Java_org_apache_arrow_dataset_file_JniWrapper_writeFromScannerToFile(
JNIEnv* env, jobject, jlong c_arrow_array_stream_address,
jlong file_format_id, jstring uri, jobjectArray partition_columns,
jint max_partitions, jstring base_name_template) {
JNI_METHOD_START
JavaVM* vm;
if (env->GetJavaVM(&vm) != JNI_OK) {
JniThrow("Unable to get JavaVM instance");
}

auto* arrow_stream = reinterpret_cast<ArrowArrayStream*>(c_arrow_array_stream_address);
std::shared_ptr<arrow::RecordBatchReader> reader =
JniGetOrThrow(arrow::ImportRecordBatchReader(arrow_stream));
std::shared_ptr<arrow::dataset::ScannerBuilder> scanner_builder =
arrow::dataset::ScannerBuilder::FromRecordBatchReader(reader);
JniAssertOkOrThrow(scanner_builder->Pool(arrow::default_memory_pool()));
auto scanner = JniGetOrThrow(scanner_builder->Finish());

std::shared_ptr<arrow::Schema> schema = reader->schema();

std::shared_ptr<arrow::dataset::FileFormat> file_format =
JniGetOrThrow(GetFileFormat(file_format_id));
arrow::dataset::FileSystemDatasetWriteOptions options;
std::string output_path;
auto filesystem = JniGetOrThrow(
arrow::fs::FileSystemFromUri(JStringToCString(env, uri), &output_path));
std::vector<std::string> partition_column_vector =
ToStringVector(env, partition_columns);
options.file_write_options = file_format->DefaultWriteOptions();
options.filesystem = filesystem;
options.base_dir = output_path;
options.basename_template = JStringToCString(env, base_name_template);
options.partitioning = std::make_shared<arrow::dataset::HivePartitioning>(
SchemaFromColumnNames(schema, partition_column_vector).ValueOrDie());
options.max_partitions = max_partitions;
JniAssertOkOrThrow(arrow::dataset::FileSystemDataset::Write(options, scanner));
JNI_METHOD_END()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.arrow.dataset.file;

import org.apache.arrow.c.ArrowArrayStream;
import org.apache.arrow.c.Data;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.ipc.ArrowReader;

/**
* JNI-based utility to write datasets into files. It internally depends on C++ static method
* FileSystemDataset::Write.
*/
public class DatasetFileWriter {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's add some unit tests.

You can refer to the previous PR https://github.com/apache/arrow/pull/10201/files


/**
* Write the contents of an ArrowReader as a dataset.
*
* @param reader the datasource for writing
* @param format target file format
* @param uri target file uri
* @param maxPartitions maximum partitions to be included in written files
* @param partitionColumns columns used to partition output files. Empty to disable partitioning
* @param baseNameTemplate file name template used to make partitions. E.g. "dat_{i}", i is current partition
* ID around all written files.
*/
public static void write(BufferAllocator allocator, ArrowReader reader, FileFormat format, String uri,
String[] partitionColumns, int maxPartitions, String baseNameTemplate) {
try (final ArrowArrayStream stream = ArrowArrayStream.allocateNew(allocator)) {
Data.exportArrayStream(allocator, reader, stream);
JniWrapper.get().writeFromScannerToFile(stream.memoryAddress(),
format.id(), uri, partitionColumns, maxPartitions, baseNameTemplate);
}
}

/**
* Write the contents of an ArrowReader as a dataset, with default partitioning settings.
*
* @param reader the datasource for writing
* @param format target file format
* @param uri target file uri
*/
public static void write(BufferAllocator allocator, ArrowReader reader, FileFormat format, String uri) {
write(allocator, reader, format, uri, new String[0], 1024, "data_{i}");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,23 @@ private JniWrapper() {
*/
public native long makeFileSystemDatasetFactory(String uri, int fileFormat);

/**
* Write the content in a {@link org.apache.arrow.c.ArrowArrayStream} into files. This internally
* depends on C++ write API: FileSystemDataset::Write.
*
* @param streamAddress the ArrowArrayStream address
* @param fileFormat target file format (ID)
* @param uri target file uri
* @param partitionColumns columns used to partition output files
* @param maxPartitions maximum partitions to be included in written files
* @param baseNameTemplate file name template used to make partitions. E.g. "dat_{i}", i is current partition
* ID around all written files.
*/
public native void writeFromScannerToFile(long streamAddress,
long fileFormat,
String uri,
String[] partitionColumns,
int maxPartitions,
String baseNameTemplate);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.arrow.dataset.scanner;

import java.io.IOException;
import java.util.Iterator;

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.vector.VectorLoader;
import org.apache.arrow.vector.VectorUnloader;
import org.apache.arrow.vector.ipc.ArrowReader;
import org.apache.arrow.vector.ipc.message.ArrowDictionaryBatch;
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
import org.apache.arrow.vector.types.pojo.Schema;

/**
* An implementation of {@link ArrowReader} that reads
* the dataset from {@link Scanner}.
*/
public class ArrowScannerReader extends ArrowReader {
private final Scanner scanner;

private Iterator<? extends ScanTask> taskIterator;

private ScanTask currentTask = null;
private ArrowReader currentReader = null;

/**
* Constructs a scanner reader using a Scanner.
*
* @param scanner scanning data over dataset
* @param allocator to allocate new buffers
*/
public ArrowScannerReader(Scanner scanner, BufferAllocator allocator) {
super(allocator);
this.scanner = scanner;
this.taskIterator = scanner.scan().iterator();
if (taskIterator.hasNext()) {
currentTask = taskIterator.next();
currentReader = currentTask.execute();
}
}

@Override
protected void loadRecordBatch(ArrowRecordBatch batch) {
throw new UnsupportedOperationException();
}

@Override
protected void loadDictionary(ArrowDictionaryBatch dictionaryBatch) {
throw new UnsupportedOperationException();
}

@Override
public boolean loadNextBatch() throws IOException {
if (currentReader == null) {
return false;
}
boolean result = currentReader.loadNextBatch();

if (!result) {
try {
currentTask.close();
currentReader.close();
} catch (Exception e) {
throw new IOException(e);
}

while (!result) {
if (!taskIterator.hasNext()) {
return false;
} else {
currentTask = taskIterator.next();
currentReader = currentTask.execute();
result = currentReader.loadNextBatch();
}
}
}

VectorLoader loader = new VectorLoader(this.getVectorSchemaRoot());
VectorUnloader unloader =
new VectorUnloader(currentReader.getVectorSchemaRoot());
try (ArrowRecordBatch recordBatch = unloader.getRecordBatch()) {
loader.load(recordBatch);
}
return true;
}

@Override
public long bytesRead() {
return 0L;
}

@Override
protected void closeReadSource() throws IOException {
try {
currentTask.close();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there an issue if we potentially double-close these?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If already closed, the close method will directly return and no further operator.

currentReader.close();
scanner.close();
} catch (Exception e) {
throw new IOException(e);
}
}

@Override
protected Schema readSchema() throws IOException {
return scanner.schema();
}
}
Loading