From 84dd4a7bc0eba8b04bb5cf53d73042ac5078d611 Mon Sep 17 00:00:00 2001
From: Sean Owen
Date: Thu, 6 Sep 2018 11:19:02 -0700
Subject: [PATCH 1/2] Add test for binaryFiles minPartitions

---
Review notes (commentary only; placed after the "---" separator, so they are
neither part of the commit message nor part of the diff):

* Adds the test "SPARK-22357 test binaryFiles minPartitions" to FileSuite,
  together with the two imports it uses (java.nio.charset.StandardCharsets
  and com.google.common.io.Files).
* The test creates a local SparkContext with spark.files.openCostInBytes=0
  and spark.default.parallelism=1, writes eight files named
  part-00000 .. part-00007 into a fresh temp directory, and asserts that
  sc.binaryFiles(dir, minPartitions = p).getNumPartitions equals p for
  p = 1, 2 and 8.
* All eight files receive the same three-line payload; the text says "file1"
  in every file regardless of the loop index.
* NOTE(review): presumably the two config overrides exist so that the
  requested minPartitions, rather than the open cost or the default
  parallelism, decides the partition count. Confirm against the binaryFiles
  split-size computation, which is not part of this patch.

 .../scala/org/apache/spark/FileSuite.scala    | 21 +++++++++++++++++++
 1 file changed, 21 insertions(+)

diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala
index a441b9c8ab97a..1bf4bcde92bcf 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -19,10 +19,12 @@ package org.apache.spark
 
 import java.io._
 import java.nio.ByteBuffer
+import java.nio.charset.StandardCharsets
 import java.util.zip.GZIPOutputStream
 
 import scala.io.Source
 
+import com.google.common.io.Files
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io._
@@ -299,6 +301,25 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
     }
   }
 
+  test("SPARK-22357 test binaryFiles minPartitions") {
+    sc = new SparkContext(new SparkConf().setAppName("test").setMaster("local")
+      .set("spark.files.openCostInBytes", "0")
+      .set("spark.default.parallelism", "1"))
+
+    val tempDir = Utils.createTempDir()
+    val tempDirPath = tempDir.getAbsolutePath
+
+    for (i <- 0 until 8) {
+      val tempFile = new File(tempDir, s"part-0000$i")
+      Files.write("someline1 in file1\nsomeline2 in file1\nsomeline3 in file1", tempFile,
+        StandardCharsets.UTF_8)
+    }
+
+    assert(sc.binaryFiles(tempDirPath, minPartitions = 1).getNumPartitions === 1)
+    assert(sc.binaryFiles(tempDirPath, minPartitions = 2).getNumPartitions === 2)
+    assert(sc.binaryFiles(tempDirPath, minPartitions = 8).getNumPartitions === 8)
+  }
+
   test("fixed record length binary file as byte array") {
     sc = new SparkContext("local", "test")
     val testOutput = Array[Byte](1, 2, 3, 4, 5, 6)

From 6e1d8fd43091d7b8ad83bda18f4b3701a829dc10 Mon Sep 17 00:00:00 2001
From: Sean Owen
Date: Thu, 6 Sep 2018 11:50:27 -0700
Subject: [PATCH 2/2] Make tests a loop

---
Review notes (commentary only; placed after the "---" separator, so they are
neither part of the commit message nor part of the diff):

* Replaces the three separate assertions (minPartitions = 1, 2, 8) with a
  single loop over Seq(1, 2, 8) that makes the same getNumPartitions
  assertion for each value; the set of checked values is unchanged.
* Touches only the tail of the test body added by patch 1/2: the hunk's
  leading context is the end of that test's file-writing loop, and the
  pre-image blob id (1bf4bcde92bcf) is the post-image blob id of patch 1/2.

 core/src/test/scala/org/apache/spark/FileSuite.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/core/src/test/scala/org/apache/spark/FileSuite.scala b/core/src/test/scala/org/apache/spark/FileSuite.scala
index 1bf4bcde92bcf..81b18c71f30ee 100644
--- a/core/src/test/scala/org/apache/spark/FileSuite.scala
+++ b/core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -315,9 +315,9 @@ class FileSuite extends SparkFunSuite with LocalSparkContext {
         StandardCharsets.UTF_8)
     }
 
-    assert(sc.binaryFiles(tempDirPath, minPartitions = 1).getNumPartitions === 1)
-    assert(sc.binaryFiles(tempDirPath, minPartitions = 2).getNumPartitions === 2)
-    assert(sc.binaryFiles(tempDirPath, minPartitions = 8).getNumPartitions === 8)
+    for (p <- Seq(1, 2, 8)) {
+      assert(sc.binaryFiles(tempDirPath, minPartitions = p).getNumPartitions === p)
+    }
   }
 
   test("fixed record length binary file as byte array") {