@@ -325,7 +325,7 @@ object SQLConf {
val ORC_FILTER_PUSHDOWN_ENABLED = buildConf("spark.sql.orc.filterPushdown")
.doc("When true, enable filter pushdown for ORC files.")
.booleanConf
.createWithDefault(false)
.createWithDefault(true)

val HIVE_VERIFY_PARTITION_PATH = buildConf("spark.sql.hive.verifyPartitionPath")
.doc("When true, check all the partition paths under the table\'s root directory " +
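With this change, ORC filter pushdown is enabled out of the box. Jobs that need the previous behavior can still turn the flag off explicitly; a minimal sketch, assuming an existing SparkSession named spark:

// Disable ORC filter pushdown for the current session
// (the key is the one defined by ORC_FILTER_PUSHDOWN_ENABLED above).
spark.conf.set("spark.sql.orc.filterPushdown", "false")

// Or equivalently from SQL:
// SET spark.sql.orc.filterPushdown=false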
@@ -65,19 +65,17 @@ class OrcHadoopFsRelationSuite extends HadoopFsRelationTest {
test("SPARK-12218: 'Not' is included in ORC filter pushdown") {
import testImplicits._

withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true") {
withTempPath { dir =>
val path = s"${dir.getCanonicalPath}/table1"
(1 to 5).map(i => (i, (i % 2).toString)).toDF("a", "b").write.orc(path)

checkAnswer(
spark.read.orc(path).where("not (a = 2) or not(b in ('1'))"),
(1 to 5).map(i => Row(i, (i % 2).toString)))

checkAnswer(
spark.read.orc(path).where("not (a = 2 and b in ('1'))"),
(1 to 5).map(i => Row(i, (i % 2).toString)))
}
withTempPath { dir =>
val path = s"${dir.getCanonicalPath}/table1"
(1 to 5).map(i => (i, (i % 2).toString)).toDF("a", "b").write.orc(path)

checkAnswer(
spark.read.orc(path).where("not (a = 2) or not(b in ('1'))"),
(1 to 5).map(i => Row(i, (i % 2).toString)))

checkAnswer(
spark.read.orc(path).where("not (a = 2 and b in ('1'))"),
(1 to 5).map(i => Row(i, (i % 2).toString)))
}
}
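The withSQLConf wrapper is dropped above because the suite now runs with pushdown enabled by default. A test that still needs to exercise the disabled path can set the flag the other way; a sketch using the same helper, assuming the suite's existing imports:

withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "false") {
  // Body runs with spark.sql.orc.filterPushdown=false; withSQLConf restores
  // the previous value afterwards.
}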

@@ -389,53 +389,51 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {

test("SPARK-10623 Enable ORC PPD") {
withTempPath { dir =>
withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true") {
import testImplicits._
val path = dir.getCanonicalPath

// For field "a", the first column has odds integers. This is to check the filtered count
// when `isNull` is performed. For Field "b", `isNotNull` of ORC file filters rows
// only when all the values are null (maybe this works differently when the data
// or query is complicated). So, simply here a column only having `null` is added.
val data = (0 until 10).map { i =>
val maybeInt = if (i % 2 == 0) None else Some(i)
val nullValue: Option[String] = None
(maybeInt, nullValue)
}
// Repartition the data so that there are several ORC files,
// giving ORC stripes that it can skip.
createDataFrame(data).toDF("a", "b").repartition(10).write.orc(path)
val df = spark.read.orc(path)

def checkPredicate(pred: Column, answer: Seq[Row]): Unit = {
val sourceDf = stripSparkFilter(df.where(pred))
val data = sourceDf.collect().toSet
val expectedData = answer.toSet

// When a filter is pushed down to ORC, ORC can apply it while reading rows, so we
// can check the number of rows returned from ORC to verify that the pushdown works.
// The tricky part is that ORC does not evaluate filters exactly; it may return extra
// candidate rows. So this checks that the number of results is less than the original
// row count, and then checks that the results contain the expected data.
assert(
sourceDf.count < 10 && expectedData.subsetOf(data),
s"No data was filtered for predicate: $pred")
}
import testImplicits._
val path = dir.getCanonicalPath

checkPredicate('a === 5, List(5).map(Row(_, null)))
checkPredicate('a <=> 5, List(5).map(Row(_, null)))
checkPredicate('a < 5, List(1, 3).map(Row(_, null)))
checkPredicate('a <= 5, List(1, 3, 5).map(Row(_, null)))
checkPredicate('a > 5, List(7, 9).map(Row(_, null)))
checkPredicate('a >= 5, List(5, 7, 9).map(Row(_, null)))
checkPredicate('a.isNull, List(null).map(Row(_, null)))
checkPredicate('b.isNotNull, List())
checkPredicate('a.isin(3, 5, 7), List(3, 5, 7).map(Row(_, null)))
checkPredicate('a > 0 && 'a < 3, List(1).map(Row(_, null)))
checkPredicate('a < 1 || 'a > 8, List(9).map(Row(_, null)))
checkPredicate(!('a > 3), List(1, 3).map(Row(_, null)))
checkPredicate(!('a > 0 && 'a < 3), List(3, 5, 7, 9).map(Row(_, null)))
// For field "a", the first column has odds integers. This is to check the filtered count
// when `isNull` is performed. For Field "b", `isNotNull` of ORC file filters rows
// only when all the values are null (maybe this works differently when the data
// or query is complicated). So, simply here a column only having `null` is added.
val data = (0 until 10).map { i =>
val maybeInt = if (i % 2 == 0) None else Some(i)
val nullValue: Option[String] = None
(maybeInt, nullValue)
}
// Repartition the data so that there are several ORC files,
// giving ORC stripes that it can skip.
createDataFrame(data).toDF("a", "b").repartition(10).write.orc(path)
val df = spark.read.orc(path)

def checkPredicate(pred: Column, answer: Seq[Row]): Unit = {
val sourceDf = stripSparkFilter(df.where(pred))
val data = sourceDf.collect().toSet
val expectedData = answer.toSet

// When a filter is pushed down to ORC, ORC can apply it while reading rows, so we
// can check the number of rows returned from ORC to verify that the pushdown works.
// The tricky part is that ORC does not evaluate filters exactly; it may return extra
// candidate rows. So this checks that the number of results is less than the original
// row count, and then checks that the results contain the expected data.
assert(
sourceDf.count < 10 && expectedData.subsetOf(data),
s"No data was filtered for predicate: $pred")
}

checkPredicate('a === 5, List(5).map(Row(_, null)))
checkPredicate('a <=> 5, List(5).map(Row(_, null)))
checkPredicate('a < 5, List(1, 3).map(Row(_, null)))
checkPredicate('a <= 5, List(1, 3, 5).map(Row(_, null)))
checkPredicate('a > 5, List(7, 9).map(Row(_, null)))
checkPredicate('a >= 5, List(5, 7, 9).map(Row(_, null)))
checkPredicate('a.isNull, List(null).map(Row(_, null)))
checkPredicate('b.isNotNull, List())
checkPredicate('a.isin(3, 5, 7), List(3, 5, 7).map(Row(_, null)))
checkPredicate('a > 0 && 'a < 3, List(1).map(Row(_, null)))
checkPredicate('a < 1 || 'a > 8, List(9).map(Row(_, null)))
checkPredicate(!('a > 3), List(1, 3).map(Row(_, null)))
checkPredicate(!('a > 0 && 'a < 3), List(3, 5, 7, 9).map(Row(_, null)))
}
}
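For reference, the rows generated above are (a, b) = (null, null), (1, null), (null, null), (3, null), ..., (9, null): column a is null at even positions and holds the odd integers 1, 3, 5, 7, 9, while column b is always null. That is why, for example, 'a < 5 is expected to keep only 1 and 3, and 'b.isNotNull is expected to keep nothing.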

@@ -511,63 +509,55 @@ class OrcQuerySuite extends QueryTest with BeforeAndAfterAll with OrcTest {
}

test("SPARK-14962 Produce correct results on array type with isnotnull") {
withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true") {
val data = (0 until 10).map(i => Tuple1(Array(i)))
withOrcFile(data) { file =>
val actual = spark
.read
.orc(file)
.where("_1 is not null")
val expected = data.toDF()
checkAnswer(actual, expected)
}
val data = (0 until 10).map(i => Tuple1(Array(i)))
withOrcFile(data) { file =>
val actual = spark
.read
.orc(file)
.where("_1 is not null")
val expected = data.toDF()
checkAnswer(actual, expected)
}
}

test("SPARK-15198 Support for pushing down filters for boolean types") {
withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true") {
val data = (0 until 10).map(_ => (true, false))
withOrcFile(data) { file =>
val df = spark.read.orc(file).where("_2 == true")
val actual = stripSparkFilter(df).count()

// ORC filter should be applied and the total count should be 0.
assert(actual === 0)
}
val data = (0 until 10).map(_ => (true, false))
withOrcFile(data) { file =>
val df = spark.read.orc(file).where("_2 == true")
val actual = stripSparkFilter(df).count()

// ORC filter should be applied and the total count should be 0.
assert(actual === 0)
}
}

test("Support for pushing down filters for decimal types") {
withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true") {
val data = (0 until 10).map(i => Tuple1(BigDecimal.valueOf(i)))
withTempPath { file =>
// Repartition the data so that there are several ORC files,
// giving ORC stripes that it can skip.
createDataFrame(data).toDF("a").repartition(10).write.orc(file.getCanonicalPath)
val df = spark.read.orc(file.getCanonicalPath).where("a == 2")
val actual = stripSparkFilter(df).count()

assert(actual < 10)
}
val data = (0 until 10).map(i => Tuple1(BigDecimal.valueOf(i)))
withTempPath { file =>
// Repartition the data so that there are several ORC files,
// giving ORC stripes that it can skip.
createDataFrame(data).toDF("a").repartition(10).write.orc(file.getCanonicalPath)
val df = spark.read.orc(file.getCanonicalPath).where("a == 2")
val actual = stripSparkFilter(df).count()

assert(actual < 10)
}
}

test("Support for pushing down filters for timestamp types") {
withSQLConf(SQLConf.ORC_FILTER_PUSHDOWN_ENABLED.key -> "true") {
val timeString = "2015-08-20 14:57:00"
val data = (0 until 10).map { i =>
val milliseconds = Timestamp.valueOf(timeString).getTime + i * 3600
Tuple1(new Timestamp(milliseconds))
}
withTempPath { file =>
// Repartition the data so that there are several ORC files,
// giving ORC stripes that it can skip.
createDataFrame(data).toDF("a").repartition(10).write.orc(file.getCanonicalPath)
val df = spark.read.orc(file.getCanonicalPath).where(s"a == '$timeString'")
val actual = stripSparkFilter(df).count()

assert(actual < 10)
}
val timeString = "2015-08-20 14:57:00"
val data = (0 until 10).map { i =>
val milliseconds = Timestamp.valueOf(timeString).getTime + i * 3600
Tuple1(new Timestamp(milliseconds))
}
withTempPath { file =>
// Repartition the data so that there are several ORC files,
// giving ORC stripes that it can skip.
createDataFrame(data).toDF("a").repartition(10).write.orc(file.getCanonicalPath)
val df = spark.read.orc(file.getCanonicalPath).where(s"a == '$timeString'")
val actual = stripSparkFilter(df).count()

assert(actual < 10)
}
}
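All of the pushdown tests above share one pattern: strip Spark's own Filter operator with stripSparkFilter and assert that ORC handed back fewer rows than the full dataset. A minimal sketch of that pattern as a reusable helper, assuming it is defined inside one of these suites so that stripSparkFilter is in scope (the helper name is hypothetical):

// Hypothetical helper following the suites' existing pattern.
def assertOrcPushdownPrunes(
    df: org.apache.spark.sql.DataFrame, predicate: String, totalRows: Long): Unit = {
  // Drop Spark's Filter node so only rows surviving ORC's pushed-down predicate are counted.
  val pruned = stripSparkFilter(df.where(predicate))
  assert(pruned.count() < totalRows, s"ORC returned every row for predicate: $predicate")
}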
