37 changes: 32 additions & 5 deletions R/pkg/R/DataFrame.R
@@ -570,10 +570,17 @@ setMethod("unpersist",

#' Repartition
#'
#' Return a new SparkDataFrame that has exactly numPartitions partitions.
#'
#' The following options for repartition are possible:
#' \itemize{
#'   \item{"Option 1"} {Return a new SparkDataFrame partitioned by
#'                      the given columns into `numPartitions`.}
#'   \item{"Option 2"} {Return a new SparkDataFrame that has exactly `numPartitions` partitions.}
#'   \item{"Option 3"} {Return a new SparkDataFrame partitioned by the given column(s),
#'                      using `spark.sql.shuffle.partitions` as the number of partitions.}
#' }
#' @param x A SparkDataFrame
#' @param numPartitions The number of partitions to use.
#' @param col The column by which the partitioning will be performed.
#'
#' @family SparkDataFrame functions
#' @rdname repartition
@@ -586,11 +593,31 @@ setMethod("unpersist",
#' path <- "path/to/file.json"
#' df <- read.json(sqlContext, path)
#' newDF <- repartition(df, 2L)
#' newDF <- repartition(df, numPartitions = 2L)
#' newDF <- repartition(df, col = df$"col1", df$"col2")
#' newDF <- repartition(df, 3L, col = df$"col1", df$"col2")
#'}
setMethod("repartition",
signature(x = "SparkDataFrame", numPartitions = "numeric"),
function(x, numPartitions) {
sdf <- callJMethod(x@sdf, "repartition", numToInt(numPartitions))
signature(x = "SparkDataFrame"),
function(x, numPartitions = NULL, col = NULL, ...) {
if (!is.null(numPartitions) && is.numeric(numPartitions)) {
# number of partitions and columns both are specified
if (!is.null(col) && class(col) == "Column") {
cols <- list(col, ...)
jcol <- lapply(cols, function(c) { c@jc })
sdf <- callJMethod(x@sdf, "repartition", numToInt(numPartitions), jcol)
} else {
# only number of partitions is specified
sdf <- callJMethod(x@sdf, "repartition", numToInt(numPartitions))
}
} else if (!is.null(col) && class(col) == "Column") {
# only columns are specified
cols <- list(col, ...)
jcol <- lapply(cols, function(c) { c@jc })
sdf <- callJMethod(x@sdf, "repartition", jcol)
} else {
stop("Please, specify the number of partitions and/or a column(s)")
}
dataFrame(sdf)
})
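A quick way to see which JVM overload each form hits is to inspect the physical plan. A minimal sketch, assuming a working sqlContext and the JSON file from the roxygen example above (explain() prints the plan, including the chosen partitioning):

df <- read.json(sqlContext, "path/to/file.json")

# Option 2: exact partition count, no partitioning expressions
explain(repartition(df, 2L))

# Option 3: hash partition by col1; the partition count falls back to
# spark.sql.shuffle.partitions
explain(repartition(df, col = df$"col1"))

# Option 1: hash partition by col1 into exactly 3 partitions
explain(repartition(df, 3L, col = df$"col1"))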

8 changes: 6 additions & 2 deletions R/pkg/R/RDD.R
@@ -1023,9 +1023,13 @@ setMethod("keyBy",
#' @aliases repartition,RDD
#' @noRd
setMethod("repartition",
signature(x = "RDD", numPartitions = "numeric"),
signature(x = "RDD"),
function(x, numPartitions) {
coalesce(x, numPartitions, TRUE)
if (!is.null(numPartitions) && is.numeric(numPartitions)) {
coalesce(x, numPartitions, TRUE)
} else {
stop("Please, specify the number of partitions")
}
})
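Since this method is just coalesce with shuffle = TRUE, the two calls below are interchangeable. A sketch using SparkR's private RDD API (as the tests do), assuming a SparkContext sc:

rdd <- SparkR:::parallelize(sc, 1:100, 2L)

# equivalent: both shuffle the data into 4 partitions
r1 <- SparkR:::repartition(rdd, 4L)
r2 <- SparkR:::coalesce(rdd, 4L, TRUE)

# without shuffle = TRUE, coalesce can only decrease the partition count
r3 <- SparkR:::coalesce(rdd, 1L)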

#' Return a new RDD that is reduced into numPartitions partitions.
2 changes: 1 addition & 1 deletion R/pkg/R/generics.R
@@ -167,7 +167,7 @@ setGeneric("reduce", function(x, func) { standardGeneric("reduce") })
# @rdname repartition
# @seealso coalesce
# @export
setGeneric("repartition", function(x, numPartitions) { standardGeneric("repartition") })
setGeneric("repartition", function(x, ...) { standardGeneric("repartition") })
Contributor: Can this be function(x, numPartitions, ...)?

Contributor: nvm, we want the numPartitions to be optional.

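The `...` signature is what makes numPartitions optional under S4 dispatch: a typed numPartitions in the generic's signature would force every caller to supply it, whereas dispatching on x alone lets each method declare its own optional arguments. A toy sketch of the pattern (hypothetical class and generic names):

setGeneric("repartitionToy", function(x, ...) { standardGeneric("repartitionToy") })

setClass("Toy", representation(n = "numeric"))

# the method, not the generic, decides which optional arguments exist
setMethod("repartitionToy", signature(x = "Toy"),
          function(x, numPartitions = NULL, col = NULL, ...) {
            if (is.null(numPartitions) && is.null(col)) {
              stop("specify numPartitions and/or col")
            }
            x
          })

t1 <- new("Toy", n = 1)
repartitionToy(t1, numPartitions = 2L)  # ok
repartitionToy(t1, col = "a")           # ok: numPartitions stays optional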

# @rdname sampleRDD
# @export
36 changes: 36 additions & 0 deletions R/pkg/inst/tests/testthat/test_sparkSQL.R
@@ -2083,6 +2083,42 @@ test_that("dapply() on a DataFrame", {
expect_identical(expected, result)
})

test_that("repartition by columns on DataFrame", {
df <- createDataFrame (
sqlContext,
list(list(1L, 1, "1", 0.1), list(1L, 2, "2", 0.2), list(3L, 3, "3", 0.3)),
c("a", "b", "c", "d"))

# no column and number of partitions specified
retError <- tryCatch(repartition(df), error = function(e) e)
expect_equal(grepl
("Please, specify the number of partitions and/or a column\\(s\\)", retError), TRUE)

# repartition by column and number of partitions
actual <- repartition(df, 3L, col = df$"a")

# since we cannot access the number of partitions from dataframe, checking
# that at least the dimensions are identical
expect_identical(dim(df), dim(actual))

# repartition by number of partitions
actual <- repartition(df, 13L)
expect_identical(dim(df), dim(actual))

# a test case with a column and dapply
schema <- structType(structField("a", "integer"), structField("avg", "double"))
df <- repartition(df, col = df$"a")
df1 <- dapply(
df,
function(x) {
y <- (data.frame(x$a[1], mean(x$b)))
},
schema)

# Number of partitions is equal to 2
expect_equal(nrow(df1), 2)
})
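The test falls back to dimension checks because SparkR exposes no public API for a SparkDataFrame's partition count. A hedged sketch of how one could assert on it through the JVM backend (SparkR-internal callJMethod; relies on Dataset.rdd and RDD.getNumPartitions existing on the Scala side):

numParts <- function(df) {
  # convert the JVM Dataset to an RDD, then ask for its partition count
  jrdd <- SparkR:::callJMethod(df@sdf, "rdd")
  SparkR:::callJMethod(jrdd, "getNumPartitions")
}
# expect_equal(numParts(repartition(df, 13L)), 13)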

unlink(parquetPath)
unlink(jsonPath)
unlink(jsonPathNa)
5 changes: 3 additions & 2 deletions sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -2180,8 +2180,9 @@ class Dataset[T] private[sql](
}

/**
* Returns a new [[Dataset]] partitioned by the given partitioning expressions preserving
* the existing number of partitions. The resulting Dataset is hash partitioned.
* Returns a new [[Dataset]] partitioned by the given partitioning expressions, using
* `spark.sql.shuffle.partitions` as number of partitions.
* The resulting Dataset is hash partitioned.
*
* This is the same operation as "DISTRIBUTE BY" in SQL (Hive QL).
*
Expand Down