From ad57a051d7d107c2a2bf92e48d14f63544f9e5ab Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Sat, 23 May 2020 12:52:36 +0300 Subject: [PATCH 01/11] Add test --- .../datasources/parquet/ParquetIOSuite.scala | 41 ++++++++++++++++++- 1 file changed, 40 insertions(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index 87b4db3fe087a..98e8aed1af381 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -45,7 +45,7 @@ import org.apache.spark.{SPARK_VERSION_SHORT, SparkException, SparkUpgradeExcept import org.apache.spark.sql._ import org.apache.spark.sql.catalyst.{InternalRow, ScalaReflection} import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, UnsafeRow} -import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils} import org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf @@ -883,6 +883,45 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession } } + test("...: generate test files for checking compatibility with Spark 2.4") { + val baseDir = "/Users/maximgekk/tmp/parquet_compat/" + val N = 8 + def save( + in: Seq[(String, String)], + t: String, + subdir: String, + options: Map[String, String] = Map.empty): Unit = { + in.toDF("dict", "plain") + .select($"dict".cast(t), $"plain".cast(t)) + .repartition(1) + .write + .mode("overwrite") + .options(options) + .parquet(baseDir + subdir) + } + DateTimeTestUtils.withDefaultTimeZone(DateTimeTestUtils.LA) { + withSQLConf( + SQLConf.SESSION_LOCAL_TIMEZONE.key -> 
DateTimeTestUtils.LA.getId, + SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> "CORRECTED") { + save((1 to N).map(i => ("1000-01-01", s"1000-01-$i")), "date", "dates") + withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> "TIMESTAMP_MILLIS") { + save( + (1 to N).map(i => ("1001-01-01 01:02:03.123", s"1001-01-$i 01:02:03.123")), + "timestamp", + "ts_millis") + } + val usTs = (1 to N).map(i => ("1001-01-01 01:02:03.123456", s"1001-01-$i 01:02:03.123456")) + withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> "TIMESTAMP_MICROS") { + save(usTs, "timestamp", "ts_micros") + } + withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> "INT96") { + save(usTs, "timestamp", "ts_int96_plain", Map("parquet.enable.dictionary" -> "false")) + save(usTs, "timestamp", "ts_int96_dict", Map("parquet.enable.dictionary" -> "true")) + } + } + } + } + test("SPARK-31159: compatibility with Spark 2.4 in reading dates/timestamps") { // test reading the existing 2.4 files and new 3.0 files (with rebase on/off) together. 
def checkReadMixedFiles(fileName: String, dt: String, dataStr: String): Unit = { From 30dd3b3df7dc0a853cdefd96f7a473de2ad08042 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Sat, 23 May 2020 19:42:28 +0300 Subject: [PATCH 02/11] Update test gen --- .../datasources/parquet/ParquetIOSuite.scala | 47 +++++++++++++------ 1 file changed, 33 insertions(+), 14 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index 98e8aed1af381..f92bf1a8a6135 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -17,6 +17,7 @@ package org.apache.spark.sql.execution.datasources.parquet +import java.nio.file.{Files, Paths, StandardCopyOption} import java.sql.{Date, Timestamp} import java.time._ import java.util.Locale @@ -884,39 +885,57 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession } test("...: generate test files for checking compatibility with Spark 2.4") { - val baseDir = "/Users/maximgekk/tmp/parquet_compat/" + val resourceDir = "/Users/maximgekk/tmp/parquet_compat/" + val version = "2_4_5" val N = 8 def save( in: Seq[(String, String)], t: String, - subdir: String, + dstFile: String, options: Map[String, String] = Map.empty): Unit = { - in.toDF("dict", "plain") - .select($"dict".cast(t), $"plain".cast(t)) - .repartition(1) - .write - .mode("overwrite") - .options(options) - .parquet(baseDir + subdir) + withTempDir { dir => + in.toDF("dict", "plain") + .select($"dict".cast(t), $"plain".cast(t)) + .repartition(1) + .write + .mode("overwrite") + .options(options) + .parquet(dir.getCanonicalPath) + Files.copy( + dir.listFiles().filter(_.getName.endsWith(".snappy.parquet")).head.toPath, + Paths.get(resourceDir, dstFile), + 
StandardCopyOption.REPLACE_EXISTING) + } } DateTimeTestUtils.withDefaultTimeZone(DateTimeTestUtils.LA) { withSQLConf( SQLConf.SESSION_LOCAL_TIMEZONE.key -> DateTimeTestUtils.LA.getId, SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> "CORRECTED") { - save((1 to N).map(i => ("1000-01-01", s"1000-01-$i")), "date", "dates") + save( + (1 to N).map(i => ("1000-01-01", s"1000-01-$i")), + "date", + s"before_1582_date_v$version.snappy.parquet") withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> "TIMESTAMP_MILLIS") { save( (1 to N).map(i => ("1001-01-01 01:02:03.123", s"1001-01-$i 01:02:03.123")), "timestamp", - "ts_millis") + s"before_1582_timestamp_millis_v$version.snappy.parquet") } val usTs = (1 to N).map(i => ("1001-01-01 01:02:03.123456", s"1001-01-$i 01:02:03.123456")) withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> "TIMESTAMP_MICROS") { - save(usTs, "timestamp", "ts_micros") + save(usTs, "timestamp", s"before_1582_timestamp_micros_v$version.snappy.parquet") } withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> "INT96") { - save(usTs, "timestamp", "ts_int96_plain", Map("parquet.enable.dictionary" -> "false")) - save(usTs, "timestamp", "ts_int96_dict", Map("parquet.enable.dictionary" -> "true")) + save( + usTs, + "timestamp", + s"before_1582_timestamp_int96_plain_v$version.snappy.parquet", + Map("parquet.enable.dictionary" -> "false")) + save( + usTs, + "timestamp", + s"before_1582_timestamp_int96_dict_v$version.snappy.parquet", + Map("parquet.enable.dictionary" -> "true")) } } } From 4359c4d7f5eb100800d55b2d6139bcb23f1891a6 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Sat, 23 May 2020 23:15:03 +0300 Subject: [PATCH 03/11] Re-gen parquet files --- .../before_1582_date_v2_4.snappy.parquet | Bin 398 -> 0 bytes .../before_1582_date_v2_4_5.snappy.parquet | Bin 0 -> 660 bytes .../before_1582_date_v2_4_6.snappy.parquet | Bin 0 -> 694 bytes ...582_timestamp_int96_dict_v2_4_5.snappy.parquet | Bin 0 -> 737 bytes 
...582_timestamp_int96_dict_v2_4_6.snappy.parquet | Bin 0 -> 771 bytes ...82_timestamp_int96_plain_v2_4_5.snappy.parquet | Bin 0 -> 693 bytes ...82_timestamp_int96_plain_v2_4_6.snappy.parquet | Bin 0 -> 727 bytes ...efore_1582_timestamp_int96_v2_4.snappy.parquet | Bin 494 -> 0 bytes ...fore_1582_timestamp_micros_v2_4.snappy.parquet | Bin 436 -> 0 bytes ...re_1582_timestamp_micros_v2_4_5.snappy.parquet | Bin 0 -> 767 bytes ...re_1582_timestamp_micros_v2_4_6.snappy.parquet | Bin 0 -> 801 bytes ...fore_1582_timestamp_millis_v2_4.snappy.parquet | Bin 436 -> 0 bytes ...re_1582_timestamp_millis_v2_4_5.snappy.parquet | Bin 0 -> 761 bytes ...re_1582_timestamp_millis_v2_4_6.snappy.parquet | Bin 0 -> 795 bytes .../datasources/parquet/ParquetIOSuite.scala | 2 +- 15 files changed, 1 insertion(+), 1 deletion(-) delete mode 100644 sql/core/src/test/resources/test-data/before_1582_date_v2_4.snappy.parquet create mode 100644 sql/core/src/test/resources/test-data/before_1582_date_v2_4_5.snappy.parquet create mode 100644 sql/core/src/test/resources/test-data/before_1582_date_v2_4_6.snappy.parquet create mode 100644 sql/core/src/test/resources/test-data/before_1582_timestamp_int96_dict_v2_4_5.snappy.parquet create mode 100644 sql/core/src/test/resources/test-data/before_1582_timestamp_int96_dict_v2_4_6.snappy.parquet create mode 100644 sql/core/src/test/resources/test-data/before_1582_timestamp_int96_plain_v2_4_5.snappy.parquet create mode 100644 sql/core/src/test/resources/test-data/before_1582_timestamp_int96_plain_v2_4_6.snappy.parquet delete mode 100644 sql/core/src/test/resources/test-data/before_1582_timestamp_int96_v2_4.snappy.parquet delete mode 100644 sql/core/src/test/resources/test-data/before_1582_timestamp_micros_v2_4.snappy.parquet create mode 100644 sql/core/src/test/resources/test-data/before_1582_timestamp_micros_v2_4_5.snappy.parquet create mode 100644 sql/core/src/test/resources/test-data/before_1582_timestamp_micros_v2_4_6.snappy.parquet delete mode 100644 
sql/core/src/test/resources/test-data/before_1582_timestamp_millis_v2_4.snappy.parquet create mode 100644 sql/core/src/test/resources/test-data/before_1582_timestamp_millis_v2_4_5.snappy.parquet create mode 100644 sql/core/src/test/resources/test-data/before_1582_timestamp_millis_v2_4_6.snappy.parquet diff --git a/sql/core/src/test/resources/test-data/before_1582_date_v2_4.snappy.parquet b/sql/core/src/test/resources/test-data/before_1582_date_v2_4.snappy.parquet deleted file mode 100644 index 7d5cc12eefe04f18f8a2ba99db22326dc0399a80..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 398 zcmaKp!Arw16vneSMv)y&X&{FZ+}NOo*0F9aUc9NN;YkpYbWL^Gc3aX;Wc2F);J@lc z-`oUGULNo9eeeCgklE#JKoB`0jz=&w9bz%3{r>U!wXZfB{Fx9!$EO(TQEO)*n0i-T zvy%Gd(afJ!AqhqV_psAWWust)0sTc|uWJig!Z zQWiUD7ZZeo@i3Kjp35W$9Tt_xe^2sw+cXaia&8X5vqHNYrOjelx`7CM5x9e-UgW7O tr%{?svnU#!C!xy1a5PH&&<`Uu_UBPDi6(xck~A1n#^8C^Ere$96W`?#bIbq$ diff --git a/sql/core/src/test/resources/test-data/before_1582_date_v2_4_5.snappy.parquet b/sql/core/src/test/resources/test-data/before_1582_date_v2_4_5.snappy.parquet new file mode 100644 index 0000000000000000000000000000000000000000..0f54f8cb438086ada6179bee5eea90bf8006b5cd GIT binary patch literal 660 zcmb7CO-tiY6uk*ivgjfUUr5F{mIoGv7!r%o6gTcfToku5$V>C8VbTwi)L|(72QC#{ z%z75H^k4ZGW;32N)#^qV$hqOX`_9QdmvoQM1Ew*ZHO`pIG$O5GJ^ua2h#73nMbX$Q z(;Z8zPU~vY_B%#Ix}%~R3K@`{v2G=!g>3VKY{?*K-Pf|p*#^D<>wp7n0AGPhz*PI_ zq*07T{{6D(UCV*MoJ`$PEiFlV8dC`wsuu?RrPN*oulh4{v)_vOM?n*A%V5=ZmW*r%HW&~#<`X8sfweQr zuRJjp$l}dodBsiUA>YkJmR>&k;SFRd5>X24L7DRM=UDO%FUowCbbcH%+yQAUw#K;pE1*C_5qTQU){G#wXdFp`dY7$x3I2SV62J9K;%89Z?F1&W?W6 z@}bNw&hyS6)sr|U(KmH!#7U>vtW{)Tn98J&%(j@r+=l6(aDUM-S=C2$+p@1U`$lV+ zN8jXOE@n1}4w#|5aL{|L@-l(Z>N8Wb2hD!eu!wH+VC8n^448v9I3U{CPdEWEm(oG@ zTgairEMgiBaX`J)cuQBh-URf#^g@suo3Kg>cC{1OQVjYFbKlT-t%*jvy literal 0 HcmV?d00001 diff --git a/sql/core/src/test/resources/test-data/before_1582_timestamp_int96_dict_v2_4_5.snappy.parquet 
b/sql/core/src/test/resources/test-data/before_1582_timestamp_int96_dict_v2_4_5.snappy.parquet new file mode 100644 index 0000000000000000000000000000000000000000..667ca9c44840d058a920347d88e4122596f3e34a GIT binary patch literal 737 zcmbVKPm9w)6n{x$37A9FoiQURmH|r_Vn{5TZAvfRME0x>~f{-`Kkr?F&Zc*)x)jy3hxP1{GHi+KLvv~mwzG~-M zt%2lL!BR7mptO0piJ*rt4Z36oh$`nfcy)-}~O=mV{E>0S06WwOUpXm0ELjNx)0O+=gv#CJM_E90H$2Bj&vn|Q%2Hflzu!})3 zD8{XhF=klJg|^jHM#=5V$^xW@YyC!{)q4BsCsD2O1Eb7E@$zXoc_A}_Vx)anWm=p} zs%E*9ry`jFH7(n6d%8{9)?r)IttN-4)TCb(`c3KS<88OwyMK7MTy(4vweDyA0qA{K zF14B>)?)8f`dLA6jDD`(qCZ;H#B;{x#d9ulfwuVCA%7){GMUfV&OSfj0kFL*8Rc2Z zu^DFzS@B|i&NEqwSX6?2P?k>hCYR30DXU^JsT|jtCNhmnXY_dK%tTg0b*S1nW-!2g8#Uu!oz%?`1`I5gg5FH8E_@IOOv0DjbY8Gm2P=!=Cie-E$4@qvoFu544>YjCW=Qs-==<0J5o_@^2?uWRrEgJ4jMmvyag;52Jm z6LX@P{X?zHdGYmAIs76rPAcf&wQ5wNWGI*ElRV|g2&isarroSrq-yR}p=K^IM5Q|Y zt#5@6Q$%?jz^vOmf0GrBGf|Yuc+~J5 z&vm>;E1V>0)ZpD9io0W-+bTu%;fap(~to9WRXLkxR|v`$72q>9jfps(i)Qjlmx{EI;J(dTX+AOQbf+p*J_YJ>kNf?ON80?T-2uMV5SzO6 zyG=h^{myusbbIv=Ztfu+4unAcuhw;wb@x@59~S*+VR4MuH{|qNi@Lyl#%A?PsVapa za>XRib=~B%Dcd=ahcW`T|C^#YDSDeXth7ysNYriN@6Pi4nfY za$Tg27(H8vsVXl@7pH`UjnL$!Zj34`+?vjdLL~)?@N4ahOJgxwESLV3!Rl=P!{PZ7 z*ptoR56Zf?A?<}S3}o2bOXhiz_S7&=vtbs;gM%c}SriQhX%GcbtdD|8oE*o;L86m1 R+~>XrfDi2A1)Sn9{R=)LuO$Eg literal 0 HcmV?d00001 diff --git a/sql/core/src/test/resources/test-data/before_1582_timestamp_int96_v2_4.snappy.parquet b/sql/core/src/test/resources/test-data/before_1582_timestamp_int96_v2_4.snappy.parquet deleted file mode 100644 index 13254bd93a5e6c54a13beb4314757d76b98117e5..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 494 zcmaixPfNov7{=dbrS!0ah$#(3ECt5~wX}|P>+s@D#KZ6^BI%lHvE6Eu`iHS^;dk@m z(GOxeCb)}U65i+c=6#+dv-9f^>)6Kh3L~robUk>Tu=hiN{f{HSfcNl$*CK3SliJ<`2AN7y&kpk&UDP3lU0a&!pHOBSSYyt72p! 
z>r=D7w4QmkYrI?i6N7l_Dtc=Nn+>3yihxjiu&nQWRxuUG-(>T}Q{k(SPYsuTy?pUY zE?LSXvu{?ee5<%Sb%m^(xpX}@%XyIscdF2Wl`WD&$=f!Wmt06zR!UklMZuCnwWzb) zbAP6~)75%?y9WDm>v>Yv&NHt?zFat=9|nHt^phqpQio0AG@E2`JUmSzo<-4cm ztAFr>n#y1R0Gl`hLf!7Xy60jc44ePf;A&ZL(`Q*r&8zt{FQpc#(83+Bn0{GEHe^ay z^;9#TWw|U;#fD}y6Xj;4m9e*LIxnTtqFfkjRu_dx3bSIe(r3F{*>JU9PuAdGZr@-X0`*GcNUNIfx#(`=B%aql#VWEMreUK&I}6wC8q8Ylg@A0#qK!!DuJ MJaKFRjLd)i023L1p8x;= diff --git a/sql/core/src/test/resources/test-data/before_1582_timestamp_micros_v2_4_5.snappy.parquet b/sql/core/src/test/resources/test-data/before_1582_timestamp_micros_v2_4_5.snappy.parquet new file mode 100644 index 0000000000000000000000000000000000000000..7f6e37d8a5cf3ac32cdee5762827d23af7c8bdb5 GIT binary patch literal 767 zcmbV~zl+pB6vyA>;*sKtfHxt*6zjk(w-7_(a=FcV2sUzv#c`Ez9x~a?YB1T&C0Rwo z!WD;t*ol>eh1FFycK)aYYdgXE{t63y$!1+}wGqO+neTh~%zNKt@7lggWvbHkn^dAQ zfMUY?H>bA_U;Ly{$rdg$U8RbtEAz}^r@`i5RH!f#t||c)G0zEoLMjlQC{1Ovot=vC5#_+wf@)E-pa`6$AhTH0IF=Pnv3&hSgR~DjQp?vSQ5YsHCadJEDCc zZDnWC`C|7EGYkt2l&J(beiZ=PpUI33$evwW!=FjoiG<0DzT`zsUsAy^5eqd|`?R8| zDCnJWcGu=9$64)qY&$=SZJhsTA__YjKeQ83@Q@c=e@#kOJW2)Yv%JVAgMwLX7>PK{ zS^uD9Bc4>L$|g)Sg+-Kzyx>WS(r6OLJczLf-xd~|gIT|vPOaa7+S>mh98N*M(KLQ3 z$yyIYmPg}J%e7s{c3YdlB#Ofp@A=`dH}w7P<-ij|&+B$W$8$ViY&ioz*!H)bKm?)N QF)R}RA2^SnVGn=(U&Eo{Gynhq literal 0 HcmV?d00001 diff --git a/sql/core/src/test/resources/test-data/before_1582_timestamp_micros_v2_4_6.snappy.parquet b/sql/core/src/test/resources/test-data/before_1582_timestamp_micros_v2_4_6.snappy.parquet new file mode 100644 index 0000000000000000000000000000000000000000..442f12ceacb9414441a4bd796c2152e51cdb358c GIT binary patch literal 801 zcmbW0zl+pB6vyA>az~0QQrv_DQ>=koZXpiIt()7dhhQUzSR7Xg=OL5LtOk?aT#{Ap z!h(e;h@DtjSXfavVZNsr7~6N#!V_w z89*`N{hPB}M=yR-sAL-#nXXaA)RlQ=vD0AdFDg_R30IYXikRnwJ|Ptd=Agr;r{{RZ zAD|@_5!d%>P~8<=D8Ix+SuYRHA7S|3T6u}#3b}ZO;nlaRABoca@%Hvtq(jZ4lCitD zo~Jy!e<$zX6A8ybsk|xaav1fC&5hY%HH~>R#FM5PreQY~lgdVCO;(Iq9hEdydq=bn 
zq^ayKI$!MmVTNI&fijf<$FBlF^D~*T6)^PKwNv;rNqa0|vZ60}F{Ll5V3>%7DOUTm zqN!lmx<1P8GM;jrl}*QH4@8znqoKaK%`P(!^wv^EejGD&=y)s&mW>{=L=-&a1=nAb z(kf0;VfC!M$j1G`vaLZR;xM;*hov>-NtLRM#6(?KM2W}?o}?%Z$8pSq7>n?2VOw*s z)hj0x`!`@}{(lgTCZOM_8^4rfje63C%Up-KjjdoD#bJYY{czA7_2*bF?tXWLP`TUl)%OYHMGuk>+s@DJc-vKlCG(iwOiMu${70_9{hd& zkD1BFE_w+s@AtiV-jK(;rw{|C*z++mqK$3p+G-B#;Cr__DF=FgxBzf*gUoqm9U~YK z|NdN6qMpB~*_$i{Mu1x#`?7sM}xGuUu5Zu=#Hdu2uy%eO8Usyk36rQfiS3E!^pb>2)R9gehG&GtGRK z<+4Z>o0!o;l>3oZ#y+g+yp&3dvNG19DGHGkX2oQs&yKgU$!5EK-h%sZU|yBA_bzLd zFBe|O!+?igFKO~3^~5+%vvC&3gX<)cSriQhX%GcbEQi4?PDb%4NMw?ReL|^u;?x3o HH2?JjJC=Zu diff --git a/sql/core/src/test/resources/test-data/before_1582_timestamp_millis_v2_4_5.snappy.parquet b/sql/core/src/test/resources/test-data/before_1582_timestamp_millis_v2_4_5.snappy.parquet new file mode 100644 index 0000000000000000000000000000000000000000..c0cf8358af72a1b6df0111d14a77eee39b6497a5 GIT binary patch literal 761 zcmbV~F^kkd6vrpI>@md^L7k95ie==MJBT51x$I`SMi3EEIBZ2^vYFLz$!;z)dvdI+ z#Wh!GqlH+AU&8VV3&9UzE4H?F=bL2L1y>s({4?`^FTXeMWe#rK4~aoca^)`3iGdJy z#UVNgzFk}pq-+nwAe#icHeN;$ZKT;yZeMCPtuklX7n+`lwmwS_kj14!v6aV+@W5 zqcez(pmX&?sPkEC*a+B7*&2SPTR$}jD*D=RM2oLATzOy*^wW4Yt^GalsaXa+G zzz^Fy$u!H;HtWafs6UG1?)4<%qbTZj(;y0>nC}I{IO)Z`AmK?G?mC`}5IpEI{DuR# G^uGW;!{*fh literal 0 HcmV?d00001 diff --git a/sql/core/src/test/resources/test-data/before_1582_timestamp_millis_v2_4_6.snappy.parquet b/sql/core/src/test/resources/test-data/before_1582_timestamp_millis_v2_4_6.snappy.parquet new file mode 100644 index 0000000000000000000000000000000000000000..f81afe412bc4622875ded7bc5f9d70e6a5d56222 GIT binary patch literal 795 zcmbW0F^kkd6vtm;_DFF>5GQ0H#S*#Y4&sp9y4lTgjUXbTaM+5-WHYPblHFV;s~qcU zVRMBxT8M@CB`mM75d09fVry%6z9hRYxY`KepPBbwesA7C%;B}WKGBFlF5M<7(ExN? 
z9>IzC?fjfT6?RcHvPE>;(pQ=F%7op&M4`f{sD=UtVqPlbX~lsR=g0Q@s>2_gjVKas zTyHq5*78f|SxeV1-+6)hT%MwSefJ8r|LJv0AH4bgwgSr~im3G9=C;gvar8ir9*K?l#YI|KJ Date: Sat, 23 May 2020 23:15:35 +0300 Subject: [PATCH 04/11] Ignore gen test --- .../sql/execution/datasources/parquet/ParquetIOSuite.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index 9e63bfd97f5a4..88d4ced9930ea 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -884,7 +884,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession } } - test("...: generate test files for checking compatibility with Spark 2.4") { + ignore("...: generate test files for checking compatibility with Spark 2.4") { val resourceDir = "sql/core/src/test/resources/test-data" val version = "2_4_5" val N = 8 From b07290efd23e498880d04e9f88e9415fe02952c3 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Sun, 24 May 2020 12:28:51 +0300 Subject: [PATCH 05/11] Fix dates --- .../sql/execution/datasources/parquet/ParquetIOSuite.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index 88d4ced9930ea..cc5f27232d74b 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -912,16 +912,16 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession 
SQLConf.SESSION_LOCAL_TIMEZONE.key -> DateTimeTestUtils.LA.getId, SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> "CORRECTED") { save( - (1 to N).map(i => ("1000-01-01", s"1000-01-$i")), + (1 to N).map(i => ("1001-01-01", s"1001-01-0$i")), "date", s"before_1582_date_v$version.snappy.parquet") withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> "TIMESTAMP_MILLIS") { save( - (1 to N).map(i => ("1001-01-01 01:02:03.123", s"1001-01-$i 01:02:03.123")), + (1 to N).map(i => ("1001-01-01 01:02:03.123", s"1001-01-0$i 01:02:03.123")), "timestamp", s"before_1582_timestamp_millis_v$version.snappy.parquet") } - val usTs = (1 to N).map(i => ("1001-01-01 01:02:03.123456", s"1001-01-$i 01:02:03.123456")) + val usTs = (1 to N).map(i => ("1001-01-01 01:02:03.123456", s"1001-01-0$i 01:02:03.123456")) withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> "TIMESTAMP_MICROS") { save(usTs, "timestamp", s"before_1582_timestamp_micros_v$version.snappy.parquet") } From 707e936778a8192c7b9247a5f8070cfed15a3602 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Sun, 24 May 2020 14:47:09 +0300 Subject: [PATCH 06/11] Bug fix tests --- .../before_1582_date_v2_4_5.snappy.parquet | Bin 660 -> 660 bytes .../before_1582_date_v2_4_6.snappy.parquet | Bin 694 -> 694 bytes ...timestamp_int96_dict_v2_4_5.snappy.parquet | Bin 737 -> 737 bytes ...timestamp_int96_dict_v2_4_6.snappy.parquet | Bin 771 -> 771 bytes ...imestamp_int96_plain_v2_4_5.snappy.parquet | Bin 693 -> 693 bytes ...imestamp_int96_plain_v2_4_6.snappy.parquet | Bin 727 -> 727 bytes ...582_timestamp_micros_v2_4_5.snappy.parquet | Bin 767 -> 767 bytes ...582_timestamp_micros_v2_4_6.snappy.parquet | Bin 801 -> 801 bytes ...582_timestamp_millis_v2_4_5.snappy.parquet | Bin 761 -> 761 bytes ...582_timestamp_millis_v2_4_6.snappy.parquet | Bin 795 -> 795 bytes .../datasources/parquet/ParquetIOSuite.scala | 48 +++++++++++++----- 11 files changed, 34 insertions(+), 14 deletions(-) diff --git 
a/sql/core/src/test/resources/test-data/before_1582_date_v2_4_5.snappy.parquet b/sql/core/src/test/resources/test-data/before_1582_date_v2_4_5.snappy.parquet index 0f54f8cb438086ada6179bee5eea90bf8006b5cd..edd61c9b9fec8239636030cbda2789e223fc6320 100644 GIT binary patch delta 228 zcmbQjI)&9fz%j^Bltq+7l*dPuNtA_wfrY1S=CA*v459*}B04~VMU+jHLq>uHBnlzL z7&PF#iS~+)y(lsW9s>h|+7u=rDZmKQ+z!MYK->w$T|nFo#63XVJ9FYfdwmWz7D)+~ wl+5H3Q6@0~u^vXTDU50yHn{Aa%+Dyx%)!7mS)Wlt3}Ge6X)xaASVnV30ET{6-~a#s delta 228 zcmbQjI)&9fz%j^Bltq+7l*dPuNtA_wfrUq6#;^aP459*}B04~VMU+jHLq>uHBnlzL z7&PF#iS~+)@+dM09s>h|+7u=rDZmKQED6L?Kr9W!GC(W~#Bx9^KV#xTdwn(*4oL}? wl+5H3Q6@0~u^vXTDU50yHn{Aa%+Dyx%*MbmS)Wlt3}Ge6X)xaASVnV302_f!2><{9 diff --git a/sql/core/src/test/resources/test-data/before_1582_date_v2_4_6.snappy.parquet b/sql/core/src/test/resources/test-data/before_1582_date_v2_4_6.snappy.parquet index 6e224dd0dd2e15e0464c5d4def9adb2b4379cca3..01f4887f5e994c77fc587378e7f3a19868745681 100644 GIT binary patch delta 228 zcmdnSx{cL7z%j^Bltq+7l*dPuNtA_wfrY1S=CA*v459*}B04~VMU+jHLq>uHBnlzL z7&PF#iS~+)y(lsW9s>h|+7u=rDZmKQ+z!MYK->w$T|nFo#63XVJ9FYfdwmu*4oL}? wl+5H3Q6@0~u^vXTDU50yHn{Aa%+Dyx%)rJmS)Wlt3}Ge6X)xaASjI?30K)uNMgRZ+ delta 228 zcmdnSx{cL7z%j^Bltq+7l*dPuNtA_wfrUq6#;^aP459*}B04~VMU+jHLq>uHBnlzL z7&PF#iS~+)@+dM09s>h|+7u=rDZmKQED6L?Kr9W!GC(W~#Bx9^KV#xTdwn(*4oL}? 
wl+5H3Q6@0~u^vXTDU50yHn{Aa%+Dyx%*MemS)Wlt3}Ge6X)xaASjI?309YAJZvX%Q diff --git a/sql/core/src/test/resources/test-data/before_1582_timestamp_int96_dict_v2_4_5.snappy.parquet b/sql/core/src/test/resources/test-data/before_1582_timestamp_int96_dict_v2_4_5.snappy.parquet index 667ca9c44840d058a920347d88e4122596f3e34a..c7e8d3926f63ab372d898a40667cb5dc33437ede 100644 GIT binary patch delta 24 dcmaFJ`jB;l2qQBG8_Q&I#!L{sxtTGJ5dck%1!e#M delta 24 dcmaFJ`jB;l2qQBa3&&(}#!L{sxtTGJ5dckr1!e#M diff --git a/sql/core/src/test/resources/test-data/before_1582_timestamp_int96_dict_v2_4_6.snappy.parquet b/sql/core/src/test/resources/test-data/before_1582_timestamp_int96_dict_v2_4_6.snappy.parquet index 580a9999ebd2ddca5fb1e6f076a896df3c8c53ab..939e2b8088eb0f15d870f7881ac421e26187036a 100644 GIT binary patch delta 22 ccmZo>Yi8RZ!pOwJHd%}@3rKBlVr*mt05@XYi8RZ!pOwNGFgl<3rKBlVr*mt05@j@yZ`_I diff --git a/sql/core/src/test/resources/test-data/before_1582_timestamp_int96_plain_v2_4_5.snappy.parquet b/sql/core/src/test/resources/test-data/before_1582_timestamp_int96_plain_v2_4_5.snappy.parquet index c59fef987f48a0b504b489829406bfb2b85b6c0f..88a94ac482052a66482aad13059be4897b10b850 100644 GIT binary patch delta 22 bcmdnWx|MapH)ak7wuwKJK-A_q#&AXeRd)t+ delta 22 bcmdnWx|MapH)b{lj)^~$K-A_q#&AXeRc{7! 
diff --git a/sql/core/src/test/resources/test-data/before_1582_timestamp_int96_plain_v2_4_6.snappy.parquet b/sql/core/src/test/resources/test-data/before_1582_timestamp_int96_plain_v2_4_6.snappy.parquet index 06acbe6374b5651bf56376ca1ebbea82c7d31b00..68bfa33aac13f7d8af14dca353195c8f421bdea2 100644 GIT binary patch delta 22 bcmcc4dYyH`H)aMlj)^~$K-A_q#!5y2TRR5X delta 22 bcmcc4dYyH`H)b{thKWCuK-A_q#!5y2TUQ3z diff --git a/sql/core/src/test/resources/test-data/before_1582_timestamp_micros_v2_4_5.snappy.parquet b/sql/core/src/test/resources/test-data/before_1582_timestamp_micros_v2_4_5.snappy.parquet index 7f6e37d8a5cf3ac32cdee5762827d23af7c8bdb5..62e6048354dc1026f86af85a5c8b29eb492dfaf8 100644 GIT binary patch delta 24 fcmey*`k!@!A|o>g8_Q%B#z!3&&&?#z3mE}G#swt+ delta 23 ecmZ3;wvcUuA|n$U%VcH7C}uVehRqF(3mE}G)&(U1 diff --git a/sql/core/src/test/resources/test-data/before_1582_timestamp_millis_v2_4_5.snappy.parquet b/sql/core/src/test/resources/test-data/before_1582_timestamp_millis_v2_4_5.snappy.parquet index c0cf8358af72a1b6df0111d14a77eee39b6497a5..a7cef9e60f13425560f516f4ffb32f0bb4b1831a 100644 GIT binary patch delta 24 fcmey#`jd5oG$S(y8_Q%l#z (String, String)): Unit = { withTempPaths(2) { paths => paths.foreach(_.delete()) val path2_4 = getResourceParquetFilePath("test-data/" + fileName) val path3_0 = paths(0).getCanonicalPath val path3_0_rebase = paths(1).getCanonicalPath if (dt == "date") { - val df = Seq(dataStr).toDF("str").select($"str".cast("date").as("date")) - + val df = Seq.tabulate(N)(rowFunc) + .toDF("dict", "plain") + .select($"dict".cast("date"), $"plain".cast("date")) // By default we should fail to write ancient datetime values. 
var e = intercept[SparkException](df.write.parquet(path3_0)) assert(e.getCause.getCause.getCause.isInstanceOf[SparkUpgradeException]) @@ -971,10 +976,17 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ.key -> LEGACY.toString) { checkAnswer( spark.read.format("parquet").load(path2_4, path3_0, path3_0_rebase), - 1.to(3).map(_ => Row(java.sql.Date.valueOf(dataStr)))) + (0 until N).flatMap { i => + val (dictS, plainS) = rowFunc(i) + Seq.tabulate(3) { _ => + Row(java.sql.Date.valueOf(dictS), java.sql.Date.valueOf(plainS)) + } + }) } } else { - val df = Seq(dataStr).toDF("str").select($"str".cast("timestamp").as("ts")) + val df = Seq.tabulate(N)(rowFunc) + .toDF("dict", "plain") + .select($"dict".cast("timestamp"), $"plain".cast("timestamp")) withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> dt) { // By default we should fail to write ancient datetime values. var e = intercept[SparkException](df.write.parquet(path3_0)) @@ -995,7 +1007,12 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ.key -> LEGACY.toString) { checkAnswer( spark.read.format("parquet").load(path2_4, path3_0, path3_0_rebase), - 1.to(3).map(_ => Row(java.sql.Timestamp.valueOf(dataStr)))) + (0 until N).flatMap { i => + val (dictS, plainS) = rowFunc(i) + Seq.tabulate(3) { _ => + Row(java.sql.Timestamp.valueOf(dictS), java.sql.Timestamp.valueOf(plainS)) + } + }) } } } @@ -1003,20 +1020,23 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession Seq(false, true).foreach { vectorized => withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> vectorized.toString) { - checkReadMixedFiles("before_1582_date_v2_4.snappy.parquet", "date", "1001-01-01") checkReadMixedFiles( - "before_1582_timestamp_micros_v2_4.snappy.parquet", + "before_1582_date_v2_4_5.snappy.parquet", + "date", + (i: Int) => 
("1001-01-01", s"1001-01-0${i + 1}")) + checkReadMixedFiles( + "before_1582_timestamp_micros_v2_4_5.snappy.parquet", "TIMESTAMP_MICROS", - "1001-01-01 01:02:03.123456") + (i: Int) => ("1001-01-01 01:02:03.123456", s"1001-01-0${i + 1} 01:02:03.123456")) checkReadMixedFiles( - "before_1582_timestamp_millis_v2_4.snappy.parquet", + "before_1582_timestamp_millis_v2_4_5.snappy.parquet", "TIMESTAMP_MILLIS", - "1001-01-01 01:02:03.123") + (i: Int) => ("1001-01-01 01:02:03.123", s"1001-01-0${i + 1} 01:02:03.123")) // INT96 is a legacy timestamp format and we always rebase the seconds for it. - checkAnswer(readResourceParquetFile( - "test-data/before_1582_timestamp_int96_v2_4.snappy.parquet"), - Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.123456"))) +// checkAnswer(readResourceParquetFile( +// "test-data/before_1582_timestamp_int96_v2_4.snappy.parquet"), +// Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.123456"))) } } } From 95e73bca6a62e85644164b1dff35d533470f7181 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Sun, 24 May 2020 15:10:54 +0300 Subject: [PATCH 07/11] Fix merge --- .../datasources/parquet/ParquetIOSuite.scala | 21 +++++++++++-------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index 74e9708897679..c4dcec464f475 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -1010,22 +1010,25 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession } } - Seq(false, true).foreach { version => + Seq("2_4_5").foreach { version => withAllParquetReaders { - checkReadMixedFiles("before_1582_date_v2_4.snappy.parquet", "date", "1001-01-01") checkReadMixedFiles( - 
"before_1582_timestamp_micros_v2_4.snappy.parquet", + s"before_1582_date_v$version.snappy.parquet", + "date", + (i: Int) => ("1001-01-01", s"1001-01-0${i + 1}")) + checkReadMixedFiles( + s"before_1582_timestamp_micros_v$version.snappy.parquet", "TIMESTAMP_MICROS", - "1001-01-01 01:02:03.123456") + (i: Int) => ("1001-01-01 01:02:03.123456", s"1001-01-0${i + 1} 01:02:03.123456")) checkReadMixedFiles( - "before_1582_timestamp_millis_v2_4.snappy.parquet", + s"before_1582_timestamp_millis_v$version.snappy.parquet", "TIMESTAMP_MILLIS", - "1001-01-01 01:02:03.123") + (i: Int) => ("1001-01-01 01:02:03.123", s"1001-01-0${i + 1} 01:02:03.123")) // INT96 is a legacy timestamp format and we always rebase the seconds for it. - checkAnswer(readResourceParquetFile( - "test-data/before_1582_timestamp_int96_v2_4.snappy.parquet"), - Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.123456"))) + // checkAnswer(readResourceParquetFile( + // "test-data/before_1582_timestamp_int96_v2_4.snappy.parquet"), + // Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.123456"))) } } } From b0e4a32098542262d0219a8dcd847cf03c80f46e Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Sun, 24 May 2020 17:30:58 +0300 Subject: [PATCH 08/11] Check 2.4 files in read by default --- .../datasources/parquet/ParquetIOSuite.scala | 123 ++++++++---------- 1 file changed, 55 insertions(+), 68 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index c4dcec464f475..04d68c9a5fa95 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -876,7 +876,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession } } - ignore("...: generate test files for checking compatibility 
with Spark 2.4") { + ignore("generate test files for checking compatibility with Spark 2.4") { val resourceDir = "sql/core/src/test/resources/test-data" val version = "2_4_5" val N = 8 @@ -936,99 +936,86 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession test("SPARK-31159: compatibility with Spark 2.4 in reading dates/timestamps") { val N = 8 // test reading the existing 2.4 files and new 3.0 files (with rebase on/off) together. - def checkReadMixedFiles( + def checkReadMixedFiles[T]( fileName: String, - dt: String, - rowFunc: Int => (String, String)): Unit = { + catalystType: String, + rowFunc: Int => (String, String), + toJavaType: String => T, + checkDefaultLegacyRead: String => Unit, + tsOutputType: String = "TIMESTAMP_MICROS"): Unit = { withTempPaths(2) { paths => paths.foreach(_.delete()) val path2_4 = getResourceParquetFilePath("test-data/" + fileName) val path3_0 = paths(0).getCanonicalPath val path3_0_rebase = paths(1).getCanonicalPath - if (dt == "date") { - val df = Seq.tabulate(N)(rowFunc) - .toDF("dict", "plain") - .select($"dict".cast("date"), $"plain".cast("date")) + val df = Seq.tabulate(N)(rowFunc).toDF("dict", "plain") + .select($"dict".cast(catalystType), $"plain".cast(catalystType)) + withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> tsOutputType) { + checkDefaultLegacyRead(path2_4) // By default we should fail to write ancient datetime values. - var e = intercept[SparkException](df.write.parquet(path3_0)) + val e = intercept[SparkException](df.write.parquet(path3_0)) assert(e.getCause.getCause.getCause.isInstanceOf[SparkUpgradeException]) - // By default we should fail to read ancient datetime values. 
- e = intercept[SparkException](spark.read.parquet(path2_4).collect()) - assert(e.getCause.isInstanceOf[SparkUpgradeException]) - withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> CORRECTED.toString) { df.write.mode("overwrite").parquet(path3_0) } withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> LEGACY.toString) { df.write.parquet(path3_0_rebase) } - - // For Parquet files written by Spark 3.0, we know the writer info and don't need the - // config to guide the rebase behavior. - withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ.key -> LEGACY.toString) { - checkAnswer( - spark.read.format("parquet").load(path2_4, path3_0, path3_0_rebase), - (0 until N).flatMap { i => - val (dictS, plainS) = rowFunc(i) - Seq.tabulate(3) { _ => - Row(java.sql.Date.valueOf(dictS), java.sql.Date.valueOf(plainS)) - } - }) - } - } else { - val df = Seq.tabulate(N)(rowFunc) - .toDF("dict", "plain") - .select($"dict".cast("timestamp"), $"plain".cast("timestamp")) - withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> dt) { - // By default we should fail to write ancient datetime values. - var e = intercept[SparkException](df.write.parquet(path3_0)) - assert(e.getCause.getCause.getCause.isInstanceOf[SparkUpgradeException]) - // By default we should fail to read ancient datetime values. - e = intercept[SparkException](spark.read.parquet(path2_4).collect()) - assert(e.getCause.isInstanceOf[SparkUpgradeException]) - - withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> CORRECTED.toString) { - df.write.mode("overwrite").parquet(path3_0) - } - withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> LEGACY.toString) { - df.write.parquet(path3_0_rebase) - } - } - // For Parquet files written by Spark 3.0, we know the writer info and don't need the - // config to guide the rebase behavior. 
- withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ.key -> LEGACY.toString) { - checkAnswer( - spark.read.format("parquet").load(path2_4, path3_0, path3_0_rebase), - (0 until N).flatMap { i => - val (dictS, plainS) = rowFunc(i) - Seq.tabulate(3) { _ => - Row(java.sql.Timestamp.valueOf(dictS), java.sql.Timestamp.valueOf(plainS)) - } - }) - } + } + // For Parquet files written by Spark 3.0, we know the writer info and don't need the + // config to guide the rebase behavior. + withSQLConf(SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_READ.key -> LEGACY.toString) { + checkAnswer( + spark.read.format("parquet").load(path2_4, path3_0, path3_0_rebase), + (0 until N).flatMap { i => + val (dictS, plainS) = rowFunc(i) + Seq.tabulate(3) { _ => + Row(toJavaType(dictS), toJavaType(plainS)) + } + }) } } } - - Seq("2_4_5").foreach { version => + def failInRead(path: String): Unit = { + val e = intercept[SparkException](spark.read.parquet(path).collect()) + assert(e.getCause.isInstanceOf[SparkUpgradeException]) + } + def successInRead(path: String): Unit = spark.read.parquet(path).collect() + Seq( + // By default we should fail to read ancient datetime values when parquet files don't + // contain Spark version. 
+ "2_4_5" -> failInRead _, + "2_4_6" -> successInRead _).foreach { case (version, checkDefaultRead) => withAllParquetReaders { checkReadMixedFiles( s"before_1582_date_v$version.snappy.parquet", "date", - (i: Int) => ("1001-01-01", s"1001-01-0${i + 1}")) + (i: Int) => ("1001-01-01", s"1001-01-0${i + 1}"), + java.sql.Date.valueOf, + checkDefaultRead) checkReadMixedFiles( s"before_1582_timestamp_micros_v$version.snappy.parquet", - "TIMESTAMP_MICROS", - (i: Int) => ("1001-01-01 01:02:03.123456", s"1001-01-0${i + 1} 01:02:03.123456")) + "timestamp", + (i: Int) => ("1001-01-01 01:02:03.123456", s"1001-01-0${i + 1} 01:02:03.123456"), + java.sql.Timestamp.valueOf, + checkDefaultRead) checkReadMixedFiles( s"before_1582_timestamp_millis_v$version.snappy.parquet", - "TIMESTAMP_MILLIS", - (i: Int) => ("1001-01-01 01:02:03.123", s"1001-01-0${i + 1} 01:02:03.123")) - + "timestamp", + (i: Int) => ("1001-01-01 01:02:03.123", s"1001-01-0${i + 1} 01:02:03.123"), + java.sql.Timestamp.valueOf, + checkDefaultRead, + tsOutputType = "TIMESTAMP_MILLIS") // INT96 is a legacy timestamp format and we always rebase the seconds for it. 
- // checkAnswer(readResourceParquetFile( - // "test-data/before_1582_timestamp_int96_v2_4.snappy.parquet"), - // Row(java.sql.Timestamp.valueOf("1001-01-01 01:02:03.123456"))) + Seq("plain", "dict").foreach { enc => + checkAnswer(readResourceParquetFile( + s"test-data/before_1582_timestamp_int96_${enc}_v$version.snappy.parquet"), + Seq.tabulate(N) { i => + Row( + java.sql.Timestamp.valueOf("1001-01-01 01:02:03.123456"), + java.sql.Timestamp.valueOf(s"1001-01-0${i + 1} 01:02:03.123456")) + }) + } } } } From 0add1a22dfff851bae71d9330f984ace41e5663f Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Mon, 25 May 2020 09:57:25 +0300 Subject: [PATCH 09/11] Add comments --- .../sql/execution/datasources/parquet/ParquetIOSuite.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index 04d68c9a5fa95..250ea4c29e4cf 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -876,7 +876,9 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession } } - ignore("generate test files for checking compatibility with Spark 2.4") { + // It generates input files for the test below: + // "SPARK-31159: compatibility with Spark 2.4 in reading dates/timestamps" + ignore("SPARK-31806: generate test files for checking compatibility with Spark 2.4") { val resourceDir = "sql/core/src/test/resources/test-data" val version = "2_4_5" val N = 8 From 4419760694fbdd5b5a922a79c52b2ac9c492e053 Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Mon, 25 May 2020 19:32:09 +0300 Subject: [PATCH 10/11] Don't set rebase in write in test input generation --- .../sql/execution/datasources/parquet/ParquetIOSuite.scala | 4 +--- 1 
file changed, 1 insertion(+), 3 deletions(-) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index 250ea4c29e4cf..a735886af6673 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -902,9 +902,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession } } DateTimeTestUtils.withDefaultTimeZone(DateTimeTestUtils.LA) { - withSQLConf( - SQLConf.SESSION_LOCAL_TIMEZONE.key -> DateTimeTestUtils.LA.getId, - SQLConf.LEGACY_PARQUET_REBASE_MODE_IN_WRITE.key -> "CORRECTED") { + withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> DateTimeTestUtils.LA.getId) { save( (1 to N).map(i => ("1001-01-01", s"1001-01-0$i")), "date", From 3f2b474d8e19a89ee0b1f19183b8ac1c48f441df Mon Sep 17 00:00:00 2001 From: Max Gekk Date: Mon, 25 May 2020 19:32:34 +0300 Subject: [PATCH 11/11] Address Wenchen's review comment --- .../sql/execution/datasources/parquet/ParquetIOSuite.scala | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index a735886af6673..79c32976f02ec 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -918,6 +918,11 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession save(usTs, "timestamp", s"before_1582_timestamp_micros_v$version.snappy.parquet") } withSQLConf(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key -> "INT96") { + // Comparing to other logical types, 
Parquet-MR chooses dictionary encoding for the + // INT96 logical type because it consumes less memory for small column cardinality. + // Huge parquet files don't make sense to place in the resource folder. That's why + // we explicitly set `parquet.enable.dictionary` and generate two files w/ and w/o + // dictionary encoding. save( usTs, "timestamp",