@@ -2063,16 +2063,17 @@ object SQLConf {
.booleanConf
.createWithDefault(true)

val NESTED_PREDICATE_PUSHDOWN_ENABLED =
buildConf("spark.sql.optimizer.nestedPredicatePushdown.enabled")
.internal()
.doc("When true, Spark tries to push down predicates for nested columns and or names " +
"containing `dots` to data sources. Currently, Parquet implements both optimizations " +
"while ORC only supports predicates for names containing `dots`. The other data sources" +
"don't support this feature yet.")
val NESTED_PREDICATE_PUSHDOWN_FILE_SOURCE_LIST =
buildConf("spark.sql.optimizer.nestedPredicatePushdown.supportedFileSources")
.internal()
.doc("A comma-separated list of data source short names or fully qualified data source " +
"implementation class names for which Spark tries to push down predicates for nested " +
"columns and/or names containing `dots` to data sources. Currently, Parquet implements " +
"both optimizations while ORC only supports predicates for names containing `dots`. The " +
"other data sources don't support this feature yet. So the default value is 'parquet,orc'.")
Member

It seems we decided to make this configuration effective only for DSv1, which seems okay because only DSv1 will have compatibility issues.

But shall we at least explicitly mention that this configuration is only effective with DSv1 (or make it effective for DSv2 as well)? Otherwise it's going to be confusing to both end users and developers.

Member Author

I think the DSv2 API already assumes nested-column capabilities such as pushdown and pruning, so we only need to deal with DSv1 compatibility issues here; more precisely, file sources.

I will create a simple follow-up to refine the doc of this configuration on this point. Thanks.
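
For reference, a minimal sketch of the split being described, assuming it lives in the org.apache.spark.sql.execution.datasources package so it can reach the package-private helpers. NestedPushdownPaths and its two methods are made up for illustration; only the two translateFilter calls mirror the code added in this diff.

```scala
package org.apache.spark.sql.execution.datasources

import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.sources.{BaseRelation, Filter}

// Illustrative only, not part of this PR.
object NestedPushdownPaths {
  // DSv1 file sources: translating a nested predicate is gated by the relation type
  // and the new supportedFileSources config (DataSourceUtils.supportNestedPredicatePushdown).
  def translateForV1(relation: BaseRelation, predicate: Expression): Option[Filter] =
    DataSourceStrategy.translateFilter(
      predicate, DataSourceUtils.supportNestedPredicatePushdown(relation))

  // DSv2: translation always allows nested predicates; the source's ScanBuilder then
  // decides which of the translated filters it can actually handle.
  def translateForV2(predicate: Expression): Option[Filter] =
    DataSourceStrategy.translateFilter(predicate, supportNestedPredicatePushdown = true)
}
```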

Member

Thanks!

.version("3.0.0")
.booleanConf
.createWithDefault(true)
.stringConf
Member

@maropu maropu Apr 29, 2020

Do we need .transform(_.toUpperCase(Locale.ROOT))? Also, could we validate the input with checkValues? By the way, is this feature expected to cover custom data sources besides the prebuilt ones (parquet, orc, ...)?

Member Author

We compare against this list using toLowerCase when we need it, so it seems fine to leave it as is. A similar example is spark.sql.sources.useV1SourceList, and as with useV1SourceList, checkValues does not seem necessary.

Currently I think it is safer to assume custom data sources don't support this feature. I also think that if a custom data source wants to support it, it is better to adopt data source v2.

We don't have a common API for v1 data sources that indicates whether a source supports nested predicate pushdown. If we really want to allow custom v1 data sources to have that, we can consider adding a common v1 API for the purpose. But, again, it seems to me that we would rather encourage adopting v2 instead of adding new things to v1.
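
To illustrate the case-insensitive matching being described, here is a small standalone sketch; supportedSources and supportsNestedPushdown are hypothetical helpers that only mimic the Utils.stringToSeq plus toLowerCase comparison used in DataSourceUtils below.

```scala
import java.util.Locale

// Hypothetical helpers, for illustration only.
def supportedSources(confValue: String): Seq[String] =
  confValue.toLowerCase(Locale.ROOT).split(",").map(_.trim).filter(_.nonEmpty).toSeq

def supportsNestedPushdown(confValue: String, providerShortName: String): Boolean =
  supportedSources(confValue).contains(providerShortName.toLowerCase(Locale.ROOT))

// "Parquet,ORC" and "parquet,orc" behave the same, which is why an upper-casing
// transform on the conf value is not strictly needed.
assert(supportsNestedPushdown("Parquet,ORC", "parquet"))
assert(!supportsNestedPushdown("parquet,orc", "csv"))
```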

Member

Currently I think it is safer to assume custom data sources don't support this feature.

Yea, +1 on your thought.

Contributor

Currently I think it is safer to assume custom data sources don't support this feature.

Looks fine. @dbtsai, are you good with it? Do you have use cases that need nested predicate pushdown for non-file sources?

Member

In v1, we don't have any use case for supporting it in a custom data source. I'm good with it.

.createWithDefault("parquet,orc")

val SERIALIZER_NESTED_SCHEMA_PRUNING_ENABLED =
buildConf("spark.sql.optimizer.serializer.nestedSchemaPruning.enabled")
@@ -3098,8 +3099,6 @@ class SQLConf extends Serializable with Logging {

def nestedSchemaPruningEnabled: Boolean = getConf(NESTED_SCHEMA_PRUNING_ENABLED)

def nestedPredicatePushdownEnabled: Boolean = getConf(NESTED_PREDICATE_PUSHDOWN_ENABLED)

def serializerNestedSchemaPruningEnabled: Boolean =
getConf(SERIALIZER_NESTED_SCHEMA_PRUNING_ENABLED)

@@ -326,7 +326,10 @@ case class FileSourceScanExec(
}

@transient
private lazy val pushedDownFilters = dataFilters.flatMap(DataSourceStrategy.translateFilter)
private lazy val pushedDownFilters = {
val supportNestedPredicatePushdown = DataSourceUtils.supportNestedPredicatePushdown(relation)
dataFilters.flatMap(DataSourceStrategy.translateFilter(_, supportNestedPredicatePushdown))
}

override lazy val metadata: Map[String, String] = {
def seqToString(seq: Seq[Any]) = seq.mkString("[", ", ", "]")
@@ -448,60 +448,62 @@ object DataSourceStrategy {
}
}

private def translateLeafNodeFilter(predicate: Expression): Option[Filter] = predicate match {
case expressions.EqualTo(PushableColumn(name), Literal(v, t)) =>
private def translateLeafNodeFilter(
predicate: Expression,
pushableColumn: PushableColumnBase): Option[Filter] = predicate match {
case expressions.EqualTo(pushableColumn(name), Literal(v, t)) =>
Some(sources.EqualTo(name, convertToScala(v, t)))
case expressions.EqualTo(Literal(v, t), PushableColumn(name)) =>
case expressions.EqualTo(Literal(v, t), pushableColumn(name)) =>
Some(sources.EqualTo(name, convertToScala(v, t)))

case expressions.EqualNullSafe(PushableColumn(name), Literal(v, t)) =>
case expressions.EqualNullSafe(pushableColumn(name), Literal(v, t)) =>
Some(sources.EqualNullSafe(name, convertToScala(v, t)))
case expressions.EqualNullSafe(Literal(v, t), PushableColumn(name)) =>
case expressions.EqualNullSafe(Literal(v, t), pushableColumn(name)) =>
Some(sources.EqualNullSafe(name, convertToScala(v, t)))

case expressions.GreaterThan(PushableColumn(name), Literal(v, t)) =>
case expressions.GreaterThan(pushableColumn(name), Literal(v, t)) =>
Some(sources.GreaterThan(name, convertToScala(v, t)))
case expressions.GreaterThan(Literal(v, t), PushableColumn(name)) =>
case expressions.GreaterThan(Literal(v, t), pushableColumn(name)) =>
Some(sources.LessThan(name, convertToScala(v, t)))

case expressions.LessThan(PushableColumn(name), Literal(v, t)) =>
case expressions.LessThan(pushableColumn(name), Literal(v, t)) =>
Some(sources.LessThan(name, convertToScala(v, t)))
case expressions.LessThan(Literal(v, t), PushableColumn(name)) =>
case expressions.LessThan(Literal(v, t), pushableColumn(name)) =>
Some(sources.GreaterThan(name, convertToScala(v, t)))

case expressions.GreaterThanOrEqual(PushableColumn(name), Literal(v, t)) =>
case expressions.GreaterThanOrEqual(pushableColumn(name), Literal(v, t)) =>
Some(sources.GreaterThanOrEqual(name, convertToScala(v, t)))
case expressions.GreaterThanOrEqual(Literal(v, t), PushableColumn(name)) =>
case expressions.GreaterThanOrEqual(Literal(v, t), pushableColumn(name)) =>
Some(sources.LessThanOrEqual(name, convertToScala(v, t)))

case expressions.LessThanOrEqual(PushableColumn(name), Literal(v, t)) =>
case expressions.LessThanOrEqual(pushableColumn(name), Literal(v, t)) =>
Some(sources.LessThanOrEqual(name, convertToScala(v, t)))
case expressions.LessThanOrEqual(Literal(v, t), PushableColumn(name)) =>
case expressions.LessThanOrEqual(Literal(v, t), pushableColumn(name)) =>
Some(sources.GreaterThanOrEqual(name, convertToScala(v, t)))

case expressions.InSet(e @ PushableColumn(name), set) =>
case expressions.InSet(e @ pushableColumn(name), set) =>
val toScala = CatalystTypeConverters.createToScalaConverter(e.dataType)
Some(sources.In(name, set.toArray.map(toScala)))

// Because we only convert In to InSet in Optimizer when there are more than certain
// items. So it is possible we still get an In expression here that needs to be pushed
// down.
case expressions.In(e @ PushableColumn(name), list) if list.forall(_.isInstanceOf[Literal]) =>
case expressions.In(e @ pushableColumn(name), list) if list.forall(_.isInstanceOf[Literal]) =>
val hSet = list.map(_.eval(EmptyRow))
val toScala = CatalystTypeConverters.createToScalaConverter(e.dataType)
Some(sources.In(name, hSet.toArray.map(toScala)))

case expressions.IsNull(PushableColumn(name)) =>
case expressions.IsNull(pushableColumn(name)) =>
Some(sources.IsNull(name))
case expressions.IsNotNull(PushableColumn(name)) =>
case expressions.IsNotNull(pushableColumn(name)) =>
Some(sources.IsNotNull(name))
case expressions.StartsWith(PushableColumn(name), Literal(v: UTF8String, StringType)) =>
case expressions.StartsWith(pushableColumn(name), Literal(v: UTF8String, StringType)) =>
Some(sources.StringStartsWith(name, v.toString))

case expressions.EndsWith(PushableColumn(name), Literal(v: UTF8String, StringType)) =>
case expressions.EndsWith(pushableColumn(name), Literal(v: UTF8String, StringType)) =>
Some(sources.StringEndsWith(name, v.toString))

case expressions.Contains(PushableColumn(name), Literal(v: UTF8String, StringType)) =>
case expressions.Contains(pushableColumn(name), Literal(v: UTF8String, StringType)) =>
Some(sources.StringContains(name, v.toString))

case expressions.Literal(true, BooleanType) =>
@@ -518,8 +520,9 @@ object DataSourceStrategy {
*
* @return a `Some[Filter]` if the input [[Expression]] is convertible, otherwise a `None`.
*/
protected[sql] def translateFilter(predicate: Expression): Option[Filter] = {
translateFilterWithMapping(predicate, None)
protected[sql] def translateFilter(
predicate: Expression, supportNestedPredicatePushdown: Boolean): Option[Filter] = {
translateFilterWithMapping(predicate, None, supportNestedPredicatePushdown)
}

/**
@@ -529,11 +532,13 @@
* @param translatedFilterToExpr An optional map from leaf node filter expressions to its
* translated [[Filter]]. The map is used for rebuilding
* [[Expression]] from [[Filter]].
* @param nestedPredicatePushdownEnabled Whether nested predicate pushdown is enabled.
* @return a `Some[Filter]` if the input [[Expression]] is convertible, otherwise a `None`.
*/
protected[sql] def translateFilterWithMapping(
predicate: Expression,
translatedFilterToExpr: Option[mutable.HashMap[sources.Filter, Expression]])
translatedFilterToExpr: Option[mutable.HashMap[sources.Filter, Expression]],
nestedPredicatePushdownEnabled: Boolean)
: Option[Filter] = {
predicate match {
case expressions.And(left, right) =>
@@ -547,21 +552,26 @@
// Pushing one leg of AND down is only safe to do at the top level.
// You can see ParquetFilters' createFilter for more details.
for {
leftFilter <- translateFilterWithMapping(left, translatedFilterToExpr)
rightFilter <- translateFilterWithMapping(right, translatedFilterToExpr)
leftFilter <- translateFilterWithMapping(
left, translatedFilterToExpr, nestedPredicatePushdownEnabled)
rightFilter <- translateFilterWithMapping(
right, translatedFilterToExpr, nestedPredicatePushdownEnabled)
} yield sources.And(leftFilter, rightFilter)

case expressions.Or(left, right) =>
for {
leftFilter <- translateFilterWithMapping(left, translatedFilterToExpr)
rightFilter <- translateFilterWithMapping(right, translatedFilterToExpr)
leftFilter <- translateFilterWithMapping(
left, translatedFilterToExpr, nestedPredicatePushdownEnabled)
rightFilter <- translateFilterWithMapping(
right, translatedFilterToExpr, nestedPredicatePushdownEnabled)
} yield sources.Or(leftFilter, rightFilter)

case expressions.Not(child) =>
translateFilterWithMapping(child, translatedFilterToExpr).map(sources.Not)
translateFilterWithMapping(child, translatedFilterToExpr, nestedPredicatePushdownEnabled)
.map(sources.Not)

case other =>
val filter = translateLeafNodeFilter(other)
val filter = translateLeafNodeFilter(other, PushableColumn(nestedPredicatePushdownEnabled))
if (filter.isDefined && translatedFilterToExpr.isDefined) {
translatedFilterToExpr.get(filter.get) = predicate
}
@@ -608,8 +618,9 @@ object DataSourceStrategy {

// A map from original Catalyst expressions to corresponding translated data source filters.
// If a predicate is not in this map, it means it cannot be pushed down.
val supportNestedPredicatePushdown = DataSourceUtils.supportNestedPredicatePushdown(relation)
val translatedMap: Map[Expression, Filter] = predicates.flatMap { p =>
translateFilter(p).map(f => p -> f)
translateFilter(p, supportNestedPredicatePushdown).map(f => p -> f)
}.toMap

val pushedFilters: Seq[Filter] = translatedMap.values.toSeq
@@ -650,9 +661,10 @@ object DataSourceStrategy {
/**
* Find the column name of an expression that can be pushed down.
*/
object PushableColumn {
abstract class PushableColumnBase {
val nestedPredicatePushdownEnabled: Boolean

def unapply(e: Expression): Option[String] = {
val nestedPredicatePushdownEnabled = SQLConf.get.nestedPredicatePushdownEnabled
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits.MultipartIdentifierHelper
def helper(e: Expression): Option[Seq[String]] = e match {
case a: Attribute =>
@@ -668,3 +680,21 @@ object PushableColumn {
helper(e).map(_.quoted)
}
}

object PushableColumn {
def apply(nestedPredicatePushdownEnabled: Boolean): PushableColumnBase = {
if (nestedPredicatePushdownEnabled) {
PushableColumnAndNestedColumn
} else {
PushableColumnWithoutNestedColumn
}
}
}

object PushableColumnAndNestedColumn extends PushableColumnBase {
override val nestedPredicatePushdownEnabled = true
}

object PushableColumnWithoutNestedColumn extends PushableColumnBase {
override val nestedPredicatePushdownEnabled = false
}
@@ -17,13 +17,16 @@

package org.apache.spark.sql.execution.datasources

import java.util.Locale

import org.apache.hadoop.fs.Path
import org.json4s.NoTypeHints
import org.json4s.jackson.Serialization

import org.apache.spark.sql.{SPARK_LEGACY_DATETIME, SPARK_VERSION_METADATA_KEY}
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.sources.BaseRelation
import org.apache.spark.sql.types._
import org.apache.spark.util.Utils

@@ -68,6 +71,19 @@ object DataSourceUtils {
private[sql] def isDataFile(fileName: String) =
!(fileName.startsWith("_") || fileName.startsWith("."))

/**
* Returns if the given relation's V1 datasource provider supports nested predicate pushdown.
*/
private[sql] def supportNestedPredicatePushdown(relation: BaseRelation): Boolean =
relation match {
case hs: HadoopFsRelation =>
val supportedDatasources =
Utils.stringToSeq(SQLConf.get.getConf(SQLConf.NESTED_PREDICATE_PUSHDOWN_FILE_SOURCE_LIST)
.toLowerCase(Locale.ROOT))
supportedDatasources.contains(hs.toString)
case _ => false
}

def needRebaseDateTime(lookupFileMeta: String => String): Option[Boolean] = {
if (Utils.isTesting && SQLConf.get.getConfString("spark.test.forceNoRebase", "") == "true") {
return Some(false)
@@ -178,8 +178,11 @@ object FileSourceStrategy extends Strategy with Logging {
// Partition keys are not available in the statistics of the files.
val dataFilters =
normalizedFiltersWithoutSubqueries.filter(_.references.intersect(partitionSet).isEmpty)
logInfo(s"Pushed Filters: " +
s"${dataFilters.flatMap(DataSourceStrategy.translateFilter).mkString(",")}")
val supportNestedPredicatePushdown =
DataSourceUtils.supportNestedPredicatePushdown(fsRelation)
val pushedFilters = dataFilters
.flatMap(DataSourceStrategy.translateFilter(_, supportNestedPredicatePushdown))
logInfo(s"Pushed Filters: ${pushedFilters.mkString(",")}")
Member

Is it possible to have it propagated back so that when a user runs explain(true), the filters that are pushed down can be shown?

Member Author

In FileSourceScanExec, the pushed-down filters are already shown there.
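
A hedged end-to-end sketch of where they show up. The local session, /tmp path, and column names below are made up for the example, and the exact PushedFilters string may differ by version:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("nested-pushdown-demo").getOrCreate()
import spark.implicits._

// Write a tiny Parquet file with a nested struct column.
Seq(("a", (1, "x")), ("b", (2, "y"))).toDF("id", "nested")
  .write.mode("overwrite").parquet("/tmp/nested_pushdown_demo")

// With the default "parquet,orc", the FileSourceScanExec node in the physical plan
// lists the translated predicate under PushedFilters, e.g. GreaterThan(nested._1,1).
spark.read.parquet("/tmp/nested_pushdown_demo")
  .filter($"nested._1" > 1)
  .explain(true)

// Removing parquet from the list should make the nested predicate disappear
// from PushedFilters for a freshly planned query.
spark.conf.set("spark.sql.optimizer.nestedPredicatePushdown.supportedFileSources", "")
spark.read.parquet("/tmp/nested_pushdown_demo")
  .filter($"nested._1" > 1)
  .explain(true)
```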


// Predicates with both partition keys and attributes need to be evaluated after the scan.
val afterScanFilters = filterSet -- partitionKeyFilters.filter(_.references.nonEmpty)
@@ -180,8 +180,9 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
case OverwriteByExpression(r: DataSourceV2Relation, deleteExpr, query, writeOptions, _) =>
// fail if any filter cannot be converted. correctness depends on removing all matching data.
val filters = splitConjunctivePredicates(deleteExpr).map {
filter => DataSourceStrategy.translateFilter(deleteExpr).getOrElse(
throw new AnalysisException(s"Cannot translate expression to source filter: $filter"))
filter => DataSourceStrategy.translateFilter(deleteExpr,
supportNestedPredicatePushdown = true).getOrElse(
throw new AnalysisException(s"Cannot translate expression to source filter: $filter"))
}.toArray
r.table.asWritable match {
case v1 if v1.supports(TableCapability.V1_BATCH_WRITE) =>
@@ -205,7 +206,7 @@
// correctness depends on removing all matching data.
val filters = DataSourceStrategy.normalizeExprs(condition.toSeq, output)
.flatMap(splitConjunctivePredicates(_).map {
f => DataSourceStrategy.translateFilter(f).getOrElse(
f => DataSourceStrategy.translateFilter(f, true).getOrElse(
throw new AnalysisException(s"Exec update failed:" +
s" cannot translate expression to source filter: $f"))
}).toArray
@@ -48,7 +48,8 @@ object PushDownUtils extends PredicateHelper {

for (filterExpr <- filters) {
val translated =
DataSourceStrategy.translateFilterWithMapping(filterExpr, Some(translatedFilterToExpr))
DataSourceStrategy.translateFilterWithMapping(filterExpr, Some(translatedFilterToExpr),
nestedPredicatePushdownEnabled = true)
if (translated.isEmpty) {
untranslatableExprs += filterExpr
} else {
@@ -59,7 +59,7 @@ object V2ScanRelationPushDown extends Rule[LogicalPlan] {

val wrappedScan = scan match {
case v1: V1Scan =>
val translated = filters.flatMap(DataSourceStrategy.translateFilter)
val translated = filters.flatMap(DataSourceStrategy.translateFilter(_, true))
V1ScanWrapper(v1, translated, pushedFilters)
case _ => scan
}
@@ -48,12 +48,12 @@ object ParquetNestedPredicatePushDownBenchmark extends SqlBasedBenchmark {
private def addCase(
benchmark: Benchmark,
inputPath: String,
enableNestedPD: Boolean,
enableNestedPD: String,
name: String,
withFilter: DataFrame => DataFrame): Unit = {
val loadDF = spark.read.parquet(inputPath)
benchmark.addCase(name) { _ =>
withSQLConf((SQLConf.NESTED_PREDICATE_PUSHDOWN_ENABLED.key, enableNestedPD.toString)) {
withSQLConf((SQLConf.NESTED_PREDICATE_PUSHDOWN_FILE_SOURCE_LIST.key, enableNestedPD)) {
withFilter(loadDF).noop()
}
}
@@ -67,13 +67,13 @@
addCase(
benchmark,
outputPath,
enableNestedPD = false,
enableNestedPD = "",
"Without nested predicate Pushdown",
withFilter)
addCase(
benchmark,
outputPath,
enableNestedPD = true,
enableNestedPD = "parquet",
"With nested predicate Pushdown",
withFilter)
benchmark.run()
@@ -289,14 +289,26 @@ class DataSourceStrategySuite extends PlanTest with SharedSparkSession {
test("SPARK-31027 test `PushableColumn.unapply` that finds the column name of " +
"an expression that can be pushed down") {
attrInts.foreach { case (attrInt, colName) =>
assert(PushableColumn.unapply(attrInt) === Some(colName))
assert(PushableColumnAndNestedColumn.unapply(attrInt) === Some(colName))

if (colName.contains(".")) {
assert(PushableColumnWithoutNestedColumn.unapply(attrInt) === None)
} else {
assert(PushableColumnWithoutNestedColumn.unapply(attrInt) === Some(colName))
}
}
attrStrs.foreach { case (attrStr, colName) =>
assert(PushableColumn.unapply(attrStr) === Some(colName))
assert(PushableColumnAndNestedColumn.unapply(attrStr) === Some(colName))

if (colName.contains(".")) {
assert(PushableColumnWithoutNestedColumn.unapply(attrStr) === None)
} else {
assert(PushableColumnWithoutNestedColumn.unapply(attrStr) === Some(colName))
}
}

// `Abs(col)` can not be pushed down, so it returns `None`
assert(PushableColumn.unapply(Abs('col.int)) === None)
assert(PushableColumnAndNestedColumn.unapply(Abs('col.int)) === None)
}

/**
Expand All @@ -305,7 +317,7 @@ class DataSourceStrategySuite extends PlanTest with SharedSparkSession {
*/
def testTranslateFilter(catalystFilter: Expression, result: Option[sources.Filter]): Unit = {
assertResult(result) {
DataSourceStrategy.translateFilter(catalystFilter)
DataSourceStrategy.translateFilter(catalystFilter, true)
}
}
}