Spark MERGE INTO Support (copy-on-write implementation) #1947
Conversation
Thanks @dilipbiswal! I'll take a closer look at this tomorrow.

Ack.
import org.apache.spark.sql.catalyst.plans.logical.MergeIntoProcessor
import org.apache.spark.sql.execution.{SparkPlan, UnaryExecNode}

case class MergeIntoExec(mergeIntoProcessor: MergeIntoProcessor,
Do we need this node? It seems we rewrite the operation into ReplaceData, no?
Well, I overlooked that we use MergeInto node in RewriteMergeInto.
I wonder whether we can use MapPartitions directly.
I think that MergeIntoProcessor and this node should be merged. That's really a physical plan node and it is strange how it is created and passed through the logical plan.
I agree with that. I think we can address this in the end. This bit is working and I'd focus on other things for now.
object RewriteMergeInto extends Rule[LogicalPlan]
  with PredicateHelper
  with Logging {

  val ROW_ID_COL = "_row_id_"
nit: these vals can be private
I think this PR is a great start, @dilipbiswal! I noted the following points that we need to address for correctness (some may be done separately):

There are also good-to-have points (these can be done in follow-ups if they are too much trouble):
Let's discuss each point one by one.

Cardinality check

The SQL standard requires an exception to be thrown if the ON clause in MERGE is such that more than one row in the source matches a row in the target. See this Hive issue for more info. Some systems do the cardinality check all the time while some, like Hive, make it optional. I'd say we should make it optional and let users configure it in the table properties. To sum up, I'd vote for having a flag in table properties and making the cardinality check optional (just like Hive ACID).

We also need to think a bit about how we implement the cardinality check. Here, I am open to suggestions. One idea is to modify the nodes used for dynamic file filtering. One way to do that is to leverage an accumulator to track matching files, as sketched below:
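A minimal sketch of the accumulator idea, assuming placeholder table names and a simple id-based join condition, using Spark's collection accumulator and the input_file_name() function:

import scala.collection.JavaConverters._

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, input_file_name}

val spark = SparkSession.builder().getOrCreate()

// Driver-side accumulator that collects the names of target files with at least one match.
val matchedFiles = spark.sparkContext.collectionAccumulator[String]("matched-files")

// Placeholders for the MERGE target and source plans produced by the rewrite.
val target = spark.table("db.target").withColumn("_file_name", input_file_name())
val source = spark.table("db.source")

// Inner join on the MERGE condition; every surviving row contributes its file name.
target.join(source, target("id") === source("id"))
  .select(col("_file_name"))
  .foreach(row => matchedFiles.add(row.getString(0)))

// The driver can then dedupe the accumulator value to get the set of files to rewrite.
val filesToRewrite = matchedFiles.value.asScala.toSet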
Another way is like this:
Align assignments

I don't think Spark aligns the assignments inside UPDATE or MERGE. We won't be able to support updating nested fields without it. We will probably need a separate rule for this. The same rule can be applied to UPDATE.

Group data before writing

We need to think about how to group data before writing new files with our updates and new records. One option is to group and order by partition columns. Another option is to group and order by the sort spec. The third option is to group updates and new records separately. Let's discuss it.
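For illustration, the first two options could look roughly like this with the DataFrame API, assuming a table partitioned by dep with a sort spec on id (both column names are placeholders):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().getOrCreate()

// Stands in for the output of the MERGE rewrite (unchanged + updated + inserted rows).
val merged = spark.table("db.merge_output")

// Option 1: group and order by the partition columns only.
val byPartition = merged
  .repartition(col("dep"))
  .sortWithinPartitions(col("dep"))

// Option 2: group and order by the table's sort spec (a global sort).
val bySortSpec = merged.orderBy(col("dep"), col("id"))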
BTW, I can work on some of these items in parallel so that we finish this earlier.
One thing we've been talking about a bit is whether or not it would be useful to tune the write portion of this. For example, it may be helpful to have an independent parameter for tuning the shuffle when grouping the results before writing. This would probably be good to tune with a non-Spark parameter so that users can customize it apart from the normal spark.sql.shuffle.partitions setting.
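For illustration, such a parameter could be read from table properties and fall back to the session shuffle setting; the property name write.merge.shuffle-partitions below is made up purely for this sketch:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().getOrCreate()
val merged = spark.table("db.merge_output")   // output of the MERGE rewrite

// Normally these would come from the Iceberg table metadata.
val tableProps = Map("write.merge.shuffle-partitions" -> "64")

val numPartitions = tableProps.get("write.merge.shuffle-partitions").map(_.toInt)
  .getOrElse(spark.sessionState.conf.numShufflePartitions)

// Group the merge output with a parallelism independent of spark.sql.shuffle.partitions.
val grouped = merged.repartition(numPartitions, col("dep"))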
@aokolnychyi @RussellSpitzer |
// Find the files in target that matches the JOIN condition from source.
val targetOutputCols = target.output
val newProjectCols = target.output ++ Seq(Alias(InputFileName(), FILE_NAME_COL)())
val newTargetTable = Project(newProjectCols, target)
It would be helpful to group some of these plan nodes into sections, like in RewriteDelete where methods like buildFileFilterPlan and buildScanPlan give good context for what plans are being constructed and how they will be used.
notMatchedConditions: Seq[Expression],
notMatchedOutputs: Seq[Seq[Expression]],
targetOutput: Seq[Expression],
joinedAttributes: Seq[Attribute]) extends Serializable {
This is essentially a physical plan node that is linked into both the physical plan and logical plan. I think it should be a normal physical plan node that is created in a strategy, just like other plans.
The main issue with the way this PR currently works is that it doesn't delegate enough to the rest of the Spark planner. All of the analysis is done during rewrite in the optimizer, for example. I think that this should be broken up into analysis rules to validate and update the MergeInto plan, the rewrite rule to build the optimizations and join, and a strategy to convert the logical plan into a MergeIntoExec. I think this should also have a validation rule that checks each action to ensure that the expressions for that action are correctly resolved.
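For illustration, the strategy piece of that split might look like the sketch below. MergeIntoExec is the physical node from this PR; the fields read from the logical MergeInto node (and its exact package) are assumptions here, not the PR's actual signatures.

import org.apache.spark.sql.Strategy
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, MergeInto}
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.v2.MergeIntoExec

object MergeIntoExecStrategy extends Strategy {
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
    // Convert the logical MergeInto node into its physical counterpart; planLater defers
    // planning of the child plan to the rest of the Spark planner.
    case m: MergeInto =>
      MergeIntoExec(m.mergeIntoProcessor, m.targetRelation, planLater(m.child)) :: Nil
    case _ => Nil
  }
}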
@rdblue Can you please explain the idea a bit more, specifically "should be broken up into analysis rules to validate and update the MergeInto plan"? Currently, we produce the MergeInto logical plan in the optimizer phase, so we have gone past analysis at this point, right? The input SQL has already been parsed and resolved into MergeIntoTable by Spark at this point, i.e. all the MERGE INTO inputs have been resolved?
We need to make sure there are analysis rules that guarantee the assumptions in this class. One possible issue that jumped out to both @aokolnychyi and me was that this assumes the expressions for insert and update actions are correct for the output of this node. We need to make sure that is the case.
Originally, I asked on Slack how that validation was being done, but I saw Anton's comment about it and I thought that probably meant that it isn't being done. If there are already rules in Spark to resolve and validate the plan, then that's great but we need to identify them and make a note here that we're relying on those for correctness. I still suspect that there aren't rules in Spark doing this because this is running the analyzer on expressions.
Thanks for pointing me to the code! Looks like I was looking into it at the time you were writing this, which is why my comment below was just a bit later. I think we're all on the same page now.
I agree with all of the points that @aokolnychyi brought up. I also have a few suggestions on how to do this more cleanly.
Please take a look at #1955. That exposes
Agreed. I think that we should have a rule similar to the existing resolution logic in Spark, and we should also have a validation rule that checks each action's expressions. Modifying and checking the logical plan in the analyzer like this will require analyzer rules and a logical plan that doesn't carry the MergeIntoProcessor.
Definitely.

@rdblue Thanks for the comments. I will process them and get back with any questions.
I can give it a try and contribute rules that would align assignments, plus port our tests. It would be great if @dilipbiswal could work on the cardinality check and the grouping of records on write. Once these are done, we can look into the remaining changes. How does that sound?
That is a great PR, let's get it in today.
We have this option internally and it works well in some cases. There are a few things we need to be careful about, though.

First, Spark will do the skew estimation step and the actual shuffle as two separate jobs, and we don't want to recompute the merge join twice. Internally, we add a repartition stage after the join if a global sort on write is requested. While it does help a bit, it is not ideal: we have seen cases where the sort on write is by far the most expensive step of MERGE.

Second, even when we do a global sort, the layout within partitions won't be ideal, so people will most likely have to compact again, making the global sort during MERGE redundant. That's why we have to be careful about doing a global sort by default. I think this ultimately depends on the use case. Shall we make this configurable in table properties? How many query engines will follow it, and where should that config live?

At the same time, if we don't do the global sort, we may end up with too many small files after the operation. We can consider doing a repartition by the partition columns and sorting by the sort key, but that will suffer if we have a lot of data in a single partition. It would be great to know the number of files and the size of data we need to rewrite per partition to make a good decision here.

To sum up this case, we need to weigh the cost of the write-side sort (including how we sort updated records) against ending up with too many small files.
Sounds good to me, Anton.
@rdblue @aokolnychyi Ryan/Anton, can you tell me what we do today in terms of partitioning and sorting for the CTAS and INSERT INTO ... SELECT ... FROM ... cases?
The full outer join probably requires shuffling data, which means that it will be distributed by the MATCH expression. There's no guarantee that the match expression is aligned with the table partitioning. If it isn't, then writing without a sort would introduce a ton of small files because each task would be writing to each output partition.

To avoid the small files problem, we need to repartition. If we repartition by just the partition expressions from the table, there is a good chance of producing a plan with too few tasks in the write stage because Spark can't split tasks for the same key. This is what introduces the skew. To avoid that, we can use a global sort to plan tasks that are balanced. A global sort is a best practice for writing anyway because it clusters data for faster reads.
@aokolnychyi, I agree with the idea to have a flag to disable the global sort. It is probably best to make this specific to copy-on-write, because delta writes will need to be sorted by the file and position metadata columns anyway.
I looked into resolution and there is a rule in Spark: https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala#L1682-L1710

Looks like if the assignments are out of order or a subset of the output columns, the expressions are left as-is. If there are no assignments, then the source table's columns are used to set the output columns by position.

We will need an analyzer rule that fills in the missing assignments for update, checks the order of assignments by name, and validates that inserts are complete. I also think that this rule should convert to a different MergeInto logical plan. The plan in Spark is not sufficient because it considers the plan resolved when assignments are resolved, not when the assignments actually produce the expected output. That's strange because resolution produces assignments when there aren't any, but allows them to be missing when some are present.
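As a rough illustration of what such a rule might do (the rule name and alignment details are assumptions, not this PR's implementation), a skeleton that fills in missing UPDATE assignments on Spark's MergeIntoTable plan could look like this:

import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.Rule

object AlignMergeIntoAssignments extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
    case m @ MergeIntoTable(target, _, _, matchedActions, _) if m.resolved =>
      val targetAttrs = target.output
      val alignedMatched = matchedActions.map {
        case u @ UpdateAction(_, assignments) =>
          u.copy(assignments = alignAssignments(targetAttrs, assignments))
        case other => other
      }
      // A real rule would also validate that INSERT actions assign every target column
      // and reorder them by name.
      m.copy(matchedActions = alignedMatched)
  }

  // For every target column, keep the explicit assignment if present, otherwise assign the
  // existing column to itself so the action's output always matches the target schema.
  private def alignAssignments(
      targetAttrs: Seq[Attribute],
      assignments: Seq[Assignment]): Seq[Assignment] = {
    targetAttrs.map { attr =>
      assignments.find(_.key.semanticEquals(attr)).getOrElse(Assignment(attr, attr))
    }
  }
}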
I'll cover the missing rule while @dilipbiswal is working on the cardinality check and grouping on write.
Okay, it seems like we agree on doing a global sort (after an extra round-robin partitioning to make sure we don't execute the merge join twice) and having a table property to control this behavior.
I think this is promising if we can easily nullify
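A sketch of the round-robin-then-global-sort approach mentioned above, with a placeholder partition count and placeholder sort columns:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().getOrCreate()
val merged = spark.table("db.merge_output")   // stands in for the join produced by the rewrite

val sorted = merged
  // Round-robin shuffle first: the sampling job that plans the range partitioning for the
  // global sort then reads this shuffle output instead of re-running the expensive merge join.
  .repartition(200)
  // Global sort by the table's sort spec (column names are placeholders).
  .orderBy(col("dep"), col("id"))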
Is there enough consensus on making the cardinality check optional to match Hive and to avoid an extra inner join for merge-on-read? I think it should be enabled by default to prevent correctness problems.

I don't think we agreed on how to implement the cardinality check. I had some thoughts in this comment. @dilipbiswal @rdblue @RussellSpitzer, what is your take on this? How do you see it being implemented?

@RussellSpitzer did mention a corner case where the accumulator approach consumes a lot of memory on the driver (if each executor has a substantially large set of unique files and they are brought to the driver and merged into a single set, we basically end up with many copies of the same entries). I am not sure we can overcome that, though.
@aokolnychyi Hey Anton, sorry, I haven't had a chance to work on this in the last couple of days. I will be looking at it from tomorrow/Wednesday. Firstly, I like the option of making the count check optional. In our use case we will mostly keep the count check off, as we strictly control the merge statements we issue. About the count implementation, Anton, I was thinking of implementing it without the optimization as a first cut and optimizing it in a follow-up. The reason is that implementing the first cut will not take much time, so most of the effort will go into the follow-up PR that optimizes it. That way, we can discuss the approaches in a targeted fashion in that PR. wdyt?
If I understand correctly, it is actually easier to do the optimization now because the optimization only requires changes in merge-on-read. Anton said this:
Since we are currently only implementing copy-on-write, I think it will be easier to do the cardinality check in the existing inner join.
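For illustration, a cardinality check folded into that existing inner join could look roughly like this (the id join key and table names are placeholders; _row_id_ mirrors the ROW_ID_COL constant shown earlier):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, count, lit, monotonically_increasing_id}

val spark = SparkSession.builder().getOrCreate()
val target = spark.table("db.target").withColumn("_row_id_", monotonically_increasing_id())
val source = spark.table("db.source")

// The same inner join the copy-on-write rewrite already performs.
val matches = target.join(source, target("id") === source("id"))

// A target row matched by more than one source row violates the MERGE cardinality rule.
val violations = matches
  .groupBy(col("_row_id_"))
  .agg(count(lit(1)).as("match_count"))
  .filter(col("match_count") > 1)

if (violations.limit(1).count() > 0) {
  throw new IllegalStateException(
    "MERGE cardinality violation: a target row matched more than one source row")
}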
Oh.. since we have two options to choose from and were discussing which one to pick, I thought of doing the count check as a side thing (which basically does the join twice) and raising an error, as a start. But if we can pick one of the two proposals now, I can give it a try and implement it.
I have the rule locally, @dilipbiswal @rdblue. Adding some tests and will submit a PR.
I've been thinking about the grouping of data on write during copy-on-write operations (merge-on-read is a different story). Right now, we only have a sort order in the table metadata. However, we will probably add a way to represent distribution, since Spark will have such a concept. I think global and local sorts don't address all use cases. We will want to request hash distribution on write in some cases (it is cheaper than a global sort and works well if the data size per partition is small and does not have to be split into multiple tasks). This applies to inserts as well as to other operations like updates. Since there will be a concept of distribution controlled by the user, the idea of leveraging both the distribution and the sort order during row-level operations seems promising to me.

DELETE

Delete is an operation that does not change the order of data, so we should be fine with just the file and pos metadata columns. In master, we do a global sort by file and pos, which is the most expensive option. I think we can switch to hash-partitioning by file and a local sort by file and pos. Yes, a global sort would co-locate files from the same partitions next to each other, but I don't think it is worth the price of the range-based shuffle. I'd be in favor of faster deletes and doing a compaction later instead of doing a global sort during deletes. The global sort won't eliminate the need for compaction and will make deletes more expensive, which would increase the chances of concurrent conflicts. In addition, I'd offer a table property specific to copy-on-write deletes to disable the shuffle step. If people want to have even faster deletes by skipping the shuffle, we should let them do that. They will have to compact more aggressively.

UPDATE

Update is the first operation that potentially changes the order of data. That's why we should take the distribution and order into account. Our intention here is to group/sort rows that did not change by file and pos to preserve their original ordering, and to apply the distribution and order to updated records. If the user asks for hash-based distribution during inserts, most likely he/she wants to apply it during updates too. I'd consider the following options:
MERGE

Merge is similar to update. We should consider new and updated records together.
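A minimal sketch of the hash-distribution plus local-sort idea described for copy-on-write deletes above (the _file and _pos names stand for the metadata columns discussed in this thread; the table name is a placeholder):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().getOrCreate()

// Rows that survive the delete and have to be rewritten, carrying _file and _pos columns.
val remainingRows = spark.table("db.rows_to_rewrite")

// Hash-partition by file so each task rewrites whole files, then sort locally by file and
// position to preserve the original row order; cheaper than a range-based global sort.
val grouped = remainingRows
  .repartition(col("_file"))
  .sortWithinPartitions(col("_file"), col("_pos"))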
@rdblue @aokolnychyi
Force-pushed c92a2d8 to 9cb2e86
val matchingRowsPlanBuilder = (_: DataSourceV2ScanRelation) =>
  Join(source, newTargetTable, Inner, Some(cond), JoinHint.NONE)
// TODO - extract the local predicates that references the target from the join condition and
// pass to buildScanPlan to ensure push-down.
@dilipbiswal, this extraction is already done in the pushFilters method that @aokolnychyi implemented for delete. That's one reason why this also passes down target.output. The filters that are pushed down are the ones that only reference those attributes:
val tableAttrSet = AttributeSet(tableAttrs)
val predicates = splitConjunctivePredicates(cond).filter(_.references.subsetOf(tableAttrSet))
if (predicates.nonEmpty) {
  val normalizedPredicates = DataSourceStrategy.normalizeExprs(predicates, tableAttrs)
  PushDownUtils.pushFilters(scanBuilder, normalizedPredicates)
}
@rdblue Yeah.. I saw it, Ryan. I checked the Spark code and there is an additional check for the deterministic status of the expression. I'm not sure whether we need this check for the delete statement or not. I wanted to think it through and discuss with you and Anton, and that's why I put a to-do.
The only predicates that will be pushed are those that can be converted to Filter. I don't think any non-deterministic expressions can be converted so it should be fine.
okay.
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

trait PlanHelper extends PredicateHelper {
This file is no longer used, so it can be removed.
}

private def getClauseCondition(clause: MergeAction): Expression = {
  clause.condition.getOrElse(Literal(true))
This can use TRUE_LITERAL.
}

private def buildFileFilterPlan(matchingRowsPlan: LogicalPlan): LogicalPlan = {
  // TODO: For merge-into make sure _file is resolved only from target table.
You can solve this problem by passing the target table attrs from the DataSourceV2ScanRelation:
val matchingFilePlan = buildFileFilterPlan(scanRelation.output, matchingRowsPlanBuilder(scanRelation))
...
private def buildFileFilterPlan(tableAttrs: Seq[AttributeReference], matchingRowsPlan: LogicalPlan): LogicalPlan = {
  val fileAttr = findOutputAttr(tableAttrs, FILE_NAME_COL)
  val agg = Aggregate(Seq(fileAttr), Seq(fileAttr), matchingRowsPlan)
  Project(Seq(findOutputAttr(agg.output, FILE_NAME_COL)), agg)
}

protected def findOutputAttr(attrs: Seq[Attribute], attrName: String): Attribute = {
  attrs.find(attr => resolver(attr.name, attrName)).getOrElse {
    throw new AnalysisException(s"Cannot find $attrName in $attrs")
  }
}
@rdblue Don't we have an issue if the target table has a column named "_file"? I was thinking we may need a way to solve it by creating a distinct correlation name if _file already exists in the target relation's output.
I think it should be fine. We should throw an exception if the table has a _file column, but that's something we can do later.
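For illustration, that later check could be a small guard in the same helper that defines findOutputAttr above (this is a sketch, not code from the PR):

// Fail fast if the target table already defines a column that collides with the
// _file metadata column used by the rewrite.
private def validateNoReservedColumns(targetAttrs: Seq[Attribute]): Unit = {
  if (targetAttrs.exists(attr => resolver(attr.name, FILE_NAME_COL))) {
    throw new AnalysisException(
      s"Cannot rewrite MERGE INTO: target table already has a column named $FILE_NAME_COL")
  }
}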
  // rewrite all operations that require reading the table to delete records
- case DeleteFromTable(r: DataSourceV2Relation, Some(cond)) =>
+ case DeleteFromTable(r: DataSourceV2Relation, optionalCond @ Some(cond)) =>
@dilipbiswal, this can be reverted as well.
// In above case, when id = 5, it applies both that matched predicates. In this
// case the first one we see is applied.
//

Nit: no need for an empty comment and an empty line.
append(targetName, new Employee(2, "emp-id-two"), new Employee(6, "emp-id-6"));
append(sourceName, new Employee(2, "emp-id-3"), new Employee(1, "emp-id-2"), new Employee(5, "emp-id-6"));
String sourceCTE = "WITH cte1 AS (SELECT id + 1 AS id, dep FROM source)";
String sqlText = sourceCTE + " " + "MERGE INTO %s AS target " +
Nit: it looks like there are unnecessary string literals. " " + "MERGE ..." can be updated to " MERGE ...".
@rdblue @aokolnychyi Thanks for the detailed review and all the help!!
Co-authored-by: Dilip Biswal <dbiswal@adobe.com>