diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/BooleanDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/BooleanDeserializer.java new file mode 100644 index 0000000000000..132ea336bb516 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/serialization/BooleanDeserializer.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.serialization; + +import java.util.Map; + +import org.apache.kafka.common.errors.SerializationException; + +public class BooleanDeserializer implements Deserializer<Boolean> { + + public void configure(Map<String, ?> configs, boolean isKey) { + // nothing to do + } + + public Boolean deserialize(String topic, byte[] data) { + if (data == null) + return null; + if (data.length != 1) { + throw new SerializationException("Size of data received by BooleanDeserializer is " + + "not 1"); + } + return data[0] == 1; + } + + public void close() { + // nothing to do + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/BooleanSerializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/BooleanSerializer.java new file mode 100644 index 0000000000000..ee1cdce3dd9c8 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/serialization/BooleanSerializer.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.common.serialization; + +import java.util.Map; + +public class BooleanSerializer implements Serializer<Boolean> { + + public void configure(Map<String, ?> configs, boolean isKey) { + // nothing to do + } + + public byte[] serialize(String topic, Boolean data) { + if (data == null) + return null; + + byte b = (byte) (data ? 1 : 0); + return new byte[] {b}; + } + + public void close() { + // nothing to do + } +} \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/ByteDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/ByteDeserializer.java new file mode 100644 index 0000000000000..d356406c53f69 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/serialization/ByteDeserializer.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.serialization; + +import java.util.Map; + +import org.apache.kafka.common.errors.SerializationException; + +public class ByteDeserializer implements Deserializer<Byte> { + + public void configure(Map<String, ?> configs, boolean isKey) { + // nothing to do + } + + public Byte deserialize(String topic, byte[] data) { + if (data == null) + return null; + if (data.length != 1) { + throw new SerializationException("Size of data received by ByteDeserializer is " + + "not 1"); + } + return data[0]; + } + + public void close() { + // nothing to do + } +} diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/ByteSerializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/ByteSerializer.java new file mode 100644 index 0000000000000..a9162d29928c9 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/serialization/ByteSerializer.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.common.serialization; + +import java.util.Map; + +public class ByteSerializer implements Serializer<Byte> { + + public void configure(Map<String, ?> configs, boolean isKey) { + // nothing to do + } + + public byte[] serialize(String topic, Byte data) { + if (data == null) + return null; + + return new byte[] {data}; + } + + public void close() { + // nothing to do + } +} \ No newline at end of file diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/ShortDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/ShortDeserializer.java new file mode 100644 index 0000000000000..d2836aa15cf29 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/serialization/ShortDeserializer.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.serialization; + +import java.util.Map; + +import org.apache.kafka.common.errors.SerializationException; + +public class ShortDeserializer implements Deserializer<Short> { + + public void configure(Map<String, ?> configs, boolean isKey) { + // nothing to do + } + + public Short deserialize(String topic, byte[] data) { + if (data == null) + return null; + if (data.length != 2) { + throw new SerializationException("Size of data received by ShortDeserializer is " + + "not 2"); + } + + short value = 0; + for (byte b : data) { + value <<= 8; + value |= b & 0xFF; + } + return value; + } + + public void close() { + // nothing to do + } +}
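For intuition, here is a minimal standalone sketch of the big-endian wire format used by the short round trip above and by the ShortSerializer that follows; it is an illustration, not part of the patch:

import java.util.Arrays;

public class ShortWireFormatExample {
    public static void main(String[] args) {
        short value = 0x1234;
        // Encode: high byte first, low byte second (big-endian), as in ShortSerializer.
        byte[] wire = new byte[] {(byte) (value >>> 8), (byte) value};
        System.out.println(Arrays.toString(wire)); // [18, 52]

        // Decode: shift the accumulator and OR in each unsigned byte, as in ShortDeserializer.
        short decoded = 0;
        for (byte b : wire) {
            decoded <<= 8;
            decoded |= b & 0xFF;
        }
        System.out.println(decoded == value); // true
    }
}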
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/ShortSerializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/ShortSerializer.java new file mode 100644 index 0000000000000..a66aaa09685d5 --- /dev/null +++ b/clients/src/main/java/org/apache/kafka/common/serialization/ShortSerializer.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.common.serialization; + +import java.util.Map; + +public class ShortSerializer implements Serializer<Short> { + + public void configure(Map<String, ?> configs, boolean isKey) { + // nothing to do + } + + public byte[] serialize(String topic, Short data) { + if (data == null) + return null; + + return new byte[] { + (byte) (data >>> 8), + data.byteValue() + }; + } + + public void close() { + // nothing to do + } +} \ No newline at end of file diff --git a/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectHeader.java b/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectHeader.java new file mode 100644 index 0000000000000..e96b9ece28d0f --- /dev/null +++ b/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectHeader.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.connector; + +import java.util.Objects; + +import org.apache.kafka.connect.data.Schema; + +/** + * <p> + * Class for headers containing data to be copied to/from Kafka. This corresponds closely to + * Kafka's Header classes, and holds the data that may be used by both + * sources and sinks (key, valueSchema, value). + * </p>
+ */ +public class ConnectHeader { + + private final String key; + private final Schema valueSchema; + private final Object value; + + public ConnectHeader(String key, Schema valueSchema, Object value) { + this.key = key; + this.valueSchema = valueSchema; + this.value = value; + } + + public String key() { + return key; + } + + public Object value() { + return value; + } + + public Schema valueSchema() { + return valueSchema; + } + + // equals and hashCode are required because ConnectRecord compares header lists. + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + ConnectHeader that = (ConnectHeader) o; + return Objects.equals(key, that.key) && Objects.equals(valueSchema, that.valueSchema) && Objects.equals(value, that.value); + } + + @Override + public int hashCode() { + return Objects.hash(key, valueSchema, value); + } +} diff --git a/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectRecord.java b/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectRecord.java index 344e365fd3f08..2899afd9c05aa 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectRecord.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectRecord.java @@ -16,6 +16,9 @@ */ package org.apache.kafka.connect.connector; +import java.util.ArrayList; +import java.util.List; + import org.apache.kafka.connect.data.Schema; /** @@ -34,11 +37,12 @@ public abstract class ConnectRecord<R extends ConnectRecord<R>> { private final Schema valueSchema; private final Object value; private final Long timestamp; + private final List<ConnectHeader> headers; public ConnectRecord(String topic, Integer kafkaPartition, Schema keySchema, Object key, Schema valueSchema, Object value, - Long timestamp) { + Long timestamp, List<ConnectHeader> headers) { this.topic = topic; this.kafkaPartition = kafkaPartition; this.keySchema = keySchema; @@ -46,6 +50,7 @@ public ConnectRecord(String topic, Integer kafkaPartition, this.valueSchema = valueSchema; this.value = value; this.timestamp = timestamp; + this.headers = headers == null ? new ArrayList<ConnectHeader>() : new ArrayList<>(headers); } public String topic() { @@ -71,13 +76,22 @@ public Object value() { public Schema valueSchema() { return valueSchema; } + + public List<ConnectHeader> headers() { + return headers; + } public Long timestamp() { return timestamp; } /** Generate a new record of the same type as itself, with the specified parameter values. **/ - public abstract R newRecord(String topic, Integer kafkaPartition, Schema keySchema, Object key, Schema valueSchema, Object value, Long timestamp); + public R newRecord(String topic, Integer kafkaPartition, Schema keySchema, Object key, Schema valueSchema, Object value, Long timestamp) { + return newRecord(topic, kafkaPartition, keySchema, key, valueSchema, value, timestamp, this.headers()); + } + + /** Generate a new record of the same type as itself, with the specified parameter values. **/ + public abstract R newRecord(String topic, Integer kafkaPartition, Schema keySchema, Object key, Schema valueSchema, Object value, Long timestamp, List<ConnectHeader> headers); @Override public String toString() { @@ -87,7 +101,8 @@ public String toString() { ", key=" + key + ", value=" + value + ", timestamp=" + timestamp + - '}'; + ", headers=" + headers + + '}'; } @Override @@ -113,6 +128,8 @@ public boolean equals(Object o) { return false; if (timestamp != null ? !timestamp.equals(that.timestamp) : that.timestamp != null) return false; + if (!headers.equals(that.headers)) + return false; return true; } @@ -126,6 +143,7 @@ public int hashCode() { result = 31 * result + (valueSchema != null ? valueSchema.hashCode() : 0); result = 31 * result + (value != null ? value.hashCode() : 0); result = 31 * result + (timestamp != null ? 
timestamp.hashCode() : 0); + result = 31 * result + headers.hashCode(); return result; } } diff --git a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java index e03a1f1396f07..86e5e9da071d2 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java @@ -16,7 +16,10 @@ */ package org.apache.kafka.connect.sink; +import java.util.List; + import org.apache.kafka.common.record.TimestampType; +import org.apache.kafka.connect.connector.ConnectHeader; import org.apache.kafka.connect.connector.ConnectRecord; import org.apache.kafka.connect.data.Schema; @@ -38,7 +41,12 @@ public SinkRecord(String topic, int partition, Schema keySchema, Object key, Sch public SinkRecord(String topic, int partition, Schema keySchema, Object key, Schema valueSchema, Object value, long kafkaOffset, Long timestamp, TimestampType timestampType) { - super(topic, partition, keySchema, key, valueSchema, value, timestamp); + this(topic, partition, keySchema, key, valueSchema, value, kafkaOffset, timestamp, timestampType, null); + } + + public SinkRecord(String topic, int partition, Schema keySchema, Object key, Schema valueSchema, Object value, long kafkaOffset, + Long timestamp, TimestampType timestampType, List<ConnectHeader> headers) { + super(topic, partition, keySchema, key, valueSchema, value, timestamp, headers); this.kafkaOffset = kafkaOffset; this.timestampType = timestampType; } @@ -52,8 +60,8 @@ public TimestampType timestampType() { } @Override - public SinkRecord newRecord(String topic, Integer kafkaPartition, Schema keySchema, Object key, Schema valueSchema, Object value, Long timestamp) { - return new SinkRecord(topic, kafkaPartition, keySchema, key, valueSchema, value, kafkaOffset(), timestamp, timestampType); + public SinkRecord newRecord(String topic, Integer kafkaPartition, Schema keySchema, Object key, Schema valueSchema, Object value, Long timestamp, List<ConnectHeader> headers) { + return new SinkRecord(topic, kafkaPartition, keySchema, key, valueSchema, value, kafkaOffset(), timestamp, timestampType, headers); } @Override diff --git a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceRecord.java b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceRecord.java index 2f3e5e471c934..f0866aba2c7a7 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceRecord.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceRecord.java @@ -16,9 +16,11 @@ */ package org.apache.kafka.connect.source; +import org.apache.kafka.connect.connector.ConnectHeader; import org.apache.kafka.connect.connector.ConnectRecord; import org.apache.kafka.connect.data.Schema; +import java.util.List; import java.util.Map; /** @@ -69,7 +71,15 @@ public SourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset, Schema keySchema, Object key, Schema valueSchema, Object value, Long timestamp) { - super(topic, partition, keySchema, key, valueSchema, value, timestamp); + this(sourcePartition, sourceOffset, topic, partition, keySchema, key, valueSchema, value, timestamp, null); + } + + public SourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset, + String topic, Integer partition, + Schema keySchema, Object key, + Schema valueSchema, Object value, + Long timestamp, List<ConnectHeader> headers) { + super(topic, partition, keySchema, key, valueSchema, value, timestamp, headers); + this.sourcePartition = sourcePartition; + this.sourceOffset = 
sourceOffset; } @@ -83,8 +93,8 @@ public SourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset, } @Override - public SourceRecord newRecord(String topic, Integer kafkaPartition, Schema keySchema, Object key, Schema valueSchema, Object value, Long timestamp) { - return new SourceRecord(sourcePartition, sourceOffset, topic, kafkaPartition, keySchema, key, valueSchema, value, timestamp); + public SourceRecord newRecord(String topic, Integer kafkaPartition, Schema keySchema, Object key, Schema valueSchema, Object value, Long timestamp, List<ConnectHeader> headers) { + return new SourceRecord(sourcePartition, sourceOffset, topic, kafkaPartition, keySchema, key, valueSchema, value, timestamp, headers); } @Override diff --git a/connect/api/src/main/java/org/apache/kafka/connect/storage/MemorySubjectSchemaRepo.java b/connect/api/src/main/java/org/apache/kafka/connect/storage/MemorySubjectSchemaRepo.java new file mode 100644 index 0000000000000..6f0c69725ff6d --- /dev/null +++ b/connect/api/src/main/java/org/apache/kafka/connect/storage/MemorySubjectSchemaRepo.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.storage; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.kafka.connect.data.Schema; + +public class MemorySubjectSchemaRepo implements SubjectSchemaRepo { + + private final Map<String, Schema> subjectSchemas = new HashMap<>(); + + @Override + public void register(String key, Schema schema) { + subjectSchemas.put(key, schema); + } + + @Override + public Schema schema(String key) { + return subjectSchemas.get(key); + } +} diff --git a/connect/api/src/main/java/org/apache/kafka/connect/storage/PrimitiveSubjectConverter.java b/connect/api/src/main/java/org/apache/kafka/connect/storage/PrimitiveSubjectConverter.java new file mode 100644 index 0000000000000..5fb9f35162ab8 --- /dev/null +++ b/connect/api/src/main/java/org/apache/kafka/connect/storage/PrimitiveSubjectConverter.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.storage; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.serialization.BooleanDeserializer; +import org.apache.kafka.common.serialization.BooleanSerializer; +import org.apache.kafka.common.serialization.ByteArrayDeserializer; +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.serialization.ByteDeserializer; +import org.apache.kafka.common.serialization.ByteSerializer; +import org.apache.kafka.common.serialization.Deserializer; +import org.apache.kafka.common.serialization.DoubleDeserializer; +import org.apache.kafka.common.serialization.DoubleSerializer; +import org.apache.kafka.common.serialization.FloatDeserializer; +import org.apache.kafka.common.serialization.FloatSerializer; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.LongDeserializer; +import org.apache.kafka.common.serialization.LongSerializer; +import org.apache.kafka.common.serialization.Serializer; +import org.apache.kafka.common.serialization.ShortDeserializer; +import org.apache.kafka.common.serialization.ShortSerializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaAndValue; +import org.apache.kafka.connect.data.SchemaBuilder; + +public class PrimitiveSubjectConverter implements SubjectConverter { + + private static final Map<Schema, Serializer<?>> schemaSerializerMap = new HashMap<>(); + private static final Map<Schema, Deserializer<?>> schemaDeserializerMap = new HashMap<>(); + + static { + register(Schema.BYTES_SCHEMA, new ByteArraySerializer(), new ByteArrayDeserializer()); + register(Schema.INT8_SCHEMA, new ByteSerializer(), new ByteDeserializer()); + register(Schema.INT16_SCHEMA, new ShortSerializer(), new ShortDeserializer()); + register(Schema.INT32_SCHEMA, new IntegerSerializer(), new IntegerDeserializer()); + register(Schema.INT64_SCHEMA, new LongSerializer(), new LongDeserializer()); + register(Schema.FLOAT32_SCHEMA, new FloatSerializer(), new FloatDeserializer()); + register(Schema.FLOAT64_SCHEMA, new DoubleSerializer(), new DoubleDeserializer()); + register(Schema.BOOLEAN_SCHEMA, new BooleanSerializer(), new BooleanDeserializer()); + register(Schema.STRING_SCHEMA, new StringSerializer(), new StringDeserializer()); + } + + private static void register(Schema schema, Serializer<?> serializer, Deserializer<?> deserializer) { + schemaSerializerMap.put(schema, serializer); + schemaDeserializerMap.put(schema, deserializer); + } + + private final SubjectSchemaRepo subjectSchemaRepo = new MemorySubjectSchemaRepo(); + private final Schema defaultSchema = Schema.BYTES_SCHEMA; + + @Override + public void configure(Map<String, ?> configs, boolean isKey) { 
for (String configKey : configs.keySet()) { + if (configKey.startsWith("converter.subject.")) { + String subject = configKey.substring("converter.subject.".length()); + Schema.Type type = Schema.Type.valueOf(((String) configs.get(configKey)).toUpperCase()); + if (type.isPrimitive()) { + subjectSchemaRepo.register(subject, SchemaBuilder.type(type).build()); + } + } + } + } + + @Override + public byte[] fromConnectData(String topic, String subject, Schema schema, Object value) { + if (!schema.type().isPrimitive()) { + throw new SerializationException("Only primitive types are supported, subject=" + subject); + } + Schema registeredSchema = subjectSchemaRepo.schema(subject); + if (registeredSchema == null) { + subjectSchemaRepo.register(subject, schema); + } else if (!schema.type().equals(registeredSchema.type())) { + throw new SerializationException("Schema type for subject does not match the registered schema type, subject=" + subject); + } + @SuppressWarnings("unchecked") + Serializer<Object> serializer = (Serializer<Object>) schemaSerializerMap.get(schema); + return serializer.serialize(topic, value); + } + + @Override + public SchemaAndValue toConnectData(String topic, String subject, byte[] value) { + Schema schema = subjectSchemaRepo.schema(subject); + if (schema == null) { + schema = defaultSchema; + } + Deserializer<?> deserializer = schemaDeserializerMap.get(schema); + Object object = deserializer.deserialize(topic, value); + return new SchemaAndValue(schema, object); + } + +} diff --git a/connect/api/src/main/java/org/apache/kafka/connect/storage/StringConverter.java b/connect/api/src/main/java/org/apache/kafka/connect/storage/StringConverter.java index 85fef841b0a3d..b430e588e6abb 100644 --- a/connect/api/src/main/java/org/apache/kafka/connect/storage/StringConverter.java +++ b/connect/api/src/main/java/org/apache/kafka/connect/storage/StringConverter.java @@ -36,7 +36,7 @@ * this class can also be configured to use the same encoding for both encoding and decoding with the converter.encoding * setting. */ -public class StringConverter implements Converter { +public class StringConverter implements Converter, SubjectConverter { private final StringSerializer serializer = new StringSerializer(); private final StringDeserializer deserializer = new StringDeserializer(); @@ -60,6 +60,16 @@ public void configure(Map configs, boolean isKey) { deserializer.configure(deserializerConfigs, isKey); } + @Override + public byte[] fromConnectData(String topic, String subject, Schema schema, Object value) { + return fromConnectData(topic, schema, value); + } + + @Override + public SchemaAndValue toConnectData(String topic, String subject, byte[] value) { + return toConnectData(topic, value); + } + @Override public byte[] fromConnectData(String topic, Schema schema, Object value) { try { diff --git a/connect/api/src/main/java/org/apache/kafka/connect/storage/SubjectConverter.java b/connect/api/src/main/java/org/apache/kafka/connect/storage/SubjectConverter.java new file mode 100644 index 0000000000000..c8428f9bab8f0 --- /dev/null +++ b/connect/api/src/main/java/org/apache/kafka/connect/storage/SubjectConverter.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.storage; + +import java.util.Map; + +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaAndValue; + +/** + * The SubjectConverter interface provides support for translating between Kafka Connect's runtime data format + * and byte[], scoped by a subject (for example, a header key). Internally, this likely includes an intermediate + * step to the format used by the serialization layer (e.g. JsonNode, GenericRecord, Message). + */ +public interface SubjectConverter { + + /** + * Configure this class. + * @param configs configs in key/value pairs + * @param isKey whether this converter is for a key or a value + */ + void configure(Map<String, ?> configs, boolean isKey); + + /** + * Convert a Kafka Connect data object to a native object for serialization. + * @param topic the topic associated with the data + * @param subject the subject (for example, the header key) associated with the data + * @param schema the schema for the value + * @param value the value to convert + * @return the serialized value + */ + byte[] fromConnectData(String topic, String subject, Schema schema, Object value); + + /** + * Convert a native object to a Kafka Connect data object. + * @param topic the topic associated with the data + * @param subject the subject (for example, the header key) associated with the data + * @param value the value to convert + * @return an object containing the {@link Schema} and the converted value + */ + SchemaAndValue toConnectData(String topic, String subject, byte[] value); +} diff --git a/connect/api/src/main/java/org/apache/kafka/connect/storage/SubjectSchemaRepo.java b/connect/api/src/main/java/org/apache/kafka/connect/storage/SubjectSchemaRepo.java new file mode 100644 index 0000000000000..74ff2a23fb299 --- /dev/null +++ b/connect/api/src/main/java/org/apache/kafka/connect/storage/SubjectSchemaRepo.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.connect.storage; + +import org.apache.kafka.connect.data.Schema; + +public interface SubjectSchemaRepo { + void register(String key, Schema schema); + + Schema schema(String key); +} diff --git a/connect/api/src/test/java/org/apache/kafka/connect/storage/PrimitiveSubjectConverterTest.java b/connect/api/src/test/java/org/apache/kafka/connect/storage/PrimitiveSubjectConverterTest.java new file mode 100644 index 0000000000000..cbc734cacb624 --- /dev/null +++ b/connect/api/src/test/java/org/apache/kafka/connect/storage/PrimitiveSubjectConverterTest.java @@ -0,0 +1,96 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.connect.storage; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaAndValue; +import org.junit.Before; +import org.junit.Test; + +public class PrimitiveSubjectConverterTest { + + private SubjectConverter converter; + + @Before + public void setup() { + converter = new PrimitiveSubjectConverter(); + + Map<String, Object> configs = new HashMap<>(); + + configs.put("converter.subject.MyStringHeader", "String"); + configs.put("converter.subject.MyIntegerHeader", "int32"); + configs.put("converter.subject.MyShortHeader", "int16"); + configs.put("converter.subject.MyBytesHeader", "bytes"); + + converter.configure(configs, false); + } + + @Test + public void testBytes() { + byte[] fromKafka = "test".getBytes(); + SchemaAndValue schemaAndValue = converter.toConnectData("topic", "MyBytesHeader", fromKafka); + + assertEquals(Schema.BYTES_SCHEMA, schemaAndValue.schema()); + assertArrayEquals(fromKafka, (byte[]) schemaAndValue.value()); + + byte[] toKafka = converter.fromConnectData("topic", "MyBytesHeader", schemaAndValue.schema(), schemaAndValue.value()); + + assertArrayEquals(fromKafka, toKafka); + } + + @Test + public void testInteger() { + Integer integer = 42; + IntegerSerializer integerSerializer = new IntegerSerializer(); + byte[] fromKafka = integerSerializer.serialize("", integer); + + SchemaAndValue schemaAndValue = converter.toConnectData("topic", "MyIntegerHeader", fromKafka); + + assertEquals(Schema.INT32_SCHEMA, schemaAndValue.schema()); + assertEquals(integer, schemaAndValue.value()); + + byte[] toKafka = converter.fromConnectData("topic", "MyIntegerHeader", schemaAndValue.schema(), schemaAndValue.value()); + + assertArrayEquals(fromKafka, toKafka); + } + + @Test + public void testString() { + String string = "test"; + StringSerializer stringSerializer = new StringSerializer(); + byte[] fromKafka = stringSerializer.serialize("", string); + + SchemaAndValue schemaAndValue = converter.toConnectData("topic", "MyStringHeader", fromKafka); + + assertEquals(Schema.STRING_SCHEMA, schemaAndValue.schema()); + assertEquals(string, schemaAndValue.value()); + + byte[] toKafka = converter.fromConnectData("topic", "MyStringHeader", schemaAndValue.schema(), schemaAndValue.value()); + + assertArrayEquals(fromKafka, toKafka); + } +}
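Before the runtime changes below, here is a minimal sketch of how a source connector might attach a header using the API introduced above; the topic, subject name, partition, and offset values are hypothetical:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.kafka.connect.connector.ConnectHeader;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;

public class HeaderedRecordExample {
    public static SourceRecord build() {
        Map<String, ?> sourcePartition = Collections.singletonMap("file", "example.txt"); // hypothetical partition
        Map<String, ?> sourceOffset = Collections.singletonMap("position", 0L);           // hypothetical offset
        List<ConnectHeader> headers = new ArrayList<>();
        // The subject seen by the header converter is the header key ("MyIntegerHeader").
        headers.add(new ConnectHeader("MyIntegerHeader", Schema.INT32_SCHEMA, 42));
        // The new SourceRecord constructor takes the header list as its last argument.
        return new SourceRecord(sourcePartition, sourceOffset, "example-topic", null,
                null, null, Schema.STRING_SCHEMA, "value", System.currentTimeMillis(), headers);
    }
}
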
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/converters/ByteArrayConverter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/converters/ByteArrayConverter.java index 05dff27dce220..67527e2899655 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/converters/ByteArrayConverter.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/converters/ByteArrayConverter.java @@ -21,18 +21,29 @@ import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.errors.DataException; import org.apache.kafka.connect.storage.Converter; +import org.apache.kafka.connect.storage.SubjectConverter; import java.util.Map; /** * Pass-through converter for raw byte data. */ -public class ByteArrayConverter implements Converter { +public class ByteArrayConverter implements Converter, SubjectConverter { @Override public void configure(Map<String, ?> configs, boolean isKey) { } + @Override + public byte[] fromConnectData(String topic, String subject, Schema schema, Object value) { + return fromConnectData(topic, schema, value); + } + + @Override + public SchemaAndValue toConnectData(String topic, String subject, byte[] value) { + return toConnectData(topic, value); + } + @Override public byte[] fromConnectData(String topic, Schema schema, Object value) { if (schema != null && schema.type() != Schema.Type.BYTES) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java index 74aef627d710c..3bace6e924759 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java @@ -70,6 +70,11 @@ public class ConnectorConfig extends AbstractConfig { public static final String VALUE_CONVERTER_CLASS_DOC = WorkerConfig.VALUE_CONVERTER_CLASS_DOC; public static final String VALUE_CONVERTER_CLASS_DISPLAY = "Value converter class"; + public static final String HEADER_CONVERTER_CLASS_CONFIG = WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG; + public static final String HEADER_CONVERTER_CLASS_DOC = WorkerConfig.HEADER_CONVERTER_CLASS_DOC; + public static final String HEADER_CONVERTER_CLASS_DEFAULT = WorkerConfig.HEADER_CONVERTER_CLASS_DEFAULT; + public static final String HEADER_CONVERTER_CLASS_DISPLAY = "Header converter class"; + public static final String TASKS_MAX_CONFIG = "tasks.max"; private static final String TASKS_MAX_DOC = "Maximum number of tasks to use for this connector."; public static final int TASKS_MAX_DEFAULT = 1; @@ -88,6 +93,7 @@ public static ConfigDef configDef() { .define(TASKS_MAX_CONFIG, Type.INT, TASKS_MAX_DEFAULT, atLeast(TASKS_MIN_CONFIG), Importance.HIGH, TASKS_MAX_DOC, COMMON_GROUP, 3, Width.SHORT, TASK_MAX_DISPLAY) .define(KEY_CONVERTER_CLASS_CONFIG, Type.CLASS, null, Importance.LOW, KEY_CONVERTER_CLASS_DOC, COMMON_GROUP, 4, Width.SHORT, KEY_CONVERTER_CLASS_DISPLAY) .define(VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS, null, Importance.LOW, VALUE_CONVERTER_CLASS_DOC, COMMON_GROUP, 5, Width.SHORT, VALUE_CONVERTER_CLASS_DISPLAY) + .define(HEADER_CONVERTER_CLASS_CONFIG, Type.CLASS, HEADER_CONVERTER_CLASS_DEFAULT, Importance.LOW, HEADER_CONVERTER_CLASS_DOC, COMMON_GROUP, 6, Width.SHORT, HEADER_CONVERTER_CLASS_DISPLAY) .define(TRANSFORMS_CONFIG, Type.LIST, null, new ConfigDef.Validator() { @Override public void ensureValid(String name, Object 
value) { throw new ConfigException(name, value, "Duplicate alias provided."); } } - }, Importance.LOW, TRANSFORMS_DOC, TRANSFORMS_GROUP, 6, Width.LONG, TRANSFORMS_DISPLAY); + }, Importance.LOW, TRANSFORMS_DOC, TRANSFORMS_GROUP, 7, Width.LONG, TRANSFORMS_DISPLAY); } public ConnectorConfig() { diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java index 400ae0856a07c..cda22c2fee3fb 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java @@ -33,6 +33,7 @@ import org.apache.kafka.connect.storage.OffsetStorageReader; import org.apache.kafka.connect.storage.OffsetStorageReaderImpl; import org.apache.kafka.connect.storage.OffsetStorageWriter; +import org.apache.kafka.connect.storage.SubjectConverter; import org.apache.kafka.connect.util.ConnectorTaskId; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,6 +70,7 @@ public class Worker { private final WorkerConfig config; private final Converter defaultKeyConverter; private final Converter defaultValueConverter; + private final SubjectConverter defaultHeaderConverter; private final Converter internalKeyConverter; private final Converter internalValueConverter; private final OffsetBackingStore offsetBackingStore; @@ -88,6 +90,8 @@ public Worker(String workerId, Time time, ConnectorFactory connectorFactory, Wor this.defaultKeyConverter.configure(config.originalsWithPrefix("key.converter."), true); this.defaultValueConverter = config.getConfiguredInstance(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, Converter.class); this.defaultValueConverter.configure(config.originalsWithPrefix("value.converter."), false); + this.defaultHeaderConverter = config.getConfiguredInstance(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, SubjectConverter.class); + this.defaultHeaderConverter.configure(config.originalsWithPrefix("header.converter."), false); this.internalKeyConverter = config.getConfiguredInstance(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, Converter.class); this.internalKeyConverter.configure(config.originalsWithPrefix("internal.key.converter."), true); this.internalValueConverter = config.getConfiguredInstance(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, Converter.class); @@ -326,7 +330,13 @@ public boolean startTask( else valueConverter = defaultValueConverter; - workerTask = buildWorkerTask(connConfig, id, task, statusListener, initialState, keyConverter, valueConverter); + SubjectConverter headerConverter = connConfig.getConfiguredInstance(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, SubjectConverter.class); + if (headerConverter != null) + headerConverter.configure(connConfig.originalsWithPrefix("header.converter."), false); + else + headerConverter = defaultHeaderConverter; + + workerTask = buildWorkerTask(connConfig, id, task, statusListener, initialState, keyConverter, valueConverter, headerConverter); workerTask.initialize(taskConfig); } catch (Throwable t) { log.error("Failed to start task {}", id, t); @@ -351,7 +361,8 @@ private WorkerTask buildWorkerTask(ConnectorConfig connConfig, TaskStatus.Listener statusListener, TargetState initialState, Converter keyConverter, - Converter valueConverter) { + Converter valueConverter, + SubjectConverter headerConverter) { // Decide which type of worker task we need based on the type of task. 
if (task instanceof SourceTask) { TransformationChain<SourceRecord> transformationChain = new TransformationChain<>(connConfig.transformations()); @@ -361,11 +372,11 @@ private WorkerTask buildWorkerTask(ConnectorConfig connConfig, internalKeyConverter, internalValueConverter); KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps); return new WorkerSourceTask(id, (SourceTask) task, statusListener, initialState, keyConverter, - valueConverter, transformationChain, producer, offsetReader, offsetWriter, config, time); + valueConverter, headerConverter, transformationChain, producer, offsetReader, offsetWriter, config, time); } else if (task instanceof SinkTask) { TransformationChain<SinkRecord> transformationChain = new TransformationChain<>(connConfig.transformations()); return new WorkerSinkTask(id, (SinkTask) task, statusListener, initialState, config, keyConverter, - valueConverter, transformationChain, time); + valueConverter, headerConverter, transformationChain, time); } else { log.error("Tasks must be a subclass of either SourceTask or SinkTask: {}", task); throw new ConnectException("Tasks must be a subclass of either SourceTask or SinkTask"); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java index 680edaf97dcb4..edfea07be769e 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java @@ -20,6 +20,9 @@ import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef.Importance; import org.apache.kafka.common.config.ConfigDef.Type; +import org.apache.kafka.connect.storage.StringConverter; import java.util.Map; @@ -54,6 +57,15 @@ public class WorkerConfig extends AbstractConfig { " independent of connectors it allows any connector to work with any serialization format." + " Examples of common formats include JSON and Avro."; + public static final String HEADER_CONVERTER_CLASS_CONFIG = "header.converter"; + public static final String HEADER_CONVERTER_CLASS_DOC = + "Converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka." + + " This controls the format of the headers in messages written to or read from Kafka, and since this is" + + " independent of connectors it allows any connector to work with any serialization format." + + " Examples of common formats include JSON and Avro."; + public static final String HEADER_CONVERTER_CLASS_DEFAULT = StringConverter.class.getName(); + + public static final String INTERNAL_KEY_CONVERTER_CLASS_CONFIG = "internal.key.converter"; public static final String INTERNAL_KEY_CONVERTER_CLASS_DOC = "Converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka." 
+ @@ -135,6 +147,9 @@ protected static ConfigDef baseConfigDef() { Importance.HIGH, KEY_CONVERTER_CLASS_DOC) .define(VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, VALUE_CONVERTER_CLASS_DOC) + .define(HEADER_CONVERTER_CLASS_CONFIG, Type.CLASS, + HEADER_CONVERTER_CLASS_DEFAULT, + Importance.MEDIUM, HEADER_CONVERTER_CLASS_DOC) .define(INTERNAL_KEY_CONVERTER_CLASS_CONFIG, Type.CLASS, Importance.LOW, INTERNAL_KEY_CONVERTER_CLASS_DOC) .define(INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS, diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java index d5f337d4b1c61..114585df003fc 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java @@ -26,14 +26,17 @@ import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.errors.WakeupException; +import org.apache.kafka.common.header.Header; import org.apache.kafka.common.utils.Time; import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.connect.connector.ConnectHeader; import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.errors.RetriableException; import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.sink.SinkTask; import org.apache.kafka.connect.storage.Converter; +import org.apache.kafka.connect.storage.SubjectConverter; import org.apache.kafka.connect.util.ConnectUtils; import org.apache.kafka.connect.util.ConnectorTaskId; import org.apache.kafka.connect.util.SinkUtils; @@ -61,6 +64,7 @@ class WorkerSinkTask extends WorkerTask { private final Time time; private final Converter keyConverter; private final Converter valueConverter; + private final SubjectConverter headerConverter; private final TransformationChain<SinkRecord> transformationChain; private KafkaConsumer<byte[], byte[]> consumer; private WorkerSinkTaskContext context; @@ -82,6 +86,7 @@ public WorkerSinkTask(ConnectorTaskId id, WorkerConfig workerConfig, Converter keyConverter, Converter valueConverter, + SubjectConverter headerConverter, TransformationChain<SinkRecord> transformationChain, Time time) { super(id, statusListener, initialState); @@ -90,6 +95,7 @@ public WorkerSinkTask(ConnectorTaskId id, this.task = task; this.keyConverter = keyConverter; this.valueConverter = valueConverter; + this.headerConverter = headerConverter; this.transformationChain = transformationChain; this.time = time; this.messageBatch = new ArrayList<>(); @@ -398,12 +404,17 @@ private void convertMessages(ConsumerRecords<byte[], byte[]> msgs) { log.trace("Consuming message with key {}, value {}", msg.key(), msg.value()); SchemaAndValue keyAndSchema = keyConverter.toConnectData(msg.topic(), msg.key()); SchemaAndValue valueAndSchema = valueConverter.toConnectData(msg.topic(), msg.value()); + List<ConnectHeader> connectHeaders = new ArrayList<>(); + for (Header header : msg.headers()) { + SchemaAndValue headerValueAndSchema = headerConverter.toConnectData(msg.topic(), header.key(), header.value()); + connectHeaders.add(new ConnectHeader(header.key(), headerValueAndSchema.schema(), headerValueAndSchema.value())); + } SinkRecord record = new SinkRecord(msg.topic(), msg.partition(), keyAndSchema.schema(), keyAndSchema.value(), valueAndSchema.schema(), valueAndSchema.value(), msg.offset(), 
ConnectUtils.checkAndConvertTimestamp(msg.timestamp()), - msg.timestampType()); + msg.timestampType(), connectHeaders); record = transformationChain.apply(record); if (record != null) { messageBatch.add(record); diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java index ed15b85e5152e..a900289fe7186 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java @@ -23,12 +23,14 @@ import org.apache.kafka.common.KafkaException; import org.apache.kafka.common.errors.RetriableException; import org.apache.kafka.common.utils.Time; +import org.apache.kafka.connect.connector.ConnectHeader; import org.apache.kafka.connect.errors.ConnectException; import org.apache.kafka.connect.source.SourceRecord; import org.apache.kafka.connect.source.SourceTask; import org.apache.kafka.connect.storage.Converter; import org.apache.kafka.connect.storage.OffsetStorageReader; import org.apache.kafka.connect.storage.OffsetStorageWriter; +import org.apache.kafka.connect.storage.SubjectConverter; import org.apache.kafka.connect.util.ConnectUtils; import org.apache.kafka.connect.util.ConnectorTaskId; import org.slf4j.Logger; @@ -55,6 +57,7 @@ class WorkerSourceTask extends WorkerTask { private final SourceTask task; private final Converter keyConverter; private final Converter valueConverter; + private final SubjectConverter headerConverter; private final TransformationChain<SourceRecord> transformationChain; private KafkaProducer<byte[], byte[]> producer; private final OffsetStorageReader offsetReader; @@ -81,6 +84,7 @@ public WorkerSourceTask(ConnectorTaskId id, TargetState initialState, Converter keyConverter, Converter valueConverter, + SubjectConverter headerConverter, TransformationChain<SourceRecord> transformationChain, KafkaProducer<byte[], byte[]> producer, OffsetStorageReader offsetReader, @@ -93,6 +97,7 @@ public WorkerSourceTask(ConnectorTaskId id, this.task = task; this.keyConverter = keyConverter; this.valueConverter = valueConverter; + this.headerConverter = headerConverter; this.transformationChain = transformationChain; this.producer = producer; this.offsetReader = offsetReader; @@ -197,6 +202,12 @@ private boolean sendRecords() { byte[] value = valueConverter.fromConnectData(record.topic(), record.valueSchema(), record.value()); final ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(record.topic(), record.kafkaPartition(), ConnectUtils.checkAndConvertTimestamp(record.timestamp()), key, value); + + for (ConnectHeader connectHeader : record.headers()) { + byte[] headerValue = headerConverter.fromConnectData(record.topic(), connectHeader.key(), connectHeader.valueSchema(), connectHeader.value()); + producerRecord.headers().add(connectHeader.key(), headerValue); + } + log.trace("Appending record with key {}, value {}", record.key(), record.value()); // We need this queued first since the callback could happen immediately (even synchronously in some cases). 
// Because of this we need to be careful about handling retries -- we always save the previously attempted diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java index 26ac486c23638..2f5fc4982359f 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java @@ -35,6 +35,7 @@ import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.sink.SinkTask; import org.apache.kafka.connect.storage.Converter; +import org.apache.kafka.connect.storage.SubjectConverter; import org.apache.kafka.connect.util.ConnectorTaskId; import org.apache.kafka.connect.util.MockTime; import org.easymock.Capture; @@ -108,6 +109,8 @@ public class WorkerSinkTaskTest { @Mock private Converter valueConverter; @Mock + private SubjectConverter headerConverter; + @Mock private TransformationChain<SinkRecord> transformationChain; @Mock private TaskStatus.Listener statusListener; @@ -123,6 +126,7 @@ public void setUp() { Map<String, String> workerProps = new HashMap<>(); workerProps.put("key.converter", "org.apache.kafka.connect.json.JsonConverter"); workerProps.put("value.converter", "org.apache.kafka.connect.json.JsonConverter"); + workerProps.put("header.converter", "org.apache.kafka.connect.storage.StringConverter"); workerProps.put("internal.key.converter", "org.apache.kafka.connect.json.JsonConverter"); workerProps.put("internal.value.converter", "org.apache.kafka.connect.json.JsonConverter"); workerProps.put("internal.key.converter.schemas.enable", "false"); @@ -131,7 +135,7 @@ public void setUp() { workerConfig = new StandaloneConfig(workerProps); workerTask = PowerMock.createPartialMock( WorkerSinkTask.class, new String[]{"createConsumer"}, - taskId, sinkTask, statusListener, initialState, workerConfig, keyConverter, valueConverter, transformationChain, time); + taskId, sinkTask, statusListener, initialState, workerConfig, keyConverter, valueConverter, headerConverter, transformationChain, time); recordsReturned = 0; } @@ -140,7 +144,7 @@ public void setUp() { public void testStartPaused() throws Exception { workerTask = PowerMock.createPartialMock( WorkerSinkTask.class, new String[]{"createConsumer"}, - taskId, sinkTask, statusListener, TargetState.PAUSED, workerConfig, keyConverter, valueConverter, transformationChain, time); + taskId, sinkTask, statusListener, TargetState.PAUSED, workerConfig, keyConverter, valueConverter, headerConverter, transformationChain, time); expectInitializeTask(); expectPollInitialAssignment(); diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java index fb7cf4fb54113..3e5323eef6ddc 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java @@ -33,6 +33,7 @@ import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.sink.SinkTask; import org.apache.kafka.connect.storage.Converter; +import org.apache.kafka.connect.storage.SubjectConverter; import org.apache.kafka.connect.util.ConnectorTaskId; import org.apache.kafka.connect.util.MockTime; import org.apache.kafka.connect.util.ThreadedTest; @@ -103,6 +104,7 @@ public class 
WorkerSinkTaskThreadedTest extends ThreadedTest { private WorkerConfig workerConfig; @Mock private Converter keyConverter; @Mock private Converter valueConverter; + @Mock private SubjectConverter headerConverter; @Mock private TransformationChain<SinkRecord> transformationChain; private WorkerSinkTask workerTask; @Mock private KafkaConsumer<byte[], byte[]> consumer; @@ -120,6 +122,7 @@ public void setup() { Map<String, String> workerProps = new HashMap<>(); workerProps.put("key.converter", "org.apache.kafka.connect.json.JsonConverter"); workerProps.put("value.converter", "org.apache.kafka.connect.json.JsonConverter"); + workerProps.put("header.converter", "org.apache.kafka.connect.storage.StringConverter"); workerProps.put("internal.key.converter", "org.apache.kafka.connect.json.JsonConverter"); workerProps.put("internal.value.converter", "org.apache.kafka.connect.json.JsonConverter"); workerProps.put("internal.key.converter.schemas.enable", "false"); @@ -128,7 +131,7 @@ public void setup() { workerConfig = new StandaloneConfig(workerProps); workerTask = PowerMock.createPartialMock( WorkerSinkTask.class, new String[]{"createConsumer"}, - taskId, sinkTask, statusListener, initialState, workerConfig, keyConverter, valueConverter, TransformationChain.noOp(), time); + taskId, sinkTask, statusListener, initialState, workerConfig, keyConverter, valueConverter, headerConverter, TransformationChain.noOp(), time); recordsReturned = 0; } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java index 31204f01b2e0e..9226cc5b40c80 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java @@ -30,6 +30,7 @@ import org.apache.kafka.connect.storage.Converter; import org.apache.kafka.connect.storage.OffsetStorageReader; import org.apache.kafka.connect.storage.OffsetStorageWriter; +import org.apache.kafka.connect.storage.SubjectConverter; import org.apache.kafka.connect.util.Callback; import org.apache.kafka.connect.util.ConnectorTaskId; import org.apache.kafka.connect.util.ThreadedTest; @@ -85,6 +86,7 @@ public class WorkerSourceTaskTest extends ThreadedTest { @Mock private SourceTask sourceTask; @Mock private Converter keyConverter; @Mock private Converter valueConverter; + @Mock private SubjectConverter headerConverter; @Mock private TransformationChain<SourceRecord> transformationChain; @Mock private KafkaProducer<byte[], byte[]> producer; @Mock private OffsetStorageReader offsetReader; @@ -111,6 +113,7 @@ public void setup() { Map<String, String> workerProps = new HashMap<>(); workerProps.put("key.converter", "org.apache.kafka.connect.json.JsonConverter"); workerProps.put("value.converter", "org.apache.kafka.connect.json.JsonConverter"); + workerProps.put("header.converter", "org.apache.kafka.connect.storage.StringConverter"); workerProps.put("internal.key.converter", "org.apache.kafka.connect.json.JsonConverter"); workerProps.put("internal.value.converter", "org.apache.kafka.connect.json.JsonConverter"); workerProps.put("internal.key.converter.schemas.enable", "false"); @@ -125,7 +128,7 @@ private void createWorkerTask() { } private void createWorkerTask(TargetState initialState) { - workerTask = new WorkerSourceTask(taskId, sourceTask, statusListener, initialState, keyConverter, valueConverter, transformationChain, + workerTask = new WorkerSourceTask(taskId, sourceTask, statusListener, initialState, keyConverter, 
valueConverter, headerConverter, transformationChain, producer, offsetReader, offsetWriter, config, Time.SYSTEM); } diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java index 47dfcef33f18c..79865b77bf008 100644 --- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java +++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java @@ -34,6 +34,8 @@ import org.apache.kafka.connect.storage.OffsetBackingStore; import org.apache.kafka.connect.storage.OffsetStorageReader; import org.apache.kafka.connect.storage.OffsetStorageWriter; +import org.apache.kafka.connect.storage.StringConverter; +import org.apache.kafka.connect.storage.SubjectConverter; import org.apache.kafka.connect.util.ConnectorTaskId; import org.apache.kafka.connect.util.MockTime; import org.apache.kafka.connect.util.ThreadedTest; @@ -372,6 +374,7 @@ public void testAddRemoveTask() throws Exception { EasyMock.eq(TargetState.STARTED), EasyMock.anyObject(JsonConverter.class), EasyMock.anyObject(JsonConverter.class), + EasyMock.anyObject(StringConverter.class), EasyMock.eq(TransformationChain.noOp()), EasyMock.anyObject(KafkaProducer.class), EasyMock.anyObject(OffsetStorageReader.class), @@ -446,6 +449,7 @@ public void testCleanupTasksOnStop() throws Exception { EasyMock.eq(TargetState.STARTED), EasyMock.anyObject(JsonConverter.class), EasyMock.anyObject(JsonConverter.class), + EasyMock.anyObject(StringConverter.class), EasyMock.eq(TransformationChain.noOp()), EasyMock.anyObject(KafkaProducer.class), EasyMock.anyObject(OffsetStorageReader.class), @@ -493,6 +497,7 @@ public void testConverterOverrides() throws Exception { Capture<TestConverter> keyConverter = EasyMock.newCapture(); Capture<TestConverter> valueConverter = EasyMock.newCapture(); + Capture<TestConverter> headerConverter = EasyMock.newCapture(); PowerMock.expectNew( WorkerSourceTask.class, EasyMock.eq(TASK_ID), @@ -501,6 +506,7 @@ public void testConverterOverrides() throws Exception { EasyMock.eq(TargetState.STARTED), EasyMock.capture(keyConverter), EasyMock.capture(valueConverter), + EasyMock.capture(headerConverter), EasyMock.eq(TransformationChain.noOp()), EasyMock.anyObject(KafkaProducer.class), EasyMock.anyObject(OffsetStorageReader.class), @@ -627,7 +633,7 @@ public void stop() { } } - public static class TestConverter implements Converter { + public static class TestConverter implements Converter, SubjectConverter { public Map<String, ?> configs; @Override @@ -635,6 +641,16 @@ public void configure(Map<String, ?> configs, boolean isKey) { this.configs = configs; } + @Override + public byte[] fromConnectData(String topic, String subject, Schema schema, Object value) { + return new byte[0]; + } + + @Override + public SchemaAndValue toConnectData(String topic, String subject, byte[] value) { + return null; + } + @Override public byte[] fromConnectData(String topic, Schema schema, Object value) { return new byte[0];
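To tie the pieces together, here is a minimal end-to-end sketch of the header round trip using only the classes introduced in this patch; the subject and topic names are illustrative, and the configuration mirrors the converter tests above rather than a full worker setup:

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.storage.PrimitiveSubjectConverter;

public class HeaderRoundTripExample {
    public static void main(String[] args) {
        PrimitiveSubjectConverter converter = new PrimitiveSubjectConverter();
        Map<String, Object> configs = new HashMap<>();
        // Pre-register the schema for the "MyIntegerHeader" subject, as the worker would
        // from "header.converter."-prefixed worker properties.
        configs.put("converter.subject.MyIntegerHeader", "int32");
        converter.configure(configs, false);

        // Source side: Connect value -> raw header bytes (4 bytes, big-endian for int32).
        byte[] raw = converter.fromConnectData("example-topic", "MyIntegerHeader", Schema.INT32_SCHEMA, 42);

        // Sink side: raw header bytes -> schema'd Connect value.
        SchemaAndValue headerValue = converter.toConnectData("example-topic", "MyIntegerHeader", raw);
        System.out.println(headerValue.schema().type() + " / " + headerValue.value()); // expect INT32 / 42
    }
}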