From e4673a8ab322b9e98d4912fe656139e4fdac74fb Mon Sep 17 00:00:00 2001 From: Ewen Cheslack-Postava Date: Wed, 7 Oct 2015 15:03:02 -0700 Subject: [PATCH 1/2] KAFKA-2622: Add Time logical type for Copycat. --- .../org/apache/kafka/copycat/data/Time.java | 77 ++++++++++++++++++ .../apache/kafka/copycat/data/TimeTest.java | 80 +++++++++++++++++++ .../kafka/copycat/json/JsonConverter.java | 18 +++++ .../kafka/copycat/json/JsonConverterTest.java | 31 +++++++ 4 files changed, 206 insertions(+) create mode 100644 copycat/api/src/main/java/org/apache/kafka/copycat/data/Time.java create mode 100644 copycat/api/src/test/java/org/apache/kafka/copycat/data/TimeTest.java diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/data/Time.java b/copycat/api/src/main/java/org/apache/kafka/copycat/data/Time.java new file mode 100644 index 0000000000000..76bf28c808712 --- /dev/null +++ b/copycat/api/src/main/java/org/apache/kafka/copycat/data/Time.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.copycat.data; + +import org.apache.kafka.copycat.errors.DataException; + +import java.util.Calendar; +import java.util.TimeZone; + +/** + *

+ * A time representing a specific point in a day, not tied to any specific date. The corresponding Java type is a + * java.util.Date where only hours, minutes, seconds, and milliseconds can be non-zero. This effectively makes it a + * point in time during the first day after the Unix epoch. The underlying representation is an integer + * representing the number of milliseconds after midnight. + *

+ */ +public class Time { + public static final String LOGICAL_NAME = "org.apache.kafka.copycat.data.Time"; + + private static final long MILLIS_PER_DAY = 24 * 60 * 60 * 1000; + + private static final TimeZone UTC = TimeZone.getTimeZone("UTC"); + + /** + * Returns a SchemaBuilder for a Time. By returning a SchemaBuilder you can override additional schema settings such + * as required/optional, default value, and documentation. + * @return a SchemaBuilder + */ + public static SchemaBuilder builder() { + return SchemaBuilder.int32() + .name(LOGICAL_NAME) + .version(1); + } + + public static final Schema SCHEMA = builder().schema(); + + /** + * Convert a value from its logical format (Time) to it's encoded format. + * @param value the logical value + * @return the encoded value + */ + public static int fromLogical(Schema schema, java.util.Date value) { + if (schema.name() == null || !(schema.name().equals(LOGICAL_NAME))) + throw new DataException("Requested conversion of Time object but the schema does not match."); + Calendar calendar = Calendar.getInstance(UTC); + calendar.setTime(value); + long unixMillis = calendar.getTimeInMillis(); + if (unixMillis < 0 || unixMillis > MILLIS_PER_DAY) { + throw new DataException("Copycat Time type should not have any time fields set to non-zero values."); + } + return (int) unixMillis; + } + + public static java.util.Date toLogical(Schema schema, int value) { + if (schema.name() == null || !(schema.name().equals(LOGICAL_NAME))) + throw new DataException("Requested conversion of Date object but the schema does not match."); + if (value < 0 || value > MILLIS_PER_DAY) + throw new DataException("Time values must use number of milliseconds greater than 0 and less than 86400000"); + return new java.util.Date(value); + } +} diff --git a/copycat/api/src/test/java/org/apache/kafka/copycat/data/TimeTest.java b/copycat/api/src/test/java/org/apache/kafka/copycat/data/TimeTest.java new file mode 100644 index 0000000000000..8e54cb2538272 --- /dev/null +++ b/copycat/api/src/test/java/org/apache/kafka/copycat/data/TimeTest.java @@ -0,0 +1,80 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.copycat.data; + +import org.apache.kafka.copycat.errors.DataException; +import org.junit.Test; + +import java.util.Calendar; +import java.util.GregorianCalendar; +import java.util.TimeZone; + +import static org.junit.Assert.assertEquals; + +public class TimeTest { + private static final GregorianCalendar EPOCH; + private static final GregorianCalendar EPOCH_PLUS_DATE_COMPONENT; + private static final GregorianCalendar EPOCH_PLUS_TEN_THOUSAND_MILLIS; + static { + EPOCH = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 0); + EPOCH.setTimeZone(TimeZone.getTimeZone("UTC")); + + EPOCH_PLUS_TEN_THOUSAND_MILLIS = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 0); + EPOCH_PLUS_TEN_THOUSAND_MILLIS.setTimeZone(TimeZone.getTimeZone("UTC")); + EPOCH_PLUS_TEN_THOUSAND_MILLIS.add(Calendar.MILLISECOND, 10000); + + + EPOCH_PLUS_DATE_COMPONENT = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 0); + EPOCH_PLUS_DATE_COMPONENT.setTimeZone(TimeZone.getTimeZone("UTC")); + EPOCH_PLUS_DATE_COMPONENT.add(Calendar.DATE, 10000); + } + + @Test + public void testBuilder() { + Schema plain = Time.SCHEMA; + assertEquals(Time.LOGICAL_NAME, plain.name()); + assertEquals(1, (Object) plain.version()); + } + + @Test + public void testFromLogical() { + assertEquals(0, Time.fromLogical(Time.SCHEMA, EPOCH.getTime())); + assertEquals(10000, Time.fromLogical(Time.SCHEMA, EPOCH_PLUS_TEN_THOUSAND_MILLIS.getTime())); + } + + @Test(expected = DataException.class) + public void testFromLogicalInvalidSchema() { + Time.fromLogical(Time.builder().name("invalid").build(), EPOCH.getTime()); + } + + @Test(expected = DataException.class) + public void testFromLogicalInvalidHasDateComponents() { + Time.fromLogical(Time.SCHEMA, EPOCH_PLUS_DATE_COMPONENT.getTime()); + } + + @Test + public void testToLogical() { + assertEquals(EPOCH.getTime(), Time.toLogical(Time.SCHEMA, 0)); + assertEquals(EPOCH_PLUS_TEN_THOUSAND_MILLIS.getTime(), Time.toLogical(Time.SCHEMA, 10000)); + } + + @Test(expected = DataException.class) + public void testToLogicalInvalidSchema() { + Time.toLogical(Time.builder().name("invalid").build(), 0); + } +} diff --git a/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonConverter.java b/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonConverter.java index 8910b276e1919..ca8f029fcba6c 100644 --- a/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonConverter.java +++ b/copycat/json/src/main/java/org/apache/kafka/copycat/json/JsonConverter.java @@ -217,6 +217,15 @@ public Object convert(Schema schema, Object value) { } }); + TO_COPYCAT_LOGICAL_CONVERTERS.put(Time.LOGICAL_NAME, new LogicalTypeConverter() { + @Override + public Object convert(Schema schema, Object value) { + if (!(value instanceof Integer)) + throw new DataException("Invalid type for Time, underlying representation should be int32 but was " + value.getClass()); + return Time.toLogical(schema, (int) value); + } + }); + TO_COPYCAT_LOGICAL_CONVERTERS.put(Timestamp.LOGICAL_NAME, new LogicalTypeConverter() { @Override public Object convert(Schema schema, Object value) { @@ -247,6 +256,15 @@ public Object convert(Schema schema, Object value) { } }); + TO_JSON_LOGICAL_CONVERTERS.put(Time.LOGICAL_NAME, new LogicalTypeConverter() { + @Override + public Object convert(Schema schema, Object value) { + if (!(value instanceof java.util.Date)) + throw new DataException("Invalid type for Time, expected Date but was " + value.getClass()); + return Time.fromLogical(schema, (java.util.Date) value); + } + }); + TO_JSON_LOGICAL_CONVERTERS.put(Timestamp.LOGICAL_NAME, new LogicalTypeConverter() { @Override public Object convert(Schema schema, Object value) { diff --git a/copycat/json/src/test/java/org/apache/kafka/copycat/json/JsonConverterTest.java b/copycat/json/src/test/java/org/apache/kafka/copycat/json/JsonConverterTest.java index 8a8e243db9150..6b40046799ae6 100644 --- a/copycat/json/src/test/java/org/apache/kafka/copycat/json/JsonConverterTest.java +++ b/copycat/json/src/test/java/org/apache/kafka/copycat/json/JsonConverterTest.java @@ -29,6 +29,7 @@ import org.apache.kafka.copycat.data.SchemaAndValue; import org.apache.kafka.copycat.data.SchemaBuilder; import org.apache.kafka.copycat.data.Struct; +import org.apache.kafka.copycat.data.Time; import org.apache.kafka.copycat.data.Timestamp; import org.apache.kafka.copycat.errors.DataException; import org.junit.Before; @@ -229,6 +230,20 @@ public void dateToCopycat() { assertEquals(reference, converted); } + @Test + public void timeToCopycat() { + Schema schema = Time.SCHEMA; + GregorianCalendar calendar = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 0); + calendar.setTimeZone(TimeZone.getTimeZone("UTC")); + calendar.add(Calendar.MILLISECOND, 14400000); + java.util.Date reference = calendar.getTime(); + String msg = "{ \"schema\": { \"type\": \"int32\", \"name\": \"org.apache.kafka.copycat.data.Time\", \"version\": 1 }, \"payload\": 14400000 }"; + SchemaAndValue schemaAndValue = converter.toCopycatData(TOPIC, msg.getBytes()); + java.util.Date converted = (java.util.Date) schemaAndValue.value(); + assertEquals(schema, schemaAndValue.schema()); + assertEquals(reference, converted); + } + @Test public void timestampToCopycat() { Schema schema = Timestamp.SCHEMA; @@ -454,6 +469,22 @@ public void dateToJson() throws IOException { assertEquals(10000, payload.intValue()); } + @Test + public void timeToJson() throws IOException { + GregorianCalendar calendar = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 0); + calendar.setTimeZone(TimeZone.getTimeZone("UTC")); + calendar.add(Calendar.MILLISECOND, 14400000); + java.util.Date date = calendar.getTime(); + + JsonNode converted = parse(converter.fromCopycatData(TOPIC, Time.SCHEMA, date)); + validateEnvelope(converted); + assertEquals(parse("{ \"type\": \"int32\", \"optional\": false, \"name\": \"org.apache.kafka.copycat.data.Time\", \"version\": 1 }"), + converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME)); + JsonNode payload = converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME); + assertTrue(payload.isInt()); + assertEquals(14400000, payload.longValue()); + } + @Test public void timestampToJson() throws IOException { GregorianCalendar calendar = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 0); From 4e0891dff9fc9703553526d9d6fda626e516d36a Mon Sep 17 00:00:00 2001 From: Ewen Cheslack-Postava Date: Wed, 7 Oct 2015 15:49:59 -0700 Subject: [PATCH 2/2] Handle logical types in CopycatSchema validation. --- .../kafka/copycat/data/CopycatSchema.java | 19 +++++++++++- .../kafka/copycat/data/CopycatSchemaTest.java | 29 +++++++++++++++++++ 2 files changed, 47 insertions(+), 1 deletion(-) diff --git a/copycat/api/src/main/java/org/apache/kafka/copycat/data/CopycatSchema.java b/copycat/api/src/main/java/org/apache/kafka/copycat/data/CopycatSchema.java index 6b7771753c3a2..104abf1c99549 100644 --- a/copycat/api/src/main/java/org/apache/kafka/copycat/data/CopycatSchema.java +++ b/copycat/api/src/main/java/org/apache/kafka/copycat/data/CopycatSchema.java @@ -19,6 +19,7 @@ import org.apache.kafka.copycat.errors.DataException; +import java.math.BigDecimal; import java.nio.ByteBuffer; import java.util.*; @@ -27,6 +28,10 @@ public class CopycatSchema implements Schema { * Maps Schema.Types to a list of Java classes that can be used to represent them. */ private static final Map> SCHEMA_TYPE_CLASSES = new HashMap<>(); + /** + * Maps known logical types to a list of Java classes that can be used to represent them. + */ + private static final Map> LOGICAL_TYPE_CLASSES = new HashMap<>(); /** * Maps the Java classes to the corresponding Schema.Type. @@ -54,6 +59,14 @@ public class CopycatSchema implements Schema { for (Class schemaClass : schemaClasses.getValue()) JAVA_CLASS_SCHEMA_TYPES.put(schemaClass, schemaClasses.getKey()); } + + LOGICAL_TYPE_CLASSES.put(Decimal.LOGICAL_NAME, Arrays.asList((Class) BigDecimal.class)); + LOGICAL_TYPE_CLASSES.put(Date.LOGICAL_NAME, Arrays.asList((Class) java.util.Date.class)); + LOGICAL_TYPE_CLASSES.put(Time.LOGICAL_NAME, Arrays.asList((Class) java.util.Date.class)); + LOGICAL_TYPE_CLASSES.put(Timestamp.LOGICAL_NAME, Arrays.asList((Class) java.util.Date.class)); + // We don't need to put these into JAVA_CLASS_SCHEMA_TYPES since that's only used to determine schemas for + // schemaless data and logical types will have ambiguous schemas (e.g. many of them use the same Java class) so + // they should not be used without schemas. } // The type of the field @@ -195,7 +208,11 @@ public static void validateValue(Schema schema, Object value) { return; } - final List expectedClasses = SCHEMA_TYPE_CLASSES.get(schema.type()); + List expectedClasses = LOGICAL_TYPE_CLASSES.get(schema.name()); + + if (expectedClasses == null) + expectedClasses = SCHEMA_TYPE_CLASSES.get(schema.type()); + if (expectedClasses == null) throw new DataException("Invalid Java object for schema type " + schema.type() + ": " + value.getClass()); diff --git a/copycat/api/src/test/java/org/apache/kafka/copycat/data/CopycatSchemaTest.java b/copycat/api/src/test/java/org/apache/kafka/copycat/data/CopycatSchemaTest.java index 9e97cad243c99..49769509a21f0 100644 --- a/copycat/api/src/test/java/org/apache/kafka/copycat/data/CopycatSchemaTest.java +++ b/copycat/api/src/test/java/org/apache/kafka/copycat/data/CopycatSchemaTest.java @@ -20,6 +20,8 @@ import org.apache.kafka.copycat.errors.DataException; import org.junit.Test; +import java.math.BigDecimal; +import java.math.BigInteger; import java.nio.ByteBuffer; import java.nio.CharBuffer; import java.util.Arrays; @@ -97,6 +99,14 @@ public void testValidateValueMatchingType() { CopycatSchema.validateValue(STRUCT_SCHEMA, structValue); } + @Test + public void testValidateValueMatchingLogicalType() { + CopycatSchema.validateValue(Decimal.schema(2), new BigDecimal(new BigInteger("156"), 2)); + CopycatSchema.validateValue(Date.SCHEMA, new java.util.Date(0)); + CopycatSchema.validateValue(Time.SCHEMA, new java.util.Date(0)); + CopycatSchema.validateValue(Timestamp.SCHEMA, new java.util.Date(0)); + } + // To avoid requiring excessive numbers of tests, these checks for invalid types use a similar type where possible // to only include a single test for each type @@ -204,6 +214,25 @@ public void testValidateValueMismatchStructWrongNestedSchema() { ); } + @Test(expected = DataException.class) + public void testValidateValueMismatchDecimal() { + CopycatSchema.validateValue(Decimal.schema(2), new BigInteger("156")); + } + + @Test(expected = DataException.class) + public void testValidateValueMismatchDate() { + CopycatSchema.validateValue(Date.SCHEMA, 1000L); + } + + @Test(expected = DataException.class) + public void testValidateValueMismatchTime() { + CopycatSchema.validateValue(Time.SCHEMA, 1000L); + } + + @Test(expected = DataException.class) + public void testValidateValueMismatchTimestamp() { + CopycatSchema.validateValue(Timestamp.SCHEMA, 1000L); + } @Test public void testPrimitiveEquality() {