From 4777b774b0aba00aeaf30d30eed344cd0e37df6e Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Sat, 2 Apr 2016 20:39:42 -0700 Subject: [PATCH 01/13] added python api for time windowing --- python/pyspark/sql/functions.py | 38 +++++++++++++++++++ .../org/apache/spark/sql/functions.scala | 9 ++--- 2 files changed, 41 insertions(+), 6 deletions(-) diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index 3b20ba5177efd..cc3bfd948e549 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -526,6 +526,44 @@ def spark_partition_id(): return Column(sc._jvm.functions.spark_partition_id()) +@since(2.0) +def window(col, windowDuration, slideDuration=None, startTime=None): + """Bucketize rows into one or more time windows given a timestamp specifying column. Window + starts are inclusive but the window ends are exclusive, e.g. 12:05 will be in the window + [12:05,12:10) but not in [12:00,12:05). Windows can support microsecond precision. Windows in + the order of months are not supported. + + The time column must be of TimestampType. + + Durations are provided as strings, e.g. '1 second', '1 day 12 hours', '2 minutes'. Valid + interval strings are 'week', 'day', 'hour', 'minute', 'second', 'millisecond', 'microsecond'. + If the `slideDuration` is not provided, the windows will be tumbling windows. + + The startTime is the offset with respect to 1970-01-01 00:00:00 UTC with which to start + window intervals. For example, in order to have hourly tumbling windows that start 15 minutes + past the hour, e.g. 12:15-13:15, 13:15-14:15... provide `startTime` as `15 minutes`. + + The output column will be a struct called 'window' by default with the nested columns 'start' + and 'end'. + + >>> df = sqlContext.createDataFrame([("2016-03-11 09:00:07", 1)]).toDF("date", "val") + >>> w = df.groupBy(window("date", "5 seconds")).agg(sum("val").alias("sum")) + >>> w.select(w.window.start.cast("string"), w.window.end.cast("string"), "sum").collect() + [Row(start="2016-03-11 09:00:05, end=2016-03-11 09:00:10, sum=1)] + """ + sc = SparkContext._active_spark_context + time_col = _to_java_column(col) + if slideDuration and startTime: + res = sc._jvm.functions.window(time_col, windowDuration, slideDuration, startTime) + elif slideDuration: + res = sc._jvm.functions.window(time_col, windowDuration, slideDuration) + elif startTime: + res = sc._jvm.functions.window(time_col, windowDuration, windowDuration, startTime) + else: + res = sc._jvm.functions.window(time_col, windowDuration) + return Column(res) + + @since(1.5) def expr(str): """Parses the expression string into the column that it represents diff --git a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala index 74906050acbb3..b6b01259a3d26 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/functions.scala @@ -2575,8 +2575,7 @@ object functions { * processing time. * * @param timeColumn The column or the expression to use as the timestamp for windowing by time. - * The time can be as TimestampType or LongType, however when using LongType, - * the time must be given in seconds. + * The time column must be of TimestampType. * @param windowDuration A string specifying the width of the window, e.g. `10 minutes`, * `1 second`. Check [[org.apache.spark.unsafe.types.CalendarInterval]] for * valid duration identifiers. @@ -2630,8 +2629,7 @@ object functions { * processing time. 
* * @param timeColumn The column or the expression to use as the timestamp for windowing by time. - * The time can be as TimestampType or LongType, however when using LongType, - * the time must be given in seconds. + * The time column must be of TimestampType. * @param windowDuration A string specifying the width of the window, e.g. `10 minutes`, * `1 second`. Check [[org.apache.spark.unsafe.types.CalendarInterval]] for * valid duration identifiers. @@ -2673,8 +2671,7 @@ object functions { * processing time. * * @param timeColumn The column or the expression to use as the timestamp for windowing by time. - * The time can be as TimestampType or LongType, however when using LongType, - * the time must be given in seconds. + * The time column must be of TimestampType. * @param windowDuration A string specifying the width of the window, e.g. `10 minutes`, * `1 second`. Check [[org.apache.spark.unsafe.types.CalendarInterval]] for * valid duration identifiers. From 7b0fb13e1553a765c371f6eb38bda7e23c846eb3 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Sun, 3 Apr 2016 00:05:12 -0700 Subject: [PATCH 02/13] everything except R --- R/pkg/R/functions.R | 107 ++++++++++++++++++ python/pyspark/sql/functions.py | 76 ++++++------- .../catalyst/analysis/FunctionRegistry.scala | 5 +- .../sql/catalyst/expressions/TimeWindow.scala | 35 ++++++ .../expressions/TimeWindowSuite.scala | 37 +++++- .../sql/DataFrameTimeWindowingSuite.scala | 57 ++++++++++ 6 files changed, 277 insertions(+), 40 deletions(-) diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R index d9c10b4a4b9fb..22d85c62f626e 100644 --- a/R/pkg/R/functions.R +++ b/R/pkg/R/functions.R @@ -2131,6 +2131,113 @@ setMethod("from_unixtime", signature(x = "Column"), column(jc) }) +#' window +#' +#' Bucketize rows into one or more time windows given a timestamp specifying column. Window +#' starts are inclusive but the window ends are exclusive, e.g. 12:05 will be in the window +#' [12:05,12:10) but not in [12:00,12:05). Windows can support microsecond precision. Windows in +#' the order of months are not supported. +#' +#' The time column must be of TimestampType. +#' +#' Durations are provided as strings, e.g. '1 second', '1 day 12 hours', '2 minutes'. Valid +#' interval strings are 'week', 'day', 'hour', 'minute', 'second', 'millisecond', 'microsecond'. +#' If the `slideDuration` is not provided, the windows will be tumbling windows. +#' +#' The startTime is the offset with respect to 1970-01-01 00:00:00 UTC with which to start +#' window intervals. For example, in order to have hourly tumbling windows that start 15 minutes +#' past the hour, e.g. 12:15-13:15, 13:15-14:15... provide `startTime` as `15 minutes`. +#' +#' The output column will be a struct called 'window' by default with the nested columns 'start' +#' and 'end'. +#' +#' @family datetime_funcs +#' @rdname window +#' @name window +#' @export +#' @examples +#'\dontrun{ +#' # One minute windows every 15 seconds 10 seconds after the minute, e.g. 09:00:10-09:01:10, +#' # 09:00:25-09:01:25, 09:00:40-09:01:40, ... 
+#' window(df$time, "1 minute", "15 seconds", "10 seconds") +#'} +setMethod("window", signature(x = "Column", windowDuration = "character", + slideDuration = "character", startTime = "character"), + function(x, windowDuration, slideDuration, startTime) { + jc <- callJStatic("org.apache.spark.sql.functions", + "window", + x@jc, windowDuration, slideDuration, startTime) + column(jc) + }) + +#' window +#' +#' Bucketize rows into one or more time windows given a timestamp specifying column. Window +#' starts are inclusive but the window ends are exclusive, e.g. 12:05 will be in the window +#' [12:05,12:10) but not in [12:00,12:05). Windows can support microsecond precision. Windows in +#' the order of months are not supported. +#' +#' The time column must be of TimestampType. +#' +#' Durations are provided as strings, e.g. '1 second', '1 day 12 hours', '2 minutes'. Valid +#' interval strings are 'week', 'day', 'hour', 'minute', 'second', 'millisecond', 'microsecond'. +#' If the `slideDuration` is not provided, the windows will be tumbling windows. +#' +#' The output column will be a struct called 'window' by default with the nested columns 'start' +#' and 'end'. +#' +#' @family datetime_funcs +#' @rdname window +#' @name window +#' @export +#' @examples +#'\dontrun{ +#' # Thirty second windows every 10 seconds, e.g. 09:00:00-09:00:30, 09:00:10-09:00:40, ... +#' window(df$time, "30 seconds", "10 seconds") +#'} +setMethod("window", signature(x = "Column", windowDuration = "character", + slideDuration = "character", startTime = "missing"), + function(x, windowDuration, slideDuration, startTime) { + jc <- callJStatic("org.apache.spark.sql.functions", + "window", + x@jc, windowDuration, slideDuration) + column(jc) + }) + +#' window +#' +#' Bucketize rows into one or more time windows given a timestamp specifying column. Window +#' starts are inclusive but the window ends are exclusive, e.g. 12:05 will be in the window +#' [12:05,12:10) but not in [12:00,12:05). Windows can support microsecond precision. Windows in +#' the order of months are not supported. +#' +#' The time column must be of TimestampType. +#' +#' Durations are provided as strings, e.g. '1 second', '1 day 12 hours', '2 minutes'. Valid +#' interval strings are 'week', 'day', 'hour', 'minute', 'second', 'millisecond', 'microsecond'. +#' If the `slideDuration` is not provided, the windows will be tumbling windows. +#' +#' The output column will be a struct called 'window' by default with the nested columns 'start' +#' and 'end'. +#' +#' @family datetime_funcs +#' @rdname window +#' @name window +#' @export +#' @examples +#'\dontrun{ +#' # Thirty second tumbling windows, e.g. 09:00:00-09:00:30, 09:00:30-09:01:00, ... +#' window(df$time, "30 seconds") +#'} +setMethod("window", signature(x = "Column", windowDuration = "character", + slideDuration = "missing", startTime = "missing"), + function(x, windowDuration, slideDuration, startTime) { + jc <- callJStatic("org.apache.spark.sql.functions", + "window", + x@jc, windowDuration) + column(jc) + }) + #' locate #' #' Locate the position of the first occurrence of substr. 
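The three R signatures above are thin wrappers that dispatch, based on which arguments are missing, to the same `org.apache.spark.sql.functions.window` overloads used by the Scala and Python APIs. For orientation only, a minimal PySpark sketch of the same three call shapes follows (not part of the patch; `df` stands for any DataFrame with a timestamp column named `time`, and `F` for `pyspark.sql.functions`):

    from pyspark.sql import functions as F

    # sliding one-minute windows every 15 seconds, starting 10 seconds past the minute
    F.window(df.time, "1 minute", "15 seconds", "10 seconds")

    # sliding thirty-second windows every 10 seconds
    F.window(df.time, "30 seconds", "10 seconds")

    # tumbling thirty-second windows
    F.window(df.time, "30 seconds")
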
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index cc3bfd948e549..567274a95114a 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -526,44 +526,6 @@ def spark_partition_id(): return Column(sc._jvm.functions.spark_partition_id()) -@since(2.0) -def window(col, windowDuration, slideDuration=None, startTime=None): - """Bucketize rows into one or more time windows given a timestamp specifying column. Window - starts are inclusive but the window ends are exclusive, e.g. 12:05 will be in the window - [12:05,12:10) but not in [12:00,12:05). Windows can support microsecond precision. Windows in - the order of months are not supported. - - The time column must be of TimestampType. - - Durations are provided as strings, e.g. '1 second', '1 day 12 hours', '2 minutes'. Valid - interval strings are 'week', 'day', 'hour', 'minute', 'second', 'millisecond', 'microsecond'. - If the `slideDuration` is not provided, the windows will be tumbling windows. - - The startTime is the offset with respect to 1970-01-01 00:00:00 UTC with which to start - window intervals. For example, in order to have hourly tumbling windows that start 15 minutes - past the hour, e.g. 12:15-13:15, 13:15-14:15... provide `startTime` as `15 minutes`. - - The output column will be a struct called 'window' by default with the nested columns 'start' - and 'end'. - - >>> df = sqlContext.createDataFrame([("2016-03-11 09:00:07", 1)]).toDF("date", "val") - >>> w = df.groupBy(window("date", "5 seconds")).agg(sum("val").alias("sum")) - >>> w.select(w.window.start.cast("string"), w.window.end.cast("string"), "sum").collect() - [Row(start="2016-03-11 09:00:05, end=2016-03-11 09:00:10, sum=1)] - """ - sc = SparkContext._active_spark_context - time_col = _to_java_column(col) - if slideDuration and startTime: - res = sc._jvm.functions.window(time_col, windowDuration, slideDuration, startTime) - elif slideDuration: - res = sc._jvm.functions.window(time_col, windowDuration, slideDuration) - elif startTime: - res = sc._jvm.functions.window(time_col, windowDuration, windowDuration, startTime) - else: - res = sc._jvm.functions.window(time_col, windowDuration) - return Column(res) - - @since(1.5) def expr(str): """Parses the expression string into the column that it represents @@ -1091,6 +1053,44 @@ def to_utc_timestamp(timestamp, tz): return Column(sc._jvm.functions.to_utc_timestamp(_to_java_column(timestamp), tz)) +@since(2.0) +def window(col, windowDuration, slideDuration=None, startTime=None): + """Bucketize rows into one or more time windows given a timestamp specifying column. Window + starts are inclusive but the window ends are exclusive, e.g. 12:05 will be in the window + [12:05,12:10) but not in [12:00,12:05). Windows can support microsecond precision. Windows in + the order of months are not supported. + + The time column must be of TimestampType. + + Durations are provided as strings, e.g. '1 second', '1 day 12 hours', '2 minutes'. Valid + interval strings are 'week', 'day', 'hour', 'minute', 'second', 'millisecond', 'microsecond'. + If the `slideDuration` is not provided, the windows will be tumbling windows. + + The startTime is the offset with respect to 1970-01-01 00:00:00 UTC with which to start + window intervals. For example, in order to have hourly tumbling windows that start 15 minutes + past the hour, e.g. 12:15-13:15, 13:15-14:15... provide `startTime` as `15 minutes`. 
+ + The output column will be a struct called 'window' by default with the nested columns 'start' + and 'end'. + + >>> df = sqlContext.createDataFrame([("2016-03-11 09:00:07", 1)]).toDF("date", "val") + >>> w = df.groupBy(window("date", "5 seconds")).agg(sum("val").alias("sum")) + >>> w.select(w.window.start.cast("string"), w.window.end.cast("string"), "sum").collect() + [Row(start="2016-03-11 09:00:05, end=2016-03-11 09:00:10, sum=1)] + """ + sc = SparkContext._active_spark_context + time_col = _to_java_column(col) + if slideDuration and startTime: + res = sc._jvm.functions.window(time_col, windowDuration, slideDuration, startTime) + elif slideDuration: + res = sc._jvm.functions.window(time_col, windowDuration, slideDuration) + elif startTime: + res = sc._jvm.functions.window(time_col, windowDuration, windowDuration, startTime) + else: + res = sc._jvm.functions.window(time_col, windowDuration) + return Column(res) + + # ---------------------------- misc functions ---------------------------------- @since(1.5) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala index ca8db3cbc5993..a8adfd478a63e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala @@ -362,7 +362,10 @@ object FunctionRegistry { } Try(f.newInstance(expressions : _*).asInstanceOf[Expression]) match { case Success(e) => e - case Failure(e) => throw new AnalysisException(e.getMessage) + case Failure(e) => + // the exception is an invocation exception. To get a meaningful message, we need the + // cause. 
+            throw new AnalysisException(e.getCause.getMessage)
         }
       }
     }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala
index 8e13833486931..daf3de95dd9ea 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/TimeWindow.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.expressions
 
 import org.apache.commons.lang.StringUtils
 
+import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
 import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.TypeCheckFailure
 import org.apache.spark.sql.catalyst.expressions.codegen.{CodegenContext, ExprCode}
@@ -34,6 +35,28 @@ case class TimeWindow(
   with Unevaluable
   with NonSQLExpression {
 
+  //////////////////////////
+  // SQL Constructors
+  //////////////////////////
+
+  def this(
+      timeColumn: Expression,
+      windowDuration: Expression,
+      slideDuration: Expression,
+      startTime: Expression) = {
+    this(timeColumn, TimeWindow.parseExpression(windowDuration),
+      TimeWindow.parseExpression(slideDuration), TimeWindow.parseExpression(startTime))
+  }
+
+  def this(timeColumn: Expression, windowDuration: Expression, slideDuration: Expression) = {
+    this(timeColumn, TimeWindow.parseExpression(windowDuration),
+      TimeWindow.parseExpression(slideDuration), 0)
+  }
+
+  def this(timeColumn: Expression, windowDuration: Expression) = {
+    this(timeColumn, windowDuration, windowDuration)
+  }
+
   override def child: Expression = timeColumn
   override def inputTypes: Seq[AbstractDataType] = Seq(TimestampType)
   override def dataType: DataType = new StructType()
@@ -104,6 +127,18 @@ object TimeWindow {
     cal.microseconds
   }
 
+  /**
+   * Parses the duration expression to generate the long value for the original constructor so
+   * that we can use `window` in SQL.
+ */ + private def parseExpression(expr: Expression): Long = expr match { + case NonNullLiteral(s, StringType) => getIntervalInMicroSeconds(s.toString) + case IntegerLiteral(i) => i.toLong + case NonNullLiteral(l, LongType) => l.toString.toLong + case _ => throw new AnalysisException("The duration and time inputs to window must be " + + "an integer, long or string literal.") + } + def apply( timeColumn: Expression, windowDuration: String, diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/TimeWindowSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/TimeWindowSuite.scala index 71f969aee2ee4..b82cf8d1693e2 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/TimeWindowSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/TimeWindowSuite.scala @@ -17,10 +17,13 @@ package org.apache.spark.sql.catalyst.expressions +import org.scalatest.PrivateMethodTester + import org.apache.spark.SparkFunSuite import org.apache.spark.sql.AnalysisException +import org.apache.spark.sql.types.LongType -class TimeWindowSuite extends SparkFunSuite with ExpressionEvalHelper { +class TimeWindowSuite extends SparkFunSuite with ExpressionEvalHelper with PrivateMethodTester { test("time window is unevaluable") { intercept[UnsupportedOperationException] { @@ -73,4 +76,36 @@ class TimeWindowSuite extends SparkFunSuite with ExpressionEvalHelper { === seconds) } } + + private val parseExpression = PrivateMethod[Long]('parseExpression) + + test("parse sql expression for duration in microseconds - string") { + val dur = TimeWindow.invokePrivate(parseExpression(Literal("5 seconds"))) + assert(dur.isInstanceOf[Long]) + assert(dur === 5000000) + } + + test("parse sql expression for duration in microseconds - integer") { + val dur = TimeWindow.invokePrivate(parseExpression(Literal(100))) + assert(dur.isInstanceOf[Long]) + assert(dur === 100) + } + + test("parse sql expression for duration in microseconds - long") { + val dur = TimeWindow.invokePrivate(parseExpression(Literal.create(2 << 52, LongType))) + assert(dur.isInstanceOf[Long]) + assert(dur === (2 << 52)) + } + + test("parse sql expression for duration in microseconds - invalid interval") { + intercept[IllegalArgumentException] { + TimeWindow.invokePrivate(parseExpression(Literal("2 apples"))) + } + } + + test("parse sql expression for duration in microseconds - invalid expression") { + intercept[AnalysisException] { + TimeWindow.invokePrivate(parseExpression(Rand(123))) + } + } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala index e8103a31d5833..06584ec21e2f8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameTimeWindowingSuite.scala @@ -239,4 +239,61 @@ class DataFrameTimeWindowingSuite extends QueryTest with SharedSQLContext with B Row("2016-03-27 09:00:00.68", "2016-03-27 09:00:00.88", 1)) ) } + + private def withTempTable(f: String => Unit): Unit = { + val tableName = "temp" + Seq( + ("2016-03-27 19:39:34", 1), + ("2016-03-27 19:39:56", 2), + ("2016-03-27 19:39:27", 4)).toDF("time", "value").registerTempTable(tableName) + try { + f(tableName) + } finally { + sqlContext.dropTempTable(tableName) + } + } + + test("time window in SQL with single string expression") { + withTempTable { table => + checkAnswer( + 
sqlContext.sql(s"""select window(time, "10 seconds"), value from $table""") + .select($"window.start".cast(StringType), $"window.end".cast(StringType), $"value"), + Seq( + Row("2016-03-27 19:39:20", "2016-03-27 19:39:30", 4), + Row("2016-03-27 19:39:30", "2016-03-27 19:39:40", 1), + Row("2016-03-27 19:39:50", "2016-03-27 19:40:00", 2) + ) + ) + } + } + + test("time window in SQL with with two expressions") { + withTempTable { table => + checkAnswer( + sqlContext.sql( + s"""select window(time, "10 seconds", 10000000), value from $table""") + .select($"window.start".cast(StringType), $"window.end".cast(StringType), $"value"), + Seq( + Row("2016-03-27 19:39:20", "2016-03-27 19:39:30", 4), + Row("2016-03-27 19:39:30", "2016-03-27 19:39:40", 1), + Row("2016-03-27 19:39:50", "2016-03-27 19:40:00", 2) + ) + ) + } + } + + test("time window in SQL with with three expressions") { + withTempTable { table => + checkAnswer( + sqlContext.sql( + s"""select window(time, "10 seconds", 10000000, "5 seconds"), value from $table""") + .select($"window.start".cast(StringType), $"window.end".cast(StringType), $"value"), + Seq( + Row("2016-03-27 19:39:25", "2016-03-27 19:39:35", 1), + Row("2016-03-27 19:39:25", "2016-03-27 19:39:35", 4), + Row("2016-03-27 19:39:55", "2016-03-27 19:40:05", 2) + ) + ) + } + } } From f589469b9611b4560cb4eac262bb4f554f6c2bb7 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Sun, 3 Apr 2016 01:31:41 -0700 Subject: [PATCH 03/13] added window support for sql, python, and R --- R/pkg/NAMESPACE | 1 + R/pkg/R/functions.R | 92 ++++++----------------- R/pkg/inst/tests/testthat/test_sparkSQL.R | 36 +++++++++ 3 files changed, 59 insertions(+), 70 deletions(-) diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE index fa3fb0b09a1b0..f48c61c1d59c5 100644 --- a/R/pkg/NAMESPACE +++ b/R/pkg/NAMESPACE @@ -265,6 +265,7 @@ exportMethods("%in%", "var_samp", "weekofyear", "when", + "window", "year") exportClasses("GroupedData") diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R index 22d85c62f626e..787f1045492de 100644 --- a/R/pkg/R/functions.R +++ b/R/pkg/R/functions.R @@ -2160,81 +2160,33 @@ setMethod("from_unixtime", signature(x = "Column"), #' # One minute windows every 15 seconds 10 seconds after the minute, e.g. 09:00:10-09:01:10, #' # 09:00:25-09:01:25, 09:00:40-09:01:40, ... #' window(df$time, "1 minute", "15 seconds", "10 seconds") -#'} -setMethod("window", signature(x = "Column", windowDuration = "character", - slideDuration = "character", startTime = "character"), - function(x, windowDuration, slideDuration, startTime) { - jc <- callJStatic("org.apache.spark.sql.functions", - "window", - x@jc, windowDuration, slideDuration, startTime) - column(jc) - }) - -#' window -#' -#' Bucketize rows into one or more time windows given a timestamp specifying column. Window -#' starts are inclusive but the window ends are exclusive, e.g. 12:05 will be in the window -#' [12:05,12:10) but not in [12:00,12:05). Windows can support microsecond precision. Windows in -#' the order of months are not supported. -#' -#' The time column must be of TimestampType. -#' -#' Durations are provided as strings, e.g. '1 second', '1 day 12 hours', '2 minutes'. Valid -#' interval strings are 'week', 'day', 'hour', 'minute', 'second', 'millisecond', 'microsecond'. -#' If the `slideDuration` is not provided, the windows will be tumbling windows. #' -#' The output column will be a struct called 'window' by default with the nested columns 'start' -#' and 'end'. +#' # One minute tumbling windows 15 seconds after the minute, e.g. 
09:00:15-09:01:15, +# # 09:01:15-09:02:15... +#' window(df$time, "1 minute", startTime = "15 seconds") #' -#' @family datetime_funcs -#' @rdname window -#' @name window -#' @export -#' @examples -#'\dontrun{ #' # Thirty second windows every 10 seconds, e.g. 09:00:00-09:00:30, 09:00:10-09:00:40, ... #' window(df$time, "30 seconds", "10 seconds") #'} -setMethod("window", signature(x = "Column", windowDuration = "character", - slideDuration = "character", startTime = "missing"), - function(x, windowDuration, slideDuration, startTime) { - jc <- callJStatic("org.apache.spark.sql.functions", - "window", - x@jc, windowDuration, slideDuration) - column(jc) - }) - -#' window -#' -#' Bucketize rows into one or more time windows given a timestamp specifying column. Window -#' starts are inclusive but the window ends are exclusive, e.g. 12:05 will be in the window -#' [12:05,12:10) but not in [12:00,12:05). Windows can support microsecond precision. Windows in -#' the order of months are not supported. -#' -#' The time column must be of TimestampType. -#' -#' Durations are provided as strings, e.g. '1 second', '1 day 12 hours', '2 minutes'. Valid -#' interval strings are 'week', 'day', 'hour', 'minute', 'second', 'millisecond', 'microsecond'. -#' If the `slideDuration` is not provided, the windows will be tumbling windows. -#' -#' The output column will be a struct called 'window' by default with the nested columns 'start' -#' and 'end'. -#' -#' @family datetime_funcs -#' @rdname window -#' @name window -#' @export -#' @examples -#'\dontrun{ -#' # Thirty second tumbling windows, e.g. 09:00:00-09:00:30, 09:00:30-09:01:00, ... -#' window(df$time, "30 seconds") -#'} -setMethod("window", signature(x = "Column", windowDuration = "character", - slideDuration = "missing", startTime = "missing"), - function(x, windowDuration, slideDuration, startTime) { - jc <- callJStatic("org.apache.spark.sql.functions", - "window", - x@jc, windowDuration) +setMethod("window", signature(x = "Column"), + function(x, windowDuration, slideDuration = NULL, startTime = NULL) { + if (!is.null(slideDuration) && !is.null(startTime)) { + jc <- callJStatic("org.apache.spark.sql.functions", + "window", + x@jc, windowDuration, slideDuration, startTime) + } else if (!is.null(slideDuration)) { + jc <- callJStatic("org.apache.spark.sql.functions", + "window", + x@jc, windowDuration, slideDuration) + } else if (!is.null(startTime)) { + jc <- callJStatic("org.apache.spark.sql.functions", + "window", + x@jc, windowDuration, windowDuration, startTime) + } else { + jc <- callJStatic("org.apache.spark.sql.functions", + "window", + x@jc, windowDuration) + } column(jc) }) diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index eef365b42e56d..e2b7994ae156c 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -1204,6 +1204,42 @@ test_that("greatest() and least() on a DataFrame", { expect_equal(collect(select(df, least(df$a, df$b)))[, 1], c(1, 3)) }) +test_that("time windowing (window()) with all inputs", { + df <- createDataFrame(sqlContext, data.frame(t=c("2016-03-11 09:00:07"), v=c(1))) + df$window <- window(df$t, "5 seconds", "5 seconds", "0 seconds") + local <- collect(df)$v + # Not checking time windows because of possible time zone issues. 
Just checking that the function + # works + expect_equal(local, c(1)) +}) + +test_that("time windowing (window()) with slide duration", { + df <- createDataFrame(sqlContext, data.frame(t=c("2016-03-11 09:00:07"), v=c(1))) + df$window <- window(df$t, "5 seconds", "2 seconds") + local <- collect(df)$v + # Not checking time windows because of possible time zone issues. Just checking that the function + # works + expect_equal(local, c(1, 1)) +}) + +test_that("time windowing (window()) with start time", { + df <- createDataFrame(sqlContext, data.frame(t=c("2016-03-11 09:00:07"), v=c(1))) + df$window <- window(df$t, "5 seconds", startTime = "2 seconds") + local <- collect(df)$v + # Not checking time windows because of possible time zone issues. Just checking that the function + # works + expect_equal(local, c(1)) +}) + +test_that("time windowing (window()) with just window duration", { + df <- createDataFrame(sqlContext, data.frame(t=c("2016-03-11 09:00:07"), v=c(1))) + df$window <- window(df$t, "5 seconds") + local <- collect(df)$v + # Not checking time windows because of possible time zone issues. Just checking that the function + # works + expect_equal(local, c(1)) +}) + test_that("when(), otherwise() and ifelse() on a DataFrame", { l <- list(list(a = 1, b = 2), list(a = 3, b = 4)) df <- createDataFrame(sqlContext, l) From 12e1841adde040334c5bcb6a0179bf9ee853a218 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Sun, 3 Apr 2016 10:17:20 -0700 Subject: [PATCH 04/13] fixed R style --- R/pkg/R/functions.R | 2 +- R/pkg/inst/tests/testthat/test_sparkSQL.R | 8 ++++---- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R index 787f1045492de..c15fa35737b02 100644 --- a/R/pkg/R/functions.R +++ b/R/pkg/R/functions.R @@ -2162,7 +2162,7 @@ setMethod("from_unixtime", signature(x = "Column"), #' window(df$time, "1 minute", "15 seconds", "10 seconds") #' #' # One minute tumbling windows 15 seconds after the minute, e.g. 09:00:15-09:01:15, -# # 09:01:15-09:02:15... +#' # 09:01:15-09:02:15... #' window(df$time, "1 minute", startTime = "15 seconds") #' #' # Thirty second windows every 10 seconds, e.g. 09:00:00-09:00:30, 09:00:10-09:00:40, ... diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index e2b7994ae156c..22eb3ec984673 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -1205,7 +1205,7 @@ test_that("greatest() and least() on a DataFrame", { }) test_that("time windowing (window()) with all inputs", { - df <- createDataFrame(sqlContext, data.frame(t=c("2016-03-11 09:00:07"), v=c(1))) + df <- createDataFrame(sqlContext, data.frame(t = c("2016-03-11 09:00:07"), v = c(1))) df$window <- window(df$t, "5 seconds", "5 seconds", "0 seconds") local <- collect(df)$v # Not checking time windows because of possible time zone issues. Just checking that the function @@ -1214,7 +1214,7 @@ test_that("time windowing (window()) with all inputs", { }) test_that("time windowing (window()) with slide duration", { - df <- createDataFrame(sqlContext, data.frame(t=c("2016-03-11 09:00:07"), v=c(1))) + df <- createDataFrame(sqlContext, data.frame(t = c("2016-03-11 09:00:07"), v = c(1))) df$window <- window(df$t, "5 seconds", "2 seconds") local <- collect(df)$v # Not checking time windows because of possible time zone issues. 
Just checking that the function @@ -1223,7 +1223,7 @@ test_that("time windowing (window()) with slide duration", { }) test_that("time windowing (window()) with start time", { - df <- createDataFrame(sqlContext, data.frame(t=c("2016-03-11 09:00:07"), v=c(1))) + df <- createDataFrame(sqlContext, data.frame(t = c("2016-03-11 09:00:07"), v = c(1))) df$window <- window(df$t, "5 seconds", startTime = "2 seconds") local <- collect(df)$v # Not checking time windows because of possible time zone issues. Just checking that the function @@ -1232,7 +1232,7 @@ test_that("time windowing (window()) with start time", { }) test_that("time windowing (window()) with just window duration", { - df <- createDataFrame(sqlContext, data.frame(t=c("2016-03-11 09:00:07"), v=c(1))) + df <- createDataFrame(sqlContext, data.frame(t = c("2016-03-11 09:00:07"), v = c(1))) df$window <- window(df$t, "5 seconds") local <- collect(df)$v # Not checking time windows because of possible time zone issues. Just checking that the function From e2aa616c342db09add70fb2002646476df887381 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Sun, 3 Apr 2016 19:38:12 -0700 Subject: [PATCH 05/13] remove R changes --- R/pkg/NAMESPACE | 1 - R/pkg/R/functions.R | 59 ----------------------- R/pkg/inst/tests/testthat/test_sparkSQL.R | 36 -------------- 3 files changed, 96 deletions(-) diff --git a/R/pkg/NAMESPACE b/R/pkg/NAMESPACE index f48c61c1d59c5..fa3fb0b09a1b0 100644 --- a/R/pkg/NAMESPACE +++ b/R/pkg/NAMESPACE @@ -265,7 +265,6 @@ exportMethods("%in%", "var_samp", "weekofyear", "when", - "window", "year") exportClasses("GroupedData") diff --git a/R/pkg/R/functions.R b/R/pkg/R/functions.R index c15fa35737b02..d9c10b4a4b9fb 100644 --- a/R/pkg/R/functions.R +++ b/R/pkg/R/functions.R @@ -2131,65 +2131,6 @@ setMethod("from_unixtime", signature(x = "Column"), column(jc) }) -#' window -#' -#' Bucketize rows into one or more time windows given a timestamp specifying column. Window -#' starts are inclusive but the window ends are exclusive, e.g. 12:05 will be in the window -#' [12:05,12:10) but not in [12:00,12:05). Windows can support microsecond precision. Windows in -#' the order of months are not supported. -#' -#' The time column must be of TimestampType. -#' -#' Durations are provided as strings, e.g. '1 second', '1 day 12 hours', '2 minutes'. Valid -#' interval strings are 'week', 'day', 'hour', 'minute', 'second', 'millisecond', 'microsecond'. -#' If the `slideDuration` is not provided, the windows will be tumbling windows. -#' -#' The startTime is the offset with respect to 1970-01-01 00:00:00 UTC with which to start -#' window intervals. For example, in order to have hourly tumbling windows that start 15 minutes -#' past the hour, e.g. 12:15-13:15, 13:15-14:15... provide `startTime` as `15 minutes`. -#' -#' The output column will be a struct called 'window' by default with the nested columns 'start' -#' and 'end'. -#' -#' @family datetime_funcs -#' @rdname window -#' @name window -#' @export -#' @examples -#'\dontrun{ -#' # One minute windows every 15 seconds 10 seconds after the minute, e.g. 09:00:10-09:01:10, -#' # 09:00:25-09:01:25, 09:00:40-09:01:40, ... -#' window(df$time, "1 minute", "15 seconds", "10 seconds") -#' -#' # One minute tumbling windows 15 seconds after the minute, e.g. 09:00:15-09:01:15, -#' # 09:01:15-09:02:15... -#' window(df$time, "1 minute", startTime = "15 seconds") -#' -#' # Thirty second windows every 10 seconds, e.g. 09:00:00-09:00:30, 09:00:10-09:00:40, ... 
-#' window(df$time, "30 seconds", "10 seconds") -#'} -setMethod("window", signature(x = "Column"), - function(x, windowDuration, slideDuration = NULL, startTime = NULL) { - if (!is.null(slideDuration) && !is.null(startTime)) { - jc <- callJStatic("org.apache.spark.sql.functions", - "window", - x@jc, windowDuration, slideDuration, startTime) - } else if (!is.null(slideDuration)) { - jc <- callJStatic("org.apache.spark.sql.functions", - "window", - x@jc, windowDuration, slideDuration) - } else if (!is.null(startTime)) { - jc <- callJStatic("org.apache.spark.sql.functions", - "window", - x@jc, windowDuration, windowDuration, startTime) - } else { - jc <- callJStatic("org.apache.spark.sql.functions", - "window", - x@jc, windowDuration) - } - column(jc) - }) - #' locate #' #' Locate the position of the first occurrence of substr. diff --git a/R/pkg/inst/tests/testthat/test_sparkSQL.R b/R/pkg/inst/tests/testthat/test_sparkSQL.R index 22eb3ec984673..eef365b42e56d 100644 --- a/R/pkg/inst/tests/testthat/test_sparkSQL.R +++ b/R/pkg/inst/tests/testthat/test_sparkSQL.R @@ -1204,42 +1204,6 @@ test_that("greatest() and least() on a DataFrame", { expect_equal(collect(select(df, least(df$a, df$b)))[, 1], c(1, 3)) }) -test_that("time windowing (window()) with all inputs", { - df <- createDataFrame(sqlContext, data.frame(t = c("2016-03-11 09:00:07"), v = c(1))) - df$window <- window(df$t, "5 seconds", "5 seconds", "0 seconds") - local <- collect(df)$v - # Not checking time windows because of possible time zone issues. Just checking that the function - # works - expect_equal(local, c(1)) -}) - -test_that("time windowing (window()) with slide duration", { - df <- createDataFrame(sqlContext, data.frame(t = c("2016-03-11 09:00:07"), v = c(1))) - df$window <- window(df$t, "5 seconds", "2 seconds") - local <- collect(df)$v - # Not checking time windows because of possible time zone issues. Just checking that the function - # works - expect_equal(local, c(1, 1)) -}) - -test_that("time windowing (window()) with start time", { - df <- createDataFrame(sqlContext, data.frame(t = c("2016-03-11 09:00:07"), v = c(1))) - df$window <- window(df$t, "5 seconds", startTime = "2 seconds") - local <- collect(df)$v - # Not checking time windows because of possible time zone issues. Just checking that the function - # works - expect_equal(local, c(1)) -}) - -test_that("time windowing (window()) with just window duration", { - df <- createDataFrame(sqlContext, data.frame(t = c("2016-03-11 09:00:07"), v = c(1))) - df$window <- window(df$t, "5 seconds") - local <- collect(df)$v - # Not checking time windows because of possible time zone issues. 
Just checking that the function - # works - expect_equal(local, c(1)) -}) - test_that("when(), otherwise() and ifelse() on a DataFrame", { l <- list(list(a = 1, b = 2), list(a = 3, b = 4)) df <- createDataFrame(sqlContext, l) From a59ad58fef8e8d72ad04fa75a625200e3ae02739 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Mon, 4 Apr 2016 15:43:05 -0700 Subject: [PATCH 06/13] smarter reflection --- .../spark/sql/catalyst/trees/TreeNode.scala | 23 +++++++++++++------ 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala index 6b7997e903a99..268ceed4a532a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala @@ -365,20 +365,29 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { * @param newArgs the new product arguments. */ def makeCopy(newArgs: Array[AnyRef]): BaseType = attachTree(this, "makeCopy") { + // Skip no-arg constructors that are just there for kryo. val ctors = getClass.getConstructors.filter(_.getParameterTypes.size != 0) if (ctors.isEmpty) { sys.error(s"No valid constructor for $nodeName") } - val defaultCtor = ctors.maxBy(_.getParameterTypes.size) + val allArgs: Array[AnyRef] = if (otherCopyArgs.isEmpty) { + newArgs + } else { + newArgs ++ otherCopyArgs + } + val defaultCtor = ctors.find { ctor => + if (ctor.getParameterTypes.length != allArgs.length) { + false + } else { + ctor.getParameterTypes.zip(allArgs.map(_.getClass)).forall { case (ctorParam, arg) => + ctorParam.isAssignableFrom(arg) + } + } + }.getOrElse(ctors.maxBy(_.getParameterTypes.length)) // fall back to older heuristic try { CurrentOrigin.withOrigin(origin) { - // Skip no-arg constructors that are just there for kryo. 
- if (otherCopyArgs.isEmpty) { - defaultCtor.newInstance(newArgs: _*).asInstanceOf[BaseType] - } else { - defaultCtor.newInstance((newArgs ++ otherCopyArgs).toArray: _*).asInstanceOf[BaseType] - } + defaultCtor.newInstance(allArgs.toArray: _*).asInstanceOf[BaseType] } } catch { case e: java.lang.IllegalArgumentException => From 71fa73f7b00336872aecbef8918bfefe2214e270 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Mon, 4 Apr 2016 17:12:35 -0700 Subject: [PATCH 07/13] use existing util method --- .../org/apache/spark/sql/catalyst/trees/TreeNode.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala index 268ceed4a532a..e429ce20ff834 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala @@ -22,6 +22,7 @@ import java.util.UUID import scala.collection.Map import scala.collection.mutable.Stack +import org.apache.commons.lang.ClassUtils import org.json4s.JsonAST._ import org.json4s.JsonDSL._ import org.json4s.jackson.JsonMethods._ @@ -379,9 +380,8 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { if (ctor.getParameterTypes.length != allArgs.length) { false } else { - ctor.getParameterTypes.zip(allArgs.map(_.getClass)).forall { case (ctorParam, arg) => - ctorParam.isAssignableFrom(arg) - } + val givenParams: Array[Class[_]] = allArgs.map(_.getClass) + ClassUtils.isAssignable(ctor.getParameterTypes, givenParams, true /* autoboxing */) } }.getOrElse(ctors.maxBy(_.getParameterTypes.length)) // fall back to older heuristic From c891d75c572e40b9f5ffd629ed9721d46b82091f Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Mon, 4 Apr 2016 17:26:14 -0700 Subject: [PATCH 08/13] address comments --- python/pyspark/sql/functions.py | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index 567274a95114a..53c3df5f0d75a 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -1054,7 +1054,8 @@ def to_utc_timestamp(timestamp, tz): @since(2.0) -def window(col, windowDuration, slideDuration=None, startTime=None): +@ignore_unicode_prefix +def window(timeColumn, windowDuration, slideDuration=None, startTime=None): """Bucketize rows into one or more time windows given a timestamp specifying column. Window starts are inclusive but the window ends are exclusive, e.g. 12:05 will be in the window [12:05,12:10) but not in [12:00,12:05). Windows can support microsecond precision. Windows in @@ -1071,20 +1072,29 @@ def window(col, windowDuration, slideDuration=None, startTime=None): past the hour, e.g. 12:15-13:15, 13:15-14:15... provide `startTime` as `15 minutes`. The output column will be a struct called 'window' by default with the nested columns 'start' - and 'end'. + and 'end', where 'start' and 'end' will be of `TimestampType`. 
>>> df = sqlContext.createDataFrame([("2016-03-11 09:00:07", 1)]).toDF("date", "val") >>> w = df.groupBy(window("date", "5 seconds")).agg(sum("val").alias("sum")) >>> w.select(w.window.start.cast("string"), w.window.end.cast("string"), "sum").collect() - [Row(start="2016-03-11 09:00:05, end=2016-03-11 09:00:10, sum=1)] + [Row(start=u"2016-03-11 09:00:05", end=u"2016-03-11 09:00:10", sum=1)] """ + def check_string_field(field, fieldName): + if not field or type(field) is not str: + raise TypeError("%s should be provided as a string" % fieldName) + sc = SparkContext._active_spark_context - time_col = _to_java_column(col) + time_col = _to_java_column(timeColumn) + check_string_field(windowDuration, "windowDuration") if slideDuration and startTime: + check_string_field(slideDuration, "slideDuration") + check_string_field(startTime, "startTime") res = sc._jvm.functions.window(time_col, windowDuration, slideDuration, startTime) elif slideDuration: + check_string_field(slideDuration, "slideDuration") res = sc._jvm.functions.window(time_col, windowDuration, slideDuration) elif startTime: + check_string_field(startTime, "startTime") res = sc._jvm.functions.window(time_col, windowDuration, windowDuration, startTime) else: res = sc._jvm.functions.window(time_col, windowDuration) From 9a3a47f34e6797b1bfd8d0f3cb5429ba60608ab5 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Mon, 4 Apr 2016 19:32:27 -0700 Subject: [PATCH 09/13] should be fixed --- .../scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala index e429ce20ff834..23f5dab22252c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala @@ -380,8 +380,8 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { if (ctor.getParameterTypes.length != allArgs.length) { false } else { - val givenParams: Array[Class[_]] = allArgs.map(_.getClass) - ClassUtils.isAssignable(ctor.getParameterTypes, givenParams, true /* autoboxing */) + val argsArray: Array[Class[_]] = allArgs.map(_.getClass) + ClassUtils.isAssignable(argsArray, ctor.getParameterTypes, true /* autoboxing */) } }.getOrElse(ctors.maxBy(_.getParameterTypes.length)) // fall back to older heuristic From 8dec7ca884ebd4bcfea9133e2cfa7232fe805e3d Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Mon, 4 Apr 2016 21:37:07 -0700 Subject: [PATCH 10/13] handle null --- .../scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala index 23f5dab22252c..232ca4358865a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala @@ -379,6 +379,10 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product { val defaultCtor = ctors.find { ctor => if (ctor.getParameterTypes.length != allArgs.length) { false + } else if (allArgs.contains(null)) { + // if there is a `null`, we can't figure out the class, therefore we should just fallback + // to older heuristic + false } else { val argsArray: Array[Class[_]] = 
allArgs.map(_.getClass) ClassUtils.isAssignable(argsArray, ctor.getParameterTypes, true /* autoboxing */) From 891f4483e29a1f9d0d014bd48bb3a54b26d04524 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Mon, 4 Apr 2016 23:25:13 -0700 Subject: [PATCH 11/13] fix col name --- python/pyspark/sql/functions.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index 53c3df5f0d75a..d658a7ef24b9e 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -1076,7 +1076,8 @@ def window(timeColumn, windowDuration, slideDuration=None, startTime=None): >>> df = sqlContext.createDataFrame([("2016-03-11 09:00:07", 1)]).toDF("date", "val") >>> w = df.groupBy(window("date", "5 seconds")).agg(sum("val").alias("sum")) - >>> w.select(w.window.start.cast("string"), w.window.end.cast("string"), "sum").collect() + >>> w.select(w.window.start.cast("string").alias("start"), + w.window.end.cast("string").alias("end"), "sum").collect() [Row(start=u"2016-03-11 09:00:05", end=u"2016-03-11 09:00:10", sum=1)] """ def check_string_field(field, fieldName): From 1bd7563ced8dca52f4156339d86b4d01535fdf58 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Tue, 5 Apr 2016 09:59:24 -0700 Subject: [PATCH 12/13] fix second line --- python/pyspark/sql/functions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index d658a7ef24b9e..759cc9325b987 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -1077,7 +1077,7 @@ def window(timeColumn, windowDuration, slideDuration=None, startTime=None): >>> df = sqlContext.createDataFrame([("2016-03-11 09:00:07", 1)]).toDF("date", "val") >>> w = df.groupBy(window("date", "5 seconds")).agg(sum("val").alias("sum")) >>> w.select(w.window.start.cast("string").alias("start"), - w.window.end.cast("string").alias("end"), "sum").collect() + ... w.window.end.cast("string").alias("end"), "sum").collect() [Row(start=u"2016-03-11 09:00:05", end=u"2016-03-11 09:00:10", sum=1)] """ def check_string_field(field, fieldName): From 68b9de2259621d3097b04c7670055229073710c5 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Tue, 5 Apr 2016 11:28:03 -0700 Subject: [PATCH 13/13] Update functions.py --- python/pyspark/sql/functions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py index 759cc9325b987..5017ab5b3646d 100644 --- a/python/pyspark/sql/functions.py +++ b/python/pyspark/sql/functions.py @@ -1078,7 +1078,7 @@ def window(timeColumn, windowDuration, slideDuration=None, startTime=None): >>> w = df.groupBy(window("date", "5 seconds")).agg(sum("val").alias("sum")) >>> w.select(w.window.start.cast("string").alias("start"), ... w.window.end.cast("string").alias("end"), "sum").collect() - [Row(start=u"2016-03-11 09:00:05", end=u"2016-03-11 09:00:10", sum=1)] + [Row(start=u'2016-03-11 09:00:05', end=u'2016-03-11 09:00:10', sum=1)] """ def check_string_field(field, fieldName): if not field or type(field) is not str:
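
With all thirteen patches applied, `window` is callable from the DataFrame API in Scala and Python and from SQL. What follows is only an illustrative sketch of how the finished API fits together, mirroring the doctest in functions.py and the SQL tests in DataFrameTimeWindowingSuite; it assumes a SQLContext named `sqlContext` and a Spark build that includes this branch:

    from pyspark.sql import functions as F

    # Tumbling five-second windows, as in the functions.py doctest.
    df = sqlContext.createDataFrame([("2016-03-11 09:00:07", 1)]).toDF("date", "val")
    w = df.groupBy(F.window("date", "5 seconds")).agg(F.sum("val").alias("sum"))
    w.select(w.window.start.cast("string").alias("start"),
             w.window.end.cast("string").alias("end"), "sum").collect()
    # [Row(start=u'2016-03-11 09:00:05', end=u'2016-03-11 09:00:10', sum=1)]

    # Sliding windows: thirty-second windows computed every ten seconds.
    df.groupBy(F.window("date", "30 seconds", "10 seconds")).count().collect()

    # Tumbling windows offset from 1970-01-01 00:00:00 UTC, via the startTime keyword only.
    df.groupBy(F.window("date", "1 hour", startTime="15 minutes")).count().collect()

    # The FunctionRegistry change also makes the same expression available from SQL.
    df.registerTempTable("events")
    sqlContext.sql('select window(date, "10 seconds"), val from events').collect()

In each form the result carries a struct column named 'window' with TimestampType 'start' and 'end' fields, as the final docstring and the patched Scaladoc describe.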