Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.kafka.copycat.errors.DataException;

import java.math.BigDecimal;
import java.nio.ByteBuffer;
import java.util.*;

Expand All @@ -27,6 +28,10 @@ public class CopycatSchema implements Schema {
* Maps Schema.Types to a list of Java classes that can be used to represent them.
*/
private static final Map<Type, List<Class>> SCHEMA_TYPE_CLASSES = new HashMap<>();
/**
* Maps known logical types to a list of Java classes that can be used to represent them.
*/
private static final Map<String, List<Class>> LOGICAL_TYPE_CLASSES = new HashMap<>();

/**
* Maps the Java classes to the corresponding Schema.Type.
Expand Down Expand Up @@ -54,6 +59,14 @@ public class CopycatSchema implements Schema {
for (Class<?> schemaClass : schemaClasses.getValue())
JAVA_CLASS_SCHEMA_TYPES.put(schemaClass, schemaClasses.getKey());
}

LOGICAL_TYPE_CLASSES.put(Decimal.LOGICAL_NAME, Arrays.asList((Class) BigDecimal.class));
LOGICAL_TYPE_CLASSES.put(Date.LOGICAL_NAME, Arrays.asList((Class) java.util.Date.class));
LOGICAL_TYPE_CLASSES.put(Time.LOGICAL_NAME, Arrays.asList((Class) java.util.Date.class));
LOGICAL_TYPE_CLASSES.put(Timestamp.LOGICAL_NAME, Arrays.asList((Class) java.util.Date.class));
// We don't need to put these into JAVA_CLASS_SCHEMA_TYPES since that's only used to determine schemas for
// schemaless data and logical types will have ambiguous schemas (e.g. many of them use the same Java class) so
// they should not be used without schemas.
}

// The type of the field
Expand Down Expand Up @@ -195,7 +208,11 @@ public static void validateValue(Schema schema, Object value) {
return;
}

final List<Class> expectedClasses = SCHEMA_TYPE_CLASSES.get(schema.type());
List<Class> expectedClasses = LOGICAL_TYPE_CLASSES.get(schema.name());

if (expectedClasses == null)
expectedClasses = SCHEMA_TYPE_CLASSES.get(schema.type());

if (expectedClasses == null)
throw new DataException("Invalid Java object for schema type " + schema.type() + ": " + value.getClass());

Expand Down
77 changes: 77 additions & 0 deletions copycat/api/src/main/java/org/apache/kafka/copycat/data/Time.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/

package org.apache.kafka.copycat.data;

import org.apache.kafka.copycat.errors.DataException;

import java.util.Calendar;
import java.util.TimeZone;

/**
* <p>
* A time representing a specific point in a day, not tied to any specific date. The corresponding Java type is a
* java.util.Date where only hours, minutes, seconds, and milliseconds can be non-zero. This effectively makes it a
* point in time during the first day after the Unix epoch. The underlying representation is an integer
* representing the number of milliseconds after midnight.
* </p>
*/
public class Time {
public static final String LOGICAL_NAME = "org.apache.kafka.copycat.data.Time";

private static final long MILLIS_PER_DAY = 24 * 60 * 60 * 1000;

private static final TimeZone UTC = TimeZone.getTimeZone("UTC");

/**
* Returns a SchemaBuilder for a Time. By returning a SchemaBuilder you can override additional schema settings such
* as required/optional, default value, and documentation.
* <p>
* The builder starts from an int32 schema, named {@link #LOGICAL_NAME} and marked as version 1.
* </p>
* @return a SchemaBuilder
*/
public static SchemaBuilder builder() {
return SchemaBuilder.int32()
.name(LOGICAL_NAME)
.version(1);
}

public static final Schema SCHEMA = builder().schema();

/**
* Convert a value from its logical format (Time) to it's encoded format.
* @param value the logical value
* @return the encoded value
*/
public static int fromLogical(Schema schema, java.util.Date value) {
if (schema.name() == null || !(schema.name().equals(LOGICAL_NAME)))
throw new DataException("Requested conversion of Time object but the schema does not match.");
Calendar calendar = Calendar.getInstance(UTC);
calendar.setTime(value);
long unixMillis = calendar.getTimeInMillis();
if (unixMillis < 0 || unixMillis > MILLIS_PER_DAY) {
throw new DataException("Copycat Time type should not have any time fields set to non-zero values.");
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the claim in the exception correct? We should have have non-zero values for some fields.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Right. It should be "any date field"

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, good catch. Filed #291.

}
return (int) unixMillis;
}

/**
 * Convert a value from its encoded format to its logical format (Time).
 * @param schema the schema of the value; its name must be {@link #LOGICAL_NAME}
 * @param value the encoded value, i.e. the number of milliseconds after midnight
 * @return a java.util.Date that many milliseconds after the Unix epoch
 * @throws DataException if the schema is not a Time schema, or if the value is negative or not less than
 *         the number of milliseconds in a day
 */
public static java.util.Date toLogical(Schema schema, int value) {
    if (schema.name() == null || !(schema.name().equals(LOGICAL_NAME)))
        throw new DataException("Requested conversion of Time object but the schema does not match.");
    // Valid encoded times lie in [0, MILLIS_PER_DAY); MILLIS_PER_DAY itself is midnight of the next day.
    if (value < 0 || value >= MILLIS_PER_DAY)
        throw new DataException("Time values must use number of milliseconds greater than or equal to 0 and less than 86400000");
    return new java.util.Date(value);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import org.apache.kafka.copycat.errors.DataException;
import org.junit.Test;

import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.nio.CharBuffer;
import java.util.Arrays;
Expand Down Expand Up @@ -97,6 +99,14 @@ public void testValidateValueMatchingType() {
CopycatSchema.validateValue(STRUCT_SCHEMA, structValue);
}

@Test
public void testValidateValueMatchingLogicalType() {
CopycatSchema.validateValue(Decimal.schema(2), new BigDecimal(new BigInteger("156"), 2));
CopycatSchema.validateValue(Date.SCHEMA, new java.util.Date(0));
CopycatSchema.validateValue(Time.SCHEMA, new java.util.Date(0));
CopycatSchema.validateValue(Timestamp.SCHEMA, new java.util.Date(0));
}

// To avoid requiring excessive numbers of tests, these checks for invalid types use a similar type where possible
// to only include a single test for each type

Expand Down Expand Up @@ -204,6 +214,25 @@ public void testValidateValueMismatchStructWrongNestedSchema() {
);
}

@Test(expected = DataException.class)
public void testValidateValueMismatchDecimal() {
CopycatSchema.validateValue(Decimal.schema(2), new BigInteger("156"));
}

@Test(expected = DataException.class)
public void testValidateValueMismatchDate() {
CopycatSchema.validateValue(Date.SCHEMA, 1000L);
}

@Test(expected = DataException.class)
public void testValidateValueMismatchTime() {
CopycatSchema.validateValue(Time.SCHEMA, 1000L);
}

@Test(expected = DataException.class)
public void testValidateValueMismatchTimestamp() {
CopycatSchema.validateValue(Timestamp.SCHEMA, 1000L);
}

@Test
public void testPrimitiveEquality() {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
**/

package org.apache.kafka.copycat.data;

import org.apache.kafka.copycat.errors.DataException;
import org.junit.Test;

import java.util.Calendar;
import java.util.GregorianCalendar;
import java.util.TimeZone;

import static org.junit.Assert.assertEquals;

/**
* Unit tests for {@link Time}: the schema produced by the builder and the conversions between the logical
* java.util.Date form and the encoded int form.
*/
public class TimeTest {
// Fixtures shared by the tests below; each is populated in the static initializer.
private static final GregorianCalendar EPOCH;
private static final GregorianCalendar EPOCH_PLUS_DATE_COMPONENT;
private static final GregorianCalendar EPOCH_PLUS_TEN_THOUSAND_MILLIS;
static {
// Each calendar is built from the fields 1970-01-01 00:00:00 and then switched to UTC.
// NOTE(review): the fields are set before the time zone is changed; the tests rely on the result being the
// Unix epoch (testFromLogical expects an encoded value of 0), so keep this statement order.
EPOCH = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 0);
EPOCH.setTimeZone(TimeZone.getTimeZone("UTC"));

// Same starting point, advanced by 10000 milliseconds.
EPOCH_PLUS_TEN_THOUSAND_MILLIS = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 0);
EPOCH_PLUS_TEN_THOUSAND_MILLIS.setTimeZone(TimeZone.getTimeZone("UTC"));
EPOCH_PLUS_TEN_THOUSAND_MILLIS.add(Calendar.MILLISECOND, 10000);


// Same starting point, advanced by 10000 days, so the value carries a date component.
EPOCH_PLUS_DATE_COMPONENT = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 0);
EPOCH_PLUS_DATE_COMPONENT.setTimeZone(TimeZone.getTimeZone("UTC"));
EPOCH_PLUS_DATE_COMPONENT.add(Calendar.DATE, 10000);
}

// The predefined schema carries the logical name and version 1.
@Test
public void testBuilder() {
Schema plain = Time.SCHEMA;
assertEquals(Time.LOGICAL_NAME, plain.name());
// NOTE(review): the (Object) cast presumably selects the assertEquals(Object, Object) overload - confirm.
assertEquals(1, (Object) plain.version());
}

// Dates within the first day encode to their millisecond offset from the epoch.
@Test
public void testFromLogical() {
assertEquals(0, Time.fromLogical(Time.SCHEMA, EPOCH.getTime()));
assertEquals(10000, Time.fromLogical(Time.SCHEMA, EPOCH_PLUS_TEN_THOUSAND_MILLIS.getTime()));
}

// A schema whose name is not the Time logical name must be rejected.
@Test(expected = DataException.class)
public void testFromLogicalInvalidSchema() {
Time.fromLogical(Time.builder().name("invalid").build(), EPOCH.getTime());
}

// A Date that lies days after the epoch must be rejected.
@Test(expected = DataException.class)
public void testFromLogicalInvalidHasDateComponents() {
Time.fromLogical(Time.SCHEMA, EPOCH_PLUS_DATE_COMPONENT.getTime());
}

// Encoded millisecond offsets decode back to the matching Date.
@Test
public void testToLogical() {
assertEquals(EPOCH.getTime(), Time.toLogical(Time.SCHEMA, 0));
assertEquals(EPOCH_PLUS_TEN_THOUSAND_MILLIS.getTime(), Time.toLogical(Time.SCHEMA, 10000));
}

// A schema whose name is not the Time logical name must be rejected.
@Test(expected = DataException.class)
public void testToLogicalInvalidSchema() {
Time.toLogical(Time.builder().name("invalid").build(), 0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,15 @@ public Object convert(Schema schema, Object value) {
}
});

TO_COPYCAT_LOGICAL_CONVERTERS.put(Time.LOGICAL_NAME, new LogicalTypeConverter() {
@Override
public Object convert(Schema schema, Object value) {
if (!(value instanceof Integer))
throw new DataException("Invalid type for Time, underlying representation should be int32 but was " + value.getClass());
return Time.toLogical(schema, (int) value);
}
});

TO_COPYCAT_LOGICAL_CONVERTERS.put(Timestamp.LOGICAL_NAME, new LogicalTypeConverter() {
@Override
public Object convert(Schema schema, Object value) {
Expand Down Expand Up @@ -247,6 +256,15 @@ public Object convert(Schema schema, Object value) {
}
});

TO_JSON_LOGICAL_CONVERTERS.put(Time.LOGICAL_NAME, new LogicalTypeConverter() {
@Override
public Object convert(Schema schema, Object value) {
if (!(value instanceof java.util.Date))
throw new DataException("Invalid type for Time, expected Date but was " + value.getClass());
return Time.fromLogical(schema, (java.util.Date) value);
}
});

TO_JSON_LOGICAL_CONVERTERS.put(Timestamp.LOGICAL_NAME, new LogicalTypeConverter() {
@Override
public Object convert(Schema schema, Object value) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.kafka.copycat.data.SchemaAndValue;
import org.apache.kafka.copycat.data.SchemaBuilder;
import org.apache.kafka.copycat.data.Struct;
import org.apache.kafka.copycat.data.Time;
import org.apache.kafka.copycat.data.Timestamp;
import org.apache.kafka.copycat.errors.DataException;
import org.junit.Before;
Expand Down Expand Up @@ -229,6 +230,20 @@ public void dateToCopycat() {
assertEquals(reference, converted);
}

@Test
public void timeToCopycat() {
Schema schema = Time.SCHEMA;
GregorianCalendar calendar = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 0);
calendar.setTimeZone(TimeZone.getTimeZone("UTC"));
calendar.add(Calendar.MILLISECOND, 14400000);
java.util.Date reference = calendar.getTime();
String msg = "{ \"schema\": { \"type\": \"int32\", \"name\": \"org.apache.kafka.copycat.data.Time\", \"version\": 1 }, \"payload\": 14400000 }";
SchemaAndValue schemaAndValue = converter.toCopycatData(TOPIC, msg.getBytes());
java.util.Date converted = (java.util.Date) schemaAndValue.value();
assertEquals(schema, schemaAndValue.schema());
assertEquals(reference, converted);
}

@Test
public void timestampToCopycat() {
Schema schema = Timestamp.SCHEMA;
Expand Down Expand Up @@ -454,6 +469,22 @@ public void dateToJson() throws IOException {
assertEquals(10000, payload.intValue());
}

@Test
public void timeToJson() throws IOException {
GregorianCalendar calendar = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 0);
calendar.setTimeZone(TimeZone.getTimeZone("UTC"));
calendar.add(Calendar.MILLISECOND, 14400000);
java.util.Date date = calendar.getTime();

JsonNode converted = parse(converter.fromCopycatData(TOPIC, Time.SCHEMA, date));
validateEnvelope(converted);
assertEquals(parse("{ \"type\": \"int32\", \"optional\": false, \"name\": \"org.apache.kafka.copycat.data.Time\", \"version\": 1 }"),
converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
JsonNode payload = converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME);
assertTrue(payload.isInt());
assertEquals(14400000, payload.longValue());
}

@Test
public void timestampToJson() throws IOException {
GregorianCalendar calendar = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 0);
Expand Down