From 681b36f5fb63e14dc89e17813894227be9e2324f Mon Sep 17 00:00:00 2001
From: nravi
Date: Thu, 8 May 2014 00:05:33 -0700
Subject: [PATCH 1/4] Fix for SPARK-1758: failing test
 org.apache.spark.JavaAPISuite.wholeTextFiles

The prefix "file:" is missing from the strings inserted as keys in the
HashMap.
---
 core/src/test/java/org/apache/spark/JavaAPISuite.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index 7193223addf66..dc5d3261f98e4 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -639,8 +639,8 @@ public void wholeTextFiles() throws IOException {
     ds.close();
 
     HashMap<String, String> container = new HashMap<String, String>();
-    container.put(tempDirName+"/part-00000", new Text(content1).toString());
-    container.put(tempDirName+"/part-00001", new Text(content2).toString());
+    container.put("file:" + tempDirName+"/part-00000", new Text(content1).toString());
+    container.put("file:" + tempDirName+"/part-00001", new Text(content2).toString());
 
     JavaPairRDD<String, String> readRDD = sc.wholeTextFiles(tempDirName, 3);
     List<Tuple2<String, String>> result = readRDD.collect();

From 5108700230fd70b995e76598f49bdf328c971e77 Mon Sep 17 00:00:00 2001
From: nravi
Date: Tue, 3 Jun 2014 15:25:22 -0700
Subject: [PATCH 2/4] Fix in Spark for the concurrent modification issue
 (SPARK-1097, HADOOP-10456)

---
 core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 6547755764dcf..8346d25a29d49 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -126,7 +126,7 @@ class HadoopRDD[K, V](
   private val createTime = new Date()
 
   // Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads.
-  protected def getJobConf(): JobConf = {
+  protected def getJobConf(): JobConf = synchronized {
     val conf: Configuration = broadcastedConf.value.value
     if (conf.isInstanceOf[JobConf]) {
       // A user-broadcasted JobConf was provided to the HadoopRDD, so always use it.
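Note on [PATCH 2/4]: Hadoop's JobConf copy constructor iterates over the
source Configuration's properties, so building a JobConf while another thread
mutates the same underlying Configuration can fail with a
ConcurrentModificationException (HADOOP-10456). Below is a minimal sketch of
that race, assuming Hadoop is on the classpath; the object and thread names
are illustrative only and are not part of the patch.

// Illustrative sketch only -- not part of the patch series.
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.JobConf

object JobConfRaceSketch {
  def main(args: Array[String]): Unit = {
    // Stands in for the shared broadcastedConf.value.value inside HadoopRDD.
    val shared = new Configuration()

    // One thread keeps mutating the shared Configuration...
    val writer = new Thread(new Runnable {
      override def run(): Unit = {
        for (i <- 0 until 100000) shared.set("key" + i, "value")
      }
    })

    // ...while others copy it, as unsynchronized getJobConf() calls would.
    val readers = (1 to 4).map { _ =>
      new Thread(new Runnable {
        override def run(): Unit = {
          // The copy constructor walks shared's properties and can throw
          // ConcurrentModificationException mid-iteration.
          for (_ <- 0 until 1000) new JobConf(shared)
        }
      })
    }

    (writer +: readers).foreach(_.start())
    (writer +: readers).foreach(_.join())
  }
}

Patch 2 closes this window by making every getJobConf() call on the RDD take
the instance lock before touching the broadcast Configuration.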
From 6b840f017870207d23e75de224710971ada0b3d0 Mon Sep 17 00:00:00 2001
From: nravi
Date: Tue, 3 Jun 2014 15:34:02 -0700
Subject: [PATCH 3/4] Revert the fix for SPARK-1758 (the underlying problem is
 now fixed)

---
 core/src/test/java/org/apache/spark/JavaAPISuite.java | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index dc5d3261f98e4..7193223addf66 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -639,8 +639,8 @@ public void wholeTextFiles() throws IOException {
     ds.close();
 
     HashMap<String, String> container = new HashMap<String, String>();
-    container.put("file:" + tempDirName+"/part-00000", new Text(content1).toString());
-    container.put("file:" + tempDirName+"/part-00001", new Text(content2).toString());
+    container.put(tempDirName+"/part-00000", new Text(content1).toString());
+    container.put(tempDirName+"/part-00001", new Text(content2).toString());
 
     JavaPairRDD<String, String> readRDD = sc.wholeTextFiles(tempDirName, 3);
     List<Tuple2<String, String>> result = readRDD.collect();

From df2aeb179fca4fc893803c72a657317f5b5539d7 Mon Sep 17 00:00:00 2001
From: nravi
Date: Mon, 9 Jun 2014 12:02:59 -0700
Subject: [PATCH 4/4] Improved fix for the ConcurrentModificationException
 (SPARK-1097, HADOOP-10456)

---
 .../main/scala/org/apache/spark/rdd/HadoopRDD.scala | 13 ++++++++-----
 1 file changed, 8 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
index 8346d25a29d49..2aa111d600e9b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala
@@ -126,7 +126,7 @@ class HadoopRDD[K, V](
   private val createTime = new Date()
 
   // Returns a JobConf that will be used on slaves to obtain input splits for Hadoop reads.
-  protected def getJobConf(): JobConf = synchronized {
+  protected def getJobConf(): JobConf = {
     val conf: Configuration = broadcastedConf.value.value
     if (conf.isInstanceOf[JobConf]) {
       // A user-broadcasted JobConf was provided to the HadoopRDD, so always use it.
@@ -139,10 +139,13 @@ class HadoopRDD[K, V](
       // Create a JobConf that will be cached and used across this RDD's getJobConf() calls in the
       // local process. The local cache is accessed through HadoopRDD.putCachedMetadata().
       // The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects.
-      val newJobConf = new JobConf(broadcastedConf.value.value)
-      initLocalJobConfFuncOpt.map(f => f(newJobConf))
-      HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
-      newJobConf
+      // Synchronize to prevent ConcurrentModificationException (SPARK-1097, HADOOP-10456).
+      broadcastedConf.synchronized {
+        val newJobConf = new JobConf(broadcastedConf.value.value)
+        initLocalJobConfFuncOpt.map(f => f(newJobConf))
+        HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf)
+        newJobConf
+      }
     }
   }
 
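Note on [PATCH 4/4]: rather than serializing every getJobConf() call on the
RDD instance as patch 2 did, the improved fix holds a lock only on
broadcastedConf, and only around the construction of the new JobConf, the one
step that iterates over the shared Configuration. Below is a rough sketch of
that narrowed-lock pattern under hypothetical names; it is not the actual
HadoopRDD code.

// Illustrative sketch only -- not the actual HadoopRDD code.
import java.util.concurrent.ConcurrentHashMap
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapred.JobConf

class NarrowLockJobConfCache(shared: Configuration) {
  private val cache = new ConcurrentHashMap[String, JobConf]()

  def getJobConf(key: String): JobConf = {
    val cached = cache.get(key)
    if (cached != null) {
      cached // fast path: a cache hit takes no lock
    } else {
      // Hold the lock only while copying the shared Configuration, mirroring
      // the broadcastedConf.synchronized block in patch 4.
      shared.synchronized {
        val fresh = new JobConf(shared)
        cache.putIfAbsent(key, fresh)
        cache.get(key)
      }
    }
  }
}

Cache hits never contend for the lock, so the common path stays cheap while
two threads can no longer copy and mutate the same Configuration at once.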