From f53b94c6d6a30b3c3b4a27b5e51ca7670ccddb75 Mon Sep 17 00:00:00 2001
From: Don Drake
Date: Tue, 10 Feb 2015 22:45:51 -0600
Subject: [PATCH 1/8] fix for SPARK-5722 infer long type in python similar to
 Java long

---
 python/pyspark/sql.py   |  4 ++++
 python/pyspark/tests.py | 16 +++++++++++++++-
 2 files changed, 19 insertions(+), 1 deletion(-)

diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py
index ae288471b0e51..391ef0fa3e1f9 100644
--- a/python/pyspark/sql.py
+++ b/python/pyspark/sql.py
@@ -605,6 +605,10 @@ def _infer_type(obj):
 
     dataType = _type_mappings.get(type(obj))
     if dataType is not None:
+        # Conform to Java int/long sizes SPARK-5722
+        if dataType == IntegerType:
+            if obj > 2**31 - 1 or obj < -2**31:
+                dataType = LongType
         return dataType()
 
     if isinstance(obj, dict):
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index bca52a7ce6d58..ffab68db0d789 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -51,7 +51,7 @@
     CloudPickleSerializer, CompressedSerializer
 from pyspark.shuffle import Aggregator, InMemoryMerger, ExternalMerger, ExternalSorter
 from pyspark.sql import SQLContext, IntegerType, Row, ArrayType, StructType, StructField, \
-    UserDefinedType, DoubleType
+    UserDefinedType, DoubleType, LongType, _infer_type
 from pyspark import shuffle
 
 _have_scipy = False
@@ -923,6 +923,20 @@ def test_infer_schema(self):
         result = self.sqlCtx.sql("SELECT l[0].a from test2 where d['key'].d = '2'")
         self.assertEqual(1, result.first()[0])
 
+    def test_infer_long_type(self):
+        longrow = [Row(f1='a', f2=100000000000000)]
+        lrdd = self.sc.parallelize(longrow)
+        slrdd = self.sqlCtx.inferSchema(lrdd)
+        self.assertEqual(slrdd.schema().fields[1].dataType, LongType())
+
+        self.assertEqual(_infer_type(1), IntegerType())
+        self.assertEqual(_infer_type(2**10), IntegerType())
+        self.assertEqual(_infer_type(2**20), IntegerType())
+        self.assertEqual(_infer_type(2**31 - 1), IntegerType())
+        self.assertEqual(_infer_type(2**31), LongType())
+        self.assertEqual(_infer_type(2**61), LongType())
+        self.assertEqual(_infer_type(2**71), LongType())
+
     def test_convert_row_to_dict(self):
         row = Row(l=[Row(a=1, b='s')], d={"key": Row(c=1.0, d="2")})
         self.assertEqual(1, row.asDict()['l'][0].a)

From c07e6f2fc82eec2dc3b85ef59a5a081d304533fa Mon Sep 17 00:00:00 2001
From: Don Drake
Date: Wed, 11 Feb 2015 13:22:25 -0600
Subject: [PATCH 2/8] added comment for SPARK-5722

---
 python/pyspark/sql.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py
index 391ef0fa3e1f9..f19722369668e 100644
--- a/python/pyspark/sql.py
+++ b/python/pyspark/sql.py
@@ -606,6 +606,9 @@ def _infer_type(obj):
     dataType = _type_mappings.get(type(obj))
     if dataType is not None:
         # Conform to Java int/long sizes SPARK-5722
+        # Inference is usually done on a sample of the dataset
+        # so, if values that should be Long do not appear in
+        # the sample, the dataType will be chosen as IntegerType
         if dataType == IntegerType:
             if obj > 2**31 - 1 or obj < -2**31:
                 dataType = LongType
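The first two patches cap inference at the Java int range: any sampled value outside [-2**31, 2**31 - 1] is promoted to LongType. Below is a minimal standalone sketch of that bounds check; the string type names are stand-ins for the real pyspark.sql classes so the snippet runs without Spark.

    JAVA_INT_MIN = -2**31       # Java int is a signed 32-bit value
    JAVA_INT_MAX = 2**31 - 1

    def infer_int_type(obj):
        # Promote only when the value cannot fit in a Java int.
        if obj > JAVA_INT_MAX or obj < JAVA_INT_MIN:
            return "LongType"
        return "IntegerType"

    assert infer_int_type(2**31 - 1) == "IntegerType"
    assert infer_int_type(2**31) == "LongType"
    assert infer_int_type(-2**31) == "IntegerType"
    assert infer_int_type(-2**31 - 1) == "LongType"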
From 16d63f473dd211c9cd2ae0fa4177c7ea42ba3376 Mon Sep 17 00:00:00 2001
From: Don Drake
Date: Wed, 11 Feb 2015 14:07:11 -0600
Subject: [PATCH 3/8] added a saveAsParquetFile() as part of test for
 SPARK-5722

---
 python/pyspark/tests.py | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index ffab68db0d789..d4726e2e672aa 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -929,6 +929,13 @@ def test_infer_long_type(self):
         slrdd = self.sqlCtx.inferSchema(lrdd)
         self.assertEqual(slrdd.schema().fields[1].dataType, LongType())
 
+        # this saving as Parquet caused issues as well.
+        output_dir = os.path.join(self.tempdir.name, "infer_long_type")
+        slrdd.saveAsParquetFile(output_dir)
+        df1 = self.sqlCtx.parquetFile(output_dir)
+        self.assertEquals('a', df1.first().f1)
+        self.assertEquals(100000000000000, df1.first().f2)
+
         self.assertEqual(_infer_type(1), IntegerType())
         self.assertEqual(_infer_type(2**10), IntegerType())
         self.assertEqual(_infer_type(2**20), IntegerType())

From 09be060c415befd6fadd65b988b93447dae64b93 Mon Sep 17 00:00:00 2001
From: Don Drake
Date: Wed, 11 Feb 2015 14:12:47 -0600
Subject: [PATCH 4/8] added saveAsParquetFile() for SPARK-5722

---
 python/pyspark/tests.py | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index ffab68db0d789..d4726e2e672aa 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -929,6 +929,13 @@ def test_infer_long_type(self):
         slrdd = self.sqlCtx.inferSchema(lrdd)
         self.assertEqual(slrdd.schema().fields[1].dataType, LongType())
 
+        # this saving as Parquet caused issues as well.
+        output_dir = os.path.join(self.tempdir.name, "infer_long_type")
+        slrdd.saveAsParquetFile(output_dir)
+        df1 = self.sqlCtx.parquetFile(output_dir)
+        self.assertEquals('a', df1.first().f1)
+        self.assertEquals(100000000000000, df1.first().f2)
+
         self.assertEqual(_infer_type(1), IntegerType())
         self.assertEqual(_infer_type(2**10), IntegerType())
         self.assertEqual(_infer_type(2**20), IntegerType())
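Patches 3 and 4 extend the test with a Parquet round trip, since the mis-inferred IntegerType also surfaced on write. The same scenario as a user-level script, against the 1.2-era API the patches use; the SparkContext setup and the /tmp output path are illustrative assumptions.

    from pyspark import SparkContext
    from pyspark.sql import SQLContext, Row

    sc = SparkContext("local", "spark-5722-demo")
    sqlCtx = SQLContext(sc)

    # f2 cannot fit in a 32-bit int, so inference must choose LongType for it.
    rdd = sc.parallelize([Row(f1='a', f2=100000000000000)])
    srdd = sqlCtx.inferSchema(rdd)

    # With f2 mis-typed as IntegerType the write/read cycle broke; with the
    # fix the full 64-bit value survives intact.
    srdd.saveAsParquetFile("/tmp/infer_long_type")
    restored = sqlCtx.parquetFile("/tmp/infer_long_type")
    assert restored.first().f2 == 100000000000000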
From 41f363a3e4270f9d855facb08b0ee51700c78a2a Mon Sep 17 00:00:00 2001
From: Don Drake
Date: Wed, 11 Feb 2015 20:46:23 -0600
Subject: [PATCH 5/8] after dialog on PR, just infer all ints to longs.

---
 python/pyspark/sql.py   | 12 ++++--------
 python/pyspark/tests.py |  9 +++++----
 2 files changed, 9 insertions(+), 12 deletions(-)

diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py
index f19722369668e..469421605e6c6 100644
--- a/python/pyspark/sql.py
+++ b/python/pyspark/sql.py
@@ -574,10 +574,13 @@ def _parse_datatype_json_value(json_value):
 
 
 # Mapping Python types to Spark SQL DataType
+# int -> LongType below so we do not have to deal with
+# the differences between Java int and Python ints when 
+# inferring data types. SPARK-5722
 _type_mappings = {
     type(None): NullType,
     bool: BooleanType,
-    int: IntegerType,
+    int: LongType,
     long: LongType,
     float: DoubleType,
     str: StringType,
@@ -605,13 +608,6 @@ def _infer_type(obj):
 
     dataType = _type_mappings.get(type(obj))
     if dataType is not None:
-        # Conform to Java int/long sizes SPARK-5722
-        # Inference is usually done on a sample of the dataset
-        # so, if values that should be Long do not appear in
-        # the sample, the dataType will be chosen as IntegerType
-        if dataType == IntegerType:
-            if obj > 2**31 - 1 or obj < -2**31:
-                dataType = LongType
         return dataType()
 
     if isinstance(obj, dict):
diff --git a/python/pyspark/tests.py b/python/pyspark/tests.py
index d4726e2e672aa..159debfff22f6 100644
--- a/python/pyspark/tests.py
+++ b/python/pyspark/tests.py
@@ -923,6 +923,7 @@ def test_infer_schema(self):
         result = self.sqlCtx.sql("SELECT l[0].a from test2 where d['key'].d = '2'")
         self.assertEqual(1, result.first()[0])
 
+    # SPARK-5722
     def test_infer_long_type(self):
         longrow = [Row(f1='a', f2=100000000000000)]
         lrdd = self.sc.parallelize(longrow)
@@ -936,10 +937,10 @@ def test_infer_long_type(self):
         self.assertEquals('a', df1.first().f1)
         self.assertEquals(100000000000000, df1.first().f2)
 
-        self.assertEqual(_infer_type(1), IntegerType())
-        self.assertEqual(_infer_type(2**10), IntegerType())
-        self.assertEqual(_infer_type(2**20), IntegerType())
-        self.assertEqual(_infer_type(2**31 - 1), IntegerType())
+        self.assertEqual(_infer_type(1), LongType())
+        self.assertEqual(_infer_type(2**10), LongType())
+        self.assertEqual(_infer_type(2**20), LongType())
+        self.assertEqual(_infer_type(2**31 - 1), LongType())
         self.assertEqual(_infer_type(2**31), LongType())
         self.assertEqual(_infer_type(2**61), LongType())
         self.assertEqual(_infer_type(2**71), LongType())

From 0d2bcd2517f8f81512f1ef400577af47b465d2a1 Mon Sep 17 00:00:00 2001
From: Don Drake
Date: Thu, 12 Feb 2015 08:26:55 -0600
Subject: [PATCH 6/8] PEP 8 fix

---
 python/pyspark/sql.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py
index 469421605e6c6..78a9af0c7a93d 100644
--- a/python/pyspark/sql.py
+++ b/python/pyspark/sql.py
@@ -575,7 +575,7 @@ def _parse_datatype_json_value(json_value):
 
 # Mapping Python types to Spark SQL DataType
 # int -> LongType below so we do not have to deal with
-# the differences between Java int and Python ints when 
+# the differences between Java int and Python ints when
 # inferring data types. SPARK-5722
 _type_mappings = {
     type(None): NullType,
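After review discussion, patch 5 drops the per-value bounds check and maps every Python int straight to LongType, so a sample containing only small values can no longer under-infer a column. A self-contained sketch of the resulting lookup follows; the stub classes stand in for the real pyspark.sql types, and Python 2 is assumed, as in the codebase of the era.

    class DataType(object):
        # Equality by class, mirroring how the pyspark type instances compare.
        def __eq__(self, other):
            return type(self) is type(other)

    class NullType(DataType): pass
    class BooleanType(DataType): pass
    class LongType(DataType): pass
    class DoubleType(DataType): pass
    class StringType(DataType): pass

    _type_mappings = {
        type(None): NullType,
        bool: BooleanType,
        int: LongType,      # was IntegerType before SPARK-5722
        long: LongType,     # Python 2 long
        float: DoubleType,
        str: StringType,
    }

    def _infer_type(obj):
        dataType = _type_mappings.get(type(obj))
        if dataType is not None:
            return dataType()
        raise ValueError("unsupported type: %r" % type(obj))

    assert _infer_type(1) == LongType()       # small ints widen too
    assert _infer_type(2**31) == LongType()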
From 97cf704030636b7edee36756352e6ea36a766391 Mon Sep 17 00:00:00 2001
From: Don Drake
Date: Mon, 16 Feb 2015 22:09:51 -0600
Subject: [PATCH 7/8] ensure LongType values are converted to Long

---
 .../main/scala/org/apache/spark/sql/execution/pythonUdfs.scala | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
index 2b4a88d5e864e..efca1de3dbd6c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUdfs.scala
@@ -139,6 +139,8 @@ object EvaluatePython {
 
     case (dec: BigDecimal, dt: DecimalType) => dec.underlying()  // Pyrolite can handle BigDecimal
 
+    case (_, LongType) => obj.asInstanceOf[Long]
+
     // Pyrolite can handle Timestamp
     case (other, _) => other
   }

From 69ce6d0cc8f42a12edd4e0bf0ab2df249f499af5 Mon Sep 17 00:00:00 2001
From: Don Drake
Date: Mon, 16 Feb 2015 22:10:41 -0600
Subject: [PATCH 8/8] LongType values are now converted

---
 python/pyspark/sql.py | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/python/pyspark/sql.py b/python/pyspark/sql.py
index 78a9af0c7a93d..d806e98baa4de 100644
--- a/python/pyspark/sql.py
+++ b/python/pyspark/sql.py
@@ -684,6 +684,8 @@ def _need_python_to_sql_conversion(dataType):
             _need_python_to_sql_conversion(dataType.valueType)
     elif isinstance(dataType, UserDefinedType):
         return True
+    elif isinstance(dataType, LongType):
+        return True
     else:
         return False
 
@@ -737,6 +739,8 @@ def converter(obj):
         return lambda m: dict([(key_converter(k), value_converter(v)) for k, v in m.items()])
     elif isinstance(dataType, UserDefinedType):
         return lambda obj: dataType.serialize(obj)
+    elif isinstance(dataType, LongType):
+        return lambda x: long(x)
     else:
         raise ValueError("Unexpected type %r" % dataType)
 
@@ -929,11 +933,11 @@ def _infer_schema_type(obj, dataType):
     >>> schema = _parse_schema_abstract("a b c d")
     >>> row = (1, 1.0, "str", datetime.date(2014, 10, 10))
     >>> _infer_schema_type(row, schema)
-    StructType...IntegerType...DoubleType...StringType...DateType...
+    StructType...LongType...DoubleType...StringType...DateType...
     >>> row = [[1], {"key": (1, 2.0)}]
     >>> schema = _parse_schema_abstract("a[] b{c d}")
     >>> _infer_schema_type(row, schema)
-    StructType...a,ArrayType...b,MapType(StringType,...c,IntegerType...
+    StructType...a,ArrayType...b,MapType(StringType,...c,LongType...
     """
     if dataType is None:
         return _infer_type(obj)
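Patches 7 and 8 cover the serialization side: values in a LongType column are coerced to Python long before pickling, and the new Scala match arm in EvaluatePython casts the unpickled object to Long. A sketch of just the LongType branch of the Python converter; the LongType stub is a stand-in for the real class, and Python 2 semantics are assumed.

    class LongType(object):
        pass

    def _python_to_sql_converter(dataType):
        # Only the LongType branch added in PATCH 8/8 is sketched here.
        if isinstance(dataType, LongType):
            # Force Python long so the pickled value maps to a 64-bit JVM long.
            return lambda x: long(x)
        raise ValueError("Unexpected type %r" % dataType)

    convert = _python_to_sql_converter(LongType())
    assert isinstance(convert(1), long)   # small ints are widened too
    assert convert(2**40) == 2**40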