Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ import org.apache.spark.rpc.{GlutenDriverEndpoint, GlutenExecutorEndpoint}
import org.apache.spark.sql.execution.datasources.GlutenWriterColumnarRules
import org.apache.spark.sql.execution.datasources.v1._
import org.apache.spark.sql.utils.ExpressionUtil
import org.apache.spark.util.SparkDirectoryUtil
import org.apache.spark.util.{SparkDirectoryUtil, SparkShutdownManagerUtil}

import org.apache.commons.lang3.StringUtils

Expand Down Expand Up @@ -87,6 +87,7 @@ class CHListenerApi extends ListenerApi with Logging {
val executorLibPath = conf.get(GlutenConfig.GLUTEN_EXECUTOR_LIB_PATH.key, libPath)
JniLibLoader.loadFromPath(executorLibPath, true)
}
CHListenerApi.addShutdownHook
// Add configs
import org.apache.gluten.backendsapi.clickhouse.CHConfig._
conf.setCHConfig(
Expand Down Expand Up @@ -133,3 +134,17 @@ class CHListenerApi extends ListenerApi with Logging {
CHNativeExpressionEvaluator.finalizeNative()
}
}

object CHListenerApi {
  // Tracks whether the native lib-unloading shutdown hook has been registered.
  // Kept as a public var for backward compatibility; guard all access via
  // addShutdownHook so registration happens exactly once.
  var initialized = false

  /**
   * Registers a JVM shutdown hook that force-unloads all loaded native libraries.
   *
   * Synchronized because the driver and an executor may share one JVM (e.g. local
   * mode / tests) and call this concurrently; an unsynchronized check-then-set on
   * `initialized` could register the hook twice.
   */
  def addShutdownHook: Unit = synchronized {
    if (!initialized) {
      initialized = true
      SparkShutdownManagerUtil.addHookForLibUnloading(() => JniLibLoader.forceUnloadAll)
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,11 @@ import org.apache.spark.sql.execution.datasources.GlutenWriterColumnarRules
import org.apache.spark.sql.execution.datasources.velox.{VeloxParquetWriterInjects, VeloxRowSplitter}
import org.apache.spark.sql.expression.UDFResolver
import org.apache.spark.sql.internal.{GlutenConfigUtil, StaticSQLConf}
import org.apache.spark.util.{SparkDirectoryUtil, SparkResourceUtil}
import org.apache.spark.util.{SparkDirectoryUtil, SparkResourceUtil, SparkShutdownManagerUtil}

import org.apache.commons.lang3.StringUtils

import java.util.UUID
import java.util.concurrent.atomic.AtomicBoolean

class VeloxListenerApi extends ListenerApi with Logging {
Expand Down Expand Up @@ -116,7 +117,6 @@ class VeloxListenerApi extends ListenerApi with Logging {
}

SparkDirectoryUtil.init(conf)
UDFResolver.resolveUdfConf(conf, isDriver = true)
initialize(conf, isDriver = true)
UdfJniWrapper.registerFunctionSignatures()
}
Expand All @@ -143,13 +143,29 @@ class VeloxListenerApi extends ListenerApi with Logging {
}

SparkDirectoryUtil.init(conf)
UDFResolver.resolveUdfConf(conf, isDriver = false)
initialize(conf, isDriver = false)
}

  /** Called when the executor is stopping; delegates native cleanup to [[shutdown]]. */
  override def onExecutorShutdown(): Unit = shutdown()

private def initialize(conf: SparkConf, isDriver: Boolean): Unit = {
addShutdownHook
// Sets this configuration only once, since not undoable.
// DebugInstance should be created first.
if (conf.getBoolean(GlutenConfig.DEBUG_KEEP_JNI_WORKSPACE.key, defaultValue = false)) {
val debugDir = conf.get(GlutenConfig.DEBUG_KEEP_JNI_WORKSPACE_DIR.key)
JniWorkspace.enableDebug(debugDir)
} else {
JniWorkspace.initializeDefault(
() =>
SparkDirectoryUtil.get
.namespace("jni")
.mkChildDirRandomly(UUID.randomUUID.toString)
.getAbsolutePath)
}

UDFResolver.resolveUdfConf(conf, isDriver)

// Do row / batch type initializations.
Convention.ensureSparkRowAndBatchTypesRegistered()
ArrowJavaBatch.ensureRegistered()
Expand All @@ -169,12 +185,6 @@ class VeloxListenerApi extends ListenerApi with Logging {
classOf[ColumnarShuffleManager].getName
)

// Sets this configuration only once, since not undoable.
if (conf.getBoolean(GlutenConfig.DEBUG_KEEP_JNI_WORKSPACE.key, defaultValue = false)) {
val debugDir = conf.get(GlutenConfig.DEBUG_KEEP_JNI_WORKSPACE_DIR.key)
JniWorkspace.enableDebug(debugDir)
}

// Set the system properties.
// Use appending policy for children with the same name in a arrow struct vector.
System.setProperty("arrow.struct.conflict.policy", "CONFLICT_APPEND")
Expand Down Expand Up @@ -241,4 +251,11 @@ object VeloxListenerApi {
  /** Whether the Spark master is local, as reported by `SparkResourceUtil.isLocalMaster`. */
  private def inLocalMode(conf: SparkConf): Boolean = {
    SparkResourceUtil.isLocalMaster(conf)
  }

private def addShutdownHook: Unit = {
SparkShutdownManagerUtil.addHookForLibUnloading(
() => {
JniLibLoader.forceUnloadAll
})
}
}
11 changes: 0 additions & 11 deletions gluten-core/src/main/java/org/apache/gluten/jni/JniLibLoader.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import org.apache.gluten.exception.GlutenException;

import org.apache.spark.util.SparkShutdownManagerUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -40,22 +39,12 @@
import java.util.Set;
import java.util.Vector;

import scala.runtime.BoxedUnit;

public class JniLibLoader {
private static final Logger LOG = LoggerFactory.getLogger(JniLibLoader.class);

private static final Set<String> LOADED_LIBRARY_PATHS = new HashSet<>();
private static final Set<String> REQUIRE_UNLOAD_LIBRARY_PATHS = new LinkedHashSet<>();

static {
SparkShutdownManagerUtil.addHookForLibUnloading(
() -> {
forceUnloadAll();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems the forceUnloadAll is used to fix the coredump in ch backends. Can we remove this now? @taiyang-li @baibaichen

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it is still needed.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems it cannot be removed; otherwise it will still core dump.

return BoxedUnit.UNIT;
});
}

private final String workDir;
private final Set<String> loadedLibraries = new HashSet<>();

Expand Down
26 changes: 8 additions & 18 deletions gluten-core/src/main/java/org/apache/gluten/jni/JniWorkspace.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import com.google.common.base.Preconditions;
import org.apache.commons.io.FileUtils;
import org.apache.spark.util.SparkDirectoryUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -32,6 +31,7 @@
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

public class JniWorkspace {
private static final Logger LOG = LoggerFactory.getLogger(JniWorkspace.class);
Expand Down Expand Up @@ -60,19 +60,6 @@ private JniWorkspace(String rootDir) {
}
}

private static JniWorkspace createDefault() {
try {
final String tempRoot =
SparkDirectoryUtil.get()
.namespace("jni")
.mkChildDirRandomly(UUID.randomUUID().toString())
.getAbsolutePath();
return createOrGet(tempRoot);
} catch (Exception e) {
throw new GlutenException(e);
}
}

public static void enableDebug(String debugDir) {
// Preserve the JNI libraries even after process exits.
// This is useful for debugging native code if the debug symbols were embedded in
Expand All @@ -99,16 +86,19 @@ public static void enableDebug(String debugDir) {
}
}

public static JniWorkspace getDefault() {
public static void initializeDefault(Supplier<String> rootDir) {
synchronized (DEFAULT_INSTANCE_INIT_LOCK) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we still need synchronized? It seems it can only be executed by one thread in the initialize context after this refactoring.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In tests, the driver and executors are in the same process, so initialize may be called by the driver and an executor at the same time.

if (DEFAULT_INSTANCE == null) {
DEFAULT_INSTANCE = createDefault();
DEFAULT_INSTANCE = createOrGet(rootDir.get());
}
Preconditions.checkNotNull(DEFAULT_INSTANCE);
return DEFAULT_INSTANCE;
}
}

public static JniWorkspace getDefault() {
Preconditions.checkNotNull(DEFAULT_INSTANCE, "Not call initializeDefault yet");
return DEFAULT_INSTANCE;
}

private static JniWorkspace createOrGet(String rootDir) {
return INSTANCES.computeIfAbsent(rootDir, JniWorkspace::new);
}
Expand Down
Loading