From 29d8a5c6c22202cdf7d6cc44f1d6cbeca5946918 Mon Sep 17 00:00:00 2001
From: Narine Kokhlikyan
Date: Tue, 21 Jun 2016 01:12:11 +0300
Subject: [PATCH 1/8] Fixed duplicated documentation problem + separated
 documentation for dapply and dapplyCollect

---
 R/pkg/R/DataFrame.R | 4 +++-
 R/pkg/R/generics.R  | 2 +-
 2 files changed, 4 insertions(+), 2 deletions(-)

diff --git a/R/pkg/R/DataFrame.R b/R/pkg/R/DataFrame.R
index 0ff350d44d4b3..1170e65d31a51 100644
--- a/R/pkg/R/DataFrame.R
+++ b/R/pkg/R/DataFrame.R
@@ -1186,6 +1186,7 @@ dapplyInternal <- function(x, func, schema) {
 #' @family SparkDataFrame functions
 #' @rdname dapply
 #' @name dapply
+#' @seealso dapplyCollect \link{dapplyCollect}
 #' @export
 #' @examples
 #' \dontrun{
@@ -1229,8 +1230,9 @@ setMethod("dapply",
 #' to each partition will be passed.
 #' The output of func should be a data.frame.
 #' @family SparkDataFrame functions
-#' @rdname dapply
+#' @rdname dapplyCollect
 #' @name dapplyCollect
+#' @seealso dapply \link{dapply}
 #' @export
 #' @examples
 #' \dontrun{
diff --git a/R/pkg/R/generics.R b/R/pkg/R/generics.R
index 50fc204f998a5..c7cfb11dadaed 100644
--- a/R/pkg/R/generics.R
+++ b/R/pkg/R/generics.R
@@ -450,7 +450,7 @@ setGeneric("covar_pop", function(col1, col2) {standardGeneric("covar_pop") })
 #' @export
 setGeneric("dapply", function(x, func, schema) { standardGeneric("dapply") })

-#' @rdname dapply
+#' @rdname dapplyCollect
 #' @export
 setGeneric("dapplyCollect", function(x, func) { standardGeneric("dapplyCollect") })

From 85a4493a03b3601a93c25ebc1eafb2868efec8d8 Mon Sep 17 00:00:00 2001
From: Narine Kokhlikyan
Date: Thu, 7 Jul 2016 06:18:49 -0700
Subject: [PATCH 2/8] Adding programming guide for gapply/gapplyCollect

---
 docs/sparkr.md | 61 +++++++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 60 insertions(+), 1 deletion(-)

diff --git a/docs/sparkr.md b/docs/sparkr.md
index 9e74e4a96acdc..bee44d02905c9 100644
--- a/docs/sparkr.md
+++ b/docs/sparkr.md
@@ -1,7 +1,8 @@
 ---
+title: "SparkR (R on Spark)"
 layout: global
+output: html_document
 displayTitle: SparkR (R on Spark)
-title: SparkR (R on Spark)
 ---

 * This will become a table of contents (this text will be scraped).
@@ -306,6 +307,64 @@ head(ldf, 3)

{% endhighlight %}
</div>

#### Run a given function on a large dataset grouping by input column(s) and using `gapply` or `gapplyCollect`

##### gapply
Apply a function to each group of a `SparkDataFrame`. The function is to be applied to each group of the `SparkDataFrame` and should have only two parameters: a grouping key and an R `data.frame` corresponding to
that key. The groups are chosen from the `SparkDataFrame`'s column(s).
The output of the function should be a `data.frame`. The schema specifies the row format of the resulting
`SparkDataFrame`. It must match the R function's output.
<div data-lang="r">
+{% highlight r %} + +# Determine six waiting times with the largest eruption time in minutes. +schema <- structType(structField("waiting", "double"), structField("max_eruption", "double")) +result <- gapply( + df, + "waiting", + function(key, x) { + y <- data.frame(key, max(x$eruptions)) + }, + schema) +head(collect(arrange(result, "max_eruption", decreasing = TRUE))) + +## waiting max_eruption +##1 64 5.100 +##2 69 5.067 +##3 71 5.033 +##4 87 5.000 +##5 63 4.933 +##6 89 4.900 +{% endhighlight %} +
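(Editor's note: the following sketch is not part of the original patch. It assumes the same `df <- createDataFrame(faithful)` data frame used throughout this guide, and the schema column names `n` and `mean_eruption` are illustrative. It shows the same key/`data.frame` contract returning more than one aggregate per group.)

{% highlight r %}
# A sketch of a gapply UDF that returns several columns per group.
# The schema must list the output columns in the same order the
# data.frame is built: the grouping key, a row count, and a mean.
schema <- structType(structField("waiting", "double"),
                     structField("n", "integer"),
                     structField("mean_eruption", "double"))
stats <- gapply(
  df,
  "waiting",
  function(key, x) {
    data.frame(key, nrow(x), mean(x$eruptions))
  },
  schema)
head(collect(stats))
{% endhighlight %}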

##### gapplyCollect
Like `gapply`, applies a function to each group of a `SparkDataFrame` and collects the result back to an R data.frame. The output of the function should be a `data.frame`. But, the schema is not required to be passed. Note that `gapplyCollect` can only be used if the output of the UDF run on all the partitions can fit in driver memory.
<div data-lang="r">
+{% highlight r %} + +# Determine six waiting times with the largest eruption time in minutes. +result <- gapplyCollect( + df, + "waiting", + function(key, x) { + y <- data.frame(key, max(x$eruptions)) + colnames(y) <- c("waiting", "max_eruption") + y + }) +head(result[order(result$max_eruption, decreasing = TRUE), ]) + +## waiting max_eruption +##1 64 5.100 +##2 69 5.067 +##3 71 5.033 +##4 87 5.000 +##5 63 4.933 +##6 89 4.900 + +{% endhighlight %} +
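(Editor's note: another sketch that is not part of the original patch. When the per-group aggregate already exists as a native Spark function, as `max` does here, the built-in `groupBy`/`agg`/`desc` path gives the same answer without shipping rows to an R worker process; `max_eruption` is just the label used in the example above.)

{% highlight r %}
# The same per-waiting maximum computed with built-in aggregation
# instead of an R UDF.
result <- agg(groupBy(df, "waiting"), max_eruption = max(df$eruptions))
head(arrange(result, desc(result$max_eruption)))
{% endhighlight %}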

#### Run local R functions distributed using `spark.lapply`

##### spark.lapply

From 7781d1c111f38e3608d5ebd468e6d344d52efa5c Mon Sep 17 00:00:00 2001
From: Narine Kokhlikyan
Date: Thu, 7 Jul 2016 06:27:35 -0700
Subject: [PATCH 3/8] removing output format

---
 docs/sparkr.md | 3 +--
 1 file changed, 1 insertion(+), 2 deletions(-)

diff --git a/docs/sparkr.md b/docs/sparkr.md
index bee44d02905c9..1b2dfbfea06e6 100644
--- a/docs/sparkr.md
+++ b/docs/sparkr.md
@@ -1,8 +1,7 @@
 ---
-title: "SparkR (R on Spark)"
 layout: global
-output: html_document
 displayTitle: SparkR (R on Spark)
+title: SparkR (R on Spark)
 ---

 * This will become a table of contents (this text will be scraped).

From c1d71512a3bf0205615d1b6318029ad6f33d94dc Mon Sep 17 00:00:00 2001
From: Narine Kokhlikyan
Date: Mon, 11 Jul 2016 23:05:25 -0700
Subject: [PATCH 4/8] Adding R-Spark data-type mapping

---
 docs/sparkr.md | 85 ++++++++++++++++++++++++++++++++++++++++++++++------
 1 file changed, 79 insertions(+), 6 deletions(-)

diff --git a/docs/sparkr.md b/docs/sparkr.md
index 1b2dfbfea06e6..bc2ca3eb6f00c 100644
--- a/docs/sparkr.md
+++ b/docs/sparkr.md
@@ -263,7 +263,7 @@ In SparkR, we support several kinds of User-Defined Functions:

##### dapply
Apply a function to each partition of a `SparkDataFrame`. The function to be applied to each partition of the `SparkDataFrame` and should have only one parameter, to which a `data.frame` corresponds to each partition will be passed. The output of function
-should be a `data.frame`. Schema specifies the row format of the resulting a `SparkDataFrame`. It must match the R function's output.
+should be a `data.frame`. Schema specifies the row format of the resulting `SparkDataFrame`. It must match the [data types of the R function's output fields](#data-type-mapping-between-r-and-spark).
<div data-lang="r">

{% highlight r %}

# Convert waiting time from hours to seconds.
# Note that we can apply UDF to DataFrame.
schema <- structType(structField("eruptions", "double"), structField("waiting", "double"),
                     structField("waiting_secs", "double"))
df1 <- dapply(df, function(x) { x <- cbind(x, x$waiting * 60) }, schema)
head(collect(df1))

@@ -285,9 +285,7 @@

##### dapplyCollect
Like `dapply`, apply a function to each partition of a `SparkDataFrame` and collect the result back. The output of function
-should be a `data.frame`. But, Schema is not required to be passed. Note that `dapplyCollect` only can be used if the
-output of UDF run on all the partitions can fit in driver memory.
-<div data-lang="r">
+should be a `data.frame`. But, Schema is not required to be passed. Note that `dapplyCollect` can fail if the output of the UDF run on all the partitions cannot be pulled to the driver and fit in driver memory.

{% highlight r %}

# Convert waiting time from hours to seconds.
ldf <- dapplyCollect(
         df,
         function(x) {
           x <- cbind(x, "waiting_secs" = x$waiting * 60)
         })
head(ldf, 3)

{% endhighlight %}

#### Run a given function on a large dataset grouping by input column(s) and using `gapply` or `gapplyCollect`

##### gapply
Apply a function to each group of a `SparkDataFrame`. The function is to be applied to each group of the `SparkDataFrame` and should have only two parameters: a grouping key and an R `data.frame` corresponding to
that key. The groups are chosen from the `SparkDataFrame`'s column(s).
The output of the function should be a `data.frame`. The schema specifies the row format of the resulting
-`SparkDataFrame`. It must match the R function's output.
+`SparkDataFrame`. It must represent R function's output schema on the basis of Spark data types. The column names of each output field in the schema are set by user. Bellow data type mapping between R
+and Spark.

#### Data type mapping between R and Spark

<table class="table">
<tr><th>R</th><th>Spark</th></tr>
<tr>
  <td>byte</td>
  <td>byte</td>
</tr>
<tr>
  <td>integer</td>
  <td>integer</td>
</tr>
<tr>
  <td>float</td>
  <td>float</td>
</tr>
<tr>
  <td>double</td>
  <td>double</td>
</tr>
<tr>
  <td>numeric</td>
  <td>double</td>
</tr>
<tr>
  <td>character</td>
  <td>string</td>
</tr>
<tr>
  <td>string</td>
  <td>string</td>
</tr>
<tr>
  <td>binary</td>
  <td>binary</td>
</tr>
<tr>
  <td>raw</td>
  <td>binary</td>
</tr>
<tr>
  <td>logical</td>
  <td>boolean</td>
</tr>
<tr>
  <td>timestamp</td>
  <td>timestamp</td>
</tr>
<tr>
  <td>date</td>
  <td>date</td>
</tr>
<tr>
  <td>array</td>
  <td>array</td>
</tr>
<tr>
  <td>list</td>
  <td>array</td>
</tr>
<tr>
  <td>map</td>
  <td>map</td>
</tr>
<tr>
  <td>env</td>
  <td>map</td>
</tr>
<tr>
  <td>struct</td>
  <td>struct</td>
</tr>
</table>
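(Editor's note: a sketch, not part of the original patch, showing how the mapping above surfaces in practice. `createDataFrame` infers the Spark types from the R column types, and `printSchema` prints them; the column names are illustrative.)

{% highlight r %}
# numeric -> double, character -> string, logical -> boolean
local_df <- data.frame(avg_wait = 70.9, site = "old faithful",
                       is_long = TRUE, stringsAsFactors = FALSE)
sdf <- createDataFrame(local_df)
printSchema(sdf)
# root
#  |-- avg_wait: double (nullable = true)
#  |-- site: string (nullable = true)
#  |-- is_long: boolean (nullable = true)
{% endhighlight %}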
+
{% highlight r %} @@ -338,7 +411,7 @@ head(collect(arrange(result, "max_eruption", decreasing = TRUE)))

{% endhighlight %}

##### gapplyCollect
Like `gapply`, applies a function to each group of a `SparkDataFrame` and collects the result back to an R data.frame. The output of the function should be a `data.frame`. But, the schema is not required to be passed. Note that `gapplyCollect` can fail if the output of the UDF run on all the partitions cannot be pulled to the driver and fit in driver memory.

{% highlight r %}

# Determine six waiting times with the largest eruption time in minutes.
result <- gapplyCollect(
    df,
    "waiting",
    function(key, x) {
        y <- data.frame(key, max(x$eruptions))
        colnames(y) <- c("waiting", "max_eruption")
        y
    })
head(result[order(result$max_eruption, decreasing = TRUE), ])

##    waiting max_eruption
##1      64       5.100
##2      69       5.067
##3      71       5.033
##4      87       5.000
##5      63       4.933
##6      89       4.900

{% endhighlight %}

From 2af724321e0d51aed64c84dd22741a7cc6067caf Mon Sep 17 00:00:00 2001
From: Narine Kokhlikyan
Date: Mon, 11 Jul 2016 23:17:26 -0700
Subject: [PATCH 5/8] bring back div tag for dapplyCollect

---
 docs/sparkr.md | 1 +
 1 file changed, 1 insertion(+)

diff --git a/docs/sparkr.md b/docs/sparkr.md
index bc2ca3eb6f00c..2750640c66359 100644
--- a/docs/sparkr.md
+++ b/docs/sparkr.md
@@ -286,6 +286,7 @@

##### dapplyCollect
Like `dapply`, apply a function to each partition of a `SparkDataFrame` and collect the result back. The output of function
should be a `data.frame`. But, Schema is not required to be passed. Note that `dapplyCollect` can fail if the output of the UDF run on all the partitions cannot be pulled to the driver and fit in driver memory.
+<div data-lang="r">

{% highlight r %}

# Convert waiting time from hours to seconds.

From 8a2aff3add082e20c45136dc5814e6ccdf4b256c Mon Sep 17 00:00:00 2001
From: Narine Kokhlikyan
Date: Wed, 13 Jul 2016 23:18:29 -0700
Subject: [PATCH 6/8] addressed Felix's comments

---
 docs/sparkr.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/docs/sparkr.md b/docs/sparkr.md
index 2750640c66359..82c791b93ee96 100644
--- a/docs/sparkr.md
+++ b/docs/sparkr.md
@@ -263,7 +263,7 @@ In SparkR, we support several kinds of User-Defined Functions:

##### dapply
Apply a function to each partition of a `SparkDataFrame`. The function to be applied to each partition of the `SparkDataFrame` and should have only one parameter, to which a `data.frame` corresponds to each partition will be passed. The output of function
-should be a `data.frame`. Schema specifies the row format of the resulting `SparkDataFrame`. It must match the [data types of the R function's output fields](#data-type-mapping-between-r-and-spark).
+should be a `data.frame`. Schema specifies the row format of the resulting `SparkDataFrame`. It must match the [data types](#data-type-mapping-between-r-and-spark) of the returned `data.frame`.
<div data-lang="r">

{% highlight r %}

# Convert waiting time from hours to seconds.
# Note that we can apply UDF to DataFrame.
schema <- structType(structField("eruptions", "double"), structField("waiting", "double"),
                     structField("waiting_secs", "double"))
df1 <- dapply(df, function(x) { x <- cbind(x, x$waiting * 60) }, schema)
head(collect(df1))

##### gapply
Apply a function to each group of a `SparkDataFrame`. The function is to be applied to each group of the `SparkDataFrame` and should have only two parameters: a grouping key and an R `data.frame` corresponding to
that key. The groups are chosen from the `SparkDataFrame`'s column(s).
The output of the function should be a `data.frame`. The schema specifies the row format of the resulting
-`SparkDataFrame`. It must represent R function's output schema on the basis of Spark data types. The column names of each output field in the schema are set by user. Bellow data type mapping between R
+`SparkDataFrame`. It must represent R function's output schema on the basis of Spark data types. The column names of the returned `data.frame` are set by user. Below data type mapping between R
 and Spark.

#### Data type mapping between R and Spark

From 19e849f066e970a755401f99bc8248b8258a11c4 Mon Sep 17 00:00:00 2001
From: Narine Kokhlikyan
Date: Thu, 14 Jul 2016 22:56:52 -0700
Subject: [PATCH 7/8] update data types

---
 docs/sparkr.md | 14 +++++---------
 1 file changed, 5 insertions(+), 9 deletions(-)

diff --git a/docs/sparkr.md b/docs/sparkr.md
index ad15c2ef6bad8..18dcb1a291905 100644
--- a/docs/sparkr.md
+++ b/docs/sparkr.md
@@ -367,11 +367,15 @@ and Spark.
   <td>boolean</td>
 </tr>
 <tr>
-  <td>timestamp</td>
+  <td>[POSIXct](https://stat.ethz.ch/R-manual/R-devel/library/base/html/DateTimeClasses.html)</td>
+  <td>timestamp</td>
+</tr>
+<tr>
+  <td>[POSIXlt](https://stat.ethz.ch/R-manual/R-devel/library/base/html/DateTimeClasses.html)</td>
   <td>timestamp</td>
 </tr>
 <tr>
-  <td>date</td>
+  <td>[Date](https://stat.ethz.ch/R-manual/R-devel/library/base/html/Dates.html)</td>
   <td>date</td>
 </tr>
@@ -382,18 +386,10 @@ and Spark.
 <tr>
   <td>list</td>
   <td>array</td>
 </tr>
-<tr>
-  <td>map</td>
-  <td>map</td>
-</tr>
 <tr>
   <td>env</td>
   <td>map</td>
 </tr>
-<tr>
-  <td>struct</td>
-  <td>struct</td>
-</tr>
 </table>
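(Editor's note: a sketch, not part of the original patches, exercising the POSIXct and Date rows that PATCH 7 adds to the table; `as.POSIXct` and `as.Date` are base R, and the inferred Spark types can be checked with `printSchema`.)

{% highlight r %}
# POSIXct columns arrive in Spark as timestamp, Date columns as date.
# (POSIXlt values are converted to POSIXct when stored in a data.frame.)
ts_df <- createDataFrame(data.frame(t = as.POSIXct("2016-07-14 22:56:52"),
                                    d = as.Date("2016-07-16")))
printSchema(ts_df)
# root
#  |-- t: timestamp (nullable = true)
#  |-- d: date (nullable = true)
{% endhighlight %}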

From f584416b81bc19d951d28eb2861cc3a4a16bc117 Mon Sep 17 00:00:00 2001
From: Narine Kokhlikyan
Date: Sat, 16 Jul 2016 13:23:23 -0700
Subject: [PATCH 8/8] Fixing a-href and the div for gapply/dapply

---
 docs/sparkr.md | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)

diff --git a/docs/sparkr.md b/docs/sparkr.md
index 18dcb1a291905..a5235b2bf66a4 100644
--- a/docs/sparkr.md
+++ b/docs/sparkr.md
@@ -273,6 +273,7 @@ In SparkR, we support several kinds of User-Defined Functions:

##### dapply
Apply a function to each partition of a `SparkDataFrame`. The function to be applied to each partition of the `SparkDataFrame` and should have only one parameter, to which a `data.frame` corresponds to each partition will be passed. The output of function
should be a `data.frame`. Schema specifies the row format of the resulting `SparkDataFrame`. It must match the [data types](#data-type-mapping-between-r-and-spark) of the returned value.
+<div data-lang="r">

{% highlight r %}

# Convert waiting time from hours to seconds.
# Note that we can apply UDF to DataFrame.
schema <- structType(structField("eruptions", "double"), structField("waiting", "double"),
                     structField("waiting_secs", "double"))
df1 <- dapply(df, function(x) { x <- cbind(x, x$waiting * 60) }, schema)
head(collect(df1))

@@ -295,6 +296,7 @@

##### dapplyCollect
Like `dapply`, apply a function to each partition of a `SparkDataFrame` and collect the result back. The output of function
should be a `data.frame`. But, Schema is not required to be passed. Note that `dapplyCollect` can fail if the output of the UDF run on all the partitions cannot be pulled to the driver and fit in driver memory.
+<div data-lang="r">

{% highlight r %}

# Convert waiting time from hours to seconds.
ldf <- dapplyCollect(
         df,
         function(x) {
           x <- cbind(x, "waiting_secs" = x$waiting * 60)
         })
head(ldf, 3)

@@ -320,7 +322,7 @@

Apply a function to each group of a `SparkDataFrame`. The function is to be applied to each group of the `SparkDataFrame` and should have only two parameters: a grouping key and an R `data.frame` corresponding to
that key. The groups are chosen from the `SparkDataFrame`'s column(s).
The output of the function should be a `data.frame`. The schema specifies the row format of the resulting
-`SparkDataFrame`. It must represent R function's output schema on the basis of Spark data types. The column names of the returned `data.frame` are set by user. Below data type mapping between R
+`SparkDataFrame`. It must represent R function's output schema on the basis of Spark data types. The column names of the returned `data.frame` are set by user. Below is the data type mapping between R
 and Spark.
@@ -367,15 +369,15 @@ and Spark.
   <td>boolean</td>
 </tr>
 <tr>
-  <td>[POSIXct](https://stat.ethz.ch/R-manual/R-devel/library/base/html/DateTimeClasses.html)</td>
+  <td>POSIXct</td>
   <td>timestamp</td>
 </tr>
 <tr>
-  <td>[POSIXlt](https://stat.ethz.ch/R-manual/R-devel/library/base/html/DateTimeClasses.html)</td>
+  <td>POSIXlt</td>
   <td>timestamp</td>
 </tr>
 <tr>
-  <td>[Date](https://stat.ethz.ch/R-manual/R-devel/library/base/html/Dates.html)</td>
+  <td>Date</td>
   <td>date</td>
 </tr>
@@ -418,6 +420,7 @@

##### gapplyCollect
Like `gapply`, applies a function to each group of a `SparkDataFrame` and collects the result back to an R data.frame. The output of the function should be a `data.frame`. But, the schema is not required to be passed. Note that `gapplyCollect` can fail if the output of the UDF run on all the partitions cannot be pulled to the driver and fit in driver memory.
+<div data-lang="r">
{% highlight r %}