From f190d62460829dcfb84ff1a8e6dd6fe9cbd25719 Mon Sep 17 00:00:00 2001 From: zero323 Date: Fri, 12 May 2017 17:54:46 +0200 Subject: [PATCH 1/5] Initial implementation --- R/pkg/R/DataFrame.R | 30 ++++++++++++++++++++++ R/pkg/R/context.R | 4 +-- R/pkg/R/generics.R | 4 +++ R/pkg/inst/tests/testthat/test_broadcast.R | 2 +- R/pkg/inst/tests/testthat/test_sparkSQL.R | 5 ++++ R/pkg/inst/tests/testthat/test_utils.R | 2 +- 6 files changed, 43 insertions(+), 4 deletions(-) diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R index b56dddcb9f2ef..78e197b47e847 100644 --- a/R/pkg/R/DataFrame.R +++ b/R/pkg/R/DataFrame.R @@ -3769,3 +3769,33 @@ setMethod("alias", sdf <- callJMethod(object@sdf, "alias", data) dataFrame(sdf) }) + + +#' broadcast +#' +#' Return a new SparkDataFrame marked as small enough for use in broadcast joins. +#' +#' Equivalent to hint(x, "broadcast). +#' +#' @param x a SparkDataFrame. +#' @return a SparkDataFrame. +#' +#' @aliases broadcast,SparkDataFrame-method +#' @family SparkDataFrame functions +#' @rdname broadcast +#' @name broadcast +#' @export +#' @examples +#' \dontrun{ +#' df <- createDataFrame(mtcars) +#' avg_mpg <- mean(groupBy(createDataFrame(mtcars), "cyl"), "mpg") +#' +#' head(join(df, broadcast(avg_mpg), df$cyl == avg_mpg$cyl)) +#' } +#' @note broadcast since 2.3.0 +setMethod("broadcast", + signature(x = "SparkDataFrame"), + function(x) { + sdf <- callJStatic("org.apache.spark.sql.functions", "broadcast", x@sdf) + dataFrame(sdf) + }) diff --git a/R/pkg/R/context.R b/R/pkg/R/context.R index 50856e3d9856c..b9fd45219a4ae 100644 --- a/R/pkg/R/context.R +++ b/R/pkg/R/context.R @@ -258,7 +258,7 @@ includePackage <- function(sc, pkg) { #' #' # Large Matrix object that we want to broadcast #' randomMat <- matrix(nrow=100, ncol=10, data=rnorm(1000)) -#' randomMatBr <- broadcast(sc, randomMat) +#' randomMatBr <- broadcast_(sc, randomMat) #' #' # Use the broadcast variable inside the function #' useBroadcast <- function(x) { @@ -266,7 +266,7 @@ includePackage <- function(sc, pkg) { #' } #' sumRDD <- lapply(rdd, useBroadcast) #'} -broadcast <- function(sc, object) { +broadcast_ <- function(sc, object) { objName <- as.character(substitute(object)) serializedObj <- serialize(object, connection = NULL) diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R index 3c84bf8a4803e..514ca99d45cd3 100644 --- a/R/pkg/R/generics.R +++ b/R/pkg/R/generics.R @@ -799,6 +799,10 @@ setGeneric("write.df", function(df, path = NULL, ...) { standardGeneric("write.d #' @export setGeneric("randomSplit", function(x, weights, seed) { standardGeneric("randomSplit") }) +#' @rdname broadcast +#' @export +setGeneric("broadcast", function(x) { standardGeneric("broadcast") }) + ###################### Column Methods ########################## #' @rdname columnfunctions diff --git a/R/pkg/inst/tests/testthat/test_broadcast.R b/R/pkg/inst/tests/testthat/test_broadcast.R index 504ded4fc8623..bf29ad44bbeea 100644 --- a/R/pkg/inst/tests/testthat/test_broadcast.R +++ b/R/pkg/inst/tests/testthat/test_broadcast.R @@ -29,7 +29,7 @@ test_that("using broadcast variable", { skip_on_cran() randomMat <- matrix(nrow = 10, ncol = 10, data = rnorm(100)) - randomMatBr <- broadcast(sc, randomMat) + randomMatBr <- broadcast_(sc, randomMat) useBroadcast <- function(x) { sum(SparkR:::value(randomMatBr) * x) diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index 19aa61e9a56c3..7f6f4cf333930 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -2216,6 +2216,11 @@ test_that("join(), crossJoin() and merge() on a DataFrame", { explain(join(df1, hint(df2, "broadcast"), df1$id == df2$id)) ) expect_true(any(grepl("BroadcastHashJoin", execution_plan_hint))) + + execution_plan_broadcast <- capture.output( + explain(join(df1, broadcast(df2), df1$id == df2$id)) + ) + expect_true(any(grepl("BroadcastHashJoin", execution_plan_broadcast))) }) test_that("toJSON() on DataFrame", { diff --git a/R/pkg/inst/tests/testthat/test_utils.R b/R/pkg/inst/tests/testthat/test_utils.R index 4a01e875405ff..5ccec1060cee7 100644 --- a/R/pkg/inst/tests/testthat/test_utils.R +++ b/R/pkg/inst/tests/testthat/test_utils.R @@ -136,7 +136,7 @@ test_that("cleanClosure on R functions", { # Test for broadcast variables. a <- matrix(nrow = 10, ncol = 10, data = rnorm(100)) - aBroadcast <- broadcast(sc, a) + aBroadcast <- broadcast_(sc, a) normMultiply <- function(x) { norm(aBroadcast$value) * x } newnormMultiply <- SparkR:::cleanClosure(normMultiply) env <- environment(newnormMultiply) From 397ab1f7b4b4e2b9e51b697c92e3be197fed4554 Mon Sep 17 00:00:00 2001 From: zero323 Date: Fri, 12 May 2017 19:38:31 +0200 Subject: [PATCH 2/5] Fix style --- R/pkg/inst/tests/testthat/test_sparkSQL.R | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index 7f6f4cf333930..ee104e0d32624 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -2216,7 +2216,7 @@ test_that("join(), crossJoin() and merge() on a DataFrame", { explain(join(df1, hint(df2, "broadcast"), df1$id == df2$id)) ) expect_true(any(grepl("BroadcastHashJoin", execution_plan_hint))) - + execution_plan_broadcast <- capture.output( explain(join(df1, broadcast(df2), df1$id == df2$id)) ) From 246b91f8af84115af8f6283fb783000c9cc613ec Mon Sep 17 00:00:00 2001 From: zero323 Date: Sat, 13 May 2017 12:08:08 +0200 Subject: [PATCH 3/5] Style --- R/pkg/R/DataFrame.R | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R index 78e197b47e847..aab2fc17aedaf 100644 --- a/R/pkg/R/DataFrame.R +++ b/R/pkg/R/DataFrame.R @@ -3770,16 +3770,15 @@ setMethod("alias", dataFrame(sdf) }) - #' broadcast -#' -#' Return a new SparkDataFrame marked as small enough for use in broadcast joins. -#' -#' Equivalent to hint(x, "broadcast). -#' +#' +#' Return a new SparkDataFrame marked as small enough for use in broadcast joins. +#' +#' Equivalent to \code{hint(x, "broadcast")}. +#' #' @param x a SparkDataFrame. #' @return a SparkDataFrame. -#' +#' #' @aliases broadcast,SparkDataFrame-method #' @family SparkDataFrame functions #' @rdname broadcast From 1530785f7469830446cd95717d524eb42d88e4ab Mon Sep 17 00:00:00 2001 From: zero323 Date: Sat, 13 May 2017 12:38:50 +0200 Subject: [PATCH 4/5] Rename broadcast_ to broadcastRDD --- R/pkg/R/context.R | 4 ++-- R/pkg/inst/tests/testthat/test_broadcast.R | 2 +- R/pkg/inst/tests/testthat/test_utils.R | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/R/pkg/R/context.R b/R/pkg/R/context.R index b9fd45219a4ae..8349b57a30a93 100644 --- a/R/pkg/R/context.R +++ b/R/pkg/R/context.R @@ -258,7 +258,7 @@ includePackage <- function(sc, pkg) { #' #' # Large Matrix object that we want to broadcast #' randomMat <- matrix(nrow=100, ncol=10, data=rnorm(1000)) -#' randomMatBr <- broadcast_(sc, randomMat) +#' randomMatBr <- broadcastRDD(sc, randomMat) #' #' # Use the broadcast variable inside the function #' useBroadcast <- function(x) { @@ -266,7 +266,7 @@ includePackage <- function(sc, pkg) { #' } #' sumRDD <- lapply(rdd, useBroadcast) #'} -broadcast_ <- function(sc, object) { +broadcastRDD <- function(sc, object) { objName <- as.character(substitute(object)) serializedObj <- serialize(object, connection = NULL) diff --git a/R/pkg/inst/tests/testthat/test_broadcast.R b/R/pkg/inst/tests/testthat/test_broadcast.R index bf29ad44bbeea..49570041c1159 100644 --- a/R/pkg/inst/tests/testthat/test_broadcast.R +++ b/R/pkg/inst/tests/testthat/test_broadcast.R @@ -29,7 +29,7 @@ test_that("using broadcast variable", { skip_on_cran() randomMat <- matrix(nrow = 10, ncol = 10, data = rnorm(100)) - randomMatBr <- broadcast_(sc, randomMat) + randomMatBr <- broadcastRDD(sc, randomMat) useBroadcast <- function(x) { sum(SparkR:::value(randomMatBr) * x) diff --git a/R/pkg/inst/tests/testthat/test_utils.R b/R/pkg/inst/tests/testthat/test_utils.R index 5ccec1060cee7..f6de3394086b0 100644 --- a/R/pkg/inst/tests/testthat/test_utils.R +++ b/R/pkg/inst/tests/testthat/test_utils.R @@ -136,7 +136,7 @@ test_that("cleanClosure on R functions", { # Test for broadcast variables. a <- matrix(nrow = 10, ncol = 10, data = rnorm(100)) - aBroadcast <- broadcast_(sc, a) + aBroadcast <- broadcastRDD(sc, a) normMultiply <- function(x) { norm(aBroadcast$value) * x } newnormMultiply <- SparkR:::cleanClosure(normMultiply) env <- environment(newnormMultiply) From d6c343538a731a45353bec5a7c290a201756039b Mon Sep 17 00:00:00 2001 From: zero323 Date: Sun, 14 May 2017 21:08:03 +0200 Subject: [PATCH 5/5] Add broadcast to NAMESPACE --- R/pkg/NAMESPACE | 1 + 1 file changed, 1 insertion(+) diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE index ba0fe7708bcc3..5c074d3c0fd40 100644 --- a/R/pkg/NAMESPACE +++ b/R/pkg/NAMESPACE @@ -84,6 +84,7 @@ exportClasses("SparkDataFrame") exportMethods("arrange", "as.data.frame", "attach", + "broadcast", "cache", "checkpoint", "coalesce",