From e2cda3dc3e566ecbf6d5828d25f0439147706270 Mon Sep 17 00:00:00 2001 From: Raymond Liu Date: Tue, 1 Jul 2014 16:33:33 +0800 Subject: [PATCH 1/2] Workaround Hadoop conf ConcurrentModification issue --- core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index 98dcbf4e2dbfa..fb3eb8d7563bb 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -141,7 +141,7 @@ class HadoopRDD[K, V]( // local process. The local cache is accessed through HadoopRDD.putCachedMetadata(). // The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects. // synchronize to prevent ConcurrentModificationException (Spark-1097, Hadoop-10456) - broadcastedConf.synchronized { + broadcastedConf.value.value.synchronized { val newJobConf = new JobConf(broadcastedConf.value.value) initLocalJobConfFuncOpt.map(f => f(newJobConf)) HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf) From 994e98bee4c0dc9c39e998abf1a7c6fee37ec327 Mon Sep 17 00:00:00 2001 From: Raymond Liu Date: Fri, 4 Jul 2014 09:17:13 +0800 Subject: [PATCH 2/2] Address comments --- core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala index fb3eb8d7563bb..041028514399b 100644 --- a/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala @@ -141,8 +141,8 @@ class HadoopRDD[K, V]( // local process. The local cache is accessed through HadoopRDD.putCachedMetadata(). // The caching helps minimize GC, since a JobConf can contain ~10KB of temporary objects. // synchronize to prevent ConcurrentModificationException (Spark-1097, Hadoop-10456) - broadcastedConf.value.value.synchronized { - val newJobConf = new JobConf(broadcastedConf.value.value) + conf.synchronized { + val newJobConf = new JobConf(conf) initLocalJobConfFuncOpt.map(f => f(newJobConf)) HadoopRDD.putCachedMetadata(jobConfCacheKey, newJobConf) newJobConf