From 13870ad3cb08a5f9e05a0acdb8f73b05bd07c6a9 Mon Sep 17 00:00:00 2001 From: Greg Harris Date: Fri, 31 May 2024 16:06:46 -0700 Subject: [PATCH 1/3] KAFKA-16858: Add tests for null array schemas Signed-off-by: Greg Harris --- .../kafka/connect/data/ConnectSchemaTest.java | 109 ++++++++++++++++++ .../apache/kafka/connect/data/StructTest.java | 33 ------ 2 files changed, 109 insertions(+), 33 deletions(-) diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/ConnectSchemaTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/ConnectSchemaTest.java index 25e6db34691f9..1125cffe80fba 100644 --- a/connect/api/src/test/java/org/apache/kafka/connect/data/ConnectSchemaTest.java +++ b/connect/api/src/test/java/org/apache/kafka/connect/data/ConnectSchemaTest.java @@ -330,4 +330,113 @@ public void testEmptyStruct() { new Struct(emptyStruct); } + private void assertInvalidValueForSchema(String fieldName, Schema schema, Object value, String message) { + Exception e = assertThrows(DataException.class, () -> ConnectSchema.validateValue(fieldName, schema, value)); + assertEquals(message, e.getMessage()); + } + + @Test + public void testValidateFieldWithInvalidValueType() { + String fieldName = "field"; + assertInvalidValueForSchema(fieldName, new FakeSchema(), new Object(), + "Invalid Java object for schema \"fake\" with type null: class java.lang.Object for field: \"field\""); + assertInvalidValueForSchema(fieldName, Schema.INT8_SCHEMA, new Object(), + "Invalid Java object for schema with type INT8: class java.lang.Object for field: \"field\""); + assertInvalidValueForSchema(fieldName, Schema.INT8_SCHEMA, new Object(), + "Invalid Java object for schema with type INT8: class java.lang.Object for field: \"field\""); + } + + @Test + public void testValidateFieldWithInvalidValueMismatchTimestamp() { + long longValue = 1000L; + String fieldName = "field"; + + ConnectSchema.validateValue(fieldName, Schema.INT64_SCHEMA, longValue); + + assertInvalidValueForSchema(fieldName, Timestamp.SCHEMA, longValue, + "Invalid Java object for schema \"org.apache.kafka.connect.data.Timestamp\" " + + "with type INT64: class java.lang.Long for field: \"field\""); + } + + @Test + public void testValidateList() { + String fieldName = "field"; + + // Optional element schema + Schema optionalStrings = SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA); + ConnectSchema.validateValue(fieldName, optionalStrings, Collections.emptyList()); + ConnectSchema.validateValue(fieldName, optionalStrings, Collections.singletonList("hello")); + ConnectSchema.validateValue(fieldName, optionalStrings, Collections.singletonList(null)); + ConnectSchema.validateValue(fieldName, optionalStrings, Arrays.asList("hello", "world")); + ConnectSchema.validateValue(fieldName, optionalStrings, Arrays.asList("hello", null)); + ConnectSchema.validateValue(fieldName, optionalStrings, Arrays.asList(null, "world")); + assertInvalidValueForSchema(fieldName, optionalStrings, Collections.singletonList(true), + "Invalid Java object for schema with type STRING: class java.lang.Boolean"); + + // Required element schema + Schema requiredStrings = SchemaBuilder.array(Schema.STRING_SCHEMA); + ConnectSchema.validateValue(fieldName, requiredStrings, Collections.emptyList()); + ConnectSchema.validateValue(fieldName, requiredStrings, Collections.singletonList("hello")); + assertInvalidValueForSchema(fieldName, requiredStrings, Collections.singletonList(null), + "Invalid value: null used for required field: \"null\", schema type: STRING"); + ConnectSchema.validateValue(fieldName, requiredStrings, Arrays.asList("hello", "world")); + assertInvalidValueForSchema(fieldName, requiredStrings, Arrays.asList("hello", null), + "Invalid value: null used for required field: \"null\", schema type: STRING"); + assertInvalidValueForSchema(fieldName, requiredStrings, Arrays.asList(null, "world"), + "Invalid value: null used for required field: \"null\", schema type: STRING"); + assertInvalidValueForSchema(fieldName, optionalStrings, Collections.singletonList(true), + "Invalid Java object for schema with type STRING: class java.lang.Boolean"); + + // Null element schema + Schema nullElements = SchemaBuilder.type(Schema.Type.ARRAY); + ConnectSchema.validateValue(fieldName, nullElements, Collections.emptyList()); + assertThrows(NullPointerException.class, () -> ConnectSchema.validateValue(fieldName, nullElements, Collections.singletonList("hello"))); + assertThrows(NullPointerException.class, () -> ConnectSchema.validateValue(fieldName, nullElements, Collections.singletonList(null))); + assertThrows(NullPointerException.class, () -> ConnectSchema.validateValue(fieldName, nullElements, Arrays.asList("hello", "world"))); + assertThrows(NullPointerException.class, () -> ConnectSchema.validateValue(fieldName, nullElements, Arrays.asList("hello", null))); + assertThrows(NullPointerException.class, () -> ConnectSchema.validateValue(fieldName, nullElements, Arrays.asList(null, "world"))); + assertThrows(NullPointerException.class, () -> ConnectSchema.validateValue(fieldName, nullElements, Collections.singletonList(true))); + } + + @Test + public void testValidateMap() { + String fieldName = "field"; + + // Optional element schema + Schema optionalStrings = SchemaBuilder.map(Schema.OPTIONAL_STRING_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA); + ConnectSchema.validateValue(fieldName, optionalStrings, Collections.emptyMap()); + ConnectSchema.validateValue(fieldName, optionalStrings, Collections.singletonMap("key", "value")); + ConnectSchema.validateValue(fieldName, optionalStrings, Collections.singletonMap("key", null)); + ConnectSchema.validateValue(fieldName, optionalStrings, Collections.singletonMap(null, "value")); + ConnectSchema.validateValue(fieldName, optionalStrings, Collections.singletonMap(null, null)); + assertInvalidValueForSchema(fieldName, optionalStrings, Collections.singletonMap("key", true), + "Invalid Java object for schema with type STRING: class java.lang.Boolean"); + assertInvalidValueForSchema(fieldName, optionalStrings, Collections.singletonMap(true, "value"), + "Invalid Java object for schema with type STRING: class java.lang.Boolean"); + + // Required element schema + Schema requiredStrings = SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA); + ConnectSchema.validateValue(fieldName, requiredStrings, Collections.emptyMap()); + ConnectSchema.validateValue(fieldName, requiredStrings, Collections.singletonMap("key", "value")); + assertInvalidValueForSchema(fieldName, requiredStrings, Collections.singletonMap("key", null), + "Invalid value: null used for required field: \"null\", schema type: STRING"); + assertInvalidValueForSchema(fieldName, requiredStrings, Collections.singletonMap(null, "value"), + "Invalid value: null used for required field: \"null\", schema type: STRING"); + assertInvalidValueForSchema(fieldName, requiredStrings, Collections.singletonMap(null, null), + "Invalid value: null used for required field: \"null\", schema type: STRING"); + assertInvalidValueForSchema(fieldName, requiredStrings, Collections.singletonMap("key", true), + "Invalid Java object for schema with type STRING: class java.lang.Boolean"); + assertInvalidValueForSchema(fieldName, requiredStrings, Collections.singletonMap(true, "value"), + "Invalid Java object for schema with type STRING: class java.lang.Boolean"); + + // Null element schema + Schema nullElements = SchemaBuilder.type(Schema.Type.MAP); + ConnectSchema.validateValue(fieldName, nullElements, Collections.emptyMap()); + assertThrows(NullPointerException.class, () -> ConnectSchema.validateValue(fieldName, nullElements, Collections.singletonMap("key", "value"))); + assertThrows(NullPointerException.class, () -> ConnectSchema.validateValue(fieldName, nullElements, Collections.singletonMap("key", null))); + assertThrows(NullPointerException.class, () -> ConnectSchema.validateValue(fieldName, nullElements, Collections.singletonMap(null, "value"))); + assertThrows(NullPointerException.class, () -> ConnectSchema.validateValue(fieldName, nullElements, Collections.singletonMap(null, null))); + assertThrows(NullPointerException.class, () -> ConnectSchema.validateValue(fieldName, nullElements, Collections.singletonMap("key", true))); + assertThrows(NullPointerException.class, () -> ConnectSchema.validateValue(fieldName, nullElements, Collections.singletonMap(true, "value"))); + } } diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java index 55ccc81beda2e..6dee26ca83ac5 100644 --- a/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java +++ b/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java @@ -304,39 +304,6 @@ public void testValidateStructWithNullValue() { e.getMessage()); } - @Test - public void testValidateFieldWithInvalidValueType() { - String fieldName = "field"; - FakeSchema fakeSchema = new FakeSchema(); - - Exception e = assertThrows(DataException.class, () -> ConnectSchema.validateValue(fieldName, - fakeSchema, new Object())); - assertEquals("Invalid Java object for schema \"fake\" with type null: class java.lang.Object for field: \"field\"", - e.getMessage()); - - e = assertThrows(DataException.class, () -> ConnectSchema.validateValue(fieldName, - Schema.INT8_SCHEMA, new Object())); - assertEquals("Invalid Java object for schema with type INT8: class java.lang.Object for field: \"field\"", - e.getMessage()); - - e = assertThrows(DataException.class, () -> ConnectSchema.validateValue(Schema.INT8_SCHEMA, new Object())); - assertEquals("Invalid Java object for schema with type INT8: class java.lang.Object", e.getMessage()); - } - - @Test - public void testValidateFieldWithInvalidValueMismatchTimestamp() { - String fieldName = "field"; - long longValue = 1000L; - - // Does not throw - ConnectSchema.validateValue(fieldName, Schema.INT64_SCHEMA, longValue); - - Exception e = assertThrows(DataException.class, () -> ConnectSchema.validateValue(fieldName, - Timestamp.SCHEMA, longValue)); - assertEquals("Invalid Java object for schema \"org.apache.kafka.connect.data.Timestamp\" " + - "with type INT64: class java.lang.Long for field: \"field\"", e.getMessage()); - } - @Test public void testPutNullField() { final String fieldName = "fieldName"; From 963a34c324ce1df7d9aca2ea081e7b82c9bba338 Mon Sep 17 00:00:00 2001 From: Greg Harris Date: Fri, 31 May 2024 16:37:05 -0700 Subject: [PATCH 2/3] KAFKA-16858: Throw DataException from validateValue when array and map schemas don't have inner schemas Signed-off-by: Greg Harris --- .../kafka/connect/data/ConnectSchema.java | 19 ++++-- .../kafka/connect/data/ConnectSchemaTest.java | 66 +++++++++++-------- 2 files changed, 55 insertions(+), 30 deletions(-) diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java b/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java index 2a6e71c3817c2..5a8779bbf9f93 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java @@ -251,19 +251,30 @@ public static void validateValue(String name, Schema schema, Object value) { break; case ARRAY: List array = (List) value; - for (Object entry : array) - validateValue(schema.valueSchema(), entry); + Schema arrayValueSchema = assertSchemaNotNull(name, "elements", schema.valueSchema()); + for (Object entry : array) { + validateValue("entry", arrayValueSchema, entry); + } break; case MAP: Map map = (Map) value; + Schema mapKeySchema = assertSchemaNotNull(name, "keys", schema.keySchema()); + Schema mapValueSchema = assertSchemaNotNull(name, "values", schema.valueSchema()); for (Map.Entry entry : map.entrySet()) { - validateValue(schema.keySchema(), entry.getKey()); - validateValue(schema.valueSchema(), entry.getValue()); + validateValue("key", mapKeySchema, entry.getKey()); + validateValue("value", mapValueSchema, entry.getValue()); } break; } } + private static Schema assertSchemaNotNull(String fieldName, String innerName, Schema schema) { + if (schema == null) { + throw new DataException("No schema defined for " + innerName + " of field: \"" + fieldName + "\""); + } + return schema; + } + private static List> expectedClassesFor(Schema schema) { List> expectedClasses = LOGICAL_TYPE_CLASSES.get(schema.name()); if (expectedClasses == null) diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/ConnectSchemaTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/ConnectSchemaTest.java index 1125cffe80fba..2c4f63dac8739 100644 --- a/connect/api/src/test/java/org/apache/kafka/connect/data/ConnectSchemaTest.java +++ b/connect/api/src/test/java/org/apache/kafka/connect/data/ConnectSchemaTest.java @@ -371,31 +371,38 @@ public void testValidateList() { ConnectSchema.validateValue(fieldName, optionalStrings, Arrays.asList("hello", null)); ConnectSchema.validateValue(fieldName, optionalStrings, Arrays.asList(null, "world")); assertInvalidValueForSchema(fieldName, optionalStrings, Collections.singletonList(true), - "Invalid Java object for schema with type STRING: class java.lang.Boolean"); + "Invalid Java object for schema with type STRING: class java.lang.Boolean for field: \"entry\""); // Required element schema Schema requiredStrings = SchemaBuilder.array(Schema.STRING_SCHEMA); ConnectSchema.validateValue(fieldName, requiredStrings, Collections.emptyList()); ConnectSchema.validateValue(fieldName, requiredStrings, Collections.singletonList("hello")); assertInvalidValueForSchema(fieldName, requiredStrings, Collections.singletonList(null), - "Invalid value: null used for required field: \"null\", schema type: STRING"); + "Invalid value: null used for required field: \"entry\", schema type: STRING"); ConnectSchema.validateValue(fieldName, requiredStrings, Arrays.asList("hello", "world")); assertInvalidValueForSchema(fieldName, requiredStrings, Arrays.asList("hello", null), - "Invalid value: null used for required field: \"null\", schema type: STRING"); + "Invalid value: null used for required field: \"entry\", schema type: STRING"); assertInvalidValueForSchema(fieldName, requiredStrings, Arrays.asList(null, "world"), - "Invalid value: null used for required field: \"null\", schema type: STRING"); + "Invalid value: null used for required field: \"entry\", schema type: STRING"); assertInvalidValueForSchema(fieldName, optionalStrings, Collections.singletonList(true), - "Invalid Java object for schema with type STRING: class java.lang.Boolean"); + "Invalid Java object for schema with type STRING: class java.lang.Boolean for field: \"entry\""); // Null element schema Schema nullElements = SchemaBuilder.type(Schema.Type.ARRAY); - ConnectSchema.validateValue(fieldName, nullElements, Collections.emptyList()); - assertThrows(NullPointerException.class, () -> ConnectSchema.validateValue(fieldName, nullElements, Collections.singletonList("hello"))); - assertThrows(NullPointerException.class, () -> ConnectSchema.validateValue(fieldName, nullElements, Collections.singletonList(null))); - assertThrows(NullPointerException.class, () -> ConnectSchema.validateValue(fieldName, nullElements, Arrays.asList("hello", "world"))); - assertThrows(NullPointerException.class, () -> ConnectSchema.validateValue(fieldName, nullElements, Arrays.asList("hello", null))); - assertThrows(NullPointerException.class, () -> ConnectSchema.validateValue(fieldName, nullElements, Arrays.asList(null, "world"))); - assertThrows(NullPointerException.class, () -> ConnectSchema.validateValue(fieldName, nullElements, Collections.singletonList(true))); + assertInvalidValueForSchema(fieldName, nullElements, Collections.emptyList(), + "No schema defined for elements of field: \"field\""); + assertInvalidValueForSchema(fieldName, nullElements, Collections.singletonList("hello"), + "No schema defined for elements of field: \"field\""); + assertInvalidValueForSchema(fieldName, nullElements, Collections.singletonList(null), + "No schema defined for elements of field: \"field\""); + assertInvalidValueForSchema(fieldName, nullElements, Arrays.asList("hello", "world"), + "No schema defined for elements of field: \"field\""); + assertInvalidValueForSchema(fieldName, nullElements, Arrays.asList("hello", null), + "No schema defined for elements of field: \"field\""); + assertInvalidValueForSchema(fieldName, nullElements, Arrays.asList(null, "world"), + "No schema defined for elements of field: \"field\""); + assertInvalidValueForSchema(fieldName, nullElements, Collections.singletonList(true), + "No schema defined for elements of field: \"field\""); } @Test @@ -410,33 +417,40 @@ public void testValidateMap() { ConnectSchema.validateValue(fieldName, optionalStrings, Collections.singletonMap(null, "value")); ConnectSchema.validateValue(fieldName, optionalStrings, Collections.singletonMap(null, null)); assertInvalidValueForSchema(fieldName, optionalStrings, Collections.singletonMap("key", true), - "Invalid Java object for schema with type STRING: class java.lang.Boolean"); + "Invalid Java object for schema with type STRING: class java.lang.Boolean for field: \"value\""); assertInvalidValueForSchema(fieldName, optionalStrings, Collections.singletonMap(true, "value"), - "Invalid Java object for schema with type STRING: class java.lang.Boolean"); + "Invalid Java object for schema with type STRING: class java.lang.Boolean for field: \"key\""); // Required element schema Schema requiredStrings = SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA); ConnectSchema.validateValue(fieldName, requiredStrings, Collections.emptyMap()); ConnectSchema.validateValue(fieldName, requiredStrings, Collections.singletonMap("key", "value")); assertInvalidValueForSchema(fieldName, requiredStrings, Collections.singletonMap("key", null), - "Invalid value: null used for required field: \"null\", schema type: STRING"); + "Invalid value: null used for required field: \"value\", schema type: STRING"); assertInvalidValueForSchema(fieldName, requiredStrings, Collections.singletonMap(null, "value"), - "Invalid value: null used for required field: \"null\", schema type: STRING"); + "Invalid value: null used for required field: \"key\", schema type: STRING"); assertInvalidValueForSchema(fieldName, requiredStrings, Collections.singletonMap(null, null), - "Invalid value: null used for required field: \"null\", schema type: STRING"); + "Invalid value: null used for required field: \"key\", schema type: STRING"); assertInvalidValueForSchema(fieldName, requiredStrings, Collections.singletonMap("key", true), - "Invalid Java object for schema with type STRING: class java.lang.Boolean"); + "Invalid Java object for schema with type STRING: class java.lang.Boolean for field: \"value\""); assertInvalidValueForSchema(fieldName, requiredStrings, Collections.singletonMap(true, "value"), - "Invalid Java object for schema with type STRING: class java.lang.Boolean"); + "Invalid Java object for schema with type STRING: class java.lang.Boolean for field: \"key\""); // Null element schema Schema nullElements = SchemaBuilder.type(Schema.Type.MAP); - ConnectSchema.validateValue(fieldName, nullElements, Collections.emptyMap()); - assertThrows(NullPointerException.class, () -> ConnectSchema.validateValue(fieldName, nullElements, Collections.singletonMap("key", "value"))); - assertThrows(NullPointerException.class, () -> ConnectSchema.validateValue(fieldName, nullElements, Collections.singletonMap("key", null))); - assertThrows(NullPointerException.class, () -> ConnectSchema.validateValue(fieldName, nullElements, Collections.singletonMap(null, "value"))); - assertThrows(NullPointerException.class, () -> ConnectSchema.validateValue(fieldName, nullElements, Collections.singletonMap(null, null))); - assertThrows(NullPointerException.class, () -> ConnectSchema.validateValue(fieldName, nullElements, Collections.singletonMap("key", true))); - assertThrows(NullPointerException.class, () -> ConnectSchema.validateValue(fieldName, nullElements, Collections.singletonMap(true, "value"))); + assertInvalidValueForSchema(fieldName, nullElements, Collections.emptyMap(), + "No schema defined for keys of field: \"field\""); + assertInvalidValueForSchema(fieldName, nullElements, Collections.singletonMap("key", "value"), + "No schema defined for keys of field: \"field\""); + assertInvalidValueForSchema(fieldName, nullElements, Collections.singletonMap("key", null), + "No schema defined for keys of field: \"field\""); + assertInvalidValueForSchema(fieldName, nullElements, Collections.singletonMap(null, "value"), + "No schema defined for keys of field: \"field\""); + assertInvalidValueForSchema(fieldName, nullElements, Collections.singletonMap(null, null), + "No schema defined for keys of field: \"field\""); + assertInvalidValueForSchema(fieldName, nullElements, Collections.singletonMap("key", true), + "No schema defined for keys of field: \"field\""); + assertInvalidValueForSchema(fieldName, nullElements, Collections.singletonMap(true, "value"), + "No schema defined for keys of field: \"field\""); } } From 554f648b3a7d9e3d14d203684695cc4dd42bbf6d Mon Sep 17 00:00:00 2001 From: Greg Harris Date: Tue, 4 Jun 2024 13:32:20 -0700 Subject: [PATCH 3/3] fixup: change exception message field name context to generic location Signed-off-by: Greg Harris --- .../kafka/connect/data/ConnectSchema.java | 33 ++++--- .../kafka/connect/data/ConnectSchemaTest.java | 93 +++++++++++-------- 2 files changed, 75 insertions(+), 51 deletions(-) diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java b/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java index 5a8779bbf9f93..cf5f01502c83b 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java @@ -213,11 +213,15 @@ public static void validateValue(Schema schema, Object value) { validateValue(null, schema, value); } - public static void validateValue(String name, Schema schema, Object value) { + public static void validateValue(String field, Schema schema, Object value) { + validateValue(schema, value, field == null ? "value" : "field: \"" + field + "\""); + } + + private static void validateValue(Schema schema, Object value, String location) { if (value == null) { if (!schema.isOptional()) - throw new DataException("Invalid value: null used for required field: \"" + name - + "\", schema type: " + schema.type()); + throw new DataException("Invalid value: null used for required " + location + + ", schema type: " + schema.type()); return; } @@ -236,8 +240,8 @@ public static void validateValue(String name, Schema schema, Object value) { exceptionMessage.append(" \"").append(schema.name()).append("\""); } exceptionMessage.append(" with type ").append(schema.type()).append(": ").append(value.getClass()); - if (name != null) { - exceptionMessage.append(" for field: \"").append(name).append("\""); + if (location != null) { + exceptionMessage.append(" for ").append(location); } throw new DataException(exceptionMessage.toString()); } @@ -251,26 +255,29 @@ public static void validateValue(String name, Schema schema, Object value) { break; case ARRAY: List array = (List) value; - Schema arrayValueSchema = assertSchemaNotNull(name, "elements", schema.valueSchema()); + String entryLocation = "element of array " + location; + Schema arrayValueSchema = assertSchemaNotNull(schema.valueSchema(), entryLocation); for (Object entry : array) { - validateValue("entry", arrayValueSchema, entry); + validateValue(arrayValueSchema, entry, entryLocation); } break; case MAP: Map map = (Map) value; - Schema mapKeySchema = assertSchemaNotNull(name, "keys", schema.keySchema()); - Schema mapValueSchema = assertSchemaNotNull(name, "values", schema.valueSchema()); + String keyLocation = "key of map " + location; + String valueLocation = "value of map " + location; + Schema mapKeySchema = assertSchemaNotNull(schema.keySchema(), keyLocation); + Schema mapValueSchema = assertSchemaNotNull(schema.valueSchema(), valueLocation); for (Map.Entry entry : map.entrySet()) { - validateValue("key", mapKeySchema, entry.getKey()); - validateValue("value", mapValueSchema, entry.getValue()); + validateValue(mapKeySchema, entry.getKey(), keyLocation); + validateValue(mapValueSchema, entry.getValue(), valueLocation); } break; } } - private static Schema assertSchemaNotNull(String fieldName, String innerName, Schema schema) { + private static Schema assertSchemaNotNull(Schema schema, String location) { if (schema == null) { - throw new DataException("No schema defined for " + innerName + " of field: \"" + fieldName + "\""); + throw new DataException("No schema defined for " + location); } return schema; } diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/ConnectSchemaTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/ConnectSchemaTest.java index 2c4f63dac8739..43c2342fe3b41 100644 --- a/connect/api/src/test/java/org/apache/kafka/connect/data/ConnectSchemaTest.java +++ b/connect/api/src/test/java/org/apache/kafka/connect/data/ConnectSchemaTest.java @@ -340,8 +340,8 @@ public void testValidateFieldWithInvalidValueType() { String fieldName = "field"; assertInvalidValueForSchema(fieldName, new FakeSchema(), new Object(), "Invalid Java object for schema \"fake\" with type null: class java.lang.Object for field: \"field\""); - assertInvalidValueForSchema(fieldName, Schema.INT8_SCHEMA, new Object(), - "Invalid Java object for schema with type INT8: class java.lang.Object for field: \"field\""); + assertInvalidValueForSchema(null, Schema.INT8_SCHEMA, new Object(), + "Invalid Java object for schema with type INT8: class java.lang.Object for value"); assertInvalidValueForSchema(fieldName, Schema.INT8_SCHEMA, new Object(), "Invalid Java object for schema with type INT8: class java.lang.Object for field: \"field\""); } @@ -371,38 +371,38 @@ public void testValidateList() { ConnectSchema.validateValue(fieldName, optionalStrings, Arrays.asList("hello", null)); ConnectSchema.validateValue(fieldName, optionalStrings, Arrays.asList(null, "world")); assertInvalidValueForSchema(fieldName, optionalStrings, Collections.singletonList(true), - "Invalid Java object for schema with type STRING: class java.lang.Boolean for field: \"entry\""); + "Invalid Java object for schema with type STRING: class java.lang.Boolean for element of array field: \"field\""); // Required element schema Schema requiredStrings = SchemaBuilder.array(Schema.STRING_SCHEMA); ConnectSchema.validateValue(fieldName, requiredStrings, Collections.emptyList()); ConnectSchema.validateValue(fieldName, requiredStrings, Collections.singletonList("hello")); assertInvalidValueForSchema(fieldName, requiredStrings, Collections.singletonList(null), - "Invalid value: null used for required field: \"entry\", schema type: STRING"); + "Invalid value: null used for required element of array field: \"field\", schema type: STRING"); ConnectSchema.validateValue(fieldName, requiredStrings, Arrays.asList("hello", "world")); assertInvalidValueForSchema(fieldName, requiredStrings, Arrays.asList("hello", null), - "Invalid value: null used for required field: \"entry\", schema type: STRING"); + "Invalid value: null used for required element of array field: \"field\", schema type: STRING"); assertInvalidValueForSchema(fieldName, requiredStrings, Arrays.asList(null, "world"), - "Invalid value: null used for required field: \"entry\", schema type: STRING"); + "Invalid value: null used for required element of array field: \"field\", schema type: STRING"); assertInvalidValueForSchema(fieldName, optionalStrings, Collections.singletonList(true), - "Invalid Java object for schema with type STRING: class java.lang.Boolean for field: \"entry\""); + "Invalid Java object for schema with type STRING: class java.lang.Boolean for element of array field: \"field\""); // Null element schema Schema nullElements = SchemaBuilder.type(Schema.Type.ARRAY); assertInvalidValueForSchema(fieldName, nullElements, Collections.emptyList(), - "No schema defined for elements of field: \"field\""); + "No schema defined for element of array field: \"field\""); assertInvalidValueForSchema(fieldName, nullElements, Collections.singletonList("hello"), - "No schema defined for elements of field: \"field\""); + "No schema defined for element of array field: \"field\""); assertInvalidValueForSchema(fieldName, nullElements, Collections.singletonList(null), - "No schema defined for elements of field: \"field\""); + "No schema defined for element of array field: \"field\""); assertInvalidValueForSchema(fieldName, nullElements, Arrays.asList("hello", "world"), - "No schema defined for elements of field: \"field\""); + "No schema defined for element of array field: \"field\""); assertInvalidValueForSchema(fieldName, nullElements, Arrays.asList("hello", null), - "No schema defined for elements of field: \"field\""); + "No schema defined for element of array field: \"field\""); assertInvalidValueForSchema(fieldName, nullElements, Arrays.asList(null, "world"), - "No schema defined for elements of field: \"field\""); + "No schema defined for element of array field: \"field\""); assertInvalidValueForSchema(fieldName, nullElements, Collections.singletonList(true), - "No schema defined for elements of field: \"field\""); + "No schema defined for element of array field: \"field\""); } @Test @@ -417,40 +417,57 @@ public void testValidateMap() { ConnectSchema.validateValue(fieldName, optionalStrings, Collections.singletonMap(null, "value")); ConnectSchema.validateValue(fieldName, optionalStrings, Collections.singletonMap(null, null)); assertInvalidValueForSchema(fieldName, optionalStrings, Collections.singletonMap("key", true), - "Invalid Java object for schema with type STRING: class java.lang.Boolean for field: \"value\""); + "Invalid Java object for schema with type STRING: class java.lang.Boolean for value of map field: \"field\""); assertInvalidValueForSchema(fieldName, optionalStrings, Collections.singletonMap(true, "value"), - "Invalid Java object for schema with type STRING: class java.lang.Boolean for field: \"key\""); + "Invalid Java object for schema with type STRING: class java.lang.Boolean for key of map field: \"field\""); // Required element schema Schema requiredStrings = SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA); ConnectSchema.validateValue(fieldName, requiredStrings, Collections.emptyMap()); ConnectSchema.validateValue(fieldName, requiredStrings, Collections.singletonMap("key", "value")); assertInvalidValueForSchema(fieldName, requiredStrings, Collections.singletonMap("key", null), - "Invalid value: null used for required field: \"value\", schema type: STRING"); + "Invalid value: null used for required value of map field: \"field\", schema type: STRING"); assertInvalidValueForSchema(fieldName, requiredStrings, Collections.singletonMap(null, "value"), - "Invalid value: null used for required field: \"key\", schema type: STRING"); + "Invalid value: null used for required key of map field: \"field\", schema type: STRING"); assertInvalidValueForSchema(fieldName, requiredStrings, Collections.singletonMap(null, null), - "Invalid value: null used for required field: \"key\", schema type: STRING"); + "Invalid value: null used for required key of map field: \"field\", schema type: STRING"); assertInvalidValueForSchema(fieldName, requiredStrings, Collections.singletonMap("key", true), - "Invalid Java object for schema with type STRING: class java.lang.Boolean for field: \"value\""); + "Invalid Java object for schema with type STRING: class java.lang.Boolean for value of map field: \"field\""); assertInvalidValueForSchema(fieldName, requiredStrings, Collections.singletonMap(true, "value"), - "Invalid Java object for schema with type STRING: class java.lang.Boolean for field: \"key\""); - - // Null element schema - Schema nullElements = SchemaBuilder.type(Schema.Type.MAP); - assertInvalidValueForSchema(fieldName, nullElements, Collections.emptyMap(), - "No schema defined for keys of field: \"field\""); - assertInvalidValueForSchema(fieldName, nullElements, Collections.singletonMap("key", "value"), - "No schema defined for keys of field: \"field\""); - assertInvalidValueForSchema(fieldName, nullElements, Collections.singletonMap("key", null), - "No schema defined for keys of field: \"field\""); - assertInvalidValueForSchema(fieldName, nullElements, Collections.singletonMap(null, "value"), - "No schema defined for keys of field: \"field\""); - assertInvalidValueForSchema(fieldName, nullElements, Collections.singletonMap(null, null), - "No schema defined for keys of field: \"field\""); - assertInvalidValueForSchema(fieldName, nullElements, Collections.singletonMap("key", true), - "No schema defined for keys of field: \"field\""); - assertInvalidValueForSchema(fieldName, nullElements, Collections.singletonMap(true, "value"), - "No schema defined for keys of field: \"field\""); + "Invalid Java object for schema with type STRING: class java.lang.Boolean for key of map field: \"field\""); + + // Null key schema + Schema nullKeys = SchemaBuilder.type(Schema.Type.MAP); + assertInvalidValueForSchema(fieldName, nullKeys, Collections.emptyMap(), + "No schema defined for key of map field: \"field\""); + assertInvalidValueForSchema(fieldName, nullKeys, Collections.singletonMap("key", "value"), + "No schema defined for key of map field: \"field\""); + assertInvalidValueForSchema(fieldName, nullKeys, Collections.singletonMap("key", null), + "No schema defined for key of map field: \"field\""); + assertInvalidValueForSchema(fieldName, nullKeys, Collections.singletonMap(null, "value"), + "No schema defined for key of map field: \"field\""); + assertInvalidValueForSchema(fieldName, nullKeys, Collections.singletonMap(null, null), + "No schema defined for key of map field: \"field\""); + assertInvalidValueForSchema(fieldName, nullKeys, Collections.singletonMap("key", true), + "No schema defined for key of map field: \"field\""); + assertInvalidValueForSchema(fieldName, nullKeys, Collections.singletonMap(true, "value"), + "No schema defined for key of map field: \"field\""); + + // Null value schema + Schema nullValues = SchemaBuilder.mapWithNullValues(Schema.OPTIONAL_STRING_SCHEMA); + assertInvalidValueForSchema(fieldName, nullValues, Collections.emptyMap(), + "No schema defined for value of map field: \"field\""); + assertInvalidValueForSchema(fieldName, nullValues, Collections.singletonMap("key", "value"), + "No schema defined for value of map field: \"field\""); + assertInvalidValueForSchema(fieldName, nullValues, Collections.singletonMap("key", null), + "No schema defined for value of map field: \"field\""); + assertInvalidValueForSchema(fieldName, nullValues, Collections.singletonMap(null, "value"), + "No schema defined for value of map field: \"field\""); + assertInvalidValueForSchema(fieldName, nullValues, Collections.singletonMap(null, null), + "No schema defined for value of map field: \"field\""); + assertInvalidValueForSchema(fieldName, nullValues, Collections.singletonMap("key", true), + "No schema defined for value of map field: \"field\""); + assertInvalidValueForSchema(fieldName, nullValues, Collections.singletonMap(true, "value"), + "No schema defined for value of map field: \"field\""); } }