From af9ec203cc5ce25963bbd79d98c0d273675f6c8e Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Mon, 21 Jul 2025 14:52:10 -0400 Subject: [PATCH 1/5] support DATABASE concept in Beam SQL --- .../trigger_files/beam_PostCommit_SQL.json | 2 +- CHANGES.md | 1 + .../sql/src/main/codegen/config.fmpp | 6 + .../src/main/codegen/includes/parserImpls.ftl | 89 +++++++++++ .../sql/impl/parser/SqlCreateDatabase.java | 122 +++++++++++++++ .../sql/impl/parser/SqlDropDatabase.java | 122 +++++++++++++++ .../sql/impl/parser/SqlUseDatabase.java | 103 +++++++++++++ .../extensions/sql/meta/catalog/Catalog.java | 45 ++++++ .../sql/meta/catalog/InMemoryCatalog.java | 41 +++++ .../meta/provider/iceberg/IcebergCatalog.java | 47 +++++- .../iceberg/IcebergTableProvider.java | 24 +-- .../sql/BeamSqlCliDatabaseTest.java | 101 +++++++++++++ .../iceberg/BeamSqlCliIcebergTest.java | 140 ++++++++++++++++++ .../iceberg/IcebergTableProviderTest.java | 10 +- .../sdk/io/iceberg/IcebergCatalogConfig.java | 58 ++++++++ 15 files changed, 882 insertions(+), 29 deletions(-) create mode 100644 sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateDatabase.java create mode 100644 sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDropDatabase.java create mode 100644 sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlUseDatabase.java create mode 100644 sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliDatabaseTest.java create mode 100644 sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/BeamSqlCliIcebergTest.java diff --git a/.github/trigger_files/beam_PostCommit_SQL.json b/.github/trigger_files/beam_PostCommit_SQL.json index 5df3841d2363..833fd9b0d174 100644 --- a/.github/trigger_files/beam_PostCommit_SQL.json +++ b/.github/trigger_files/beam_PostCommit_SQL.json @@ -1,4 +1,4 @@ { "comment": "Modify this file in a trivial way to cause this test suite to run ", - "modification": 3 + "modification": 2 } diff --git a/CHANGES.md b/CHANGES.md index ea826d50713d..e86dc7bee304 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -84,6 +84,7 @@ * Milvus enrichment handler added (Python) ([#35216](https://github.com/apache/beam/pull/35216)). Beam now supports Milvus enrichment handler capabilities for vector, keyword, and hybrid search operations. +* [Beam SQL] Add support for DATABASEs, with an implementation for Iceberg ([]()) ## Breaking Changes diff --git a/sdks/java/extensions/sql/src/main/codegen/config.fmpp b/sdks/java/extensions/sql/src/main/codegen/config.fmpp index 623a3e2792f7..7914cf6fcb79 100644 --- a/sdks/java/extensions/sql/src/main/codegen/config.fmpp +++ b/sdks/java/extensions/sql/src/main/codegen/config.fmpp @@ -26,11 +26,14 @@ data: { "org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlDrop" "org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.type.SqlTypeName" "org.apache.beam.sdk.extensions.sql.impl.parser.SqlCreateCatalog" + "org.apache.beam.sdk.extensions.sql.impl.parser.SqlCreateDatabase" "org.apache.beam.sdk.extensions.sql.impl.parser.SqlCreateExternalTable" "org.apache.beam.sdk.extensions.sql.impl.parser.SqlCreateFunction" "org.apache.beam.sdk.extensions.sql.impl.parser.SqlDropCatalog" + "org.apache.beam.sdk.extensions.sql.impl.parser.SqlDropDatabase" "org.apache.beam.sdk.extensions.sql.impl.parser.SqlDdlNodes" "org.apache.beam.sdk.extensions.sql.impl.parser.SqlUseCatalog" + "org.apache.beam.sdk.extensions.sql.impl.parser.SqlUseDatabase" "org.apache.beam.sdk.extensions.sql.impl.parser.SqlSetOptionBeam" "org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils" "org.apache.beam.sdk.schemas.Schema" @@ -395,6 +398,7 @@ data: { # Example: SqlShowDatabases(), SqlShowTables(). statementParserMethods: [ "SqlUseCatalog(Span.of(), null)" + "SqlUseDatabase(Span.of(), null)" "SqlSetOptionBeam(Span.of(), null)" ] @@ -427,6 +431,7 @@ data: { # Each must accept arguments "(SqlParserPos pos, boolean replace)". createStatementParserMethods: [ "SqlCreateCatalog" + "SqlCreateDatabase" "SqlCreateExternalTable" "SqlCreateFunction" "SqlCreateTableNotSupportedMessage" @@ -436,6 +441,7 @@ data: { # Each must accept arguments "(SqlParserPos pos)". dropStatementParserMethods: [ "SqlDropTable" + "SqlDropDatabase" "SqlDropCatalog" ] diff --git a/sdks/java/extensions/sql/src/main/codegen/includes/parserImpls.ftl b/sdks/java/extensions/sql/src/main/codegen/includes/parserImpls.ftl index 450c6eeaff7f..470cbb443895 100644 --- a/sdks/java/extensions/sql/src/main/codegen/includes/parserImpls.ftl +++ b/sdks/java/extensions/sql/src/main/codegen/includes/parserImpls.ftl @@ -27,6 +27,15 @@ boolean IfExistsOpt() : { return false; } } +boolean CascadeOpt() : +{ +} +{ + { return true; } +| + { return false; } +} + SqlNodeList Options() : { final Span s; @@ -255,6 +264,86 @@ SqlDrop SqlDropCatalog(Span s, boolean replace) : } } +/** + * CREATE DATABASE ( IF NOT EXISTS )? database_name + */ +SqlCreate SqlCreateDatabase(Span s, boolean replace) : +{ + final boolean ifNotExists; + final SqlNode databaseName; +} +{ + { + s.add(this); + } + + ifNotExists = IfNotExistsOpt() + ( + databaseName = StringLiteral() + | + databaseName = SimpleIdentifier() + ) + + { + return new SqlCreateDatabase( + s.end(this), + replace, + ifNotExists, + databaseName); + } +} + +/** + * USE DATABASE database_name + */ +SqlCall SqlUseDatabase(Span s, String scope) : +{ + final SqlNode databaseName; +} +{ + { + s.add(this); + } + + ( + databaseName = StringLiteral() + | + databaseName = SimpleIdentifier() + ) + { + return new SqlUseDatabase( + s.end(this), + scope, + databaseName); + } +} + +/** + * DROP DATABASE [ IF EXISTS ] database_name [ RESTRICT | CASCADE ] + */ +SqlDrop SqlDropDatabase(Span s, boolean replace) : +{ + final boolean ifExists; + final SqlNode databaseName; + final boolean cascade; +} +{ + + ifExists = IfExistsOpt() + ( + databaseName = StringLiteral() + | + databaseName = SimpleIdentifier() + ) + + cascade = CascadeOpt() + + { + return new SqlDropDatabase(s.end(this), ifExists, databaseName, cascade); + } +} + + SqlNodeList PartitionFieldList() : { final List list = new ArrayList(); diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateDatabase.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateDatabase.java new file mode 100644 index 000000000000..feb3cd46a65a --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateDatabase.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.impl.parser; + +import static java.lang.String.format; +import static org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.util.Static.RESOURCE; + +import java.util.List; +import org.apache.beam.sdk.extensions.sql.impl.BeamCalciteSchema; +import org.apache.beam.sdk.extensions.sql.meta.catalog.Catalog; +import org.apache.beam.sdk.extensions.sql.meta.catalog.CatalogManager; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.jdbc.CalcitePrepare; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.jdbc.CalciteSchema; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.schema.Schema; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlCreate; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlIdentifier; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlKind; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlNode; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlOperator; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlSpecialOperator; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlUtil; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlWriter; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.parser.SqlParserPos; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.util.Pair; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SqlCreateDatabase extends SqlCreate implements BeamSqlParser.ExecutableStatement { + private static final Logger LOG = LoggerFactory.getLogger(SqlCreateDatabase.class); + private final SqlIdentifier databaseName; + private static final SqlOperator OPERATOR = + new SqlSpecialOperator("CREATE DATABASE", SqlKind.OTHER_DDL); + + public SqlCreateDatabase( + SqlParserPos pos, boolean replace, boolean ifNotExists, SqlNode databaseName) { + super(OPERATOR, pos, replace, ifNotExists); + this.databaseName = SqlDdlNodes.getIdentifier(databaseName, pos); + } + + @Override + public List getOperandList() { + ImmutableList.Builder operands = ImmutableList.builder(); + operands.add(databaseName); + return operands.build(); + } + + @Override + public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { + writer.keyword("CREATE"); + if (getReplace()) { + writer.keyword("OR REPLACE"); + } + writer.keyword("DATABASE"); + if (ifNotExists) { + writer.keyword("IF NOT EXISTS"); + } + databaseName.unparse(writer, leftPrec, rightPrec); + writer.keyword("TYPE"); + } + + @Override + public void execute(CalcitePrepare.Context context) { + final Pair pair = SqlDdlNodes.schema(context, true, databaseName); + Schema schema = pair.left.schema; + String name = pair.right; + + if (!(schema instanceof BeamCalciteSchema)) { + throw SqlUtil.newContextException( + databaseName.getParserPosition(), + RESOURCE.internal("Schema is not of instance BeamCalciteSchema")); + } + + @Nullable CatalogManager catalogManager = ((BeamCalciteSchema) schema).getCatalogManager(); + if (catalogManager == null) { + throw SqlUtil.newContextException( + databaseName.getParserPosition(), + RESOURCE.internal( + format( + "Unexpected 'CREATE DATABASE' call using Schema '%s' that is not a Catalog.", + name))); + } + + // Attempt to create the database. + Catalog catalog = catalogManager.currentCatalog(); + try { + LOG.info("Creating database '{}'", name); + boolean created = catalog.createDatabase(name); + + if (created) { + LOG.info("Successfully created database '{}'", name); + } else if (ifNotExists) { + LOG.info("Database '{}' already exists.", name); + } else { + throw SqlUtil.newContextException( + databaseName.getParserPosition(), + RESOURCE.internal(format("Database '%s' already exists.", name))); + } + } catch (Exception e) { + throw SqlUtil.newContextException( + databaseName.getParserPosition(), + RESOURCE.internal( + format("Encountered an error when creating database '%s': %s", name, e))); + } + } +} diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDropDatabase.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDropDatabase.java new file mode 100644 index 000000000000..639edc9ca15d --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlDropDatabase.java @@ -0,0 +1,122 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.impl.parser; + +import static java.lang.String.format; +import static org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.util.Static.RESOURCE; + +import java.util.List; +import org.apache.beam.sdk.extensions.sql.impl.BeamCalciteSchema; +import org.apache.beam.sdk.extensions.sql.meta.catalog.Catalog; +import org.apache.beam.sdk.extensions.sql.meta.catalog.CatalogManager; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.jdbc.CalcitePrepare; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.jdbc.CalciteSchema; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.schema.Schema; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlDrop; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlIdentifier; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlKind; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlNode; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlOperator; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlSpecialOperator; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlUtil; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlWriter; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.parser.SqlParserPos; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.util.Pair; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SqlDropDatabase extends SqlDrop implements BeamSqlParser.ExecutableStatement { + private static final Logger LOG = LoggerFactory.getLogger(SqlDropDatabase.class); + private static final SqlOperator OPERATOR = + new SqlSpecialOperator("DROP DATABASE", SqlKind.OTHER_DDL); + private final SqlIdentifier databaseName; + private final boolean cascade; + + public SqlDropDatabase( + SqlParserPos pos, boolean ifExists, SqlNode databaseName, boolean cascade) { + super(OPERATOR, pos, ifExists); + this.databaseName = SqlDdlNodes.getIdentifier(databaseName, pos); + this.cascade = cascade; + } + + @Override + public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { + writer.keyword(getOperator().getName()); + if (ifExists) { + writer.keyword("IF EXISTS"); + } + databaseName.unparse(writer, leftPrec, rightPrec); + if (cascade) { + writer.keyword("CASCADE"); + } else { + writer.keyword("RESTRICT"); + } + } + + @Override + public void execute(CalcitePrepare.Context context) { + final Pair pair = SqlDdlNodes.schema(context, true, databaseName); + Schema schema = pair.left.schema; + String name = pair.right; + + if (!(schema instanceof BeamCalciteSchema)) { + throw SqlUtil.newContextException( + databaseName.getParserPosition(), + RESOURCE.internal("Schema is not of instance BeamCalciteSchema")); + } + + BeamCalciteSchema beamCalciteSchema = (BeamCalciteSchema) schema; + @Nullable CatalogManager catalogManager = beamCalciteSchema.getCatalogManager(); + if (catalogManager == null) { + throw SqlUtil.newContextException( + databaseName.getParserPosition(), + RESOURCE.internal( + String.format( + "Unexpected 'DROP DATABASE' call using Schema '%s' that is not a Catalog.", + name))); + } + + Catalog catalog = catalogManager.currentCatalog(); + try { + LOG.info("Dropping database '{}'", name); + boolean dropped = catalog.dropDatabase(name, cascade); + + if (dropped) { + LOG.info("Successfully dropped database '{}'", name); + } else if (ifExists) { + LOG.info("Database '{}' does not exist.", name); + } else { + throw SqlUtil.newContextException( + databaseName.getParserPosition(), + RESOURCE.internal(String.format("Database '%s' does not exist.", name))); + } + } catch (Exception e) { + throw SqlUtil.newContextException( + databaseName.getParserPosition(), + RESOURCE.internal( + format("Encountered an error when dropping database '%s': %s", name, e))); + } + } + + @Override + public List getOperandList() { + return ImmutableList.of(databaseName); + } +} diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlUseDatabase.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlUseDatabase.java new file mode 100644 index 000000000000..40523e50a63f --- /dev/null +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlUseDatabase.java @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.impl.parser; + +import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull; +import static org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.util.Static.RESOURCE; + +import java.util.Collections; +import java.util.List; +import org.apache.beam.sdk.extensions.sql.impl.BeamCalciteSchema; +import org.apache.beam.sdk.extensions.sql.meta.catalog.Catalog; +import org.apache.beam.sdk.extensions.sql.meta.catalog.CatalogManager; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.jdbc.CalcitePrepare; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.jdbc.CalciteSchema; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.schema.Schema; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlIdentifier; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlKind; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlNode; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlOperator; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlSetOption; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlSpecialOperator; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.SqlUtil; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.parser.SqlParserPos; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.util.Pair; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class SqlUseDatabase extends SqlSetOption implements BeamSqlParser.ExecutableStatement { + private static final Logger LOG = LoggerFactory.getLogger(SqlUseDatabase.class); + private final SqlIdentifier databaseName; + + private static final SqlOperator OPERATOR = new SqlSpecialOperator("USE DATABASE", SqlKind.OTHER); + + public SqlUseDatabase(SqlParserPos pos, String scope, SqlNode databaseName) { + super(pos, scope, SqlDdlNodes.getIdentifier(databaseName, pos), null); + this.databaseName = SqlDdlNodes.getIdentifier(databaseName, pos); + } + + @Override + public SqlOperator getOperator() { + return OPERATOR; + } + + @Override + public List getOperandList() { + return Collections.singletonList(databaseName); + } + + @Override + public void execute(CalcitePrepare.Context context) { + final Pair pair = SqlDdlNodes.schema(context, true, databaseName); + Schema schema = pair.left.schema; + String name = checkStateNotNull(pair.right); + + if (!(schema instanceof BeamCalciteSchema)) { + throw SqlUtil.newContextException( + databaseName.getParserPosition(), + RESOURCE.internal("Schema is not of instance BeamCalciteSchema")); + } + + BeamCalciteSchema beamCalciteSchema = (BeamCalciteSchema) schema; + @Nullable CatalogManager catalogManager = beamCalciteSchema.getCatalogManager(); + if (catalogManager == null) { + throw SqlUtil.newContextException( + databaseName.getParserPosition(), + RESOURCE.internal( + String.format( + "Unexpected 'USE DATABASE' call using Schema '%s' that is not a Catalog.", + name))); + } + + Catalog catalog = catalogManager.currentCatalog(); + if (!catalog.listDatabases().contains(name)) { + throw SqlUtil.newContextException( + databaseName.getParserPosition(), + RESOURCE.internal(String.format("Cannot use database: '%s' not found.", name))); + } + + if (name.equals(catalog.currentDatabase())) { + LOG.info("Database '{}' is already in use.", name); + return; + } + + catalog.useDatabase(name); + LOG.info("Switched to database '{}'.", name); + } +} diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/Catalog.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/Catalog.java index 2a99209e06f5..e347584654cd 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/Catalog.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/Catalog.java @@ -18,8 +18,10 @@ package org.apache.beam.sdk.extensions.sql.meta.catalog; import java.util.Map; +import java.util.Set; import org.apache.beam.sdk.annotations.Internal; import org.apache.beam.sdk.extensions.sql.meta.store.MetaStore; +import org.checkerframework.checker.nullness.qual.Nullable; /** * Represents a named and configurable container for managing tables. Is defined with a type and @@ -28,12 +30,55 @@ */ @Internal public interface Catalog { + // Default database name + String DEFAULT = "default"; + /** A type that defines this catalog. */ String type(); /** The underlying {@link MetaStore} that actually manages tables. */ MetaStore metaStore(); + /** + * Produces the currently active database. Can be null if no database is active. + * + * @return the current active database + */ + @Nullable + String currentDatabase(); + + /** + * Creates a database with this name. + * + * @param databaseName + * @return true if the database was created, false otherwise. + */ + boolean createDatabase(String databaseName); + + /** + * Returns a set of existing databases accessible to this catalog. + * + * @return a set of existing database names + */ + Set listDatabases(); + + /** + * Switches to use the specified database. + * + * @param databaseName + */ + void useDatabase(String databaseName); + + /** + * Drops the database with this name. If cascade is true, the catalog should first drop all tables + * contained in this database. + * + * @param databaseName + * @param cascade + * @return true if the database was dropped, false otherwise. + */ + boolean dropDatabase(String databaseName, boolean cascade); + /** The name of this catalog, specified by the user. */ String name(); diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/InMemoryCatalog.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/InMemoryCatalog.java index 68ab13ef6187..2bcf484c0424 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/InMemoryCatalog.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/InMemoryCatalog.java @@ -17,15 +17,24 @@ */ package org.apache.beam.sdk.extensions.sql.meta.catalog; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; +import static org.apache.curator.shaded.com.google.common.base.Preconditions.checkState; + +import java.util.Collections; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import org.apache.beam.sdk.extensions.sql.meta.store.InMemoryMetaStore; import org.apache.beam.sdk.extensions.sql.meta.store.MetaStore; import org.apache.beam.sdk.util.Preconditions; +import org.checkerframework.checker.nullness.qual.Nullable; public class InMemoryCatalog implements Catalog { private final String name; private final Map properties; private final InMemoryMetaStore metaStore = new InMemoryMetaStore(); + private final HashSet databases = new HashSet<>(Collections.singleton(DEFAULT)); + protected @Nullable String currentDatabase = DEFAULT; public InMemoryCatalog(String name, Map properties) { this.name = name; @@ -52,4 +61,36 @@ public MetaStore metaStore() { public Map properties() { return Preconditions.checkStateNotNull(properties, "InMemoryCatalog has not been initialized"); } + + @Override + public boolean createDatabase(String database) { + return databases.add(database); + } + + @Override + public void useDatabase(String database) { + checkArgument(listDatabases().contains(database), "Database '%s' does not exist."); + currentDatabase = database; + } + + @Override + public @Nullable String currentDatabase() { + return currentDatabase; + } + + @Override + public boolean dropDatabase(String database, boolean cascade) { + checkState(!cascade, getClass().getSimpleName() + " does not support CASCADE."); + + boolean removed = databases.remove(database); + if (database.equals(currentDatabase)) { + currentDatabase = null; + } + return removed; + } + + @Override + public Set listDatabases() { + return databases; + } } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergCatalog.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergCatalog.java index 58c686da12f4..1209d2b4663d 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergCatalog.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergCatalog.java @@ -18,15 +18,41 @@ package org.apache.beam.sdk.extensions.sql.meta.provider.iceberg; import java.util.Map; +import java.util.Set; import org.apache.beam.sdk.extensions.sql.meta.catalog.InMemoryCatalog; import org.apache.beam.sdk.extensions.sql.meta.store.InMemoryMetaStore; +import org.apache.beam.sdk.io.iceberg.IcebergCatalogConfig; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; public class IcebergCatalog extends InMemoryCatalog { + // TODO(ahmedabu98): extend this to the IO implementation so + // other SDKs can make use of it too + private static final String BEAM_HADOOP_PREFIX = "beam.catalog.hadoop"; private final InMemoryMetaStore metaStore = new InMemoryMetaStore(); + @VisibleForTesting final IcebergCatalogConfig catalogConfig; public IcebergCatalog(String name, Map properties) { super(name, properties); - metaStore.registerProvider(new IcebergTableProvider(name, properties)); + + ImmutableMap.Builder catalogProps = ImmutableMap.builder(); + ImmutableMap.Builder hadoopProps = ImmutableMap.builder(); + + for (Map.Entry entry : properties.entrySet()) { + if (entry.getKey().startsWith(BEAM_HADOOP_PREFIX)) { + hadoopProps.put(entry.getKey(), entry.getValue()); + } else { + catalogProps.put(entry.getKey(), entry.getValue()); + } + } + + catalogConfig = + IcebergCatalogConfig.builder() + .setCatalogName(name) + .setCatalogProperties(catalogProps.build()) + .setConfigProperties(hadoopProps.build()) + .build(); + metaStore.registerProvider(new IcebergTableProvider(catalogConfig)); } @Override @@ -38,4 +64,23 @@ public InMemoryMetaStore metaStore() { public String type() { return "iceberg"; } + + @Override + public boolean createDatabase(String database) { + return catalogConfig.createNamespace(database); + } + + @Override + public boolean dropDatabase(String database, boolean cascade) { + boolean removed = catalogConfig.dropNamespace(database, cascade); + if (database.equals(currentDatabase)) { + currentDatabase = null; + } + return removed; + } + + @Override + public Set listDatabases() { + return catalogConfig.listNamespaces(); + } } diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTableProvider.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTableProvider.java index e52900ce81e5..568893716581 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTableProvider.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTableProvider.java @@ -28,7 +28,6 @@ import org.apache.beam.sdk.io.iceberg.IcebergCatalogConfig; import org.apache.beam.sdk.io.iceberg.TableAlreadyExistsException; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; -import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,30 +37,11 @@ */ public class IcebergTableProvider implements TableProvider { private static final Logger LOG = LoggerFactory.getLogger(IcebergTableProvider.class); - // TODO(ahmedabu98): extend this to the IO implementation so - // other SDKs can make use of it too - private static final String BEAM_HADOOP_PREFIX = "beam.catalog.hadoop"; @VisibleForTesting final IcebergCatalogConfig catalogConfig; private final Map tables = new HashMap<>(); - public IcebergTableProvider(String name, Map properties) { - ImmutableMap.Builder catalogProps = ImmutableMap.builder(); - ImmutableMap.Builder hadoopProps = ImmutableMap.builder(); - - for (Map.Entry entry : properties.entrySet()) { - if (entry.getKey().startsWith(BEAM_HADOOP_PREFIX)) { - hadoopProps.put(entry.getKey(), entry.getValue()); - } else { - catalogProps.put(entry.getKey(), entry.getValue()); - } - } - - catalogConfig = - IcebergCatalogConfig.builder() - .setCatalogName(name) - .setCatalogProperties(catalogProps.build()) - .setConfigProperties(hadoopProps.build()) - .build(); + public IcebergTableProvider(IcebergCatalogConfig catalogConfig) { + this.catalogConfig = catalogConfig; } @Override diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliDatabaseTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliDatabaseTest.java new file mode 100644 index 000000000000..1530141c6e22 --- /dev/null +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/BeamSqlCliDatabaseTest.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql; + +import static org.junit.Assert.assertEquals; + +import org.apache.beam.sdk.extensions.sql.meta.catalog.InMemoryCatalogManager; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.runtime.CalciteContextException; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; + +/** UnitTest for {@link BeamSqlCli} using databases. */ +public class BeamSqlCliDatabaseTest { + @Rule public transient ExpectedException thrown = ExpectedException.none(); + private InMemoryCatalogManager catalogManager; + private BeamSqlCli cli; + + @Before + public void setupCli() { + catalogManager = new InMemoryCatalogManager(); + cli = new BeamSqlCli().catalogManager(catalogManager); + } + + @Test + public void testCreateDatabase() { + cli.execute("CREATE DATABASE my_database"); + assertEquals( + ImmutableSet.of("default", "my_database"), catalogManager.currentCatalog().listDatabases()); + } + + @Test + public void testCreateDuplicateDatabase_error() { + cli.execute("CREATE DATABASE my_database"); + thrown.expect(CalciteContextException.class); + thrown.expectMessage("Database 'my_database' already exists."); + cli.execute("CREATE DATABASE my_database"); + } + + @Test + public void testCreateDuplicateDatabase_ifNotExists() { + cli.execute("CREATE DATABASE my_database"); + cli.execute("CREATE DATABASE IF NOT EXISTS my_database"); + assertEquals( + ImmutableSet.of("default", "my_database"), catalogManager.currentCatalog().listDatabases()); + } + + @Test + public void testUseDatabase() { + assertEquals("default", catalogManager.currentCatalog().currentDatabase()); + cli.execute("CREATE DATABASE my_database"); + cli.execute("CREATE DATABASE my_database2"); + assertEquals("default", catalogManager.currentCatalog().currentDatabase()); + cli.execute("USE DATABASE my_database"); + assertEquals("my_database", catalogManager.currentCatalog().currentDatabase()); + cli.execute("USE DATABASE my_database2"); + assertEquals("my_database2", catalogManager.currentCatalog().currentDatabase()); + } + + @Test + public void testUseDatabase_doesNotExist() { + assertEquals("default", catalogManager.currentCatalog().currentDatabase()); + thrown.expect(CalciteContextException.class); + thrown.expectMessage("Cannot use database: 'non_existent' not found."); + cli.execute("USE DATABASE non_existent"); + } + + @Test + public void testDropDatabase() { + cli.execute("CREATE DATABASE my_database"); + assertEquals( + ImmutableSet.of("default", "my_database"), catalogManager.currentCatalog().listDatabases()); + cli.execute("DROP DATABASE my_database"); + assertEquals(ImmutableSet.of("default"), catalogManager.currentCatalog().listDatabases()); + } + + @Test + public void testDropDatabase_nonexistent() { + assertEquals(ImmutableSet.of("default"), catalogManager.currentCatalog().listDatabases()); + thrown.expect(CalciteContextException.class); + thrown.expectMessage("Database 'my_database' does not exist."); + cli.execute("DROP DATABASE my_database"); + } +} diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/BeamSqlCliIcebergTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/BeamSqlCliIcebergTest.java new file mode 100644 index 000000000000..217bf90c09a9 --- /dev/null +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/BeamSqlCliIcebergTest.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.extensions.sql.meta.provider.iceberg; + +import static java.lang.String.format; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; + +import java.io.File; +import java.io.IOException; +import java.util.UUID; +import org.apache.beam.sdk.extensions.sql.BeamSqlCli; +import org.apache.beam.sdk.extensions.sql.meta.catalog.InMemoryCatalogManager; +import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.runtime.CalciteContextException; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; +import org.junit.Assert; +import org.junit.Before; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.rules.TemporaryFolder; + +/** UnitTest for {@link BeamSqlCli} using Iceberg catalog. */ +public class BeamSqlCliIcebergTest { + @Rule public transient ExpectedException thrown = ExpectedException.none(); + private InMemoryCatalogManager catalogManager; + private BeamSqlCli cli; + private String warehouse; + @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); + + @Before + public void setup() throws IOException { + catalogManager = new InMemoryCatalogManager(); + cli = new BeamSqlCli().catalogManager(catalogManager); + File warehouseFile = TEMPORARY_FOLDER.newFolder(); + Assert.assertTrue(warehouseFile.delete()); + warehouse = "file:" + warehouseFile + "/" + UUID.randomUUID(); + } + + private String createCatalog(String name) { + return format("CREATE CATALOG %s \n", name) + + "TYPE iceberg \n" + + "PROPERTIES (\n" + + " 'type' = 'hadoop', \n" + + format(" 'warehouse' = '%s')", warehouse); + } + + @Test + public void testCreateCatalog() { + assertEquals("default", catalogManager.currentCatalog().name()); + + cli.execute(createCatalog("my_catalog")); + assertNotNull(catalogManager.getCatalog("my_catalog")); + assertEquals("default", catalogManager.currentCatalog().name()); + + cli.execute("USE CATALOG my_catalog"); + assertEquals("my_catalog", catalogManager.currentCatalog().name()); + assertEquals("iceberg", catalogManager.currentCatalog().type()); + } + + @Test + public void testCreateNamespace() { + testCreateCatalog(); + + IcebergCatalog catalog = (IcebergCatalog) catalogManager.currentCatalog(); + assertEquals("default", catalog.currentDatabase()); + cli.execute("CREATE DATABASE new_namespace"); + assertEquals("new_namespace", Iterables.getOnlyElement(catalog.listDatabases())); + + // Specifies IF NOT EXISTS, so should be a no-op + cli.execute("CREATE DATABASE IF NOT EXISTS new_namespace"); + assertEquals("new_namespace", Iterables.getOnlyElement(catalog.listDatabases())); + + // This one doesn't, so it should throw an error. + thrown.expect(CalciteContextException.class); + thrown.expectMessage("Database 'new_namespace' already exists."); + cli.execute("CREATE DATABASE new_namespace"); + + // cleanup + catalog.dropDatabase("new_namespace", true); + } + + @Test + public void testUseNamespace() { + testCreateCatalog(); + + IcebergCatalog catalog = (IcebergCatalog) catalogManager.currentCatalog(); + cli.execute("CREATE DATABASE new_namespace"); + assertEquals("default", catalog.currentDatabase()); + cli.execute("USE DATABASE new_namespace"); + assertEquals("new_namespace", catalog.currentDatabase()); + + // Cannot use a non-existent namespace + thrown.expect(CalciteContextException.class); + thrown.expectMessage("Cannot use database: 'non_existent' not found."); + cli.execute("USE DATABASE non_existent"); + + // cleanup + catalog.dropDatabase("new_namespace", true); + } + + @Test + public void testDropNamespace() { + testCreateCatalog(); + + IcebergCatalog catalog = (IcebergCatalog) catalogManager.currentCatalog(); + cli.execute("CREATE DATABASE new_namespace"); + cli.execute("USE DATABASE new_namespace"); + assertEquals("new_namespace", catalog.currentDatabase()); + cli.execute("DROP DATABASE new_namespace"); + assertTrue(catalog.listDatabases().isEmpty()); + assertNull(catalog.currentDatabase()); + + // Drop non-existent namespace with IF EXISTS + cli.execute("DROP DATABASE IF EXISTS new_namespace"); + + // Throw an error when IF EXISTS is not specified + thrown.expect(CalciteContextException.class); + thrown.expectMessage("Database 'new_namespace' does not exist."); + cli.execute("DROP DATABASE new_namespace"); + } +} diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTableProviderTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTableProviderTest.java index d829ee3bed08..dcd37a203b54 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTableProviderTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTableProviderTest.java @@ -34,8 +34,8 @@ /** UnitTest for {@link IcebergTableProvider}. */ public class IcebergTableProviderTest { - private final IcebergTableProvider provider = - new IcebergTableProvider( + private final IcebergCatalog catalog = + new IcebergCatalog( "test_catalog", ImmutableMap.of( "catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog", @@ -46,7 +46,7 @@ public class IcebergTableProviderTest { @Test public void testGetTableType() { - assertEquals("iceberg", provider.getTableType()); + assertEquals("iceberg", catalog.metaStore().getTableType()); } @Test @@ -59,14 +59,14 @@ public void testBuildBeamSqlTable() throws Exception { fakeTableBuilder("my_table") .properties(TableUtils.parseProperties(propertiesString)) .build(); - BeamSqlTable sqlTable = provider.buildBeamSqlTable(table); + BeamSqlTable sqlTable = catalog.metaStore().buildBeamSqlTable(table); assertNotNull(sqlTable); assertTrue(sqlTable instanceof IcebergTable); IcebergTable icebergTable = (IcebergTable) sqlTable; assertEquals("namespace.my_table", icebergTable.tableIdentifier); - assertEquals(provider.catalogConfig, icebergTable.catalogConfig); + assertEquals(catalog.catalogConfig, icebergTable.catalogConfig); } private static Table.Builder fakeTableBuilder(String name) { diff --git a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java index 7929d028bcdc..96357b44e54b 100644 --- a/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java +++ b/sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/IcebergCatalogConfig.java @@ -21,13 +21,20 @@ import java.io.Serializable; import java.util.List; import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; import org.apache.beam.sdk.schemas.Schema; import org.apache.beam.sdk.util.ReleaseInfo; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; import org.apache.hadoop.conf.Configuration; import org.apache.iceberg.CatalogUtil; import org.apache.iceberg.PartitionSpec; import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.SupportsNamespaces; import org.apache.iceberg.catalog.TableIdentifier; import org.apache.iceberg.exceptions.AlreadyExistsException; import org.checkerframework.checker.nullness.qual.MonotonicNonNull; @@ -83,6 +90,51 @@ public org.apache.iceberg.catalog.Catalog catalog() { return cachedCatalog; } + private void checkSupportsNamespaces() { + Preconditions.checkState( + catalog() instanceof SupportsNamespaces, + "Catalog '%s' does not support handling namespaces.", + catalog().name()); + } + + public boolean createNamespace(String namespace) { + checkSupportsNamespaces(); + String[] components = Iterables.toArray(Splitter.on('.').split(namespace), String.class); + + try { + ((SupportsNamespaces) catalog()).createNamespace(Namespace.of(components)); + return true; + } catch (AlreadyExistsException e) { + return false; + } + } + + public Set listNamespaces() { + checkSupportsNamespaces(); + + return ((SupportsNamespaces) catalog()) + .listNamespaces().stream().map(Namespace::toString).collect(Collectors.toSet()); + } + + public boolean dropNamespace(String namespace, boolean cascade) { + checkSupportsNamespaces(); + + String[] components = Iterables.toArray(Splitter.on('.').split(namespace), String.class); + Namespace ns = Namespace.of(components); + + if (!((SupportsNamespaces) catalog()).namespaceExists(ns)) { + return false; + } + + // Cascade will delete all contained tables first + if (cascade) { + catalog().listTables(ns).forEach(catalog()::dropTable); + } + + // Drop the namespace + return ((SupportsNamespaces) catalog()).dropNamespace(Namespace.of(components)); + } + public void createTable( String tableIdentifier, Schema tableSchema, @Nullable List partitionFields) { TableIdentifier icebergIdentifier = TableIdentifier.parse(tableIdentifier); @@ -105,6 +157,12 @@ public boolean dropTable(String tableIdentifier) { return catalog().dropTable(icebergIdentifier); } + public Set listTables(String namespace) { + return catalog().listTables(Namespace.of(namespace)).stream() + .map(TableIdentifier::name) + .collect(Collectors.toSet()); + } + @AutoValue.Builder public abstract static class Builder { public abstract Builder setCatalogName(@Nullable String catalogName); From a1a3690f89a022d9c524473e66388032ea4d02a7 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Mon, 21 Jul 2025 17:06:38 -0400 Subject: [PATCH 2/5] fix test --- .../sql/meta/provider/iceberg/IcebergTableProviderTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTableProviderTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTableProviderTest.java index dcd37a203b54..3e63eb8457e0 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTableProviderTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/IcebergTableProviderTest.java @@ -46,7 +46,7 @@ public class IcebergTableProviderTest { @Test public void testGetTableType() { - assertEquals("iceberg", catalog.metaStore().getTableType()); + assertNotNull(catalog.metaStore().getProvider("iceberg")); } @Test From 99e1728e341d744dd5f7fd2f7b24f1ef85df9312 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Tue, 22 Jul 2025 09:04:51 -0400 Subject: [PATCH 3/5] fix dep --- .../beam/sdk/extensions/sql/meta/catalog/InMemoryCatalog.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/InMemoryCatalog.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/InMemoryCatalog.java index 2bcf484c0424..64d2fefe2f63 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/InMemoryCatalog.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/catalog/InMemoryCatalog.java @@ -18,7 +18,7 @@ package org.apache.beam.sdk.extensions.sql.meta.catalog; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; -import static org.apache.curator.shaded.com.google.common.base.Preconditions.checkState; +import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; import java.util.Collections; import java.util.HashSet; From 033f1017108fddc1912aa799fa2fbf31f7a2f6b7 Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Tue, 22 Jul 2025 11:06:05 -0400 Subject: [PATCH 4/5] remove tYPE --- .../beam/sdk/extensions/sql/impl/parser/SqlCreateDatabase.java | 1 - 1 file changed, 1 deletion(-) diff --git a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateDatabase.java b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateDatabase.java index feb3cd46a65a..9938ad0e699c 100644 --- a/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateDatabase.java +++ b/sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/impl/parser/SqlCreateDatabase.java @@ -72,7 +72,6 @@ public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { writer.keyword("IF NOT EXISTS"); } databaseName.unparse(writer, leftPrec, rightPrec); - writer.keyword("TYPE"); } @Override From 17993a3eeb655935306e696f6685ba811bb3c77a Mon Sep 17 00:00:00 2001 From: Ahmed Abualsaud Date: Tue, 22 Jul 2025 12:04:29 -0400 Subject: [PATCH 5/5] extra assertions --- .../sql/meta/provider/iceberg/BeamSqlCliIcebergTest.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/BeamSqlCliIcebergTest.java b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/BeamSqlCliIcebergTest.java index 217bf90c09a9..cc6e3b426ec3 100644 --- a/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/BeamSqlCliIcebergTest.java +++ b/sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/iceberg/BeamSqlCliIcebergTest.java @@ -94,8 +94,8 @@ public void testCreateNamespace() { thrown.expectMessage("Database 'new_namespace' already exists."); cli.execute("CREATE DATABASE new_namespace"); - // cleanup - catalog.dropDatabase("new_namespace", true); + // assert there was a database, and cleanup + assertTrue(catalog.dropDatabase("new_namespace", true)); } @Test @@ -113,8 +113,8 @@ public void testUseNamespace() { thrown.expectMessage("Cannot use database: 'non_existent' not found."); cli.execute("USE DATABASE non_existent"); - // cleanup - catalog.dropDatabase("new_namespace", true); + // assert there was a database, and cleanup + assertTrue(catalog.dropDatabase("new_namespace", true)); } @Test