Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -938,7 +938,7 @@ public void validateFixedIndexedProperties(String key, boolean allowEmpty, Map<S
}

// validate
for (int i = 0; i < maxIndex; i++) {
for (int i = 0; i <= maxIndex; i++) {
for (Map.Entry<String, Consumer<String>> subKey : subKeyValidation.entrySet()) {
final String fullKey = key + '.' + i + '.' + subKey.getKey();
if (properties.containsKey(fullKey)) {
Expand Down Expand Up @@ -1134,7 +1134,7 @@ public void validateArray(String key, Consumer<String> elementValidation, int mi
}

// validate array elements
for (int i = 0; i < maxIndex; i++) {
for (int i = 0; i <= maxIndex; i++) {
final String fullKey = key + '.' + i;
if (properties.containsKey(fullKey)) {
// run validation logic
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package org.apache.flink.table.descriptors

import java.util
import java.util.Collections
import java.util.function.Consumer

import org.apache.flink.table.api.ValidationException
import org.apache.flink.table.util.JavaScalaConversionUtil.toJava
Expand All @@ -32,6 +33,9 @@ import org.junit.Test
class DescriptorPropertiesTest {

private val ARRAY_KEY = "my-array"
private val FIXED_INDEXED_PROPERTY_KEY = "my-fixed-indexed-property"
private val PROPERTY_1_KEY = "property-1"
private val PROPERTY_2_KEY = "property-2"

@Test
def testEquals(): Unit = {
Expand Down Expand Up @@ -97,8 +101,8 @@ class DescriptorPropertiesTest {
def testArrayInvalidValues(): Unit = {
val properties = new DescriptorProperties()
properties.putString(s"$ARRAY_KEY.0", "12")
properties.putString(s"$ARRAY_KEY.1", "INVALID")
properties.putString(s"$ARRAY_KEY.2", "66")
properties.putString(s"$ARRAY_KEY.1", "66")
properties.putString(s"$ARRAY_KEY.2", "INVALID")

testArrayValidation(properties, 1, Integer.MAX_VALUE)
}
Expand All @@ -118,6 +122,19 @@ class DescriptorPropertiesTest {
testArrayValidation(properties, 1, Integer.MAX_VALUE)
}

@Test(expected = classOf[ValidationException])
def testInvalidFixedIndexedProperties(): Unit = {
val property = new DescriptorProperties()
val list = new util.ArrayList[util.List[String]]()
list.add(util.Arrays.asList("1", "string"))
list.add(util.Arrays.asList("INVALID", "string"))
property.putIndexedFixedProperties(
FIXED_INDEXED_PROPERTY_KEY,
util.Arrays.asList(PROPERTY_1_KEY, PROPERTY_2_KEY),
list)
testFixedIndexedPropertiesValidation(property)
}

@Test
def testRemoveKeys(): Unit = {
val properties = new DescriptorProperties()
Expand Down Expand Up @@ -155,7 +172,7 @@ class DescriptorPropertiesTest {
minLength: Int,
maxLength: Int)
: Unit = {
val validator: (String) => Unit = (key: String) => {
val validator: String => Unit = (key: String) => {
properties.validateInt(key, false)
}

Expand All @@ -165,4 +182,26 @@ class DescriptorPropertiesTest {
minLength,
maxLength)
}

private def testFixedIndexedPropertiesValidation(properties: DescriptorProperties): Unit = {

val validatorMap = new util.HashMap[String, Consumer[String]]()

// PROPERTY_1 should be Int
val validator1: String => Unit = (key: String) => {
properties.validateInt(key, false)
}
validatorMap.put(PROPERTY_1_KEY, toJava(validator1))
// PROPERTY_2 should be String
val validator2: String => Unit = (key: String) => {
properties.validateString(key, false)
}
validatorMap.put(PROPERTY_2_KEY, toJava(validator2))

properties.validateFixedIndexedProperties(
FIXED_INDEXED_PROPERTY_KEY,
false,
validatorMap
)
}
}