diff --git a/docs/content/spark/sql-ddl.md b/docs/content/spark/sql-ddl.md index cfe105f6ac00..940be86e949e 100644 --- a/docs/content/spark/sql-ddl.md +++ b/docs/content/spark/sql-ddl.md @@ -258,6 +258,15 @@ CREATE VIEW v1 AS SELECT * FROM t1; CREATE OR REPLACE VIEW v1 AS SELECT * FROM t1; ``` +### Purge Table + +Purge table with delete data and metadata directly. + +```sql +-- purge a table +PURGE TABLE t1; +``` + ### Drop View DROP VIEW removes the metadata associated with a specified view from the catalog. diff --git a/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/PurgeTableTest.scala b/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/PurgeTableTest.scala new file mode 100644 index 000000000000..e39abfcf7894 --- /dev/null +++ b/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/PurgeTableTest.scala @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.sql + +class PurgeTableTest extends PaimonPurgeTableTestBase {} diff --git a/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/PurgeTableTest.scala b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/PurgeTableTest.scala new file mode 100644 index 000000000000..e39abfcf7894 --- /dev/null +++ b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/PurgeTableTest.scala @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.sql + +class PurgeTableTest extends PaimonPurgeTableTestBase {} diff --git a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/PurgeTableTest.scala b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/PurgeTableTest.scala new file mode 100644 index 000000000000..e39abfcf7894 --- /dev/null +++ b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/PurgeTableTest.scala @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.sql + +class PurgeTableTest extends PaimonPurgeTableTestBase {} diff --git a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/PurgeTableTest.scala b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/PurgeTableTest.scala new file mode 100644 index 000000000000..e39abfcf7894 --- /dev/null +++ b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/PurgeTableTest.scala @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.sql + +class PurgeTableTest extends PaimonPurgeTableTestBase {} diff --git a/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/PurgeTableTest.scala b/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/PurgeTableTest.scala new file mode 100644 index 000000000000..e39abfcf7894 --- /dev/null +++ b/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/PurgeTableTest.scala @@ -0,0 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.sql + +class PurgeTableTest extends PaimonPurgeTableTestBase {} diff --git a/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4 b/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4 index 207d9732160f..194c913241a7 100644 --- a/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4 +++ b/paimon-spark/paimon-spark-common/src/main/antlr4/org.apache.spark.sql.catalyst.parser.extensions/PaimonSqlExtensions.g4 @@ -74,7 +74,8 @@ statement | ALTER TABLE multipartIdentifier createReplaceTagClause #createOrReplaceTag | ALTER TABLE multipartIdentifier DELETE TAG (IF EXISTS)? identifier #deleteTag | ALTER TABLE multipartIdentifier RENAME TAG identifier TO identifier #renameTag - ; + | PURGE TABLE multipartIdentifier #purgeTable + ; callArgument : expression #positionalArgument @@ -179,6 +180,7 @@ TAG: 'TAG'; TAGS: 'TAGS'; TO: 'TO'; VERSION: 'VERSION'; +PURGE: 'PURGE'; TRUE: 'TRUE'; FALSE: 'FALSE'; diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/PurgeTableCommand.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/PurgeTableCommand.scala new file mode 100644 index 000000000000..187973a35a51 --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/catalyst/plans/logical/PurgeTableCommand.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.catalyst.plans.logical + +import org.apache.paimon.spark.leafnode.PaimonLeafCommand + +import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference} +import org.apache.spark.sql.types.StringType + +case class PurgeTableCommand(table: Seq[String]) extends PaimonLeafCommand { + + override def output: Seq[Attribute] = + Seq(AttributeReference("purge", StringType, nullable = false)()) + + override def simpleString(maxFields: Int): String = { + s"Purge data for table: $table" + } +} diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala index fb7bc6b22cd3..23bfb204f03a 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PaimonStrategy.scala @@ -21,7 +21,7 @@ package org.apache.paimon.spark.execution import org.apache.paimon.spark.{SparkCatalog, SparkUtils} import org.apache.paimon.spark.catalog.SupportView import org.apache.paimon.spark.catalyst.analysis.ResolvedPaimonView -import org.apache.paimon.spark.catalyst.plans.logical.{CreateOrReplaceTagCommand, CreatePaimonView, DeleteTagCommand, DropPaimonView, PaimonCallCommand, RenameTagCommand, ResolvedIdentifier, ShowPaimonViews, ShowTagsCommand} +import org.apache.paimon.spark.catalyst.plans.logical.{CreateOrReplaceTagCommand, CreatePaimonView, DeleteTagCommand, DropPaimonView, PaimonCallCommand, PurgeTableCommand, RenameTagCommand, ResolvedIdentifier, ShowPaimonViews, ShowTagsCommand} import org.apache.spark.sql.{SparkSession, Strategy} import org.apache.spark.sql.catalyst.InternalRow @@ -50,6 +50,9 @@ case class PaimonStrategy(spark: SparkSession) val input = buildInternalRow(args) PaimonCallExec(c.output, procedure, input) :: Nil + case p @ PurgeTableCommand(PaimonCatalogAndIdentifier(catalog, ident)) => + PurgeTableExec(catalog, ident, p.output) :: Nil + case t @ ShowTagsCommand(PaimonCatalogAndIdentifier(catalog, ident)) => ShowTagsExec(catalog, ident, t.output) :: Nil diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PurgeTableExec.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PurgeTableExec.scala new file mode 100644 index 000000000000..2b2c5015a926 --- /dev/null +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/execution/PurgeTableExec.scala @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.execution + +import org.apache.paimon.spark.SparkTable +import org.apache.paimon.spark.leafnode.PaimonLeafV2CommandExec +import org.apache.paimon.table.FileStoreTable + +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.util.UTF8StringUtils +import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog} +import org.apache.spark.unsafe.types.UTF8String + +case class PurgeTableExec(catalog: TableCatalog, ident: Identifier, out: Seq[Attribute]) + extends PaimonLeafV2CommandExec { + override protected def run(): Seq[InternalRow] = { + val table = catalog.loadTable(ident) + assert(table.isInstanceOf[SparkTable]) + + table.asInstanceOf[SparkTable].getTable match { + case paimonTable: FileStoreTable => + val fileIO = paimonTable.fileIO() + val tablePath = paimonTable.snapshotManager().tablePath() + fileIO + .listStatus(tablePath) + .filter(f => !f.getPath.getName.contains("schema")) + .foreach(f => fileIO.delete(f.getPath, true)) + + case t => + throw new UnsupportedOperationException( + s"Only support purge table command for FileStoreTable: $t") + } + + Seq(InternalRow(UTF8String.fromString(s"${table.name()} had been purged success."))) + } + + override def output: Seq[Attribute] = out +} diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/AbstractPaimonSparkSqlExtensionsParser.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/AbstractPaimonSparkSqlExtensionsParser.scala index 557b0735c74d..e5ed76bfd71e 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/AbstractPaimonSparkSqlExtensionsParser.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/AbstractPaimonSparkSqlExtensionsParser.scala @@ -121,7 +121,8 @@ abstract class AbstractPaimonSparkSqlExtensionsParser(val delegate: ParserInterf (normalized.contains("create tag") || normalized.contains("replace tag") || normalized.contains("rename tag") || - normalized.contains("delete tag"))) + normalized.contains("delete tag"))) || + normalized.contains("purge table") } protected def parse[T](command: String)(toResult: PaimonSqlExtensionsParser => T): T = { diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala index a1289a5f0b50..67ff433e1a1d 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/catalyst/parser/extensions/PaimonSqlExtensionsAstBuilder.scala @@ -153,6 +153,11 @@ class PaimonSqlExtensionsAstBuilder(delegate: ParserInterface) ctx.identifier(1).getText) } + /** Create a command for purge table. */ + override def visitPurgeTable(ctx: PurgeTableContext): PurgeTableCommand = withOrigin(ctx) { + PurgeTableCommand(typedVisit[Seq[String]](ctx.multipartIdentifier)) + } + private def toBuffer[T](list: java.util.List[T]) = list.asScala private def toSeq[T](list: java.util.List[T]) = toBuffer(list) diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPurgeTableTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPurgeTableTestBase.scala new file mode 100644 index 000000000000..38354ad6431f --- /dev/null +++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/PaimonPurgeTableTestBase.scala @@ -0,0 +1,66 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.spark.sql + +import org.apache.paimon.spark.PaimonSparkTestBase + +import org.apache.spark.sql.Row + +abstract class PaimonPurgeTableTestBase extends PaimonSparkTestBase { + + test("purge table test") { + spark.sql("""CREATE TABLE T (id INT, name STRING) + |USING PAIMON + |TBLPROPERTIES ('primary-key'='id', 'file.format'='orc')""".stripMargin) + + spark.sql( + "insert into T select 1, 'aa'" + ) + + checkAnswer( + spark.sql("select * from `T`"), + Row(1, "aa") :: Nil + ) + + // test for purge table + spark.sql( + "PURGE TABLE T" + ) + + spark.sql( + "REFRESH TABLE T" + ) + + checkAnswer( + spark.sql("select * from `T`"), + Nil + ) + + spark.sql( + "insert into T select 2, 'bb'" + ) + + checkAnswer( + spark.sql("select * from `T`"), + Row(2, "bb") :: Nil + ) + + } + +}