From 87c0a580ccce1438301c6653162b63a22a7ce23f Mon Sep 17 00:00:00 2001
From: NarineK
Date: Tue, 3 May 2016 21:00:52 -0700
Subject: [PATCH 01/12] Implement repartitionByColumns for SparkR DataFrames

---
 R/pkg/NAMESPACE                           |  1 +
 R/pkg/R/DataFrame.R                       | 29 +++++++++++++++++++++++
 R/pkg/R/generics.R                        |  8 +++++++
 R/pkg/inst/tests/testthat/test_sparkSQL.R | 14 +++++++++++
 4 files changed, 52 insertions(+)

diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 73f7c595f4437..b25eba9b10f63 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -82,6 +82,7 @@ exportMethods("arrange",
               "registerTempTable",
               "rename",
               "repartition",
+              "repartitionByColumns",
               "sample",
               "sample_frac",
               "sampleBy",
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 9e30fa0dbf26a..6a6df06a1be3b 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -594,6 +594,35 @@ setMethod("repartition",
             dataFrame(sdf)
           })
 
+#' RepartitionByColumns
+#'
+#' Return a new SparkDataFrame which has as many partitions as the number of unique
+#' groups identified by column(s) values which are being specified by the input.
+#'
+#' @param x A SparkDataFrame
+#' @param col The column by which the partitioning will be performed
+#'
+#' @family SparkDataFrame functions
+#' @rdname repartition
+#' @name repartition
+#' @export
+#' @examples
+#'\dontrun{
+#' sc <- sparkR.init()
+#' sqlContext <- sparkRSQL.init(sc)
+#' path <- "path/to/file.json"
+#' df <- read.json(sqlContext, path)
+#' newDF <- repartitionByColumns(df, df$col1, df$col2)
+#'}
+setMethod("repartitionByColumns",
+          signature(x = "SparkDataFrame", col = "Column"),
+          function(x, col, ...) {
+            cols <- list(col, ...)
+            jcol <- lapply(cols, function(c) { c@jc })
+            sdf <- callJMethod(x@sdf, "repartition", jcol)
+            dataFrame(sdf)
+          })
+
 #' toJSON
 #'
 #' Convert the rows of a SparkDataFrame into JSON objects and return an RDD where
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index f936ea6039981..8211d7621eddc 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -547,6 +547,14 @@ setGeneric("rename", function(x, ...) { standardGeneric("rename") })
 #' @export
 setGeneric("registerTempTable", function(x, tableName) { standardGeneric("registerTempTable") })
 
+# @rdname repartitionByColumns
+# @seealso repartition
+# @export
+setGeneric("repartitionByColumns",
+           function(x, col, ...) {
+             standardGeneric("repartitionByColumns")
+           })
+
 #' @rdname sample
 #' @export
 setGeneric("sample",
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 5cf9dc405b169..d76f72bb68b43 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -2083,6 +2083,20 @@ test_that("dapply() on a DataFrame", {
   expect_identical(expected, result)
 })
 
+test_that("repartitionByColumns on DataFrame", {
+  df <- createDataFrame (
+    sqlContext,
+    list(list(1L, 1, "1", 0.1), list(1L, 2, "2", 0.2), list(3L, 3, "3", 0.3)),
+    c("a", "b", "c", "d"))
+
+  # repartition by key
+  actual <- repartitionByColumns(df, df$"a", df$"b")
+
+  # since we cannot access the number of partitions from dataframe, checking
+  # that at least the dimensions are identical
+  expect_identical(dim(df), dim(actual))
+})
+
 unlink(parquetPath)
 unlink(jsonPath)
 unlink(jsonPathNa)
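[Editor's note on PATCH 01] The method above extracts the Java object reference (the `jc` slot) from each Column and forwards the list to the JVM-side Dataset.repartition overload that takes partitioning expressions. A minimal usage sketch, assuming an initialized SparkR context and a JSON input with columns col1/col2 as in the roxygen example; note that, as patches 09 and 11 below clarify, the column form hash-partitions into `spark.sql.shuffle.partitions` partitions rather than literally one partition per unique group:

    sc <- sparkR.init()
    sqlContext <- sparkRSQL.init(sc)
    df <- read.json(sqlContext, "path/to/file.json")
    # rows with equal (col1, col2) values are hashed to the same partition
    newDF <- repartitionByColumns(df, df$col1, df$col2)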
From 5752f2d6d49651dc3bbb8605e69a8b6c71c83b8a Mon Sep 17 00:00:00 2001
From: NarineK
Date: Tue, 3 May 2016 21:36:40 -0700
Subject: [PATCH 02/12] rename repartitionByColumns to repartitionByColumn

---
 R/pkg/NAMESPACE                           | 2 +-
 R/pkg/R/DataFrame.R                       | 6 +++---
 R/pkg/R/generics.R                        | 4 ++--
 R/pkg/inst/tests/testthat/test_sparkSQL.R | 6 +++---
 4 files changed, 9 insertions(+), 9 deletions(-)

diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index b25eba9b10f63..0b93d3c143805 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -82,7 +82,7 @@ exportMethods("arrange",
               "registerTempTable",
               "rename",
               "repartition",
-              "repartitionByColumns",
+              "repartitionByColumn",
               "sample",
               "sample_frac",
               "sampleBy",
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 6a6df06a1be3b..36ca2f6eaeafa 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -594,7 +594,7 @@ setMethod("repartition",
             dataFrame(sdf)
           })
 
-#' RepartitionByColumns
+#' RepartitionByColumn
 #'
 #' Return a new SparkDataFrame which has as many partitions as the number of unique
 #' groups identified by column(s) values which are being specified by the input.
@@ -612,9 +612,9 @@ setMethod("repartition",
 #' sqlContext <- sparkRSQL.init(sc)
 #' path <- "path/to/file.json"
 #' df <- read.json(sqlContext, path)
-#' newDF <- repartitionByColumns(df, df$col1, df$col2)
+#' newDF <- repartitionByColumn(df, df$col1, df$col2)
 #'}
-setMethod("repartitionByColumns",
+setMethod("repartitionByColumn",
           signature(x = "SparkDataFrame", col = "Column"),
           function(x, col, ...) {
             cols <- list(col, ...)
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 8211d7621eddc..68efd0849235e 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -550,9 +550,9 @@ setGeneric("registerTempTable", function(x, tableName) { standardGeneric("regist
 # @rdname repartitionByColumns
 # @seealso repartition
 # @export
-setGeneric("repartitionByColumns",
+setGeneric("repartitionByColumn",
            function(x, col, ...) {
-             standardGeneric("repartitionByColumns")
+             standardGeneric("repartitionByColumn")
            })
 
 #' @rdname sample
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index d76f72bb68b43..6b0206fc55b5b 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -2083,14 +2083,14 @@ test_that("dapply() on a DataFrame", {
   expect_identical(expected, result)
 })
 
-test_that("repartitionByColumns on DataFrame", {
+test_that("repartitionByColumn on DataFrame", {
   df <- createDataFrame (
     sqlContext,
     list(list(1L, 1, "1", 0.1), list(1L, 2, "2", 0.2), list(3L, 3, "3", 0.3)),
     c("a", "b", "c", "d"))
 
-  # repartition by key
-  actual <- repartitionByColumns(df, df$"a", df$"b")
+  # repartition by columns
+  actual <- repartitionByColumn(df, df$"a", df$"b")
 
   # since we cannot access the number of partitions from dataframe, checking
   # that at least the dimensions are identical
From 3ee277ac709028d7966522aeab299216042facd5 Mon Sep 17 00:00:00 2001
From: NarineK
Date: Tue, 3 May 2016 22:08:13 -0700
Subject: [PATCH 03/12] added an example with dapply

---
 R/pkg/inst/tests/testthat/test_sparkSQL.R | 13 +++++++++++++
 1 file changed, 13 insertions(+)

diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 6b0206fc55b5b..378f1fa94db3d 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -2095,6 +2095,19 @@ test_that("repartitionByColumn on DataFrame", {
   # since we cannot access the number of partitions from dataframe, checking
   # that at least the dimensions are identical
   expect_identical(dim(df), dim(actual))
+
+  # a test case with dapply
+  schema <- structType(structField("a", "integer"), structField("avg", "double"))
+  df <- repartitionByColumn(df, df$"a")
+  df1 <- dapply(
+    df,
+    function(x) {
+      y <- (data.frame(x$a[1], mean(x$b)))
+    },
+    schema)
+
+  # Number of partitions partitions is equal to 2
+  expect_equal(nrow(df1), 2)
 })
 
 unlink(parquetPath)
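[Editor's note on PATCH 03] The new assertion holds because dapply() runs the function once per partition and the result keeps one row per non-empty partition; column "a" contains only the values 1 and 3, so after repartitioning by "a" exactly two partitions hold data. A local base-R sketch of the same per-group aggregation:

    local_df <- data.frame(a = c(1L, 1L, 3L), b = c(1, 2, 3))
    # one row per distinct key in "a", mirroring one row per non-empty partition
    do.call(rbind, lapply(split(local_df, local_df$a),
                          function(x) data.frame(a = x$a[1], avg = mean(x$b))))
    # yields two rows: (a = 1, avg = 1.5) and (a = 3, avg = 3.0)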
From 8e9e34c597fe366c72ae817f4beedcab734a8db0 Mon Sep 17 00:00:00 2001
From: NarineK
Date: Wed, 4 May 2016 12:56:01 -0700
Subject: [PATCH 04/12] moving repartitionByColumn to repartition

---
 R/pkg/NAMESPACE                           |  1 -
 R/pkg/R/DataFrame.R                       | 55 +++++++++--------------
 R/pkg/R/RDD.R                             |  9 +++-
 R/pkg/R/generics.R                        |  4 +-
 R/pkg/inst/tests/testthat/test_sparkSQL.R |  6 +--
 5 files changed, 34 insertions(+), 41 deletions(-)

diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE
index 0b93d3c143805..73f7c595f4437 100644
--- a/R/pkg/NAMESPACE
+++ b/R/pkg/NAMESPACE
@@ -82,7 +82,6 @@ exportMethods("arrange",
               "registerTempTable",
               "rename",
               "repartition",
-              "repartitionByColumn",
               "sample",
               "sample_frac",
               "sampleBy",
diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 36ca2f6eaeafa..965cea7f7964c 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -570,10 +570,17 @@ setMethod("unpersist",
 
 #' Repartition
 #'
-#' Return a new SparkDataFrame that has exactly numPartitions partitions.
+#' There are two different options for repartition
+#' Option 1
+#' Return a new SparkDataFrame that has exactly numPartitions partitions.
+#' Option 2
+#' Return a new SparkDataFrame which has as many partitions as the number of unique
+#' groups identified by column(s) values which are being specified by the input.
+#' If both numPartitions and columns are specified, Option 1 will be chosen.
 #'
 #' @param x A SparkDataFrame
 #' @param numPartitions The number of partitions to use.
+#' @param col The column by which the partitioning will be performed.
 #'
 #' @family SparkDataFrame functions
 #' @rdname repartition
@@ -586,40 +593,22 @@ setMethod("unpersist",
 #' path <- "path/to/file.json"
 #' df <- read.json(sqlContext, path)
 #' newDF <- repartition(df, 2L)
+#' newDF <- repartition(df, numPartitions = 2L)
+#' newDF <- repartition(df, col = df$"col1", df$"col2")
 #'}
 setMethod("repartition",
-          signature(x = "SparkDataFrame", numPartitions = "numeric"),
-          function(x, numPartitions) {
-            sdf <- callJMethod(x@sdf, "repartition", numToInt(numPartitions))
-            dataFrame(sdf)
-          })
-
-#' RepartitionByColumn
-#'
-#' Return a new SparkDataFrame which has as many partitions as the number of unique
-#' groups identified by column(s) values which are being specified by the input.
-#'
-#' @param x A SparkDataFrame
-#' @param col The column by which the partitioning will be performed
-#'
-#' @family SparkDataFrame functions
-#' @rdname repartition
-#' @name repartition
-#' @export
-#' @examples
-#'\dontrun{
-#' sc <- sparkR.init()
-#' sqlContext <- sparkRSQL.init(sc)
-#' path <- "path/to/file.json"
-#' df <- read.json(sqlContext, path)
-#' newDF <- repartitionByColumn(df, df$col1, df$col2)
-#'}
-setMethod("repartitionByColumn",
-          signature(x = "SparkDataFrame", col = "Column"),
-          function(x, col, ...) {
-            cols <- list(col, ...)
-            jcol <- lapply(cols, function(c) { c@jc })
-            sdf <- callJMethod(x@sdf, "repartition", jcol)
+          signature(x = "SparkDataFrame"),
+          function(x, numPartitions = NULL, col = NULL, ...) {
+            if (!is.null(numPartitions) && (class(numPartitions) == "numeric"
+                || class(numPartitions) == "integer")) {
+              sdf <- callJMethod(x@sdf, "repartition", numToInt(numPartitions))
+            } else if (!is.null(col) && class(col) == "Column") {
+              cols <- list(col, ...)
+              jcol <- lapply(cols, function(c) { c@jc })
+              sdf <- callJMethod(x@sdf, "repartition", jcol)
+            } else {
+              stop("Please specify numPartitions or at least one column")
+            }
             dataFrame(sdf)
           })
 
diff --git a/R/pkg/R/RDD.R b/R/pkg/R/RDD.R
index 34d29ddbfdd52..550cb96cf3072 100644
--- a/R/pkg/R/RDD.R
+++ b/R/pkg/R/RDD.R
@@ -1023,9 +1023,14 @@ setMethod("keyBy",
 #' @aliases repartition,RDD
 #' @noRd
 setMethod("repartition",
-          signature(x = "RDD", numPartitions = "numeric"),
+          signature(x = "RDD"),
           function(x, numPartitions) {
-            coalesce(x, numPartitions, TRUE)
+            if (!is.null(numPartitions) && (class(numPartitions) == "numeric"
+                || class(numPartitions) == "integer")) {
+              coalesce(x, numPartitions, TRUE)
+            } else {
+              stop("Please, specify the number of partitions")
+            }
           })
 
 #' Return a new RDD that is reduced into numPartitions partitions.
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 8211d7621eddc..92f54e0999624 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -167,7 +167,7 @@ setGeneric("reduce", function(x, func) { standardGeneric("reduce") })
 # @rdname repartition
 # @seealso coalesce
 # @export
-setGeneric("repartition", function(x, numPartitions) { standardGeneric("repartition") })
+setGeneric("repartition", function(x, ...) { standardGeneric("repartition") })
 
 # @rdname sampleRDD
 # @export
@@ -547,7 +547,7 @@ setGeneric("rename", function(x, ...) { standardGeneric("rename") })
 #' @export
 setGeneric("registerTempTable", function(x, tableName) { standardGeneric("registerTempTable") })
 
-# @rdname repartitionByColumns
+# @rdname repartitionByColumn
 # @seealso repartition
 # @export
 setGeneric("repartitionByColumn",
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 378f1fa94db3d..94babadc51d9c 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -2083,14 +2083,14 @@ test_that("dapply() on a DataFrame", {
   expect_identical(expected, result)
 })
 
-test_that("repartitionByColumn on DataFrame", {
+test_that("repartition by columns on DataFrame", {
   df <- createDataFrame (
     sqlContext,
     list(list(1L, 1, "1", 0.1), list(1L, 2, "2", 0.2), list(3L, 3, "3", 0.3)),
     c("a", "b", "c", "d"))
 
   # repartition by columns
-  actual <- repartitionByColumn(df, df$"a", df$"b")
+  actual <- repartition(df, col = df$"a")
 
   # since we cannot access the number of partitions from dataframe, checking
   # that at least the dimensions are identical
@@ -2098,7 +2098,7 @@ test_that("repartitionByColumn on DataFrame", {
 
   # a test case with dapply
   schema <- structType(structField("a", "integer"), structField("avg", "double"))
-  df <- repartitionByColumn(df, df$"a")
+  df <- repartition(df, col = df$"a")
   df1 <- dapply(
     df,
     function(x) {
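[Editor's note on PATCH 04] Relaxing the signature to signature(x = "SparkDataFrame") with NULL defaults moves the dispatch between the two JVM overloads into the method body. A sketch of the call shapes this version supports, with the path each one reaches (df is any SparkDataFrame):

    repartition(df, 2L)                  # numeric path -> Dataset.repartition(2)
    repartition(df, numPartitions = 2L)  # same path, named argument
    repartition(df, col = df$a, df$b)    # column path  -> Dataset.repartition(cols)
    repartition(df)                      # stops: "Please specify numPartitions or at least one column"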
From 01beaba3f228570acab0243c410d5de08c7c30cd Mon Sep 17 00:00:00 2001
From: NarineK
Date: Wed, 4 May 2016 13:49:58 -0700
Subject: [PATCH 05/12] remove repartitionByColumn

---
 R/pkg/R/generics.R | 8 --------
 1 file changed, 8 deletions(-)

diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 92f54e0999624..3db1ac07666b3 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -547,14 +547,6 @@ setGeneric("rename", function(x, ...) { standardGeneric("rename") })
 #' @export
 setGeneric("registerTempTable", function(x, tableName) { standardGeneric("registerTempTable") })
 
-# @rdname repartitionByColumn
-# @seealso repartition
-# @export
-setGeneric("repartitionByColumn",
-           function(x, col, ...) {
-             standardGeneric("repartitionByColumn")
-           })
-
 #' @rdname sample
 #' @export
 setGeneric("sample",
From 02f81db96c1e2552912d937bdfd63b7d7ee1b400 Mon Sep 17 00:00:00 2001
From: NarineK
Date: Wed, 4 May 2016 16:02:50 -0700
Subject: [PATCH 06/12] Adding an option to include both number of partitions
 and the cols

---
 R/pkg/R/DataFrame.R                       | 36 +++++++++++++++--------
 R/pkg/R/RDD.R                             |  6 ++--
 R/pkg/inst/tests/testthat/test_sparkSQL.R |  2 +-
 3 files changed, 28 insertions(+), 16 deletions(-)

diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 965cea7f7964c..3e2c106d31415 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -570,14 +570,16 @@ setMethod("unpersist",
 
 #' Repartition
 #'
-#' There are two different options for repartition
-#' Option 1
-#' Return a new SparkDataFrame that has exactly numPartitions partitions.
-#' Option 2
-#' Return a new SparkDataFrame which has as many partitions as the number of unique
-#' groups identified by column(s) values which are being specified by the input.
-#' If both numPartitions and columns are specified, Option 1 will be chosen.
-#'
+#' The following options for repartitioning are possible:
+#' \itemize{
+#'  \item{"Option 1"} {Return a new SparkDataFrame partitioned by
+#'                     the given columns into `numPartitions`.}
+#'  \item{"Option 2"} {Return a new SparkDataFrame that has exactly `numPartitions`.}
+#'  \item{"Option 3"} {Return a new SparkDataFrame partitioned by the given columns,
+#'                     preserving the existing number of partitions.}
+#'  \item{"Option 4"} {Return a new SparkDataFrame that has exactly the default
+#'                     number of numPartitions: 200.}
+#'}
 #' @param x A SparkDataFrame
 #' @param numPartitions The number of partitions to use.
 #' @param col The column by which the partitioning will be performed.
@@ -595,19 +597,29 @@ setMethod("unpersist",
 #' newDF <- repartition(df, 2L)
 #' newDF <- repartition(df, numPartitions = 2L)
 #' newDF <- repartition(df, col = df$"col1", df$"col2")
+#' newDF <- repartition(df, 3L, col = df$"col1", df$"col2")
 #'}
 setMethod("repartition",
           signature(x = "SparkDataFrame"),
           function(x, numPartitions = NULL, col = NULL, ...) {
-            if (!is.null(numPartitions) && (class(numPartitions) == "numeric"
-                || class(numPartitions) == "integer")) {
-              sdf <- callJMethod(x@sdf, "repartition", numToInt(numPartitions))
+            if (!is.null(numPartitions) && (class(numPartitions) == "numeric" ||
+                class(numPartitions) == "integer")) {
+              # number of partitions and columns both are specified
+              if (!is.null(col) && class(col) == "Column") {
+                cols <- list(col, ...)
+                jcol <- lapply(cols, function(c) { c@jc })
+                sdf <- callJMethod(x@sdf, "repartition", numToInt(numPartitions), jcol)
+              } else {
+                # only number of partitions is specified
+                sdf <- callJMethod(x@sdf, "repartition", numToInt(numPartitions))
+              }
             } else if (!is.null(col) && class(col) == "Column") {
+              # only columns are specified
               cols <- list(col, ...)
               jcol <- lapply(cols, function(c) { c@jc })
               sdf <- callJMethod(x@sdf, "repartition", jcol)
             } else {
-              stop("Please specify numPartitions or at least one column")
+              sdf <- callJMethod(x@sdf, "repartition", 200L)
             }
             dataFrame(sdf)
           })
diff --git a/R/pkg/R/RDD.R b/R/pkg/R/RDD.R
index 550cb96cf3072..b8d066868689e 100644
--- a/R/pkg/R/RDD.R
+++ b/R/pkg/R/RDD.R
@@ -1025,11 +1025,11 @@ setMethod("keyBy",
 setMethod("repartition",
           signature(x = "RDD"),
           function(x, numPartitions) {
-            if (!is.null(numPartitions) && (class(numPartitions) == "numeric"
-                || class(numPartitions) == "integer")) {
+            if (!is.null(numPartitions) && (class(numPartitions) == "numeric" ||
+                class(numPartitions) == "integer")) {
               coalesce(x, numPartitions, TRUE)
             } else {
-              stop("Please, specify the number of partitions")
+              coalesce(x, 200L)
             }
           })
 
diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 94babadc51d9c..06eb46226a573 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -2106,7 +2106,7 @@ test_that("repartition by columns on DataFrame", {
     },
     schema)
 
-  # Number of partitions partitions is equal to 2
+  # Number of partitions is equal to 2
   expect_equal(nrow(df1), 2)
 })
 
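[Editor's note on PATCH 06] The new fallback hardcodes 200, which happens to be the default value of spark.sql.shuffle.partitions rather than a value read from the session. A hedged alternative sketch; it assumes SparkR's internal callJMethod helper, an active sqlContext reference, and the JVM-side SQLContext.getConf(key, defaultValue) accessor:

    # read the session's shuffle-partition setting instead of assuming 200
    defaultParts <- as.integer(callJMethod(sqlContext, "getConf",
                                           "spark.sql.shuffle.partitions", "200"))
    sdf <- callJMethod(x@sdf, "repartition", defaultParts)

PATCH 09 below drops the fallback in favor of an error, which sidesteps the question entirely.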
From e48bb22503d8af87976e1e0d1924f2e8789a572b Mon Sep 17 00:00:00 2001
From: NarineK
Date: Wed, 4 May 2016 16:10:30 -0700
Subject: [PATCH 07/12] small typo fix in RDD.R

---
 R/pkg/R/RDD.R | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/R/pkg/R/RDD.R b/R/pkg/R/RDD.R
index b8d066868689e..d3e0736919cab 100644
--- a/R/pkg/R/RDD.R
+++ b/R/pkg/R/RDD.R
@@ -1029,7 +1029,7 @@ setMethod("repartition",
                 class(numPartitions) == "integer")) {
               coalesce(x, numPartitions, TRUE)
             } else {
-              coalesce(x, 200L)
+              coalesce(x, 200L, TRUE)
             }
           })
 

From 88528924c0d68297f2a519b3767439e67a6b480c Mon Sep 17 00:00:00 2001
From: NarineK
Date: Wed, 4 May 2016 16:23:00 -0700
Subject: [PATCH 08/12] use is.numeric

---
 R/pkg/R/DataFrame.R | 3 +--
 R/pkg/R/RDD.R       | 3 +--
 2 files changed, 2 insertions(+), 4 deletions(-)

diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 3e2c106d31415..4e35f34164497 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -602,8 +602,7 @@ setMethod("unpersist",
 setMethod("repartition",
           signature(x = "SparkDataFrame"),
           function(x, numPartitions = NULL, col = NULL, ...) {
-            if (!is.null(numPartitions) && (class(numPartitions) == "numeric" ||
-                class(numPartitions) == "integer")) {
+            if (!is.null(numPartitions) && is.numeric(numPartitions)) {
               # number of partitions and columns both are specified
               if (!is.null(col) && class(col) == "Column") {
                 cols <- list(col, ...)
diff --git a/R/pkg/R/RDD.R b/R/pkg/R/RDD.R
index d3e0736919cab..8cc30d4058e90 100644
--- a/R/pkg/R/RDD.R
+++ b/R/pkg/R/RDD.R
@@ -1025,8 +1025,7 @@ setMethod("keyBy",
 setMethod("repartition",
           signature(x = "RDD"),
           function(x, numPartitions) {
-            if (!is.null(numPartitions) && (class(numPartitions) == "numeric" ||
-                class(numPartitions) == "integer")) {
+            if (!is.null(numPartitions) && is.numeric(numPartitions)) {
               coalesce(x, numPartitions, TRUE)
             } else {
               coalesce(x, 200L, TRUE)
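[Editor's note on PATCH 08] The simplification is safe because base R's is.numeric() accepts both storage types that the earlier class() comparisons spelled out:

    is.numeric(2)    # TRUE, class "numeric" (double)
    is.numeric(2L)   # TRUE, class "integer"
    is.numeric("2")  # FALSE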
From cf54f09047d3c399126432ea1b354f5c22f6a968 Mon Sep 17 00:00:00 2001
From: NarineK
Date: Wed, 4 May 2016 17:05:05 -0700
Subject: [PATCH 09/12] Raise error instead of default

---
 R/pkg/R/DataFrame.R | 6 ++----
 R/pkg/R/RDD.R       | 2 +-
 2 files changed, 3 insertions(+), 5 deletions(-)

diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 4e35f34164497..36141b0d2922f 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -570,15 +570,13 @@ setMethod("unpersist",
 
 #' Repartition
 #'
-#' The following options for repartitioning are possible:
+#' The following options for repartition are possible:
 #' \itemize{
 #'  \item{"Option 1"} {Return a new SparkDataFrame partitioned by
 #'                     the given columns into `numPartitions`.}
 #'  \item{"Option 2"} {Return a new SparkDataFrame that has exactly `numPartitions`.}
 #'  \item{"Option 3"} {Return a new SparkDataFrame partitioned by the given columns,
 #'                     preserving the existing number of partitions.}
-#'  \item{"Option 4"} {Return a new SparkDataFrame that has exactly the default
-#'                     number of numPartitions: 200.}
 #'}
 #' @param x A SparkDataFrame
 #' @param numPartitions The number of partitions to use.
@@ -618,7 +616,7 @@ setMethod("repartition",
               jcol <- lapply(cols, function(c) { c@jc })
               sdf <- callJMethod(x@sdf, "repartition", jcol)
             } else {
-              sdf <- callJMethod(x@sdf, "repartition", 200L)
+              stop("Please, specify the number of partitions and/or a column(s)")
             }
             dataFrame(sdf)
           })
diff --git a/R/pkg/R/RDD.R b/R/pkg/R/RDD.R
index 8cc30d4058e90..f1badf4364da0 100644
--- a/R/pkg/R/RDD.R
+++ b/R/pkg/R/RDD.R
@@ -1028,7 +1028,7 @@ setMethod("repartition",
             if (!is.null(numPartitions) && is.numeric(numPartitions)) {
               coalesce(x, numPartitions, TRUE)
             } else {
-              coalesce(x, 200L, TRUE)
+              stop("Please, specify the number of partitions")
             }
           })
 

From 9f85704ef7c336c73a5034c2349e380fcd5b4f17 Mon Sep 17 00:00:00 2001
From: NarineK
Date: Wed, 4 May 2016 20:09:04 -0700
Subject: [PATCH 10/12] Add test cases for all possible scenarios

---
 R/pkg/inst/tests/testthat/test_sparkSQL.R | 15 ++++++++++++---
 1 file changed, 12 insertions(+), 3 deletions(-)

diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R
index 06eb46226a573..e8aedd1a2be10 100644
--- a/R/pkg/inst/tests/testthat/test_sparkSQL.R
+++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -2089,14 +2089,23 @@ test_that("repartition by columns on DataFrame", {
     list(list(1L, 1, "1", 0.1), list(1L, 2, "2", 0.2), list(3L, 3, "3", 0.3)),
     c("a", "b", "c", "d"))
 
-  # repartition by columns
-  actual <- repartition(df, col = df$"a")
+  # no column and number of partitions specified
+  retError <- tryCatch(repartition(df), error = function(e) e)
+  expect_equal(grepl
+    ("Please, specify the number of partitions and/or a column\\(s\\)", retError), TRUE)
+
+  # repartition by column and number of partitions
+  actual <- repartition(df, 3L, col = df$"a")
 
   # since we cannot access the number of partitions from dataframe, checking
   # that at least the dimensions are identical
   expect_identical(dim(df), dim(actual))
 
+  # repartition by number of partitions
+  actual <- repartition(df, 13L)
+  expect_identical(dim(df), dim(actual))
+
-  # a test case with dapply
+  # a test case with a column and dapply
   schema <- structType(structField("a", "integer"), structField("avg", "double"))
   df <- repartition(df, col = df$"a")
   df1 <- dapply(
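[Editor's note on PATCH 10] The tryCatch/grepl pattern asserts on the error text manually; testthat's expect_error() matcher expresses the same check in a single call. A sketch of the equivalent assertion, using the message introduced in PATCH 09:

    expect_error(repartition(df),
                 "Please, specify the number of partitions and/or a column\\(s\\)")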
From 700f88699b258d3a05ba08607f099a285e5e508e Mon Sep 17 00:00:00 2001
From: NarineK
Date: Wed, 4 May 2016 22:35:07 -0700
Subject: [PATCH 11/12] fixed comment on number of default partitions

---
 R/pkg/R/DataFrame.R                                        | 4 ++--
 sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala | 5 +++--
 2 files changed, 5 insertions(+), 4 deletions(-)

diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 36141b0d2922f..fcf473ac7b76e 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -575,8 +575,8 @@ setMethod("unpersist",
 #'  \item{"Option 1"} {Return a new SparkDataFrame partitioned by
 #'                     the given columns into `numPartitions`.}
 #'  \item{"Option 2"} {Return a new SparkDataFrame that has exactly `numPartitions`.}
-#'  \item{"Option 3"} {Return a new SparkDataFrame partitioned by the given columns,
-#'                     preserving the existing number of partitions.}
+#'  \item{"Option 3"} {Return a new SparkDataFrame partitioned by the given column(s),
+#'                     using `spark.sql.shuffle.partitions` as number of partitions.}
 #'}
 #' @param x A SparkDataFrame
 #' @param numPartitions The number of partitions to use.
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 08be94e8d4f12..2a3e6bc2b807a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -2180,8 +2180,9 @@ class Dataset[T] private[sql](
   }
 
   /**
-   * Returns a new [[Dataset]] partitioned by the given partitioning expressions preserving
-   * the existing number of partitions. The resulting Datasetis hash partitioned.
+   * Returns a new [[Dataset]] partitioned by the given partitioning expressions, using
+   * `spark.sql.shuffle.partitions` as number of partitions.
+   * The resulting Datasetis hash partitioned.
    *
    * This is the same operation as "DISTRIBUTE BY" in SQL (Hive QL).
    *

From cad7ce89f5ea81467bc108ca59f1e4b5d8ffbb7c Mon Sep 17 00:00:00 2001
From: NarineK
Date: Thu, 5 May 2016 11:08:29 -0700
Subject: [PATCH 12/12] Minor comment change

---
 sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index 2a3e6bc2b807a..318c714f2ecb4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -2182,7 +2182,7 @@ class Dataset[T] private[sql](
   /**
    * Returns a new [[Dataset]] partitioned by the given partitioning expressions, using
    * `spark.sql.shuffle.partitions` as number of partitions.
-   * The resulting Datasetis hash partitioned.
+   * The resulting Dataset is hash partitioned.
    *
    * This is the same operation as "DISTRIBUTE BY" in SQL (Hive QL).
    *
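[Editor's note on the series] After PATCH 12, the user-facing result is a single repartition() generic for SparkDataFrame with three supported call shapes, while the RDD method still requires an explicit numeric count. A usage recap, assuming an initialized SparkR session and a SparkDataFrame df with columns col1 and col2:

    newDF <- repartition(df, numPartitions = 2L)      # exactly 2 partitions
    newDF <- repartition(df, col = df$col1, df$col2)  # by columns, into spark.sql.shuffle.partitions partitions
    newDF <- repartition(df, 3L, col = df$col1)       # by column, into exactly 3 partitions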