From 1e290a7a38ab7216a2d0e881df3b1a06e5c1bce2 Mon Sep 17 00:00:00 2001 From: Xingcan Cui Date: Tue, 25 Dec 2018 12:03:39 +0800 Subject: [PATCH 1/2] [FLINK-11227] [table] The DescriptorProperties contains some bounds checking errors --- .../descriptors/DescriptorProperties.java | 4 +- .../DescriptorPropertiesTest.scala | 45 +++++++++++++++++-- 2 files changed, 44 insertions(+), 5 deletions(-) diff --git a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java index c47698147284e..76289128b5bca 100644 --- a/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java +++ b/flink-libraries/flink-table-common/src/main/java/org/apache/flink/table/descriptors/DescriptorProperties.java @@ -938,7 +938,7 @@ public void validateFixedIndexedProperties(String key, boolean allowEmpty, Map> subKey : subKeyValidation.entrySet()) { final String fullKey = key + '.' + i + '.' + subKey.getKey(); if (properties.containsKey(fullKey)) { @@ -1134,7 +1134,7 @@ public void validateArray(String key, Consumer elementValidation, int mi } // validate array elements - for (int i = 0; i < maxIndex; i++) { + for (int i = 0; i <= maxIndex; i++) { final String fullKey = key + '.' + i; if (properties.containsKey(fullKey)) { // run validation logic diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorPropertiesTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorPropertiesTest.scala index b2a8ec9f567d8..cf2ea9e022105 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorPropertiesTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorPropertiesTest.scala @@ -20,6 +20,7 @@ package org.apache.flink.table.descriptors import java.util import java.util.Collections +import java.util.function.Consumer import org.apache.flink.table.api.ValidationException import org.apache.flink.table.util.JavaScalaConversionUtil.toJava @@ -32,6 +33,9 @@ import org.junit.Test class DescriptorPropertiesTest { private val ARRAY_KEY = "my-array" + private val FIXED_INDEXED_PROPERTY_KEY = "my-fixed-indexed-property" + private val PROPERTY_1_KEY = "property-1" + private val PROPERTY_2_KEY = "property-2" @Test def testEquals(): Unit = { @@ -97,8 +101,8 @@ class DescriptorPropertiesTest { def testArrayInvalidValues(): Unit = { val properties = new DescriptorProperties() properties.putString(s"$ARRAY_KEY.0", "12") - properties.putString(s"$ARRAY_KEY.1", "INVALID") - properties.putString(s"$ARRAY_KEY.2", "66") + properties.putString(s"$ARRAY_KEY.1", "66") + properties.putString(s"$ARRAY_KEY.2", "INVALID") testArrayValidation(properties, 1, Integer.MAX_VALUE) } @@ -118,6 +122,19 @@ class DescriptorPropertiesTest { testArrayValidation(properties, 1, Integer.MAX_VALUE) } + @Test(expected = classOf[ValidationException]) + def testInvalidFixedIndexedProperty(): Unit = { + val property = new DescriptorProperties() + val list = new util.ArrayList[util.List[String]]() + list.add(util.Arrays.asList("1", "string")) + list.add(util.Arrays.asList("INVALID", "string")) + property.putIndexedFixedProperties( + FIXED_INDEXED_PROPERTY_KEY, + util.Arrays.asList(PROPERTY_1_KEY, PROPERTY_2_KEY), + list) + testFixedFieldsValidation(property) + } + @Test def testRemoveKeys(): Unit = { val properties = new DescriptorProperties() @@ -155,7 +172,7 @@ class DescriptorPropertiesTest { minLength: Int, maxLength: Int) : Unit = { - val validator: (String) => Unit = (key: String) => { + val validator: String => Unit = (key: String) => { properties.validateInt(key, false) } @@ -165,4 +182,26 @@ class DescriptorPropertiesTest { minLength, maxLength) } + + private def testFixedFieldsValidation(properties: DescriptorProperties): Unit = { + + val validatorMap = new util.HashMap[String, Consumer[String]]() + + // PROPERTY_1 should be Int + val validator1: String => Unit = (key: String) => { + properties.validateInt(key, false) + } + validatorMap.put(PROPERTY_1_KEY, toJava(validator1)) + // PROPERTY_2 should be String + val validator2: String => Unit = (key: String) => { + properties.validateString(key, false) + } + validatorMap.put(PROPERTY_2_KEY, toJava(validator2)) + + properties.validateFixedIndexedProperties( + FIXED_INDEXED_PROPERTY_KEY, + false, + validatorMap + ) + } } From cf017f0d0de3c8cf1e89630883f8dcf191f3ab3e Mon Sep 17 00:00:00 2001 From: Xingcan Cui Date: Mon, 31 Dec 2018 22:07:42 +0800 Subject: [PATCH 2/2] Feedback addressed --- .../flink/table/descriptors/DescriptorPropertiesTest.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorPropertiesTest.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorPropertiesTest.scala index cf2ea9e022105..fe7c75df6cf78 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorPropertiesTest.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/descriptors/DescriptorPropertiesTest.scala @@ -123,7 +123,7 @@ class DescriptorPropertiesTest { } @Test(expected = classOf[ValidationException]) - def testInvalidFixedIndexedProperty(): Unit = { + def testInvalidFixedIndexedProperties(): Unit = { val property = new DescriptorProperties() val list = new util.ArrayList[util.List[String]]() list.add(util.Arrays.asList("1", "string")) @@ -132,7 +132,7 @@ class DescriptorPropertiesTest { FIXED_INDEXED_PROPERTY_KEY, util.Arrays.asList(PROPERTY_1_KEY, PROPERTY_2_KEY), list) - testFixedFieldsValidation(property) + testFixedIndexedPropertiesValidation(property) } @Test @@ -183,7 +183,7 @@ class DescriptorPropertiesTest { maxLength) } - private def testFixedFieldsValidation(properties: DescriptorProperties): Unit = { + private def testFixedIndexedPropertiesValidation(properties: DescriptorProperties): Unit = { val validatorMap = new util.HashMap[String, Consumer[String]]()