diff --git a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java index 58f17e943aa..5a8e0407158 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java @@ -169,11 +169,25 @@ public void open() { private Map setupPySparkEnv() throws IOException{ Map env = EnvironmentUtils.getProcEnvironment(); + if (!env.containsKey("PYTHONPATH")) { SparkConf conf = getSparkConf(); - env.put("PYTHONPATH", conf.get("spark.submit.pyFiles").replaceAll(",", ":") + + env.put("PYTHONPATH", conf.get("spark.submit.pyFiles").replaceAll(",", ":") + ":../interpreter/lib/python"); } + + // get additional class paths when using SPARK_SUBMIT and not using YARN-CLIENT + // also, add all packages to PYTHONPATH since there might be transitive dependencies + if (SparkInterpreter.useSparkSubmit() && + !getSparkInterpreter().isYarnMode()) { + + String sparkSubmitJars = getSparkConf().get("spark.jars", "").replace(",", ":"); + + if (!"".equals(sparkSubmitJars)) { + env.put("PYTHONPATH", env.get("PYTHONPATH") + ":" + sparkSubmitJars); + } + } + return env; } diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java index e8c76bcccd4..383c1c0dea4 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java @@ -296,7 +296,7 @@ private DepInterpreter getDepInterpreter() { return (DepInterpreter) p; } - private boolean isYarnMode() { + public boolean isYarnMode() { return getProperty("master").startsWith("yarn"); } @@ -556,7 +556,7 @@ static 
final String toString(Object o) { return (o instanceof String) ? (String) o : ""; } - private boolean useSparkSubmit() { + public static boolean useSparkSubmit() { return null != System.getenv("SPARK_SUBMIT"); } @@ -976,7 +976,7 @@ public void populateSparkWebUrl(InterpreterContext ctx) { } } - private List currentClassPath() { + public List currentClassPath() { List paths = classPath(Thread.currentThread().getContextClassLoader()); String[] cps = System.getProperty("java.class.path").split(File.pathSeparator); if (cps != null) {