diff --git a/build.gradle b/build.gradle index 6f4d2501f7345..e191a51c3b89a 100644 --- a/build.gradle +++ b/build.gradle @@ -508,7 +508,7 @@ project(':core') { task siteDocsTar(dependsOn: ['genProtocolErrorDocs', 'genProtocolApiKeyDocs', 'genProtocolMessageDocs', 'genProducerConfigDocs', 'genConsumerConfigDocs', 'genKafkaConfigDocs', - 'genTopicConfigDocs', ':connect:runtime:genConnectConfigDocs', + 'genTopicConfigDocs', ':connect:runtime:genConnectConfigDocs', ':connect:runtime:genConnectTransformationDocs', ':streams:genStreamsConfigDocs'], type: Tar) { classifier = 'site-docs' compression = Compression.GZIP @@ -948,6 +948,13 @@ project(':connect:runtime') { if( !generatedDocsDir.exists() ) { generatedDocsDir.mkdirs() } standardOutput = new File(generatedDocsDir, "connect_config.html").newOutputStream() } + + task genConnectTransformationDocs(type: JavaExec) { + classpath = sourceSets.main.runtimeClasspath + main = 'org.apache.kafka.connect.tools.TransformationDoc' + if( !generatedDocsDir.exists() ) { generatedDocsDir.mkdirs() } + standardOutput = new File(generatedDocsDir, "connect_transforms.html").newOutputStream() + } } project(':connect:file') { diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java new file mode 100644 index 0000000000000..6746042baa743 --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.tools; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.transforms.ExtractField; +import org.apache.kafka.connect.transforms.HoistField; +import org.apache.kafka.connect.transforms.InsertField; +import org.apache.kafka.connect.transforms.MaskField; +import org.apache.kafka.connect.transforms.RegexRouter; +import org.apache.kafka.connect.transforms.ReplaceField; +import org.apache.kafka.connect.transforms.SetSchemaMetadata; +import org.apache.kafka.connect.transforms.TimestampRouter; +import org.apache.kafka.connect.transforms.ValueToKey; + +import java.io.PrintStream; +import java.util.Arrays; +import java.util.List; + +public class TransformationDoc { + + private static final class DocInfo { + final String transformationName; + final String overview; + final ConfigDef configDef; + + private DocInfo(String transformationName, String overview, ConfigDef configDef) { + this.transformationName = transformationName; + this.overview = overview; + this.configDef = configDef; + } + } + + private static final List TRANSFORMATIONS = Arrays.asList( + new DocInfo(InsertField.class.getName(), InsertField.OVERVIEW_DOC, InsertField.CONFIG_DEF), + new DocInfo(ReplaceField.class.getName(), ReplaceField.OVERVIEW_DOC, ReplaceField.CONFIG_DEF), + new DocInfo(MaskField.class.getName(), MaskField.OVERVIEW_DOC, MaskField.CONFIG_DEF), + new DocInfo(ValueToKey.class.getName(), ValueToKey.OVERVIEW_DOC, ValueToKey.CONFIG_DEF), + new DocInfo(HoistField.class.getName(), HoistField.OVERVIEW_DOC, HoistField.CONFIG_DEF), + new DocInfo(ExtractField.class.getName(), ExtractField.OVERVIEW_DOC, ExtractField.CONFIG_DEF), + new DocInfo(SetSchemaMetadata.class.getName(), SetSchemaMetadata.OVERVIEW_DOC, SetSchemaMetadata.CONFIG_DEF), + new DocInfo(TimestampRouter.class.getName(), TimestampRouter.OVERVIEW_DOC, TimestampRouter.CONFIG_DEF), + new DocInfo(RegexRouter.class.getName(), RegexRouter.OVERVIEW_DOC, RegexRouter.CONFIG_DEF) + ); + + private static void printTransformationHtml(PrintStream out, DocInfo docInfo) { + out.println("
<div id=\"" + docInfo.transformationName + "\">"); + + out.print("<h5>"); + out.print(docInfo.transformationName); + out.println("</h5>"); + + out.println(docInfo.overview); + + out.println("<p/>"); + + out.println(docInfo.configDef.toHtmlTable()); + + out.println("</div>
"); + } + + private static void printHtml(PrintStream out) throws NoSuchFieldException, IllegalAccessException, InstantiationException { + for (final DocInfo docInfo : TRANSFORMATIONS) { + printTransformationHtml(out, docInfo); + } + } + + public static void main(String... args) throws Exception { + printHtml(System.out); + } + +} diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ExtractField.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ExtractField.java new file mode 100644 index 0000000000000..b7063136e9df1 --- /dev/null +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ExtractField.java @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.transforms; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.connector.ConnectRecord; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.transforms.util.SimpleConfig; + +import java.util.Map; + +import static org.apache.kafka.connect.transforms.util.Requirements.requireMap; +import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct; + +public abstract class ExtractField> implements Transformation { + + public static final String OVERVIEW_DOC = + "Extract the specified field from a Struct when schema present, or a Map in the case of schemaless data." + + "
<p/>
Use the concrete transformation type designed for the record key (" + Key.class.getCanonicalName() + ") " + + "or value (" + Value.class.getCanonicalName() + ")."; + + private static final String FIELD_CONFIG = "field"; + + public static final ConfigDef CONFIG_DEF = new ConfigDef() + .define(FIELD_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.MEDIUM, "Field name to extract."); + + private static final String PURPOSE = "field extraction"; + + private String fieldName; + + @Override + public void configure(Map props) { + final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props); + fieldName = config.getString(FIELD_CONFIG); + } + + @Override + public R apply(R record) { + final Schema schema = operatingSchema(record); + if (schema == null) { + final Map value = requireMap(operatingValue(record), PURPOSE); + return newRecord(record, null, value.get(fieldName)); + } else { + final Struct value = requireStruct(operatingValue(record), PURPOSE); + return newRecord(record, schema.field(fieldName).schema(), value.get(fieldName)); + } + } + + @Override + public void close() { + } + + @Override + public ConfigDef config() { + return CONFIG_DEF; + } + + protected abstract Schema operatingSchema(R record); + + protected abstract Object operatingValue(R record); + + protected abstract R newRecord(R record, Schema updatedSchema, Object updatedValue); + + public static class Key> extends ExtractField { + @Override + protected Schema operatingSchema(R record) { + return record.keySchema(); + } + + @Override + protected Object operatingValue(R record) { + return record.key(); + } + + @Override + protected R newRecord(R record, Schema updatedSchema, Object updatedValue) { + return record.newRecord(record.topic(), record.kafkaPartition(), updatedSchema, updatedValue, record.valueSchema(), record.value(), record.timestamp()); + } + } + + public static class Value> extends ExtractField { + @Override + protected Schema operatingSchema(R record) { + return record.valueSchema(); + } + + @Override + protected Object operatingValue(R record) { + return record.value(); + } + + @Override + protected R newRecord(R record, Schema updatedSchema, Object updatedValue) { + return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), updatedSchema, updatedValue, record.timestamp()); + } + } + +} diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HoistToStruct.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HoistField.java similarity index 73% rename from connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HoistToStruct.java rename to connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HoistField.java index c2726ca84378b..1f2ed7c8e626f 100644 --- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HoistToStruct.java +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HoistField.java @@ -27,15 +27,21 @@ import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.transforms.util.SimpleConfig; +import java.util.Collections; import java.util.Map; -public abstract class HoistToStruct> implements Transformation { +public abstract class HoistField> implements Transformation { - public static final String FIELD_CONFIG = "field"; + public static final String OVERVIEW_DOC = + "Wrap data using the specified field name in a Struct when schema present, or a Map in the case of schemaless data." + + "
<p/>
Use the concrete transformation type designed for the record key (" + Key.class.getCanonicalName() + ") " + + "or value (" + Value.class.getCanonicalName() + ")."; - private static final ConfigDef CONFIG_DEF = new ConfigDef() + private static final String FIELD_CONFIG = "field"; + + public static final ConfigDef CONFIG_DEF = new ConfigDef() .define(FIELD_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.MEDIUM, - "Field name for the single field that will be created in the resulting Struct."); + "Field name for the single field that will be created in the resulting Struct or Map."); private Cache schemaUpdateCache; @@ -53,15 +59,19 @@ public R apply(R record) { final Schema schema = operatingSchema(record); final Object value = operatingValue(record); - Schema updatedSchema = schemaUpdateCache.get(schema); - if (updatedSchema == null) { - updatedSchema = SchemaBuilder.struct().field(fieldName, schema).build(); - schemaUpdateCache.put(schema, updatedSchema); - } + if (schema == null) { + return newRecord(record, null, Collections.singletonMap(fieldName, value)); + } else { + Schema updatedSchema = schemaUpdateCache.get(schema); + if (updatedSchema == null) { + updatedSchema = SchemaBuilder.struct().field(fieldName, schema).build(); + schemaUpdateCache.put(schema, updatedSchema); + } - final Struct updatedValue = new Struct(updatedSchema).put(fieldName, value); + final Struct updatedValue = new Struct(updatedSchema).put(fieldName, value); - return newRecord(record, updatedSchema, updatedValue); + return newRecord(record, updatedSchema, updatedValue); + } } @Override @@ -80,11 +90,7 @@ public ConfigDef config() { protected abstract R newRecord(R record, Schema updatedSchema, Object updatedValue); - /** - * Wraps the record key in a {@link org.apache.kafka.connect.data.Struct} with specified field name. - */ - public static class Key> extends HoistToStruct { - + public static class Key> extends HoistField { @Override protected Schema operatingSchema(R record) { return record.keySchema(); @@ -99,14 +105,9 @@ protected Object operatingValue(R record) { protected R newRecord(R record, Schema updatedSchema, Object updatedValue) { return record.newRecord(record.topic(), record.kafkaPartition(), updatedSchema, updatedValue, record.valueSchema(), record.value(), record.timestamp()); } - } - /** - * Wraps the record value in a {@link org.apache.kafka.connect.data.Struct} with specified field name. 
- */ - public static class Value> extends HoistToStruct { - + public static class Value> extends HoistField { @Override protected Schema operatingSchema(R record) { return record.valueSchema(); @@ -121,7 +122,6 @@ protected Object operatingValue(R record) { protected R newRecord(R record, Schema updatedSchema, Object updatedValue) { return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), updatedSchema, updatedValue, record.timestamp()); } - } } diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java index d67fea0893237..f32d6ed2e9465 100644 --- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java @@ -21,23 +21,32 @@ import org.apache.kafka.common.cache.LRUCache; import org.apache.kafka.common.cache.SynchronizedCache; import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.connect.connector.ConnectRecord; import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.data.Timestamp; -import org.apache.kafka.connect.errors.DataException; -import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.transforms.util.SimpleConfig; +import org.apache.kafka.connect.transforms.util.SchemaUtil; import java.util.Date; import java.util.HashMap; import java.util.Map; +import static org.apache.kafka.connect.transforms.util.Requirements.requireMap; +import static org.apache.kafka.connect.transforms.util.Requirements.requireSinkRecord; +import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct; + public abstract class InsertField> implements Transformation { - public interface Keys { + public static final String OVERVIEW_DOC = + "Insert field(s) using attributes from the record metadata or a configured static value." + + "
<p/>
Use the concrete transformation type designed for the record key (" + Key.class.getCanonicalName() + ") " + + "or value (" + Value.class.getCanonicalName() + ")."; + + private interface ConfigName { String TOPIC_FIELD = "topic.field"; String PARTITION_FIELD = "partition.field"; String OFFSET_FIELD = "offset.field"; @@ -46,22 +55,24 @@ public interface Keys { String STATIC_VALUE = "static.value"; } - private static final String OPTIONALITY_DOC = "Suffix with '!' to make this a required field, or '?' to keep it optional (the default)."; - - private static final ConfigDef CONFIG_DEF = new ConfigDef() - .define(Keys.TOPIC_FIELD, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, - "Field name for Kafka topic.\n" + OPTIONALITY_DOC) - .define(Keys.PARTITION_FIELD, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, - "Field name for Kafka partition.\n" + OPTIONALITY_DOC) - .define(Keys.OFFSET_FIELD, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, - "Field name for Kafka offset - only applicable to sink connectors.\n" + OPTIONALITY_DOC) - .define(Keys.TIMESTAMP_FIELD, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, - "Field name for record timestamp.\n" + OPTIONALITY_DOC) - .define(Keys.STATIC_FIELD, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, - "Field name for static data field.\n" + OPTIONALITY_DOC) - .define(Keys.STATIC_VALUE, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, + private static final String OPTIONALITY_DOC = "Suffix with ! to make this a required field, or ? to keep it optional (the default)."; + + public static final ConfigDef CONFIG_DEF = new ConfigDef() + .define(ConfigName.TOPIC_FIELD, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, + "Field name for Kafka topic. " + OPTIONALITY_DOC) + .define(ConfigName.PARTITION_FIELD, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, + "Field name for Kafka partition. " + OPTIONALITY_DOC) + .define(ConfigName.OFFSET_FIELD, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, + "Field name for Kafka offset - only applicable to sink connectors.
<br/>
" + OPTIONALITY_DOC) + .define(ConfigName.TIMESTAMP_FIELD, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, + "Field name for record timestamp. " + OPTIONALITY_DOC) + .define(ConfigName.STATIC_FIELD, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, + "Field name for static data field. " + OPTIONALITY_DOC) + .define(ConfigName.STATIC_VALUE, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, "Static field value, if field name configured."); + private static final String PURPOSE = "field insertion"; + private static final Schema OPTIONAL_TIMESTAMP_SCHEMA = Timestamp.builder().optional().build(); private static final class InsertionSpec { @@ -91,46 +102,42 @@ public static InsertionSpec parse(String spec) { private InsertionSpec timestampField; private InsertionSpec staticField; private String staticValue; - private boolean applicable; private Cache schemaUpdateCache; @Override public void configure(Map props) { final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props); - topicField = InsertionSpec.parse(config.getString(Keys.TOPIC_FIELD)); - partitionField = InsertionSpec.parse(config.getString(Keys.PARTITION_FIELD)); - offsetField = InsertionSpec.parse(config.getString(Keys.OFFSET_FIELD)); - timestampField = InsertionSpec.parse(config.getString(Keys.TIMESTAMP_FIELD)); - staticField = InsertionSpec.parse(config.getString(Keys.STATIC_FIELD)); - staticValue = config.getString(Keys.STATIC_VALUE); - applicable = topicField != null || partitionField != null || offsetField != null || timestampField != null; + topicField = InsertionSpec.parse(config.getString(ConfigName.TOPIC_FIELD)); + partitionField = InsertionSpec.parse(config.getString(ConfigName.PARTITION_FIELD)); + offsetField = InsertionSpec.parse(config.getString(ConfigName.OFFSET_FIELD)); + timestampField = InsertionSpec.parse(config.getString(ConfigName.TIMESTAMP_FIELD)); + staticField = InsertionSpec.parse(config.getString(ConfigName.STATIC_FIELD)); + staticValue = config.getString(ConfigName.STATIC_VALUE); + + if (topicField == null && partitionField == null && offsetField == null && timestampField == null && staticField == null) { + throw new ConfigException("No field insertion configured"); + } + + if (staticField != null && staticValue == null) { + throw new ConfigException(ConfigName.STATIC_VALUE, null, "No value specified for static field: " + staticField); + } schemaUpdateCache = new SynchronizedCache<>(new LRUCache(16)); } @Override public R apply(R record) { - if (!applicable) return record; - - final Schema schema = operatingSchema(record); - final Object value = operatingValue(record); - - if (value == null) - throw new DataException("null value"); - - if (schema == null) { - if (!(value instanceof Map)) - throw new DataException("Can only operate on Map value in schemaless mode: " + value.getClass().getName()); - return applySchemaless(record, (Map) value); + if (operatingSchema(record) == null) { + return applySchemaless(record); } else { - if (schema.type() != Schema.Type.STRUCT) - throw new DataException("Can only operate on Struct types: " + value.getClass().getName()); - return applyWithSchema(record, schema, (Struct) value); + return applyWithSchema(record); } } - private R applySchemaless(R record, Map value) { + private R applySchemaless(R record) { + final Map value = requireMap(operatingValue(record), PURPOSE); + final Map updatedValue = new HashMap<>(value); if (topicField != null) { @@ -140,9 +147,7 @@ private R applySchemaless(R record, Map value) { 
updatedValue.put(partitionField.name, record.kafkaPartition()); } if (offsetField != null) { - if (!(record instanceof SinkRecord)) - throw new DataException("Offset insertion is only supported for sink connectors, record is of type: " + record.getClass()); - updatedValue.put(offsetField.name, ((SinkRecord) record).kafkaOffset()); + updatedValue.put(offsetField.name, requireSinkRecord(record, PURPOSE).kafkaOffset()); } if (timestampField != null && record.timestamp() != null) { updatedValue.put(timestampField.name, record.timestamp()); @@ -150,36 +155,46 @@ private R applySchemaless(R record, Map value) { if (staticField != null && staticValue != null) { updatedValue.put(staticField.name, staticValue); } + return newRecord(record, null, updatedValue); } - private R applyWithSchema(R record, Schema schema, Struct value) { - Schema updatedSchema = schemaUpdateCache.get(schema); + private R applyWithSchema(R record) { + final Struct value = requireStruct(operatingValue(record), PURPOSE); + + Schema updatedSchema = schemaUpdateCache.get(value.schema()); if (updatedSchema == null) { - updatedSchema = makeUpdatedSchema(schema); - schemaUpdateCache.put(schema, updatedSchema); + updatedSchema = makeUpdatedSchema(value.schema()); + schemaUpdateCache.put(value.schema(), updatedSchema); } final Struct updatedValue = new Struct(updatedSchema); - copyFields(value, updatedValue); + for (Field field : value.schema().fields()) { + updatedValue.put(field.name(), value.get(field)); + } - insertFields(record, updatedValue); + if (topicField != null) { + updatedValue.put(topicField.name, record.topic()); + } + if (partitionField != null && record.kafkaPartition() != null) { + updatedValue.put(partitionField.name, record.kafkaPartition()); + } + if (offsetField != null) { + updatedValue.put(offsetField.name, requireSinkRecord(record, PURPOSE).kafkaOffset()); + } + if (timestampField != null && record.timestamp() != null) { + updatedValue.put(timestampField.name, new Date(record.timestamp())); + } + if (staticField != null && staticValue != null) { + updatedValue.put(staticField.name, staticValue); + } return newRecord(record, updatedSchema, updatedValue); } private Schema makeUpdatedSchema(Schema schema) { - final SchemaBuilder builder = SchemaBuilder.struct(); - - builder.name(schema.name()); - builder.version(schema.version()); - builder.doc(schema.doc()); - - final Map params = schema.parameters(); - if (params != null) { - builder.parameters(params); - } + final SchemaBuilder builder = SchemaUtil.copySchemaBasics(schema, SchemaBuilder.struct()); for (Field field : schema.fields()) { builder.field(field.name(), field.schema()); @@ -204,33 +219,6 @@ private Schema makeUpdatedSchema(Schema schema) { return builder.build(); } - private void copyFields(Struct value, Struct updatedValue) { - for (Field field : value.schema().fields()) { - updatedValue.put(field.name(), value.get(field)); - } - } - - private void insertFields(R record, Struct value) { - if (topicField != null) { - value.put(topicField.name, record.topic()); - } - if (partitionField != null && record.kafkaPartition() != null) { - value.put(partitionField.name, record.kafkaPartition()); - } - if (offsetField != null) { - if (!(record instanceof SinkRecord)) { - throw new DataException("Offset insertion is only supported for sink connectors, record is of type: " + record.getClass()); - } - value.put(offsetField.name, ((SinkRecord) record).kafkaOffset()); - } - if (timestampField != null && record.timestamp() != null) { - 
value.put(timestampField.name, new Date(record.timestamp())); - } - if (staticField != null && staticValue != null) { - value.put(staticField.name, staticValue); - } - } - @Override public void close() { schemaUpdateCache = null; @@ -247,10 +235,6 @@ public ConfigDef config() { protected abstract R newRecord(R record, Schema updatedSchema, Object updatedValue); - /** - * This transformation allows inserting configured attributes of the record metadata as fields in the record key. - * It also allows adding a static data field. - */ public static class Key> extends InsertField { @Override @@ -270,10 +254,6 @@ protected R newRecord(R record, Schema updatedSchema, Object updatedValue) { } - /** - * This transformation allows inserting configured attributes of the record metadata as fields in the record value. - * It also allows adding a static data field. - */ public static class Value> extends InsertField { @Override diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/MaskField.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/MaskField.java new file mode 100644 index 0000000000000..d7ef2aa8429ad --- /dev/null +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/MaskField.java @@ -0,0 +1,172 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.transforms; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.connector.ConnectRecord; +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.errors.DataException; +import org.apache.kafka.connect.transforms.util.NonEmptyListValidator; +import org.apache.kafka.connect.transforms.util.SimpleConfig; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.kafka.connect.transforms.util.Requirements.requireMap; +import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct; + +public abstract class MaskField> implements Transformation { + + public static final String OVERVIEW_DOC = + "Mask specified fields with a valid null value for the field type (i.e. 0, false, empty string, and so on)." + + "
<p/>
Use the concrete transformation type designed for the record key (" + Key.class.getCanonicalName() + ") " + + "or value (" + Value.class.getCanonicalName() + ")."; + + private static final String FIELDS_CONFIG = "fields"; + + public static final ConfigDef CONFIG_DEF = new ConfigDef() + .define(FIELDS_CONFIG, ConfigDef.Type.LIST, ConfigDef.NO_DEFAULT_VALUE, new NonEmptyListValidator(), ConfigDef.Importance.HIGH, "Names of fields to mask."); + + private static final String PURPOSE = "mask fields"; + + private static final Map, Object> PRIMITIVE_VALUE_MAPPING = new HashMap<>(); + + static { + PRIMITIVE_VALUE_MAPPING.put(Boolean.class, Boolean.FALSE); + PRIMITIVE_VALUE_MAPPING.put(Byte.class, (byte) 0); + PRIMITIVE_VALUE_MAPPING.put(Short.class, (short) 0); + PRIMITIVE_VALUE_MAPPING.put(Integer.class, 0); + PRIMITIVE_VALUE_MAPPING.put(Long.class, 0L); + PRIMITIVE_VALUE_MAPPING.put(Float.class, 0f); + PRIMITIVE_VALUE_MAPPING.put(Double.class, 0d); + PRIMITIVE_VALUE_MAPPING.put(BigInteger.class, BigInteger.ZERO); + PRIMITIVE_VALUE_MAPPING.put(BigDecimal.class, BigDecimal.ZERO); + PRIMITIVE_VALUE_MAPPING.put(Date.class, new Date(0)); + PRIMITIVE_VALUE_MAPPING.put(String.class, ""); + } + + private Set maskedFields; + + @Override + public void configure(Map props) { + final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props); + maskedFields = new HashSet<>(config.getList(FIELDS_CONFIG)); + } + + @Override + public R apply(R record) { + if (operatingSchema(record) == null) { + return applySchemaless(record); + } else { + return applyWithSchema(record); + } + } + + private R applySchemaless(R record) { + final Map value = requireMap(operatingValue(record), PURPOSE); + final HashMap updatedValue = new HashMap<>(value); + for (String field : maskedFields) { + updatedValue.put(field, masked(value.get(field))); + } + return newRecord(record, updatedValue); + } + + private R applyWithSchema(R record) { + final Struct value = requireStruct(operatingValue(record), PURPOSE); + final Struct updatedValue = new Struct(value.schema()); + for (Field field : value.schema().fields()) { + final Object origFieldValue = value.get(field); + updatedValue.put(field, maskedFields.contains(field.name()) ? 
masked(origFieldValue) : origFieldValue); + } + return newRecord(record, updatedValue); + } + + private static Object masked(Object value) { + if (value == null) + return null; + Object maskedValue = PRIMITIVE_VALUE_MAPPING.get(value.getClass()); + if (maskedValue == null) { + if (value instanceof List) + maskedValue = Collections.emptyList(); + else if (value instanceof Map) + maskedValue = Collections.emptyMap(); + else + throw new DataException("Cannot mask value of type: " + value.getClass()); + } + return maskedValue; + } + + @Override + public ConfigDef config() { + return CONFIG_DEF; + } + + @Override + public void close() { + } + + protected abstract Schema operatingSchema(R record); + + protected abstract Object operatingValue(R record); + + protected abstract R newRecord(R base, Object value); + + public static final class Key> extends MaskField { + @Override + protected Schema operatingSchema(R record) { + return record.keySchema(); + } + + @Override + protected Object operatingValue(R record) { + return record.key(); + } + + @Override + protected R newRecord(R record, Object updatedValue) { + return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), updatedValue, record.valueSchema(), record.value(), record.timestamp()); + } + } + + public static final class Value> extends MaskField { + @Override + protected Schema operatingSchema(R record) { + return record.valueSchema(); + } + + @Override + protected Object operatingValue(R record) { + return record.value(); + } + + @Override + protected R newRecord(R record, Object updatedValue) { + return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), record.valueSchema(), updatedValue, record.timestamp()); + } + } + +} diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/RegexRouter.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/RegexRouter.java new file mode 100644 index 0000000000000..f16560e3c5748 --- /dev/null +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/RegexRouter.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.transforms; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.connector.ConnectRecord; +import org.apache.kafka.connect.transforms.util.RegexValidator; +import org.apache.kafka.connect.transforms.util.SimpleConfig; + +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class RegexRouter> implements Transformation { + + public static final String OVERVIEW_DOC = "Update the record topic using the configured regular expression and replacement string." + + "
<p/>
Under the hood, the regex is compiled to a java.util.regex.Pattern. " + + "If the pattern matches the input topic, java.util.regex.Matcher#replaceFirst() is used with the replacement string to obtain the new topic."; + + public static final ConfigDef CONFIG_DEF = new ConfigDef() + .define(ConfigName.REGEX, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, new RegexValidator(), ConfigDef.Importance.HIGH, + "Regular expression to use for matching.") + .define(ConfigName.REPLACEMENT, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.HIGH, + "Replacement string."); + + private interface ConfigName { + String REGEX = "regex"; + String REPLACEMENT = "replacement"; + } + + private Pattern regex; + private String replacement; + + @Override + public void configure(Map props) { + final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props); + regex = Pattern.compile(config.getString(ConfigName.REGEX)); + replacement = config.getString(ConfigName.REPLACEMENT); + } + + @Override + public R apply(R record) { + final Matcher matcher = regex.matcher(record.topic()); + if (matcher.matches()) { + final String topic = matcher.replaceFirst(replacement); + return record.newRecord(topic, record.kafkaPartition(), record.keySchema(), record.key(), record.valueSchema(), record.value(), record.timestamp()); + } + return record; + } + + @Override + public void close() { + } + + @Override + public ConfigDef config() { + return CONFIG_DEF; + } + +} diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java new file mode 100644 index 0000000000000..6faf84250af07 --- /dev/null +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java @@ -0,0 +1,230 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.transforms; + +import org.apache.kafka.common.cache.Cache; +import org.apache.kafka.common.cache.LRUCache; +import org.apache.kafka.common.cache.SynchronizedCache; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.connect.connector.ConnectRecord; +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.transforms.util.SchemaUtil; +import org.apache.kafka.connect.transforms.util.SimpleConfig; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.kafka.connect.transforms.util.Requirements.requireMap; +import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct; + +public abstract class ReplaceField> implements Transformation { + + public static final String OVERVIEW_DOC = "Filter or rename fields."; + + interface ConfigName { + String BLACKLIST = "blacklist"; + String WHITELIST = "whitelist"; + String RENAME = "renames"; + } + + public static final ConfigDef CONFIG_DEF = new ConfigDef() + .define(ConfigName.BLACKLIST, ConfigDef.Type.LIST, Collections.emptyList(), ConfigDef.Importance.MEDIUM, + "Fields to exclude. This takes precedence over the whitelist.") + .define(ConfigName.WHITELIST, ConfigDef.Type.LIST, Collections.emptyList(), ConfigDef.Importance.MEDIUM, + "Fields to include. If specified, only these fields will be used.") + .define(ConfigName.RENAME, ConfigDef.Type.LIST, Collections.emptyList(), new ConfigDef.Validator() { + @Override + public void ensureValid(String name, Object value) { + parseRenameMappings((List) value); + } + + @Override + public String toString() { + return "list of colon-delimited pairs, e.g. 
foo:bar,abc:xyz"; + } + }, ConfigDef.Importance.MEDIUM, "Field rename mappings."); + + private static final String PURPOSE = "field replacement"; + + private List blacklist; + private List whitelist; + private Map renames; + private Map reverseRenames; + + private Cache schemaUpdateCache; + + @Override + public void configure(Map configs) { + final SimpleConfig config = new SimpleConfig(CONFIG_DEF, configs); + blacklist = config.getList(ConfigName.BLACKLIST); + whitelist = config.getList(ConfigName.WHITELIST); + renames = parseRenameMappings(config.getList(ConfigName.RENAME)); + reverseRenames = invert(renames); + + schemaUpdateCache = new SynchronizedCache<>(new LRUCache(16)); + } + + static Map parseRenameMappings(List mappings) { + final Map m = new HashMap<>(); + for (String mapping : mappings) { + final String[] parts = mapping.split(":"); + if (parts.length != 2) { + throw new ConfigException(ConfigName.RENAME, mappings, "Invalid rename mapping: " + mapping); + } + m.put(parts[0], parts[1]); + } + return m; + } + + static Map invert(Map source) { + final Map m = new HashMap<>(); + for (Map.Entry e : source.entrySet()) { + m.put(e.getValue(), e.getKey()); + } + return m; + } + + boolean filter(String fieldName) { + return !blacklist.contains(fieldName) && (whitelist.isEmpty() || whitelist.contains(fieldName)); + } + + String renamed(String fieldName) { + final String mapping = renames.get(fieldName); + return mapping == null ? fieldName : mapping; + } + + String reverseRenamed(String fieldName) { + final String mapping = reverseRenames.get(fieldName); + return mapping == null ? fieldName : mapping; + } + + @Override + public R apply(R record) { + if (operatingSchema(record) == null) { + return applySchemaless(record); + } else { + return applyWithSchema(record); + } + } + + private R applySchemaless(R record) { + final Map value = requireMap(operatingValue(record), PURPOSE); + + final Map updatedValue = new HashMap<>(value.size()); + + for (Map.Entry e : value.entrySet()) { + final String fieldName = e.getKey(); + if (filter(fieldName)) { + final Object fieldValue = e.getValue(); + updatedValue.put(renamed(fieldName), fieldValue); + } + } + + return newRecord(record, null, updatedValue); + } + + private R applyWithSchema(R record) { + final Struct value = requireStruct(operatingValue(record), PURPOSE); + + Schema updatedSchema = schemaUpdateCache.get(value.schema()); + if (updatedSchema == null) { + updatedSchema = makeUpdatedSchema(value.schema()); + schemaUpdateCache.put(value.schema(), updatedSchema); + } + + final Struct updatedValue = new Struct(updatedSchema); + + for (Field field : updatedSchema.fields()) { + final Object fieldValue = value.get(reverseRenamed(field.name())); + updatedValue.put(field.name(), fieldValue); + } + + return newRecord(record, updatedSchema, updatedValue); + } + + private Schema makeUpdatedSchema(Schema schema) { + final SchemaBuilder builder = SchemaUtil.copySchemaBasics(schema, SchemaBuilder.struct()); + for (Field field : schema.fields()) { + if (filter(field.name())) { + builder.field(renamed(field.name()), field.schema()); + } + } + return builder.build(); + } + + @Override + public ConfigDef config() { + return CONFIG_DEF; + } + + @Override + public void close() { + schemaUpdateCache = null; + } + + protected abstract Schema operatingSchema(R record); + + protected abstract Object operatingValue(R record); + + protected abstract R newRecord(R record, Schema updatedSchema, Object updatedValue); + + public static class Key> extends ReplaceField { + 
+ @Override + protected Schema operatingSchema(R record) { + return record.keySchema(); + } + + @Override + protected Object operatingValue(R record) { + return record.key(); + } + + @Override + protected R newRecord(R record, Schema updatedSchema, Object updatedValue) { + return record.newRecord(record.topic(), record.kafkaPartition(), updatedSchema, updatedValue, record.valueSchema(), record.value(), record.timestamp()); + } + + } + + public static class Value> extends ReplaceField { + + @Override + protected Schema operatingSchema(R record) { + return record.valueSchema(); + } + + @Override + protected Object operatingValue(R record) { + return record.value(); + } + + @Override + protected R newRecord(R record, Schema updatedSchema, Object updatedValue) { + return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), updatedSchema, updatedValue, record.timestamp()); + } + + } + +} diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java new file mode 100644 index 0000000000000..f3076b445719b --- /dev/null +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.transforms; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.connect.connector.ConnectRecord; +import org.apache.kafka.connect.data.ConnectSchema; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.transforms.util.SimpleConfig; + +import java.util.Map; + +import static org.apache.kafka.connect.transforms.util.Requirements.requireSchema; + +public abstract class SetSchemaMetadata> implements Transformation { + + public static final String OVERVIEW_DOC = + "Set the schema name, version or both on the record's key (" + Key.class.getCanonicalName() + ")" + + " or value (" + Value.class.getCanonicalName() + ") schema."; + + private interface ConfigName { + String SCHEMA_NAME = "schema.name"; + String SCHEMA_VERSION = "schema.version"; + } + + public static final ConfigDef CONFIG_DEF = new ConfigDef() + .define(ConfigName.SCHEMA_NAME, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, "Schema name to set.") + .define(ConfigName.SCHEMA_VERSION, ConfigDef.Type.INT, null, ConfigDef.Importance.HIGH, "Schema version to set."); + + private String schemaName; + private Integer schemaVersion; + + @Override + public void configure(Map configs) { + final SimpleConfig config = new SimpleConfig(CONFIG_DEF, configs); + schemaName = config.getString(ConfigName.SCHEMA_NAME); + schemaVersion = config.getInt(ConfigName.SCHEMA_VERSION); + + if (schemaName == null && schemaVersion == null) { + throw new ConfigException("Neither schema name nor version configured"); + } + } + + @Override + public R apply(R record) { + final Schema schema = operatingSchema(record); + requireSchema(schema, "updating schema metadata"); + final boolean isArray = schema.type() == Schema.Type.ARRAY; + final boolean isMap = schema.type() == Schema.Type.MAP; + final Schema updatedSchema = new ConnectSchema( + schema.type(), + schema.isOptional(), + schema.defaultValue(), + schemaName != null ? schemaName : schema.name(), + schemaVersion != null ? schemaVersion : schema.version(), + schema.doc(), + schema.parameters(), + schema.fields(), + isMap ? schema.keySchema() : null, + isMap || isArray ? schema.valueSchema() : null + ); + return newRecord(record, updatedSchema); + } + + @Override + public ConfigDef config() { + return CONFIG_DEF; + } + + @Override + public void close() { + } + + protected abstract Schema operatingSchema(R record); + + protected abstract R newRecord(R record, Schema updatedSchema); + + /** + * Set the schema name, version or both on the record's key schema. + */ + public static class Key> extends SetSchemaMetadata { + @Override + protected Schema operatingSchema(R record) { + return record.keySchema(); + } + + @Override + protected R newRecord(R record, Schema updatedSchema) { + return record.newRecord(record.topic(), record.kafkaPartition(), updatedSchema, record.key(), record.valueSchema(), record.value(), record.timestamp()); + } + } + + /** + * Set the schema name, version or both on the record's value schema. 
+ */ + public static class Value> extends SetSchemaMetadata { + @Override + protected Schema operatingSchema(R record) { + return record.valueSchema(); + } + + @Override + protected R newRecord(R record, Schema updatedSchema) { + return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), updatedSchema, record.value(), record.timestamp()); + } + } + +} \ No newline at end of file diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampRouter.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampRouter.java index 1dd5345a6091a..f917a8d65f09a 100644 --- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampRouter.java +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampRouter.java @@ -27,25 +27,25 @@ import java.util.Map; import java.util.TimeZone; -/** - * This transformation facilitates updating the record's topic field as a function of the original topic value and the record timestamp. - *
<p/>
- * It is mainly useful for sink connectors, since the topic field is often used to determine the equivalent entity name in the destination system - * (e.g. database table or search index name). - */ public class TimestampRouter> implements Transformation { - public interface Keys { + public static final String OVERVIEW_DOC = + "Update the record's topic field as a function of the original topic value and the record timestamp." + + "
<p/>
" + + "This is mainly useful for sink connectors, since the topic field is often used to determine the equivalent entity name in the destination system" + + "(e.g. database table or search index name)."; + + public static final ConfigDef CONFIG_DEF = new ConfigDef() + .define(ConfigName.TOPIC_FORMAT, ConfigDef.Type.STRING, "${topic}-${timestamp}", ConfigDef.Importance.HIGH, + "Format string which can contain ${topic} and ${timestamp} as placeholders for the topic and timestamp, respectively.") + .define(ConfigName.TIMESTAMP_FORMAT, ConfigDef.Type.STRING, "yyyyMMdd", ConfigDef.Importance.HIGH, + "Format string for the timestamp that is compatible with java.text.SimpleDateFormat."); + + private interface ConfigName { String TOPIC_FORMAT = "topic.format"; String TIMESTAMP_FORMAT = "timestamp.format"; } - private static final ConfigDef CONFIG_DEF = new ConfigDef() - .define(Keys.TOPIC_FORMAT, ConfigDef.Type.STRING, "${topic}-${timestamp}", ConfigDef.Importance.HIGH, - "Format string which can contain ``${topic}`` and ``${timestamp}`` as placeholders for the topic and timestamp, respectively.") - .define(Keys.TIMESTAMP_FORMAT, ConfigDef.Type.STRING, "yyyyMMdd", ConfigDef.Importance.HIGH, - "Format string for the timestamp that is compatible with java.text.SimpleDateFormat."); - private String topicFormat; private ThreadLocal timestampFormat; @@ -53,9 +53,9 @@ public interface Keys { public void configure(Map props) { final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props); - topicFormat = config.getString(Keys.TOPIC_FORMAT); + topicFormat = config.getString(ConfigName.TOPIC_FORMAT); - final String timestampFormatStr = config.getString(Keys.TIMESTAMP_FORMAT); + final String timestampFormatStr = config.getString(ConfigName.TIMESTAMP_FORMAT); timestampFormat = new ThreadLocal() { @Override protected SimpleDateFormat initialValue() { diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ValueToKey.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ValueToKey.java new file mode 100644 index 0000000000000..504da541f6574 --- /dev/null +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ValueToKey.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.transforms; + +import org.apache.kafka.common.cache.Cache; +import org.apache.kafka.common.cache.LRUCache; +import org.apache.kafka.common.cache.SynchronizedCache; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.connector.ConnectRecord; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.transforms.util.NonEmptyListValidator; +import org.apache.kafka.connect.transforms.util.SimpleConfig; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.kafka.connect.transforms.util.Requirements.requireMap; +import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct; + +public class ValueToKey> implements Transformation { + + public static final String OVERVIEW_DOC = "Replace the record key with a new key formed from a subset of fields in the record value."; + + public static final String FIELDS_CONFIG = "fields"; + + public static final ConfigDef CONFIG_DEF = new ConfigDef() + .define(FIELDS_CONFIG, ConfigDef.Type.LIST, ConfigDef.NO_DEFAULT_VALUE, new NonEmptyListValidator(), ConfigDef.Importance.HIGH, + "Field names on the record value to extract as the record key."); + + private static final String PURPOSE = "copying fields from value to key"; + + private List fields; + + private Cache valueToKeySchemaCache; + + @Override + public void configure(Map configs) { + final SimpleConfig config = new SimpleConfig(CONFIG_DEF, configs); + fields = config.getList(FIELDS_CONFIG); + valueToKeySchemaCache = new SynchronizedCache<>(new LRUCache(16)); + } + + @Override + public R apply(R record) { + if (record.valueSchema() == null) { + return applySchemaless(record); + } else { + return applyWithSchema(record); + } + } + + private R applySchemaless(R record) { + final Map value = requireMap(record.value(), PURPOSE); + final Map key = new HashMap<>(fields.size()); + for (String field : fields) { + key.put(field, value.get(field)); + } + return record.newRecord(record.topic(), record.kafkaPartition(), null, key, record.valueSchema(), record.value(), record.timestamp()); + } + + private R applyWithSchema(R record) { + final Struct value = requireStruct(record.value(), PURPOSE); + + Schema keySchema = valueToKeySchemaCache.get(value.schema()); + if (keySchema == null) { + final SchemaBuilder keySchemaBuilder = SchemaBuilder.struct(); + for (String field : fields) { + final Schema fieldSchema = value.schema().field(field).schema(); + keySchemaBuilder.field(field, fieldSchema); + } + keySchema = keySchemaBuilder.build(); + valueToKeySchemaCache.put(value.schema(), keySchema); + } + + final Struct key = new Struct(keySchema); + for (String field : fields) { + key.put(field, value.get(field)); + } + + return record.newRecord(record.topic(), record.kafkaPartition(), keySchema, key, value.schema(), value, record.timestamp()); + } + + @Override + public ConfigDef config() { + return CONFIG_DEF; + } + + @Override + public void close() { + valueToKeySchemaCache = null; + } + +} diff --git 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/util/NonEmptyListValidator.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/util/NonEmptyListValidator.java new file mode 100644 index 0000000000000..1abbbc8aaf081 --- /dev/null +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/util/NonEmptyListValidator.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.transforms.util;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+
+import java.util.List;
+
+public class NonEmptyListValidator implements ConfigDef.Validator {
+
+    @Override
+    public void ensureValid(String name, Object value) {
+        if (((List) value).isEmpty()) {
+            throw new ConfigException(name, value, "Empty list");
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "non-empty list";
+    }
+
+}
diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/util/RegexValidator.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/util/RegexValidator.java
new file mode 100644
index 0000000000000..9713b27446f47
--- /dev/null
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/util/RegexValidator.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.transforms.util;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+
+import java.util.regex.Pattern;
+
+public class RegexValidator implements ConfigDef.Validator {
+
+    @Override
+    public void ensureValid(String name, Object value) {
+        try {
+            Pattern.compile((String) value);
+        } catch (Exception e) {
+            throw new ConfigException(name, value, "Invalid regex: " + e.getMessage());
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "valid regex";
+    }
+
+}
diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/util/Requirements.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/util/Requirements.java
new file mode 100644
index 0000000000000..b004f8ae91500
--- /dev/null
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/util/Requirements.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.transforms.util;
+
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.errors.DataException;
+import org.apache.kafka.connect.sink.SinkRecord;
+
+import java.util.Map;
+
+public class Requirements {
+
+    public static void requireSchema(Schema schema, String purpose) {
+        if (schema == null) {
+            throw new DataException("Schema required for [" + purpose + "]");
+        }
+    }
+
+    public static Map<String, Object> requireMap(Object value, String purpose) {
+        if (!(value instanceof Map)) {
+            throw new DataException("Only Map objects supported in absence of schema for [" + purpose + "], found: " + nullSafeClassName(value));
+        }
+        return (Map<String, Object>) value;
+    }
+
+    public static Struct requireStruct(Object value, String purpose) {
+        if (!(value instanceof Struct)) {
+            throw new DataException("Only Struct objects supported for [" + purpose + "], found: " + nullSafeClassName(value));
+        }
+        return (Struct) value;
+    }
+
+    public static SinkRecord requireSinkRecord(ConnectRecord<?> record, String purpose) {
+        if (!(record instanceof SinkRecord)) {
+            throw new DataException("Only SinkRecord supported for [" + purpose + "], found: " + nullSafeClassName(record));
+        }
+        return (SinkRecord) record;
+    }
+
+    private static String nullSafeClassName(Object x) {
+        return x == null ? "null" : x.getClass().getCanonicalName();
+    }
+
+}
diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/util/SchemaUtil.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/util/SchemaUtil.java
new file mode 100644
index 0000000000000..da261e79b22de
--- /dev/null
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/util/SchemaUtil.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.transforms.util;
+
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+
+import java.util.Map;
+
+public class SchemaUtil {
+
+    public static SchemaBuilder copySchemaBasics(Schema source, SchemaBuilder builder) {
+        builder.name(source.name());
+        builder.version(source.version());
+        builder.doc(source.doc());
+
+        final Map<String, String> params = source.parameters();
+        if (params != null) {
+            builder.parameters(params);
+        }
+
+        return builder;
+    }
+
+}
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ExtractFieldTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ExtractFieldTest.java
new file mode 100644
index 0000000000000..d72179559a50a
--- /dev/null
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ExtractFieldTest.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.transforms;
+
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.junit.Test;
+
+import java.util.Collections;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class ExtractFieldTest {
+
+    @Test
+    public void schemaless() {
+        final ExtractField<SinkRecord> xform = new ExtractField.Key<>();
+        xform.configure(Collections.singletonMap("field", "magic"));
+
+        final SinkRecord record = new SinkRecord("test", 0, null, Collections.singletonMap("magic", 42), null, null, 0);
+        final SinkRecord transformedRecord = xform.apply(record);
+
+        assertNull(transformedRecord.keySchema());
+        assertEquals(42, transformedRecord.key());
+    }
+
+    @Test
+    public void withSchema() {
+        final ExtractField<SinkRecord> xform = new ExtractField.Key<>();
+        xform.configure(Collections.singletonMap("field", "magic"));
+
+        final Schema keySchema = SchemaBuilder.struct().field("magic", Schema.INT32_SCHEMA).build();
+        final Struct key = new Struct(keySchema).put("magic", 42);
+        final SinkRecord record = new SinkRecord("test", 0, keySchema, key, null, null, 0);
+        final SinkRecord transformedRecord = xform.apply(record);
+
+        assertEquals(Schema.INT32_SCHEMA, transformedRecord.keySchema());
+        assertEquals(42, transformedRecord.key());
+    }
+
+}
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HoistToStructTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HoistFieldTest.java
similarity index 71%
rename from connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HoistToStructTest.java
rename to connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HoistFieldTest.java
index 99a6e99dde000..b5f9d93d935e4 100644
--- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HoistToStructTest.java
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HoistFieldTest.java
@@ -25,12 +25,25 @@
 import java.util.Collections;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
 
-public class HoistToStructTest {
+public class HoistFieldTest {
 
     @Test
-    public void sanityCheck() {
-        final HoistToStruct<SinkRecord> xform = new HoistToStruct.Key<>();
+    public void schemaless() {
+        final HoistField<SinkRecord> xform = new HoistField.Key<>();
+        xform.configure(Collections.singletonMap("field", "magic"));
+
+        final SinkRecord record = new SinkRecord("test", 0, null, 42, null, null, 0);
+        final SinkRecord transformedRecord = xform.apply(record);
+
+        assertNull(transformedRecord.keySchema());
+        assertEquals(Collections.singletonMap("magic", 42), transformedRecord.key());
+    }
+
+    @Test
+    public void withSchema() {
+        final HoistField<SinkRecord> xform = new HoistField.Key<>();
         xform.configure(Collections.singletonMap("field", "magic"));
 
         final SinkRecord record = new SinkRecord("test", 0, Schema.INT32_SCHEMA, 42, null, null, 0);
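For illustration (not part of this patch): chaining transformations by hand shows the configure/apply/close lifecycle that the Connect runtime drives. This mirrors the hoistToStruct + insertTimestampField chain configured declaratively in the system test at the end of this patch; the "ts" field name stands in for the test's ts_fieldname variable, the record timestamp (42L) is invented, and the timestamped SinkRecord constructor is assumed.

import java.util.Collections;

import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.connect.sink.SinkRecord;
import org.apache.kafka.connect.transforms.HoistField;
import org.apache.kafka.connect.transforms.InsertField;

public class TransformChainExample {
    public static void main(String[] args) {
        final HoistField<SinkRecord> hoist = new HoistField.Value<>();
        hoist.configure(Collections.singletonMap("field", "content"));

        final InsertField<SinkRecord> insertTs = new InsertField.Value<>();
        insertTs.configure(Collections.singletonMap("timestamp.field", "ts"));

        SinkRecord record = new SinkRecord("test", 0, null, null, null, "line of text", 0,
                42L, TimestampType.CREATE_TIME);

        record = hoist.apply(record);    // schemaless value becomes {content=line of text}
        record = insertTs.apply(record); // value gains a "ts" entry holding the record timestamp

        System.out.println(record.value());

        hoist.close();
        insertTs.close();
    }
}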
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/MaskFieldTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/MaskFieldTest.java
new file mode 100644
index 0000000000000..c96058afbe6de
--- /dev/null
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/MaskFieldTest.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.transforms;
+
+import org.apache.kafka.connect.data.Decimal;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.data.Time;
+import org.apache.kafka.connect.data.Timestamp;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.junit.Test;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class MaskFieldTest {
+
+    private static MaskField<SinkRecord> transform(List<String> fields) {
+        final MaskField<SinkRecord> xform = new MaskField.Value<>();
+        xform.configure(Collections.singletonMap("fields", fields));
+        return xform;
+    }
+
+    private static SinkRecord record(Schema schema, Object value) {
+        return new SinkRecord("", 0, null, null, schema, value, 0);
+    }
+
+    @Test
+    public void schemaless() {
+        final Map<String, Object> value = new HashMap<>();
+        value.put("magic", 42);
+        value.put("bool", true);
+        value.put("byte", (byte) 42);
+        value.put("short", (short) 42);
+        value.put("int", 42);
+        value.put("long", 42L);
+        value.put("float", 42f);
+        value.put("double", 42d);
+        value.put("string", "blabla");
+        value.put("date", new Date());
+        value.put("bigint", new BigInteger("42"));
+        value.put("bigdec", new BigDecimal("42.0"));
+        value.put("list", Collections.singletonList(42));
+        value.put("map", Collections.singletonMap("key", "value"));
+
+        final List<String> maskFields = new ArrayList<>(value.keySet());
+        maskFields.remove("magic");
+
+        final Map<String, Object> updatedValue = (Map<String, Object>) transform(maskFields).apply(record(null, value)).value();
+
+        assertEquals(42, updatedValue.get("magic"));
+        assertEquals(false, updatedValue.get("bool"));
+        assertEquals((byte) 0, updatedValue.get("byte"));
+        assertEquals((short) 0, updatedValue.get("short"));
+        assertEquals(0, updatedValue.get("int"));
+        assertEquals(0L, updatedValue.get("long"));
+        assertEquals(0f, updatedValue.get("float"));
+        assertEquals(0d, updatedValue.get("double"));
+        assertEquals("", updatedValue.get("string"));
+        assertEquals(new Date(0), updatedValue.get("date"));
+        assertEquals(BigInteger.ZERO, updatedValue.get("bigint"));
+        assertEquals(BigDecimal.ZERO, updatedValue.get("bigdec"));
+        assertEquals(Collections.emptyList(), updatedValue.get("list"));
+        assertEquals(Collections.emptyMap(), updatedValue.get("map"));
+    }
+
+    @Test
+    public void withSchema() {
+        Schema schema = SchemaBuilder.struct()
+                .field("magic", Schema.INT32_SCHEMA)
+                .field("bool", Schema.BOOLEAN_SCHEMA)
+                .field("byte", Schema.INT8_SCHEMA)
+                .field("short", Schema.INT16_SCHEMA)
+                .field("int", Schema.INT32_SCHEMA)
+                .field("long", Schema.INT64_SCHEMA)
+                .field("float", Schema.FLOAT32_SCHEMA)
+                .field("double", Schema.FLOAT64_SCHEMA)
+                .field("string", Schema.STRING_SCHEMA)
+                .field("date", org.apache.kafka.connect.data.Date.SCHEMA)
+                .field("time", Time.SCHEMA)
+                .field("timestamp", Timestamp.SCHEMA)
+                .field("decimal", Decimal.schema(0))
+                .field("array", SchemaBuilder.array(Schema.INT32_SCHEMA))
+                .field("map", SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA))
+                .build();
+
+        final Struct value = new Struct(schema);
+        value.put("magic", 42);
+        value.put("bool", true);
+        value.put("byte", (byte) 42);
+        value.put("short", (short) 42);
+        value.put("int", 42);
+        value.put("long", 42L);
+        value.put("float", 42f);
+        value.put("double", 42d);
+        value.put("string", "hmm");
+        value.put("date", new Date());
+        value.put("time", new Date());
+        value.put("timestamp", new Date());
+        value.put("decimal", new BigDecimal(42));
+        value.put("array", Arrays.asList(1, 2, 3));
+        value.put("map", Collections.singletonMap("what", "what"));
+
+        final List<String> maskFields = new ArrayList<>(schema.fields().size());
+        for (Field field: schema.fields()) {
+            if (!field.name().equals("magic")) {
+                maskFields.add(field.name());
+            }
+        }
+
+        final Struct updatedValue = (Struct) transform(maskFields).apply(record(schema, value)).value();
+
+        assertEquals(42, updatedValue.get("magic"));
+        assertEquals(false, updatedValue.get("bool"));
+        assertEquals((byte) 0, updatedValue.get("byte"));
+        assertEquals((short) 0, updatedValue.get("short"));
+        assertEquals(0, updatedValue.get("int"));
+        assertEquals(0L, updatedValue.get("long"));
+        assertEquals(0f, updatedValue.get("float"));
+        assertEquals(0d, updatedValue.get("double"));
+        assertEquals("", updatedValue.get("string"));
+        assertEquals(new Date(0), updatedValue.get("date"));
+        assertEquals(new Date(0), updatedValue.get("time"));
+        assertEquals(new Date(0), updatedValue.get("timestamp"));
+        assertEquals(BigDecimal.ZERO, updatedValue.get("decimal"));
+        assertEquals(Collections.emptyList(), updatedValue.get("array"));
+        assertEquals(Collections.emptyMap(), updatedValue.get("map"));
+    }
+
+}
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/RegexRouterTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/RegexRouterTest.java
new file mode 100644
index 0000000000000..c599265d57200
--- /dev/null
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/RegexRouterTest.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.transforms;
+
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class RegexRouterTest {
+
+    private static String apply(String regex, String replacement, String topic) {
+        final Map<String, String> props = new HashMap<>();
+        props.put("regex", regex);
+        props.put("replacement", replacement);
+        final RegexRouter<SinkRecord> router = new RegexRouter<>();
+        router.configure(props);
+        return router.apply(new SinkRecord(topic, 0, null, null, null, null, 0))
+                .topic();
+    }
+
+    @Test
+    public void staticReplacement() {
+        assertEquals("bar", apply("foo", "bar", "foo"));
+    }
+
+    @Test
+    public void doesntMatch() {
+        assertEquals("orig", apply("foo", "bar", "orig"));
+    }
+
+    @Test
+    public void identity() {
+        assertEquals("orig", apply("(.*)", "$1", "orig"));
+    }
+
+    @Test
+    public void addPrefix() {
+        assertEquals("prefix-orig", apply("(.*)", "prefix-$1", "orig"));
+    }
+
+    @Test
+    public void addSuffix() {
+        assertEquals("orig-suffix", apply("(.*)", "$1-suffix", "orig"));
+    }
+
+    @Test
+    public void slice() {
+        assertEquals("index", apply("(.*)-(\\d\\d\\d\\d\\d\\d\\d\\d)", "$1", "index-20160117"));
+    }
+
+}
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java
new file mode 100644
index 0000000000000..9f9d4b7031b52
--- /dev/null
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java
@@ -0,0 +1,92 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.transforms;
+
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class ReplaceFieldTest {
+
+    @Test
+    public void schemaless() {
+        final ReplaceField<SinkRecord> xform = new ReplaceField.Value<>();
+
+        final Map<String, String> props = new HashMap<>();
+        props.put("blacklist", "dont");
+        props.put("renames", "abc:xyz,foo:bar");
+
+        xform.configure(props);
+
+        final Map<String, Object> value = new HashMap<>();
+        value.put("dont", "whatever");
+        value.put("abc", 42);
+        value.put("foo", true);
+        value.put("etc", "etc");
+
+        final SinkRecord record = new SinkRecord("test", 0, null, null, null, value, 0);
+        final SinkRecord transformedRecord = xform.apply(record);
+
+        final Map<String, Object> updatedValue = (Map<String, Object>) transformedRecord.value();
+        assertEquals(3, updatedValue.size());
+        assertEquals(42, updatedValue.get("xyz"));
+        assertEquals(true, updatedValue.get("bar"));
+        assertEquals("etc", updatedValue.get("etc"));
+    }
+
+    @Test
+    public void withSchema() {
+        final ReplaceField<SinkRecord> xform = new ReplaceField.Value<>();
+
+        final Map<String, String> props = new HashMap<>();
+        props.put("whitelist", "abc,foo");
+        props.put("renames", "abc:xyz,foo:bar");
+
+        xform.configure(props);
+
+        final Schema schema = SchemaBuilder.struct()
+                .field("dont", Schema.STRING_SCHEMA)
+                .field("abc", Schema.INT32_SCHEMA)
+                .field("foo", Schema.BOOLEAN_SCHEMA)
+                .field("etc", Schema.STRING_SCHEMA)
+                .build();
+
+        final Struct value = new Struct(schema);
+        value.put("dont", "whatever");
+        value.put("abc", 42);
+        value.put("foo", true);
+        value.put("etc", "etc");
+
+        final SinkRecord record = new SinkRecord("test", 0, null, null, schema, value, 0);
+        final SinkRecord transformedRecord = xform.apply(record);
+
+        final Struct updatedValue = (Struct) transformedRecord.value();
+
+        assertEquals(2, updatedValue.schema().fields().size());
+        assertEquals(new Integer(42), updatedValue.getInt32("xyz"));
+        assertEquals(true, updatedValue.getBoolean("bar"));
+    }
+
+}
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/SetSchemaMetadataTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/SetSchemaMetadataTest.java
new file mode 100644
index 0000000000000..2aa790f0a2d82
--- /dev/null
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/SetSchemaMetadataTest.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.transforms;
+
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class SetSchemaMetadataTest {
+
+    @Test
+    public void schemaNameUpdate() {
+        final SetSchemaMetadata<SinkRecord> xform = new SetSchemaMetadata.Value<>();
+        xform.configure(Collections.singletonMap("schema.name", "foo"));
+        final SinkRecord record = new SinkRecord("", 0, null, null, SchemaBuilder.struct().build(), null, 0);
+        final SinkRecord updatedRecord = xform.apply(record);
+        assertEquals("foo", updatedRecord.valueSchema().name());
+    }
+
+    @Test
+    public void schemaVersionUpdate() {
+        final SetSchemaMetadata<SinkRecord> xform = new SetSchemaMetadata.Value<>();
+        xform.configure(Collections.singletonMap("schema.version", 42));
+        final SinkRecord record = new SinkRecord("", 0, null, null, SchemaBuilder.struct().build(), null, 0);
+        final SinkRecord updatedRecord = xform.apply(record);
+        assertEquals(new Integer(42), updatedRecord.valueSchema().version());
+    }
+
+    @Test
+    public void schemaNameAndVersionUpdate() {
+        final Map<String, String> props = new HashMap<>();
+        props.put("schema.name", "foo");
+        props.put("schema.version", "42");
+
+        final SetSchemaMetadata<SinkRecord> xform = new SetSchemaMetadata.Value<>();
+        xform.configure(props);
+
+        final SinkRecord record = new SinkRecord("", 0, null, null, SchemaBuilder.struct().build(), null, 0);
+
+        final SinkRecord updatedRecord = xform.apply(record);
+
+        assertEquals("foo", updatedRecord.valueSchema().name());
+        assertEquals(new Integer(42), updatedRecord.valueSchema().version());
+    }
+
+}
diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ValueToKeyTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ValueToKeyTest.java
new file mode 100644
index 0000000000000..e5328d36baebb
--- /dev/null
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ValueToKeyTest.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.transforms;
+
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.HashMap;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+
+public class ValueToKeyTest {
+
+    @Test
+    public void schemaless() {
+        final ValueToKey<SinkRecord> xform = new ValueToKey<>();
+        xform.configure(Collections.singletonMap("fields", "a,b"));
+
+        final HashMap<String, Integer> value = new HashMap<>();
+        value.put("a", 1);
+        value.put("b", 2);
+        value.put("c", 3);
+
+        final SinkRecord record = new SinkRecord("", 0, null, null, null, value, 0);
+        final SinkRecord transformedRecord = xform.apply(record);
+
+        final HashMap<String, Integer> expectedKey = new HashMap<>();
+        expectedKey.put("a", 1);
+        expectedKey.put("b", 2);
+
+        assertNull(transformedRecord.keySchema());
+        assertEquals(expectedKey, transformedRecord.key());
+    }
+
+    @Test
+    public void withSchema() {
+        final ValueToKey<SinkRecord> xform = new ValueToKey<>();
+        xform.configure(Collections.singletonMap("fields", "a,b"));
+
+        final Schema valueSchema = SchemaBuilder.struct()
+                .field("a", Schema.INT32_SCHEMA)
+                .field("b", Schema.INT32_SCHEMA)
+                .field("c", Schema.INT32_SCHEMA)
+                .build();
+
+        final Struct value = new Struct(valueSchema);
+        value.put("a", 1);
+        value.put("b", 2);
+        value.put("c", 3);
+
+        final SinkRecord record = new SinkRecord("", 0, null, null, valueSchema, value, 0);
+        final SinkRecord transformedRecord = xform.apply(record);
+
+        final Schema expectedKeySchema = SchemaBuilder.struct()
+                .field("a", Schema.INT32_SCHEMA)
+                .field("b", Schema.INT32_SCHEMA)
+                .build();
+
+        final Struct expectedKey = new Struct(expectedKeySchema)
+                .put("a", 1)
+                .put("b", 2);
+
+        assertEquals(expectedKeySchema, transformedRecord.keySchema());
+        assertEquals(expectedKey, transformedRecord.key());
+    }
+
+}
diff --git a/docs/connect.html b/docs/connect.html
index 23e168cae0309..1af5ed95f6051 100644
--- a/docs/connect.html
+++ b/docs/connect.html
@@ -100,6 +100,22 @@ Configuring Connecto
     For any other options, you should consult the documentation for the connector.
+
+    Transformations
+
+    Connectors can be configured with transformations to make lightweight message-at-a-time modifications. They can be convenient for minor data massaging and routing changes.
+
+    A transformation chain can be specified in the connector configuration.
+
+
+
+    Several widely-applicable data and routing transformations are included with Kafka Connect:
+
+
+
 
     REST API
 
     Since Kafka Connect is intended to be run as a service, it also provides a REST API for managing connectors. By default, this service runs on port 8083. The following are the currently supported endpoints:
diff --git a/tests/kafkatest/tests/connect/connect_distributed_test.py b/tests/kafkatest/tests/connect/connect_distributed_test.py
index 198e94501f50d..c298fb154e86e 100644
--- a/tests/kafkatest/tests/connect/connect_distributed_test.py
+++ b/tests/kafkatest/tests/connect/connect_distributed_test.py
@@ -460,7 +460,7 @@ def test_transformations(self):
             'file': self.INPUT_FILE,
             'topic': self.TOPIC,
             'transforms': 'hoistToStruct,insertTimestampField',
-            'transforms.hoistToStruct.type': 'org.apache.kafka.connect.transforms.HoistToStruct$Value',
+            'transforms.hoistToStruct.type': 'org.apache.kafka.connect.transforms.HoistField$Value',
             'transforms.hoistToStruct.field': 'content',
             'transforms.insertTimestampField.type': 'org.apache.kafka.connect.transforms.InsertField$Value',
             'transforms.insertTimestampField.timestamp.field': ts_fieldname,
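For illustration (not part of this patch): the same transformation chain expressed as connector configuration, in Java form. The keys ("transforms", "transforms.<alias>.type", plus each transformation's own settings) are exactly those used by the system test above; "ts" again stands in for the test's ts_fieldname variable.

import java.util.HashMap;
import java.util.Map;

public class TransformChainConfig {
    // Connector properties for a two-step chain: wrap each value in a field
    // named "content", then add a timestamp field named "ts".
    public static Map<String, String> connectorProps() {
        final Map<String, String> props = new HashMap<>();
        props.put("transforms", "hoistToStruct,insertTimestampField"); // aliases, applied in order
        props.put("transforms.hoistToStruct.type", "org.apache.kafka.connect.transforms.HoistField$Value");
        props.put("transforms.hoistToStruct.field", "content");
        props.put("transforms.insertTimestampField.type", "org.apache.kafka.connect.transforms.InsertField$Value");
        props.put("transforms.insertTimestampField.timestamp.field", "ts");
        return props;
    }
}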