From 1a01bd037832ba9af7be7ec4a3cd4ed3843d800c Mon Sep 17 00:00:00 2001 From: xuyu <11161569@vivo.com> Date: Sat, 14 Dec 2024 14:25:59 +0800 Subject: [PATCH 01/10] [spark] Support purge table command to clear data --- docs/content/spark/sql-ddl.md | 9 +++ .../paimon/spark/sql/PurgeTableTest.scala | 21 +++++++ .../paimon/spark/sql/PurgeTableTest.scala | 21 +++++++ .../paimon/spark/sql/PurgeTableTest.scala | 21 +++++++ .../paimon/spark/sql/PurgeTableTest.scala | 21 +++++++ .../paimon/spark/sql/PurgeTableTest.scala | 21 +++++++ .../PaimonSqlExtensions.g4 | 4 +- .../plans/logical/PurgeTableCommand.scala | 34 ++++++++++ .../spark/execution/PaimonStrategy.scala | 5 +- .../spark/execution/PurgeTableExec.scala | 53 ++++++++++++++++ ...stractPaimonSparkSqlExtensionsParser.scala | 3 +- .../PaimonSqlExtensionsAstBuilder.scala | 5 ++ .../procedure/RollbackProcedureTest.scala | 5 ++ .../spark/sql/PaimonPurgeTableTestBase.scala | 62 +++++++++++++++++++ 14 files changed, 282 insertions(+), 3 deletions(-) create mode 100644 paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/PurgeTableTest.scala create mode 100644 paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/PurgeTableTest.scala create mode 100644 paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/PurgeTableTest.scala create mode 100644 paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/PurgeTableTest.scala create mode 100644 paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/PurgeTableTest.scala create mode 100644 paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/PurgeTableCommand.scala create mode 100644 paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PurgeTableExec.scala create mode 100644 paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPurgeTableTestBase.scala diff --git a/docs/content/spark/sql-ddl.md 
b/docs/content/spark/sql-ddl.md index cfe105f6ac00..940be86e949e 100644 --- a/docs/content/spark/sql-ddl.md +++ b/docs/content/spark/sql-ddl.md @@ -258,6 +258,15 @@ CREATE VIEW v1 AS SELECT * FROM t1; CREATE OR REPLACE VIEW v1 AS SELECT * FROM t1; ``` +### Purge Table + +Purge a table by directly deleting its data and metadata. + +```sql +-- purge a table +PURGE TABLE t1; +``` + ### Drop View DROP VIEW removes the metadata associated with a specified view from the catalog. diff --git a/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/PurgeTableTest.scala b/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/PurgeTableTest.scala new file mode 100644 index 000000000000..e39abfcf7894 --- /dev/null +++ b/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/PurgeTableTest.scala @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.spark.sql + +class PurgeTableTest extends PaimonPurgeTableTestBase {} diff --git a/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/PurgeTableTest.scala b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/PurgeTableTest.scala new file mode 100644 index 000000000000..e39abfcf7894 --- /dev/null +++ b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/PurgeTableTest.scala @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.sql + +class PurgeTableTest extends PaimonPurgeTableTestBase {} diff --git a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/PurgeTableTest.scala b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/PurgeTableTest.scala new file mode 100644 index 000000000000..e39abfcf7894 --- /dev/null +++ b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/PurgeTableTest.scala @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.sql + +class PurgeTableTest extends PaimonPurgeTableTestBase {} diff --git a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/PurgeTableTest.scala b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/PurgeTableTest.scala new file mode 100644 index 000000000000..e39abfcf7894 --- /dev/null +++ b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/PurgeTableTest.scala @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.sql + +class PurgeTableTest extends PaimonPurgeTableTestBase {} diff --git a/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/PurgeTableTest.scala b/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/PurgeTableTest.scala new file mode 100644 index 000000000000..e39abfcf7894 --- /dev/null +++ b/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/PurgeTableTest.scala @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.spark.sql + +class PurgeTableTest extends PaimonPurgeTableTestBase {} diff --git a/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4 b/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4 index 207d9732160f..194c913241a7 100644 --- a/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4 +++ b/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4 @@ -74,7 +74,8 @@ statement | ALTER TABLE multipartIdentifier createReplaceTagClause #createOrReplaceTag | ALTER TABLE multipartIdentifier DELETE TAG (IF EXISTS)? identifier #deleteTag | ALTER TABLE multipartIdentifier RENAME TAG identifier TO identifier #renameTag - ; + | PURGE TABLE multipartIdentifier #purgeTable + ; callArgument : expression #positionalArgument @@ -179,6 +180,7 @@ TAG: 'TAG'; TAGS: 'TAGS'; TO: 'TO'; VERSION: 'VERSION'; +PURGE: 'PURGE'; TRUE: 'TRUE'; FALSE: 'FALSE'; diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/PurgeTableCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/PurgeTableCommand.scala new file mode 100644 index 000000000000..187973a35a51 --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/PurgeTableCommand.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.catalyst.plans.logical + +import org.apache.paimon.spark.leafnode.PaimonLeafCommand + +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} +import org.apache.spark.sql.types.StringType + +case class PurgeTableCommand(table: Seq[String]) extends PaimonLeafCommand { + + override def output: Seq[Attribute] = + Seq(AttributeReference("purge", StringType, nullable = false)()) + + override def simpleString(maxFields: Int): String = { + s"Purge data for table: $table" + } +} diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala index fb7bc6b22cd3..23bfb204f03a 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala @@ -21,7 +21,7 @@ package org.apache.paimon.spark.execution import org.apache.paimon.spark.{SparkCatalog, SparkUtils} import org.apache.paimon.spark.catalog.SupportView import org.apache.paimon.spark.catalyst.analysis.ResolvedPaimonView -import org.apache.paimon.spark.catalyst.plans.logical.{CreateOrReplaceTagCommand, CreatePaimonView, DeleteTagCommand, DropPaimonView, PaimonCallCommand, RenameTagCommand, ResolvedIdentifier, ShowPaimonViews, ShowTagsCommand} +import org.apache.paimon.spark.catalyst.plans.logical.{CreateOrReplaceTagCommand, 
CreatePaimonView, DeleteTagCommand, DropPaimonView, PaimonCallCommand, PurgeTableCommand, RenameTagCommand, ResolvedIdentifier, ShowPaimonViews, ShowTagsCommand} import org.apache.spark.sql.{SparkSession, Strategy} import org.apache.spark.sql.catalyst.InternalRow @@ -50,6 +50,9 @@ case class PaimonStrategy(spark: SparkSession) val input = buildInternalRow(args) PaimonCallExec(c.output, procedure, input) :: Nil + case p @ PurgeTableCommand(PaimonCatalogAndIdentifier(catalog, ident)) => + PurgeTableExec(catalog, ident, p.output) :: Nil + case t @ ShowTagsCommand(PaimonCatalogAndIdentifier(catalog, ident)) => ShowTagsExec(catalog, ident, t.output) :: Nil diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PurgeTableExec.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PurgeTableExec.scala new file mode 100644 index 000000000000..b95a45bf5cac --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PurgeTableExec.scala @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.spark.execution + +import org.apache.paimon.spark.SparkTable +import org.apache.paimon.spark.leafnode.PaimonLeafV2CommandExec +import org.apache.paimon.table.FileStoreTable + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog} + +case class PurgeTableExec(catalog: TableCatalog, ident: Identifier, out: Seq[Attribute]) + extends PaimonLeafV2CommandExec { + override protected def run(): Seq[InternalRow] = { + val table = catalog.loadTable(ident) + assert(table.isInstanceOf[SparkTable]) + + var tags: Seq[InternalRow] = Nil + table.asInstanceOf[SparkTable].getTable match { + case paimonTable: FileStoreTable => + val fileIO = paimonTable.fileIO() + val tablePath = paimonTable.snapshotManager().tablePath() + fileIO + .listStatus(tablePath) + .filter(f => !f.getPath.getName.contains("schema")) + .foreach(f => fileIO.delete(f.getPath, true)) + + case t => + throw new UnsupportedOperationException( + s"Only support purge table command for FileStoreTable: $t") + } + tags + } + + override def output: Seq[Attribute] = out +} diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/AbstractPaimonSparkSqlExtensionsParser.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/AbstractPaimonSparkSqlExtensionsParser.scala index 557b0735c74d..e5ed76bfd71e 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/AbstractPaimonSparkSqlExtensionsParser.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/AbstractPaimonSparkSqlExtensionsParser.scala @@ -121,7 +121,8 @@ abstract class AbstractPaimonSparkSqlExtensionsParser(val delegate: ParserInterf (normalized.contains("create tag") || normalized.contains("replace tag") 
|| normalized.contains("rename tag") || - normalized.contains("delete tag"))) + normalized.contains("delete tag"))) || + normalized.contains("purge table") } protected def parse[T](command: String)(toResult: PaimonSqlExtensionsParser => T): T = { diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala index a1289a5f0b50..67ff433e1a1d 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala @@ -153,6 +153,11 @@ class PaimonSqlExtensionsAstBuilder(delegate: ParserInterface) ctx.identifier(1).getText) } + /** Create a command for purge table. */ + override def visitPurgeTable(ctx: PurgeTableContext): PurgeTableCommand = withOrigin(ctx) { + PurgeTableCommand(typedVisit[Seq[String]](ctx.multipartIdentifier)) + } + private def toBuffer[T](list: java.util.List[T]) = list.asScala private def toSeq[T](list: java.util.List[T]) = toBuffer(list) diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala index 457c5ba513ec..3dced90ee080 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala @@ -19,6 +19,7 @@ package org.apache.paimon.spark.procedure import org.apache.paimon.spark.PaimonSparkTestBase +import org.apache.paimon.table.FileStoreTable import org.apache.spark.sql.{Dataset, Row} import 
org.apache.spark.sql.execution.streaming.MemoryStream @@ -39,6 +40,10 @@ class RollbackProcedureTest extends PaimonSparkTestBase with StreamTest { |""".stripMargin) val location = loadTable("T").location().toString + val table: FileStoreTable = loadTable("T") + val fileIO = table.fileIO() + table.snapshotManager().tablePath() + val inputData = MemoryStream[(Int, String)] val stream = inputData .toDS() diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPurgeTableTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPurgeTableTestBase.scala new file mode 100644 index 000000000000..3c27a4ddd453 --- /dev/null +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPurgeTableTestBase.scala @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.paimon.spark.sql + +import org.apache.paimon.spark.PaimonSparkTestBase + +import org.apache.spark.sql.Row + +abstract class PaimonPurgeTableTestBase extends PaimonSparkTestBase { + + test("purge table test") { + spark.sql("""CREATE TABLE T (id INT, name STRING) + |USING PAIMON + |TBLPROPERTIES ('primary-key'='id', 'file.format'='orc')""".stripMargin) + + spark.sql( + "insert into T select 1, 'aa'" + ) + + checkAnswer( + spark.sql("select * from `T`"), + Row(1, "aa") :: Nil + ) + + // test for purge table + spark.sql( + "PURGE TABLE T" + ) + + checkAnswer( + spark.sql("select * from `T`"), + Nil + ) + + spark.sql( + "insert into T select 2, 'bb'" + ) + + checkAnswer( + spark.sql("select * from `T`"), + Row(2, "bb") :: Nil + ) + + } + +} From df667fabb4792fe55b884358548b5d420184f701 Mon Sep 17 00:00:00 2001 From: xuyu <11161569@vivo.com> Date: Sat, 14 Dec 2024 15:01:57 +0800 Subject: [PATCH 02/10] fix --- .../src/main/java/org/apache/orc/impl/RecordReaderImpl.java | 2 +- .../apache/paimon/spark/sql/PaimonPurgeTableTestBase.scala | 4 ++++ 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/paimon-format/src/main/java/org/apache/orc/impl/RecordReaderImpl.java b/paimon-format/src/main/java/org/apache/orc/impl/RecordReaderImpl.java index 93aa0719caea..b9c8a432fb25 100644 --- a/paimon-format/src/main/java/org/apache/orc/impl/RecordReaderImpl.java +++ b/paimon-format/src/main/java/org/apache/orc/impl/RecordReaderImpl.java @@ -126,7 +126,7 @@ public class RecordReaderImpl implements RecordReader { private final TypeReader.ReadPhase startReadPhase; // identifies that follow columns bytes must be read private boolean needsFollowColumnsRead; - private final boolean noSelectedVector; + private boolean noSelectedVector = false; // identifies whether the file has bad bloom filters that we should not use. 
private final boolean skipBloomFilters; @Nullable private final FileIndexResult fileIndexResult; diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPurgeTableTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPurgeTableTestBase.scala index 3c27a4ddd453..38354ad6431f 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPurgeTableTestBase.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPurgeTableTestBase.scala @@ -43,6 +43,10 @@ abstract class PaimonPurgeTableTestBase extends PaimonSparkTestBase { "PURGE TABLE T" ) + spark.sql( + "REFRESH TABLE T" + ) + checkAnswer( spark.sql("select * from `T`"), Nil From abfa3f7b85130977cd50746782739ba67797161f Mon Sep 17 00:00:00 2001 From: xuyu <11161569@vivo.com> Date: Sat, 14 Dec 2024 15:38:53 +0800 Subject: [PATCH 03/10] fix --- .../org/apache/paimon/spark/execution/PurgeTableExec.scala | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PurgeTableExec.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PurgeTableExec.scala index b95a45bf5cac..982ed3804a65 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PurgeTableExec.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PurgeTableExec.scala @@ -32,7 +32,6 @@ case class PurgeTableExec(catalog: TableCatalog, ident: Identifier, out: Seq[Att val table = catalog.loadTable(ident) assert(table.isInstanceOf[SparkTable]) - var tags: Seq[InternalRow] = Nil table.asInstanceOf[SparkTable].getTable match { case paimonTable: FileStoreTable => val fileIO = paimonTable.fileIO() @@ -46,7 +45,7 @@ case class PurgeTableExec(catalog: TableCatalog, ident: Identifier, out: Seq[Att throw new 
UnsupportedOperationException( s"Only support purge table command for FileStoreTable: $t") } - tags + } override def output: Seq[Attribute] = out From 6524bba09c6f78cd980d74d422edade21ef97a00 Mon Sep 17 00:00:00 2001 From: xuyu <11161569@vivo.com> Date: Sat, 14 Dec 2024 15:42:30 +0800 Subject: [PATCH 04/10] fix --- .../org/apache/paimon/spark/execution/PurgeTableExec.scala | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PurgeTableExec.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PurgeTableExec.scala index 982ed3804a65..fb1d31b0176c 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PurgeTableExec.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PurgeTableExec.scala @@ -21,10 +21,11 @@ package org.apache.paimon.spark.execution import org.apache.paimon.spark.SparkTable import org.apache.paimon.spark.leafnode.PaimonLeafV2CommandExec import org.apache.paimon.table.FileStoreTable - import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.util.UTF8StringUtils import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog} +import org.apache.spark.unsafe.types.UTF8String case class PurgeTableExec(catalog: TableCatalog, ident: Identifier, out: Seq[Attribute]) extends PaimonLeafV2CommandExec { @@ -45,7 +46,8 @@ case class PurgeTableExec(catalog: TableCatalog, ident: Identifier, out: Seq[Att throw new UnsupportedOperationException( s"Only support purge table command for FileStoreTable: $t") } - + + Seq(InternalRow(UTF8String.fromString("true"))) } override def output: Seq[Attribute] = out From 7dd797491b722978e44776b2d2314543f75745aa Mon Sep 17 00:00:00 2001 From: xuyu <11161569@vivo.com> Date: Sat, 14 Dec 2024 15:47:05 +0800 Subject: [PATCH 
05/10] fix --- .../org/apache/paimon/spark/execution/PurgeTableExec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PurgeTableExec.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PurgeTableExec.scala index fb1d31b0176c..3189960a1a37 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PurgeTableExec.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PurgeTableExec.scala @@ -47,7 +47,7 @@ case class PurgeTableExec(catalog: TableCatalog, ident: Identifier, out: Seq[Att s"Only support purge table command for FileStoreTable: $t") } - Seq(InternalRow(UTF8String.fromString("true"))) + Seq(InternalRow(UTF8String.fromString(s"${table.name()} had been purged success."))) } override def output: Seq[Attribute] = out From 7a47462052b25c9b7fbdb5051e2e0ebee0ce79b3 Mon Sep 17 00:00:00 2001 From: xuyu <11161569@vivo.com> Date: Sat, 14 Dec 2024 15:49:01 +0800 Subject: [PATCH 06/10] fix style --- .../scala/org/apache/paimon/spark/execution/PurgeTableExec.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PurgeTableExec.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PurgeTableExec.scala index 3189960a1a37..2b2c5015a926 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PurgeTableExec.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PurgeTableExec.scala @@ -21,6 +21,7 @@ package org.apache.paimon.spark.execution import org.apache.paimon.spark.SparkTable import org.apache.paimon.spark.leafnode.PaimonLeafV2CommandExec import org.apache.paimon.table.FileStoreTable + import org.apache.spark.sql.catalyst.InternalRow import 
org.apache.spark.sql.catalyst.expressions.Attribute import org.apache.spark.sql.catalyst.util.UTF8StringUtils From f7e3254f0f55b0bc07630e871380d9e767607abc Mon Sep 17 00:00:00 2001 From: xuyu <11161569@vivo.com> Date: Sat, 14 Dec 2024 15:50:11 +0800 Subject: [PATCH 07/10] fix style --- .../src/main/java/org/apache/orc/impl/RecordReaderImpl.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-format/src/main/java/org/apache/orc/impl/RecordReaderImpl.java b/paimon-format/src/main/java/org/apache/orc/impl/RecordReaderImpl.java index b9c8a432fb25..93aa0719caea 100644 --- a/paimon-format/src/main/java/org/apache/orc/impl/RecordReaderImpl.java +++ b/paimon-format/src/main/java/org/apache/orc/impl/RecordReaderImpl.java @@ -126,7 +126,7 @@ public class RecordReaderImpl implements RecordReader { private final TypeReader.ReadPhase startReadPhase; // identifies that follow columns bytes must be read private boolean needsFollowColumnsRead; - private boolean noSelectedVector = false; + private final boolean noSelectedVector; // identifies whether the file has bad bloom filters that we should not use. 
private final boolean skipBloomFilters; @Nullable private final FileIndexResult fileIndexResult; From 62a51de3572003a8d1bbd49c7f210c8b3699e13e Mon Sep 17 00:00:00 2001 From: xuyu <11161569@vivo.com> Date: Sat, 14 Dec 2024 15:54:34 +0800 Subject: [PATCH 08/10] remove case --- .../paimon/spark/procedure/RollbackProcedureTest.scala | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala index 3dced90ee080..54c2a52b20d3 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala @@ -39,11 +39,7 @@ class RollbackProcedureTest extends PaimonSparkTestBase with StreamTest { |TBLPROPERTIES ('primary-key'='a', 'bucket'='3') |""".stripMargin) val location = loadTable("T").location().toString - - val table: FileStoreTable = loadTable("T") - val fileIO = table.fileIO() - table.snapshotManager().tablePath() - + val inputData = MemoryStream[(Int, String)] val stream = inputData .toDS() From 8d96fa10e8d8d4bc723cc611e797f35b73862498 Mon Sep 17 00:00:00 2001 From: xuyu <11161569@vivo.com> Date: Sat, 14 Dec 2024 16:00:56 +0800 Subject: [PATCH 09/10] remove case --- .../apache/paimon/spark/procedure/RollbackProcedureTest.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala index 54c2a52b20d3..fcae7c2dee94 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala +++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala @@ -39,7 +39,7 @@ class RollbackProcedureTest extends PaimonSparkTestBase with StreamTest { |TBLPROPERTIES ('primary-key'='a', 'bucket'='3') |""".stripMargin) val location = loadTable("T").location().toString - + val inputData = MemoryStream[(Int, String)] val stream = inputData .toDS() From 2188bb015852bd942fe379d90889bd45f0995bcb Mon Sep 17 00:00:00 2001 From: xuyu <11161569@vivo.com> Date: Sat, 14 Dec 2024 16:17:03 +0800 Subject: [PATCH 10/10] 1 --- .../apache/paimon/spark/procedure/RollbackProcedureTest.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala index fcae7c2dee94..457c5ba513ec 100644 --- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/procedure/RollbackProcedureTest.scala @@ -19,7 +19,6 @@ package org.apache.paimon.spark.procedure import org.apache.paimon.spark.PaimonSparkTestBase -import org.apache.paimon.table.FileStoreTable import org.apache.spark.sql.{Dataset, Row} import org.apache.spark.sql.execution.streaming.MemoryStream