diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 318384fca9c0b..c98cfab787d9f 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -189,6 +189,7 @@ + diff --git a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java index 150dac1b73cb0..58075d628927e 100644 --- a/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java +++ b/clients/src/main/java/org/apache/kafka/clients/CommonClientConfigs.java @@ -118,6 +118,26 @@ public class CommonClientConfigs { + "elapses the client will resend the request if necessary or fail the request if " + "retries are exhausted."; + public static final String DEFAULT_LIST_KEY_SERDE_INNER_CLASS = "default.list.key.serde.inner"; + public static final String DEFAULT_LIST_KEY_SERDE_INNER_CLASS_DOC = "Default inner class of list serde for key that implements the org.apache.kafka.common.serialization.Serde interface. " + + "This configuration will be read if and only if default.key.serde configuration is set to org.apache.kafka.common.serialization.Serdes.ListSerde"; + + public static final String DEFAULT_LIST_VALUE_SERDE_INNER_CLASS = "default.list.value.serde.inner"; + public static final String DEFAULT_LIST_VALUE_SERDE_INNER_CLASS_DOC = "Default inner class of list serde for value that implements the org.apache.kafka.common.serialization.Serde interface. " + + "This configuration will be read if and only if default.value.serde configuration is set to org.apache.kafka.common.serialization.Serdes.ListSerde"; + + public static final String DEFAULT_LIST_KEY_SERDE_TYPE_CLASS = "default.list.key.serde.type"; + public static final String DEFAULT_LIST_KEY_SERDE_TYPE_CLASS_DOC = "Default class for key that implements the java.util.List interface. " + + "This configuration will be read if and only if default.key.serde configuration is set to org.apache.kafka.common.serialization.Serdes.ListSerde " + + "Note when list serde class is used, one needs to set the inner serde class that implements the org.apache.kafka.common.serialization.Serde interface via '" + + DEFAULT_LIST_KEY_SERDE_INNER_CLASS + "'"; + + public static final String DEFAULT_LIST_VALUE_SERDE_TYPE_CLASS = "default.list.value.serde.type"; + public static final String DEFAULT_LIST_VALUE_SERDE_TYPE_CLASS_DOC = "Default class for value that implements the java.util.List interface. " + + "This configuration will be read if and only if default.value.serde configuration is set to org.apache.kafka.common.serialization.Serdes.ListSerde " + + "Note when list serde class is used, one needs to set the inner serde class that implements the org.apache.kafka.common.serialization.Serde interface via '" + + DEFAULT_LIST_VALUE_SERDE_INNER_CLASS + "'"; + public static final String GROUP_ID_CONFIG = "group.id"; public static final String GROUP_ID_DOC = "A unique string that identifies the consumer group this consumer belongs to. This property is required if the consumer uses either the group management functionality by using subscribe(topic) or the Kafka-based offset management strategy."; diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/ListDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/ListDeserializer.java new file mode 100644 index 0000000000000..272cbad52a74c --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/serialization/ListDeserializer.java @@ -0,0 +1,191 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.serialization; + +import static org.apache.kafka.common.serialization.Serdes.ListSerde.SerializationStrategy; +import static org.apache.kafka.common.utils.Utils.mkEntry; +import static org.apache.kafka.common.utils.Utils.mkMap; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.IOException; +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.serialization.Serdes.ListSerde; +import org.apache.kafka.common.utils.Utils; + +public class ListDeserializer implements Deserializer> { + + private static final Map>, Integer> FIXED_LENGTH_DESERIALIZERS = mkMap( + mkEntry(ShortDeserializer.class, Short.BYTES), + mkEntry(IntegerDeserializer.class, Integer.BYTES), + mkEntry(FloatDeserializer.class, Float.BYTES), + mkEntry(LongDeserializer.class, Long.BYTES), + mkEntry(DoubleDeserializer.class, Double.BYTES), + mkEntry(UUIDDeserializer.class, 36) + ); + + private Deserializer inner; + private Class listClass; + private Integer primitiveSize; + + public ListDeserializer() {} + + public > ListDeserializer(Class listClass, Deserializer inner) { + if (listClass == null || inner == null) { + throw new IllegalArgumentException("ListDeserializer requires both \"listClass\" and \"innerDeserializer\" parameters to be provided during initialization"); + } + this.listClass = listClass; + this.inner = inner; + this.primitiveSize = FIXED_LENGTH_DESERIALIZERS.get(inner.getClass()); + } + + public Deserializer innerDeserializer() { + return inner; + } + + @Override + public void configure(Map configs, boolean isKey) { + if (listClass != null || inner != null) { + throw new ConfigException("List deserializer was already initialized using a non-default constructor"); + } + configureListClass(configs, isKey); + configureInnerSerde(configs, isKey); + } + + private void configureListClass(Map configs, boolean isKey) { + String listTypePropertyName = isKey ? CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_TYPE_CLASS : CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_TYPE_CLASS; + final Object listClassOrName = configs.get(listTypePropertyName); + if (listClassOrName == null) { + throw new ConfigException("Not able to determine the list class because it was neither passed via the constructor nor set in the config."); + } + try { + if (listClassOrName instanceof String) { + listClass = Utils.loadClass((String) listClassOrName, Object.class); + } else if (listClassOrName instanceof Class) { + listClass = (Class) listClassOrName; + } else { + throw new KafkaException("Could not determine the list class instance using \"" + listTypePropertyName + "\" property."); + } + } catch (final ClassNotFoundException e) { + throw new ConfigException(listTypePropertyName, listClassOrName, "Deserializer's list class \"" + listClassOrName + "\" could not be found."); + } + } + + @SuppressWarnings("unchecked") + private void configureInnerSerde(Map configs, boolean isKey) { + String innerSerdePropertyName = isKey ? CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_INNER_CLASS : CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_INNER_CLASS; + final Object innerSerdeClassOrName = configs.get(innerSerdePropertyName); + if (innerSerdeClassOrName == null) { + throw new ConfigException("Not able to determine the inner serde class because it was neither passed via the constructor nor set in the config."); + } + try { + if (innerSerdeClassOrName instanceof String) { + inner = Utils.newInstance((String) innerSerdeClassOrName, Serde.class).deserializer(); + } else if (innerSerdeClassOrName instanceof Class) { + inner = (Deserializer) ((Serde) Utils.newInstance((Class) innerSerdeClassOrName)).deserializer(); + } else { + throw new KafkaException("Could not determine the inner serde class instance using \"" + innerSerdePropertyName + "\" property."); + } + inner.configure(configs, isKey); + primitiveSize = FIXED_LENGTH_DESERIALIZERS.get(inner.getClass()); + } catch (final ClassNotFoundException e) { + throw new ConfigException(innerSerdePropertyName, innerSerdeClassOrName, "Deserializer's inner serde class \"" + innerSerdeClassOrName + "\" could not be found."); + } + } + + @SuppressWarnings("unchecked") + private List createListInstance(int listSize) { + try { + Constructor> listConstructor; + try { + listConstructor = (Constructor>) listClass.getConstructor(Integer.TYPE); + return listConstructor.newInstance(listSize); + } catch (NoSuchMethodException e) { + listConstructor = (Constructor>) listClass.getConstructor(); + return listConstructor.newInstance(); + } + } catch (InstantiationException | IllegalAccessException | NoSuchMethodException | + IllegalArgumentException | InvocationTargetException e) { + throw new KafkaException("Could not construct a list instance of \"" + listClass.getCanonicalName() + "\"", e); + } + } + + private SerializationStrategy parseSerializationStrategyFlag(final int serializationStrategyFlag) throws IOException { + if (serializationStrategyFlag < 0 || serializationStrategyFlag >= SerializationStrategy.VALUES.length) { + throw new SerializationException("Invalid serialization strategy flag value"); + } + return SerializationStrategy.VALUES[serializationStrategyFlag]; + } + + private List deserializeNullIndexList(final DataInputStream dis) throws IOException { + int nullIndexListSize = dis.readInt(); + List nullIndexList = new ArrayList<>(nullIndexListSize); + while (nullIndexListSize != 0) { + nullIndexList.add(dis.readInt()); + nullIndexListSize--; + } + return nullIndexList; + } + + @Override + public List deserialize(String topic, byte[] data) { + if (data == null) { + return null; + } + try (final DataInputStream dis = new DataInputStream(new ByteArrayInputStream(data))) { + SerializationStrategy serStrategy = parseSerializationStrategyFlag(dis.readByte()); + List nullIndexList = null; + if (serStrategy == SerializationStrategy.CONSTANT_SIZE) { + // In CONSTANT_SIZE strategy, indexes of null entries are decoded from a null index list + nullIndexList = deserializeNullIndexList(dis); + } + final int size = dis.readInt(); + List deserializedList = createListInstance(size); + for (int i = 0; i < size; i++) { + int entrySize = serStrategy == SerializationStrategy.CONSTANT_SIZE ? primitiveSize : dis.readInt(); + if (entrySize == ListSerde.NULL_ENTRY_VALUE || (nullIndexList != null && nullIndexList.contains(i))) { + deserializedList.add(null); + continue; + } + byte[] payload = new byte[entrySize]; + if (dis.read(payload) == -1) { + throw new SerializationException("End of the stream was reached prematurely"); + } + deserializedList.add(inner.deserialize(topic, payload)); + } + return deserializedList; + } catch (IOException e) { + throw new KafkaException("Unable to deserialize into a List", e); + } + } + + @Override + public void close() { + if (inner != null) { + inner.close(); + } + } + +} diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/ListSerializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/ListSerializer.java new file mode 100644 index 0000000000000..6274c9d3f7b53 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/serialization/ListSerializer.java @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.serialization; + +import java.util.ArrayList; +import java.util.Iterator; +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.common.utils.Utils; + +import java.io.ByteArrayOutputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +import static org.apache.kafka.common.serialization.Serdes.ListSerde.SerializationStrategy; + +public class ListSerializer implements Serializer> { + + private static final List>> FIXED_LENGTH_SERIALIZERS = Arrays.asList( + ShortSerializer.class, + IntegerSerializer.class, + FloatSerializer.class, + LongSerializer.class, + DoubleSerializer.class, + UUIDSerializer.class); + + private Serializer inner; + private SerializationStrategy serStrategy; + + public ListSerializer() {} + + public ListSerializer(Serializer inner) { + if (inner == null) { + throw new IllegalArgumentException("ListSerializer requires \"serializer\" parameter to be provided during initialization"); + } + this.inner = inner; + this.serStrategy = FIXED_LENGTH_SERIALIZERS.contains(inner.getClass()) ? SerializationStrategy.CONSTANT_SIZE : SerializationStrategy.VARIABLE_SIZE; + } + + public Serializer getInnerSerializer() { + return inner; + } + + @SuppressWarnings("unchecked") + @Override + public void configure(Map configs, boolean isKey) { + if (inner != null) { + throw new ConfigException("List serializer was already initialized using a non-default constructor"); + } + final String innerSerdePropertyName = isKey ? CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_INNER_CLASS : CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_INNER_CLASS; + final Object innerSerdeClassOrName = configs.get(innerSerdePropertyName); + if (innerSerdeClassOrName == null) { + throw new ConfigException("Not able to determine the serializer class because it was neither passed via the constructor nor set in the config."); + } + try { + if (innerSerdeClassOrName instanceof String) { + inner = Utils.newInstance((String) innerSerdeClassOrName, Serde.class).serializer(); + } else if (innerSerdeClassOrName instanceof Class) { + inner = (Serializer) ((Serde) Utils.newInstance((Class) innerSerdeClassOrName)).serializer(); + } else { + throw new KafkaException("Could not create a serializer class instance using \"" + innerSerdePropertyName + "\" property."); + } + inner.configure(configs, isKey); + serStrategy = FIXED_LENGTH_SERIALIZERS.contains(inner.getClass()) ? SerializationStrategy.CONSTANT_SIZE : SerializationStrategy.VARIABLE_SIZE; + } catch (final ClassNotFoundException e) { + throw new ConfigException(innerSerdePropertyName, innerSerdeClassOrName, "Serializer class " + innerSerdeClassOrName + " could not be found."); + } + } + + private void serializeNullIndexList(final DataOutputStream out, List data) throws IOException { + int i = 0; + List nullIndexList = new ArrayList<>(); + for (Iterator it = data.listIterator(); it.hasNext(); i++) { + if (it.next() == null) { + nullIndexList.add(i); + } + } + out.writeInt(nullIndexList.size()); + for (int nullIndex : nullIndexList) { + out.writeInt(nullIndex); + } + } + + @Override + public byte[] serialize(String topic, List data) { + if (data == null) { + return null; + } + try (final ByteArrayOutputStream baos = new ByteArrayOutputStream(); + final DataOutputStream out = new DataOutputStream(baos)) { + out.writeByte(serStrategy.ordinal()); // write serialization strategy flag + if (serStrategy == SerializationStrategy.CONSTANT_SIZE) { + // In CONSTANT_SIZE strategy, indexes of null entries are encoded in a null index list + serializeNullIndexList(out, data); + } + final int size = data.size(); + out.writeInt(size); + for (Inner entry : data) { + if (entry == null) { + if (serStrategy == SerializationStrategy.VARIABLE_SIZE) { + out.writeInt(Serdes.ListSerde.NULL_ENTRY_VALUE); + } + } else { + final byte[] bytes = inner.serialize(topic, entry); + if (serStrategy == SerializationStrategy.VARIABLE_SIZE) { + out.writeInt(bytes.length); + } + out.write(bytes); + } + } + return baos.toByteArray(); + } catch (IOException e) { + throw new KafkaException("Failed to serialize List", e); + } + } + + @Override + public void close() { + if (inner != null) { + inner.close(); + } + } + +} diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java b/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java index 347bf8713ece8..4a150e0c022ba 100644 --- a/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java +++ b/clients/src/main/java/org/apache/kafka/common/serialization/Serdes.java @@ -19,6 +19,7 @@ import org.apache.kafka.common.utils.Bytes; import java.nio.ByteBuffer; +import java.util.List; import java.util.Map; import java.util.UUID; @@ -125,6 +126,27 @@ public UUIDSerde() { } } + static public final class ListSerde extends WrapperSerde> { + + final static int NULL_ENTRY_VALUE = -1; + + enum SerializationStrategy { + CONSTANT_SIZE, + VARIABLE_SIZE; + + public static final SerializationStrategy[] VALUES = SerializationStrategy.values(); + } + + public ListSerde() { + super(new ListSerializer<>(), new ListDeserializer<>()); + } + + public > ListSerde(Class listClass, Serde serde) { + super(new ListSerializer<>(serde.serializer()), new ListDeserializer<>(listClass, serde.deserializer())); + } + + } + @SuppressWarnings("unchecked") static public Serde serdeFrom(Class type) { if (String.class.isAssignableFrom(type)) { @@ -265,4 +287,12 @@ static public Serde ByteArray() { static public Serde Void() { return new VoidSerde(); } + + /* + * A serde for {@code List} type + */ + static public , Inner> Serde> ListSerde(Class listClass, Serde innerSerde) { + return new ListSerde<>(listClass, innerSerde); + } + } diff --git a/clients/src/test/java/org/apache/kafka/common/serialization/ListDeserializerTest.java b/clients/src/test/java/org/apache/kafka/common/serialization/ListDeserializerTest.java new file mode 100644 index 0000000000000..aff01e3fe8a89 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/serialization/ListDeserializerTest.java @@ -0,0 +1,251 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.serialization; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.config.ConfigException; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; +import org.junit.jupiter.api.Test; + +@SuppressWarnings("unchecked") +public class ListDeserializerTest { + private final ListDeserializer listDeserializer = new ListDeserializer<>(); + private final Map props = new HashMap<>(); + private final String nonExistingClass = "non.existing.class"; + private static class FakeObject { + } + + @Test + public void testListKeyDeserializerNoArgConstructorsWithClassNames() { + props.put(CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_TYPE_CLASS, ArrayList.class.getName()); + props.put(CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_INNER_CLASS, Serdes.StringSerde.class.getName()); + listDeserializer.configure(props, true); + final Deserializer inner = listDeserializer.innerDeserializer(); + assertNotNull(inner, "Inner deserializer should be not null"); + assertTrue(inner instanceof StringDeserializer, "Inner deserializer type should be StringDeserializer"); + } + + @Test + public void testListValueDeserializerNoArgConstructorsWithClassNames() { + props.put(CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_TYPE_CLASS, ArrayList.class.getName()); + props.put(CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_INNER_CLASS, Serdes.IntegerSerde.class.getName()); + listDeserializer.configure(props, false); + final Deserializer inner = listDeserializer.innerDeserializer(); + assertNotNull(inner, "Inner deserializer should be not null"); + assertTrue(inner instanceof IntegerDeserializer, "Inner deserializer type should be IntegerDeserializer"); + } + + @Test + public void testListKeyDeserializerNoArgConstructorsWithClassObjects() { + props.put(CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_TYPE_CLASS, ArrayList.class); + props.put(CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_INNER_CLASS, Serdes.StringSerde.class); + listDeserializer.configure(props, true); + final Deserializer inner = listDeserializer.innerDeserializer(); + assertNotNull(inner, "Inner deserializer should be not null"); + assertTrue(inner instanceof StringDeserializer, "Inner deserializer type should be StringDeserializer"); + } + + @Test + public void testListValueDeserializerNoArgConstructorsWithClassObjects() { + props.put(CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_TYPE_CLASS, ArrayList.class); + props.put(CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_INNER_CLASS, Serdes.StringSerde.class); + listDeserializer.configure(props, false); + final Deserializer inner = listDeserializer.innerDeserializer(); + assertNotNull(inner, "Inner deserializer should be not null"); + assertTrue(inner instanceof StringDeserializer, "Inner deserializer type should be StringDeserializer"); + } + + @Test + public void testListKeyDeserializerNoArgConstructorsShouldThrowConfigExceptionDueMissingInnerClassProp() { + props.put(CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_TYPE_CLASS, ArrayList.class); + final ConfigException exception = assertThrows( + ConfigException.class, + () -> listDeserializer.configure(props, true) + ); + assertEquals("Not able to determine the inner serde class because " + + "it was neither passed via the constructor nor set in the config.", exception.getMessage()); + } + + @Test + public void testListValueDeserializerNoArgConstructorsShouldThrowConfigExceptionDueMissingInnerClassProp() { + props.put(CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_TYPE_CLASS, ArrayList.class); + final ConfigException exception = assertThrows( + ConfigException.class, + () -> listDeserializer.configure(props, false) + ); + assertEquals("Not able to determine the inner serde class because " + + "it was neither passed via the constructor nor set in the config.", exception.getMessage()); + } + + @Test + public void testListKeyDeserializerNoArgConstructorsShouldThrowConfigExceptionDueMissingTypeClassProp() { + props.put(CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_INNER_CLASS, Serdes.StringSerde.class); + final ConfigException exception = assertThrows( + ConfigException.class, + () -> listDeserializer.configure(props, true) + ); + assertEquals("Not able to determine the list class because " + + "it was neither passed via the constructor nor set in the config.", exception.getMessage()); + } + + @Test + public void testListValueDeserializerNoArgConstructorsShouldThrowConfigExceptionDueMissingTypeClassProp() { + props.put(CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_INNER_CLASS, Serdes.StringSerde.class); + final ConfigException exception = assertThrows( + ConfigException.class, + () -> listDeserializer.configure(props, false) + ); + assertEquals("Not able to determine the list class because " + + "it was neither passed via the constructor nor set in the config.", exception.getMessage()); + } + + @Test + public void testListKeyDeserializerNoArgConstructorsShouldThrowKafkaExceptionDueInvalidTypeClass() { + props.put(CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_TYPE_CLASS, new FakeObject()); + props.put(CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_INNER_CLASS, Serdes.StringSerde.class); + final KafkaException exception = assertThrows( + KafkaException.class, + () -> listDeserializer.configure(props, true) + ); + assertEquals("Could not determine the list class instance using " + + "\"" + CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_TYPE_CLASS + "\" property.", exception.getMessage()); + } + + @Test + public void testListValueDeserializerNoArgConstructorsShouldThrowKafkaExceptionDueInvalidTypeClass() { + props.put(CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_TYPE_CLASS, new FakeObject()); + props.put(CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_INNER_CLASS, Serdes.StringSerde.class); + final KafkaException exception = assertThrows( + KafkaException.class, + () -> listDeserializer.configure(props, false) + ); + assertEquals("Could not determine the list class instance using " + + "\"" + CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_TYPE_CLASS + "\" property.", exception.getMessage()); + } + + @Test + public void testListKeyDeserializerNoArgConstructorsShouldThrowKafkaExceptionDueInvalidInnerClass() { + props.put(CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_TYPE_CLASS, ArrayList.class); + props.put(CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_INNER_CLASS, new FakeObject()); + final KafkaException exception = assertThrows( + KafkaException.class, + () -> listDeserializer.configure(props, true) + ); + assertEquals("Could not determine the inner serde class instance using " + + "\"" + CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_INNER_CLASS + "\" property.", exception.getMessage()); + } + + @Test + public void testListValueDeserializerNoArgConstructorsShouldThrowKafkaExceptionDueInvalidInnerClass() { + props.put(CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_TYPE_CLASS, ArrayList.class); + props.put(CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_INNER_CLASS, new FakeObject()); + final KafkaException exception = assertThrows( + KafkaException.class, + () -> listDeserializer.configure(props, false) + ); + assertEquals("Could not determine the inner serde class instance using " + + "\"" + CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_INNER_CLASS + "\" property.", exception.getMessage()); + } + + @Test + public void testListKeyDeserializerNoArgConstructorsShouldThrowConfigExceptionDueListClassNotFound() { + props.put(CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_TYPE_CLASS, nonExistingClass); + props.put(CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_INNER_CLASS, Serdes.StringSerde.class); + final ConfigException exception = assertThrows( + ConfigException.class, + () -> listDeserializer.configure(props, true) + ); + assertEquals("Invalid value " + nonExistingClass + " for configuration " + + CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_TYPE_CLASS + ": Deserializer's list class " + + "\"" + nonExistingClass + "\" could not be found.", exception.getMessage()); + } + + @Test + public void testListValueDeserializerNoArgConstructorsShouldThrowConfigExceptionDueListClassNotFound() { + props.put(CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_TYPE_CLASS, nonExistingClass); + props.put(CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_INNER_CLASS, Serdes.StringSerde.class); + final ConfigException exception = assertThrows( + ConfigException.class, + () -> listDeserializer.configure(props, false) + ); + assertEquals("Invalid value " + nonExistingClass + " for configuration " + + CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_TYPE_CLASS + ": Deserializer's list class " + + "\"" + nonExistingClass + "\" could not be found.", exception.getMessage()); + } + + @Test + public void testListKeyDeserializerNoArgConstructorsShouldThrowConfigExceptionDueInnerSerdeClassNotFound() { + props.put(CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_TYPE_CLASS, ArrayList.class); + props.put(CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_INNER_CLASS, nonExistingClass); + final ConfigException exception = assertThrows( + ConfigException.class, + () -> listDeserializer.configure(props, true) + ); + assertEquals("Invalid value " + nonExistingClass + " for configuration " + + CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_INNER_CLASS + ": Deserializer's inner serde class " + + "\"" + nonExistingClass + "\" could not be found.", exception.getMessage()); + } + + @Test + public void testListValueDeserializerNoArgConstructorsShouldThrowConfigExceptionDueInnerSerdeClassNotFound() { + props.put(CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_TYPE_CLASS, ArrayList.class); + props.put(CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_INNER_CLASS, nonExistingClass); + final ConfigException exception = assertThrows( + ConfigException.class, + () -> listDeserializer.configure(props, false) + ); + assertEquals("Invalid value " + nonExistingClass + " for configuration " + + CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_INNER_CLASS + ": Deserializer's inner serde class " + + "\"" + nonExistingClass + "\" could not be found.", exception.getMessage()); + } + + @Test + public void testListKeyDeserializerShouldThrowConfigExceptionDueAlreadyInitialized() { + props.put(CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_TYPE_CLASS, ArrayList.class); + props.put(CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_INNER_CLASS, Serdes.StringSerde.class); + final ListDeserializer initializedListDeserializer = new ListDeserializer<>(ArrayList.class, + Serdes.Integer().deserializer()); + final ConfigException exception = assertThrows( + ConfigException.class, + () -> initializedListDeserializer.configure(props, true) + ); + assertEquals("List deserializer was already initialized using a non-default constructor", exception.getMessage()); + } + + @Test + public void testListValueDeserializerShouldThrowConfigExceptionDueAlreadyInitialized() { + props.put(CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_TYPE_CLASS, ArrayList.class); + props.put(CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_INNER_CLASS, Serdes.StringSerde.class); + final ListDeserializer initializedListDeserializer = new ListDeserializer<>(ArrayList.class, + Serdes.Integer().deserializer()); + final ConfigException exception = assertThrows( + ConfigException.class, + () -> initializedListDeserializer.configure(props, true) + ); + assertEquals("List deserializer was already initialized using a non-default constructor", exception.getMessage()); + } + +} diff --git a/clients/src/test/java/org/apache/kafka/common/serialization/ListSerializerTest.java b/clients/src/test/java/org/apache/kafka/common/serialization/ListSerializerTest.java new file mode 100644 index 0000000000000..a8ab191cad203 --- /dev/null +++ b/clients/src/test/java/org/apache/kafka/common/serialization/ListSerializerTest.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.serialization; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import org.apache.kafka.clients.CommonClientConfigs; +import org.apache.kafka.common.KafkaException; +import org.apache.kafka.common.config.ConfigException; +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Map; + + +public class ListSerializerTest { + private final ListSerializer listSerializer = new ListSerializer<>(); + private final Map props = new HashMap<>(); + private final String nonExistingClass = "non.existing.class"; + private static class FakeObject { + } + + @Test + public void testListKeySerializerNoArgConstructorsWithClassName() { + props.put(CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_INNER_CLASS, Serdes.StringSerde.class.getName()); + listSerializer.configure(props, true); + final Serializer inner = listSerializer.getInnerSerializer(); + assertNotNull(inner, "Inner serializer should be not null"); + assertTrue(inner instanceof StringSerializer, "Inner serializer type should be StringSerializer"); + } + + @Test + public void testListValueSerializerNoArgConstructorsWithClassName() { + props.put(CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_INNER_CLASS, Serdes.StringSerde.class.getName()); + listSerializer.configure(props, false); + final Serializer inner = listSerializer.getInnerSerializer(); + assertNotNull(inner, "Inner serializer should be not null"); + assertTrue(inner instanceof StringSerializer, "Inner serializer type should be StringSerializer"); + } + + @Test + public void testListKeySerializerNoArgConstructorsWithClassObject() { + props.put(CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_INNER_CLASS, Serdes.StringSerde.class); + listSerializer.configure(props, true); + final Serializer inner = listSerializer.getInnerSerializer(); + assertNotNull(inner, "Inner serializer should be not null"); + assertTrue(inner instanceof StringSerializer, "Inner serializer type should be StringSerializer"); + } + + @Test + public void testListValueSerializerNoArgConstructorsWithClassObject() { + props.put(CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_INNER_CLASS, Serdes.StringSerde.class); + listSerializer.configure(props, false); + final Serializer inner = listSerializer.getInnerSerializer(); + assertNotNull(inner, "Inner serializer should be not null"); + assertTrue(inner instanceof StringSerializer, "Inner serializer type should be StringSerializer"); + } + + @Test + public void testListSerializerNoArgConstructorsShouldThrowConfigExceptionDueMissingProp() { + ConfigException exception = assertThrows( + ConfigException.class, + () -> listSerializer.configure(props, true) + ); + assertEquals("Not able to determine the serializer class because it was neither passed via the constructor nor set in the config.", exception.getMessage()); + + exception = assertThrows( + ConfigException.class, + () -> listSerializer.configure(props, false) + ); + assertEquals("Not able to determine the serializer class because it was neither passed via the constructor nor set in the config.", exception.getMessage()); + } + + @Test + public void testListKeySerializerNoArgConstructorsShouldThrowKafkaExceptionDueInvalidClass() { + props.put(CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_INNER_CLASS, new FakeObject()); + final KafkaException exception = assertThrows( + KafkaException.class, + () -> listSerializer.configure(props, true) + ); + assertEquals("Could not create a serializer class instance using \"" + CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_INNER_CLASS + "\" property.", exception.getMessage()); + } + + @Test + public void testListValueSerializerNoArgConstructorsShouldThrowKafkaExceptionDueInvalidClass() { + props.put(CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_INNER_CLASS, new FakeObject()); + final KafkaException exception = assertThrows( + KafkaException.class, + () -> listSerializer.configure(props, false) + ); + assertEquals("Could not create a serializer class instance using \"" + CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_INNER_CLASS + "\" property.", exception.getMessage()); + } + + @Test + public void testListKeySerializerNoArgConstructorsShouldThrowKafkaExceptionDueClassNotFound() { + props.put(CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_INNER_CLASS, nonExistingClass); + final KafkaException exception = assertThrows( + KafkaException.class, + () -> listSerializer.configure(props, true) + ); + assertEquals("Invalid value non.existing.class for configuration " + CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_INNER_CLASS + ": Serializer class " + nonExistingClass + " could not be found.", exception.getMessage()); + } + + @Test + public void testListValueSerializerNoArgConstructorsShouldThrowKafkaExceptionDueClassNotFound() { + props.put(CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_INNER_CLASS, nonExistingClass); + final KafkaException exception = assertThrows( + KafkaException.class, + () -> listSerializer.configure(props, false) + ); + assertEquals("Invalid value non.existing.class for configuration " + CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_INNER_CLASS + ": Serializer class " + nonExistingClass + " could not be found.", exception.getMessage()); + } + + @Test + public void testListKeySerializerShouldThrowConfigExceptionDueAlreadyInitialized() { + props.put(CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_INNER_CLASS, Serdes.StringSerde.class); + final ListSerializer initializedListSerializer = new ListSerializer<>(Serdes.Integer().serializer()); + final ConfigException exception = assertThrows( + ConfigException.class, + () -> initializedListSerializer.configure(props, true) + ); + assertEquals("List serializer was already initialized using a non-default constructor", exception.getMessage()); + } + + @Test + public void testListValueSerializerShouldThrowConfigExceptionDueAlreadyInitialized() { + props.put(CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_INNER_CLASS, Serdes.StringSerde.class); + final ListSerializer initializedListSerializer = new ListSerializer<>(Serdes.Integer().serializer()); + final ConfigException exception = assertThrows( + ConfigException.class, + () -> initializedListSerializer.configure(props, false) + ); + assertEquals("List serializer was already initialized using a non-default constructor", exception.getMessage()); + } + +} diff --git a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java index 0446bafc2fbe4..85c09dd17ae09 100644 --- a/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java +++ b/clients/src/test/java/org/apache/kafka/common/serialization/SerializationTest.java @@ -27,7 +27,11 @@ import java.util.List; import java.util.Map; import java.util.UUID; +import java.util.ArrayList; +import java.util.LinkedList; +import java.util.Stack; +import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -106,6 +110,190 @@ public void stringSerdeShouldSupportDifferentEncodings() { } } + @SuppressWarnings("unchecked") + @Test + public void listSerdeShouldReturnEmptyCollection() { + List testData = Arrays.asList(); + Serde> listSerde = Serdes.ListSerde(ArrayList.class, Serdes.Integer()); + assertEquals(testData, + listSerde.deserializer().deserialize(topic, listSerde.serializer().serialize(topic, testData)), + "Should get empty collection after serialization and deserialization on an empty list"); + } + + @SuppressWarnings("unchecked") + @Test + public void listSerdeShouldReturnNull() { + List testData = null; + Serde> listSerde = Serdes.ListSerde(ArrayList.class, Serdes.Integer()); + assertEquals(testData, + listSerde.deserializer().deserialize(topic, listSerde.serializer().serialize(topic, testData)), + "Should get null after serialization and deserialization on an empty list"); + } + + @SuppressWarnings("unchecked") + @Test + public void listSerdeShouldRoundtripIntPrimitiveInput() { + List testData = Arrays.asList(1, 2, 3); + Serde> listSerde = Serdes.ListSerde(ArrayList.class, Serdes.Integer()); + assertEquals(testData, + listSerde.deserializer().deserialize(topic, listSerde.serializer().serialize(topic, testData)), + "Should get the original collection of integer primitives after serialization and deserialization"); + } + + @SuppressWarnings("unchecked") + @Test + public void listSerdeSerializerShouldReturnByteArrayOfFixedSizeForIntPrimitiveInput() { + List testData = Arrays.asList(1, 2, 3); + Serde> listSerde = Serdes.ListSerde(ArrayList.class, Serdes.Integer()); + assertEquals(21, listSerde.serializer().serialize(topic, testData).length, + "Should get length of 21 bytes after serialization"); + } + + @SuppressWarnings("unchecked") + @Test + public void listSerdeShouldRoundtripShortPrimitiveInput() { + List testData = Arrays.asList((short) 1, (short) 2, (short) 3); + Serde> listSerde = Serdes.ListSerde(ArrayList.class, Serdes.Short()); + assertEquals(testData, + listSerde.deserializer().deserialize(topic, listSerde.serializer().serialize(topic, testData)), + "Should get the original collection of short primitives after serialization and deserialization"); + } + + @SuppressWarnings("unchecked") + @Test + public void listSerdeSerializerShouldReturnByteArrayOfFixedSizeForShortPrimitiveInput() { + List testData = Arrays.asList((short) 1, (short) 2, (short) 3); + Serde> listSerde = Serdes.ListSerde(ArrayList.class, Serdes.Short()); + assertEquals(15, listSerde.serializer().serialize(topic, testData).length, + "Should get length of 15 bytes after serialization"); + } + + @SuppressWarnings("unchecked") + @Test + public void listSerdeShouldRoundtripFloatPrimitiveInput() { + List testData = Arrays.asList((float) 1, (float) 2, (float) 3); + Serde> listSerde = Serdes.ListSerde(ArrayList.class, Serdes.Float()); + assertEquals(testData, + listSerde.deserializer().deserialize(topic, listSerde.serializer().serialize(topic, testData)), + "Should get the original collection of float primitives after serialization and deserialization"); + } + + @SuppressWarnings("unchecked") + @Test + public void listSerdeSerializerShouldReturnByteArrayOfFixedSizeForFloatPrimitiveInput() { + List testData = Arrays.asList((float) 1, (float) 2, (float) 3); + Serde> listSerde = Serdes.ListSerde(ArrayList.class, Serdes.Float()); + assertEquals(21, listSerde.serializer().serialize(topic, testData).length, + "Should get length of 21 bytes after serialization"); + } + + @SuppressWarnings("unchecked") + @Test + public void listSerdeShouldRoundtripLongPrimitiveInput() { + List testData = Arrays.asList((long) 1, (long) 2, (long) 3); + Serde> listSerde = Serdes.ListSerde(ArrayList.class, Serdes.Long()); + assertEquals(testData, + listSerde.deserializer().deserialize(topic, listSerde.serializer().serialize(topic, testData)), + "Should get the original collection of long primitives after serialization and deserialization"); + } + + @SuppressWarnings("unchecked") + @Test + public void listSerdeSerializerShouldReturnByteArrayOfFixedSizeForLongPrimitiveInput() { + List testData = Arrays.asList((long) 1, (long) 2, (long) 3); + Serde> listSerde = Serdes.ListSerde(ArrayList.class, Serdes.Long()); + assertEquals(33, listSerde.serializer().serialize(topic, testData).length, + "Should get length of 33 bytes after serialization"); + } + + @SuppressWarnings("unchecked") + @Test + public void listSerdeShouldRoundtripDoublePrimitiveInput() { + List testData = Arrays.asList((double) 1, (double) 2, (double) 3); + Serde> listSerde = Serdes.ListSerde(ArrayList.class, Serdes.Double()); + assertEquals(testData, + listSerde.deserializer().deserialize(topic, listSerde.serializer().serialize(topic, testData)), + "Should get the original collection of double primitives after serialization and deserialization"); + } + + @SuppressWarnings("unchecked") + @Test + public void listSerdeSerializerShouldReturnByteArrayOfFixedSizeForDoublePrimitiveInput() { + List testData = Arrays.asList((double) 1, (double) 2, (double) 3); + Serde> listSerde = Serdes.ListSerde(ArrayList.class, Serdes.Double()); + assertEquals(33, listSerde.serializer().serialize(topic, testData).length, + "Should get length of 33 bytes after serialization"); + } + + @SuppressWarnings("unchecked") + @Test + public void listSerdeShouldRoundtripUUIDInput() { + List testData = Arrays.asList(UUID.randomUUID(), UUID.randomUUID(), UUID.randomUUID()); + Serde> listSerde = Serdes.ListSerde(ArrayList.class, Serdes.UUID()); + assertEquals(testData, + listSerde.deserializer().deserialize(topic, listSerde.serializer().serialize(topic, testData)), + "Should get the original collection of UUID after serialization and deserialization"); + } + + @SuppressWarnings("unchecked") + @Test + public void listSerdeSerializerShouldReturnByteArrayOfFixedSizeForUUIDInput() { + List testData = Arrays.asList(UUID.randomUUID(), UUID.randomUUID(), UUID.randomUUID()); + Serde> listSerde = Serdes.ListSerde(ArrayList.class, Serdes.UUID()); + assertEquals(117, listSerde.serializer().serialize(topic, testData).length, + "Should get length of 117 bytes after serialization"); + } + + @SuppressWarnings("unchecked") + @Test + public void listSerdeShouldRoundtripNonPrimitiveInput() { + List testData = Arrays.asList("A", "B", "C"); + Serde> listSerde = Serdes.ListSerde(ArrayList.class, Serdes.String()); + assertEquals(testData, + listSerde.deserializer().deserialize(topic, listSerde.serializer().serialize(topic, testData)), + "Should get the original collection of strings list after serialization and deserialization"); + } + + @SuppressWarnings("unchecked") + @Test + public void listSerdeShouldRoundtripPrimitiveInputWithNullEntries() { + List testData = Arrays.asList(1, null, 3); + Serde> listSerde = Serdes.ListSerde(ArrayList.class, Serdes.Integer()); + assertEquals(testData, + listSerde.deserializer().deserialize(topic, listSerde.serializer().serialize(topic, testData)), + "Should get the original collection of integer primitives with null entries " + + "after serialization and deserialization"); + } + + @SuppressWarnings("unchecked") + @Test + public void listSerdeShouldRoundtripNonPrimitiveInputWithNullEntries() { + List testData = Arrays.asList("A", null, "C"); + Serde> listSerde = Serdes.ListSerde(ArrayList.class, Serdes.String()); + assertEquals(testData, + listSerde.deserializer().deserialize(topic, listSerde.serializer().serialize(topic, testData)), + "Should get the original collection of strings list with null entries " + + "after serialization and deserialization"); + } + + @SuppressWarnings("unchecked") + @Test + public void listSerdeShouldReturnLinkedList() { + List testData = new LinkedList<>(); + Serde> listSerde = Serdes.ListSerde(LinkedList.class, Serdes.Integer()); + assertTrue(listSerde.deserializer().deserialize(topic, listSerde.serializer().serialize(topic, testData)) + instanceof LinkedList, "Should return List instance of type LinkedList"); + } + + @SuppressWarnings("unchecked") + @Test + public void listSerdeShouldReturnStack() { + List testData = new Stack<>(); + Serde> listSerde = Serdes.ListSerde(Stack.class, Serdes.Integer()); + assertTrue(listSerde.deserializer().deserialize(topic, listSerde.serializer().serialize(topic, testData)) + instanceof Stack, "Should return List instance of type Stack"); + } + @Test public void floatDeserializerShouldThrowSerializationExceptionOnZeroBytes() { try (Serde serde = Serdes.Float()) { diff --git a/docs/streams/developer-guide/datatypes.html b/docs/streams/developer-guide/datatypes.html index e6930bc31ce83..2201b5b69d35a 100644 --- a/docs/streams/developer-guide/datatypes.html +++ b/docs/streams/developer-guide/datatypes.html @@ -140,10 +140,12 @@

Primitive and basic typesUUID Serdes.UUID() - Void Serdes.Void() + List + Serdes.ListSerde() +
diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html index de747bb862f33..3ae4573d2071f 100644 --- a/docs/streams/upgrade-guide.html +++ b/docs/streams/upgrade-guide.html @@ -150,6 +150,11 @@

Streams API (meaning: use broker default replication factor). The replication.factor value of -1 requires broker version 2.4 or newer.

+

The new serde type was introduced ListSerde:

+
    +
  • Added class ListSerde to (de)serialize List-based objects
  • +
  • Introduced ListSerializer and ListDeserializer to power the new functionality
  • +

Streams API changes in 2.8.0

diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index 0e9e6a149f9b6..274322ee218d4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -643,6 +643,26 @@ public class StreamsConfig extends AbstractConfig { Serdes.ByteArraySerde.class.getName(), Importance.MEDIUM, DEFAULT_KEY_SERDE_CLASS_DOC) + .define(CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_INNER_CLASS, + Type.CLASS, + null, + Importance.MEDIUM, + CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_INNER_CLASS_DOC) + .define(CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_INNER_CLASS, + Type.CLASS, + null, + Importance.MEDIUM, + CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_INNER_CLASS_DOC) + .define(CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_TYPE_CLASS, + Type.CLASS, + null, + Importance.MEDIUM, + CommonClientConfigs.DEFAULT_LIST_KEY_SERDE_TYPE_CLASS_DOC) + .define(CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_TYPE_CLASS, + Type.CLASS, + null, + Importance.MEDIUM, + CommonClientConfigs.DEFAULT_LIST_VALUE_SERDE_TYPE_CLASS_DOC) .define(DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, Type.CLASS, DefaultProductionExceptionHandler.class.getName(),