From 24deda59bfd67504a26ecf74df3f393729c3d8f4 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Sat, 18 Jun 2022 00:57:26 -0700 Subject: [PATCH 1/7] debugging --- .../ResolveMergeIntoTableReferences.scala | 6 +- .../analysis/RewriteMergeIntoTable.scala | 70 +++++++++-------- .../IcebergSparkSqlExtensionsParser.scala | 17 ++++- .../planning/RewrittenRowLevelCommand.scala | 4 +- .../catalyst/plans/logical/ReplaceData.scala | 1 - .../datasources/v2/MergeRowsExec.scala | 7 +- .../RowLevelCommandScanRelationPushDown.scala | 1 - .../iceberg/spark/extensions/TestUpdate.java | 76 ++++++++++++++++--- .../spark/source/SparkBatchQueryScan.java | 6 +- .../apache/iceberg/spark/SparkTestBase.java | 3 + 10 files changed, 137 insertions(+), 54 deletions(-) diff --git a/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveMergeIntoTableReferences.scala b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveMergeIntoTableReferences.scala index 63ebdef95730..70ac93f3bd11 100644 --- a/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveMergeIntoTableReferences.scala +++ b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveMergeIntoTableReferences.scala @@ -93,12 +93,16 @@ case class ResolveMergeIntoTableReferences(spark: SparkSession) extends Rule[Log val resolvedMergeCondition = resolveCond("SEARCH", context.mergeCondition, m) - MergeIntoIcebergTable( + val x = MergeIntoIcebergTable( targetTable, sourceTable, mergeCondition = resolvedMergeCondition, matchedActions = resolvedMatchedActions, notMatchedActions = resolvedNotMatchedActions) + x + + case m @ UnresolvedMergeIntoIcebergTable(targetTable, sourceTable, context) => + m } private def resolveCond(condName: String, cond: Expression, plan: LogicalPlan): Expression = { diff --git a/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala index 886ed5002584..1f00a9816018 100644 --- a/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala +++ b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala @@ -19,6 +19,7 @@ package org.apache.spark.sql.catalyst.analysis +import org.apache.iceberg.spark.source.SparkTable import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.catalyst.expressions.Alias import org.apache.spark.sql.catalyst.expressions.Attribute @@ -36,23 +37,7 @@ import org.apache.spark.sql.catalyst.plans.Inner import org.apache.spark.sql.catalyst.plans.LeftAnti import org.apache.spark.sql.catalyst.plans.LeftOuter import org.apache.spark.sql.catalyst.plans.RightOuter -import org.apache.spark.sql.catalyst.plans.logical.AppendData -import org.apache.spark.sql.catalyst.plans.logical.DeleteAction -import org.apache.spark.sql.catalyst.plans.logical.Filter -import org.apache.spark.sql.catalyst.plans.logical.HintInfo -import org.apache.spark.sql.catalyst.plans.logical.InsertAction -import org.apache.spark.sql.catalyst.plans.logical.Join -import org.apache.spark.sql.catalyst.plans.logical.JoinHint -import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan -import org.apache.spark.sql.catalyst.plans.logical.MergeAction -import org.apache.spark.sql.catalyst.plans.logical.MergeIntoIcebergTable -import org.apache.spark.sql.catalyst.plans.logical.MergeRows -import org.apache.spark.sql.catalyst.plans.logical.NO_BROADCAST_HASH -import org.apache.spark.sql.catalyst.plans.logical.NoStatsUnaryNode -import org.apache.spark.sql.catalyst.plans.logical.Project -import org.apache.spark.sql.catalyst.plans.logical.ReplaceData -import org.apache.spark.sql.catalyst.plans.logical.UpdateAction -import org.apache.spark.sql.catalyst.plans.logical.WriteDelta +import org.apache.spark.sql.catalyst.plans.logical.{AppendData, DeleteAction, Filter, HintInfo, InsertAction, Join, JoinHint, LogicalPlan, MergeAction, MergeIntoIcebergTable, MergeRows, NO_BROADCAST_HASH, NoStatsUnaryNode, Project, ReplaceData, UpdateAction, View, WriteDelta} import org.apache.spark.sql.catalyst.util.RowDeltaUtils._ import org.apache.spark.sql.connector.expressions.FieldReference import org.apache.spark.sql.connector.expressions.NamedReference @@ -82,7 +67,6 @@ object RewriteMergeIntoTable extends RewriteRowLevelCommand { override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators { case m @ MergeIntoIcebergTable(aliasedTable, source, cond, matchedActions, notMatchedActions, None) if m.resolved && m.aligned && matchedActions.isEmpty && notMatchedActions.size == 1 => - EliminateSubqueryAliases(aliasedTable) match { case r: DataSourceV2Relation => // NOT MATCHED conditions may only refer to columns in source so they can be pushed down @@ -112,7 +96,6 @@ object RewriteMergeIntoTable extends RewriteRowLevelCommand { case m @ MergeIntoIcebergTable(aliasedTable, source, cond, matchedActions, notMatchedActions, None) if m.resolved && m.aligned && matchedActions.isEmpty => - EliminateSubqueryAliases(aliasedTable) match { case r: DataSourceV2Relation => // when there are no MATCHED actions, use a left anti join to remove any matching rows @@ -144,27 +127,49 @@ object RewriteMergeIntoTable extends RewriteRowLevelCommand { throw new AnalysisException(s"$p is not an Iceberg table") } - case m @ MergeIntoIcebergTable(aliasedTable, source, cond, matchedActions, notMatchedActions, None) + case m @ MergeIntoIcebergTable(aliasedTable, _, _, _, _, None) if m.resolved && m.aligned => - EliminateSubqueryAliases(aliasedTable) match { case r @ DataSourceV2Relation(tbl: SupportsRowLevelOperations, _, _, _, _) => - val operation = buildRowLevelOperation(tbl, MERGE) - val table = RowLevelOperationTable(tbl, operation) - val rewritePlan = operation match { - case _: SupportsDelta => - buildWriteDeltaPlan(r, table, source, cond, matchedActions, notMatchedActions) - case _ => - buildReplaceDataPlan(r, table, source, cond, matchedActions, notMatchedActions) + rewriteIcebergRelation(m, r, tbl) + case p: View => + val relations = p.children.collect { case r: DataSourceV2Relation if r.table.isInstanceOf[SparkTable] => + r + } + val icebergTableView = relations.nonEmpty && relations.size == 1 + if (icebergTableView) { + val newM = rewriteIcebergRelation( + m, + relations.head, + relations.head.table.asInstanceOf[SupportsRowLevelOperations]) + newM + } else { + throw new AnalysisException(s"$p is not an Iceberg table") } - - m.copy(rewritePlan = Some(rewritePlan)) - case p => throw new AnalysisException(s"$p is not an Iceberg table") } } + private def rewriteIcebergRelation( + m: MergeIntoIcebergTable, + r: DataSourceV2Relation, + tbl: SupportsRowLevelOperations): MergeIntoIcebergTable = { + val operation = buildRowLevelOperation(tbl, MERGE) + val table = RowLevelOperationTable(tbl, operation) + val replacedSourceTable = EliminateSubqueryAliases(m.sourceTable) match { + case v: View if v.isTempViewStoringAnalyzedPlan => v.child + case other => other + } + val rewritePlan = operation match { + case _: SupportsDelta => + buildWriteDeltaPlan(r, table, replacedSourceTable, m.mergeCondition, m.matchedActions, m.notMatchedActions) + case _ => + buildReplaceDataPlan(r, table, replacedSourceTable, m.mergeCondition, m.matchedActions, m.notMatchedActions) + } + m.copy(sourceTable = replacedSourceTable, rewritePlan = Some(rewritePlan)) + } + // build a rewrite plan for sources that support replacing groups of data (e.g. files, partitions) private def buildReplaceDataPlan( relation: DataSourceV2Relation, @@ -228,7 +233,8 @@ object RewriteMergeIntoTable extends RewriteRowLevelCommand { // build a plan to replace read groups in the table val writeRelation = relation.copy(table = operationTable) - ReplaceData(writeRelation, mergeRows, relation) + val r = ReplaceData(writeRelation, mergeRows, relation) + r } // build a rewrite plan for sources that support row deltas diff --git a/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala index 0339d8bff833..22fedd725655 100644 --- a/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala +++ b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala @@ -20,6 +20,7 @@ package org.apache.spark.sql.catalyst.parser.extensions import java.util.Locale + import org.antlr.v4.runtime._ import org.antlr.v4.runtime.atn.PredictionMode import org.antlr.v4.runtime.misc.Interval @@ -27,7 +28,7 @@ import org.antlr.v4.runtime.misc.ParseCancellationException import org.antlr.v4.runtime.tree.TerminalNodeImpl import org.apache.iceberg.common.DynConstructors import org.apache.iceberg.spark.Spark3Util -import org.apache.iceberg.spark.source.SparkTable +import org.apache.iceberg.spark.source.{SparkBatchQueryScan, SparkFilesScan, SparkTable} import org.apache.spark.sql.AnalysisException import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.FunctionIdentifier @@ -57,6 +58,10 @@ import org.apache.spark.sql.types.StructType import scala.jdk.CollectionConverters._ import scala.util.Try +import org.apache.iceberg.hadoop.HadoopTables +import org.apache.spark.sql.execution.datasources.v2.BatchScanExec +import org.apache.spark.sql.execution.metric.SQLMetrics.cachedSQLAccumIdentifier.x + class IcebergSparkSqlExtensionsParser(delegate: ParserInterface) extends ParserInterface { import IcebergSparkSqlExtensionsParser._ @@ -143,6 +148,8 @@ class IcebergSparkSqlExtensionsParser(delegate: ParserInterface) extends ParserI // see ResolveMergeIntoTableReferences for details val context = MergeIntoContext(cond, matchedActions, notMatchedActions) UnresolvedMergeIntoIcebergTable(aliasedTable, source, context) + case x => + x } object UnresolvedIcebergTable { @@ -162,8 +169,12 @@ class IcebergSparkSqlExtensionsParser(delegate: ParserInterface) extends ParserI case tableCatalog: TableCatalog => Try(tableCatalog.loadTable(catalogAndIdentifier.identifier)) .map(isIcebergTable) - .getOrElse(false) - + .getOrElse(SparkSession.active.table(s"${multipartIdent.mkString(".")}").queryExecution + .executedPlan.collect { + case BatchScanExec(_, scan, _) if scan.isInstanceOf[SparkBatchQueryScan] => + val ht = new HadoopTables(SparkSession.active.sparkContext.hadoopConfiguration) + ht.exists(scan.asInstanceOf[SparkBatchQueryScan].tableScan().table().location()) + }.contains(true)) case _ => false } diff --git a/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/planning/RewrittenRowLevelCommand.scala b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/planning/RewrittenRowLevelCommand.scala index 4cc7a7bf2f96..a2c888473cf2 100644 --- a/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/planning/RewrittenRowLevelCommand.scala +++ b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/planning/RewrittenRowLevelCommand.scala @@ -56,7 +56,7 @@ object RewrittenRowLevelCommand { case _ => false } - rewritePlan match { + val r = rewritePlan match { case rd @ ReplaceData(DataSourceV2Relation(table, _, _, _, _), query, _, _) => val readRelation = findReadRelation(table, query, allowScanDuplication) readRelation.map((c, _, rd)) @@ -66,7 +66,7 @@ object RewrittenRowLevelCommand { case _ => None } - + r case _ => None } diff --git a/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ReplaceData.scala b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ReplaceData.scala index 3bf726ffb719..7a52acc3a5bd 100644 --- a/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ReplaceData.scala +++ b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/ReplaceData.scala @@ -52,7 +52,6 @@ case class ReplaceData( // they will be discarded after the logical write is built in the optimizer // metadata columns may be needed to request a correct distribution or ordering // but are not passed back to the data source during writes - table.skipSchemaResolution || (dataInput.size == table.output.size && dataInput.zip(table.output).forall { case (inAttr, outAttr) => val outType = CharVarcharUtils.getRawType(outAttr.metadata).getOrElse(outAttr.dataType) diff --git a/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala index 4fbf8a523a54..0a796546450d 100644 --- a/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala +++ b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/MergeRowsExec.scala @@ -184,7 +184,12 @@ case class MergeRowsExec( } rowIterator - .map(processFunc) + .map(row => { + println(s"input: $row") + val o = processFunc(row) + println(s"output: $o") + o + }) .filter(row => row != null) } } diff --git a/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RowLevelCommandScanRelationPushDown.scala b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RowLevelCommandScanRelationPushDown.scala index 4e89b9a1c243..3f6424d9ca03 100644 --- a/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RowLevelCommandScanRelationPushDown.scala +++ b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/execution/datasources/v2/RowLevelCommandScanRelationPushDown.scala @@ -64,7 +64,6 @@ object RowLevelCommandScanRelationPushDown extends Rule[LogicalPlan] with Predic case r: DataSourceV2Relation if r.table eq table => DataSourceV2ScanRelation(r, scan, toOutputAttrs(scan.readSchema(), r)) } - command.withNewRewritePlan(newRewritePlan) } diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java index edc3944e69fe..5899f9b23dea 100644 --- a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java +++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java @@ -19,6 +19,7 @@ package org.apache.iceberg.spark.extensions; +import java.io.File; import java.util.Arrays; import java.util.List; import java.util.Map; @@ -29,13 +30,11 @@ import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import org.apache.iceberg.AssertHelpers; -import org.apache.iceberg.DataFile; -import org.apache.iceberg.RowLevelOperationMode; -import org.apache.iceberg.Snapshot; -import org.apache.iceberg.SnapshotSummary; -import org.apache.iceberg.Table; + +import org.apache.commons.io.FileUtils; +import org.apache.iceberg.*; import org.apache.iceberg.exceptions.ValidationException; +import org.apache.iceberg.hadoop.HadoopTables; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet; @@ -43,19 +42,20 @@ import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.util.concurrent.MoreExecutors; import org.apache.iceberg.spark.SparkSQLProperties; +import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.spark.SparkException; import org.apache.spark.sql.AnalysisException; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Encoders; import org.apache.spark.sql.Row; import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; +import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan; +import org.apache.spark.sql.catalyst.plans.logical.ReplaceData; +import org.apache.spark.sql.execution.SparkPlan; import org.apache.spark.sql.internal.SQLConf; +import org.apache.spark.sql.types.StructType; import org.hamcrest.CoreMatchers; -import org.junit.After; -import org.junit.Assert; -import org.junit.Assume; -import org.junit.BeforeClass; -import org.junit.Test; +import org.junit.*; import static org.apache.iceberg.DataOperations.OVERWRITE; import static org.apache.iceberg.RowLevelOperationMode.COPY_ON_WRITE; @@ -69,6 +69,9 @@ import static org.apache.iceberg.TableProperties.UPDATE_MODE_DEFAULT; import static org.apache.spark.sql.functions.lit; +import scala.collection.Iterator; +import scala.collection.JavaConverters.*; + public abstract class TestUpdate extends SparkRowLevelOperationsTestBase { public TestUpdate(String catalogName, String implementation, Map config, @@ -81,6 +84,11 @@ public static void setupSparkConf() { spark.conf().set("spark.sql.shuffle.partitions", "4"); } + @Before + public void cleanup() { + sql("DROP TABLE IF EXISTS %s", tableName); + } + @After public void removeTables() { sql("DROP TABLE IF EXISTS %s", tableName); @@ -89,6 +97,7 @@ public void removeTables() { sql("DROP TABLE IF EXISTS deleted_employee"); } + /* @Test public void testExplain() { createAndInitTable("id INT, dep STRING"); @@ -135,8 +144,51 @@ public void testUpdateWithAlias() { assertEquals("Should have expected rows", ImmutableList.of(row(1, "invalid")), sql("SELECT * FROM %s", tableName)); + }*/ + + @Test + public void testHadoopTables() throws Exception { + List ids = Lists.newArrayListWithCapacity(2); + for (int id = 1; id <= 2; id++) { + ids.add(id); + } + Dataset df = spark.createDataset(ids, Encoders.INT()) + .withColumnRenamed("value", "id"); + HadoopTables ht = new HadoopTables(spark.sparkContext().hadoopConfiguration()); + Schema tableSchema = SparkSchemaUtil.convert(df.schema()); + File dir = java.nio.file.Files.createTempDirectory("TestUpdate").toFile(); + // FileUtils.forceDeleteOnExit(dir); + String path = dir.getAbsolutePath(); + ht.create(tableSchema, path); + df.write().format("iceberg").mode("overwrite").save(path); + Dataset tableDF = spark.read().format("iceberg").load(path); + tableDF.createOrReplaceTempView("target"); + df.createOrReplaceTempView("source"); + spark.sql("select * from source").show(); + /* + LogicalPlan parsed = spark.sessionState().sqlParser().parsePlan( + "MERGE INTO target using source on target.id = source.id" + + " WHEN MATCHED THEN UPDATE SET target.id = target.id + 1 WHEN NOT MATCHED THEN INSERT *"); + LogicalPlan analyzed = spark.sessionState().analyzer().execute(parsed); + LogicalPlan optimized = spark.sessionState().optimizer().execute(analyzed); + Iterator plans = spark.sessionState().planner().plan(optimized); + while (plans.hasNext()) { + System.out.println(plans.next().treeString()); + }*/ + sql("MERGE INTO target using source on target.id = source.id " + + "WHEN MATCHED THEN UPDATE SET target.id = target.id + 1"); + spark.read().format("iceberg").load(path).show(); + /* + for (int i = 0; i < analyzed.children().size(); i++) { + if (analyzed.children().apply(i) instanceof ReplaceData) { + ReplaceData replaceData = (ReplaceData) analyzed.children().apply(i); + System.out.println(((LogicalPlan) replaceData.table()).resolved() + ":::" + replaceData.query().resolved() + ":::" + + replaceData.outputResolved()); + } + }*/ } + /* @Test public void testUpdateAlignsAssignments() { createAndInitTable("id INT, c1 INT, c2 INT"); @@ -993,7 +1045,7 @@ public void testUpdateOnNonIcebergTableNotSupported() { UnsupportedOperationException.class, "not supported temporarily", () -> sql("UPDATE %s SET c1 = -1 WHERE c2 = 1", "testtable")); } - +*/ private RowLevelOperationMode mode(Table table) { String modeName = table.properties().getOrDefault(UPDATE_MODE, UPDATE_MODE_DEFAULT); return RowLevelOperationMode.fromName(modeName); diff --git a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java index 651a411ebd7b..8f729e973856 100644 --- a/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java +++ b/spark/v3.2/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java @@ -59,7 +59,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -class SparkBatchQueryScan extends SparkScan implements SupportsRuntimeFiltering { +public class SparkBatchQueryScan extends SparkScan implements SupportsRuntimeFiltering { private static final Logger LOG = LoggerFactory.getLogger(SparkBatchQueryScan.class); @@ -121,6 +121,10 @@ private List files() { return files; } + public TableScan tableScan() { + return scan; + } + @Override protected List tasks() { if (tasks == null) { diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/SparkTestBase.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/SparkTestBase.java index 8022e8696b63..3ed2d129be16 100644 --- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/SparkTestBase.java +++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/SparkTestBase.java @@ -101,6 +101,9 @@ protected long waitUntilAfter(long timestampMillis) { } protected List sql(String query, Object... args) { + if (query.startsWith("MERGE INTO")) { + System.out.println(spark.sql(String.format(query, args)).queryExecution().analyzed().treeString()); + } List rows = spark.sql(String.format(query, args)).collectAsList(); if (rows.size() < 1) { return ImmutableList.of(); From d4631879bb0740cea5c4e9609382fe0ed9165bee Mon Sep 17 00:00:00 2001 From: CodingCat Date: Sat, 18 Jun 2022 20:08:29 -0700 Subject: [PATCH 2/7] add --- .../analysis/RewriteMergeIntoTable.scala | 10 +++----- .../iceberg/spark/extensions/TestUpdate.java | 22 ++++++++++++++++-- .../v3.2/spark-extensions/table/._SUCCESS.crc | Bin 0 -> 8 bytes ...-9bc2-cbcbc66c325b-c000.snappy.parquet.crc | Bin 0 -> 12 bytes ...-9bc2-cbcbc66c325b-c000.snappy.parquet.crc | Bin 0 -> 12 bytes spark/v3.2/spark-extensions/table/_SUCCESS | 0 ...47a3-9bc2-cbcbc66c325b-c000.snappy.parquet | Bin 0 -> 452 bytes ...47a3-9bc2-cbcbc66c325b-c000.snappy.parquet | Bin 0 -> 452 bytes 8 files changed, 23 insertions(+), 9 deletions(-) create mode 100644 spark/v3.2/spark-extensions/table/._SUCCESS.crc create mode 100644 spark/v3.2/spark-extensions/table/.part-00000-cfeeb10f-708b-47a3-9bc2-cbcbc66c325b-c000.snappy.parquet.crc create mode 100644 spark/v3.2/spark-extensions/table/.part-00001-cfeeb10f-708b-47a3-9bc2-cbcbc66c325b-c000.snappy.parquet.crc create mode 100644 spark/v3.2/spark-extensions/table/_SUCCESS create mode 100644 spark/v3.2/spark-extensions/table/part-00000-cfeeb10f-708b-47a3-9bc2-cbcbc66c325b-c000.snappy.parquet create mode 100644 spark/v3.2/spark-extensions/table/part-00001-cfeeb10f-708b-47a3-9bc2-cbcbc66c325b-c000.snappy.parquet diff --git a/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala index 1f00a9816018..4dbf7ae859b2 100644 --- a/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala +++ b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala @@ -157,17 +157,13 @@ object RewriteMergeIntoTable extends RewriteRowLevelCommand { tbl: SupportsRowLevelOperations): MergeIntoIcebergTable = { val operation = buildRowLevelOperation(tbl, MERGE) val table = RowLevelOperationTable(tbl, operation) - val replacedSourceTable = EliminateSubqueryAliases(m.sourceTable) match { - case v: View if v.isTempViewStoringAnalyzedPlan => v.child - case other => other - } val rewritePlan = operation match { case _: SupportsDelta => - buildWriteDeltaPlan(r, table, replacedSourceTable, m.mergeCondition, m.matchedActions, m.notMatchedActions) + buildWriteDeltaPlan(r, table, m.sourceTable, m.mergeCondition, m.matchedActions, m.notMatchedActions) case _ => - buildReplaceDataPlan(r, table, replacedSourceTable, m.mergeCondition, m.matchedActions, m.notMatchedActions) + buildReplaceDataPlan(r, table, m.sourceTable, m.mergeCondition, m.matchedActions, m.notMatchedActions) } - m.copy(sourceTable = replacedSourceTable, rewritePlan = Some(rewritePlan)) + m.copy(rewritePlan = Some(rewritePlan)) } // build a rewrite plan for sources that support replacing groups of data (e.g. files, partitions) diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java index 5899f9b23dea..4fdca5e1e38b 100644 --- a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java +++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java @@ -146,6 +146,23 @@ public void testUpdateWithAlias() { sql("SELECT * FROM %s", tableName)); }*/ + @Test + public void testHadoopTables2() throws Exception { + createAndInitTable("id INT"); + List ids = Lists.newArrayListWithCapacity(2); + for (int id = 1; id <= 2; id++) { + ids.add(id); + } + Dataset df = spark.createDataset(ids, Encoders.INT()) + .withColumnRenamed("value", "id"); + df.createOrReplaceTempView("source"); + sql("INSERT INTO %s VALUES (1), (2)", tableName); + sql("MERGE INTO %s using source on %s.id = source.id " + + "WHEN MATCHED THEN UPDATE SET %s.id = source.id + 1", tableName, tableName, tableName); + spark.read().table("table").show(); + } + + /* @Test public void testHadoopTables() throws Exception { List ids = Lists.newArrayListWithCapacity(2); @@ -175,8 +192,9 @@ public void testHadoopTables() throws Exception { while (plans.hasNext()) { System.out.println(plans.next().treeString()); }*/ + /* sql("MERGE INTO target using source on target.id = source.id " + - "WHEN MATCHED THEN UPDATE SET target.id = target.id + 1"); + "WHEN MATCHED THEN UPDATE SET target.id = source.id + 1"); spark.read().format("iceberg").load(path).show(); /* for (int i = 0; i < analyzed.children().size(); i++) { @@ -186,7 +204,7 @@ public void testHadoopTables() throws Exception { replaceData.outputResolved()); } }*/ - } + //} /* @Test diff --git a/spark/v3.2/spark-extensions/table/._SUCCESS.crc b/spark/v3.2/spark-extensions/table/._SUCCESS.crc new file mode 100644 index 0000000000000000000000000000000000000000..3b7b044936a890cd8d651d349a752d819d71d22c GIT binary patch literal 8 PcmYc;N@ieSU}69O2$TUk literal 0 HcmV?d00001 diff --git a/spark/v3.2/spark-extensions/table/.part-00000-cfeeb10f-708b-47a3-9bc2-cbcbc66c325b-c000.snappy.parquet.crc b/spark/v3.2/spark-extensions/table/.part-00000-cfeeb10f-708b-47a3-9bc2-cbcbc66c325b-c000.snappy.parquet.crc new file mode 100644 index 0000000000000000000000000000000000000000..72989152a2bba72b45c00ba41055826c41d58d2b GIT binary patch literal 12 TcmYc;N@ieSU}9Lid{-3!6c+=- literal 0 HcmV?d00001 diff --git a/spark/v3.2/spark-extensions/table/.part-00001-cfeeb10f-708b-47a3-9bc2-cbcbc66c325b-c000.snappy.parquet.crc b/spark/v3.2/spark-extensions/table/.part-00001-cfeeb10f-708b-47a3-9bc2-cbcbc66c325b-c000.snappy.parquet.crc new file mode 100644 index 0000000000000000000000000000000000000000..85ecbb30bb6a91d6478773718e938bfc6eced46c GIT binary patch literal 12 TcmYc;N@ieSU}E_B=eZ#O7FPsF literal 0 HcmV?d00001 diff --git a/spark/v3.2/spark-extensions/table/_SUCCESS b/spark/v3.2/spark-extensions/table/_SUCCESS new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/spark/v3.2/spark-extensions/table/part-00000-cfeeb10f-708b-47a3-9bc2-cbcbc66c325b-c000.snappy.parquet b/spark/v3.2/spark-extensions/table/part-00000-cfeeb10f-708b-47a3-9bc2-cbcbc66c325b-c000.snappy.parquet new file mode 100644 index 0000000000000000000000000000000000000000..47c9bf5a7de1d0f5ccc1322a5264a4ae405cbe93 GIT binary patch literal 452 zcmZWm%SyvQ6rGM*g1Qkp!vwMjgO(QJ&}90?jV_9~km626WSUHCFiG1y6scYNH~xjc zCEhl5<>o%l+8vi)sUCW1{0 zutg0rf&xMBTpwo(mA*V>{TELV>8*>Wf#MX5l}c>cc7e8{srE&QB2d>ZN~$oTMK#S;~8p27g&-B_69 zdZtqVk|K)KB!U6h)DHjS`(8(kD}A;q1D$TXSOVA4mDDpI@hEBq6G z#Q*WOsVg^k?m2VsIft3y<(*4`mMEuR-=9B+4kZYS)B-p=Ej*i`$3*nEH6(|HWTqiG2ssC(ZO$m}`gp)7;Mr_A+Di*5SEvpFjhNXlNcEtu2>mt|8 zD}(CRC`V|4c~A*}eb5rSr4n$7R4g_qVSqa4vt%OmT%)Pn`pZ|7q~UDJ50vzz3%vGk zB3(ohqb`;vlgaE&#wOE&&NRPYiR^uDL|>#?vh*`i7vs=GK`QzWD>2pa<|+)3-i?LR z%uGyz#B>=&dKBRRZkoFI(-i&Hdi}TtezS8tkCXB)V%e3hC%tlYv<#!5-02wA@Vkwk mQr))kTb{4{v2OdSIqI0PQl6_k&+j#Bw&UQNX9a*E{@*XDVrD4- literal 0 HcmV?d00001 From 59735b9c87bb80991b27b08ebb52ff1b7e732742 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Sat, 18 Jun 2022 20:17:00 -0700 Subject: [PATCH 3/7] cleanup --- .../catalyst/analysis/ResolveMergeIntoTableReferences.scala | 6 +----- .../spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala | 3 +-- .../parser/extensions/IcebergSparkSqlExtensionsParser.scala | 2 -- .../org/apache/iceberg/spark/extensions/TestUpdate.java | 5 ----- 4 files changed, 2 insertions(+), 14 deletions(-) diff --git a/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveMergeIntoTableReferences.scala b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveMergeIntoTableReferences.scala index 70ac93f3bd11..63ebdef95730 100644 --- a/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveMergeIntoTableReferences.scala +++ b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/ResolveMergeIntoTableReferences.scala @@ -93,16 +93,12 @@ case class ResolveMergeIntoTableReferences(spark: SparkSession) extends Rule[Log val resolvedMergeCondition = resolveCond("SEARCH", context.mergeCondition, m) - val x = MergeIntoIcebergTable( + MergeIntoIcebergTable( targetTable, sourceTable, mergeCondition = resolvedMergeCondition, matchedActions = resolvedMatchedActions, notMatchedActions = resolvedNotMatchedActions) - x - - case m @ UnresolvedMergeIntoIcebergTable(targetTable, sourceTable, context) => - m } private def resolveCond(condName: String, cond: Expression, plan: LogicalPlan): Expression = { diff --git a/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala index 4dbf7ae859b2..47e92bf64e94 100644 --- a/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala +++ b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala @@ -229,8 +229,7 @@ object RewriteMergeIntoTable extends RewriteRowLevelCommand { // build a plan to replace read groups in the table val writeRelation = relation.copy(table = operationTable) - val r = ReplaceData(writeRelation, mergeRows, relation) - r + ReplaceData(writeRelation, mergeRows, relation) } // build a rewrite plan for sources that support row deltas diff --git a/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala index 22fedd725655..6c9e1f579916 100644 --- a/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala +++ b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala @@ -148,8 +148,6 @@ class IcebergSparkSqlExtensionsParser(delegate: ParserInterface) extends ParserI // see ResolveMergeIntoTableReferences for details val context = MergeIntoContext(cond, matchedActions, notMatchedActions) UnresolvedMergeIntoIcebergTable(aliasedTable, source, context) - case x => - x } object UnresolvedIcebergTable { diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java index 4fdca5e1e38b..867ab8408ea5 100644 --- a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java +++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java @@ -84,11 +84,6 @@ public static void setupSparkConf() { spark.conf().set("spark.sql.shuffle.partitions", "4"); } - @Before - public void cleanup() { - sql("DROP TABLE IF EXISTS %s", tableName); - } - @After public void removeTables() { sql("DROP TABLE IF EXISTS %s", tableName); From a3a0d585bcf129416ced5fd583e35d07f6a0be65 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Sat, 18 Jun 2022 20:51:20 -0700 Subject: [PATCH 4/7] more --- .../analysis/RewriteMergeIntoTable.scala | 18 ++++++++++++++---- .../IcebergSparkSqlExtensionsParser.scala | 15 +++++++++------ .../planning/RewrittenRowLevelCommand.scala | 3 +-- .../iceberg/spark/extensions/TestUpdate.java | 7 ++++++- 4 files changed, 30 insertions(+), 13 deletions(-) diff --git a/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala index 47e92bf64e94..10e99de598d6 100644 --- a/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala +++ b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala @@ -127,11 +127,20 @@ object RewriteMergeIntoTable extends RewriteRowLevelCommand { throw new AnalysisException(s"$p is not an Iceberg table") } - case m @ MergeIntoIcebergTable(aliasedTable, _, _, _, _, None) + case m @ MergeIntoIcebergTable(aliasedTable,source, cond, matchedActions, notMatchedActions, None) if m.resolved && m.aligned => EliminateSubqueryAliases(aliasedTable) match { - case r @ DataSourceV2Relation(tbl: SupportsRowLevelOperations, _, _, _, _) => - rewriteIcebergRelation(m, r, tbl) + case r@DataSourceV2Relation(tbl: SupportsRowLevelOperations, _, _, _, _) => + val operation = buildRowLevelOperation(tbl, MERGE) + val table = RowLevelOperationTable(tbl, operation) + val rewritePlan = operation match { + case _: SupportsDelta => + buildWriteDeltaPlan(r, table, source, cond, matchedActions, notMatchedActions) + case _ => + buildReplaceDataPlan(r, table, source, cond, matchedActions, notMatchedActions) + } + m.copy(rewritePlan = Some(rewritePlan)) + /* case p: View => val relations = p.children.collect { case r: DataSourceV2Relation if r.table.isInstanceOf[SparkTable] => r @@ -145,10 +154,11 @@ object RewriteMergeIntoTable extends RewriteRowLevelCommand { newM } else { throw new AnalysisException(s"$p is not an Iceberg table") - } + }*/ case p => throw new AnalysisException(s"$p is not an Iceberg table") } + } private def rewriteIcebergRelation( diff --git a/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala index 6c9e1f579916..86b0426d9c7d 100644 --- a/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala +++ b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala @@ -167,17 +167,20 @@ class IcebergSparkSqlExtensionsParser(delegate: ParserInterface) extends ParserI case tableCatalog: TableCatalog => Try(tableCatalog.loadTable(catalogAndIdentifier.identifier)) .map(isIcebergTable) - .getOrElse(SparkSession.active.table(s"${multipartIdent.mkString(".")}").queryExecution - .executedPlan.collect { - case BatchScanExec(_, scan, _) if scan.isInstanceOf[SparkBatchQueryScan] => - val ht = new HadoopTables(SparkSession.active.sparkContext.hadoopConfiguration) - ht.exists(scan.asInstanceOf[SparkBatchQueryScan].tableScan().table().location()) - }.contains(true)) + .getOrElse(false) case _ => false } } + /** + * SparkSession.active.table(s"${multipartIdent.mkString(".")}").queryExecution + .executedPlan.collect { + case BatchScanExec(_, scan, _) if scan.isInstanceOf[SparkBatchQueryScan] => + val ht = new HadoopTables(SparkSession.active.sparkContext.hadoopConfiguration) + ht.exists(scan.asInstanceOf[SparkBatchQueryScan].tableScan().table().location()) + }.contains(true) + */ private def isIcebergTable(table: Table): Boolean = table match { case _: SparkTable => true case _ => false diff --git a/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/planning/RewrittenRowLevelCommand.scala b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/planning/RewrittenRowLevelCommand.scala index a2c888473cf2..90a38877be1c 100644 --- a/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/planning/RewrittenRowLevelCommand.scala +++ b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/planning/RewrittenRowLevelCommand.scala @@ -56,7 +56,7 @@ object RewrittenRowLevelCommand { case _ => false } - val r = rewritePlan match { + rewritePlan match { case rd @ ReplaceData(DataSourceV2Relation(table, _, _, _, _), query, _, _) => val readRelation = findReadRelation(table, query, allowScanDuplication) readRelation.map((c, _, rd)) @@ -66,7 +66,6 @@ object RewrittenRowLevelCommand { case _ => None } - r case _ => None } diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java index 867ab8408ea5..1e96d79c0ac1 100644 --- a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java +++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java @@ -154,7 +154,12 @@ public void testHadoopTables2() throws Exception { sql("INSERT INTO %s VALUES (1), (2)", tableName); sql("MERGE INTO %s using source on %s.id = source.id " + "WHEN MATCHED THEN UPDATE SET %s.id = source.id + 1", tableName, tableName, tableName); - spark.read().table("table").show(); + List list = sql("select * from %s", tableName); + for (int i = 0; i < list.size(); i++) { + for (int j = 0; j < list.get(i).length; j++) { + System.out.println(list.get(i)[j]); + } + } } /* From 3906197ce2645b2857f276612d2c36a96a9599e6 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Sat, 18 Jun 2022 20:58:26 -0700 Subject: [PATCH 5/7] yo --- .../catalyst/analysis/RewriteMergeIntoTable.scala | 15 +++------------ .../IcebergSparkSqlExtensionsParser.scala | 15 ++++++--------- .../iceberg/spark/extensions/TestUpdate.java | 9 ++++----- .../org/apache/iceberg/spark/SparkTestBase.java | 3 --- 4 files changed, 13 insertions(+), 29 deletions(-) diff --git a/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala index 10e99de598d6..285feeb0d14d 100644 --- a/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala +++ b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/analysis/RewriteMergeIntoTable.scala @@ -130,17 +130,8 @@ object RewriteMergeIntoTable extends RewriteRowLevelCommand { case m @ MergeIntoIcebergTable(aliasedTable,source, cond, matchedActions, notMatchedActions, None) if m.resolved && m.aligned => EliminateSubqueryAliases(aliasedTable) match { - case r@DataSourceV2Relation(tbl: SupportsRowLevelOperations, _, _, _, _) => - val operation = buildRowLevelOperation(tbl, MERGE) - val table = RowLevelOperationTable(tbl, operation) - val rewritePlan = operation match { - case _: SupportsDelta => - buildWriteDeltaPlan(r, table, source, cond, matchedActions, notMatchedActions) - case _ => - buildReplaceDataPlan(r, table, source, cond, matchedActions, notMatchedActions) - } - m.copy(rewritePlan = Some(rewritePlan)) - /* + case r @ DataSourceV2Relation(tbl: SupportsRowLevelOperations, _, _, _, _) => + rewriteIcebergRelation(m, r, tbl) case p: View => val relations = p.children.collect { case r: DataSourceV2Relation if r.table.isInstanceOf[SparkTable] => r @@ -154,7 +145,7 @@ object RewriteMergeIntoTable extends RewriteRowLevelCommand { newM } else { throw new AnalysisException(s"$p is not an Iceberg table") - }*/ + } case p => throw new AnalysisException(s"$p is not an Iceberg table") } diff --git a/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala index 86b0426d9c7d..6c9e1f579916 100644 --- a/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala +++ b/spark/v3.2/spark-extensions/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/IcebergSparkSqlExtensionsParser.scala @@ -167,20 +167,17 @@ class IcebergSparkSqlExtensionsParser(delegate: ParserInterface) extends ParserI case tableCatalog: TableCatalog => Try(tableCatalog.loadTable(catalogAndIdentifier.identifier)) .map(isIcebergTable) - .getOrElse(false) + .getOrElse(SparkSession.active.table(s"${multipartIdent.mkString(".")}").queryExecution + .executedPlan.collect { + case BatchScanExec(_, scan, _) if scan.isInstanceOf[SparkBatchQueryScan] => + val ht = new HadoopTables(SparkSession.active.sparkContext.hadoopConfiguration) + ht.exists(scan.asInstanceOf[SparkBatchQueryScan].tableScan().table().location()) + }.contains(true)) case _ => false } } - /** - * SparkSession.active.table(s"${multipartIdent.mkString(".")}").queryExecution - .executedPlan.collect { - case BatchScanExec(_, scan, _) if scan.isInstanceOf[SparkBatchQueryScan] => - val ht = new HadoopTables(SparkSession.active.sparkContext.hadoopConfiguration) - ht.exists(scan.asInstanceOf[SparkBatchQueryScan].tableScan().table().location()) - }.contains(true) - */ private def isIcebergTable(table: Table): Boolean = table match { case _: SparkTable => true case _ => false diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java index 1e96d79c0ac1..a8f9ae69196a 100644 --- a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java +++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java @@ -140,7 +140,7 @@ public void testUpdateWithAlias() { ImmutableList.of(row(1, "invalid")), sql("SELECT * FROM %s", tableName)); }*/ - +/* @Test public void testHadoopTables2() throws Exception { createAndInitTable("id INT"); @@ -160,9 +160,8 @@ public void testHadoopTables2() throws Exception { System.out.println(list.get(i)[j]); } } - } + }*/ - /* @Test public void testHadoopTables() throws Exception { List ids = Lists.newArrayListWithCapacity(2); @@ -192,7 +191,7 @@ public void testHadoopTables() throws Exception { while (plans.hasNext()) { System.out.println(plans.next().treeString()); }*/ - /* + sql("MERGE INTO target using source on target.id = source.id " + "WHEN MATCHED THEN UPDATE SET target.id = source.id + 1"); spark.read().format("iceberg").load(path).show(); @@ -204,7 +203,7 @@ public void testHadoopTables() throws Exception { replaceData.outputResolved()); } }*/ - //} + } /* @Test diff --git a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/SparkTestBase.java b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/SparkTestBase.java index 3ed2d129be16..8022e8696b63 100644 --- a/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/SparkTestBase.java +++ b/spark/v3.2/spark/src/test/java/org/apache/iceberg/spark/SparkTestBase.java @@ -101,9 +101,6 @@ protected long waitUntilAfter(long timestampMillis) { } protected List sql(String query, Object... args) { - if (query.startsWith("MERGE INTO")) { - System.out.println(spark.sql(String.format(query, args)).queryExecution().analyzed().treeString()); - } List rows = spark.sql(String.format(query, args)).collectAsList(); if (rows.size() < 1) { return ImmutableList.of(); From e0c4c9756694fc751ceb15278cd0f8ef6914f963 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Sat, 18 Jun 2022 21:10:03 -0700 Subject: [PATCH 6/7] fixed --- .../org/apache/iceberg/spark/extensions/TestUpdate.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java index a8f9ae69196a..8fa46339fc66 100644 --- a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java +++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java @@ -92,7 +92,6 @@ public void removeTables() { sql("DROP TABLE IF EXISTS deleted_employee"); } - /* @Test public void testExplain() { createAndInitTable("id INT, dep STRING"); @@ -139,7 +138,7 @@ public void testUpdateWithAlias() { assertEquals("Should have expected rows", ImmutableList.of(row(1, "invalid")), sql("SELECT * FROM %s", tableName)); - }*/ + } /* @Test public void testHadoopTables2() throws Exception { @@ -205,7 +204,6 @@ public void testHadoopTables() throws Exception { }*/ } - /* @Test public void testUpdateAlignsAssignments() { createAndInitTable("id INT, c1 INT, c2 INT"); @@ -1062,7 +1060,7 @@ public void testUpdateOnNonIcebergTableNotSupported() { UnsupportedOperationException.class, "not supported temporarily", () -> sql("UPDATE %s SET c1 = -1 WHERE c2 = 1", "testtable")); } -*/ + private RowLevelOperationMode mode(Table table) { String modeName = table.properties().getOrDefault(UPDATE_MODE, UPDATE_MODE_DEFAULT); return RowLevelOperationMode.fromName(modeName); From 172ec3fe60ae1354f392f61357cd18421dbf4273 Mon Sep 17 00:00:00 2001 From: CodingCat Date: Sun, 19 Jun 2022 13:39:49 -0700 Subject: [PATCH 7/7] cleanup --- .../iceberg/spark/extensions/TestUpdate.java | 42 +------------------ 1 file changed, 1 insertion(+), 41 deletions(-) diff --git a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java index 8fa46339fc66..15017ce1f8cf 100644 --- a/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java +++ b/spark/v3.2/spark-extensions/src/test/java/org/apache/iceberg/spark/extensions/TestUpdate.java @@ -139,27 +139,6 @@ public void testUpdateWithAlias() { ImmutableList.of(row(1, "invalid")), sql("SELECT * FROM %s", tableName)); } -/* - @Test - public void testHadoopTables2() throws Exception { - createAndInitTable("id INT"); - List ids = Lists.newArrayListWithCapacity(2); - for (int id = 1; id <= 2; id++) { - ids.add(id); - } - Dataset df = spark.createDataset(ids, Encoders.INT()) - .withColumnRenamed("value", "id"); - df.createOrReplaceTempView("source"); - sql("INSERT INTO %s VALUES (1), (2)", tableName); - sql("MERGE INTO %s using source on %s.id = source.id " + - "WHEN MATCHED THEN UPDATE SET %s.id = source.id + 1", tableName, tableName, tableName); - List list = sql("select * from %s", tableName); - for (int i = 0; i < list.size(); i++) { - for (int j = 0; j < list.get(i).length; j++) { - System.out.println(list.get(i)[j]); - } - } - }*/ @Test public void testHadoopTables() throws Exception { @@ -172,7 +151,7 @@ public void testHadoopTables() throws Exception { HadoopTables ht = new HadoopTables(spark.sparkContext().hadoopConfiguration()); Schema tableSchema = SparkSchemaUtil.convert(df.schema()); File dir = java.nio.file.Files.createTempDirectory("TestUpdate").toFile(); - // FileUtils.forceDeleteOnExit(dir); + FileUtils.forceDeleteOnExit(dir); String path = dir.getAbsolutePath(); ht.create(tableSchema, path); df.write().format("iceberg").mode("overwrite").save(path); @@ -180,28 +159,9 @@ public void testHadoopTables() throws Exception { tableDF.createOrReplaceTempView("target"); df.createOrReplaceTempView("source"); spark.sql("select * from source").show(); - /* - LogicalPlan parsed = spark.sessionState().sqlParser().parsePlan( - "MERGE INTO target using source on target.id = source.id" + - " WHEN MATCHED THEN UPDATE SET target.id = target.id + 1 WHEN NOT MATCHED THEN INSERT *"); - LogicalPlan analyzed = spark.sessionState().analyzer().execute(parsed); - LogicalPlan optimized = spark.sessionState().optimizer().execute(analyzed); - Iterator plans = spark.sessionState().planner().plan(optimized); - while (plans.hasNext()) { - System.out.println(plans.next().treeString()); - }*/ - sql("MERGE INTO target using source on target.id = source.id " + "WHEN MATCHED THEN UPDATE SET target.id = source.id + 1"); spark.read().format("iceberg").load(path).show(); - /* - for (int i = 0; i < analyzed.children().size(); i++) { - if (analyzed.children().apply(i) instanceof ReplaceData) { - ReplaceData replaceData = (ReplaceData) analyzed.children().apply(i); - System.out.println(((LogicalPlan) replaceData.table()).resolved() + ":::" + replaceData.query().resolved() + ":::" + - replaceData.outputResolved()); - } - }*/ } @Test