diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java b/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java index 2a6e71c3817c2..cf5f01502c83b 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java @@ -213,11 +213,15 @@ public static void validateValue(Schema schema, Object value) { validateValue(null, schema, value); } - public static void validateValue(String name, Schema schema, Object value) { + public static void validateValue(String field, Schema schema, Object value) { + validateValue(schema, value, field == null ? "value" : "field: \"" + field + "\""); + } + + private static void validateValue(Schema schema, Object value, String location) { if (value == null) { if (!schema.isOptional()) - throw new DataException("Invalid value: null used for required field: \"" + name - + "\", schema type: " + schema.type()); + throw new DataException("Invalid value: null used for required " + location + + ", schema type: " + schema.type()); return; } @@ -236,8 +240,8 @@ public static void validateValue(String name, Schema schema, Object value) { exceptionMessage.append(" \"").append(schema.name()).append("\""); } exceptionMessage.append(" with type ").append(schema.type()).append(": ").append(value.getClass()); - if (name != null) { - exceptionMessage.append(" for field: \"").append(name).append("\""); + if (location != null) { + exceptionMessage.append(" for ").append(location); } throw new DataException(exceptionMessage.toString()); } @@ -251,19 +255,33 @@ public static void validateValue(String name, Schema schema, Object value) { break; case ARRAY: List array = (List) value; - for (Object entry : array) - validateValue(schema.valueSchema(), entry); + String entryLocation = "element of array " + location; + Schema arrayValueSchema = assertSchemaNotNull(schema.valueSchema(), entryLocation); + for (Object entry : array) { + validateValue(arrayValueSchema, entry, entryLocation); + } break; case MAP: Map map = (Map) value; + String keyLocation = "key of map " + location; + String valueLocation = "value of map " + location; + Schema mapKeySchema = assertSchemaNotNull(schema.keySchema(), keyLocation); + Schema mapValueSchema = assertSchemaNotNull(schema.valueSchema(), valueLocation); for (Map.Entry entry : map.entrySet()) { - validateValue(schema.keySchema(), entry.getKey()); - validateValue(schema.valueSchema(), entry.getValue()); + validateValue(mapKeySchema, entry.getKey(), keyLocation); + validateValue(mapValueSchema, entry.getValue(), valueLocation); } break; } } + private static Schema assertSchemaNotNull(Schema schema, String location) { + if (schema == null) { + throw new DataException("No schema defined for " + location); + } + return schema; + } + private static List> expectedClassesFor(Schema schema) { List> expectedClasses = LOGICAL_TYPE_CLASSES.get(schema.name()); if (expectedClasses == null) diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/ConnectSchemaTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/ConnectSchemaTest.java index 25e6db34691f9..43c2342fe3b41 100644 --- a/connect/api/src/test/java/org/apache/kafka/connect/data/ConnectSchemaTest.java +++ b/connect/api/src/test/java/org/apache/kafka/connect/data/ConnectSchemaTest.java @@ -330,4 +330,144 @@ public void testEmptyStruct() { new Struct(emptyStruct); } + private void assertInvalidValueForSchema(String fieldName, Schema schema, Object value, String message) { + Exception e = assertThrows(DataException.class, () -> ConnectSchema.validateValue(fieldName, schema, value)); + assertEquals(message, e.getMessage()); + } + + @Test + public void testValidateFieldWithInvalidValueType() { + String fieldName = "field"; + assertInvalidValueForSchema(fieldName, new FakeSchema(), new Object(), + "Invalid Java object for schema \"fake\" with type null: class java.lang.Object for field: \"field\""); + assertInvalidValueForSchema(null, Schema.INT8_SCHEMA, new Object(), + "Invalid Java object for schema with type INT8: class java.lang.Object for value"); + assertInvalidValueForSchema(fieldName, Schema.INT8_SCHEMA, new Object(), + "Invalid Java object for schema with type INT8: class java.lang.Object for field: \"field\""); + } + + @Test + public void testValidateFieldWithInvalidValueMismatchTimestamp() { + long longValue = 1000L; + String fieldName = "field"; + + ConnectSchema.validateValue(fieldName, Schema.INT64_SCHEMA, longValue); + + assertInvalidValueForSchema(fieldName, Timestamp.SCHEMA, longValue, + "Invalid Java object for schema \"org.apache.kafka.connect.data.Timestamp\" " + + "with type INT64: class java.lang.Long for field: \"field\""); + } + + @Test + public void testValidateList() { + String fieldName = "field"; + + // Optional element schema + Schema optionalStrings = SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA); + ConnectSchema.validateValue(fieldName, optionalStrings, Collections.emptyList()); + ConnectSchema.validateValue(fieldName, optionalStrings, Collections.singletonList("hello")); + ConnectSchema.validateValue(fieldName, optionalStrings, Collections.singletonList(null)); + ConnectSchema.validateValue(fieldName, optionalStrings, Arrays.asList("hello", "world")); + ConnectSchema.validateValue(fieldName, optionalStrings, Arrays.asList("hello", null)); + ConnectSchema.validateValue(fieldName, optionalStrings, Arrays.asList(null, "world")); + assertInvalidValueForSchema(fieldName, optionalStrings, Collections.singletonList(true), + "Invalid Java object for schema with type STRING: class java.lang.Boolean for element of array field: \"field\""); + + // Required element schema + Schema requiredStrings = SchemaBuilder.array(Schema.STRING_SCHEMA); + ConnectSchema.validateValue(fieldName, requiredStrings, Collections.emptyList()); + ConnectSchema.validateValue(fieldName, requiredStrings, Collections.singletonList("hello")); + assertInvalidValueForSchema(fieldName, requiredStrings, Collections.singletonList(null), + "Invalid value: null used for required element of array field: \"field\", schema type: STRING"); + ConnectSchema.validateValue(fieldName, requiredStrings, Arrays.asList("hello", "world")); + assertInvalidValueForSchema(fieldName, requiredStrings, Arrays.asList("hello", null), + "Invalid value: null used for required element of array field: \"field\", schema type: STRING"); + assertInvalidValueForSchema(fieldName, requiredStrings, Arrays.asList(null, "world"), + "Invalid value: null used for required element of array field: \"field\", schema type: STRING"); + assertInvalidValueForSchema(fieldName, optionalStrings, Collections.singletonList(true), + "Invalid Java object for schema with type STRING: class java.lang.Boolean for element of array field: \"field\""); + + // Null element schema + Schema nullElements = SchemaBuilder.type(Schema.Type.ARRAY); + assertInvalidValueForSchema(fieldName, nullElements, Collections.emptyList(), + "No schema defined for element of array field: \"field\""); + assertInvalidValueForSchema(fieldName, nullElements, Collections.singletonList("hello"), + "No schema defined for element of array field: \"field\""); + assertInvalidValueForSchema(fieldName, nullElements, Collections.singletonList(null), + "No schema defined for element of array field: \"field\""); + assertInvalidValueForSchema(fieldName, nullElements, Arrays.asList("hello", "world"), + "No schema defined for element of array field: \"field\""); + assertInvalidValueForSchema(fieldName, nullElements, Arrays.asList("hello", null), + "No schema defined for element of array field: \"field\""); + assertInvalidValueForSchema(fieldName, nullElements, Arrays.asList(null, "world"), + "No schema defined for element of array field: \"field\""); + assertInvalidValueForSchema(fieldName, nullElements, Collections.singletonList(true), + "No schema defined for element of array field: \"field\""); + } + + @Test + public void testValidateMap() { + String fieldName = "field"; + + // Optional element schema + Schema optionalStrings = SchemaBuilder.map(Schema.OPTIONAL_STRING_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA); + ConnectSchema.validateValue(fieldName, optionalStrings, Collections.emptyMap()); + ConnectSchema.validateValue(fieldName, optionalStrings, Collections.singletonMap("key", "value")); + ConnectSchema.validateValue(fieldName, optionalStrings, Collections.singletonMap("key", null)); + ConnectSchema.validateValue(fieldName, optionalStrings, Collections.singletonMap(null, "value")); + ConnectSchema.validateValue(fieldName, optionalStrings, Collections.singletonMap(null, null)); + assertInvalidValueForSchema(fieldName, optionalStrings, Collections.singletonMap("key", true), + "Invalid Java object for schema with type STRING: class java.lang.Boolean for value of map field: \"field\""); + assertInvalidValueForSchema(fieldName, optionalStrings, Collections.singletonMap(true, "value"), + "Invalid Java object for schema with type STRING: class java.lang.Boolean for key of map field: \"field\""); + + // Required element schema + Schema requiredStrings = SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA); + ConnectSchema.validateValue(fieldName, requiredStrings, Collections.emptyMap()); + ConnectSchema.validateValue(fieldName, requiredStrings, Collections.singletonMap("key", "value")); + assertInvalidValueForSchema(fieldName, requiredStrings, Collections.singletonMap("key", null), + "Invalid value: null used for required value of map field: \"field\", schema type: STRING"); + assertInvalidValueForSchema(fieldName, requiredStrings, Collections.singletonMap(null, "value"), + "Invalid value: null used for required key of map field: \"field\", schema type: STRING"); + assertInvalidValueForSchema(fieldName, requiredStrings, Collections.singletonMap(null, null), + "Invalid value: null used for required key of map field: \"field\", schema type: STRING"); + assertInvalidValueForSchema(fieldName, requiredStrings, Collections.singletonMap("key", true), + "Invalid Java object for schema with type STRING: class java.lang.Boolean for value of map field: \"field\""); + assertInvalidValueForSchema(fieldName, requiredStrings, Collections.singletonMap(true, "value"), + "Invalid Java object for schema with type STRING: class java.lang.Boolean for key of map field: \"field\""); + + // Null key schema + Schema nullKeys = SchemaBuilder.type(Schema.Type.MAP); + assertInvalidValueForSchema(fieldName, nullKeys, Collections.emptyMap(), + "No schema defined for key of map field: \"field\""); + assertInvalidValueForSchema(fieldName, nullKeys, Collections.singletonMap("key", "value"), + "No schema defined for key of map field: \"field\""); + assertInvalidValueForSchema(fieldName, nullKeys, Collections.singletonMap("key", null), + "No schema defined for key of map field: \"field\""); + assertInvalidValueForSchema(fieldName, nullKeys, Collections.singletonMap(null, "value"), + "No schema defined for key of map field: \"field\""); + assertInvalidValueForSchema(fieldName, nullKeys, Collections.singletonMap(null, null), + "No schema defined for key of map field: \"field\""); + assertInvalidValueForSchema(fieldName, nullKeys, Collections.singletonMap("key", true), + "No schema defined for key of map field: \"field\""); + assertInvalidValueForSchema(fieldName, nullKeys, Collections.singletonMap(true, "value"), + "No schema defined for key of map field: \"field\""); + + // Null value schema + Schema nullValues = SchemaBuilder.mapWithNullValues(Schema.OPTIONAL_STRING_SCHEMA); + assertInvalidValueForSchema(fieldName, nullValues, Collections.emptyMap(), + "No schema defined for value of map field: \"field\""); + assertInvalidValueForSchema(fieldName, nullValues, Collections.singletonMap("key", "value"), + "No schema defined for value of map field: \"field\""); + assertInvalidValueForSchema(fieldName, nullValues, Collections.singletonMap("key", null), + "No schema defined for value of map field: \"field\""); + assertInvalidValueForSchema(fieldName, nullValues, Collections.singletonMap(null, "value"), + "No schema defined for value of map field: \"field\""); + assertInvalidValueForSchema(fieldName, nullValues, Collections.singletonMap(null, null), + "No schema defined for value of map field: \"field\""); + assertInvalidValueForSchema(fieldName, nullValues, Collections.singletonMap("key", true), + "No schema defined for value of map field: \"field\""); + assertInvalidValueForSchema(fieldName, nullValues, Collections.singletonMap(true, "value"), + "No schema defined for value of map field: \"field\""); + } } diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java index 55ccc81beda2e..6dee26ca83ac5 100644 --- a/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java +++ b/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java @@ -304,39 +304,6 @@ public void testValidateStructWithNullValue() { e.getMessage()); } - @Test - public void testValidateFieldWithInvalidValueType() { - String fieldName = "field"; - FakeSchema fakeSchema = new FakeSchema(); - - Exception e = assertThrows(DataException.class, () -> ConnectSchema.validateValue(fieldName, - fakeSchema, new Object())); - assertEquals("Invalid Java object for schema \"fake\" with type null: class java.lang.Object for field: \"field\"", - e.getMessage()); - - e = assertThrows(DataException.class, () -> ConnectSchema.validateValue(fieldName, - Schema.INT8_SCHEMA, new Object())); - assertEquals("Invalid Java object for schema with type INT8: class java.lang.Object for field: \"field\"", - e.getMessage()); - - e = assertThrows(DataException.class, () -> ConnectSchema.validateValue(Schema.INT8_SCHEMA, new Object())); - assertEquals("Invalid Java object for schema with type INT8: class java.lang.Object", e.getMessage()); - } - - @Test - public void testValidateFieldWithInvalidValueMismatchTimestamp() { - String fieldName = "field"; - long longValue = 1000L; - - // Does not throw - ConnectSchema.validateValue(fieldName, Schema.INT64_SCHEMA, longValue); - - Exception e = assertThrows(DataException.class, () -> ConnectSchema.validateValue(fieldName, - Timestamp.SCHEMA, longValue)); - assertEquals("Invalid Java object for schema \"org.apache.kafka.connect.data.Timestamp\" " + - "with type INT64: class java.lang.Long for field: \"field\"", e.getMessage()); - } - @Test public void testPutNullField() { final String fieldName = "fieldName";