3 changes: 3 additions & 0 deletions .rat-excludes
@@ -29,6 +29,9 @@ spark-env.sh.template
log4j-defaults.properties
bootstrap-tooltip.js
jquery-1.11.1.min.js
d3.min.js
dagre-d3.min.js
graphlib-dot.min.js
sorttable.js
.*avsc
.*txt
5 changes: 5 additions & 0 deletions core/src/main/resources/org/apache/spark/ui/static/d3.min.js

Large diffs are not rendered by default.

30 changes: 30 additions & 0 deletions core/src/main/resources/org/apache/spark/ui/static/dagre-d3.min.js

Large diffs are not rendered by default.

130 changes: 130 additions & 0 deletions core/src/main/resources/org/apache/spark/ui/static/spark-stage-viz.js
@@ -0,0 +1,130 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

var stageVizIsRendered = false;

/*
 * Show or hide the stage visualization in the UI.
 * This assumes that the visualization is rendered into the "#viz-graph" element.
 */
function toggleStageViz() {
  $(".expand-visualization-arrow").toggleClass('arrow-closed');
  $(".expand-visualization-arrow").toggleClass('arrow-open');
  var shouldRender = $(".expand-visualization-arrow").hasClass("arrow-open");
  if (shouldRender) {
    // If the viz is already rendered, just show it
    if (stageVizIsRendered) {
      $("#viz-graph").show();
    } else {
      renderStageViz();
      stageVizIsRendered = true;
    }
  } else {
    // Hide the element instead of emptying it so the cached rendering can be
    // shown again if the visualization is expanded later
    $("#viz-graph").hide();
  }
}

/*
 * Render a DAG that describes the RDDs for a given stage.
 *
 * Input: the content of a dot file, stored in the text of the "#viz-dot-file" element
 * Output: an SVG that visualizes the DAG, stored in the "#viz-graph" element
 *
 * This relies on a custom build of dagre-d3, which can be found at
 * http://github.com/andrewor14/dagre-d3/dist/dagre-d3.js. For more detail, see the
 * changes made in that project after it was forked.
 */
function renderStageViz() {

  // If there is no dot file to render, report an error
  if (d3.select("#viz-dot-file").empty()) {
    d3.select("#viz-graph")
      .append("div")
      .text("No visualization information available for this stage.");
    return;
  }

  // The dot source is HTML-escaped when embedded in the page, so unescape it
  // before parsing it and rendering it into an SVG
  var dot = d3.select("#viz-dot-file").text();
  var escaped_dot = dot
    .replace(/&lt;/g, "<")
    .replace(/&gt;/g, ">")
    .replace(/&quot;/g, "\"");
  var g = graphlibDot.read(escaped_dot);
  var render = new dagreD3.render();
  var svg = d3.select("#viz-graph").append("svg");
  svg.call(render, g);

  // Set the appropriate SVG dimensions to ensure that all elements are displayed
  var svgMargin = 20;
  var boundingBox = svg.node().getBBox();
  svg.style("width", (boundingBox.width + svgMargin) + "px");
  svg.style("height", (boundingBox.height + svgMargin) + "px");

  // Add style to clusters, nodes, and edges
  d3.selectAll("svg g.cluster rect")
    .style("fill", "none")
    .style("stroke", "#AADFFF")
    .style("stroke-width", "4px")
    .style("stroke-opacity", "0.5");
  d3.selectAll("svg g.node rect")
    .style("fill", "white")
    .style("stroke", "black")
    .style("stroke-width", "2px")
    .style("fill-opacity", "0.8")
    .style("stroke-opacity", "0.9");
  d3.selectAll("svg g.edgePath path")
    .style("stroke", "black")
    .style("stroke-width", "2px");

  // Add labels to clusters
  d3.selectAll("svg g.cluster").each(function(cluster_data) {
    var cluster = d3.select(this);
    cluster.selectAll("rect").each(function(rect_data) {
      var rect = d3.select(this);
      // Shift the boxes up a little
      rect.attr("y", toFloat(rect.attr("y")) - 10);
      rect.attr("height", toFloat(rect.attr("height")) + 10);
      var labelX = toFloat(rect.attr("x")) + toFloat(rect.attr("width")) - 5;
      var labelY = toFloat(rect.attr("y")) + 15;
      var labelText = cluster.attr("id").replace(/^cluster/, "").replace(/_.*$/, "");
      cluster.append("text")
        .attr("x", labelX)
        .attr("y", labelY)
        .attr("fill", "#AAAAAA")
        .attr("font-size", "11px")
        .attr("text-anchor", "end")
        .text(labelText);
    });
  });

  // We have shifted a few elements upwards, so adjust the SVG viewBox accordingly
  var startX = -svgMargin;
  var startY = -svgMargin;
  var endX = toFloat(svg.style("width")) + svgMargin;
  var endY = toFloat(svg.style("height")) + svgMargin;
  var newViewBox = startX + " " + startY + " " + endX + " " + endY;
  svg.attr("viewBox", newViewBox);
}

/* Helper method to convert a "<number>px" attribute value to a numeric value. */
function toFloat(f) {
  return parseFloat(f.replace(/px$/, ""));
}

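To make the DOM contract explicit: the script above expects the stage page to embed the dot source in a hidden "#viz-dot-file" element, provide an empty "#viz-graph" container, and presumably wire the "expand-visualization" toggle (see the CSS hunk below) to toggleStageViz(). The following is only a hedged sketch of such a page fragment; the surrounding object, the link text, and the exact markup are illustrative assumptions, not part of this patch:

```scala
import scala.xml.Node

// Illustrative sketch, not part of this patch. The element IDs and classes
// (viz-dot-file, viz-graph, expand-visualization*) come from spark-stage-viz.js;
// everything else here is a hypothetical stand-in.
object StageVizPageSketch {
  def render(dotString: String): Seq[Node] = {
    <div>
      <span class="expand-visualization" onclick="toggleStageViz()">
        <span class="expand-visualization-arrow arrow-closed"></span>
        <a>DAG visualization</a>
      </span>
      <div id="viz-dot-file" style="display: none;">{dotString}</div>
      <div id="viz-graph"></div>
    </div>
  }
}
```

Embedding the dot text through an XML literal HTML-escapes characters such as < and >, which is why renderStageViz() unescapes &lt;, &gt; and &quot; before handing the text to graphlibDot.read(). The cluster-labeling code also implies that subgraphs in the dot source are named "cluster<label>_<suffix>", since the label is recovered by stripping the "cluster" prefix and everything after the first underscore.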
@@ -145,7 +145,7 @@ pre {
border: none;
}

span.expand-additional-metrics {
span.expand-additional-metrics, span.expand-visualization {
cursor: pointer;
}

27 changes: 24 additions & 3 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -44,7 +44,7 @@ import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat => NewFileInputFor

import org.apache.mesos.MesosNativeLibrary

import org.apache.spark.annotation.{DeveloperApi, Experimental}
import org.apache.spark.annotation.{DeveloperApi, Experimental, RDDScoped}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.{LocalSparkCluster, SparkHadoopUtil}
import org.apache.spark.executor.{ExecutorEndpoint, TriggerThreadDump}
@@ -641,6 +641,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* @note avoid using `parallelize(Seq())` to create an empty `RDD`. Consider `emptyRDD` for an
* RDD with no partitions, or `parallelize(Seq[T]())` for an RDD of `T` with empty partitions.
*/
@RDDScoped
def parallelize[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = {
assertNotStopped()
new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
@@ -650,13 +651,15 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
*
* This method is identical to `parallelize`.
*/
@RDDScoped
def makeRDD[T: ClassTag](seq: Seq[T], numSlices: Int = defaultParallelism): RDD[T] = {
parallelize(seq, numSlices)
}

/** Distribute a local Scala collection to form an RDD, with one or more
* location preferences (hostnames of Spark nodes) for each object.
* Create a new partition for each collection item. */
@RDDScoped
def makeRDD[T: ClassTag](seq: Seq[(T, Seq[String])]): RDD[T] = {
assertNotStopped()
val indexToPrefs = seq.zipWithIndex.map(t => (t._2, t._1._2)).toMap
@@ -667,10 +670,11 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* Read a text file from HDFS, a local file system (available on all nodes), or any
* Hadoop-supported file system URI, and return it as an RDD of Strings.
*/
@RDDScoped
def textFile(path: String, minPartitions: Int = defaultMinPartitions): RDD[String] = {
assertNotStopped()
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
minPartitions).map(pair => pair._2.toString).setName(path)
minPartitions).map(pair => pair._2.toString)
}

/**
@@ -700,6 +704,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
*
* @param minPartitions A suggestion value of the minimal splitting number for input data.
*/
@RDDScoped
def wholeTextFiles(path: String, minPartitions: Int = defaultMinPartitions):
RDD[(String, String)] = {
assertNotStopped()
@@ -746,6 +751,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* @note Small files are preferred; very large files may cause bad performance.
*/
@Experimental
@RDDScoped
def binaryFiles(path: String, minPartitions: Int = defaultMinPartitions):
RDD[(String, PortableDataStream)] = {
assertNotStopped()
@@ -774,6 +780,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* @return An RDD of data with values, represented as byte arrays
*/
@Experimental
@RDDScoped
def binaryRecords(path: String, recordLength: Int, conf: Configuration = hadoopConfiguration)
: RDD[Array[Byte]] = {
assertNotStopped()
@@ -811,6 +818,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
* copy them using a `map` function.
*/
@RDDScoped
def hadoopRDD[K, V](
conf: JobConf,
inputFormatClass: Class[_ <: InputFormat[K, V]],
@@ -832,6 +840,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
* copy them using a `map` function.
*/
@RDDScoped
def hadoopFile[K, V](
path: String,
inputFormatClass: Class[_ <: InputFormat[K, V]],
@@ -850,7 +859,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
inputFormatClass,
keyClass,
valueClass,
minPartitions).setName(path)
minPartitions).setName(s"HadoopRDD[$path]")
}

/**
@@ -867,6 +876,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
* copy them using a `map` function.
*/
@RDDScoped
def hadoopFile[K, V, F <: InputFormat[K, V]]
(path: String, minPartitions: Int)
(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = {
@@ -891,11 +901,13 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
* copy them using a `map` function.
*/
@RDDScoped
def hadoopFile[K, V, F <: InputFormat[K, V]](path: String)
(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] =
hadoopFile[K, V, F](path, defaultMinPartitions)

/** Get an RDD for a Hadoop file with an arbitrary new API InputFormat. */
@RDDScoped
def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]]
(path: String)
(implicit km: ClassTag[K], vm: ClassTag[V], fm: ClassTag[F]): RDD[(K, V)] = {
@@ -916,6 +928,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
* copy them using a `map` function.
*/
@RDDScoped
def newAPIHadoopFile[K, V, F <: NewInputFormat[K, V]](
path: String,
fClass: Class[F],
@@ -949,6 +962,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
* copy them using a `map` function.
*/
@RDDScoped
def newAPIHadoopRDD[K, V, F <: NewInputFormat[K, V]](
conf: Configuration = hadoopConfiguration,
fClass: Class[F],
@@ -969,6 +983,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
* copy them using a `map` function.
*/
@RDDScoped
def sequenceFile[K, V](path: String,
keyClass: Class[K],
valueClass: Class[V],
@@ -987,6 +1002,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
* copy them using a `map` function.
* */
@RDDScoped
def sequenceFile[K, V](path: String, keyClass: Class[K], valueClass: Class[V]): RDD[(K, V)] = {
assertNotStopped()
sequenceFile(path, keyClass, valueClass, defaultMinPartitions)
@@ -1014,6 +1030,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* If you plan to directly cache, sort, or aggregate Hadoop writable objects, you should first
* copy them using a `map` function.
*/
@RDDScoped
def sequenceFile[K, V]
(path: String, minPartitions: Int = defaultMinPartitions)
(implicit km: ClassTag[K], vm: ClassTag[V],
@@ -1037,6 +1054,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
* though the nice thing about it is that there's very little effort required to save arbitrary
* objects.
*/
@RDDScoped
def objectFile[T: ClassTag](
path: String,
minPartitions: Int = defaultMinPartitions
@@ -1046,13 +1064,15 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
.flatMap(x => Utils.deserialize[Array[T]](x._2.getBytes, Utils.getContextOrSparkClassLoader))
}

@RDDScoped
protected[spark] def checkpointFile[T: ClassTag](
path: String
): RDD[T] = {
new CheckpointRDD[T](this, path)
}

/** Build the union of a list of RDDs. */
@RDDScoped
def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T] = {
val partitioners = rdds.flatMap(_.partitioner).toSet
if (partitioners.size == 1) {
@@ -1063,6 +1083,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
}

/** Build the union of a list of RDDs passed as variable-length arguments. */
@RDDScoped
def union[T: ClassTag](first: RDD[T], rest: RDD[T]*): RDD[T] =
union(Seq(first) ++ rest)

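One behavioral detail from the hunks above: textFile() no longer names its returned (mapped) RDD after the input path, and hadoopFile() now names the underlying RDD "HadoopRDD[<path>]" rather than just the path. A hedged sketch of how that surfaces; the SparkContext setup and the input path are illustrative only:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object RDDNameSketch {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("rdd-name-sketch").setMaster("local"))
    // Hypothetical input path, purely for illustration
    val lines = sc.textFile("/tmp/input.txt")
    // With this patch, the mapped RDD returned by textFile is presumably left unnamed,
    // while the underlying HadoopRDD is named "HadoopRDD[/tmp/input.txt]".
    println(lines.name)          // likely null (unset) after this change
    println(lines.toDebugString) // lineage string, where the HadoopRDD's new name should appear
    sc.stop()
  }
}
```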
29 changes: 29 additions & 0 deletions core/src/main/scala/org/apache/spark/annotation/RDDScoped.java
@@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.annotation;

import java.lang.annotation.*;

/**
* An annotation to mark a method as an RDD operation that encloses its body in a scope.
* This is used to compute the scope of an RDD when it is instantiated.
*/
// TODO: This should really be private[spark]
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD})
public @interface RDDScoped {}
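Because the annotation is retained at runtime, one way the enclosing RDD operation could be identified at RDD-construction time is by walking the call stack and checking each frame's method for @RDDScoped via reflection. The sketch below only illustrates what RUNTIME retention makes possible; the actual scope-computation logic is not part of this diff, and the object and method names here are hypothetical:

```scala
import org.apache.spark.annotation.RDDScoped

// Illustrative sketch only: detect @RDDScoped methods on the current call stack.
object RDDScopeSketch {
  /** Names of the @RDDScoped methods on the current call stack, innermost first. */
  def scopedMethodsOnStack(): Seq[String] = {
    Thread.currentThread().getStackTrace.toSeq
      .filter { frame =>
        try {
          val clazz = Class.forName(frame.getClassName)
          clazz.getDeclaredMethods.exists { m =>
            m.getName == frame.getMethodName && m.isAnnotationPresent(classOf[RDDScoped])
          }
        } catch {
          case _: ClassNotFoundException => false
        }
      }
      .map(frame => s"${frame.getClassName}.${frame.getMethodName}")
  }
}
```

For example, calling scopedMethodsOnStack() from inside an RDD constructor reached through SparkContext.parallelize would, in this sketch, report SparkContext.parallelize as the innermost scoped method.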