Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions docs/content/spark/sql-ddl.md
Original file line number Diff line number Diff line change
Expand Up @@ -258,6 +258,15 @@ CREATE VIEW v1 AS SELECT * FROM t1;
CREATE OR REPLACE VIEW v1 AS SELECT * FROM t1;
```

### Purge Table

Purge table with delete data and metadata directly.

```sql
-- purge a table
PURGE TABLE t1;
```

### Drop View

DROP VIEW removes the metadata associated with a specified view from the catalog.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.spark.sql

class PurgeTableTest extends PaimonPurgeTableTestBase {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.spark.sql

class PurgeTableTest extends PaimonPurgeTableTestBase {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.spark.sql

class PurgeTableTest extends PaimonPurgeTableTestBase {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.spark.sql

class PurgeTableTest extends PaimonPurgeTableTestBase {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.spark.sql

class PurgeTableTest extends PaimonPurgeTableTestBase {}
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ statement
| ALTER TABLE multipartIdentifier createReplaceTagClause #createOrReplaceTag
| ALTER TABLE multipartIdentifier DELETE TAG (IF EXISTS)? identifier #deleteTag
| ALTER TABLE multipartIdentifier RENAME TAG identifier TO identifier #renameTag
;
| PURGE TABLE multipartIdentifier #purgeTable
;

callArgument
: expression #positionalArgument
Expand Down Expand Up @@ -179,6 +180,7 @@ TAG: 'TAG';
TAGS: 'TAGS';
TO: 'TO';
VERSION: 'VERSION';
PURGE: 'PURGE';

TRUE: 'TRUE';
FALSE: 'FALSE';
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.spark.catalyst.plans.logical

import org.apache.paimon.spark.leafnode.PaimonLeafCommand

import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.types.StringType

case class PurgeTableCommand(table: Seq[String]) extends PaimonLeafCommand {

override def output: Seq[Attribute] =
Seq(AttributeReference("purge", StringType, nullable = false)())

override def simpleString(maxFields: Int): String = {
s"Purge data for table: $table"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ package org.apache.paimon.spark.execution
import org.apache.paimon.spark.{SparkCatalog, SparkUtils}
import org.apache.paimon.spark.catalog.SupportView
import org.apache.paimon.spark.catalyst.analysis.ResolvedPaimonView
import org.apache.paimon.spark.catalyst.plans.logical.{CreateOrReplaceTagCommand, CreatePaimonView, DeleteTagCommand, DropPaimonView, PaimonCallCommand, RenameTagCommand, ResolvedIdentifier, ShowPaimonViews, ShowTagsCommand}
import org.apache.paimon.spark.catalyst.plans.logical.{CreateOrReplaceTagCommand, CreatePaimonView, DeleteTagCommand, DropPaimonView, PaimonCallCommand, PurgeTableCommand, RenameTagCommand, ResolvedIdentifier, ShowPaimonViews, ShowTagsCommand}

import org.apache.spark.sql.{SparkSession, Strategy}
import org.apache.spark.sql.catalyst.InternalRow
Expand Down Expand Up @@ -50,6 +50,9 @@ case class PaimonStrategy(spark: SparkSession)
val input = buildInternalRow(args)
PaimonCallExec(c.output, procedure, input) :: Nil

case p @ PurgeTableCommand(PaimonCatalogAndIdentifier(catalog, ident)) =>
PurgeTableExec(catalog, ident, p.output) :: Nil

case t @ ShowTagsCommand(PaimonCatalogAndIdentifier(catalog, ident)) =>
ShowTagsExec(catalog, ident, t.output) :: Nil

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.spark.execution

import org.apache.paimon.spark.SparkTable
import org.apache.paimon.spark.leafnode.PaimonLeafV2CommandExec
import org.apache.paimon.table.FileStoreTable

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.util.UTF8StringUtils
import org.apache.spark.sql.connector.catalog.{Identifier, TableCatalog}
import org.apache.spark.unsafe.types.UTF8String

case class PurgeTableExec(catalog: TableCatalog, ident: Identifier, out: Seq[Attribute])
extends PaimonLeafV2CommandExec {
override protected def run(): Seq[InternalRow] = {
val table = catalog.loadTable(ident)
assert(table.isInstanceOf[SparkTable])

table.asInstanceOf[SparkTable].getTable match {
case paimonTable: FileStoreTable =>
val fileIO = paimonTable.fileIO()
val tablePath = paimonTable.snapshotManager().tablePath()
fileIO
.listStatus(tablePath)
.filter(f => !f.getPath.getName.contains("schema"))
.foreach(f => fileIO.delete(f.getPath, true))

case t =>
throw new UnsupportedOperationException(
s"Only support purge table command for FileStoreTable: $t")
}

Seq(InternalRow(UTF8String.fromString(s"${table.name()} had been purged success.")))
}

override def output: Seq[Attribute] = out
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@ abstract class AbstractPaimonSparkSqlExtensionsParser(val delegate: ParserInterf
(normalized.contains("create tag") ||
normalized.contains("replace tag") ||
normalized.contains("rename tag") ||
normalized.contains("delete tag")))
normalized.contains("delete tag"))) ||
normalized.contains("purge table")
}

protected def parse[T](command: String)(toResult: PaimonSqlExtensionsParser => T): T = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,11 @@ class PaimonSqlExtensionsAstBuilder(delegate: ParserInterface)
ctx.identifier(1).getText)
}

/** Create a command for purge table. */
override def visitPurgeTable(ctx: PurgeTableContext): PurgeTableCommand = withOrigin(ctx) {
PurgeTableCommand(typedVisit[Seq[String]](ctx.multipartIdentifier))
}

private def toBuffer[T](list: java.util.List[T]) = list.asScala

private def toSeq[T](list: java.util.List[T]) = toBuffer(list)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.paimon.spark.sql

import org.apache.paimon.spark.PaimonSparkTestBase

import org.apache.spark.sql.Row

abstract class PaimonPurgeTableTestBase extends PaimonSparkTestBase {

test("purge table test") {
spark.sql("""CREATE TABLE T (id INT, name STRING)
|USING PAIMON
|TBLPROPERTIES ('primary-key'='id', 'file.format'='orc')""".stripMargin)

spark.sql(
"insert into T select 1, 'aa'"
)

checkAnswer(
spark.sql("select * from `T`"),
Row(1, "aa") :: Nil
)

// test for purge table
spark.sql(
"PURGE TABLE T"
)

spark.sql(
"REFRESH TABLE T"
)

checkAnswer(
spark.sql("select * from `T`"),
Nil
)

spark.sql(
"insert into T select 2, 'bb'"
)

checkAnswer(
spark.sql("select * from `T`"),
Row(2, "bb") :: Nil
)

}

}
Loading