/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.vectorized

import org.apache.spark.{SparkException, TaskContext}
import org.apache.spark.sql.{DataFrame, Encoders, QueryTest, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
import org.apache.spark.sql.types.IntegerType

// Reproduces incorrect left-outer-join results when a shuffle stage whose join key is
// produced by a nondeterministic UDF is retried.
class BugTest extends QueryTest with SharedSparkSession {

  /*
  test("no retries") {
    withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") {
      val baseDf = spark.createDataset(
        Seq((1L, "a"), (2L, "b"), (3L, "c"), (null, "w"), (null, "x"), (null, "y"), (null, "z")))(
        Encoders.tupleEncoder(Encoders.LONG, Encoders.STRING)).toDF("pkLeft", "strleft")
      val leftOuter = baseDf.select(
        $"strleft",
        when(isnull($"pkLeft"), monotonically_increasing_id() + Literal(100)).
          otherwise($"pkLeft").as("pkLeft"))
      leftOuter.show(10000)
      val innerRight = spark.createDataset(
        Seq((1L, "11"), (2L, "22"), (3L, "33")))(
        Encoders.tupleEncoder(Encoders.LONG, Encoders.STRING)).toDF("pkRight", "strright")
      val innerjoin = leftOuter.join(innerRight, $"pkLeft" === $"pkRight", "inner")
      innerjoin.show(1000)
      val outerjoin = leftOuter.join(innerRight, $"pkLeft" === $"pkRight", "left_outer")
      outerjoin.show(1000)
    }
  }
  */

  // Re-runs the outer-join query while injecting task failures, and checks each retried
  // result against the first, failure-free run.
  test("with retries") {
    withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false",
      SQLConf.SHUFFLE_PARTITIONS.key -> "2",
      "spark.task.maxFailures" -> "4",
      "spark.network.timeout" -> "100000s",
      "spark.shuffle.sort.bypassMergeThreshold" -> "1",
      "spark.sql.files.maxPartitionNum" -> "2",
      "spark.sql.files.minPartitionNum" -> "2") {
      withTable("outer", "inner") {
        createBaseTables(spark)
        val outerjoin: DataFrame = getOuterJoinDF(spark)
        println("Initial data")
        // outerjoin.show(100)
        val correctRows = outerjoin.collect()
        TaskContext.unset()
        for (i <- 0 until 100) {
          try {
            println("before query exec")
            // setFailResult/unsetFailResult are test-only hooks assumed to exist in the
            // patched TaskContext used for this reproduction; they force a failure so the
            // shuffle stage gets retried.
            TaskContext.setFailResult()
            val rowsAfterRetry = getOuterJoinDF(spark).collect()
            TaskContext.unsetFailResult()
            // import scala.jdk.CollectionConverters._
            // val temp = spark.createDataFrame(rowsAfterRetry.toSeq.asJava, outerjoin.schema)
            // temp.show(100)
            if (correctRows.length != rowsAfterRetry.length) {
              println(s"encountered test failure: incorrect query result. run index = $i")
            }
            assert(correctRows.length == rowsAfterRetry.length)
            // Match every expected row against the retried result exactly once.
            val retriedResults = rowsAfterRetry.toBuffer
            correctRows.foreach(r => {
              val index = retriedResults.indexWhere(x =>
                r.getString(0) == x.getString(0) && (
                  (r.getInt(1) < 2 && r.getInt(1) == x.getInt(1) && r.getInt(2) == x.getInt(2) &&
                    r.getString(3) == x.getString(3)) ||
                    (r.isNullAt(2) && r.isNullAt(3) && x.isNullAt(3) && x.isNullAt(2))
                ))
              assert(index >= 0)
              retriedResults.remove(index)
            })
            assert(retriedResults.isEmpty)
            println(s"found successful query exec on iter index = $i")
          } catch {
            case se: SparkException if se.getMessage.contains("Please eliminate the" +
                " indeterminacy by checkpointing the RDD before repartition and try again") =>
              println(s"correctly encountered exception on iter index = $i")
            // OK expected
          }
        }
      }
    }
  }

  /*
  def createBaseTables(spark: SparkSession): Unit = {
    spark.sql("drop table if exists outer")
    spark.sql("drop table if exists inner")
    val data = for (i <- 1L until 1000L) yield {
      val rem = i % 4
      val tup = if (rem == 0) {
        java.lang.Long.valueOf(i) -> "aa"
      } else if (rem == 1) {
        java.lang.Long.valueOf(i) -> "bb"
      } else if (rem == 2) {
        java.lang.Long.valueOf(i) -> "cc"
      } else if (rem == 3) {
        val str = if (i % 3 == 0) {
          "aa"
        } else if (i % 3 == 1) {
          "bb"
        } else {
          "cc"
        }
        (null, str)
      }
      tup
    }
    val data1 = for (i <- 1L until 100L) yield {
      val rem = i % 3
      if (rem == 0) {
        i -> "aa"
      } else if (rem == 1) {
        i -> "bb"
      } else if (rem == 2) {
        i -> "cc"
      }
    }
    val outerDf = spark.createDataset(data)(
      Encoders.tupleEncoder(Encoders.LONG, Encoders.STRING)).toDF("pkLeftt", "strleft")
    this.logInfo("saving outer table")
    outerDf.write.format("parquet").partitionBy("strleft").saveAsTable("outer")
    val innerDf = spark.createDataset(data1)(
      Encoders.tupleEncoder(Encoders.LONG, Encoders.STRING)).toDF("pkRight", "strright")
    this.logInfo("saving inner table")
    innerDf.write.format("parquet").partitionBy("strright").saveAsTable("inner")
  }
  */

  def createBaseTables(spark: SparkSession): Unit = {
    spark.sql("drop table if exists outer")
    spark.sql("drop table if exists inner")
    val data = Seq((0, "aa"), (1, "aa"), (1, "aa"), (0, "aa"), (0, "aa"), (0, "aa"),
      (null, "bb"), (null, "bb"), (null, "bb"), (null, "bb"), (null, "bb"), (null, "bb"))
    val data1 = Seq((0, "bb"), (1, "bb"))
    val outerDf = spark.createDataset(data)(
      Encoders.tupleEncoder(Encoders.INT, Encoders.STRING)).toDF("pkLeftt", "strleft")
    this.logInfo("saving outer table")
    outerDf.write.format("parquet").partitionBy("strleft").saveAsTable("outer")
    val innerDf = spark.createDataset(data1)(
      Encoders.tupleEncoder(Encoders.INT, Encoders.STRING)).toDF("pkRight", "strright")
    this.logInfo("saving inner table")
    innerDf.write.format("parquet").partitionBy("strright").saveAsTable("inner")
  }

  private def getOuterJoinDF(spark: SparkSession) = {
    import org.apache.spark.sql.functions.udf
    val myudf = udf(() => Counter.getHash()).asNondeterministic()
    spark.udf.register("myudf", myudf.asNondeterministic())
    // Non-null keys keep their value; null keys are replaced by a nondeterministic surrogate.
    val leftOuter = spark.table("outer").select(
      col("strleft"),
      when(isnull(col("pkLeftt")), myudf().cast(IntegerType)).
        otherwise(col("pkLeftt")).as("pkLeft"))
    val innerRight = spark.table("inner")
    val outerjoin = leftOuter.hint("SHUFFLE_HASH").
      join(innerRight, col("pkLeft") === col("pkRight"), "left_outer")
    outerjoin
  }
}

// Mutable state backing the nondeterministic UDF: the returned value is bumped on every
// sixth call, so a retried stage can produce different surrogate join keys.
object Counter {
  var counter = 0
  var retVal = 12

  def getHash(): Int = this.synchronized {
    counter += 1
    val x = retVal
    if (counter % 6 == 0) {
      retVal += 1
    }
    x
  }
}
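
// A minimal sketch, not part of the original reproduction (the object and method names below
// are illustrative): the SparkException matched in the test above asks the user to eliminate
// the indeterminacy by checkpointing before the shuffle. One way to do that for this query
// shape is to checkpoint the nondeterministically-keyed side before joining, so that a stage
// retry replays the same surrogate keys instead of recomputing them.
object CheckpointedJoinSketch {
  def deterministicOuterJoin(
      spark: SparkSession,
      leftOuter: DataFrame,
      innerRight: DataFrame): DataFrame = {
    // Hypothetical checkpoint location; reliable checkpointing needs a real, durable path.
    spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")
    // Materialize the nondeterministic projection once so retries see identical keys.
    val stableLeft = leftOuter.checkpoint()
    stableLeft.hint("SHUFFLE_HASH")
      .join(innerRight, col("pkLeft") === col("pkRight"), "left_outer")
  }
}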