diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
index 1ec0e717fac29..719ce5b9e3698 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala
@@ -37,14 +37,14 @@ private[spark] object PythonEvalType {
val SQL_BATCHED_UDF = 100
- val SQL_PANDAS_SCALAR_UDF = 200
- val SQL_PANDAS_GROUP_MAP_UDF = 201
+ val SQL_SCALAR_PANDAS_UDF = 200
+ val SQL_GROUPED_MAP_PANDAS_UDF = 201
def toString(pythonEvalType: Int): String = pythonEvalType match {
case NON_UDF => "NON_UDF"
case SQL_BATCHED_UDF => "SQL_BATCHED_UDF"
- case SQL_PANDAS_SCALAR_UDF => "SQL_PANDAS_SCALAR_UDF"
- case SQL_PANDAS_GROUP_MAP_UDF => "SQL_PANDAS_GROUP_MAP_UDF"
+ case SQL_SCALAR_PANDAS_UDF => "SQL_SCALAR_PANDAS_UDF"
+ case SQL_GROUPED_MAP_PANDAS_UDF => "SQL_GROUPED_MAP_PANDAS_UDF"
}
}
diff --git a/docs/sql-programming-guide.md b/docs/sql-programming-guide.md
index d49c8d869cba6..a0e221b39cc34 100644
--- a/docs/sql-programming-guide.md
+++ b/docs/sql-programming-guide.md
@@ -1684,7 +1684,7 @@ Spark will fall back to create the DataFrame without Arrow.
Pandas UDFs are user-defined functions that are executed by Spark using Arrow to transfer data and
Pandas to work with the data. A Pandas UDF is defined using `pandas_udf` as a decorator or to wrap
the function; no additional configuration is required. Currently, there are two types of
-Pandas UDF: Scalar and Group Map.
+Pandas UDF: Scalar and Grouped Map.
### Scalar
@@ -1702,8 +1702,8 @@ The following example shows how to create a scalar Pandas UDF that computes the
-### Group Map
-Group map Pandas UDFs are used with `groupBy().apply()` which implements the "split-apply-combine" pattern.
+### Grouped Map
+Grouped map Pandas UDFs are used with `groupBy().apply()`, which implements the "split-apply-combine" pattern.
Split-apply-combine consists of three steps:
* Split the data into groups by using `DataFrame.groupBy`.
* Apply a function on each group. The input and output of the function are both `pandas.DataFrame`. The
@@ -1723,7 +1723,7 @@ The following example shows how to use `groupby().apply()` to subtract the mean
-{% include_example group_map_pandas_udf python/sql/arrow.py %}
+{% include_example grouped_map_pandas_udf python/sql/arrow.py %}
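To make the renamed docs concrete, here is a minimal sketch of the grouped map ("split-apply-combine") flow described above, assuming Spark 2.3+ with PyArrow installed; `subtract_mean` is an illustrative stand-in for the example the docs pull in via `include_example`, not part of the patch itself:

```python
# Minimal sketch of the grouped map flow, assuming a Spark session with
# PyArrow available. Illustrative only; not part of the patch.
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType

spark = SparkSession.builder.appName("grouped_map_sketch").getOrCreate()

df = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
    ("id", "v"))

# GROUPED_MAP (renamed from GROUP_MAP): the function receives each group as a
# pandas.DataFrame and returns a pandas.DataFrame matching the declared schema.
@pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def subtract_mean(pdf):
    return pdf.assign(v=pdf.v - pdf.v.mean())

# Split by id, apply per group, combine into a new DataFrame.
df.groupby("id").apply(subtract_mean).show()
```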
diff --git a/examples/src/main/python/sql/arrow.py b/examples/src/main/python/sql/arrow.py
index 6c0028b3f1c1f..4c5aefb6ff4a6 100644
--- a/examples/src/main/python/sql/arrow.py
+++ b/examples/src/main/python/sql/arrow.py
@@ -86,15 +86,15 @@ def multiply_func(a, b):
# $example off:scalar_pandas_udf$
-def group_map_pandas_udf_example(spark):
- # $example on:group_map_pandas_udf$
+def grouped_map_pandas_udf_example(spark):
+ # $example on:grouped_map_pandas_udf$
from pyspark.sql.functions import pandas_udf, PandasUDFType
df = spark.createDataFrame(
[(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
("id", "v"))
- @pandas_udf("id long, v double", PandasUDFType.GROUP_MAP)
+ @pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP)
def substract_mean(pdf):
# pdf is a pandas.DataFrame
v = pdf.v
@@ -110,7 +110,7 @@ def substract_mean(pdf):
# | 2|-1.0|
# | 2| 4.0|
# +---+----+
- # $example off:group_map_pandas_udf$
+ # $example off:grouped_map_pandas_udf$
if __name__ == "__main__":
@@ -123,7 +123,7 @@ def substract_mean(pdf):
dataframe_with_arrow_example(spark)
print("Running pandas_udf scalar example")
scalar_pandas_udf_example(spark)
- print("Running pandas_udf group map example")
- group_map_pandas_udf_example(spark)
+ print("Running pandas_udf grouped map example")
+ grouped_map_pandas_udf_example(spark)
spark.stop()
diff --git a/python/pyspark/rdd.py b/python/pyspark/rdd.py
index 340bc3a6b7470..9a6010e87679c 100644
--- a/python/pyspark/rdd.py
+++ b/python/pyspark/rdd.py
@@ -68,8 +68,8 @@ class PythonEvalType(object):
SQL_BATCHED_UDF = 100
- SQL_PANDAS_SCALAR_UDF = 200
- SQL_PANDAS_GROUP_MAP_UDF = 201
+ SQL_SCALAR_PANDAS_UDF = 200
+ SQL_GROUPED_MAP_PANDAS_UDF = 201
def portable_hash(x):
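The Python-side constants must stay numerically in sync with the Scala constants in `PythonRunner.scala`; a quick sanity-check sketch (not part of the patch):

```python
# Sanity-check sketch: after the rename, the Python eval types still carry the
# same numeric values (200 and 201) as the Scala constants above.
from pyspark.rdd import PythonEvalType

assert PythonEvalType.SQL_SCALAR_PANDAS_UDF == 200
assert PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF == 201
```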
diff --git a/python/pyspark/sql/functions.py b/python/pyspark/sql/functions.py
index 961b3267b44cf..b637707ac3cf5 100644
--- a/python/pyspark/sql/functions.py
+++ b/python/pyspark/sql/functions.py
@@ -1737,8 +1737,8 @@ def translate(srcCol, matching, replace):
def create_map(*cols):
"""Creates a new map column.
- :param cols: list of column names (string) or list of :class:`Column` expressions that grouped
- as key-value pairs, e.g. (key1, value1, key2, value2, ...).
+ :param cols: list of column names (string) or list of :class:`Column` expressions that are
+ grouped as key-value pairs, e.g. (key1, value1, key2, value2, ...).
>>> df.select(create_map('name', 'age').alias("map")).collect()
[Row(map={u'Alice': 2}), Row(map={u'Bob': 5})]
@@ -2085,9 +2085,9 @@ def map_values(col):
class PandasUDFType(object):
"""Pandas UDF Types. See :meth:`pyspark.sql.functions.pandas_udf`.
"""
- SCALAR = PythonEvalType.SQL_PANDAS_SCALAR_UDF
+ SCALAR = PythonEvalType.SQL_SCALAR_PANDAS_UDF
- GROUP_MAP = PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF
+ GROUPED_MAP = PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF
@since(1.3)
@@ -2191,20 +2191,20 @@ def pandas_udf(f=None, returnType=None, functionType=None):
Therefore, this can be used, for example, to ensure the length of each returned
`pandas.Series`, and can not be used as the column length.
- 2. GROUP_MAP
+ 2. GROUPED_MAP
- A group map UDF defines transformation: A `pandas.DataFrame` -> A `pandas.DataFrame`
+A grouped map UDF defines a transformation: A `pandas.DataFrame` -> A `pandas.DataFrame`
The returnType should be a :class:`StructType` describing the schema of the returned
`pandas.DataFrame`.
The length of the returned `pandas.DataFrame` can be arbitrary.
- Group map UDFs are used with :meth:`pyspark.sql.GroupedData.apply`.
+ Grouped map UDFs are used with :meth:`pyspark.sql.GroupedData.apply`.
>>> from pyspark.sql.functions import pandas_udf, PandasUDFType
>>> df = spark.createDataFrame(
... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
... ("id", "v")) # doctest: +SKIP
- >>> @pandas_udf("id long, v double", PandasUDFType.GROUP_MAP) # doctest: +SKIP
+ >>> @pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP) # doctest: +SKIP
... def normalize(pdf):
... v = pdf.v
... return pdf.assign(v=(v - v.mean()) / v.std())
@@ -2254,20 +2254,20 @@ def pandas_udf(f=None, returnType=None, functionType=None):
eval_type = returnType
else:
# @pandas_udf(dataType) or @pandas_udf(returnType=dataType)
- eval_type = PythonEvalType.SQL_PANDAS_SCALAR_UDF
+ eval_type = PythonEvalType.SQL_SCALAR_PANDAS_UDF
else:
return_type = returnType
if functionType is not None:
eval_type = functionType
else:
- eval_type = PythonEvalType.SQL_PANDAS_SCALAR_UDF
+ eval_type = PythonEvalType.SQL_SCALAR_PANDAS_UDF
if return_type is None:
raise ValueError("Invalid returnType: returnType can not be None")
- if eval_type not in [PythonEvalType.SQL_PANDAS_SCALAR_UDF,
- PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF]:
+ if eval_type not in [PythonEvalType.SQL_SCALAR_PANDAS_UDF,
+ PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF]:
raise ValueError("Invalid functionType: "
"functionType must be one the values from PandasUDFType")
diff --git a/python/pyspark/sql/group.py b/python/pyspark/sql/group.py
index 22061b83eb78c..330faf2addcbe 100644
--- a/python/pyspark/sql/group.py
+++ b/python/pyspark/sql/group.py
@@ -212,14 +212,14 @@ def apply(self, udf):
This function does not support partial aggregation, and requires shuffling all the data in
the :class:`DataFrame`.
- :param udf: a group map user-defined function returned by
- :meth:`pyspark.sql.functions.pandas_udf`.
+ :param udf: a grouped map user-defined function returned by
+ :func:`pyspark.sql.functions.pandas_udf`.
>>> from pyspark.sql.functions import pandas_udf, PandasUDFType
>>> df = spark.createDataFrame(
... [(1, 1.0), (1, 2.0), (2, 3.0), (2, 5.0), (2, 10.0)],
... ("id", "v"))
- >>> @pandas_udf("id long, v double", PandasUDFType.GROUP_MAP) # doctest: +SKIP
+ >>> @pandas_udf("id long, v double", PandasUDFType.GROUPED_MAP) # doctest: +SKIP
... def normalize(pdf):
... v = pdf.v
... return pdf.assign(v=(v - v.mean()) / v.std())
@@ -239,9 +239,9 @@ def apply(self, udf):
"""
# Columns are special because hasattr always returns True
if isinstance(udf, Column) or not hasattr(udf, 'func') \
- or udf.evalType != PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF:
+ or udf.evalType != PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF:
raise ValueError("Invalid udf: the udf argument must be a pandas_udf of type "
- "GROUP_MAP.")
+ "GROUPED_MAP.")
df = self._df
udf_column = udf(*[df[col] for col in df.columns])
jdf = self._jgd.flatMapGroupsInPandas(udf_column._jc.expr())
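`apply()` validates the eval type before delegating to `flatMapGroupsInPandas`, so passing a SCALAR pandas_udf fails fast; a sketch of that failure mode, assuming PyArrow and the `df` from the docstring example above:

```python
# Sketch: apply() only accepts GROUPED_MAP pandas UDFs after the rename.
from pyspark.sql.functions import pandas_udf, PandasUDFType

scalar_udf = pandas_udf(lambda v: v + 1.0, "double", PandasUDFType.SCALAR)

try:
    df.groupby("id").apply(scalar_udf)
except ValueError as e:
    # "Invalid udf: the udf argument must be a pandas_udf of type GROUPED_MAP."
    print(e)
```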
diff --git a/python/pyspark/sql/tests.py b/python/pyspark/sql/tests.py
index a8e337d8cd69f..0d5bc137abf47 100644
--- a/python/pyspark/sql/tests.py
+++ b/python/pyspark/sql/tests.py
@@ -3621,34 +3621,34 @@ def test_pandas_udf_basic(self):
udf = pandas_udf(lambda x: x, DoubleType())
self.assertEqual(udf.returnType, DoubleType())
- self.assertEqual(udf.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF)
+ self.assertEqual(udf.evalType, PythonEvalType.SQL_SCALAR_PANDAS_UDF)
udf = pandas_udf(lambda x: x, DoubleType(), PandasUDFType.SCALAR)
self.assertEqual(udf.returnType, DoubleType())
- self.assertEqual(udf.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF)
+ self.assertEqual(udf.evalType, PythonEvalType.SQL_SCALAR_PANDAS_UDF)
udf = pandas_udf(lambda x: x, 'double', PandasUDFType.SCALAR)
self.assertEqual(udf.returnType, DoubleType())
- self.assertEqual(udf.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF)
+ self.assertEqual(udf.evalType, PythonEvalType.SQL_SCALAR_PANDAS_UDF)
udf = pandas_udf(lambda x: x, StructType([StructField("v", DoubleType())]),
- PandasUDFType.GROUP_MAP)
+ PandasUDFType.GROUPED_MAP)
self.assertEqual(udf.returnType, StructType([StructField("v", DoubleType())]))
- self.assertEqual(udf.evalType, PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF)
+ self.assertEqual(udf.evalType, PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF)
- udf = pandas_udf(lambda x: x, 'v double', PandasUDFType.GROUP_MAP)
+ udf = pandas_udf(lambda x: x, 'v double', PandasUDFType.GROUPED_MAP)
self.assertEqual(udf.returnType, StructType([StructField("v", DoubleType())]))
- self.assertEqual(udf.evalType, PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF)
+ self.assertEqual(udf.evalType, PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF)
udf = pandas_udf(lambda x: x, 'v double',
- functionType=PandasUDFType.GROUP_MAP)
+ functionType=PandasUDFType.GROUPED_MAP)
self.assertEqual(udf.returnType, StructType([StructField("v", DoubleType())]))
- self.assertEqual(udf.evalType, PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF)
+ self.assertEqual(udf.evalType, PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF)
udf = pandas_udf(lambda x: x, returnType='v double',
- functionType=PandasUDFType.GROUP_MAP)
+ functionType=PandasUDFType.GROUPED_MAP)
self.assertEqual(udf.returnType, StructType([StructField("v", DoubleType())]))
- self.assertEqual(udf.evalType, PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF)
+ self.assertEqual(udf.evalType, PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF)
def test_pandas_udf_decorator(self):
from pyspark.rdd import PythonEvalType
@@ -3659,45 +3659,45 @@ def test_pandas_udf_decorator(self):
def foo(x):
return x
self.assertEqual(foo.returnType, DoubleType())
- self.assertEqual(foo.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF)
+ self.assertEqual(foo.evalType, PythonEvalType.SQL_SCALAR_PANDAS_UDF)
@pandas_udf(returnType=DoubleType())
def foo(x):
return x
self.assertEqual(foo.returnType, DoubleType())
- self.assertEqual(foo.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF)
+ self.assertEqual(foo.evalType, PythonEvalType.SQL_SCALAR_PANDAS_UDF)
schema = StructType([StructField("v", DoubleType())])
- @pandas_udf(schema, PandasUDFType.GROUP_MAP)
+ @pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def foo(x):
return x
self.assertEqual(foo.returnType, schema)
- self.assertEqual(foo.evalType, PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF)
+ self.assertEqual(foo.evalType, PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF)
- @pandas_udf('v double', PandasUDFType.GROUP_MAP)
+ @pandas_udf('v double', PandasUDFType.GROUPED_MAP)
def foo(x):
return x
self.assertEqual(foo.returnType, schema)
- self.assertEqual(foo.evalType, PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF)
+ self.assertEqual(foo.evalType, PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF)
- @pandas_udf(schema, functionType=PandasUDFType.GROUP_MAP)
+ @pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
def foo(x):
return x
self.assertEqual(foo.returnType, schema)
- self.assertEqual(foo.evalType, PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF)
+ self.assertEqual(foo.evalType, PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF)
@pandas_udf(returnType='v double', functionType=PandasUDFType.SCALAR)
def foo(x):
return x
self.assertEqual(foo.returnType, schema)
- self.assertEqual(foo.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF)
+ self.assertEqual(foo.evalType, PythonEvalType.SQL_SCALAR_PANDAS_UDF)
- @pandas_udf(returnType=schema, functionType=PandasUDFType.GROUP_MAP)
+ @pandas_udf(returnType=schema, functionType=PandasUDFType.GROUPED_MAP)
def foo(x):
return x
self.assertEqual(foo.returnType, schema)
- self.assertEqual(foo.evalType, PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF)
+ self.assertEqual(foo.evalType, PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF)
def test_udf_wrong_arg(self):
from pyspark.sql.functions import pandas_udf, PandasUDFType
@@ -3724,15 +3724,15 @@ def zero_with_type():
return 1
with self.assertRaisesRegexp(TypeError, 'Invalid returnType'):
- @pandas_udf(returnType=PandasUDFType.GROUP_MAP)
+ @pandas_udf(returnType=PandasUDFType.GROUPED_MAP)
def foo(df):
return df
with self.assertRaisesRegexp(ValueError, 'Invalid returnType'):
- @pandas_udf(returnType='double', functionType=PandasUDFType.GROUP_MAP)
+ @pandas_udf(returnType='double', functionType=PandasUDFType.GROUPED_MAP)
def foo(df):
return df
with self.assertRaisesRegexp(ValueError, 'Invalid function'):
- @pandas_udf(returnType='k int, v double', functionType=PandasUDFType.GROUP_MAP)
+ @pandas_udf(returnType='k int, v double', functionType=PandasUDFType.GROUPED_MAP)
def foo(k, v):
return k
@@ -3804,11 +3804,11 @@ def test_register_nondeterministic_vectorized_udf_basic(self):
random_pandas_udf = pandas_udf(
lambda x: random.randint(6, 6) + x, IntegerType()).asNondeterministic()
self.assertEqual(random_pandas_udf.deterministic, False)
- self.assertEqual(random_pandas_udf.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF)
+ self.assertEqual(random_pandas_udf.evalType, PythonEvalType.SQL_SCALAR_PANDAS_UDF)
nondeterministic_pandas_udf = self.spark.catalog.registerFunction(
"randomPandasUDF", random_pandas_udf)
self.assertEqual(nondeterministic_pandas_udf.deterministic, False)
- self.assertEqual(nondeterministic_pandas_udf.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF)
+ self.assertEqual(nondeterministic_pandas_udf.evalType, PythonEvalType.SQL_SCALAR_PANDAS_UDF)
[row] = self.spark.sql("SELECT randomPandasUDF(1)").collect()
self.assertEqual(row[0], 7)
@@ -4206,7 +4206,7 @@ def test_register_vectorized_udf_basic(self):
col('id').cast('int').alias('b'))
original_add = pandas_udf(lambda x, y: x + y, IntegerType())
self.assertEqual(original_add.deterministic, True)
- self.assertEqual(original_add.evalType, PythonEvalType.SQL_PANDAS_SCALAR_UDF)
+ self.assertEqual(original_add.evalType, PythonEvalType.SQL_SCALAR_PANDAS_UDF)
new_add = self.spark.catalog.registerFunction("add1", original_add)
res1 = df.select(new_add(col('a'), col('b')))
res2 = self.spark.sql(
@@ -4243,20 +4243,20 @@ def test_simple(self):
StructField('v', IntegerType()),
StructField('v1', DoubleType()),
StructField('v2', LongType())]),
- PandasUDFType.GROUP_MAP
+ PandasUDFType.GROUPED_MAP
)
result = df.groupby('id').apply(foo_udf).sort('id').toPandas()
expected = df.toPandas().groupby('id').apply(foo_udf.func).reset_index(drop=True)
self.assertFramesEqual(expected, result)
- def test_register_group_map_udf(self):
+ def test_register_grouped_map_udf(self):
from pyspark.sql.functions import pandas_udf, PandasUDFType
- foo_udf = pandas_udf(lambda x: x, "id long", PandasUDFType.GROUP_MAP)
+ foo_udf = pandas_udf(lambda x: x, "id long", PandasUDFType.GROUPED_MAP)
with QuietTest(self.sc):
with self.assertRaisesRegexp(ValueError, 'f must be either SQL_BATCHED_UDF or '
- 'SQL_PANDAS_SCALAR_UDF'):
+ 'SQL_SCALAR_PANDAS_UDF'):
self.spark.catalog.registerFunction("foo_udf", foo_udf)
def test_decorator(self):
@@ -4265,7 +4265,7 @@ def test_decorator(self):
@pandas_udf(
'id long, v int, v1 double, v2 long',
- PandasUDFType.GROUP_MAP
+ PandasUDFType.GROUPED_MAP
)
def foo(pdf):
return pdf.assign(v1=pdf.v * pdf.id * 1.0, v2=pdf.v + pdf.id)
@@ -4281,7 +4281,7 @@ def test_coerce(self):
foo = pandas_udf(
lambda pdf: pdf,
'id long, v double',
- PandasUDFType.GROUP_MAP
+ PandasUDFType.GROUPED_MAP
)
result = df.groupby('id').apply(foo).sort('id').toPandas()
@@ -4295,7 +4295,7 @@ def test_complex_groupby(self):
@pandas_udf(
'id long, v int, norm double',
- PandasUDFType.GROUP_MAP
+ PandasUDFType.GROUPED_MAP
)
def normalize(pdf):
v = pdf.v
@@ -4314,7 +4314,7 @@ def test_empty_groupby(self):
@pandas_udf(
'id long, v int, norm double',
- PandasUDFType.GROUP_MAP
+ PandasUDFType.GROUPED_MAP
)
def normalize(pdf):
v = pdf.v
@@ -4334,7 +4334,7 @@ def test_datatype_string(self):
foo_udf = pandas_udf(
lambda pdf: pdf.assign(v1=pdf.v * pdf.id * 1.0, v2=pdf.v + pdf.id),
'id long, v int, v1 double, v2 long',
- PandasUDFType.GROUP_MAP
+ PandasUDFType.GROUPED_MAP
)
result = df.groupby('id').apply(foo_udf).sort('id').toPandas()
@@ -4348,7 +4348,7 @@ def test_wrong_return_type(self):
foo = pandas_udf(
lambda pdf: pdf,
'id long, v map<int, int>',
- PandasUDFType.GROUP_MAP
+ PandasUDFType.GROUPED_MAP
)
with QuietTest(self.sc):
@@ -4374,7 +4374,7 @@ def test_wrong_args(self):
with self.assertRaisesRegexp(ValueError, 'Invalid udf'):
df.groupby('id').apply(
pandas_udf(lambda x, y: x, StructType([StructField("d", DoubleType())])))
- with self.assertRaisesRegexp(ValueError, 'Invalid udf.*GROUP_MAP'):
+ with self.assertRaisesRegexp(ValueError, 'Invalid udf.*GROUPED_MAP'):
df.groupby('id').apply(
pandas_udf(lambda x, y: x, StructType([StructField("d", DoubleType())]),
PandasUDFType.SCALAR))
@@ -4385,7 +4385,7 @@ def test_unsupported_types(self):
[StructField("id", LongType(), True),
StructField("map", MapType(StringType(), IntegerType()), True)])
df = self.spark.createDataFrame([(1, None,)], schema=schema)
- f = pandas_udf(lambda x: x, df.schema, PandasUDFType.GROUP_MAP)
+ f = pandas_udf(lambda x: x, df.schema, PandasUDFType.GROUPED_MAP)
with QuietTest(self.sc):
with self.assertRaisesRegexp(Exception, 'Unsupported data type'):
df.groupby('id').apply(f).collect()
diff --git a/python/pyspark/sql/udf.py b/python/pyspark/sql/udf.py
index 07952558a036e..82a28c859525c 100644
--- a/python/pyspark/sql/udf.py
+++ b/python/pyspark/sql/udf.py
@@ -36,24 +36,24 @@ def _wrap_function(sc, func, returnType):
def _create_udf(f, returnType, evalType):
- if evalType == PythonEvalType.SQL_PANDAS_SCALAR_UDF or \
- evalType == PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF:
+ if evalType in (PythonEvalType.SQL_SCALAR_PANDAS_UDF,
+ PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF):
import inspect
from pyspark.sql.utils import require_minimum_pyarrow_version
require_minimum_pyarrow_version()
argspec = inspect.getargspec(f)
- if evalType == PythonEvalType.SQL_PANDAS_SCALAR_UDF and len(argspec.args) == 0 and \
+ if evalType == PythonEvalType.SQL_SCALAR_PANDAS_UDF and len(argspec.args) == 0 and \
argspec.varargs is None:
raise ValueError(
"Invalid function: 0-arg pandas_udfs are not supported. "
"Instead, create a 1-arg pandas_udf and ignore the arg in your function."
)
- if evalType == PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF and len(argspec.args) != 1:
+ if evalType == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF and len(argspec.args) != 1:
raise ValueError(
- "Invalid function: pandas_udfs with function type GROUP_MAP "
+ "Invalid function: pandas_udfs with function type GROUPED_MAP "
"must take a single arg that is a pandas DataFrame."
)
@@ -109,10 +109,10 @@ def returnType(self):
else:
self._returnType_placeholder = _parse_datatype_string(self._returnType)
- if self.evalType == PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF \
+ if self.evalType == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF \
and not isinstance(self._returnType_placeholder, StructType):
raise ValueError("Invalid returnType: returnType must be a StructType for "
- "pandas_udf with function type GROUP_MAP")
+ "pandas_udf with function type GROUPED_MAP")
return self._returnType_placeholder
@@ -285,9 +285,9 @@ def register(self, name, f, returnType=None):
"Invalid returnType: data type can not be specified when f is"
"a user-defined function, but got %s." % returnType)
if f.evalType not in [PythonEvalType.SQL_BATCHED_UDF,
- PythonEvalType.SQL_PANDAS_SCALAR_UDF]:
+ PythonEvalType.SQL_SCALAR_PANDAS_UDF]:
raise ValueError(
- "Invalid f: f must be either SQL_BATCHED_UDF or SQL_PANDAS_SCALAR_UDF")
+ "Invalid f: f must be either SQL_BATCHED_UDF or SQL_SCALAR_PANDAS_UDF")
register_udf = UserDefinedFunction(f.func, returnType=f.returnType, name=name,
evalType=f.evalType,
deterministic=f.deterministic)
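`register()` continues to accept only row-at-a-time and scalar pandas UDFs; a sketch of what the renamed check allows and rejects (assumes PyArrow and an active `spark` session, mirroring `test_register_grouped_map_udf` above):

```python
# Sketch: registration is limited to SQL_BATCHED_UDF and SQL_SCALAR_PANDAS_UDF.
from pyspark.sql.functions import pandas_udf, PandasUDFType

plus_one = pandas_udf(lambda x: x + 1, "long", PandasUDFType.SCALAR)
spark.catalog.registerFunction("plus_one", plus_one)  # accepted: scalar

grouped = pandas_udf(lambda pdf: pdf, "id long", PandasUDFType.GROUPED_MAP)
try:
    spark.catalog.registerFunction("grouped", grouped)
except ValueError as e:
    # "Invalid f: f must be either SQL_BATCHED_UDF or SQL_SCALAR_PANDAS_UDF"
    print(e)
```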
diff --git a/python/pyspark/worker.py b/python/pyspark/worker.py
index e6737ae1c1285..8c41c3d1d6183 100644
--- a/python/pyspark/worker.py
+++ b/python/pyspark/worker.py
@@ -74,7 +74,7 @@ def wrap_udf(f, return_type):
return lambda *a: f(*a)
-def wrap_pandas_scalar_udf(f, return_type):
+def wrap_scalar_pandas_udf(f, return_type):
arrow_return_type = to_arrow_type(return_type)
def verify_result_length(*a):
@@ -90,7 +90,7 @@ def verify_result_length(*a):
return lambda *a: (verify_result_length(*a), arrow_return_type)
-def wrap_pandas_group_map_udf(f, return_type):
+def wrap_grouped_map_pandas_udf(f, return_type):
def wrapped(*series):
import pandas as pd
@@ -122,10 +122,10 @@ def read_single_udf(pickleSer, infile, eval_type):
row_func = chain(row_func, f)
# the last returnType will be the return type of UDF
- if eval_type == PythonEvalType.SQL_PANDAS_SCALAR_UDF:
- return arg_offsets, wrap_pandas_scalar_udf(row_func, return_type)
- elif eval_type == PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF:
- return arg_offsets, wrap_pandas_group_map_udf(row_func, return_type)
+ if eval_type == PythonEvalType.SQL_SCALAR_PANDAS_UDF:
+ return arg_offsets, wrap_scalar_pandas_udf(row_func, return_type)
+ elif eval_type == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF:
+ return arg_offsets, wrap_grouped_map_pandas_udf(row_func, return_type)
else:
return arg_offsets, wrap_udf(row_func, return_type)
@@ -148,8 +148,8 @@ def read_udfs(pickleSer, infile, eval_type):
func = lambda _, it: map(mapper, it)
- if eval_type == PythonEvalType.SQL_PANDAS_SCALAR_UDF \
- or eval_type == PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF:
+ if eval_type in (PythonEvalType.SQL_SCALAR_PANDAS_UDF,
+ PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF):
timezone = utf8_deserializer.loads(infile)
ser = ArrowStreamPandasSerializer(timezone)
else:
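`wrap_scalar_pandas_udf` enforces that a scalar UDF returns one output value per input value of the Arrow batch it was handed; a pure-pandas sketch of the invariant the renamed wrapper checks:

```python
# Pure-pandas sketch of the length invariant for SCALAR pandas UDFs: the input
# Series is one internal Arrow batch, and the result must align with it.
import pandas as pd

batch = pd.Series([1.0, 2.0, 3.0])  # one batch, not the whole column

ok = batch * 2        # length preserved: acceptable for a scalar UDF
bad = batch.head(2)   # length changed: the worker would reject this result

assert len(ok) == len(batch)
assert len(bad) != len(batch)
```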
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
index a009c00b0abc5..4e0785a19de34 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/RelationalGroupedDataset.scala
@@ -450,8 +450,8 @@ class RelationalGroupedDataset protected[sql](
* workers.
*/
private[sql] def flatMapGroupsInPandas(expr: PythonUDF): DataFrame = {
- require(expr.evalType == PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF,
- "Must pass a group map udf")
+ require(expr.evalType == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF,
+ "Must pass a grouped map udf")
require(expr.dataType.isInstanceOf[StructType],
"The returnType of the udf must be a StructType")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala
index 47b146f076b62..c4de214679ae4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ArrowEvalPythonExec.scala
@@ -81,7 +81,7 @@ case class ArrowEvalPythonExec(udfs: Seq[PythonUDF], output: Seq[Attribute], chi
val columnarBatchIter = new ArrowPythonRunner(
funcs, bufferSize, reuseWorker,
- PythonEvalType.SQL_PANDAS_SCALAR_UDF, argOffsets, schema,
+ PythonEvalType.SQL_SCALAR_PANDAS_UDF, argOffsets, schema,
sessionLocalTimeZone, pandasRespectSessionTimeZone)
.compute(batchIter, context.partitionId(), context)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
index e6616d013cba6..78521526476b9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala
@@ -152,7 +152,7 @@ object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper {
if (validUdfs.nonEmpty) {
require(validUdfs.forall(udf =>
udf.evalType == PythonEvalType.SQL_BATCHED_UDF ||
- udf.evalType == PythonEvalType.SQL_PANDAS_SCALAR_UDF
+ udf.evalType == PythonEvalType.SQL_SCALAR_PANDAS_UDF
), "Can only extract scalar vectorized udf or sql batch udf")
val resultAttrs = udfs.zipWithIndex.map { case (u, i) =>
@@ -160,7 +160,7 @@ object ExtractPythonUDFs extends Rule[SparkPlan] with PredicateHelper {
}
val evaluation = validUdfs.partition(
- _.evalType == PythonEvalType.SQL_PANDAS_SCALAR_UDF
+ _.evalType == PythonEvalType.SQL_SCALAR_PANDAS_UDF
) match {
case (vectorizedUdfs, plainUdfs) if plainUdfs.isEmpty =>
ArrowEvalPythonExec(vectorizedUdfs, child.output ++ resultAttrs, child)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala
index 59db66bd7adf1..c798fe5a92c54 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/python/FlatMapGroupsInPandasExec.scala
@@ -96,7 +96,7 @@ case class FlatMapGroupsInPandasExec(
val columnarBatchIter = new ArrowPythonRunner(
chainedFunc, bufferSize, reuseWorker,
- PythonEvalType.SQL_PANDAS_GROUP_MAP_UDF, argOffsets, schema,
+ PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF, argOffsets, schema,
sessionLocalTimeZone, pandasRespectSessionTimeZone)
.compute(grouped, context.partitionId(), context)
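For quick reference, the full set of renames this patch applies, summarized as a plain Python mapping (a summary aid, not code from the patch):

```python
# Old name -> new name, as introduced across the hunks above.
RENAMES = {
    "SQL_PANDAS_SCALAR_UDF": "SQL_SCALAR_PANDAS_UDF",          # eval type 200
    "SQL_PANDAS_GROUP_MAP_UDF": "SQL_GROUPED_MAP_PANDAS_UDF",  # eval type 201
    "PandasUDFType.GROUP_MAP": "PandasUDFType.GROUPED_MAP",
    "wrap_pandas_scalar_udf": "wrap_scalar_pandas_udf",
    "wrap_pandas_group_map_udf": "wrap_grouped_map_pandas_udf",
}
```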