From 97beaf8dddbdd04333e8c113952ca276ffd08a6c Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Tue, 3 Mar 2015 10:12:58 -0600 Subject: [PATCH 1/7] SPARK-5949 HighlyCompressedMapStatus needs more classes registered w/ kryo --- .../apache/spark/serializer/KryoSerializer.scala | 8 ++++++++ .../spark/serializer/KryoSerializerSuite.scala | 13 +++++++++++++ 2 files changed, 21 insertions(+) diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index 02158aa0f866e..ee1c50c591f45 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -24,6 +24,8 @@ import com.esotericsoftware.kryo.{Kryo, KryoException} import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput} import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer} import com.twitter.chill.{AllScalaRegistrar, EmptyScalaKryoInstantiator} +import org.roaringbitmap.{ArrayContainer, RoaringArray, RoaringBitmap} + import org.apache.spark._ import org.apache.spark.api.python.PythonBroadcast @@ -202,6 +204,12 @@ private[serializer] object KryoSerializer { classOf[GetBlock], classOf[CompressedMapStatus], classOf[HighlyCompressedMapStatus], + classOf[RoaringBitmap], + classOf[RoaringArray], + classOf[RoaringArray.Element], + classOf[Array[RoaringArray.Element]], + classOf[ArrayContainer], + classOf[Array[Short]], classOf[CompactBuffer[_]], classOf[BlockManagerId], classOf[Array[Byte]], diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala index a70f67af2e62e..490cf00a607a6 100644 --- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala @@ -17,6 +17,9 @@ package org.apache.spark.serializer +import org.apache.spark.scheduler.HighlyCompressedMapStatus +import org.apache.spark.storage.BlockManagerId + import scala.collection.mutable import scala.reflect.ClassTag @@ -242,6 +245,16 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext { ser.newInstance().deserialize[ClassLoaderTestingObject](bytes) } } + + test("registration of HighlyCompressedMapStatus") { + val conf = new SparkConf(false) + conf.set("spark.kryo.registrationRequired", "true") + val hcmo = HighlyCompressedMapStatus(BlockManagerId("exec-1", "host", 1234), Array(0l,2l,5l)) + val ser = new KryoSerializer(conf) + val serInstance = ser.newInstance() + serInstance.serialize(hcmo) + + } } From f9a0b7cf4e0d423b570ae7c748b7782f30e08c9f Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Tue, 3 Mar 2015 14:03:28 -0600 Subject: [PATCH 2/7] put primitive array registrations together --- .../main/scala/org/apache/spark/serializer/KryoSerializer.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index ee1c50c591f45..087bb0df758b8 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -209,10 +209,10 @@ private[serializer] object KryoSerializer { classOf[RoaringArray.Element], classOf[Array[RoaringArray.Element]], classOf[ArrayContainer], - classOf[Array[Short]], classOf[CompactBuffer[_]], classOf[BlockManagerId], classOf[Array[Byte]], + classOf[Array[Short]], classOf[BoundedPriorityQueue[_]], classOf[SparkConf] ) From 09610c6ec71e81ae0f4850a1e7a183da0a833ee5 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Tue, 3 Mar 2015 14:10:48 -0600 Subject: [PATCH 3/7] formatting --- .../org/apache/spark/serializer/KryoSerializerSuite.scala | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala index 490cf00a607a6..7edb0925bf513 100644 --- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala @@ -17,17 +17,16 @@ package org.apache.spark.serializer -import org.apache.spark.scheduler.HighlyCompressedMapStatus -import org.apache.spark.storage.BlockManagerId - import scala.collection.mutable import scala.reflect.ClassTag import com.esotericsoftware.kryo.Kryo import org.scalatest.FunSuite -import org.apache.spark.{SparkConf, SharedSparkContext} +import org.apache.spark.scheduler.HighlyCompressedMapStatus import org.apache.spark.serializer.KryoTest._ +import org.apache.spark.storage.BlockManagerId +import org.apache.spark.{SharedSparkContext, SparkConf} class KryoSerializerSuite extends FunSuite with SharedSparkContext { @@ -253,7 +252,6 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext { val ser = new KryoSerializer(conf) val serInstance = ser.newInstance() serInstance.serialize(hcmo) - } } From a5cb74403b73bab3d8f6bbc5a59a25da50c42008 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Tue, 3 Mar 2015 14:34:58 -0600 Subject: [PATCH 4/7] update tests to cover both types of RoaringBitmapContainers --- .../spark/serializer/KryoSerializer.scala | 13 +++++++------ .../spark/serializer/KryoSerializerSuite.scala | 17 +++++++++++++---- 2 files changed, 20 insertions(+), 10 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index 087bb0df758b8..6975acb85073b 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -20,24 +20,23 @@ package org.apache.spark.serializer import java.io.{EOFException, InputStream, OutputStream} import java.nio.ByteBuffer -import com.esotericsoftware.kryo.{Kryo, KryoException} +import scala.reflect.ClassTag + import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput} import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer} +import com.esotericsoftware.kryo.{Kryo, KryoException} import com.twitter.chill.{AllScalaRegistrar, EmptyScalaKryoInstantiator} -import org.roaringbitmap.{ArrayContainer, RoaringArray, RoaringBitmap} - +import org.roaringbitmap.{ArrayContainer, BitmapContainer, RoaringArray, RoaringBitmap} import org.apache.spark._ import org.apache.spark.api.python.PythonBroadcast import org.apache.spark.broadcast.HttpBroadcast -import org.apache.spark.network.nio.{PutBlock, GotBlock, GetBlock} +import org.apache.spark.network.nio.{GetBlock, GotBlock, PutBlock} import org.apache.spark.scheduler.{CompressedMapStatus, HighlyCompressedMapStatus} import org.apache.spark.storage._ import org.apache.spark.util.BoundedPriorityQueue import org.apache.spark.util.collection.CompactBuffer -import scala.reflect.ClassTag - /** * A Spark serializer that uses the [[https://code.google.com/p/kryo/ Kryo serialization library]]. * @@ -209,10 +208,12 @@ private[serializer] object KryoSerializer { classOf[RoaringArray.Element], classOf[Array[RoaringArray.Element]], classOf[ArrayContainer], + classOf[BitmapContainer], classOf[CompactBuffer[_]], classOf[BlockManagerId], classOf[Array[Byte]], classOf[Array[Short]], + classOf[Array[Long]], classOf[BoundedPriorityQueue[_]], classOf[SparkConf] ) diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala index 7edb0925bf513..6a328bdd9099b 100644 --- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala @@ -248,10 +248,19 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext { test("registration of HighlyCompressedMapStatus") { val conf = new SparkConf(false) conf.set("spark.kryo.registrationRequired", "true") - val hcmo = HighlyCompressedMapStatus(BlockManagerId("exec-1", "host", 1234), Array(0l,2l,5l)) - val ser = new KryoSerializer(conf) - val serInstance = ser.newInstance() - serInstance.serialize(hcmo) + + // these cases require knowing the internals of RoaringBitmap a little. Blocks span 2^16 + // values, and they use a bitmap (dense) if they have more than 4096 values, and an + // array (sparse) if they use less. So we just create two cases, one sparse and one dense. + // and we use a roaring bitmap for the empty blocks, so we trigger the dense case w/ mostly + // empty blocks + + val ser = new KryoSerializer(conf).newInstance() + val denseBlockSizes = new Array[Long](5000) + val sparseBlockSizes = Array[Long](0l, 1l, 0l, 2l) + Seq(denseBlockSizes, sparseBlockSizes).foreach{ blockSizes => + ser.serialize(HighlyCompressedMapStatus(BlockManagerId("exec-1", "host", 1234), blockSizes)) + } } } From 709bfe098da0c305669b69d0cde64f7335d16874 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Tue, 3 Mar 2015 14:43:32 -0600 Subject: [PATCH 5/7] style --- .../scala/org/apache/spark/serializer/KryoSerializerSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala index 6a328bdd9099b..8a41f49267a21 100644 --- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala @@ -257,7 +257,7 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext { val ser = new KryoSerializer(conf).newInstance() val denseBlockSizes = new Array[Long](5000) - val sparseBlockSizes = Array[Long](0l, 1l, 0l, 2l) + val sparseBlockSizes = Array[Long](0L, 1L, 0L, 2L) Seq(denseBlockSizes, sparseBlockSizes).foreach{ blockSizes => ser.serialize(HighlyCompressedMapStatus(BlockManagerId("exec-1", "host", 1234), blockSizes)) } From 5f6bb6d3b668757d16455d02d0ea5b469e5282d0 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Tue, 3 Mar 2015 14:44:21 -0600 Subject: [PATCH 6/7] more style --- .../main/scala/org/apache/spark/serializer/KryoSerializer.scala | 2 +- .../scala/org/apache/spark/serializer/KryoSerializerSuite.scala | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index 6975acb85073b..9ce64d41fbc40 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -22,9 +22,9 @@ import java.nio.ByteBuffer import scala.reflect.ClassTag +import com.esotericsoftware.kryo.{Kryo, KryoException} import com.esotericsoftware.kryo.io.{Input => KryoInput, Output => KryoOutput} import com.esotericsoftware.kryo.serializers.{JavaSerializer => KryoJavaSerializer} -import com.esotericsoftware.kryo.{Kryo, KryoException} import com.twitter.chill.{AllScalaRegistrar, EmptyScalaKryoInstantiator} import org.roaringbitmap.{ArrayContainer, BitmapContainer, RoaringArray, RoaringBitmap} diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala index 8a41f49267a21..5769a69fc4466 100644 --- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala @@ -23,10 +23,10 @@ import scala.reflect.ClassTag import com.esotericsoftware.kryo.Kryo import org.scalatest.FunSuite +import org.apache.spark.{SharedSparkContext, SparkConf} import org.apache.spark.scheduler.HighlyCompressedMapStatus import org.apache.spark.serializer.KryoTest._ import org.apache.spark.storage.BlockManagerId -import org.apache.spark.{SharedSparkContext, SparkConf} class KryoSerializerSuite extends FunSuite with SharedSparkContext { From 7e133162a4ba26809ba58b201e0edb610f5b2050 Mon Sep 17 00:00:00 2001 From: Imran Rashid Date: Tue, 3 Mar 2015 15:18:27 -0600 Subject: [PATCH 7/7] style style style --- .../org/apache/spark/serializer/KryoSerializerSuite.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala index 5769a69fc4466..523d898207447 100644 --- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala @@ -28,7 +28,6 @@ import org.apache.spark.scheduler.HighlyCompressedMapStatus import org.apache.spark.serializer.KryoTest._ import org.apache.spark.storage.BlockManagerId - class KryoSerializerSuite extends FunSuite with SharedSparkContext { conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") conf.set("spark.kryo.registrator", classOf[MyRegistrator].getName) @@ -258,7 +257,7 @@ class KryoSerializerSuite extends FunSuite with SharedSparkContext { val ser = new KryoSerializer(conf).newInstance() val denseBlockSizes = new Array[Long](5000) val sparseBlockSizes = Array[Long](0L, 1L, 0L, 2L) - Seq(denseBlockSizes, sparseBlockSizes).foreach{ blockSizes => + Seq(denseBlockSizes, sparseBlockSizes).foreach { blockSizes => ser.serialize(HighlyCompressedMapStatus(BlockManagerId("exec-1", "host", 1234), blockSizes)) } }