diff --git a/core/src/main/java/org/apache/spark/api/plugin/DriverPlugin.java b/core/src/main/java/org/apache/spark/api/plugin/DriverPlugin.java
new file mode 100644
index 0000000000000..0c0d0df8ae682
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/api/plugin/DriverPlugin.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.plugin;
+
+import java.util.Collections;
+import java.util.Map;
+
+import org.apache.spark.SparkContext;
+import org.apache.spark.annotation.DeveloperApi;
+
+/**
+ * :: DeveloperApi ::
+ * Driver component of a {@link SparkPlugin}.
+ *
+ * @since 3.0.0
+ */
+@DeveloperApi
+public interface DriverPlugin {
+
+ /**
+ * Initialize the plugin.
+ *
+ * This method is called early in the initialization of the Spark driver. Specifically, it is
+ * called before the Spark driver's task scheduler is initialized. This means that many other
+ * Spark subsystems may not have been initialized yet. This call also blocks driver
+ * initialization.
+ *
+ * It's recommended that plugins be careful about what operations are performed in this call,
+ * preferably performing expensive operations in a separate thread, or postponing them until
+ * the application has fully started.
+ *
+ * @param sc The SparkContext loading the plugin.
+ * @param pluginContext Additional plugin-specific information about the Spark application where the plugin
+ * is running.
+ * @return A map that will be provided to the {@link ExecutorPlugin#init(PluginContext,Map)}
+ * method.
+ */
+ default Map<String, String> init(SparkContext sc, PluginContext pluginContext) {
+ return Collections.emptyMap();
+ }
+
+ /**
+ * Register metrics published by the plugin with Spark's metrics system.
+ *
+ * This method is called later in the initialization of the Spark application, after most
+ * subsystems are up and the application ID is known. If there are metrics registered in
+ * the registry ({@link PluginContext#metricRegistry()}), then a metrics source with the
+ * plugin name will be created.
+ *
+ * Note that even though the metric registry is still accessible after this method is called,
+ * registering new metrics at that point may result in the metrics not being
+ * available.
+ *
+ * @param appId The application ID from the cluster manager.
+ * @param pluginContext Additional plugin-specific information about the Spark application where the plugin
+ * is running.
+ */
+ default void registerMetrics(String appId, PluginContext pluginContext) {}
+
+ /**
+ * RPC message handler.
+ *
+ * Plugins can use Spark's RPC system to send messages from executors to the driver (but not
+ * the other way around, currently). Messages sent by the executor component of the plugin will
+ * be delivered to this method, and the returned value will be sent back to the executor as
+ * the reply, if the executor has requested one.
+ *
+ * Any exception thrown will be sent back to the executor as an error if it is expecting
+ * a reply. If a reply is not expected, a log message will be written to the driver log instead.
+ *
+ * The implementation of this handler should be thread-safe.
+ *
+ * Note all plugins share RPC dispatch threads, and this method is called synchronously. So
+ * performing expensive operations in this handler may affect the operation of other active
+ * plugins. Internal Spark endpoints are not directly affected, though, since they use different
+ * threads.
+ *
+ * Spark guarantees that the driver component will be ready to receive messages through this
+ * handler when executors are started.
+ *
+ * @param message The incoming message.
+ * @return Value to be returned to the caller. Ignored if the caller does not expect a reply.
+ */
+ default Object receive(Object message) throws Exception {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Informs the plugin that the Spark application is shutting down.
+ *
+ * This method is called during the driver shutdown phase. It is recommended that plugins
+ * not use any Spark functions (e.g. send RPC messages) during this call.
+ */
+ default void shutdown() {}
+
+}
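As a sketch of how this interface might be used (not part of this patch; the class name and config key below are made up), a driver component could hand a setting to its executor counterparts via the returned map, and answer simple messages that executors send back:

import java.util.Collections;
import java.util.Map;

import org.apache.spark.SparkContext;
import org.apache.spark.api.plugin.DriverPlugin;
import org.apache.spark.api.plugin.PluginContext;

public class ExampleDriverPlugin implements DriverPlugin {

  @Override
  public Map<String, String> init(SparkContext sc, PluginContext pluginContext) {
    // Runs before the task scheduler exists and blocks driver startup, so keep it cheap.
    // The returned map is passed to ExecutorPlugin.init() on every executor.
    return Collections.singletonMap("example.greeting", "hello from the driver");
  }

  @Override
  public Object receive(Object message) {
    // Handles messages sent by the executor component via PluginContext.send()/ask().
    // Runs on a shared RPC dispatch thread, so it must be thread-safe and fast.
    if ("ping".equals(message)) {
      return "pong";
    }
    throw new IllegalArgumentException("Unexpected message: " + message);
  }
}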
diff --git a/core/src/main/java/org/apache/spark/api/plugin/ExecutorPlugin.java b/core/src/main/java/org/apache/spark/api/plugin/ExecutorPlugin.java
new file mode 100644
index 0000000000000..4961308035163
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/api/plugin/ExecutorPlugin.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.plugin;
+
+import java.util.Map;
+
+import org.apache.spark.annotation.DeveloperApi;
+
+/**
+ * :: DeveloperApi ::
+ * Executor component of a {@link SparkPlugin}.
+ *
+ * @since 3.0.0
+ */
+@DeveloperApi
+public interface ExecutorPlugin {
+
+ /**
+ * Initialize the executor plugin.
+ *
+ * When a Spark plugin provides an executor plugin, this method will be called during the
+ * initialization of the executor process. It will block executor initialization until it
+ * returns.
+ *
+ * Executor plugins that publish metrics should register all metrics with the context's
+ * registry ({@link PluginContext#metricRegistry()}) when this method is called. Metrics
+ * registered afterwards are not guaranteed to show up.
+ *
+ * @param ctx Context information for the executor where the plugin is running.
+ * @param extraConf Extra configuration provided by the driver component during its
+ * initialization.
+ */
+ default void init(PluginContext ctx, Map<String, String> extraConf) {}
+
+ /**
+ * Clean up and terminate this plugin.
+ *
+ * This method is called during the executor shutdown phase, and blocks executor shutdown.
+ */
+ default void shutdown() {}
+
+}
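A matching executor component might consume the driver-provided map and publish a metric. A minimal, hypothetical sketch (not part of the patch):

import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;

import com.codahale.metrics.Gauge;

import org.apache.spark.api.plugin.ExecutorPlugin;
import org.apache.spark.api.plugin.PluginContext;

public class ExampleExecutorPlugin implements ExecutorPlugin {

  private final AtomicLong events = new AtomicLong();

  @Override
  public void init(PluginContext ctx, Map<String, String> extraConf) {
    // Value produced by the driver component's init() and shipped to this executor.
    String greeting = extraConf.getOrDefault("example.greeting", "none");
    // Register metrics now; anything registered after init() may not be picked up.
    ctx.metricRegistry().register("eventCount", (Gauge<Long>) events::get);
  }

  @Override
  public void shutdown() {
    // Blocks executor shutdown, so keep it short.
  }
}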
diff --git a/core/src/main/java/org/apache/spark/api/plugin/PluginContext.java b/core/src/main/java/org/apache/spark/api/plugin/PluginContext.java
new file mode 100644
index 0000000000000..b9413cf828aa1
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/api/plugin/PluginContext.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.plugin;
+
+import java.io.IOException;
+
+import com.codahale.metrics.MetricRegistry;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.annotation.DeveloperApi;
+
+/**
+ * :: DeveloperApi ::
+ * Context information and operations for plugins loaded by Spark.
+ *
+ * An instance of this class is provided to plugins in their initialization method. It is safe
+ * for plugins to keep a reference to the instance for later use (for example, to send messages
+ * to the plugin's driver component).
+ *
+ * Context instances are plugin-specific, so metrics and messages are tied to each plugin. It is
+ * not possible for a plugin to directly interact with other plugins.
+ *
+ * @since 3.0.0
+ */
+@DeveloperApi
+public interface PluginContext {
+
+ /**
+ * Registry where to register metrics published by the plugin associated with this context.
+ */
+ MetricRegistry metricRegistry();
+
+ /** Configuration of the Spark application. */
+ SparkConf conf();
+
+ /** Executor ID of the process. On the driver, this will identify the driver. */
+ String executorID();
+
+ /** The host name which is being used by the Spark process for communication. */
+ String hostname();
+
+ /**
+ * Send a message to the plugin's driver-side component.
+ *
+ * This method sends a message to the driver-side component of the plugin, without expecting
+ * a reply. It returns as soon as the message is enqueued for sending.
+ *
+ * The message must be serializable.
+ *
+ * @param message Message to be sent.
+ */
+ void send(Object message) throws IOException;
+
+ /**
+ * Send an RPC to the plugin's driver-side component.
+ *
+ * This method sends a message to the driver-side component of the plugin, and blocks until a
+ * reply arrives, or the configured RPC ask timeout (spark.rpc.askTimeout) elapses.
+ *
+ * If the driver replies with an error, an exception with the corresponding error will be thrown.
+ *
+ * The message must be serializable.
+ *
+ * @param message Message to be sent.
+ * @return The reply from the driver-side component.
+ */
+ Object ask(Object message) throws Exception;
+
+}
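To illustrate the messaging half of this interface (again a hypothetical sketch, not part of the patch), an executor component can keep the context it receives in init() and use send()/ask() to reach its driver-side counterpart:

import java.util.Map;

import org.apache.spark.api.plugin.ExecutorPlugin;
import org.apache.spark.api.plugin.PluginContext;

public class ReportingExecutorPlugin implements ExecutorPlugin {

  private volatile PluginContext ctx;

  @Override
  public void init(PluginContext ctx, Map<String, String> extraConf) {
    // Keeping a reference to the context for later use is explicitly allowed.
    this.ctx = ctx;
  }

  @Override
  public void shutdown() {
    try {
      // Fire-and-forget; returns once the message is enqueued for sending.
      ctx.send("stopping " + ctx.executorID() + " on " + ctx.hostname());
      // Blocking round trip; throws if the driver component's receive() threw.
      Object reply = ctx.ask("ping");
    } catch (Exception e) {
      // Messaging failures should not break executor shutdown.
    }
  }
}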
diff --git a/core/src/main/java/org/apache/spark/api/plugin/SparkPlugin.java b/core/src/main/java/org/apache/spark/api/plugin/SparkPlugin.java
new file mode 100644
index 0000000000000..a500f5d2188f0
--- /dev/null
+++ b/core/src/main/java/org/apache/spark/api/plugin/SparkPlugin.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.api.plugin;
+
+import org.apache.spark.annotation.DeveloperApi;
+
+/**
+ * :: DeveloperApi ::
+ * A plugin that can be dynamically loaded into a Spark application.
+ *
+ * Plugins can be loaded by adding the plugin's class name to the appropriate Spark configuration.
+ * Check the Spark configuration documentation for details.
+ *
+ * Plugins have two optional components: a driver-side component, of which a single instance is
+ * created per application, inside the Spark driver; and an executor-side component, of which one
+ * instance is created in each executor that is started by Spark. Details of each component can be
+ * found in the documentation for {@link DriverPlugin} and {@link ExecutorPlugin}.
+ *
+ * @since 3.0.0
+ */
+@DeveloperApi
+public interface SparkPlugin {
+
+ /**
+ * Return the plugin's driver-side component.
+ *
+ * @return The driver-side component, or null if one is not needed.
+ */
+ DriverPlugin driverPlugin();
+
+ /**
+ * Return the plugin's executor-side component.
+ *
+ * @return The executor-side component, or null if one is not needed.
+ */
+ ExecutorPlugin executorPlugin();
+
+}
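Tying the two sides together (still hypothetical; ExampleDriverPlugin and ExampleExecutorPlugin are the sketches above), the entry point that Spark instantiates would look like this, with either accessor free to return null when that side is not needed:

import org.apache.spark.api.plugin.DriverPlugin;
import org.apache.spark.api.plugin.ExecutorPlugin;
import org.apache.spark.api.plugin.SparkPlugin;

public class ExamplePlugin implements SparkPlugin {

  @Override
  public DriverPlugin driverPlugin() {
    return new ExampleDriverPlugin();     // one instance per application, in the driver
  }

  @Override
  public ExecutorPlugin executorPlugin() {
    return new ExampleExecutorPlugin();   // one instance per executor
  }
}

Since PluginContainer loads plugin classes by name (through Utils.loadExtensions), the entry point must be instantiable reflectively; a public no-arg constructor is the simplest way to satisfy that.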
diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 2db880976c3a1..cad88ad8aec67 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -48,6 +48,7 @@ import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.internal.config.Tests._
import org.apache.spark.internal.config.UI._
+import org.apache.spark.internal.plugin.PluginContainer
import org.apache.spark.io.CompressionCodec
import org.apache.spark.metrics.source.JVMCPUSource
import org.apache.spark.partial.{ApproximateEvaluator, PartialResult}
@@ -220,6 +221,7 @@ class SparkContext(config: SparkConf) extends Logging {
private var _heartbeater: Heartbeater = _
private var _resources: scala.collection.immutable.Map[String, ResourceInformation] = _
private var _shuffleDriverComponents: ShuffleDriverComponents = _
+ private var _plugins: Option[PluginContainer] = None
/* ------------------------------------------------------------------------------------- *
| Accessors and public fields. These provide access to the internal state of the |
@@ -539,6 +541,9 @@ class SparkContext(config: SparkConf) extends Logging {
_heartbeatReceiver = env.rpcEnv.setupEndpoint(
HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))
+ // Initialize any plugins before the task scheduler is initialized.
+ _plugins = PluginContainer(this)
+
// Create and start the scheduler
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
@@ -621,6 +626,7 @@ class SparkContext(config: SparkConf) extends Logging {
_env.metricsSystem.registerSource(e.executorAllocationManagerSource)
}
appStatusSource.foreach(_env.metricsSystem.registerSource(_))
+ _plugins.foreach(_.registerMetrics(applicationId))
// Make sure the context is stopped if the user forgets about it. This avoids leaving
// unfinished event logs around after the JVM exits cleanly. It doesn't help if the JVM
// is killed, though.
@@ -1976,6 +1982,9 @@ class SparkContext(config: SparkConf) extends Logging {
_listenerBusStarted = false
}
}
+ Utils.tryLogNonFatalError {
+ _plugins.foreach(_.shutdown())
+ }
Utils.tryLogNonFatalError {
_eventLogger.foreach(_.stop())
}
diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala
index ce6d0322bafd5..0f595d095a229 100644
--- a/core/src/main/scala/org/apache/spark/executor/Executor.scala
+++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -37,6 +37,7 @@ import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
+import org.apache.spark.internal.plugin.PluginContainer
import org.apache.spark.memory.{SparkOutOfMemoryError, TaskMemoryManager}
import org.apache.spark.metrics.source.JVMCPUSource
import org.apache.spark.rpc.RpcTimeout
@@ -165,6 +166,11 @@ private[spark] class Executor(
}
}
+ // Plugins need to load using a class loader that includes the executor's user classpath
+ private val plugins: Option[PluginContainer] = Utils.withContextClassLoader(replClassLoader) {
+ PluginContainer(env)
+ }
+
// Max size of direct result. If task result is bigger than this, we use the block manager
// to send the result back.
private val maxDirectResultSize = Math.min(
@@ -297,6 +303,7 @@ private[spark] class Executor(
logWarning("Plugin " + plugin.getClass().getCanonicalName() + " shutdown failed", e)
}
}
+ plugins.foreach(_.shutdown())
}
if (!isLocal) {
env.stop()
diff --git a/core/src/main/scala/org/apache/spark/internal/config/package.scala b/core/src/main/scala/org/apache/spark/internal/config/package.scala
index 36211dc2ed4f8..35811aeec1988 100644
--- a/core/src/main/scala/org/apache/spark/internal/config/package.scala
+++ b/core/src/main/scala/org/apache/spark/internal/config/package.scala
@@ -1159,6 +1159,17 @@ package object config {
s"The value must be in allowed range [1,048,576, ${MAX_BUFFER_SIZE_BYTES}].")
.createWithDefault(1024 * 1024)
+ private[spark] val DEFAULT_PLUGINS_LIST = "spark.plugins.defaultList"
+
+ private[spark] val PLUGINS =
+ ConfigBuilder("spark.plugins")
+ .withPrepended(DEFAULT_PLUGINS_LIST, separator = ",")
+ .doc("Comma-separated list of class names implementing " +
+ "org.apache.spark.api.plugin.SparkPlugin to load into the application.")
+ .stringConf
+ .toSequence
+ .createWithDefault(Nil)
+
private[spark] val EXECUTOR_PLUGINS =
ConfigBuilder("spark.executor.plugins")
.doc("Comma-separated list of class names for \"plugins\" implementing " +
diff --git a/core/src/main/scala/org/apache/spark/internal/plugin/PluginContainer.scala b/core/src/main/scala/org/apache/spark/internal/plugin/PluginContainer.scala
new file mode 100644
index 0000000000000..fc7a9d85957c0
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/internal/plugin/PluginContainer.scala
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.plugin
+
+import scala.collection.JavaConverters._
+import scala.util.{Either, Left, Right}
+
+import org.apache.spark.{SparkContext, SparkEnv}
+import org.apache.spark.api.plugin._
+import org.apache.spark.internal.Logging
+import org.apache.spark.internal.config._
+import org.apache.spark.util.Utils
+
+sealed abstract class PluginContainer {
+
+ def shutdown(): Unit
+ def registerMetrics(appId: String): Unit
+
+}
+
+private class DriverPluginContainer(sc: SparkContext, plugins: Seq[SparkPlugin])
+ extends PluginContainer with Logging {
+
+ private val driverPlugins: Seq[(String, DriverPlugin, PluginContextImpl)] = plugins.flatMap { p =>
+ val driverPlugin = p.driverPlugin()
+ if (driverPlugin != null) {
+ val name = p.getClass().getName()
+ val ctx = new PluginContextImpl(name, sc.env.rpcEnv, sc.env.metricsSystem, sc.conf,
+ sc.env.executorId)
+
+ val extraConf = driverPlugin.init(sc, ctx)
+ if (extraConf != null) {
+ extraConf.asScala.foreach { case (k, v) =>
+ sc.conf.set(s"${PluginContainer.EXTRA_CONF_PREFIX}$name.$k", v)
+ }
+ }
+ logInfo(s"Initialized driver component for plugin $name.")
+ Some((p.getClass().getName(), driverPlugin, ctx))
+ } else {
+ None
+ }
+ }
+
+ if (driverPlugins.nonEmpty) {
+ val pluginsByName = driverPlugins.map { case (name, plugin, _) => (name, plugin) }.toMap
+ sc.env.rpcEnv.setupEndpoint(classOf[PluginEndpoint].getName(),
+ new PluginEndpoint(pluginsByName, sc.env.rpcEnv))
+ }
+
+ override def registerMetrics(appId: String): Unit = {
+ driverPlugins.foreach { case (_, plugin, ctx) =>
+ plugin.registerMetrics(appId, ctx)
+ ctx.registerMetrics()
+ }
+ }
+
+ override def shutdown(): Unit = {
+ driverPlugins.foreach { case (name, plugin, _) =>
+ try {
+ logDebug(s"Stopping plugin $name.")
+ plugin.shutdown()
+ } catch {
+ case t: Throwable =>
+ logInfo(s"Exception while shutting down plugin $name.", t)
+ }
+ }
+ }
+
+}
+
+private class ExecutorPluginContainer(env: SparkEnv, plugins: Seq[SparkPlugin])
+ extends PluginContainer with Logging {
+
+ private val executorPlugins: Seq[(String, ExecutorPlugin)] = {
+ val allExtraConf = env.conf.getAllWithPrefix(PluginContainer.EXTRA_CONF_PREFIX)
+
+ plugins.flatMap { p =>
+ val executorPlugin = p.executorPlugin()
+ if (executorPlugin != null) {
+ val name = p.getClass().getName()
+ val prefix = name + "."
+ val extraConf = allExtraConf
+ .filter { case (k, v) => k.startsWith(prefix) }
+ .map { case (k, v) => k.substring(prefix.length()) -> v }
+ .toMap
+ .asJava
+ val ctx = new PluginContextImpl(name, env.rpcEnv, env.metricsSystem, env.conf,
+ env.executorId)
+ executorPlugin.init(ctx, extraConf)
+ ctx.registerMetrics()
+
+ logInfo(s"Initialized executor component for plugin $name.")
+ Some(p.getClass().getName() -> executorPlugin)
+ } else {
+ None
+ }
+ }
+ }
+
+ override def registerMetrics(appId: String): Unit = {
+ throw new IllegalStateException("Should not be called for the executor container.")
+ }
+
+ override def shutdown(): Unit = {
+ executorPlugins.foreach { case (name, plugin) =>
+ try {
+ logDebug(s"Stopping plugin $name.")
+ plugin.shutdown()
+ } catch {
+ case t: Throwable =>
+ logInfo(s"Exception while shutting down plugin $name.", t)
+ }
+ }
+ }
+}
+
+object PluginContainer {
+
+ val EXTRA_CONF_PREFIX = "spark.plugins.internal.conf."
+
+ def apply(sc: SparkContext): Option[PluginContainer] = PluginContainer(Left(sc))
+
+ def apply(env: SparkEnv): Option[PluginContainer] = PluginContainer(Right(env))
+
+ private def apply(ctx: Either[SparkContext, SparkEnv]): Option[PluginContainer] = {
+ val conf = ctx.fold(_.conf, _.conf)
+ val plugins = Utils.loadExtensions(classOf[SparkPlugin], conf.get(PLUGINS).distinct, conf)
+ if (plugins.nonEmpty) {
+ ctx match {
+ case Left(sc) => Some(new DriverPluginContainer(sc, plugins))
+ case Right(env) => Some(new ExecutorPluginContainer(env, plugins))
+ }
+ } else {
+ None
+ }
+ }
+}
diff --git a/core/src/main/scala/org/apache/spark/internal/plugin/PluginContextImpl.scala b/core/src/main/scala/org/apache/spark/internal/plugin/PluginContextImpl.scala
new file mode 100644
index 0000000000000..279f3d388fb2e
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/internal/plugin/PluginContextImpl.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.plugin
+
+import com.codahale.metrics.MetricRegistry
+
+import org.apache.spark.{SparkConf, SparkException}
+import org.apache.spark.api.plugin.PluginContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.metrics.MetricsSystem
+import org.apache.spark.metrics.source.Source
+import org.apache.spark.rpc.RpcEnv
+import org.apache.spark.util.RpcUtils
+
+private class PluginContextImpl(
+ pluginName: String,
+ rpcEnv: RpcEnv,
+ metricsSystem: MetricsSystem,
+ override val conf: SparkConf,
+ override val executorID: String)
+ extends PluginContext with Logging {
+
+ override def hostname(): String = rpcEnv.address.hostPort.split(":")(0)
+
+ private val registry = new MetricRegistry()
+
+ private lazy val driverEndpoint = try {
+ RpcUtils.makeDriverRef(classOf[PluginEndpoint].getName(), conf, rpcEnv)
+ } catch {
+ case e: Exception =>
+ logWarning(s"Failed to create driver plugin endpoint ref.", e)
+ null
+ }
+
+ override def metricRegistry(): MetricRegistry = registry
+
+ override def send(message: AnyRef): Unit = {
+ if (driverEndpoint == null) {
+ throw new IllegalStateException("Driver endpoint is not known.")
+ }
+ driverEndpoint.send(PluginMessage(pluginName, message))
+ }
+
+ override def ask(message: AnyRef): AnyRef = {
+ try {
+ if (driverEndpoint != null) {
+ driverEndpoint.askSync[AnyRef](PluginMessage(pluginName, message))
+ } else {
+ throw new IllegalStateException("Driver endpoint is not known.")
+ }
+ } catch {
+ case e: SparkException if e.getCause() != null =>
+ throw e.getCause()
+ }
+ }
+
+ def registerMetrics(): Unit = {
+ if (!registry.getMetrics().isEmpty()) {
+ val src = new PluginMetricsSource(s"plugin.$pluginName", registry)
+ metricsSystem.registerSource(src)
+ }
+ }
+
+ class PluginMetricsSource(
+ override val sourceName: String,
+ override val metricRegistry: MetricRegistry)
+ extends Source
+
+}
diff --git a/core/src/main/scala/org/apache/spark/internal/plugin/PluginEndpoint.scala b/core/src/main/scala/org/apache/spark/internal/plugin/PluginEndpoint.scala
new file mode 100644
index 0000000000000..9a59b6bf678f9
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/internal/plugin/PluginEndpoint.scala
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.plugin
+
+import org.apache.spark.api.plugin.DriverPlugin
+import org.apache.spark.internal.Logging
+import org.apache.spark.rpc.{IsolatedRpcEndpoint, RpcCallContext, RpcEnv}
+
+case class PluginMessage(pluginName: String, message: AnyRef)
+
+private class PluginEndpoint(
+ plugins: Map[String, DriverPlugin],
+ override val rpcEnv: RpcEnv)
+ extends IsolatedRpcEndpoint with Logging {
+
+ override def receive: PartialFunction[Any, Unit] = {
+ case PluginMessage(pluginName, message) =>
+ plugins.get(pluginName) match {
+ case Some(plugin) =>
+ try {
+ val reply = plugin.receive(message)
+ if (reply != null) {
+ logInfo(
+ s"Plugin $pluginName returned reply for one-way message of type " +
+ s"${message.getClass().getName()}.")
+ }
+ } catch {
+ case e: Exception =>
+ logWarning(s"Error in plugin $pluginName when handling message of type " +
+ s"${message.getClass().getName()}.", e)
+ }
+
+ case None =>
+ throw new IllegalArgumentException(s"Received message for unknown plugin $pluginName.")
+ }
+ }
+
+ override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
+ case PluginMessage(pluginName, message) =>
+ plugins.get(pluginName) match {
+ case Some(plugin) =>
+ context.reply(plugin.receive(message))
+
+ case None =>
+ throw new IllegalArgumentException(s"Received message for unknown plugin $pluginName.")
+ }
+ }
+
+}
diff --git a/core/src/test/scala/org/apache/spark/internal/plugin/PluginContainerSuite.scala b/core/src/test/scala/org/apache/spark/internal/plugin/PluginContainerSuite.scala
new file mode 100644
index 0000000000000..24fa017363654
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/internal/plugin/PluginContainerSuite.scala
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.internal.plugin
+
+import java.io.File
+import java.nio.charset.StandardCharsets
+import java.util.{Map => JMap}
+
+import scala.collection.JavaConverters._
+import scala.concurrent.duration._
+
+import com.codahale.metrics.Gauge
+import com.google.common.io.Files
+import org.mockito.ArgumentMatchers.{any, eq => meq}
+import org.mockito.Mockito.{mock, spy, verify, when}
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.concurrent.Eventually.{eventually, interval, timeout}
+
+import org.apache.spark.{ExecutorPlugin => _, _}
+import org.apache.spark.api.plugin._
+import org.apache.spark.internal.config._
+import org.apache.spark.launcher.SparkLauncher
+import org.apache.spark.util.Utils
+
+class PluginContainerSuite extends SparkFunSuite with BeforeAndAfterEach with LocalSparkContext {
+
+ override def afterEach(): Unit = {
+ TestSparkPlugin.reset()
+ super.afterEach()
+ }
+
+ test("plugin initialization and communication") {
+ val conf = new SparkConf()
+ .setAppName(getClass().getName())
+ .set(SparkLauncher.SPARK_MASTER, "local[1]")
+ .set(PLUGINS, Seq(classOf[TestSparkPlugin].getName()))
+
+ TestSparkPlugin.extraConf = Map("foo" -> "bar", "bar" -> "baz").asJava
+
+ sc = new SparkContext(conf)
+
+ assert(TestSparkPlugin.driverPlugin != null)
+ verify(TestSparkPlugin.driverPlugin).init(meq(sc), any())
+
+ assert(TestSparkPlugin.executorPlugin != null)
+ verify(TestSparkPlugin.executorPlugin).init(any(), meq(TestSparkPlugin.extraConf))
+
+ assert(TestSparkPlugin.executorContext != null)
+
+ // One way messages don't block, so need to loop checking whether it arrives.
+ TestSparkPlugin.executorContext.send("oneway")
+ eventually(timeout(10.seconds), interval(10.millis)) {
+ verify(TestSparkPlugin.driverPlugin).receive("oneway")
+ }
+
+ assert(TestSparkPlugin.executorContext.ask("ask") === "reply")
+
+ val err = intercept[Exception] {
+ TestSparkPlugin.executorContext.ask("unknown message")
+ }
+ assert(err.getMessage().contains("unknown message"))
+
+ // It should be possible for the driver plugin to send a message to itself, even if that doesn't
+ // make a whole lot of sense. It at least allows the same context class to be used on both
+ // sides.
+ assert(TestSparkPlugin.driverContext != null)
+ assert(TestSparkPlugin.driverContext.ask("ask") === "reply")
+
+ val metricSources = sc.env.metricsSystem
+ .getSourcesByName(s"plugin.${classOf[TestSparkPlugin].getName()}")
+ assert(metricSources.size === 2)
+
+ def findMetric(name: String): Int = {
+ val allFound = metricSources.filter(_.metricRegistry.getGauges().containsKey(name))
+ assert(allFound.size === 1)
+ allFound.head.metricRegistry.getGauges().get(name).asInstanceOf[Gauge[Int]].getValue()
+ }
+
+ assert(findMetric("driverMetric") === 42)
+ assert(findMetric("executorMetric") === 84)
+
+ sc.stop()
+ sc = null
+
+ verify(TestSparkPlugin.driverPlugin).shutdown()
+ verify(TestSparkPlugin.executorPlugin).shutdown()
+ }
+
+ test("do nothing if plugins are not configured") {
+ val conf = new SparkConf()
+ val env = mock(classOf[SparkEnv])
+ when(env.conf).thenReturn(conf)
+ assert(PluginContainer(env) === None)
+ }
+
+ test("merging of config options") {
+ val conf = new SparkConf()
+ .setAppName(getClass().getName())
+ .set(SparkLauncher.SPARK_MASTER, "local[1]")
+ .set(PLUGINS, Seq(classOf[TestSparkPlugin].getName()))
+ .set(DEFAULT_PLUGINS_LIST, classOf[TestSparkPlugin].getName())
+
+ assert(conf.get(PLUGINS).size === 2)
+
+ sc = new SparkContext(conf)
+ // Just check plugin is loaded. The plugin code below checks whether a single copy was loaded.
+ assert(TestSparkPlugin.driverPlugin != null)
+ }
+
+ test("plugin initialization in non-local mode") {
+ val path = Utils.createTempDir()
+
+ val conf = new SparkConf()
+ .setAppName(getClass().getName())
+ .set(SparkLauncher.SPARK_MASTER, "local-cluster[2,1,1024]")
+ .set(PLUGINS, Seq(classOf[NonLocalModeSparkPlugin].getName()))
+ .set(NonLocalModeSparkPlugin.TEST_PATH_CONF, path.getAbsolutePath())
+
+ sc = new SparkContext(conf)
+ TestUtils.waitUntilExecutorsUp(sc, 2, 10000)
+
+ eventually(timeout(10.seconds), interval(100.millis)) {
+ val children = path.listFiles()
+ assert(children != null)
+ assert(children.length >= 3)
+ }
+ }
+}
+
+class NonLocalModeSparkPlugin extends SparkPlugin {
+
+ override def driverPlugin(): DriverPlugin = {
+ new DriverPlugin() {
+ override def init(sc: SparkContext, ctx: PluginContext): JMap[String, String] = {
+ NonLocalModeSparkPlugin.writeFile(ctx.conf(), ctx.executorID())
+ Map.empty.asJava
+ }
+ }
+ }
+
+ override def executorPlugin(): ExecutorPlugin = {
+ new ExecutorPlugin() {
+ override def init(ctx: PluginContext, extraConf: JMap[String, String]): Unit = {
+ NonLocalModeSparkPlugin.writeFile(ctx.conf(), ctx.executorID())
+ }
+ }
+ }
+}
+
+object NonLocalModeSparkPlugin {
+ val TEST_PATH_CONF = "spark.nonLocalPlugin.path"
+
+ def writeFile(conf: SparkConf, id: String): Unit = {
+ val path = conf.get(TEST_PATH_CONF)
+ Files.write(id, new File(path, id), StandardCharsets.UTF_8)
+ }
+}
+
+class TestSparkPlugin extends SparkPlugin {
+
+ override def driverPlugin(): DriverPlugin = {
+ val p = new TestDriverPlugin()
+ require(TestSparkPlugin.driverPlugin == null, "Driver plugin already initialized.")
+ TestSparkPlugin.driverPlugin = spy(p)
+ TestSparkPlugin.driverPlugin
+ }
+
+ override def executorPlugin(): ExecutorPlugin = {
+ val p = new TestExecutorPlugin()
+ require(TestSparkPlugin.executorPlugin == null, "Executor plugin already initialized.")
+ TestSparkPlugin.executorPlugin = spy(p)
+ TestSparkPlugin.executorPlugin
+ }
+
+}
+
+private class TestDriverPlugin extends DriverPlugin {
+
+ override def init(sc: SparkContext, ctx: PluginContext): JMap[String, String] = {
+ TestSparkPlugin.driverContext = ctx
+ TestSparkPlugin.extraConf
+ }
+
+ override def registerMetrics(appId: String, ctx: PluginContext): Unit = {
+ ctx.metricRegistry().register("driverMetric", new Gauge[Int] {
+ override def getValue(): Int = 42
+ })
+ }
+
+ override def receive(msg: AnyRef): AnyRef = msg match {
+ case "oneway" => null
+ case "ask" => "reply"
+ case other => throw new IllegalArgumentException(s"unknown: $other")
+ }
+
+}
+
+private class TestExecutorPlugin extends ExecutorPlugin {
+
+ override def init(ctx: PluginContext, extraConf: JMap[String, String]): Unit = {
+ ctx.metricRegistry().register("executorMetric", new Gauge[Int] {
+ override def getValue(): Int = 84
+ })
+ TestSparkPlugin.executorContext = ctx
+ }
+
+}
+
+private object TestSparkPlugin {
+ var driverPlugin: TestDriverPlugin = _
+ var driverContext: PluginContext = _
+
+ var executorPlugin: TestExecutorPlugin = _
+ var executorContext: PluginContext = _
+
+ var extraConf: JMap[String, String] = _
+
+ def reset(): Unit = {
+ driverPlugin = null
+ driverContext = null
+ executorPlugin = null
+ executorContext = null
+ extraConf = null
+ }
+}
diff --git a/docs/monitoring.md b/docs/monitoring.md
index 8cb237df0ba70..4062e16a25d34 100644
--- a/docs/monitoring.md
+++ b/docs/monitoring.md
@@ -991,6 +991,11 @@ This is the component with the largest amount of instrumented metrics
- namespace=JVMCPU
- jvmCpuTime
+- namespace=plugin.\<Plugin Class Name>
+  - Optional namespace(s). Metrics in this namespace are defined by user-supplied code, and
+  configured using the Spark plugin API. See "Advanced Instrumentation" below for how to load
+  custom plugins into Spark.
+
+Spark also provides a plugin API so that custom instrumentation code can be added to Spark
+applications. There are two configuration keys available for loading plugins into Spark:
+
+- spark.plugins
+- spark.plugins.defaultList
+
+Both take a comma-separated list of class names that implement the
+org.apache.spark.api.plugin.SparkPlugin interface. The two names exist so that it's
+possible for one list to be placed in the Spark default config file, allowing users to
+easily add other plugins from the command line without overwriting the config file's list. Duplicate
+plugins are ignored.
+
+Distribution of the jar files containing the plugin code is currently not done by Spark. The user
+or admin should make sure that the jar files are available to Spark applications, for example, by
+including the plugin jar with the Spark distribution. The exception to this rule is the YARN
+backend, where the --jars command line option (or equivalent config entry) can be
+used to make the plugin code available to both executors and cluster-mode drivers.
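As a usage sketch (hypothetical names, not part of the patch), enabling such a plugin from application code only requires pointing spark.plugins at the entry point class:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class PluginDemo {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf()
        .setAppName("plugin-demo")
        .setMaster("local[2]")
        // Comma-separated class names; entries duplicated in spark.plugins.defaultList are ignored.
        .set("spark.plugins", "com.example.ExamplePlugin");
    try (JavaSparkContext sc = new JavaSparkContext(conf)) {
      // Driver and executor plugin components are initialized as part of context startup.
    }
  }
}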