Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ public ClusterDescriptor<T> createClusterDescriptor() throws Exception {

public EnvironmentInstance createEnvironmentInstance() {
try {
return new EnvironmentInstance();
return wrapClassLoader(EnvironmentInstance::new);
} catch (Throwable t) {
// catch everything such that a wrong environment does not affect invocations
throw new SqlExecutionException("Could not create environment instance.", t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,27 +193,31 @@ public Map<String, String> getSessionProperties(SessionContext session) throws S

@Override
public List<String> listTables(SessionContext session) throws SqlExecutionException {
final TableEnvironment tableEnv = getOrCreateExecutionContext(session)
final ExecutionContext<?> context = getOrCreateExecutionContext(session);
final TableEnvironment tableEnv = context
.createEnvironmentInstance()
.getTableEnvironment();
return Arrays.asList(tableEnv.listTables());
return context.wrapClassLoader(() -> Arrays.asList(tableEnv.listTables()));
}

@Override
public List<String> listUserDefinedFunctions(SessionContext session) throws SqlExecutionException {
final TableEnvironment tableEnv = getOrCreateExecutionContext(session)
final ExecutionContext<?> context = getOrCreateExecutionContext(session);
final TableEnvironment tableEnv = context
.createEnvironmentInstance()
.getTableEnvironment();
return Arrays.asList(tableEnv.listUserDefinedFunctions());
return context.wrapClassLoader(() -> Arrays.asList(tableEnv.listUserDefinedFunctions()));
}

@Override
public TableSchema getTableSchema(SessionContext session, String name) throws SqlExecutionException {
final TableEnvironment tableEnv = getOrCreateExecutionContext(session)
final ExecutionContext<?> context = getOrCreateExecutionContext(session);
final TableEnvironment tableEnv = context
.createEnvironmentInstance()
.getTableEnvironment();
try {
return tableEnv.scan(name).getSchema();
// scanning requires table resolution step that might reference external tables
return context.wrapClassLoader(() -> tableEnv.scan(name).getSchema());
} catch (Throwable t) {
// catch everything such that the query does not crash the executor
throw new SqlExecutionException("No table with this name could be found.", t);
Expand All @@ -229,7 +233,7 @@ public String explainStatement(SessionContext session, String statement) throws

// translate
try {
final Table table = createTable(tableEnv, statement);
final Table table = createTable(context, tableEnv, statement);
// explanation requires an optimization step that might reference UDFs during code compilation
return context.wrapClassLoader(() -> tableEnv.explain(table));
} catch (Throwable t) {
Expand All @@ -240,12 +244,14 @@ public String explainStatement(SessionContext session, String statement) throws

@Override
public List<String> completeStatement(SessionContext session, String statement, int position) {
final TableEnvironment tableEnv = getOrCreateExecutionContext(session)
final ExecutionContext<?> context = getOrCreateExecutionContext(session);
final TableEnvironment tableEnv = context
.createEnvironmentInstance()
.getTableEnvironment();

try {
return Arrays.asList(tableEnv.getCompletionHints(statement, position));
// planning requires table resolution step that might reference external tables
return context.wrapClassLoader(() -> Arrays.asList(tableEnv.getCompletionHints(statement, position)));
} catch (Throwable t) {
// catch everything such that the query does not crash the executor
if (LOG.isDebugEnabled()) {
Expand Down Expand Up @@ -402,7 +408,7 @@ private <C> ResultDescriptor executeQueryInternal(ExecutionContext<C> context, S
final ExecutionContext.EnvironmentInstance envInst = context.createEnvironmentInstance();

// create table
final Table table = createTable(envInst.getTableEnvironment(), query);
final Table table = createTable(context, envInst.getTableEnvironment(), query);

// initialize result
final DynamicResult<C> result = resultStore.createResult(
Expand Down Expand Up @@ -448,10 +454,11 @@ private <C> ResultDescriptor executeQueryInternal(ExecutionContext<C> context, S
/**
* Creates a table using the given query in the given table environment.
*/
private Table createTable(TableEnvironment tableEnv, String selectQuery) {
private <C> Table createTable(ExecutionContext<C> context, TableEnvironment tableEnv, String selectQuery) {
// parse and validate query
try {
return tableEnv.sqlQuery(selectQuery);
// query statement requires table resolution step that might reference external tables
return context.wrapClassLoader(() -> tableEnv.sqlQuery(selectQuery));
} catch (Throwable t) {
// catch everything such that the query does not crash the executor
throw new SqlExecutionException("Invalid SQL statement.", t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ protected final boolean isFormatNeeded() {

/**
* Converts this descriptor into a set of connector properties. Usually prefixed with
* {@link FormatDescriptorValidator#FORMAT}.
* {@link ConnectorDescriptorValidator#CONNECTOR}.
*/
protected abstract Map<String, String> toConnectorProperties();
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@
@Internal
public abstract class ConnectorDescriptorValidator implements DescriptorValidator {

/**
* Prefix for connector-related properties.
*/
public static final String CONNECTOR = "connector";

/**
* Key for describing the type of the connector. Usually used for factory discovery.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.descriptors;

import org.apache.flink.annotation.PublicEvolving;

import java.util.Map;

import static org.apache.flink.table.descriptors.ExternalCatalogDescriptorValidator.CATALOG_PROPERTY_VERSION;
import static org.apache.flink.table.descriptors.ExternalCatalogDescriptorValidator.CATALOG_TYPE;

/**
* Describes a external catalog of tables, views, and functions.
*/
@PublicEvolving
public abstract class ExternalCatalogDescriptor extends DescriptorBase implements Descriptor {

private final String type;

private final int version;

/**
* Constructs a {@link ExternalCatalogDescriptor}.
*
* @param type string that identifies this catalog
* @param version property version for backwards compatibility
*/
public ExternalCatalogDescriptor(String type, int version) {
this.type = type;
this.version = version;
}

@Override
public final Map<String, String> toProperties() {
final DescriptorProperties properties = new DescriptorProperties();
properties.putString(CATALOG_TYPE, type);
properties.putLong(CATALOG_PROPERTY_VERSION, version);
properties.putProperties(toCatalogProperties());
return properties.asMap();
}

/**
* Converts this descriptor into a set of catalog properties.
*/
protected abstract Map<String, String> toCatalogProperties();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.descriptors;

import org.apache.flink.annotation.Internal;

/**
* Validator for {@link ExternalCatalogDescriptor}.
*/
@Internal
public abstract class ExternalCatalogDescriptorValidator implements DescriptorValidator {

/**
* Key for describing the type of the catalog. Usually used for factory discovery.
*/
public static final String CATALOG_TYPE = "type";

/**
* Key for describing the property version. This property can be used for backwards
* compatibility in case the property format changes.
*/
public static final String CATALOG_PROPERTY_VERSION = "property-version";

@Override
public void validate(DescriptorProperties properties) {
properties.validateString(CATALOG_TYPE, false, 1);
properties.validateInt(CATALOG_PROPERTY_VERSION, true, 0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ import org.apache.flink.table.api.scala.{BatchTableEnvironment => ScalaBatchTabl
import org.apache.flink.table.calcite.{FlinkPlannerImpl, FlinkRelBuilder, FlinkTypeFactory, FlinkTypeSystem}
import org.apache.flink.table.catalog.{ExternalCatalog, ExternalCatalogSchema}
import org.apache.flink.table.codegen.{ExpressionReducer, FunctionCodeGenerator, GeneratedFunction}
import org.apache.flink.table.descriptors.{ConnectorDescriptor, TableDescriptor}
import org.apache.flink.table.descriptors.{ConnectExternalCatalogDescriptor, ConnectorDescriptor, ExternalCatalogDescriptor, TableDescriptor}
import org.apache.flink.table.expressions._
import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils._
import org.apache.flink.table.functions.{AggregateFunction, ScalarFunction, TableFunction}
Expand Down Expand Up @@ -403,6 +403,15 @@ abstract class TableEnvironment(val config: TableConfig) {
scan(name)
}

/**
* Gets the names of all external catalogs registered in this environment.
*
* @return A list of the names of all registered external catalogs.
*/
def listExternalCatalogs(): Array[String] = {
this.externalCatalogs.keySet.toArray
}

/**
* Registers an [[ExternalCatalog]] under a unique name in the TableEnvironment's schema.
* All tables registered in the [[ExternalCatalog]] can be accessed.
Expand Down Expand Up @@ -655,6 +664,31 @@ abstract class TableEnvironment(val config: TableConfig) {
*/
def connect(connectorDescriptor: ConnectorDescriptor): TableDescriptor

/**
* Connects to an external catalog from a descriptor.
*
* Descriptors allow for declaring the communication to external systems in an
* implementation-agnostic way. The classpath is scanned for suitable table factories that match
* the desired configuration.
*
* The following example shows how to read from an external catalog and
* registering it as "MyCatalog":
*
* {{{
*
* tableEnv
* .connect(
* new ExternalCatalogXYZ()
* .version("2.3.0"))
* .registerExternalCatalog("MyCatalog")
* }}}
*
* @param catalogDescriptor connector descriptor describing the external system
*/
def connect(catalogDescriptor: ExternalCatalogDescriptor): ConnectExternalCatalogDescriptor = {
new ConnectExternalCatalogDescriptor(this, catalogDescriptor)
}

private[flink] def scanInternal(tablePath: Array[String]): Option[Table] = {
require(tablePath != null && !tablePath.isEmpty, "tablePath must not be null or empty.")
val schemaPaths = tablePath.slice(0, tablePath.length - 1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,8 +270,8 @@ class ExternalCatalogTableBuilder(private val connectorDescriptor: ConnectorDesc
* Explicitly declares this external table for supporting only batch environments.
*/
def supportsBatch(): ExternalCatalogTableBuilder = {
isBatch = false
isStreaming = true
isBatch = true
isStreaming = false
this
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.descriptors

import java.util

import org.apache.flink.table.api.TableEnvironment
import org.apache.flink.table.factories.TableFactoryUtil

/**
* Common class for external catalogs created
* with [[TableEnvironment.connect(ExternalCatalogDescriptor)]].
*/
class ConnectExternalCatalogDescriptor(
private val tableEnv: TableEnvironment,
private val catalogDescriptor: ExternalCatalogDescriptor)
extends DescriptorBase {

/**
* Searches for the specified external, configures it accordingly, and registers it as
* a catalog under the given name.
*
* @param name catalog name to be registered in the table environment
*/
def registerExternalCatalog(name: String): Unit = {
val externalCatalog = TableFactoryUtil.findAndCreateExternalCatalog(tableEnv, this)
tableEnv.registerExternalCatalog(name, externalCatalog)
}

// ----------------------------------------------------------------------------------------------

/**
* Converts this descriptor into a set of properties.
*/
override def toProperties: util.Map[String, String] = {
catalogDescriptor.toProperties
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import java.util
import org.apache.flink.table.sinks.BatchTableSink

/**
* A factory to create configured table sink instances in a streaming environment based on
* A factory to create configured table sink instances in a batch environment based on
* string-based properties. See also [[TableFactory]] for more information.
*
* @tparam T type of records that the factory consumes
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.factories

import java.util

import org.apache.flink.table.catalog.ExternalCatalog

/**
* A factory to create configured external catalog instances based on string-based properties. See
* also [[TableFactory]] for more information.
*/
trait ExternalCatalogFactory extends TableFactory {

/**
* Creates and configures an [[org.apache.flink.table.catalog.ExternalCatalog]]
* using the given properties.
*
* @param properties normalized properties describing an external catalog.
* @return the configured external catalog.
*/
def createExternalCatalog(properties: util.Map[String, String]): ExternalCatalog
}
Loading