diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java index fe438b6871e13..c02de7c019679 100644 --- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java +++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/ExecutionContext.java @@ -166,7 +166,7 @@ public ClusterDescriptor createClusterDescriptor() throws Exception { public EnvironmentInstance createEnvironmentInstance() { try { - return new EnvironmentInstance(); + return wrapClassLoader(EnvironmentInstance::new); } catch (Throwable t) { // catch everything such that a wrong environment does not affect invocations throw new SqlExecutionException("Could not create environment instance.", t); diff --git a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java index 36fd4ee0a6318..060ac4185ad03 100644 --- a/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java +++ b/flink-libraries/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java @@ -193,27 +193,31 @@ public Map getSessionProperties(SessionContext session) throws S @Override public List listTables(SessionContext session) throws SqlExecutionException { - final TableEnvironment tableEnv = getOrCreateExecutionContext(session) + final ExecutionContext context = getOrCreateExecutionContext(session); + final TableEnvironment tableEnv = context .createEnvironmentInstance() .getTableEnvironment(); - return Arrays.asList(tableEnv.listTables()); + return context.wrapClassLoader(() -> Arrays.asList(tableEnv.listTables())); } @Override public List listUserDefinedFunctions(SessionContext session) throws SqlExecutionException { - final TableEnvironment tableEnv = getOrCreateExecutionContext(session) + final ExecutionContext context = getOrCreateExecutionContext(session); + final TableEnvironment tableEnv = context .createEnvironmentInstance() .getTableEnvironment(); - return Arrays.asList(tableEnv.listUserDefinedFunctions()); + return context.wrapClassLoader(() -> Arrays.asList(tableEnv.listUserDefinedFunctions())); } @Override public TableSchema getTableSchema(SessionContext session, String name) throws SqlExecutionException { - final TableEnvironment tableEnv = getOrCreateExecutionContext(session) + final ExecutionContext context = getOrCreateExecutionContext(session); + final TableEnvironment tableEnv = context .createEnvironmentInstance() .getTableEnvironment(); try { - return tableEnv.scan(name).getSchema(); + // scanning requires table resolution step that might reference external tables + return context.wrapClassLoader(() -> tableEnv.scan(name).getSchema()); } catch (Throwable t) { // catch everything such that the query does not crash the executor throw new SqlExecutionException("No table with this name could be found.", t); @@ -229,7 +233,7 @@ public String explainStatement(SessionContext session, String statement) throws // translate try { - final Table table = createTable(tableEnv, statement); + final Table table = createTable(context, tableEnv, statement); // explanation requires an optimization step that might reference UDFs during code compilation return context.wrapClassLoader(() -> tableEnv.explain(table)); } catch (Throwable t) { @@ -240,12 +244,14 @@ public String explainStatement(SessionContext session, String statement) throws @Override public List completeStatement(SessionContext session, String statement, int position) { - final TableEnvironment tableEnv = getOrCreateExecutionContext(session) + final ExecutionContext context = getOrCreateExecutionContext(session); + final TableEnvironment tableEnv = context .createEnvironmentInstance() .getTableEnvironment(); try { - return Arrays.asList(tableEnv.getCompletionHints(statement, position)); + // planning requires table resolution step that might reference external tables + return context.wrapClassLoader(() -> Arrays.asList(tableEnv.getCompletionHints(statement, position))); } catch (Throwable t) { // catch everything such that the query does not crash the executor if (LOG.isDebugEnabled()) { @@ -402,7 +408,7 @@ private ResultDescriptor executeQueryInternal(ExecutionContext context, S final ExecutionContext.EnvironmentInstance envInst = context.createEnvironmentInstance(); // create table - final Table table = createTable(envInst.getTableEnvironment(), query); + final Table table = createTable(context, envInst.getTableEnvironment(), query); // initialize result final DynamicResult result = resultStore.createResult( @@ -448,10 +454,11 @@ private ResultDescriptor executeQueryInternal(ExecutionContext context, S /** * Creates a table using the given query in the given table environment. */ - private Table createTable(TableEnvironment tableEnv, String selectQuery) { + private Table createTable(ExecutionContext context, TableEnvironment tableEnv, String selectQuery) { // parse and validate query try { - return tableEnv.sqlQuery(selectQuery); + // query statement requires table resolution step that might reference external tables + return context.wrapClassLoader(() -> tableEnv.sqlQuery(selectQuery)); } catch (Throwable t) { // catch everything such that the query does not crash the executor throw new SqlExecutionException("Invalid SQL statement.", t); diff --git a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/descriptors/ConnectorDescriptor.java b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/descriptors/ConnectorDescriptor.java index a1d3e783cdc1c..ed62b46b36b50 100644 --- a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/descriptors/ConnectorDescriptor.java +++ b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/descriptors/ConnectorDescriptor.java @@ -68,7 +68,7 @@ protected final boolean isFormatNeeded() { /** * Converts this descriptor into a set of connector properties. Usually prefixed with - * {@link FormatDescriptorValidator#FORMAT}. + * {@link ConnectorDescriptorValidator#CONNECTOR}. */ protected abstract Map toConnectorProperties(); } diff --git a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/descriptors/ConnectorDescriptorValidator.java b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/descriptors/ConnectorDescriptorValidator.java index 88cf34905f452..cf9860ae69e2a 100644 --- a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/descriptors/ConnectorDescriptorValidator.java +++ b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/descriptors/ConnectorDescriptorValidator.java @@ -26,6 +26,11 @@ @Internal public abstract class ConnectorDescriptorValidator implements DescriptorValidator { + /** + * Prefix for connector-related properties. + */ + public static final String CONNECTOR = "connector"; + /** * Key for describing the type of the connector. Usually used for factory discovery. */ diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/descriptors/ExternalCatalogDescriptor.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/descriptors/ExternalCatalogDescriptor.java new file mode 100644 index 0000000000000..5ee7f06a9dd48 --- /dev/null +++ b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/descriptors/ExternalCatalogDescriptor.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors; + +import org.apache.flink.annotation.PublicEvolving; + +import java.util.Map; + +import static org.apache.flink.table.descriptors.ExternalCatalogDescriptorValidator.CATALOG_PROPERTY_VERSION; +import static org.apache.flink.table.descriptors.ExternalCatalogDescriptorValidator.CATALOG_TYPE; + +/** + * Describes a external catalog of tables, views, and functions. + */ +@PublicEvolving +public abstract class ExternalCatalogDescriptor extends DescriptorBase implements Descriptor { + + private final String type; + + private final int version; + + /** + * Constructs a {@link ExternalCatalogDescriptor}. + * + * @param type string that identifies this catalog + * @param version property version for backwards compatibility + */ + public ExternalCatalogDescriptor(String type, int version) { + this.type = type; + this.version = version; + } + + @Override + public final Map toProperties() { + final DescriptorProperties properties = new DescriptorProperties(); + properties.putString(CATALOG_TYPE, type); + properties.putLong(CATALOG_PROPERTY_VERSION, version); + properties.putProperties(toCatalogProperties()); + return properties.asMap(); + } + + /** + * Converts this descriptor into a set of catalog properties. + */ + protected abstract Map toCatalogProperties(); +} diff --git a/flink-libraries/flink-table/src/main/java/org/apache/flink/table/descriptors/ExternalCatalogDescriptorValidator.java b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/descriptors/ExternalCatalogDescriptorValidator.java new file mode 100644 index 0000000000000..6cee73c5a1615 --- /dev/null +++ b/flink-libraries/flink-table/src/main/java/org/apache/flink/table/descriptors/ExternalCatalogDescriptorValidator.java @@ -0,0 +1,45 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors; + +import org.apache.flink.annotation.Internal; + +/** + * Validator for {@link ExternalCatalogDescriptor}. + */ +@Internal +public abstract class ExternalCatalogDescriptorValidator implements DescriptorValidator { + + /** + * Key for describing the type of the catalog. Usually used for factory discovery. + */ + public static final String CATALOG_TYPE = "type"; + + /** + * Key for describing the property version. This property can be used for backwards + * compatibility in case the property format changes. + */ + public static final String CATALOG_PROPERTY_VERSION = "property-version"; + + @Override + public void validate(DescriptorProperties properties) { + properties.validateString(CATALOG_TYPE, false, 1); + properties.validateInt(CATALOG_PROPERTY_VERSION, true, 0); + } +} diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala index ba789638ca393..55e539a8a4b80 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala @@ -49,7 +49,7 @@ import org.apache.flink.table.api.scala.{BatchTableEnvironment => ScalaBatchTabl import org.apache.flink.table.calcite.{FlinkPlannerImpl, FlinkRelBuilder, FlinkTypeFactory, FlinkTypeSystem} import org.apache.flink.table.catalog.{ExternalCatalog, ExternalCatalogSchema} import org.apache.flink.table.codegen.{ExpressionReducer, FunctionCodeGenerator, GeneratedFunction} -import org.apache.flink.table.descriptors.{ConnectorDescriptor, TableDescriptor} +import org.apache.flink.table.descriptors.{ConnectExternalCatalogDescriptor, ConnectorDescriptor, ExternalCatalogDescriptor, TableDescriptor} import org.apache.flink.table.expressions._ import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._ import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, TableFunction} @@ -403,6 +403,15 @@ abstract class TableEnvironment(val config: TableConfig) { scan(name) } + /** + * Gets the names of all external catalogs registered in this environment. + * + * @return A list of the names of all registered external catalogs. + */ + def listExternalCatalogs(): Array[String] = { + this.externalCatalogs.keySet.toArray + } + /** * Registers an [[ExternalCatalog]] under a unique name in the TableEnvironment's schema. * All tables registered in the [[ExternalCatalog]] can be accessed. @@ -655,6 +664,31 @@ abstract class TableEnvironment(val config: TableConfig) { */ def connect(connectorDescriptor: ConnectorDescriptor): TableDescriptor + /** + * Connects to an external catalog from a descriptor. + * + * Descriptors allow for declaring the communication to external systems in an + * implementation-agnostic way. The classpath is scanned for suitable table factories that match + * the desired configuration. + * + * The following example shows how to read from an external catalog and + * registering it as "MyCatalog": + * + * {{{ + * + * tableEnv + * .connect( + * new ExternalCatalogXYZ() + * .version("2.3.0")) + * .registerExternalCatalog("MyCatalog") + * }}} + * + * @param catalogDescriptor connector descriptor describing the external system + */ + def connect(catalogDescriptor: ExternalCatalogDescriptor): ConnectExternalCatalogDescriptor = { + new ConnectExternalCatalogDescriptor(this, catalogDescriptor) + } + private[flink] def scanInternal(tablePath: Array[String]): Option[Table] = { require(tablePath != null && !tablePath.isEmpty, "tablePath must not be null or empty.") val schemaPaths = tablePath.slice(0, tablePath.length - 1) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala index 45414ee3ff6e7..ce57070ac98d7 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala @@ -270,8 +270,8 @@ class ExternalCatalogTableBuilder(private val connectorDescriptor: ConnectorDesc * Explicitly declares this external table for supporting only batch environments. */ def supportsBatch(): ExternalCatalogTableBuilder = { - isBatch = false - isStreaming = true + isBatch = true + isStreaming = false this } diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectExternalCatalogDescriptor.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectExternalCatalogDescriptor.scala new file mode 100644 index 0000000000000..5d3d944328793 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/descriptors/ConnectExternalCatalogDescriptor.scala @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import java.util + +import org.apache.flink.table.api.TableEnvironment +import org.apache.flink.table.factories.TableFactoryUtil + +/** + * Common class for external catalogs created + * with [[TableEnvironment.connect(ExternalCatalogDescriptor)]]. + */ + class ConnectExternalCatalogDescriptor( + private val tableEnv: TableEnvironment, + private val catalogDescriptor: ExternalCatalogDescriptor) + extends DescriptorBase { + + /** + * Searches for the specified external, configures it accordingly, and registers it as + * a catalog under the given name. + * + * @param name catalog name to be registered in the table environment + */ + def registerExternalCatalog(name: String): Unit = { + val externalCatalog = TableFactoryUtil.findAndCreateExternalCatalog(tableEnv, this) + tableEnv.registerExternalCatalog(name, externalCatalog) + } + + // ---------------------------------------------------------------------------------------------- + + /** + * Converts this descriptor into a set of properties. + */ + override def toProperties: util.Map[String, String] = { + catalogDescriptor.toProperties + } +} diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/BatchTableSinkFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/BatchTableSinkFactory.scala index 6fd1f7afb16b5..0b685239e05aa 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/BatchTableSinkFactory.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/BatchTableSinkFactory.scala @@ -23,7 +23,7 @@ import java.util import org.apache.flink.table.sinks.BatchTableSink /** - * A factory to create configured table sink instances in a streaming environment based on + * A factory to create configured table sink instances in a batch environment based on * string-based properties. See also [[TableFactory]] for more information. * * @tparam T type of records that the factory consumes diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/ExternalCatalogFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/ExternalCatalogFactory.scala new file mode 100644 index 0000000000000..ffc8d0bb1f329 --- /dev/null +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/ExternalCatalogFactory.scala @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.factories + +import java.util + +import org.apache.flink.table.catalog.ExternalCatalog + +/** + * A factory to create configured external catalog instances based on string-based properties. See + * also [[TableFactory]] for more information. + */ +trait ExternalCatalogFactory extends TableFactory { + + /** + * Creates and configures an [[org.apache.flink.table.catalog.ExternalCatalog]] + * using the given properties. + * + * @param properties normalized properties describing an external catalog. + * @return the configured external catalog. + */ + def createExternalCatalog(properties: util.Map[String, String]): ExternalCatalog +} diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala index 10b6a67ce6e6b..dfd36c5319f18 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryService.scala @@ -22,6 +22,7 @@ import java.util.{ServiceConfigurationError, ServiceLoader, Map => JMap} import org.apache.flink.table.api._ import org.apache.flink.table.descriptors.ConnectorDescriptorValidator._ +import org.apache.flink.table.descriptors.ExternalCatalogDescriptorValidator._ import org.apache.flink.table.descriptors.FormatDescriptorValidator._ import org.apache.flink.table.descriptors.MetadataValidator._ import org.apache.flink.table.descriptors.StatisticsValidator._ @@ -205,6 +206,7 @@ object TableFactoryService extends Logging { plainContext.remove(FORMAT_PROPERTY_VERSION) plainContext.remove(METADATA_PROPERTY_VERSION) plainContext.remove(STATISTICS_PROPERTY_VERSION) + plainContext.remove(CATALOG_PROPERTY_VERSION) // check if required context is met plainContext.forall(e => properties.contains(e._1) && properties(e._1) == e._2) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryUtil.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryUtil.scala index b773684b88f86..2376c7a129231 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryUtil.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/TableFactoryUtil.scala @@ -19,6 +19,7 @@ package org.apache.flink.table.factories import org.apache.flink.table.api.{BatchTableEnvironment, StreamTableEnvironment, TableEnvironment, TableException} +import org.apache.flink.table.catalog.ExternalCatalog import org.apache.flink.table.descriptors.Descriptor import org.apache.flink.table.sinks.TableSink import org.apache.flink.table.sources.TableSource @@ -28,6 +29,21 @@ import org.apache.flink.table.sources.TableSource */ object TableFactoryUtil { + /** + * Returns an external catalog for a table environment. + */ + def findAndCreateExternalCatalog( + tableEnvironment: TableEnvironment, + descriptor: Descriptor) + : ExternalCatalog = { + + val javaMap = descriptor.toProperties + + TableFactoryService + .find(classOf[ExternalCatalogFactory], javaMap) + .createExternalCatalog(javaMap) + } + /** * Returns a table source for a table environment. */ diff --git a/flink-libraries/flink-table/src/test/java/org/apache/flink/table/descriptors/ExternalCatalogDescriptorTest.java b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/descriptors/ExternalCatalogDescriptorTest.java new file mode 100644 index 0000000000000..1f3fc7bb0c325 --- /dev/null +++ b/flink-libraries/flink-table/src/test/java/org/apache/flink/table/descriptors/ExternalCatalogDescriptorTest.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors; + +import org.apache.flink.table.api.ValidationException; + +import org.junit.Test; + +import javax.annotation.Nullable; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.flink.table.descriptors.ExternalCatalogDescriptorValidator.CATALOG_PROPERTY_VERSION; +import static org.apache.flink.table.descriptors.ExternalCatalogDescriptorValidator.CATALOG_TYPE; + +/** + * Tests for the {@link ExternalCatalogDescriptor} descriptor and {@link ExternalCatalogDescriptorValidator} validator. + */ +public class ExternalCatalogDescriptorTest extends DescriptorTestBase { + + private static final String CATALOG_TYPE_VALUE = "ExternalCatalogDescriptorTest"; + private static final int CATALOG_PROPERTY_VERSION_VALUE = 1; + private static final String CATALOG_FOO = "foo"; + private static final String CATALOG_FOO_VALUE = "foo-1"; + + @Test(expected = ValidationException.class) + public void testMissingCatalogType() { + removePropertyAndVerify(descriptors().get(0), CATALOG_TYPE); + } + + @Test(expected = ValidationException.class) + public void testMissingFoo() { + removePropertyAndVerify(descriptors().get(0), CATALOG_FOO); + } + + @Override + protected List descriptors() { + final Descriptor minimumDesc = new TestExternalCatalogDescriptor(CATALOG_FOO_VALUE); + return Collections.singletonList(minimumDesc); + } + + @Override + protected List> properties() { + final Map minimumProps = new HashMap<>(); + minimumProps.put(CATALOG_TYPE, CATALOG_TYPE_VALUE); + minimumProps.put(CATALOG_PROPERTY_VERSION, "" + CATALOG_PROPERTY_VERSION_VALUE); + minimumProps.put(CATALOG_FOO, CATALOG_FOO_VALUE); + return Collections.singletonList(minimumProps); + } + + @Override + protected DescriptorValidator validator() { + return new TestExternalCatalogDescriptorValidator(); + } + + class TestExternalCatalogDescriptor extends ExternalCatalogDescriptor { + private String foo; + + public TestExternalCatalogDescriptor(@Nullable String foo) { + super(CATALOG_TYPE_VALUE, CATALOG_PROPERTY_VERSION_VALUE); + this.foo = foo; + } + + @Override + protected Map toCatalogProperties() { + DescriptorProperties properties = new DescriptorProperties(); + if (foo != null) { + properties.putString(CATALOG_FOO, foo); + } + return properties.asMap(); + } + } + + class TestExternalCatalogDescriptorValidator extends ExternalCatalogDescriptorValidator { + @Override + public void validate(DescriptorProperties properties) { + super.validate(properties); + properties.validateString(CATALOG_FOO, false, 1); + } + } +} diff --git a/flink-libraries/flink-table/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory b/flink-libraries/flink-table/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory index c97fe8e994552..3a79ceecf9eca 100644 --- a/flink-libraries/flink-table/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory +++ b/flink-libraries/flink-table/src/test/resources/META-INF/services/org.apache.flink.table.factories.TableFactory @@ -19,3 +19,4 @@ org.apache.flink.table.factories.utils.TestTableSinkFactory org.apache.flink.table.factories.utils.TestTableSourceFactory org.apache.flink.table.factories.utils.TestTableFormatFactory org.apache.flink.table.factories.utils.TestAmbiguousTableFormatFactory +org.apache.flink.table.factories.utils.TestExternalCatalogFactory diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/ConnectExternalCatalogDescriptorTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/ConnectExternalCatalogDescriptorTest.scala new file mode 100644 index 0000000000000..04afa10aaa4ac --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/ConnectExternalCatalogDescriptorTest.scala @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.descriptors + +import java.util + +import org.apache.flink.table.factories.utils.TestExternalCatalogFactory +import org.apache.flink.table.factories.utils.TestExternalCatalogFactory.CATALOG_IS_STREAMING +import org.apache.flink.table.factories.utils.TestExternalCatalogFactory.CATALOG_TYPE_VALUE_TEST +import org.apache.flink.table.utils.TableTestBase +import org.hamcrest.CoreMatchers.{is, notNullValue} +import org.junit.Assert.assertThat +import org.junit.Test + +/** + * Tests for [[ConnectExternalCatalogDescriptor]]. + */ +class ConnectExternalCatalogDescriptorTest extends TableTestBase { + + import ConnectExternalCatalogDescriptorTest._ + + @Test + def testStreamConnectExternalCatalogDescriptor(): Unit = { + testConnectExternalCatalogDescriptor(true) + } + + @Test + def testBatchConnectExternalCatalogDescriptor(): Unit = { + testConnectExternalCatalogDescriptor(false) + } + + private def testConnectExternalCatalogDescriptor(isStreaming: Boolean): Unit = { + + val tableEnv = if (isStreaming) { + streamTestUtil().tableEnv + } else { + batchTestUtil().tableEnv + } + + val catalog = testExternalCatalog(isStreaming) + val descriptor: ConnectExternalCatalogDescriptor = tableEnv.connect(catalog) + + // tests the catalog factory discovery and thus validates the result automatically + descriptor.registerExternalCatalog("test") + assertThat(tableEnv.listExternalCatalogs(), is(Array("test"))) + + val tb1 = tableEnv.scan("test", "db1", "tb1") + assertThat(tb1, is(notNullValue())) + } +} + +object ConnectExternalCatalogDescriptorTest { + + /** + * Gets a descriptor for the external catalog produced by [[TestExternalCatalogFactory]]. + */ + private def testExternalCatalog(isStreaming: Boolean) = { + new ExternalCatalogDescriptor(CATALOG_TYPE_VALUE_TEST, 1) { + override protected def toCatalogProperties: util.Map[String, String] = + java.util.Collections.singletonMap(CATALOG_IS_STREAMING, isStreaming.toString) + } + } +} diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/ExternalCatalogFactoryServiceTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/ExternalCatalogFactoryServiceTest.scala new file mode 100644 index 0000000000000..acd119ca9ea7f --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/ExternalCatalogFactoryServiceTest.scala @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.factories + +import java.util.{HashMap => JHashMap, Map => JMap} + +import org.apache.flink.table.api.NoMatchingTableFactoryException +import org.apache.flink.table.descriptors.ExternalCatalogDescriptorValidator.{CATALOG_PROPERTY_VERSION, CATALOG_TYPE} +import org.apache.flink.table.factories.utils.TestExternalCatalogFactory +import org.apache.flink.table.factories.utils.TestExternalCatalogFactory.CATALOG_TYPE_VALUE_TEST +import org.junit.Assert._ +import org.junit.Test + +/** + * Tests for testing external catalog discovery using [[TableFactoryService]]. The tests assume the + * external catalog factory [[TestExternalCatalogFactory]] is registered. + */ +class ExternalCatalogFactoryServiceTest { + + @Test + def testValidProperties(): Unit = { + val props = properties() + assertTrue(TableFactoryService.find(classOf[ExternalCatalogFactory], props) + .isInstanceOf[TestExternalCatalogFactory]) + } + + @Test(expected = classOf[NoMatchingTableFactoryException]) + def testInvalidContext(): Unit = { + val props = properties() + props.put(CATALOG_TYPE, "unknown-catalog-type") + TableFactoryService.find(classOf[ExternalCatalogFactory], props) + } + + @Test + def testDifferentContextVersion(): Unit = { + val props = properties() + props.put(CATALOG_PROPERTY_VERSION, "2") + // the external catalog should still be found + assertTrue(TableFactoryService.find(classOf[ExternalCatalogFactory], props) + .isInstanceOf[TestExternalCatalogFactory]) + } + + @Test(expected = classOf[NoMatchingTableFactoryException]) + def testUnsupportedProperty(): Unit = { + val props = properties() + props.put("unknown-property", "/new/path") + TableFactoryService.find(classOf[ExternalCatalogFactory], props) + } + + private def properties(): JMap[String, String] = { + val properties = new JHashMap[String, String]() + properties.put(CATALOG_TYPE, CATALOG_TYPE_VALUE_TEST) + properties.put(CATALOG_PROPERTY_VERSION, "1") + properties + } +} diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestExternalCatalogFactory.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestExternalCatalogFactory.scala new file mode 100644 index 0000000000000..dd83121b00e6d --- /dev/null +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/factories/utils/TestExternalCatalogFactory.scala @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.factories.utils + +import java.util +import java.util.Collections + +import org.apache.flink.table.catalog.ExternalCatalog +import org.apache.flink.table.descriptors.DescriptorProperties +import org.apache.flink.table.descriptors.ExternalCatalogDescriptorValidator.{CATALOG_PROPERTY_VERSION, CATALOG_TYPE} +import org.apache.flink.table.factories.utils.TestExternalCatalogFactory._ +import org.apache.flink.table.factories.ExternalCatalogFactory +import org.apache.flink.table.runtime.utils.CommonTestData + +/** + * External catalog factory for testing. + * + * This factory provides the in-memory catalog from [[CommonTestData]] as a + * catalog of type "test". + * + * The catalog produces tables intended for either a streaming or batch environment, + * based on the descriptor property {{{ is-streaming }}}. + */ +class TestExternalCatalogFactory extends ExternalCatalogFactory { + + override def requiredContext: util.Map[String, String] = { + val context = new util.HashMap[String, String] + context.put(CATALOG_TYPE, CATALOG_TYPE_VALUE_TEST) + context.put(CATALOG_PROPERTY_VERSION, "1") + context + } + + override def supportedProperties: util.List[String] = + Collections.singletonList(CATALOG_IS_STREAMING) + + override def createExternalCatalog(properties: util.Map[String, String]): ExternalCatalog = { + val props = new DescriptorProperties() + props.putProperties(properties) + + CommonTestData.getInMemoryTestCatalog( + isStreaming = props.getOptionalBoolean(CATALOG_IS_STREAMING).orElse(false)) + } +} + +object TestExternalCatalogFactory { + val CATALOG_TYPE_VALUE_TEST = "test" + val CATALOG_IS_STREAMING = "is-streaming" +} diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala index 64fcc8ac7c480..1209595837bf3 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala @@ -85,7 +85,9 @@ object CommonTestData { .withSchema(schemaDesc1) if (isStreaming) { - externalTableBuilder1.inAppendMode() + externalTableBuilder1.supportsStreaming().inAppendMode() + } else { + externalTableBuilder1.supportsBatch() } val csvRecord2 = Seq( @@ -126,7 +128,9 @@ object CommonTestData { .withSchema(schemaDesc2) if (isStreaming) { - externalTableBuilder2.inAppendMode() + externalTableBuilder2.supportsStreaming().inAppendMode() + } else { + externalTableBuilder2.supportsBatch() } val tempFilePath3 = writeToTempFile("", "csv-test3", "tmp") @@ -145,7 +149,9 @@ object CommonTestData { .withSchema(schemaDesc3) if (isStreaming) { - externalTableBuilder3.inAppendMode() + externalTableBuilder3.supportsStreaming().inAppendMode() + } else { + externalTableBuilder3.supportsBatch() } val catalog = new InMemoryExternalCatalog("test")