Implement logic to group and sort rows before writing rows for MERGE INTO. #2022
Conversation
@aokolnychyi @rdblue
```scala
}

def isCountCheckEnabled(table: Table): Boolean = {
  // TODO - can we avoid the cast below ?
```
Yes. The Spark table should have a properties map.
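For illustration, a minimal sketch of that approach, assuming the cardinality-check property and default shown later in this PR live on `TableProperties` (the constant owner and exact wiring are assumptions, not the PR's code):

```scala
import org.apache.spark.sql.connector.catalog.Table

// Read the flag from the connector Table's properties map instead of casting
// to SparkTable to reach the underlying Iceberg table.
def isCountCheckEnabled(table: Table): Boolean = {
  table.properties().getOrDefault(
    TableProperties.MERGE_WRITE_CARDINALITY_CHECK,
    String.valueOf(TableProperties.MERGE_WRITE_CARDINALITY_CHECK_DEFAULT)).toBoolean
}
```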
spark3-extensions/src/main/scala/org/apache/spark/sql/catalyst/utils/PlanHelper.scala (outdated comment, resolved)
```scala
val partitionExpressions = toCatalyst(iceTable.spec(), plan)
val sortExpressions = toCatalyst(iceTable.sortOrder(), plan, iceTable.schema())
val numShufflePartitions = SQLConf.get.numShufflePartitions
(partitionExpressions.isEmpty, sortExpressions.isEmpty) match {
```
I don't think the logic here that always adds RepartitionByExpression is correct. If the sort is global, that will automatically add the repartition by expression required by that sort, right? And that might not be the same as the partition expressions. When performing a global sort, the partition expressions and sort must be merged to produce a sort that meets the sort requirements within each partition.
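For illustration, a rough sketch of the merge this comment describes, assuming the `partitionExpressions` and `sortExpressions` built in the hunk above are in scope and that the sort expressions are Catalyst `SortOrder`s (this construction is an assumption, not the PR's code):

```scala
import org.apache.spark.sql.catalyst.expressions.{Ascending, SortOrder}
import org.apache.spark.sql.catalyst.plans.logical.Sort

// Prepend the partition expressions (ascending) to the table's sort order and
// issue a single global Sort; Spark inserts the range shuffle a global sort
// requires, so no separate RepartitionByExpression is needed for this case.
val mergedOrder: Seq[SortOrder] =
  partitionExpressions.map(expr => SortOrder(expr, Ascending)) ++
    sortExpressions.map(_.asInstanceOf[SortOrder])
Sort(mergedOrder, global = true, plan)
```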
```scala
}

def buildWritePlan(plan: LogicalPlan, table: Table, targetJoinAttrs: Seq[Attribute]): LogicalPlan = {
  val iceTable = table.asInstanceOf[SparkTable].table()
```
I don't think this should cast the table. Instead, it should attempt to handle non-Iceberg tables and have a branch for Iceberg tables that adds the sort.
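A minimal sketch of that branching, with `addDistributionAndSort` as a hypothetical helper holding the Iceberg-specific logic:

```scala
import org.apache.iceberg.spark.source.SparkTable
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.connector.catalog.Table

// Match on the table type instead of casting: non-Iceberg tables pass the plan
// through unchanged, Iceberg tables get the distribution and sort added.
def buildWritePlan(plan: LogicalPlan, table: Table): LogicalPlan = table match {
  case sparkTable: SparkTable => addDistributionAndSort(sparkTable.table(), plan) // hypothetical helper
  case _ => plan
}
```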
Force-pushed from d69e834 to fcc0e0d.
```java
public static final String MERGE_WRITE_CARDINALITY_CHECK = "write.merge.cardinality-check.enabled";
public static final boolean MERGE_WRITE_CARDINALITY_CHECK_DEFAULT = true;

public static final String MERGE_WRITE_SORT_MODE = "write.merge.sort.mode";
```
We should consider this with the context from the thread about Flink write distribution.
```scala
import org.apache.spark.sql.types.{AbstractDataType, DataType, Decimal, DecimalType, IntegerType, StringType, TimestampType}
import org.apache.spark.unsafe.types.UTF8String

abstract class IcebergTransformExpression
```
Can you implement truncate or add a TODO for it?
```scala
  attr => cond.references.exists(attr.semanticEquals(_))
)
val writePlan = buildWritePlan(joinPlan, Project(target.output, joinPlan), target.table, joinKeysFromTarget)
val mergePlan = MergeInto(mergeParams, target, writePlan)
```
Looks like this adds the sort in buildWritePlan to the joined data, not the merged data. I think that will be less efficient than adding the sort after the merge for two reasons:
- Deleted rows will be included, only to be removed on the write nodes
- The joined row will contain both the target columns and source columns, so this would be shuffling both the original values and update values in a lot of cases when only the updated values are needed.
I think that it should be this instead:

```scala
val mergePlan = MergeInto(mergeParams, target, joinPlan)
val writePlan = buildWritePlan(mergePlan, table)
val batchWrite = ...
ReplaceData(target, batchWrite, writePlan)
```

I think that will also allow you to remove some of the additional arguments from buildWritePlan.
```scala
def buildWritePlan(childPlan: LogicalPlan,
    planToResolveFrom: LogicalPlan,
    table: Table, targetJoinAttrs: Seq[Attribute]): LogicalPlan = {
  val iceTable = table.asInstanceOf[SparkTable].table()
```
Rather than using asInstanceOf, I think this should return childPlan if the table isn't a SparkTable.
```java
Collection<SortField> sortFields = sortFieldIndex.get(field.sourceId());
boolean isSorted = sortFields.stream().anyMatch(sortField ->
        field.transform().equals(sortField.transform()) ||
            sortField.transform().satisfiesOrderOf(field.transform()));
```
I think this would fit on one line if the indentation were fixed. A continuing indent should be 2 indents, 4 spaces.
```java
    org.apache.iceberg.SortDirection direction, NullOrder nullOrder) {
  return new OrderField(Expressions.apply(
      "truncate", Expressions.column(fieldName), Expressions.literal(width)),
      toSpark(direction), toSpark(nullOrder));
```
Indentation is off in this file as well.
```scala
  }
}

object BucketTransform {
```
private?
```scala
def toCatalyst(distribution: Distribution,
    plan: LogicalPlan): Seq[catalyst.expressions.Expression] = {
  val resolver = SQLConf.get.resolver
```
Need to remember to get the resolver from the def resolver method when rebased.
```scala
  }
}

def toCatalyst(distribution: Distribution,
```
I think this should probably be private as well.
```scala
case dt: DaysTransform =>
  IcebergDayTransform(resolve(FieldReference(dt.ref.fieldNames())))
case ht: HoursTransform =>
  IcebergHourTransform(resolve(FieldReference(ht.ref.fieldNames())))
```
Instead of constructing FieldReference, I think this could just rewrite the resolve method to pass parts: Seq[String].
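For illustration, a hedged sketch of that change, assuming `plan` and a `resolver` are available in the enclosing helper:

```scala
import org.apache.spark.sql.catalyst.expressions.Expression

// Resolve directly from the name parts instead of building a FieldReference first.
private def resolve(parts: Seq[String]): Expression = {
  plan.resolve(parts, resolver).getOrElse(
    throw new IllegalArgumentException(s"Cannot resolve column: ${parts.mkString(".")}"))
}

// Call sites can then pass the field names straight through, for example:
// case dt: DaysTransform => IcebergDayTransform(resolve(dt.ref.fieldNames()))
```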
```scala
    table: Table, targetJoinAttrs: Seq[Attribute]): LogicalPlan = {
  val iceTable = table.asInstanceOf[SparkTable].table()
  val globalSortEnabled = isGlobalSortEnabled(table)
  val partitionExpressions = toCatalyst(toRequiredDistribution(iceTable.spec), planToResolveFrom)
```
I think that if MergeInto is passed in, it will no longer be necessary to use planToResolveFrom. That could just be the child plan.
```java
  return Distributions.ordered(convert(builder.build()));
}

public static Distribution toRequiredDistribution(PartitionSpec spec) {
```
I don't think that the name toRequiredDistribution makes sense for the context these methods are called in. I think it would be better to use toClusteredDistribution(PartitionSpec) and toOrderedDistribution(PartitionSpec, SortOrder, boolean).
```scala
// locally sort the data based on the join key attributes.
if (targetJoinAttrs.nonEmpty) {
  val repartition = RepartitionByExpression(targetJoinAttrs, childPlan, numShufflePartitions)
  Sort(buildSortOrder(targetJoinAttrs), global = globalSortEnabled, repartition)
```
Since globalSortEnabled is always false, it would be more clear for these cases to use global = false.
```scala
}
case (_, true, false) =>
  // Only sort order is specified but no partition spec is defined. In this case
  // Reparttion the data by sort order expression and then locally sort the data
```
Typo: Reparttion -> Repartition
```scala
    iceTable.sortOrder(), false), planToResolveFrom).asInstanceOf[Seq[SortOrder]]
val dist = toRequiredDistribution(iceTable.spec(),
    iceTable.sortOrder(), true)
val globalSortExprs: Seq[SortOrder] = toCatalyst(dist, planToResolveFrom).asInstanceOf[Seq[SortOrder]]
```
I find this method really hard to follow. This creates expressions for every possible combination and then decides which ones to use.
The logic below also seems more complicated than it needs to be, I think because it uses a different order to check the possible combinations of isGlobal, isPartitioned and isSorted. For example, if isGlobal is true, but there is no ordering, then it needs to create an order rather than just not ordering. I was expecting something a bit simpler, like this:
```scala
val hasSortOrder = !table.sortOrder.isUnordered()
if (hasSortOrder) {
  if (distributionMode == Sort) {
    val order = toCatalyst(toOrderedDistribution(spec, order, true))
    val roundRobin = Repartition(numShufflePartitions, shuffle = true, childPlan)
    Sort(order, global = true, roundRobin)
  } else if (distributionMode == Partition) {
    val order = toCatalyst(order)
    val hashPartitioned = RepartitionByExpression(toCatalyst(toClusteredDistribution(spec)), childPlan, numShufflePartitions)
    Sort(order, global = false, hashPartitioned)
  } else {
    val order = toCatalyst(order)
    Sort(order, global = false, childPlan)
  }
} else if (distributionMode == Partition) {
  RepartitionByExpression(toCatalyst(toClusteredDistribution(spec)), childPlan, numShufflePartitions)
} else {
  childPlan
}
```

That is a bit simpler because global ordering is only considered if the table has an ordering. It may still be a good idea to do what you're doing and create an order from join attributes, but I'd like to make sure that we're choosing to do that. The simplest option is to turn off sorting if there is no global order.
Another alternative if the "distribution mode" is to sort: we could sort by partition keys, and the join attributes. As long as we are using a range partitioner, we should try to minimize files created in each partition.
Also: in my code above, I added a round-robin shuffle before the range partitioner to avoid running the join and merge twice (once to calculate skew and once to produce records).
Following up on this, I think we should implement the proposal from our discussion about Flink hash distribution. That PR is going to add write.distribution-mode with 3 values: none, hash, and range. Here's what we would use them for here:
| Spark | none | hash | range |
|---|---|---|---|
| unordered | no distribution, locally sort by partition key | hash distribute by partition key, locally sort by partition key | range distribute by partition key, locally sort by partition key |
| ordered | no distribution, locally sorted | hash distribute by partition key, locally sorted | globally sorted |
Or in terms of the methods you've added:
| Spark | none | hash | range |
|---|---|---|---|
| unordered | no distribution, local sort by `toOrderedDistribution(spec, order, true)` | distribute by `toClusteredDistribution(spec)`, local sort by `toOrderedDistribution(spec, order, true)` | global sort by `toOrderedDistribution(spec, order, true)` |
| ordered | no distribution, local sort by `toOrderedDistribution(spec, order, true)` | distribute by `toClusteredDistribution(spec)`, local sort by `toOrderedDistribution(spec, order, true)` | global sort by `toOrderedDistribution(spec, order, true)` |
The result of toOrderedDistribution(spec, order, true) is always used to sort, whether locally or globally. If the sort order is unordered, then it will infer the order from the partition spec just as we wanted from the top table. For hash partitioning, we can use toClusteredDistribution that you added. Then the only other concern is that we need to add a round-robin shuffle before the global sort.
With that addition, let me revise my code from above:
```scala
val distributionMode = table.properties.getOrDefault("write.distribution-mode", "range")
val order = toCatalyst(toOrderedDistribution(spec, order, true))
distributionMode.toLowerCase(Locale.ROOT) match {
  case "none" =>
    Sort(order, global = false, childPlan)
  case "hash" =>
    val clustering = toCatalyst(toClusteredDistribution(spec))
    val hashPartitioned = RepartitionByExpression(clustering, childPlan, numShufflePartitions)
    Sort(order, global = false, hashPartitioned)
  case "range" =>
    val roundRobin = Repartition(numShufflePartitions, shuffle = true, childPlan)
    Sort(order, global = true, roundRobin)
}
```
@rdblue Thank you very much !!
+1 here
Force-pushed from fcc0e0d to e5e2673.
Co-authored-by: Ashish Mehta <mehta.ashish23@gmail.com>
Force-pushed from e5e2673 to 162ac3c.
```scala
  .getOrDefault("write.distribution-mode", TableProperties.WRITE_DISTRIBUTION_MODE_RANGE)
val order = toCatalyst(toOrderedDistribution(table.spec(), table.sortOrder(), true), childPlan)
distributionMode.toLowerCase(Locale.ROOT) match {
  case TableProperties.WRITE_DISTRIBUTION_MODE_DEFAULT =>
```
We should rename this to WRITE_DISTRIBUTION_MODE_NONE in a follow-up, since the default depends on the engine. We can also add WRITE_DISTRIBUTION_MODE_FLINK_DEFAULT and WRITE_DISTRIBUTION_MODE_SPARK_DEFAULT.
```scala
val distributionMode: String = table.properties
  .getOrDefault("write.distribution-mode", TableProperties.WRITE_DISTRIBUTION_MODE_RANGE)
val order = toCatalyst(toOrderedDistribution(table.spec(), table.sortOrder(), true), childPlan)
distributionMode.toLowerCase(Locale.ROOT) match {
```
The Flink commit also added a Java enum for this. We could use that instead of string matching here. It handles case insensitive mapping, too: DistributionMode.fromName(modeName)
```scala
case class MergeInto(
    mergeIntoProcessor: MergeIntoParams,
    targetRelation: DataSourceV2Relation,
    targetOutput: Seq[Attribute],
```
I think this could just be output and you wouldn't need to override def output below.
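For example, a minimal sketch of that shape (other field names follow the hunk above; this is not the PR's exact definition):

```scala
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, UnaryNode}

// Naming the constructor parameter `output` gives the case class a val that
// satisfies the abstract `output` member, so no `override def output` is needed.
case class MergeInto(
    mergeIntoParams: MergeIntoParams,
    output: Seq[Attribute],
    child: LogicalPlan) extends UnaryNode
```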
```scala
case class MergeIntoExec(
    mergeIntoParams: MergeIntoParams,
    @transient targetRelation: DataSourceV2Relation,
    targetOutput: Seq[Attribute],
```
Same here, using output would make the method definition unnecessary.
```scala
case MergeInto(mergeIntoProcessor, targetRelation, child) =>
  MergeIntoExec(mergeIntoProcessor, targetRelation, planLater(child)) :: Nil
case MergeInto(mergeIntoParms, targetAttributes, child) =>
  MergeIntoExec(mergeIntoParms, targetAttributes, planLater(child)) :: Nil
```
I think it would be more clear to use output instead of targetAttributes here since that's what this is setting, but this is minor.
...ions/src/main/scala/org/apache/spark/sql/catalyst/utils/RewriteRowLevelOperationHelper.scala (comment resolved)
```scala
val numShufflePartitions = spark.sessionState.conf.numShufflePartitions
val table = iceTable.table()
val distributionMode: String = table.properties
  .getOrDefault("write.distribution-mode", TableProperties.WRITE_DISTRIBUTION_MODE_RANGE)
```
Isn't write.distribution-mode listed in TableProperties? We should use the constant.
```java
}

@Test
public void testSingleConditionalDeleteCountCheck() throws NoSuchTableException {
```
Looks like this test case was accidentally deleted?
| "WHEN MATCHED AND target.id = 6 THEN DELETE \n" + | ||
| "WHEN NOT MATCHED AND source.id = 2 THEN INSERT * "; | ||
|
|
||
| sql(sqlText, ""); |
Minor: this passes an extra empty string and embeds the table names in the SQL text instead of passing them as arguments.
```java
List<Employee> input = Arrays.asList(employees);
Dataset<Row> inputDF = spark.createDataFrame(input, Employee.class);
inputDF.coalesce(1).writeTo(tabName).append();

private void setWriteMode(String tabName, String mode) {
```
I think this should be setDistributionMode instead because setWriteMode sounds more general, like "copy-on-write" or "merge-on-read".
```java
sql(sqlText, "");
assertEquals("Should have expected rows",
    ImmutableList.of(row(1, "emp-id-1"), row(2, "emp-id-2")),
    sql("SELECT * FROM %s ORDER BY id ASC NULLS LAST", targetName));
```
Nit: Indentation is off in the new methods. Should be 2 indents or 4 spaces for continuations.
spark3-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestMergeIntoTable.java (comment resolved)
```scala
)

val mergePlan = MergeInto(mergeParams, target, joinPlan)
val mergePlan = MergeInto(mergeParams, target.output, joinPlan)
```
Why not sort this case as well? If the user has requested a sort order on the table, it makes sense to enforce it even if we aren't also rewriting files.
@rdblue Wouldn't AppendData have logic to do the right thing based on the child's distribution and ordering? I remember seeing something from Anton on this. I will go ahead and change it for now.
```scala
val batchWrite = mergeBuilder.asWriteBuilder.buildForBatch()

ReplaceData(target, batchWrite, mergePlan)
ReplaceData(target, batchWrite, writePlan)
```
Nit: newline was removed, which is a whitespace change.
```scala
  .getOrDefault(TableProperties.WRITE_DISTRIBUTION_MODE, TableProperties.WRITE_DISTRIBUTION_MODE_RANGE)
val mode = DistributionMode.fromName(distributionMode)
val order = toCatalyst(toOrderedDistribution(table.spec(), table.sortOrder(), true), childPlan)
mode match {
```
Minor: The mode variable isn't really needed. It could be DistributionMode.fromName(distributionMode) match { ... }
| "WHEN MATCHED AND target.id = 6 THEN DELETE \n" + | ||
| "WHEN NOT MATCHED AND source.id = 2 THEN INSERT * "; | ||
|
|
||
| sql(sqlText, ""); |
Nit: use of "" instead of filling in table names in this test as well.
Thanks @dilipbiswal and @mehtaashish23! Looks good.
Co-authored-by: Ashish Mehta <mehta.ashish23@gmail.com>
Co-authored-by: Dilip Biswal <dbiswal@adobe.com>
Note:
There is a proposal from Anton to group and sort rows based on `_file` and `_pos` for unchanged rows, and on the partition spec and sort order for changed rows. This can be added in a follow-up.