From 071168290b6cfd332c02dd3e44415c7e069dbb46 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Mon, 5 Sep 2016 18:49:15 +0900
Subject: [PATCH 1/6] Avoid direct use of URI for file paths in Windows

---
 core/src/main/scala/org/apache/spark/SparkContext.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 744d5d0f7aa8e..60611b3096042 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -992,7 +992,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
 
     // This is a hack to enforce loading hdfs-site.xml.
     // See SPARK-11227 for details.
-    FileSystem.get(new URI(path), hadoopConfiguration)
+    FileSystem.get(Utils.resolveURI(path), hadoopConfiguration)
 
     // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
     val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
@@ -1081,7 +1081,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
 
     // This is a hack to enforce loading hdfs-site.xml.
     // See SPARK-11227 for details.
-    FileSystem.get(new URI(path), hadoopConfiguration)
+    FileSystem.get(Utils.resolveURI(path), hadoopConfiguration)
 
     // The call to NewHadoopJob automatically adds security credentials to conf,
     // so we don't need to explicitly add them ourselves

From f42c182cc3d7eaf863ccab7c52b59ddc3e12c17a Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Mon, 5 Sep 2016 20:37:15 +0900
Subject: [PATCH 2/6] Handle Windows specific path issues

---
 .../main/scala/org/apache/spark/util/Utils.scala | 15 ++++++++++++++-
 1 file changed, 14 insertions(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index 9b4274a27b3be..b9092b80ddff2 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1900,7 +1900,20 @@ private[spark] object Utils extends Logging {
    */
   def resolveURI(path: String): URI = {
     try {
-      val uri = new URI(path)
+      val osSafePath = if (Path.isWindowsAbsolutePath(path, false)) {
+        // Make sure C:/ part becomes /C/.
+        val windowsUri = new URI(path)
+        val driveLetter = windowsUri.getScheme
+        s"/$driveLetter/${windowsUri.getSchemeSpecificPart()}"
+      } else if (Path.isWindowsAbsolutePath(path, true)) {
+        // Make sure /C:/ part becomes /C/.
+        val windowsUri = new URI(path.substring(1))
+        val driveLetter = windowsUri.getScheme
+        s"/$driveLetter/${windowsUri.getSchemeSpecificPart()}"
+      } else {
+        path
+      }
+      val uri = new URI(osSafePath)
       if (uri.getScheme() != null) {
         return uri
       }

From f5bde71e8e1abae545af45e19a058883ae0105b2 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Tue, 6 Sep 2016 07:55:11 +0900
Subject: [PATCH 3/6] Revert manual Windows path handling

---
 .../main/scala/org/apache/spark/util/Utils.scala | 15 +--------------
 1 file changed, 1 insertion(+), 14 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala
index b9092b80ddff2..9b4274a27b3be 100644
--- a/core/src/main/scala/org/apache/spark/util/Utils.scala
+++ b/core/src/main/scala/org/apache/spark/util/Utils.scala
@@ -1900,20 +1900,7 @@ private[spark] object Utils extends Logging {
    */
   def resolveURI(path: String): URI = {
     try {
-      val osSafePath = if (Path.isWindowsAbsolutePath(path, false)) {
-        // Make sure C:/ part becomes /C/.
-        val windowsUri = new URI(path)
-        val driveLetter = windowsUri.getScheme
-        s"/$driveLetter/${windowsUri.getSchemeSpecificPart()}"
-      } else if (Path.isWindowsAbsolutePath(path, true)) {
-        // Make sure /C:/ part becomes /C/.
-        val windowsUri = new URI(path.substring(1))
-        val driveLetter = windowsUri.getScheme
-        s"/$driveLetter/${windowsUri.getSchemeSpecificPart()}"
-      } else {
-        path
-      }
-      val uri = new URI(osSafePath)
+      val uri = new URI(path)
       if (uri.getScheme() != null) {
         return uri
       }

From 93924214af7530ccb7b9b0faba8d25179b7c3d38 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Tue, 6 Sep 2016 15:03:09 +0900
Subject: [PATCH 4/6] Use new Path(...).toUri

---
 core/src/main/scala/org/apache/spark/SparkContext.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 60611b3096042..5a8488846b18f 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -992,7 +992,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
 
     // This is a hack to enforce loading hdfs-site.xml.
     // See SPARK-11227 for details.
-    FileSystem.get(Utils.resolveURI(path), hadoopConfiguration)
+    FileSystem.get(new Path(path).toUri, hadoopConfiguration)
 
     // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
     val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
@@ -1081,7 +1081,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
 
     // This is a hack to enforce loading hdfs-site.xml.
     // See SPARK-11227 for details.
-    FileSystem.get(Utils.resolveURI(path), hadoopConfiguration)
+    FileSystem.get(new Path(path).toUri, hadoopConfiguration)
 
     // The call to NewHadoopJob automatically adds security credentials to conf,
     // so we don't need to explicitly add them ourselves

From 790d5b2304473555d1edf113f9bbee3034134fac Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Tue, 6 Sep 2016 15:04:45 +0900
Subject: [PATCH 5/6] Fix R tests more

---
 R/pkg/inst/tests/testthat/test_mllib.R | 14 ++++++++++----
 1 file changed, 10 insertions(+), 4 deletions(-)

diff --git a/R/pkg/inst/tests/testthat/test_mllib.R b/R/pkg/inst/tests/testthat/test_mllib.R
index ca25f2c7e8263..ac896cfbcfff7 100644
--- a/R/pkg/inst/tests/testthat/test_mllib.R
+++ b/R/pkg/inst/tests/testthat/test_mllib.R
@@ -22,6 +22,11 @@ context("MLlib functions")
 # Tests for MLlib functions in SparkR
 
 sparkSession <- sparkR.session(enableHiveSupport = FALSE)
 
+absoluteSparkPath <- function(x) {
+  sparkHome <- sparkR.conf("spark.home")
+  file.path(sparkHome, x)
+}
+
 test_that("formula of spark.glm", {
   training <- suppressWarnings(createDataFrame(iris))
   # directly calling the spark API
@@ -354,7 +359,8 @@ test_that("spark.kmeans", {
 })
 
 test_that("spark.mlp", {
-  df <- read.df("data/mllib/sample_multiclass_classification_data.txt", source = "libsvm")
+  df <- read.df(absoluteSparkPath("data/mllib/sample_multiclass_classification_data.txt"),
+                source = "libsvm")
   model <- spark.mlp(df, blockSize = 128, layers = c(4, 5, 4, 3), solver = "l-bfgs", maxIter = 100,
                      tol = 0.5, stepSize = 1, seed = 1)
@@ -616,7 +622,7 @@ test_that("spark.gaussianMixture", {
 })
 
 test_that("spark.lda with libsvm", {
-  text <- read.df("data/mllib/sample_lda_libsvm_data.txt", source = "libsvm")
+  text <- read.df(absoluteSparkPath("data/mllib/sample_lda_libsvm_data.txt"), source = "libsvm")
   model <- spark.lda(text, optimizer = "em")
 
   stats <- summary(model, 10)
@@ -652,7 +658,7 @@ test_that("spark.lda with libsvm", {
 })
 
 test_that("spark.lda with text input", {
-  text <- read.text("data/mllib/sample_lda_data.txt")
+  text <- read.text(absoluteSparkPath("data/mllib/sample_lda_data.txt"))
   model <- spark.lda(text, optimizer = "online", features = "value")
 
   stats <- summary(model)
@@ -688,7 +694,7 @@ test_that("spark.lda with text input", {
 })
 
 test_that("spark.posterior and spark.perplexity", {
-  text <- read.text("data/mllib/sample_lda_data.txt")
+  text <- read.text(absoluteSparkPath("data/mllib/sample_lda_data.txt"))
   model <- spark.lda(text, features = "value", k = 3)
 
   # Assert perplexities are equal

From 41aaaf127e949af7563024c1584567a177295409 Mon Sep 17 00:00:00 2001
From: hyukjinkwon
Date: Wed, 7 Sep 2016 14:31:47 +0900
Subject: [PATCH 6/6] Replace FileSystem.get with FileSystem.getLocal

---
 core/src/main/scala/org/apache/spark/SparkContext.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala
index 5a8488846b18f..4aa795a58a28e 100644
--- a/core/src/main/scala/org/apache/spark/SparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -992,7 +992,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
 
     // This is a hack to enforce loading hdfs-site.xml.
    // See SPARK-11227 for details.
-    FileSystem.get(new Path(path).toUri, hadoopConfiguration)
+    FileSystem.getLocal(hadoopConfiguration)
 
     // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
     val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
@@ -1081,7 +1081,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli
 
     // This is a hack to enforce loading hdfs-site.xml.
     // See SPARK-11227 for details.
-    FileSystem.get(new Path(path).toUri, hadoopConfiguration)
+    FileSystem.getLocal(hadoopConfiguration)
 
     // The call to NewHadoopJob automatically adds security credentials to conf,
     // so we don't need to explicitly add them ourselves
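
Note (not part of the patches above): a minimal Scala sketch of the Windows path
problem these commits work around. The object name and the sample path are
hypothetical; the point is that java.net.URI parses the drive letter of an
absolute Windows path such as "C:/..." as a URI scheme, so the original
FileSystem.get(new URI(path), hadoopConfiguration) call could fail before the
path was ever used.

    import java.net.URI

    // Sketch only: shows why "new URI(path)" misbehaves for Windows paths.
    object WindowsUriSketch {
      def main(args: Array[String]): Unit = {
        val winPath = "C:/Users/example/data/mllib/sample_lda_data.txt"  // hypothetical path
        val uri = new URI(winPath)
        // The drive letter becomes the scheme instead of part of the path.
        println(uri.getScheme)              // prints "C"
        println(uri.getSchemeSpecificPart)  // prints "/Users/example/data/mllib/sample_lda_data.txt"
        // Hadoop's FileSystem.get(uri, conf) would then look up a FileSystem
        // registered for scheme "C" and fail; the final patch avoids parsing
        // the user-supplied path at all, since FileSystem.getLocal(conf) is
        // enough to force hdfs-site.xml to be loaded (see SPARK-11227).
      }
    }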