From 6058031f4db83a1c6fc148b01aa7293b66bf1f83 Mon Sep 17 00:00:00 2001 From: Dane Pitkin Date: Fri, 28 Jul 2023 16:37:29 -0400 Subject: [PATCH 1/2] GH-36069: [Java] Ensure S3 is finalized on shutdown --- java/dataset/src/main/cpp/jni_wrapper.cc | 13 +++++++++++++ .../dataset/file/FileSystemDatasetFactory.java | 12 ++++++++++++ .../org/apache/arrow/dataset/file/JniWrapper.java | 6 ++++++ 3 files changed, 31 insertions(+) diff --git a/java/dataset/src/main/cpp/jni_wrapper.cc b/java/dataset/src/main/cpp/jni_wrapper.cc index 871a2e95b94..1cfea1e2aed 100644 --- a/java/dataset/src/main/cpp/jni_wrapper.cc +++ b/java/dataset/src/main/cpp/jni_wrapper.cc @@ -27,6 +27,7 @@ #include "arrow/dataset/file_base.h" #include "arrow/filesystem/localfs.h" #include "arrow/filesystem/path_util.h" +#include "arrow/filesystem/s3fs.h" #include "arrow/engine/substrait/util.h" #include "arrow/ipc/api.h" #include "arrow/util/iterator.h" @@ -678,6 +679,18 @@ Java_org_apache_arrow_dataset_file_JniWrapper_writeFromScannerToFile( JNI_METHOD_END() } +/* + * Class: org_apache_arrow_dataset_file_JniWrapper + * Method: ensureS3Finalized + * Signature: (J)V + */ +JNIEXPORT void JNICALL Java_org_apache_arrow_dataset_file_JniWrapper_ensureS3Finalized( + JNIEnv* env, jobject) { + JNI_METHOD_START + JniAssertOkOrThrow(arrow::fs::EnsureS3Finalized()); + JNI_METHOD_END() +} + /* * Class: org_apache_arrow_dataset_substrait_JniWrapper * Method: executeSerializedPlan diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileSystemDatasetFactory.java b/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileSystemDatasetFactory.java index aa315690592..cd9bb751252 100644 --- a/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileSystemDatasetFactory.java +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileSystemDatasetFactory.java @@ -17,6 +17,8 @@ package org.apache.arrow.dataset.file; +import java.util.concurrent.atomic.AtomicBoolean; + import org.apache.arrow.dataset.jni.NativeDatasetFactory; import org.apache.arrow.dataset.jni.NativeMemoryPool; import org.apache.arrow.memory.BufferAllocator; @@ -26,14 +28,18 @@ */ public class FileSystemDatasetFactory extends NativeDatasetFactory { + private static final AtomicBoolean addedS3ShutdownHook = new AtomicBoolean(false); + public FileSystemDatasetFactory(BufferAllocator allocator, NativeMemoryPool memoryPool, FileFormat format, String uri) { super(allocator, memoryPool, createNative(format, uri)); + ensureS3FinalizedOnShutdown(); } public FileSystemDatasetFactory(BufferAllocator allocator, NativeMemoryPool memoryPool, FileFormat format, String[] uris) { super(allocator, memoryPool, createNative(format, uris)); + ensureS3FinalizedOnShutdown(); } private static long createNative(FileFormat format, String uri) { @@ -44,4 +50,10 @@ private static long createNative(FileFormat format, String[] uris) { return JniWrapper.get().makeFileSystemDatasetFactory(uris, format.id()); } + private static void ensureS3FinalizedOnShutdown() { + if (addedS3ShutdownHook.compareAndSet(false, true)) { + Runtime.getRuntime().addShutdownHook(new Thread(() -> { JniWrapper.get().ensureS3Finalized(); })); + } + } + } diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java b/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java index c3a1a4e58a1..c59a9d3fe82 100644 --- a/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java @@ -75,4 +75,10 @@ public native void writeFromScannerToFile(long streamAddress, int maxPartitions, String baseNameTemplate); + /** + * Ensure the S3 APIs are shutdown, but only if not already done. If the S3 APIs are unintialized, + * then this is a noop. + */ + public native void ensureS3Finalized(); + } From 556a822cd56ba79da93dd8a416c31fd2e7d67c9f Mon Sep 17 00:00:00 2001 From: Dane Pitkin Date: Mon, 31 Jul 2023 14:51:13 -0400 Subject: [PATCH 2/2] Move implementation to JniLoader --- java/dataset/src/main/cpp/jni_wrapper.cc | 24 +++++++++---------- .../file/FileSystemDatasetFactory.java | 12 ---------- .../apache/arrow/dataset/file/JniWrapper.java | 6 ----- .../apache/arrow/dataset/jni/JniLoader.java | 5 ++++ .../apache/arrow/dataset/jni/JniWrapper.java | 6 +++++ 5 files changed, 23 insertions(+), 30 deletions(-) diff --git a/java/dataset/src/main/cpp/jni_wrapper.cc b/java/dataset/src/main/cpp/jni_wrapper.cc index 1cfea1e2aed..5640bc43496 100644 --- a/java/dataset/src/main/cpp/jni_wrapper.cc +++ b/java/dataset/src/main/cpp/jni_wrapper.cc @@ -570,6 +570,18 @@ JNIEXPORT void JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_releaseBuffe JNI_METHOD_END() } +/* + * Class: org_apache_arrow_dataset_jni_JniWrapper + * Method: ensureS3Finalized + * Signature: (J)V + */ +JNIEXPORT void JNICALL Java_org_apache_arrow_dataset_jni_JniWrapper_ensureS3Finalized( + JNIEnv* env, jobject) { + JNI_METHOD_START + JniAssertOkOrThrow(arrow::fs::EnsureS3Finalized()); + JNI_METHOD_END() +} + /* * Class: org_apache_arrow_dataset_file_JniWrapper * Method: makeFileSystemDatasetFactory @@ -679,18 +691,6 @@ Java_org_apache_arrow_dataset_file_JniWrapper_writeFromScannerToFile( JNI_METHOD_END() } -/* - * Class: org_apache_arrow_dataset_file_JniWrapper - * Method: ensureS3Finalized - * Signature: (J)V - */ -JNIEXPORT void JNICALL Java_org_apache_arrow_dataset_file_JniWrapper_ensureS3Finalized( - JNIEnv* env, jobject) { - JNI_METHOD_START - JniAssertOkOrThrow(arrow::fs::EnsureS3Finalized()); - JNI_METHOD_END() -} - /* * Class: org_apache_arrow_dataset_substrait_JniWrapper * Method: executeSerializedPlan diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileSystemDatasetFactory.java b/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileSystemDatasetFactory.java index cd9bb751252..aa315690592 100644 --- a/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileSystemDatasetFactory.java +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/file/FileSystemDatasetFactory.java @@ -17,8 +17,6 @@ package org.apache.arrow.dataset.file; -import java.util.concurrent.atomic.AtomicBoolean; - import org.apache.arrow.dataset.jni.NativeDatasetFactory; import org.apache.arrow.dataset.jni.NativeMemoryPool; import org.apache.arrow.memory.BufferAllocator; @@ -28,18 +26,14 @@ */ public class FileSystemDatasetFactory extends NativeDatasetFactory { - private static final AtomicBoolean addedS3ShutdownHook = new AtomicBoolean(false); - public FileSystemDatasetFactory(BufferAllocator allocator, NativeMemoryPool memoryPool, FileFormat format, String uri) { super(allocator, memoryPool, createNative(format, uri)); - ensureS3FinalizedOnShutdown(); } public FileSystemDatasetFactory(BufferAllocator allocator, NativeMemoryPool memoryPool, FileFormat format, String[] uris) { super(allocator, memoryPool, createNative(format, uris)); - ensureS3FinalizedOnShutdown(); } private static long createNative(FileFormat format, String uri) { @@ -50,10 +44,4 @@ private static long createNative(FileFormat format, String[] uris) { return JniWrapper.get().makeFileSystemDatasetFactory(uris, format.id()); } - private static void ensureS3FinalizedOnShutdown() { - if (addedS3ShutdownHook.compareAndSet(false, true)) { - Runtime.getRuntime().addShutdownHook(new Thread(() -> { JniWrapper.get().ensureS3Finalized(); })); - } - } - } diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java b/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java index c59a9d3fe82..c3a1a4e58a1 100644 --- a/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/file/JniWrapper.java @@ -75,10 +75,4 @@ public native void writeFromScannerToFile(long streamAddress, int maxPartitions, String baseNameTemplate); - /** - * Ensure the S3 APIs are shutdown, but only if not already done. If the S3 APIs are unintialized, - * then this is a noop. - */ - public native void ensureS3Finalized(); - } diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniLoader.java b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniLoader.java index 7ada21c0582..a3b31c73e85 100644 --- a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniLoader.java +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniLoader.java @@ -59,6 +59,7 @@ public void ensureLoaded() { return; } loadRemaining(); + ensureS3FinalizedOnShutdown(); } private synchronized void loadRemaining() { @@ -109,4 +110,8 @@ private String getNormalizedArch() { } return arch; } + + private void ensureS3FinalizedOnShutdown() { + Runtime.getRuntime().addShutdownHook(new Thread(() -> { JniWrapper.get().ensureS3Finalized(); })); + } } diff --git a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java index 1a9d4188c16..93cc5d7a370 100644 --- a/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java +++ b/java/dataset/src/main/java/org/apache/arrow/dataset/jni/JniWrapper.java @@ -108,4 +108,10 @@ private JniWrapper() { * @param bufferId the native pointer of the arrow::Buffer instance. */ public native void releaseBuffer(long bufferId); + + /** + * Ensure the S3 APIs are shutdown, but only if not already done. If the S3 APIs are unintialized, + * then this is a noop. + */ + public native void ensureS3Finalized(); }