Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ protected final boolean isFormatNeeded() {

/**
* Converts this descriptor into a set of connector properties. Usually prefixed with
* {@link FormatDescriptorValidator#FORMAT}.
* {@link ConnectorDescriptorValidator#CONNECTOR}.
*/
protected abstract Map<String, String> toConnectorProperties();
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,11 @@
@Internal
public abstract class ConnectorDescriptorValidator implements DescriptorValidator {

/**
* Prefix for connector-related properties.
*/
public static final String CONNECTOR = "connector";

/**
* Key for describing the type of the connector. Usually used for factory discovery.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,8 +270,8 @@ class ExternalCatalogTableBuilder(private val connectorDescriptor: ConnectorDesc
* Explicitly declares this external table for supporting only batch environments.
*/
def supportsBatch(): ExternalCatalogTableBuilder = {
isBatch = false
isStreaming = true
isBatch = true
isStreaming = false
this
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import java.util
import org.apache.flink.table.sinks.BatchTableSink

/**
* A factory to create configured table sink instances in a streaming environment based on
* A factory to create configured table sink instances in a batch environment based on
* string-based properties. See also [[TableFactory]] for more information.
*
* @tparam T type of records that the factory consumes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,9 @@ object CommonTestData {
.withSchema(schemaDesc1)

if (isStreaming) {
externalTableBuilder1.inAppendMode()
externalTableBuilder1.supportsStreaming().inAppendMode()
} else {
externalTableBuilder1.supportsBatch()
}

val csvRecord2 = Seq(
Expand Down Expand Up @@ -126,7 +128,9 @@ object CommonTestData {
.withSchema(schemaDesc2)

if (isStreaming) {
externalTableBuilder2.inAppendMode()
externalTableBuilder2.supportsStreaming().inAppendMode()
} else {
externalTableBuilder2.supportsBatch()
}

val tempFilePath3 = writeToTempFile("", "csv-test3", "tmp")
Expand All @@ -145,7 +149,9 @@ object CommonTestData {
.withSchema(schemaDesc3)

if (isStreaming) {
externalTableBuilder3.inAppendMode()
externalTableBuilder3.supportsStreaming().inAppendMode()
} else {
externalTableBuilder3.supportsBatch()
}

val catalog = new InMemoryExternalCatalog("test")
Expand Down