From 15174554fb4abaf6c968d45f066157dcd7a809ab Mon Sep 17 00:00:00 2001
From: EronWright
Date: Sun, 30 Dec 2018 19:59:47 -0800
Subject: [PATCH 1/2] FLINK-11234 - ExternalTableCatalogBuilder unable to build a batch-only table

- fix the logic in supportsBatch to properly declare a batch-only table
- adjust CommonTestData to provide batch-only or streaming-only tables

Signed-off-by: EronWright
---
 .../flink/table/catalog/ExternalCatalogTable.scala |  4 ++--
 .../flink/table/runtime/utils/CommonTestData.scala | 12 +++++++++---
 2 files changed, 11 insertions(+), 5 deletions(-)

diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
index 45414ee3ff6e7..ce57070ac98d7 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/ExternalCatalogTable.scala
@@ -270,8 +270,8 @@ class ExternalCatalogTableBuilder(private val connectorDescriptor: ConnectorDesc
     * Explicitly declares this external table for supporting only batch environments.
     */
   def supportsBatch(): ExternalCatalogTableBuilder = {
-    isBatch = false
-    isStreaming = true
+    isBatch = true
+    isStreaming = false
     this
   }
 
diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala
index 64fcc8ac7c480..1209595837bf3 100644
--- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala
+++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/utils/CommonTestData.scala
@@ -85,7 +85,9 @@
       .withSchema(schemaDesc1)
 
     if (isStreaming) {
-      externalTableBuilder1.inAppendMode()
+      externalTableBuilder1.supportsStreaming().inAppendMode()
+    } else {
+      externalTableBuilder1.supportsBatch()
     }
 
     val csvRecord2 = Seq(
@@ -126,7 +128,9 @@
       .withSchema(schemaDesc2)
 
     if (isStreaming) {
-      externalTableBuilder2.inAppendMode()
+      externalTableBuilder2.supportsStreaming().inAppendMode()
+    } else {
+      externalTableBuilder2.supportsBatch()
     }
 
     val tempFilePath3 = writeToTempFile("", "csv-test3", "tmp")
@@ -145,7 +149,9 @@
       .withSchema(schemaDesc3)
 
     if (isStreaming) {
-      externalTableBuilder3.inAppendMode()
+      externalTableBuilder3.supportsStreaming().inAppendMode()
+    } else {
+      externalTableBuilder3.supportsBatch()
    }
 
     val catalog = new InMemoryExternalCatalog("test")
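
A minimal usage sketch of the fixed builder (not part of the patch), mirroring the CommonTestData pattern above for a batch-only table. The file path, field names, format/schema contents, and the asTableSource()/createTable() finishing calls are assumptions for illustration, not confirmed by this diff.

import org.apache.flink.api.common.typeinfo.Types
import org.apache.flink.table.catalog.{ExternalCatalogTableBuilder, InMemoryExternalCatalog}
import org.apache.flink.table.descriptors.{Csv, FileSystem, Schema}

object BatchOnlyTableExample {
  def main(args: Array[String]): Unit = {
    // Hypothetical file-based table; path and fields are made up.
    val connector = new FileSystem().path("/tmp/people.csv")
    val format = new Csv()
      .field("name", Types.STRING)
      .field("age", Types.INT)
    val schema = new Schema()
      .field("name", Types.STRING)
      .field("age", Types.INT)

    // With the fix, supportsBatch() sets isBatch = true and isStreaming = false,
    // so the resulting catalog table is offered to batch environments only.
    val batchOnlyTable = new ExternalCatalogTableBuilder(connector)
      .withFormat(format)
      .withSchema(schema)
      .supportsBatch()
      .asTableSource()

    val catalog = new InMemoryExternalCatalog("test")
    catalog.createTable("people", batchOnlyTable, false)
  }
}
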
From 765a19e2c9924bc3487a39b8a554ee76452332b8 Mon Sep 17 00:00:00 2001
From: EronWright
Date: Mon, 31 Dec 2018 09:02:17 -0800
Subject: [PATCH 2/2] [hotfix][table] Fix typos in Table javadoc.

Signed-off-by: EronWright
---
 .../apache/flink/table/descriptors/ConnectorDescriptor.java  | 2 +-
 .../table/descriptors/ConnectorDescriptorValidator.java      | 5 +++++
 .../apache/flink/table/factories/BatchTableSinkFactory.scala | 2 +-
 3 files changed, 7 insertions(+), 2 deletions(-)

diff --git a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/descriptors/ConnectorDescriptor.java b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/descriptors/ConnectorDescriptor.java
index a1d3e783cdc1c..ed62b46b36b50 100644
--- a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/descriptors/ConnectorDescriptor.java
+++ b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/descriptors/ConnectorDescriptor.java
@@ -68,7 +68,7 @@ protected final boolean isFormatNeeded() {
 
 	/**
 	 * Converts this descriptor into a set of connector properties. Usually prefixed with
-	 * {@link FormatDescriptorValidator#FORMAT}.
+	 * {@link ConnectorDescriptorValidator#CONNECTOR}.
 	 */
 	protected abstract Map<String, String> toConnectorProperties();
 }
diff --git a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/descriptors/ConnectorDescriptorValidator.java b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/descriptors/ConnectorDescriptorValidator.java
index 88cf34905f452..cf9860ae69e2a 100644
--- a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/descriptors/ConnectorDescriptorValidator.java
+++ b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/descriptors/ConnectorDescriptorValidator.java
@@ -26,6 +26,11 @@
 @Internal
 public abstract class ConnectorDescriptorValidator implements DescriptorValidator {
 
+	/**
+	 * Prefix for connector-related properties.
+	 */
+	public static final String CONNECTOR = "connector";
+
 	/**
 	 * Key for describing the type of the connector. Usually used for factory discovery.
 	 */
diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/BatchTableSinkFactory.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/BatchTableSinkFactory.scala
index 6fd1f7afb16b5..0b685239e05aa 100644
--- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/BatchTableSinkFactory.scala
+++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/factories/BatchTableSinkFactory.scala
@@ -23,7 +23,7 @@ import java.util
 import org.apache.flink.table.sinks.BatchTableSink
 
 /**
-  * A factory to create configured table sink instances in a streaming environment based on
+  * A factory to create configured table sink instances in a batch environment based on
   * string-based properties. See also [[TableFactory]] for more information.
   *
   * @tparam T type of records that the factory consumes
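
A rough sketch of what the corrected ConnectorDescriptor javadoc describes (not part of the patch): a custom descriptor whose toConnectorProperties() keys end up under the new CONNECTOR ("connector") prefix, e.g. "connector.hosts". The "my-system" type, the "hosts" key, and the ConnectorDescriptor(type, version, formatNeeded) constructor arguments are assumptions for illustration.

import java.util

import org.apache.flink.table.descriptors.ConnectorDescriptor

// Hypothetical connector descriptor; connector type "my-system" is made up.
class MySystemConnector(hosts: String)
  extends ConnectorDescriptor("my-system", 1, false) {  // assumed (type, version, formatNeeded) ctor

  // Keys returned here are unprefixed ("hosts"); the framework is expected to place
  // them under the ConnectorDescriptorValidator.CONNECTOR prefix when building the
  // final table properties.
  override protected def toConnectorProperties(): util.Map[String, String] = {
    val properties = new util.HashMap[String, String]()
    properties.put("hosts", hosts)
    properties
  }
}
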