From c735bd54b1ce712641ae9d2c4b780d954c0e985c Mon Sep 17 00:00:00 2001 From: 1ambda <1amb4a@gmail.com> Date: Mon, 2 Jan 2017 13:52:40 +0900 Subject: [PATCH 1/3] fix: Import spark submit packages in pyspark --- .../zeppelin/spark/PySparkInterpreter.java | 30 +++++++++++++++---- .../zeppelin/spark/SparkInterpreter.java | 7 ++--- 2 files changed, 27 insertions(+), 10 deletions(-) diff --git a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java index 58f17e943aa..e84294dc7f4 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java @@ -153,12 +153,23 @@ public void open() { } urls = urlList.toArray(urls); - ClassLoader oldCl = Thread.currentThread().getContextClassLoader(); try { + // Get additional class paths when using SPARK_SUBMIT + // Should get current class Path before setting new class loader + // Also, add all packages to PYTHONPATH + // since there might be transitive dependencies + StringBuilder sparkSubmitPythonPaths = new StringBuilder(); + if (SparkInterpreter.useSparkSubmit()) { + List paths = SparkInterpreter.currentClassPath(); + for (File f : paths) { + sparkSubmitPythonPaths.append(f.getAbsolutePath()).append(":"); + } + } + URLClassLoader newCl = new URLClassLoader(urls, oldCl); Thread.currentThread().setContextClassLoader(newCl); - createGatewayServerAndStartScript(); + createGatewayServerAndStartScript(sparkSubmitPythonPaths.toString()); } catch (Exception e) { logger.error("Error", e); throw new InterpreterException(e); @@ -167,17 +178,24 @@ public void open() { } } - private Map setupPySparkEnv() throws IOException{ + private Map setupPySparkEnv(String sparkSubmitPaths) throws IOException{ Map env = EnvironmentUtils.getProcEnvironment(); + if (!env.containsKey("PYTHONPATH")) { SparkConf conf = getSparkConf(); - env.put("PYTHONPATH", conf.get("spark.submit.pyFiles").replaceAll(",", ":") + + env.put("PYTHONPATH", conf.get("spark.submit.pyFiles").replaceAll(",", ":") + ":../interpreter/lib/python"); } + + // Configure PYTHONPATH after comparing `!env.containsKey("PATHONPATH")` + if (!"".equals(sparkSubmitPaths)) { + env.put("PYTHONPATH", env.get("PYTHONPATH") + sparkSubmitPaths); + } + return env; } - private void createGatewayServerAndStartScript() { + private void createGatewayServerAndStartScript(String sparkSubmitPaths) { // create python script createPythonScript(); @@ -209,7 +227,7 @@ private void createGatewayServerAndStartScript() { executor.setWatchdog(new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT)); try { - Map env = setupPySparkEnv(); + Map env = setupPySparkEnv(sparkSubmitPaths); executor.execute(cmd, env, this); pythonscriptRunning = true; } catch (IOException e) { diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java index e8c76bcccd4..8b76aba6dad 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java @@ -556,7 +556,7 @@ static final String toString(Object o) { return (o instanceof String) ? (String) o : ""; } - private boolean useSparkSubmit() { + public static boolean useSparkSubmit() { return null != System.getenv("SPARK_SUBMIT"); } @@ -727,7 +727,6 @@ public void open() { pathSettings.v_$eq(classpath); settings.scala$tools$nsc$settings$ScalaSettings$_setter_$classpath_$eq(pathSettings); - // set classloader for scala compiler settings.explicitParentLoader_$eq(new Some<>(Thread.currentThread() .getContextClassLoader())); @@ -976,7 +975,7 @@ public void populateSparkWebUrl(InterpreterContext ctx) { } } - private List currentClassPath() { + public static List currentClassPath() { List paths = classPath(Thread.currentThread().getContextClassLoader()); String[] cps = System.getProperty("java.class.path").split(File.pathSeparator); if (cps != null) { @@ -987,7 +986,7 @@ private List currentClassPath() { return paths; } - private List classPath(ClassLoader cl) { + private static List classPath(ClassLoader cl) { List paths = new LinkedList<>(); if (cl == null) { return paths; From f76d2c84d759aa8e70d0d41249a267140381e597 Mon Sep 17 00:00:00 2001 From: 1ambda <1amb4a@gmail.com> Date: Wed, 11 Jan 2017 08:01:11 +0900 Subject: [PATCH 2/3] fix: Do not extend PYTHONPATH in yarn-client --- .../org/apache/zeppelin/spark/PySparkInterpreter.java | 10 +++++----- .../org/apache/zeppelin/spark/SparkInterpreter.java | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java index e84294dc7f4..b7c7dbf402b 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java @@ -155,12 +155,12 @@ public void open() { urls = urlList.toArray(urls); ClassLoader oldCl = Thread.currentThread().getContextClassLoader(); try { - // Get additional class paths when using SPARK_SUBMIT - // Should get current class Path before setting new class loader - // Also, add all packages to PYTHONPATH - // since there might be transitive dependencies + // get additional class paths when using SPARK_SUBMIT and not using YARN-CLIENT + // SHOULD get current class path before setting new class loader + // also, add all packages to PYTHONPATH since there might be transitive dependencies StringBuilder sparkSubmitPythonPaths = new StringBuilder(); - if (SparkInterpreter.useSparkSubmit()) { + if (SparkInterpreter.useSparkSubmit() && + !getSparkInterpreter().isYarnMode()) { List paths = SparkInterpreter.currentClassPath(); for (File f : paths) { sparkSubmitPythonPaths.append(f.getAbsolutePath()).append(":"); diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java index 8b76aba6dad..18a4fd36bbf 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java @@ -296,7 +296,7 @@ private DepInterpreter getDepInterpreter() { return (DepInterpreter) p; } - private boolean isYarnMode() { + public boolean isYarnMode() { return getProperty("master").startsWith("yarn"); } From 585d48a2681872e9e3c814a39ce7539cc02a0ea1 Mon Sep 17 00:00:00 2001 From: 1ambda <1amb4a@gmail.com> Date: Wed, 11 Jan 2017 14:38:01 +0900 Subject: [PATCH 3/3] Use spark.jars instead of classpath --- .../zeppelin/spark/PySparkInterpreter.java | 33 ++++++++----------- .../zeppelin/spark/SparkInterpreter.java | 4 +-- 2 files changed, 16 insertions(+), 21 deletions(-) diff --git a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java index b7c7dbf402b..5a8e0407158 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/PySparkInterpreter.java @@ -155,21 +155,9 @@ public void open() { urls = urlList.toArray(urls); ClassLoader oldCl = Thread.currentThread().getContextClassLoader(); try { - // get additional class paths when using SPARK_SUBMIT and not using YARN-CLIENT - // SHOULD get current class path before setting new class loader - // also, add all packages to PYTHONPATH since there might be transitive dependencies - StringBuilder sparkSubmitPythonPaths = new StringBuilder(); - if (SparkInterpreter.useSparkSubmit() && - !getSparkInterpreter().isYarnMode()) { - List paths = SparkInterpreter.currentClassPath(); - for (File f : paths) { - sparkSubmitPythonPaths.append(f.getAbsolutePath()).append(":"); - } - } - URLClassLoader newCl = new URLClassLoader(urls, oldCl); Thread.currentThread().setContextClassLoader(newCl); - createGatewayServerAndStartScript(sparkSubmitPythonPaths.toString()); + createGatewayServerAndStartScript(); } catch (Exception e) { logger.error("Error", e); throw new InterpreterException(e); @@ -178,7 +166,7 @@ public void open() { } } - private Map setupPySparkEnv(String sparkSubmitPaths) throws IOException{ + private Map setupPySparkEnv() throws IOException{ Map env = EnvironmentUtils.getProcEnvironment(); if (!env.containsKey("PYTHONPATH")) { @@ -187,15 +175,22 @@ private Map setupPySparkEnv(String sparkSubmitPaths) throws IOException{ ":../interpreter/lib/python"); } - // Configure PYTHONPATH after comparing `!env.containsKey("PATHONPATH")` - if (!"".equals(sparkSubmitPaths)) { - env.put("PYTHONPATH", env.get("PYTHONPATH") + sparkSubmitPaths); + // get additional class paths when using SPARK_SUBMIT and not using YARN-CLIENT + // also, add all packages to PYTHONPATH since there might be transitive dependencies + if (SparkInterpreter.useSparkSubmit() && + !getSparkInterpreter().isYarnMode()) { + + String sparkSubmitJars = getSparkConf().get("spark.jars").replace(",", ":"); + + if (!"".equals(sparkSubmitJars)) { + env.put("PYTHONPATH", env.get("PYTHONPATH") + sparkSubmitJars); + } } return env; } - private void createGatewayServerAndStartScript(String sparkSubmitPaths) { + private void createGatewayServerAndStartScript() { // create python script createPythonScript(); @@ -227,7 +222,7 @@ private void createGatewayServerAndStartScript(String sparkSubmitPaths) { executor.setWatchdog(new ExecuteWatchdog(ExecuteWatchdog.INFINITE_TIMEOUT)); try { - Map env = setupPySparkEnv(sparkSubmitPaths); + Map env = setupPySparkEnv(); executor.execute(cmd, env, this); pythonscriptRunning = true; } catch (IOException e) { diff --git a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java index 18a4fd36bbf..383c1c0dea4 100644 --- a/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java +++ b/spark/src/main/java/org/apache/zeppelin/spark/SparkInterpreter.java @@ -975,7 +975,7 @@ public void populateSparkWebUrl(InterpreterContext ctx) { } } - public static List currentClassPath() { + public List currentClassPath() { List paths = classPath(Thread.currentThread().getContextClassLoader()); String[] cps = System.getProperty("java.class.path").split(File.pathSeparator); if (cps != null) { @@ -986,7 +986,7 @@ public static List currentClassPath() { return paths; } - private static List classPath(ClassLoader cl) { + private List classPath(ClassLoader cl) { List paths = new LinkedList<>(); if (cl == null) { return paths;