diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java b/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java index d1fd9cdaa7aa6..e052534a7a955 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java @@ -207,9 +207,14 @@ public Schema valueSchema() { * @param value value to test */ public static void validateValue(Schema schema, Object value) { + validateValue(null, schema, value); + } + + public static void validateValue(String name, Schema schema, Object value) { if (value == null) { if (!schema.isOptional()) - throw new DataException("Invalid value: null used for required field"); + throw new DataException("Invalid value: null used for required field: \"" + name + + "\", schema type: " + schema.type()); else return; } @@ -220,7 +225,9 @@ public static void validateValue(Schema schema, Object value) { expectedClasses = SCHEMA_TYPE_CLASSES.get(schema.type()); if (expectedClasses == null) - throw new DataException("Invalid Java object for schema type " + schema.type() + ": " + value.getClass()); + throw new DataException("Invalid Java object for schema type " + schema.type() + + ": " + value.getClass() + + " for field: \"" + name + "\""); boolean foundMatch = false; for (Class expectedClass : expectedClasses) { @@ -230,7 +237,9 @@ public static void validateValue(Schema schema, Object value) { } } if (!foundMatch) - throw new DataException("Invalid Java object for schema type " + schema.type() + ": " + value.getClass()); + throw new DataException("Invalid Java object for schema type " + schema.type() + + ": " + value.getClass() + + " for field: \"" + name + "\""); switch (schema.type()) { case STRUCT: diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/Struct.java b/connect/api/src/main/java/org/apache/kafka/connect/data/Struct.java index 698c6ee8cb895..200a1c00a9df5 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/data/Struct.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/data/Struct.java @@ -229,7 +229,7 @@ public void validate() { Object value = values[field.index()]; if (value == null && (fieldSchema.isOptional() || fieldSchema.defaultValue() != null)) continue; - ConnectSchema.validateValue(fieldSchema, value); + ConnectSchema.validateValue(field.name(), fieldSchema, value); } } diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/FakeSchema.java b/connect/api/src/test/java/org/apache/kafka/connect/data/FakeSchema.java new file mode 100644 index 0000000000000..ff2e24f6a2b01 --- /dev/null +++ b/connect/api/src/test/java/org/apache/kafka/connect/data/FakeSchema.java @@ -0,0 +1,83 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.data; + +import java.util.List; +import java.util.Map; + +public class FakeSchema implements Schema { + @Override + public Type type() { + return null; + } + + @Override + public boolean isOptional() { + return false; + } + + @Override + public Object defaultValue() { + return null; + } + + @Override + public String name() { + return "fake"; + } + + @Override + public Integer version() { + return null; + } + + @Override + public String doc() { + return null; + } + + @Override + public Map parameters() { + return null; + } + + @Override + public Schema keySchema() { + return null; + } + + @Override + public Schema valueSchema() { + return null; + } + + @Override + public List fields() { + return null; + } + + @Override + public Field field(String fieldName) { + return null; + } + + @Override + public Schema schema() { + return null; + } +} diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java index 11c9fb08fc3b0..82f6d89a90f68 100644 --- a/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java +++ b/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java @@ -18,7 +18,9 @@ package org.apache.kafka.connect.data; import org.apache.kafka.connect.errors.DataException; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import java.nio.ByteBuffer; import java.util.Arrays; @@ -234,4 +236,35 @@ public void testEquals() { assertEquals(struct1, struct2); assertNotEquals(struct1, struct3); } + + @Rule + public ExpectedException thrown = ExpectedException.none(); + + @Test + public void testValidateStructWithNullValue() { + Schema schema = SchemaBuilder.struct() + .field("one", Schema.STRING_SCHEMA) + .field("two", Schema.STRING_SCHEMA) + .field("three", Schema.STRING_SCHEMA) + .build(); + + Struct struct = new Struct(schema); + thrown.expect(DataException.class); + thrown.expectMessage("Invalid value: null used for required field: \"one\", schema type: STRING"); + struct.validate(); + } + + @Test + public void testValidateFieldWithInvalidValueType() { + String fieldName = "field"; + FakeSchema fakeSchema = new FakeSchema(); + + thrown.expect(DataException.class); + thrown.expectMessage("Invalid Java object for schema type null: class java.lang.Object for field: \"field\""); + ConnectSchema.validateValue(fieldName, fakeSchema, new Object()); + + thrown.expect(DataException.class); + thrown.expectMessage("Invalid Java object for schema type INT8: class java.lang.Object for field: \"field\""); + ConnectSchema.validateValue(fieldName, Schema.INT8_SCHEMA, new Object()); + } }