diff --git a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala index 4b661b3c2def..120584ce2e5e 100644 --- a/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala +++ b/backends-clickhouse/src/main/scala/org/apache/gluten/backendsapi/clickhouse/CHListenerApi.scala @@ -36,7 +36,7 @@ import org.apache.spark.rpc.{GlutenDriverEndpoint, GlutenExecutorEndpoint} import org.apache.spark.sql.execution.datasources.GlutenWriterColumnarRules import org.apache.spark.sql.execution.datasources.v1._ import org.apache.spark.sql.utils.ExpressionUtil -import org.apache.spark.util.SparkDirectoryUtil +import org.apache.spark.util.{SparkDirectoryUtil, SparkShutdownManagerUtil} import org.apache.commons.lang3.StringUtils @@ -87,6 +87,7 @@ class CHListenerApi extends ListenerApi with Logging { val executorLibPath = conf.get(GlutenConfig.GLUTEN_EXECUTOR_LIB_PATH.key, libPath) JniLibLoader.loadFromPath(executorLibPath, true) } + CHListenerApi.addShutdownHook // Add configs import org.apache.gluten.backendsapi.clickhouse.CHConfig._ conf.setCHConfig( @@ -133,3 +134,17 @@ class CHListenerApi extends ListenerApi with Logging { CHNativeExpressionEvaluator.finalizeNative() } } + +object CHListenerApi { + var initialized = false + + def addShutdownHook: Unit = { + if (!initialized) { + initialized = true + SparkShutdownManagerUtil.addHookForLibUnloading( + () => { + JniLibLoader.forceUnloadAll + }) + } + } +} diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala index 2aa19afe7a12..109f3015a691 100644 --- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala +++ 
b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxListenerApi.scala @@ -41,10 +41,11 @@ import org.apache.spark.sql.execution.datasources.GlutenWriterColumnarRules import org.apache.spark.sql.execution.datasources.velox.{VeloxParquetWriterInjects, VeloxRowSplitter} import org.apache.spark.sql.expression.UDFResolver import org.apache.spark.sql.internal.{GlutenConfigUtil, StaticSQLConf} -import org.apache.spark.util.{SparkDirectoryUtil, SparkResourceUtil} +import org.apache.spark.util.{SparkDirectoryUtil, SparkResourceUtil, SparkShutdownManagerUtil} import org.apache.commons.lang3.StringUtils +import java.util.UUID import java.util.concurrent.atomic.AtomicBoolean class VeloxListenerApi extends ListenerApi with Logging { @@ -116,7 +117,6 @@ class VeloxListenerApi extends ListenerApi with Logging { } SparkDirectoryUtil.init(conf) - UDFResolver.resolveUdfConf(conf, isDriver = true) initialize(conf, isDriver = true) UdfJniWrapper.registerFunctionSignatures() } @@ -143,13 +143,29 @@ class VeloxListenerApi extends ListenerApi with Logging { } SparkDirectoryUtil.init(conf) - UDFResolver.resolveUdfConf(conf, isDriver = false) initialize(conf, isDriver = false) } override def onExecutorShutdown(): Unit = shutdown() private def initialize(conf: SparkConf, isDriver: Boolean): Unit = { + addShutdownHook + // Sets this configuration only once, since not undoable. + // DebugInstance should be created first. + if (conf.getBoolean(GlutenConfig.DEBUG_KEEP_JNI_WORKSPACE.key, defaultValue = false)) { + val debugDir = conf.get(GlutenConfig.DEBUG_KEEP_JNI_WORKSPACE_DIR.key) + JniWorkspace.enableDebug(debugDir) + } else { + JniWorkspace.initializeDefault( + () => + SparkDirectoryUtil.get + .namespace("jni") + .mkChildDirRandomly(UUID.randomUUID.toString) + .getAbsolutePath) + } + + UDFResolver.resolveUdfConf(conf, isDriver) + // Do row / batch type initializations. 
Convention.ensureSparkRowAndBatchTypesRegistered() ArrowJavaBatch.ensureRegistered() @@ -169,12 +185,6 @@ class VeloxListenerApi extends ListenerApi with Logging { classOf[ColumnarShuffleManager].getName ) - // Sets this configuration only once, since not undoable. - if (conf.getBoolean(GlutenConfig.DEBUG_KEEP_JNI_WORKSPACE.key, defaultValue = false)) { - val debugDir = conf.get(GlutenConfig.DEBUG_KEEP_JNI_WORKSPACE_DIR.key) - JniWorkspace.enableDebug(debugDir) - } - // Set the system properties. // Use appending policy for children with the same name in a arrow struct vector. System.setProperty("arrow.struct.conflict.policy", "CONFLICT_APPEND") @@ -241,4 +251,11 @@ object VeloxListenerApi { private def inLocalMode(conf: SparkConf): Boolean = { SparkResourceUtil.isLocalMaster(conf) } + + private def addShutdownHook: Unit = { + SparkShutdownManagerUtil.addHookForLibUnloading( + () => { + JniLibLoader.forceUnloadAll + }) + } } diff --git a/gluten-core/src/main/java/org/apache/gluten/jni/JniLibLoader.java b/gluten-core/src/main/java/org/apache/gluten/jni/JniLibLoader.java index 2d07c3d79b9a..72898823f5b3 100644 --- a/gluten-core/src/main/java/org/apache/gluten/jni/JniLibLoader.java +++ b/gluten-core/src/main/java/org/apache/gluten/jni/JniLibLoader.java @@ -18,7 +18,6 @@ import org.apache.gluten.exception.GlutenException; -import org.apache.spark.util.SparkShutdownManagerUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -40,22 +39,12 @@ import java.util.Set; import java.util.Vector; -import scala.runtime.BoxedUnit; - public class JniLibLoader { private static final Logger LOG = LoggerFactory.getLogger(JniLibLoader.class); private static final Set LOADED_LIBRARY_PATHS = new HashSet<>(); private static final Set REQUIRE_UNLOAD_LIBRARY_PATHS = new LinkedHashSet<>(); - static { - SparkShutdownManagerUtil.addHookForLibUnloading( - () -> { - forceUnloadAll(); - return BoxedUnit.UNIT; - }); - } - private final String workDir; private final Set loadedLibraries 
= new HashSet<>(); diff --git a/gluten-core/src/main/java/org/apache/gluten/jni/JniWorkspace.java b/gluten-core/src/main/java/org/apache/gluten/jni/JniWorkspace.java index a37aac03c583..50827c7a0185 100644 --- a/gluten-core/src/main/java/org/apache/gluten/jni/JniWorkspace.java +++ b/gluten-core/src/main/java/org/apache/gluten/jni/JniWorkspace.java @@ -20,7 +20,6 @@ import com.google.common.base.Preconditions; import org.apache.commons.io.FileUtils; -import org.apache.spark.util.SparkDirectoryUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,6 +31,7 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Supplier; public class JniWorkspace { private static final Logger LOG = LoggerFactory.getLogger(JniWorkspace.class); @@ -60,19 +60,6 @@ private JniWorkspace(String rootDir) { } } - private static JniWorkspace createDefault() { - try { - final String tempRoot = - SparkDirectoryUtil.get() - .namespace("jni") - .mkChildDirRandomly(UUID.randomUUID().toString()) - .getAbsolutePath(); - return createOrGet(tempRoot); - } catch (Exception e) { - throw new GlutenException(e); - } - } - public static void enableDebug(String debugDir) { // Preserve the JNI libraries even after process exits. 
// This is useful for debugging native code if the debug symbols were embedded in @@ -99,16 +86,19 @@ public static void enableDebug(String debugDir) { } } - public static JniWorkspace getDefault() { + public static void initializeDefault(Supplier<String> rootDir) { synchronized (DEFAULT_INSTANCE_INIT_LOCK) { if (DEFAULT_INSTANCE == null) { - DEFAULT_INSTANCE = createDefault(); + DEFAULT_INSTANCE = createOrGet(rootDir.get()); } - Preconditions.checkNotNull(DEFAULT_INSTANCE); - return DEFAULT_INSTANCE; } } + public static JniWorkspace getDefault() { + Preconditions.checkNotNull(DEFAULT_INSTANCE, "Not call initializeDefault yet"); + return DEFAULT_INSTANCE; + } + private static JniWorkspace createOrGet(String rootDir) { return INSTANCES.computeIfAbsent(rootDir, JniWorkspace::new); }