From dce515ebbcf4a6488121629e6d76c1c85bec653c Mon Sep 17 00:00:00 2001 From: Aegeaner Date: Thu, 9 Feb 2017 16:29:37 +0800 Subject: [PATCH 1/6] KAFKA-4709 --- .../kafka/connect/data/ConnectSchema.java | 2 +- .../org/apache/kafka/connect/data/Struct.java | 6 +++++- .../apache/kafka/connect/data/StructTest.java | 19 +++++++++++++++++++ 3 files changed, 25 insertions(+), 2 deletions(-) diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java b/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java index d1fd9cdaa7aa6..e8e32b5a322ee 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java @@ -209,7 +209,7 @@ public Schema valueSchema() { public static void validateValue(Schema schema, Object value) { if (value == null) { if (!schema.isOptional()) - throw new DataException("Invalid value: null used for required field"); + throw new DataException("Null value"); else return; } diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/Struct.java b/connect/api/src/main/java/org/apache/kafka/connect/data/Struct.java index 698c6ee8cb895..e9511962c1a0b 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/data/Struct.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/data/Struct.java @@ -229,7 +229,11 @@ public void validate() { Object value = values[field.index()]; if (value == null && (fieldSchema.isOptional() || fieldSchema.defaultValue() != null)) continue; - ConnectSchema.validateValue(fieldSchema, value); + try { + ConnectSchema.validateValue(fieldSchema, value); + } catch(DataException e) { + throw new DataException("Invalid value: null used for required field: " + field.name() + ", schema type: " + fieldSchema.type()); + } } } diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java index 11c9fb08fc3b0..a8b81a2339222 100644 --- a/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java +++ b/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java @@ -18,7 +18,9 @@ package org.apache.kafka.connect.data; import org.apache.kafka.connect.errors.DataException; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import java.nio.ByteBuffer; import java.util.Arrays; @@ -234,4 +236,21 @@ public void testEquals() { assertEquals(struct1, struct2); assertNotEquals(struct1, struct3); } + + @Rule + public ExpectedException thrown = ExpectedException.none(); + + @Test + public void structValidateWithNullValue() { + Schema schema = SchemaBuilder.struct() + .field("one", Schema.STRING_SCHEMA) + .field("two", Schema.STRING_SCHEMA) + .field("three", Schema.STRING_SCHEMA) + .build(); + + Struct struct = new Struct(schema); + thrown.expect(DataException.class); + thrown.expectMessage("Invalid value: null used for required field: one, schema type: STRING"); + struct.validate(); + } } From 1465cf4e46529493caf2115bddb075ae69842961 Mon Sep 17 00:00:00 2001 From: Aegeaner Date: Mon, 13 Feb 2017 10:44:53 +0800 Subject: [PATCH 2/6] keep the original exception message and add some additional field info on it --- .../main/java/org/apache/kafka/connect/data/ConnectSchema.java | 2 +- .../src/main/java/org/apache/kafka/connect/data/Struct.java | 3 ++- .../test/java/org/apache/kafka/connect/data/StructTest.java | 2 +- 3 files changed, 4 insertions(+), 3 deletions(-) diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java b/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java index e8e32b5a322ee..6a2b25645a296 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java @@ -209,7 +209,7 @@ public Schema valueSchema() { public static void validateValue(Schema schema, Object value) { if (value == null) { if (!schema.isOptional()) - throw new DataException("Null value"); + throw new DataException("Null value for schema type " + schema.type()); else return; } diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/Struct.java b/connect/api/src/main/java/org/apache/kafka/connect/data/Struct.java index e9511962c1a0b..d409dea7a6e75 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/data/Struct.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/data/Struct.java @@ -232,7 +232,8 @@ public void validate() { try { ConnectSchema.validateValue(fieldSchema, value); } catch(DataException e) { - throw new DataException("Invalid value: null used for required field: " + field.name() + ", schema type: " + fieldSchema.type()); + throw new DataException("Validate failed for required field: \"" + field.name() + "\", " + + "reason: [ " + e.getMessage() + " ].", e); } } } diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java index a8b81a2339222..8e4cc25c82e63 100644 --- a/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java +++ b/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java @@ -250,7 +250,7 @@ public void structValidateWithNullValue() { Struct struct = new Struct(schema); thrown.expect(DataException.class); - thrown.expectMessage("Invalid value: null used for required field: one, schema type: STRING"); + thrown.expectMessage("Validate failed for required field: \"one\", reason: [ Null value for schema type STRING ]."); struct.validate(); } } From 2380425f85e6dbda7920bf805ea42bac29b9fb57 Mon Sep 17 00:00:00 2001 From: Aegeaner Date: Mon, 13 Feb 2017 12:35:58 +0800 Subject: [PATCH 3/6] no getmessage call on original exception --- .../src/main/java/org/apache/kafka/connect/data/Struct.java | 3 +-- .../test/java/org/apache/kafka/connect/data/StructTest.java | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/Struct.java b/connect/api/src/main/java/org/apache/kafka/connect/data/Struct.java index d409dea7a6e75..6e9097b15d9f1 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/data/Struct.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/data/Struct.java @@ -232,8 +232,7 @@ public void validate() { try { ConnectSchema.validateValue(fieldSchema, value); } catch(DataException e) { - throw new DataException("Validate failed for required field: \"" + field.name() + "\", " - + "reason: [ " + e.getMessage() + " ].", e); + throw new DataException("Validate failed for required field: \"" + field.name() + "\"."); } } } diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java index 8e4cc25c82e63..60d52642481b1 100644 --- a/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java +++ b/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java @@ -250,7 +250,7 @@ public void structValidateWithNullValue() { Struct struct = new Struct(schema); thrown.expect(DataException.class); - thrown.expectMessage("Validate failed for required field: \"one\", reason: [ Null value for schema type STRING ]."); + thrown.expectMessage("Validate failed for required field: \"one\"."); struct.validate(); } } From 24d6ea3dfeb95f5c7650ad65e5afddb452594738 Mon Sep 17 00:00:00 2001 From: Aegeaner Date: Wed, 15 Feb 2017 11:22:21 +0800 Subject: [PATCH 4/6] change validateValue(..) signature to pass field name --- .../java/org/apache/kafka/connect/data/ConnectSchema.java | 7 ++++++- .../main/java/org/apache/kafka/connect/data/Struct.java | 6 +----- .../java/org/apache/kafka/connect/data/StructTest.java | 2 +- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java b/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java index 6a2b25645a296..4cb486a718d4b 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java @@ -207,9 +207,14 @@ public Schema valueSchema() { * @param value value to test */ public static void validateValue(Schema schema, Object value) { + validateValue(null, schema, value); + } + + public static void validateValue(String name, Schema schema, Object value) { if (value == null) { if (!schema.isOptional()) - throw new DataException("Null value for schema type " + schema.type()); + throw new DataException("Invalid value null used for required field: \"" + name + + "\", schema type: " + schema.type()); else return; } diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/Struct.java b/connect/api/src/main/java/org/apache/kafka/connect/data/Struct.java index 6e9097b15d9f1..200a1c00a9df5 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/data/Struct.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/data/Struct.java @@ -229,11 +229,7 @@ public void validate() { Object value = values[field.index()]; if (value == null && (fieldSchema.isOptional() || fieldSchema.defaultValue() != null)) continue; - try { - ConnectSchema.validateValue(fieldSchema, value); - } catch(DataException e) { - throw new DataException("Validate failed for required field: \"" + field.name() + "\"."); - } + ConnectSchema.validateValue(field.name(), fieldSchema, value); } } diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java index 60d52642481b1..2a6a96fd771e5 100644 --- a/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java +++ b/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java @@ -250,7 +250,7 @@ public void structValidateWithNullValue() { Struct struct = new Struct(schema); thrown.expect(DataException.class); - thrown.expectMessage("Validate failed for required field: \"one\"."); + thrown.expectMessage("Invalid value null used for required field: \"one\", schema type: STRING"); struct.validate(); } } From b6c315615313217ae559d811a93bda8ec8c632b2 Mon Sep 17 00:00:00 2001 From: Aegeaner Date: Wed, 15 Feb 2017 15:09:03 +0800 Subject: [PATCH 5/6] Add field info to all DataException message --- .../kafka/connect/data/ConnectSchema.java | 10 ++- .../apache/kafka/connect/data/FakeSchema.java | 84 +++++++++++++++++++ .../apache/kafka/connect/data/StructTest.java | 18 +++- 3 files changed, 107 insertions(+), 5 deletions(-) create mode 100644 connect/api/src/test/java/org/apache/kafka/connect/data/FakeSchema.java diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java b/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java index 4cb486a718d4b..97abb78218f2e 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java @@ -213,7 +213,7 @@ public static void validateValue(Schema schema, Object value) { public static void validateValue(String name, Schema schema, Object value) { if (value == null) { if (!schema.isOptional()) - throw new DataException("Invalid value null used for required field: \"" + name + throw new DataException("Invalid value: null used for required field: \"" + name + "\", schema type: " + schema.type()); else return; @@ -225,7 +225,9 @@ public static void validateValue(String name, Schema schema, Object value) { expectedClasses = SCHEMA_TYPE_CLASSES.get(schema.type()); if (expectedClasses == null) - throw new DataException("Invalid Java object for schema type " + schema.type() + ": " + value.getClass()); + throw new DataException("Invalid Java object for schema type " + schema.type() + + ": " + value.getClass() + + " for required field: \"" + name + "\""); boolean foundMatch = false; for (Class expectedClass : expectedClasses) { @@ -235,7 +237,9 @@ public static void validateValue(String name, Schema schema, Object value) { } } if (!foundMatch) - throw new DataException("Invalid Java object for schema type " + schema.type() + ": " + value.getClass()); + throw new DataException("Invalid Java object for schema type " + schema.type() + + ": " + value.getClass() + + " for required field: \"" + name + "\""); switch (schema.type()) { case STRUCT: diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/FakeSchema.java b/connect/api/src/test/java/org/apache/kafka/connect/data/FakeSchema.java new file mode 100644 index 0000000000000..266e36ef8f3c1 --- /dev/null +++ b/connect/api/src/test/java/org/apache/kafka/connect/data/FakeSchema.java @@ -0,0 +1,84 @@ +package org.apache.kafka.connect.data; + +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +import java.util.List; +import java.util.Map; + +public class FakeSchema implements Schema { + @Override + public Type type() { + return null; + } + + @Override + public boolean isOptional() { + return false; + } + + @Override + public Object defaultValue() { + return null; + } + + @Override + public String name() { + return "fake"; + } + + @Override + public Integer version() { + return null; + } + + @Override + public String doc() { + return null; + } + + @Override + public Map parameters() { + return null; + } + + @Override + public Schema keySchema() { + return null; + } + + @Override + public Schema valueSchema() { + return null; + } + + @Override + public List fields() { + return null; + } + + @Override + public Field field(String fieldName) { + return null; + } + + @Override + public Schema schema() { + return null; + } +} diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java index 2a6a96fd771e5..6617b8db72097 100644 --- a/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java +++ b/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java @@ -241,7 +241,7 @@ public void testEquals() { public ExpectedException thrown = ExpectedException.none(); @Test - public void structValidateWithNullValue() { + public void testValidateStructWithNullValue() { Schema schema = SchemaBuilder.struct() .field("one", Schema.STRING_SCHEMA) .field("two", Schema.STRING_SCHEMA) @@ -250,7 +250,21 @@ public void structValidateWithNullValue() { Struct struct = new Struct(schema); thrown.expect(DataException.class); - thrown.expectMessage("Invalid value null used for required field: \"one\", schema type: STRING"); + thrown.expectMessage("Invalid value: null used for required field: \"one\", schema type: STRING"); struct.validate(); } + + @Test + public void testValidateFieldWithInvalidValueType() { + String fieldName = "field"; + FakeSchema fakeSchema = new FakeSchema(); + + thrown.expect(DataException.class); + thrown.expectMessage("Invalid Java object for schema type null: class java.lang.Object for required field: \"field\""); + ConnectSchema.validateValue(fieldName, fakeSchema, new Object()); + + thrown.expect(DataException.class); + thrown.expectMessage("Invalid Java object for schema type INT8: class java.lang.Object for required field: \"field\""); + ConnectSchema.validateValue(fieldName, Schema.INT8_SCHEMA, new Object()); + } } From 5fbff0665dacb4dcf15d6e15c12bc2390546763f Mon Sep 17 00:00:00 2001 From: Aegeaner Date: Thu, 16 Feb 2017 10:45:48 +0800 Subject: [PATCH 6/6] add lisence header --- .../java/org/apache/kafka/connect/data/ConnectSchema.java | 4 ++-- .../java/org/apache/kafka/connect/data/FakeSchema.java | 7 +++---- .../java/org/apache/kafka/connect/data/StructTest.java | 4 ++-- 3 files changed, 7 insertions(+), 8 deletions(-) diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java b/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java index 97abb78218f2e..e052534a7a955 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/data/ConnectSchema.java @@ -227,7 +227,7 @@ public static void validateValue(String name, Schema schema, Object value) { if (expectedClasses == null) throw new DataException("Invalid Java object for schema type " + schema.type() + ": " + value.getClass() - + " for required field: \"" + name + "\""); + + " for field: \"" + name + "\""); boolean foundMatch = false; for (Class expectedClass : expectedClasses) { @@ -239,7 +239,7 @@ public static void validateValue(String name, Schema schema, Object value) { if (!foundMatch) throw new DataException("Invalid Java object for schema type " + schema.type() + ": " + value.getClass() - + " for required field: \"" + name + "\""); + + " for field: \"" + name + "\""); switch (schema.type()) { case STRUCT: diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/FakeSchema.java b/connect/api/src/test/java/org/apache/kafka/connect/data/FakeSchema.java index 266e36ef8f3c1..ff2e24f6a2b01 100644 --- a/connect/api/src/test/java/org/apache/kafka/connect/data/FakeSchema.java +++ b/connect/api/src/test/java/org/apache/kafka/connect/data/FakeSchema.java @@ -1,6 +1,4 @@ -package org.apache.kafka.connect.data; - -/* +/** * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. @@ -15,8 +13,9 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - */ + **/ +package org.apache.kafka.connect.data; import java.util.List; import java.util.Map; diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java index 6617b8db72097..82f6d89a90f68 100644 --- a/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java +++ b/connect/api/src/test/java/org/apache/kafka/connect/data/StructTest.java @@ -260,11 +260,11 @@ public void testValidateFieldWithInvalidValueType() { FakeSchema fakeSchema = new FakeSchema(); thrown.expect(DataException.class); - thrown.expectMessage("Invalid Java object for schema type null: class java.lang.Object for required field: \"field\""); + thrown.expectMessage("Invalid Java object for schema type null: class java.lang.Object for field: \"field\""); ConnectSchema.validateValue(fieldName, fakeSchema, new Object()); thrown.expect(DataException.class); - thrown.expectMessage("Invalid Java object for schema type INT8: class java.lang.Object for required field: \"field\""); + thrown.expectMessage("Invalid Java object for schema type INT8: class java.lang.Object for field: \"field\""); ConnectSchema.validateValue(fieldName, Schema.INT8_SCHEMA, new Object()); } }