Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -213,11 +213,15 @@ public static void validateValue(Schema schema, Object value) {
validateValue(null, schema, value);
}

public static void validateValue(String name, Schema schema, Object value) {
public static void validateValue(String field, Schema schema, Object value) {
validateValue(schema, value, field == null ? "value" : "field: \"" + field + "\"");
}

private static void validateValue(Schema schema, Object value, String location) {
if (value == null) {
if (!schema.isOptional())
throw new DataException("Invalid value: null used for required field: \"" + name
+ "\", schema type: " + schema.type());
throw new DataException("Invalid value: null used for required " + location
+ ", schema type: " + schema.type());
return;
}

Expand All @@ -236,8 +240,8 @@ public static void validateValue(String name, Schema schema, Object value) {
exceptionMessage.append(" \"").append(schema.name()).append("\"");
}
exceptionMessage.append(" with type ").append(schema.type()).append(": ").append(value.getClass());
if (name != null) {
exceptionMessage.append(" for field: \"").append(name).append("\"");
if (location != null) {
exceptionMessage.append(" for ").append(location);
}
throw new DataException(exceptionMessage.toString());
}
Expand All @@ -251,19 +255,33 @@ public static void validateValue(String name, Schema schema, Object value) {
break;
case ARRAY:
List<?> array = (List<?>) value;
for (Object entry : array)
validateValue(schema.valueSchema(), entry);
String entryLocation = "element of array " + location;
Schema arrayValueSchema = assertSchemaNotNull(schema.valueSchema(), entryLocation);
for (Object entry : array) {
validateValue(arrayValueSchema, entry, entryLocation);
}
break;
case MAP:
Map<?, ?> map = (Map<?, ?>) value;
String keyLocation = "key of map " + location;
String valueLocation = "value of map " + location;
Schema mapKeySchema = assertSchemaNotNull(schema.keySchema(), keyLocation);
Schema mapValueSchema = assertSchemaNotNull(schema.valueSchema(), valueLocation);
for (Map.Entry<?, ?> entry : map.entrySet()) {
validateValue(schema.keySchema(), entry.getKey());
validateValue(schema.valueSchema(), entry.getValue());
validateValue(mapKeySchema, entry.getKey(), keyLocation);
validateValue(mapValueSchema, entry.getValue(), valueLocation);
}
break;
}
}

private static Schema assertSchemaNotNull(Schema schema, String location) {
if (schema == null) {
throw new DataException("No schema defined for " + location);
}
return schema;
}

private static List<Class<?>> expectedClassesFor(Schema schema) {
List<Class<?>> expectedClasses = LOGICAL_TYPE_CLASSES.get(schema.name());
if (expectedClasses == null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -330,4 +330,144 @@ public void testEmptyStruct() {
new Struct(emptyStruct);
}

private void assertInvalidValueForSchema(String fieldName, Schema schema, Object value, String message) {
Exception e = assertThrows(DataException.class, () -> ConnectSchema.validateValue(fieldName, schema, value));
assertEquals(message, e.getMessage());
}

@Test
public void testValidateFieldWithInvalidValueType() {
String fieldName = "field";
assertInvalidValueForSchema(fieldName, new FakeSchema(), new Object(),
"Invalid Java object for schema \"fake\" with type null: class java.lang.Object for field: \"field\"");
assertInvalidValueForSchema(null, Schema.INT8_SCHEMA, new Object(),
"Invalid Java object for schema with type INT8: class java.lang.Object for value");
assertInvalidValueForSchema(fieldName, Schema.INT8_SCHEMA, new Object(),
"Invalid Java object for schema with type INT8: class java.lang.Object for field: \"field\"");
}

@Test
public void testValidateFieldWithInvalidValueMismatchTimestamp() {
long longValue = 1000L;
String fieldName = "field";

ConnectSchema.validateValue(fieldName, Schema.INT64_SCHEMA, longValue);

assertInvalidValueForSchema(fieldName, Timestamp.SCHEMA, longValue,
"Invalid Java object for schema \"org.apache.kafka.connect.data.Timestamp\" " +
"with type INT64: class java.lang.Long for field: \"field\"");
}

@Test
public void testValidateList() {
String fieldName = "field";

// Optional element schema
Schema optionalStrings = SchemaBuilder.array(Schema.OPTIONAL_STRING_SCHEMA);
ConnectSchema.validateValue(fieldName, optionalStrings, Collections.emptyList());
ConnectSchema.validateValue(fieldName, optionalStrings, Collections.singletonList("hello"));
ConnectSchema.validateValue(fieldName, optionalStrings, Collections.singletonList(null));
ConnectSchema.validateValue(fieldName, optionalStrings, Arrays.asList("hello", "world"));
ConnectSchema.validateValue(fieldName, optionalStrings, Arrays.asList("hello", null));
ConnectSchema.validateValue(fieldName, optionalStrings, Arrays.asList(null, "world"));
assertInvalidValueForSchema(fieldName, optionalStrings, Collections.singletonList(true),
"Invalid Java object for schema with type STRING: class java.lang.Boolean for element of array field: \"field\"");

// Required element schema
Schema requiredStrings = SchemaBuilder.array(Schema.STRING_SCHEMA);
ConnectSchema.validateValue(fieldName, requiredStrings, Collections.emptyList());
ConnectSchema.validateValue(fieldName, requiredStrings, Collections.singletonList("hello"));
assertInvalidValueForSchema(fieldName, requiredStrings, Collections.singletonList(null),
"Invalid value: null used for required element of array field: \"field\", schema type: STRING");
ConnectSchema.validateValue(fieldName, requiredStrings, Arrays.asList("hello", "world"));
assertInvalidValueForSchema(fieldName, requiredStrings, Arrays.asList("hello", null),
"Invalid value: null used for required element of array field: \"field\", schema type: STRING");
assertInvalidValueForSchema(fieldName, requiredStrings, Arrays.asList(null, "world"),
"Invalid value: null used for required element of array field: \"field\", schema type: STRING");
assertInvalidValueForSchema(fieldName, optionalStrings, Collections.singletonList(true),
"Invalid Java object for schema with type STRING: class java.lang.Boolean for element of array field: \"field\"");

// Null element schema
Schema nullElements = SchemaBuilder.type(Schema.Type.ARRAY);
assertInvalidValueForSchema(fieldName, nullElements, Collections.emptyList(),
"No schema defined for element of array field: \"field\"");
assertInvalidValueForSchema(fieldName, nullElements, Collections.singletonList("hello"),
"No schema defined for element of array field: \"field\"");
assertInvalidValueForSchema(fieldName, nullElements, Collections.singletonList(null),
"No schema defined for element of array field: \"field\"");
assertInvalidValueForSchema(fieldName, nullElements, Arrays.asList("hello", "world"),
"No schema defined for element of array field: \"field\"");
assertInvalidValueForSchema(fieldName, nullElements, Arrays.asList("hello", null),
"No schema defined for element of array field: \"field\"");
assertInvalidValueForSchema(fieldName, nullElements, Arrays.asList(null, "world"),
"No schema defined for element of array field: \"field\"");
assertInvalidValueForSchema(fieldName, nullElements, Collections.singletonList(true),
"No schema defined for element of array field: \"field\"");
}

@Test
public void testValidateMap() {
String fieldName = "field";

// Optional element schema
Schema optionalStrings = SchemaBuilder.map(Schema.OPTIONAL_STRING_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA);
ConnectSchema.validateValue(fieldName, optionalStrings, Collections.emptyMap());
ConnectSchema.validateValue(fieldName, optionalStrings, Collections.singletonMap("key", "value"));
ConnectSchema.validateValue(fieldName, optionalStrings, Collections.singletonMap("key", null));
ConnectSchema.validateValue(fieldName, optionalStrings, Collections.singletonMap(null, "value"));
ConnectSchema.validateValue(fieldName, optionalStrings, Collections.singletonMap(null, null));
assertInvalidValueForSchema(fieldName, optionalStrings, Collections.singletonMap("key", true),
"Invalid Java object for schema with type STRING: class java.lang.Boolean for value of map field: \"field\"");
assertInvalidValueForSchema(fieldName, optionalStrings, Collections.singletonMap(true, "value"),
"Invalid Java object for schema with type STRING: class java.lang.Boolean for key of map field: \"field\"");

// Required element schema
Schema requiredStrings = SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA);
ConnectSchema.validateValue(fieldName, requiredStrings, Collections.emptyMap());
ConnectSchema.validateValue(fieldName, requiredStrings, Collections.singletonMap("key", "value"));
assertInvalidValueForSchema(fieldName, requiredStrings, Collections.singletonMap("key", null),
"Invalid value: null used for required value of map field: \"field\", schema type: STRING");
assertInvalidValueForSchema(fieldName, requiredStrings, Collections.singletonMap(null, "value"),
"Invalid value: null used for required key of map field: \"field\", schema type: STRING");
assertInvalidValueForSchema(fieldName, requiredStrings, Collections.singletonMap(null, null),
"Invalid value: null used for required key of map field: \"field\", schema type: STRING");
assertInvalidValueForSchema(fieldName, requiredStrings, Collections.singletonMap("key", true),
"Invalid Java object for schema with type STRING: class java.lang.Boolean for value of map field: \"field\"");
assertInvalidValueForSchema(fieldName, requiredStrings, Collections.singletonMap(true, "value"),
"Invalid Java object for schema with type STRING: class java.lang.Boolean for key of map field: \"field\"");

// Null key schema
Schema nullKeys = SchemaBuilder.type(Schema.Type.MAP);
assertInvalidValueForSchema(fieldName, nullKeys, Collections.emptyMap(),
"No schema defined for key of map field: \"field\"");
assertInvalidValueForSchema(fieldName, nullKeys, Collections.singletonMap("key", "value"),
"No schema defined for key of map field: \"field\"");
assertInvalidValueForSchema(fieldName, nullKeys, Collections.singletonMap("key", null),
"No schema defined for key of map field: \"field\"");
assertInvalidValueForSchema(fieldName, nullKeys, Collections.singletonMap(null, "value"),
"No schema defined for key of map field: \"field\"");
assertInvalidValueForSchema(fieldName, nullKeys, Collections.singletonMap(null, null),
"No schema defined for key of map field: \"field\"");
assertInvalidValueForSchema(fieldName, nullKeys, Collections.singletonMap("key", true),
"No schema defined for key of map field: \"field\"");
assertInvalidValueForSchema(fieldName, nullKeys, Collections.singletonMap(true, "value"),
"No schema defined for key of map field: \"field\"");

// Null value schema
Schema nullValues = SchemaBuilder.mapWithNullValues(Schema.OPTIONAL_STRING_SCHEMA);
assertInvalidValueForSchema(fieldName, nullValues, Collections.emptyMap(),
"No schema defined for value of map field: \"field\"");
assertInvalidValueForSchema(fieldName, nullValues, Collections.singletonMap("key", "value"),
"No schema defined for value of map field: \"field\"");
assertInvalidValueForSchema(fieldName, nullValues, Collections.singletonMap("key", null),
"No schema defined for value of map field: \"field\"");
assertInvalidValueForSchema(fieldName, nullValues, Collections.singletonMap(null, "value"),
"No schema defined for value of map field: \"field\"");
assertInvalidValueForSchema(fieldName, nullValues, Collections.singletonMap(null, null),
"No schema defined for value of map field: \"field\"");
assertInvalidValueForSchema(fieldName, nullValues, Collections.singletonMap("key", true),
"No schema defined for value of map field: \"field\"");
assertInvalidValueForSchema(fieldName, nullValues, Collections.singletonMap(true, "value"),
"No schema defined for value of map field: \"field\"");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -304,39 +304,6 @@ public void testValidateStructWithNullValue() {
e.getMessage());
}

@Test
public void testValidateFieldWithInvalidValueType() {
String fieldName = "field";
FakeSchema fakeSchema = new FakeSchema();

Exception e = assertThrows(DataException.class, () -> ConnectSchema.validateValue(fieldName,
fakeSchema, new Object()));
assertEquals("Invalid Java object for schema \"fake\" with type null: class java.lang.Object for field: \"field\"",
e.getMessage());

e = assertThrows(DataException.class, () -> ConnectSchema.validateValue(fieldName,
Schema.INT8_SCHEMA, new Object()));
assertEquals("Invalid Java object for schema with type INT8: class java.lang.Object for field: \"field\"",
e.getMessage());

e = assertThrows(DataException.class, () -> ConnectSchema.validateValue(Schema.INT8_SCHEMA, new Object()));
assertEquals("Invalid Java object for schema with type INT8: class java.lang.Object", e.getMessage());
}

@Test
public void testValidateFieldWithInvalidValueMismatchTimestamp() {
String fieldName = "field";
long longValue = 1000L;

// Does not throw
ConnectSchema.validateValue(fieldName, Schema.INT64_SCHEMA, longValue);

Exception e = assertThrows(DataException.class, () -> ConnectSchema.validateValue(fieldName,
Timestamp.SCHEMA, longValue));
assertEquals("Invalid Java object for schema \"org.apache.kafka.connect.data.Timestamp\" " +
"with type INT64: class java.lang.Long for field: \"field\"", e.getMessage());
}

@Test
public void testPutNullField() {
final String fieldName = "fieldName";
Expand Down