From f34cc71c9931ea7ec5dd045512c623196928a2a3 Mon Sep 17 00:00:00 2001 From: Shikhar Bhushan Date: Fri, 13 Jan 2017 12:00:31 -0800 Subject: [PATCH 01/23] SetSchemaMetadata SMT --- .../connect/transforms/SetSchemaMetadata.java | 117 ++++++++++++++++++ .../transforms/SetSchemaMetadataTest.java | 67 ++++++++++ 2 files changed, 184 insertions(+) create mode 100644 connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java create mode 100644 connect/transforms/src/test/java/org/apache/kafka/connect/transforms/SetSchemaMetadataTest.java diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java new file mode 100644 index 0000000000000..0dc627a8836d3 --- /dev/null +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.transforms; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.connector.ConnectRecord; +import org.apache.kafka.connect.data.ConnectSchema; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.errors.DataException; +import org.apache.kafka.connect.transforms.util.SimpleConfig; + +import java.util.Map; + +public abstract class SetSchemaMetadata> implements Transformation { + + public interface ConfigName { + String SCHEMA_NAME = "schema.name"; + String SCHEMA_VERSION = "schema.version"; + } + + private static final ConfigDef CONFIG_DEF = new ConfigDef() + .define(ConfigName.SCHEMA_NAME, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, "Schema name to set.") + .define(ConfigName.SCHEMA_VERSION, ConfigDef.Type.INT, null, ConfigDef.Importance.HIGH, "Schema version to set."); + + private String schemaName; + private Integer schemaVersion; + + @Override + public void configure(Map configs) { + final SimpleConfig config = new SimpleConfig(CONFIG_DEF, configs); + schemaName = config.getString(ConfigName.SCHEMA_NAME); + schemaVersion = config.getInt(ConfigName.SCHEMA_VERSION); + } + + @Override + public R apply(R record) { + if (schemaName == null && schemaVersion == null) return record; // no-op + final Schema schema = operatingSchema(record); + if (schema == null) { + throw new DataException("Cannot update metadata on null Schema"); + } + final boolean isArray = schema.type() == Schema.Type.ARRAY; + final boolean isMap = schema.type() == Schema.Type.MAP; + final Schema updatedSchema = new ConnectSchema( + schema.type(), + schema.isOptional(), + 
schema.defaultValue(), + schemaName != null ? schemaName : schema.name(), + schemaVersion != null ? schemaVersion : schema.version(), + schema.doc(), + schema.parameters(), + schema.fields(), + isMap ? schema.keySchema() : null, + isMap || isArray ? schema.valueSchema() : null + ); + return updatedRecord(record, updatedSchema); + } + + @Override + public ConfigDef config() { + return CONFIG_DEF; + } + + @Override + public void close() { + } + + protected abstract Schema operatingSchema(R record); + + protected abstract R updatedRecord(R record, Schema updatedSchema); + + /** + * Set the schema name, version or both on the record's key schema. + */ + public static class Key> extends SetSchemaMetadata { + @Override + protected Schema operatingSchema(R record) { + return record.keySchema(); + } + + @Override + protected R updatedRecord(R record, Schema updatedSchema) { + return record.newRecord(record.topic(), record.kafkaPartition(), updatedSchema, record.key(), record.valueSchema(), record.value(), record.timestamp()); + } + } + + /** + * Set the schema name, version or both on the record's value schema. 
+ */ + public static class Value> extends SetSchemaMetadata { + @Override + protected Schema operatingSchema(R record) { + return record.valueSchema(); + } + + @Override + protected R updatedRecord(R record, Schema updatedSchema) { + return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), updatedSchema, record.value(), record.timestamp()); + } + } + +} \ No newline at end of file diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/SetSchemaMetadataTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/SetSchemaMetadataTest.java new file mode 100644 index 0000000000000..2aa790f0a2d82 --- /dev/null +++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/SetSchemaMetadataTest.java @@ -0,0 +1,67 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.transforms; + +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.sink.SinkRecord; +import org.junit.Test; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +public class SetSchemaMetadataTest { + + @Test + public void schemaNameUpdate() { + final SetSchemaMetadata xform = new SetSchemaMetadata.Value<>(); + xform.configure(Collections.singletonMap("schema.name", "foo")); + final SinkRecord record = new SinkRecord("", 0, null, null, SchemaBuilder.struct().build(), null, 0); + final SinkRecord updatedRecord = xform.apply(record); + assertEquals("foo", updatedRecord.valueSchema().name()); + } + + @Test + public void schemaVersionUpdate() { + final SetSchemaMetadata xform = new SetSchemaMetadata.Value<>(); + xform.configure(Collections.singletonMap("schema.version", 42)); + final SinkRecord record = new SinkRecord("", 0, null, null, SchemaBuilder.struct().build(), null, 0); + final SinkRecord updatedRecord = xform.apply(record); + assertEquals(new Integer(42), updatedRecord.valueSchema().version()); + } + + @Test + public void schemaNameAndVersionUpdate() { + final Map props = new HashMap<>(); + props.put("schema.name", "foo"); + props.put("schema.version", "42"); + + final SetSchemaMetadata xform = new SetSchemaMetadata.Value<>(); + xform.configure(props); + + final SinkRecord record = new SinkRecord("", 0, null, null, SchemaBuilder.struct().build(), null, 0); + + final SinkRecord updatedRecord = xform.apply(record); + + assertEquals("foo", updatedRecord.valueSchema().name()); + 
assertEquals(new Integer(42), updatedRecord.valueSchema().version()); + } + +} From 09af66b3f1861bf2a0718aaf79ca1a3bd22adcec Mon Sep 17 00:00:00 2001 From: Shikhar Bhushan Date: Fri, 13 Jan 2017 13:44:57 -0800 Subject: [PATCH 02/23] Support schemaless and rename HoistToStruct->Hoist; add the inverse Extract transform --- .../kafka/connect/transforms/Extract.java | 111 ++++++++++++++++++ .../{HoistToStruct.java => Hoist.java} | 35 +++--- .../kafka/connect/transforms/ExtractTest.java | 59 ++++++++++ ...{HoistToStructTest.java => HoistTest.java} | 19 ++- .../tests/connect/connect_distributed_test.py | 2 +- 5 files changed, 205 insertions(+), 21 deletions(-) create mode 100644 connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Extract.java rename connect/transforms/src/main/java/org/apache/kafka/connect/transforms/{HoistToStruct.java => Hoist.java} (78%) create mode 100644 connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ExtractTest.java rename connect/transforms/src/test/java/org/apache/kafka/connect/transforms/{HoistToStructTest.java => HoistTest.java} (72%) diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Extract.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Extract.java new file mode 100644 index 0000000000000..ba8fd7b735d32 --- /dev/null +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Extract.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.transforms; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.connector.ConnectRecord; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.errors.DataException; +import org.apache.kafka.connect.transforms.util.SimpleConfig; + +import java.util.Map; + +public abstract class Extract> implements Transformation { + + public static final String FIELD_CONFIG = "field"; + + private static final ConfigDef CONFIG_DEF = new ConfigDef() + .define(FIELD_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.MEDIUM, "Field name to extract."); + + private String fieldName; + + @Override + public void configure(Map props) { + final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props); + fieldName = config.getString(FIELD_CONFIG); + } + + @Override + public R apply(R record) { + final Schema schema = operatingSchema(record); + final Object value = operatingValue(record); + + if (schema == null) { + if (!(value instanceof Map)) { + throw new DataException("Only Map supported in schemaless mode"); + } + return newRecord(record, null, ((Map) value).get(fieldName)); + } else { + if (schema.type() != Schema.Type.STRUCT) { + throw new DataException("Only STRUCT schema type supported, was: " + schema.type()); + } + return newRecord(record, schema.field(fieldName).schema(), ((Struct) value).get(fieldName)); + } + } + + @Override + public void close() { + } + + @Override + public ConfigDef config() { + return CONFIG_DEF; + } + + protected abstract Schema operatingSchema(R record); + + 
protected abstract Object operatingValue(R record); + + protected abstract R newRecord(R record, Schema updatedSchema, Object updatedValue); + + public static class Key> extends Extract { + @Override + protected Schema operatingSchema(R record) { + return record.keySchema(); + } + + @Override + protected Object operatingValue(R record) { + return record.key(); + } + + @Override + protected R newRecord(R record, Schema updatedSchema, Object updatedValue) { + return record.newRecord(record.topic(), record.kafkaPartition(), updatedSchema, updatedValue, record.valueSchema(), record.value(), record.timestamp()); + } + } + + public static class Value> extends Extract { + @Override + protected Schema operatingSchema(R record) { + return record.valueSchema(); + } + + @Override + protected Object operatingValue(R record) { + return record.value(); + } + + @Override + protected R newRecord(R record, Schema updatedSchema, Object updatedValue) { + return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), updatedSchema, updatedValue, record.timestamp()); + } + } + +} diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HoistToStruct.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Hoist.java similarity index 78% rename from connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HoistToStruct.java rename to connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Hoist.java index c2726ca84378b..294e6c4e0a37e 100644 --- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HoistToStruct.java +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Hoist.java @@ -27,15 +27,16 @@ import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.transforms.util.SimpleConfig; +import java.util.Collections; import java.util.Map; -public abstract class HoistToStruct> implements Transformation { +public abstract class 
Hoist> implements Transformation { public static final String FIELD_CONFIG = "field"; private static final ConfigDef CONFIG_DEF = new ConfigDef() .define(FIELD_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.MEDIUM, - "Field name for the single field that will be created in the resulting Struct."); + "Field name for the single field that will be created in the resulting Struct or Map."); private Cache schemaUpdateCache; @@ -53,15 +54,19 @@ public R apply(R record) { final Schema schema = operatingSchema(record); final Object value = operatingValue(record); - Schema updatedSchema = schemaUpdateCache.get(schema); - if (updatedSchema == null) { - updatedSchema = SchemaBuilder.struct().field(fieldName, schema).build(); - schemaUpdateCache.put(schema, updatedSchema); - } + if (schema == null) { + return newRecord(record, null, Collections.singletonMap(fieldName, value)); + } else { + Schema updatedSchema = schemaUpdateCache.get(schema); + if (updatedSchema == null) { + updatedSchema = SchemaBuilder.struct().field(fieldName, schema).build(); + schemaUpdateCache.put(schema, updatedSchema); + } - final Struct updatedValue = new Struct(updatedSchema).put(fieldName, value); + final Struct updatedValue = new Struct(updatedSchema).put(fieldName, value); - return newRecord(record, updatedSchema, updatedValue); + return newRecord(record, updatedSchema, updatedValue); + } } @Override @@ -81,10 +86,9 @@ public ConfigDef config() { protected abstract R newRecord(R record, Schema updatedSchema, Object updatedValue); /** - * Wraps the record key in a {@link org.apache.kafka.connect.data.Struct} with specified field name. + * Wraps the record key in a {@link Struct} when schema present, or a {@link Map} in schemaless mode, with the specified field name. 
*/ - public static class Key> extends HoistToStruct { - + public static class Key> extends Hoist { @Override protected Schema operatingSchema(R record) { return record.keySchema(); @@ -99,14 +103,12 @@ protected Object operatingValue(R record) { protected R newRecord(R record, Schema updatedSchema, Object updatedValue) { return record.newRecord(record.topic(), record.kafkaPartition(), updatedSchema, updatedValue, record.valueSchema(), record.value(), record.timestamp()); } - } /** - * Wraps the record value in a {@link org.apache.kafka.connect.data.Struct} with specified field name. + * Wraps the record value in a {@link Struct} when schema present, or a {@link Map} in schemaless mode, with the specified field name. */ - public static class Value> extends HoistToStruct { - + public static class Value> extends Hoist { @Override protected Schema operatingSchema(R record) { return record.valueSchema(); @@ -121,7 +123,6 @@ protected Object operatingValue(R record) { protected R newRecord(R record, Schema updatedSchema, Object updatedValue) { return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), updatedSchema, updatedValue, record.timestamp()); } - } } diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ExtractTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ExtractTest.java new file mode 100644 index 0000000000000..0333838291f14 --- /dev/null +++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ExtractTest.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.transforms; + +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.sink.SinkRecord; +import org.junit.Test; + +import java.util.Collections; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +public class ExtractTest { + + @Test + public void schemaless() { + final Extract xform = new Extract.Key<>(); + xform.configure(Collections.singletonMap("field", "magic")); + + final SinkRecord record = new SinkRecord("test", 0, null, Collections.singletonMap("magic", 42), null, null, 0); + final SinkRecord transformedRecord = xform.apply(record); + + assertNull(transformedRecord.keySchema()); + assertEquals(42, transformedRecord.key()); + } + + @Test + public void withSchema() { + final Extract xform = new Extract.Key<>(); + xform.configure(Collections.singletonMap("field", "magic")); + + final Schema keySchema = SchemaBuilder.struct().field("magic", Schema.INT32_SCHEMA).build(); + final Struct key = new Struct(keySchema).put("magic", 42); + final SinkRecord record = new SinkRecord("test", 0, keySchema, key, null, null, 0); + final SinkRecord transformedRecord = xform.apply(record); + + assertEquals(Schema.INT32_SCHEMA, transformedRecord.keySchema()); + assertEquals(42, transformedRecord.key()); + } + +} diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HoistToStructTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HoistTest.java similarity index 72% rename from 
connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HoistToStructTest.java rename to connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HoistTest.java index 99a6e99dde000..70cc454a5cdeb 100644 --- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HoistToStructTest.java +++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HoistTest.java @@ -25,12 +25,25 @@ import java.util.Collections; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; -public class HoistToStructTest { +public class HoistTest { @Test - public void sanityCheck() { - final HoistToStruct xform = new HoistToStruct.Key<>(); + public void schemaless() { + final Hoist xform = new Hoist.Key<>(); + xform.configure(Collections.singletonMap("field", "magic")); + + final SinkRecord record = new SinkRecord("test", 0, null, 42, null, null, 0); + final SinkRecord transformedRecord = xform.apply(record); + + assertNull(transformedRecord.keySchema()); + assertEquals(Collections.singletonMap("magic", 42), transformedRecord.key()); + } + + @Test + public void withSchema() { + final Hoist xform = new Hoist.Key<>(); xform.configure(Collections.singletonMap("field", "magic")); final SinkRecord record = new SinkRecord("test", 0, Schema.INT32_SCHEMA, 42, null, null, 0); diff --git a/tests/kafkatest/tests/connect/connect_distributed_test.py b/tests/kafkatest/tests/connect/connect_distributed_test.py index 198e94501f50d..021f1807e7779 100644 --- a/tests/kafkatest/tests/connect/connect_distributed_test.py +++ b/tests/kafkatest/tests/connect/connect_distributed_test.py @@ -460,7 +460,7 @@ def test_transformations(self): 'file': self.INPUT_FILE, 'topic': self.TOPIC, 'transforms': 'hoistToStruct,insertTimestampField', - 'transforms.hoistToStruct.type': 'org.apache.kafka.connect.transforms.HoistToStruct$Value', + 'transforms.hoistToStruct.type': 'org.apache.kafka.connect.transforms.Hoist$Value', 
'transforms.hoistToStruct.field': 'content', 'transforms.insertTimestampField.type': 'org.apache.kafka.connect.transforms.InsertField$Value', 'transforms.insertTimestampField.timestamp.field': ts_fieldname, From 022f4920c5f09d068bbf49e47091a1333dc48be2 Mon Sep 17 00:00:00 2001 From: Shikhar Bhushan Date: Fri, 13 Jan 2017 13:51:43 -0800 Subject: [PATCH 03/23] InsertField transform -- fix bad name for interface containing config name constants --- .../kafka/connect/transforms/InsertField.java | 26 +++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java index d67fea0893237..0ec6cd5e7b0e7 100644 --- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java @@ -37,7 +37,7 @@ public abstract class InsertField> implements Transformation { - public interface Keys { + public interface ConfigName { String TOPIC_FIELD = "topic.field"; String PARTITION_FIELD = "partition.field"; String OFFSET_FIELD = "offset.field"; @@ -49,17 +49,17 @@ public interface Keys { private static final String OPTIONALITY_DOC = "Suffix with '!' to make this a required field, or '?' 
to keep it optional (the default)."; private static final ConfigDef CONFIG_DEF = new ConfigDef() - .define(Keys.TOPIC_FIELD, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, + .define(ConfigName.TOPIC_FIELD, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, "Field name for Kafka topic.\n" + OPTIONALITY_DOC) - .define(Keys.PARTITION_FIELD, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, + .define(ConfigName.PARTITION_FIELD, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, "Field name for Kafka partition.\n" + OPTIONALITY_DOC) - .define(Keys.OFFSET_FIELD, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, + .define(ConfigName.OFFSET_FIELD, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, "Field name for Kafka offset - only applicable to sink connectors.\n" + OPTIONALITY_DOC) - .define(Keys.TIMESTAMP_FIELD, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, + .define(ConfigName.TIMESTAMP_FIELD, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, "Field name for record timestamp.\n" + OPTIONALITY_DOC) - .define(Keys.STATIC_FIELD, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, + .define(ConfigName.STATIC_FIELD, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, "Field name for static data field.\n" + OPTIONALITY_DOC) - .define(Keys.STATIC_VALUE, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, + .define(ConfigName.STATIC_VALUE, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, "Static field value, if field name configured."); private static final Schema OPTIONAL_TIMESTAMP_SCHEMA = Timestamp.builder().optional().build(); @@ -98,12 +98,12 @@ public static InsertionSpec parse(String spec) { @Override public void configure(Map props) { final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props); - topicField = InsertionSpec.parse(config.getString(Keys.TOPIC_FIELD)); - partitionField = InsertionSpec.parse(config.getString(Keys.PARTITION_FIELD)); - offsetField = 
InsertionSpec.parse(config.getString(Keys.OFFSET_FIELD)); - timestampField = InsertionSpec.parse(config.getString(Keys.TIMESTAMP_FIELD)); - staticField = InsertionSpec.parse(config.getString(Keys.STATIC_FIELD)); - staticValue = config.getString(Keys.STATIC_VALUE); + topicField = InsertionSpec.parse(config.getString(ConfigName.TOPIC_FIELD)); + partitionField = InsertionSpec.parse(config.getString(ConfigName.PARTITION_FIELD)); + offsetField = InsertionSpec.parse(config.getString(ConfigName.OFFSET_FIELD)); + timestampField = InsertionSpec.parse(config.getString(ConfigName.TIMESTAMP_FIELD)); + staticField = InsertionSpec.parse(config.getString(ConfigName.STATIC_FIELD)); + staticValue = config.getString(ConfigName.STATIC_VALUE); applicable = topicField != null || partitionField != null || offsetField != null || timestampField != null; schemaUpdateCache = new SynchronizedCache<>(new LRUCache(16)); From c5260a718e2f0ade66c4607a4b9c21abda61b90c Mon Sep 17 00:00:00 2001 From: Shikhar Bhushan Date: Fri, 13 Jan 2017 14:01:25 -0800 Subject: [PATCH 04/23] ValueToKey SMT --- .../kafka/connect/transforms/ValueToKey.java | 112 ++++++++++++++++++ .../connect/transforms/ValueToKeyTest.java | 87 ++++++++++++++ 2 files changed, 199 insertions(+) create mode 100644 connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ValueToKey.java create mode 100644 connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ValueToKeyTest.java diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ValueToKey.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ValueToKey.java new file mode 100644 index 0000000000000..899b4e4053fc2 --- /dev/null +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ValueToKey.java @@ -0,0 +1,112 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.transforms; + +import org.apache.kafka.common.cache.Cache; +import org.apache.kafka.common.cache.LRUCache; +import org.apache.kafka.common.cache.SynchronizedCache; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.connector.ConnectRecord; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.errors.DataException; +import org.apache.kafka.connect.transforms.util.SimpleConfig; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class ValueToKey> implements Transformation { + + public static final String FIELDS_CONFIG = "fields"; + + private static final ConfigDef CONFIG_DEF = new ConfigDef() + .define(FIELDS_CONFIG, ConfigDef.Type.LIST, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.HIGH, + "Field names on the record value to extract as the record key."); + + private List fields; + + private Cache valueToKeySchemaCache; + + @Override + public void configure(Map configs) { + final SimpleConfig config = new SimpleConfig(CONFIG_DEF, configs); + fields = config.getList(FIELDS_CONFIG); + valueToKeySchemaCache = new SynchronizedCache<>(new LRUCache(16)); + } + + @Override + public R apply(R record) { + final Schema valueSchema = record.valueSchema(); + if (valueSchema == null) { + if (!(record.value() instanceof Map)) { + throw new DataException("Only Map values supported for schemaless data"); + } + return applySchemaless(record); + } else { + if (valueSchema.type() != Schema.Type.STRUCT) { + throw new DataException("Only 
STRUCT schema type supported, was: " + valueSchema.type()); + } + return applyWithSchema(record); + } + } + + private R applySchemaless(R record) { + final Map value = (Map) record.value(); + final Map key = new HashMap<>(fields.size()); + for (String field : fields) { + key.put(field, value.get(field)); + } + return record.newRecord(record.topic(), record.kafkaPartition(), null, key, record.valueSchema(), record.value(), record.timestamp()); + } + + private R applyWithSchema(R record) { + final Schema valueSchema = record.valueSchema(); + final Struct value = (Struct) record.value(); + + Schema keySchema = valueToKeySchemaCache.get(valueSchema); + if (keySchema == null) { + final SchemaBuilder keySchemaBuilder = SchemaBuilder.struct(); + for (String field : fields) { + final Schema fieldSchema = value.schema().field(field).schema(); + keySchemaBuilder.field(field, fieldSchema); + } + keySchema = keySchemaBuilder.build(); + valueToKeySchemaCache.put(valueSchema, keySchema); + } + + final Struct key = new Struct(keySchema); + for (String field : fields) { + key.put(field, value.get(field)); + } + + return record.newRecord(record.topic(), record.kafkaPartition(), keySchema, key, valueSchema, value, record.timestamp()); + } + + @Override + public ConfigDef config() { + return CONFIG_DEF; + } + + @Override + public void close() { + valueToKeySchemaCache = null; + } + +} diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ValueToKeyTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ValueToKeyTest.java new file mode 100644 index 0000000000000..e5328d36baebb --- /dev/null +++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ValueToKeyTest.java @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.transforms; + +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.sink.SinkRecord; +import org.junit.Test; + +import java.util.Collections; +import java.util.HashMap; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; + +public class ValueToKeyTest { + + @Test + public void schemaless() { + final ValueToKey xform = new ValueToKey<>(); + xform.configure(Collections.singletonMap("fields", "a,b")); + + final HashMap value = new HashMap<>(); + value.put("a", 1); + value.put("b", 2); + value.put("c", 3); + + final SinkRecord record = new SinkRecord("", 0, null, null, null, value, 0); + final SinkRecord transformedRecord = xform.apply(record); + + final HashMap expectedKey = new HashMap<>(); + expectedKey.put("a", 1); + expectedKey.put("b", 2); + + assertNull(transformedRecord.keySchema()); + assertEquals(expectedKey, transformedRecord.key()); + } + + @Test + public void withSchema() { + final ValueToKey xform = new ValueToKey<>(); + xform.configure(Collections.singletonMap("fields", "a,b")); + + final Schema valueSchema = SchemaBuilder.struct() + .field("a", Schema.INT32_SCHEMA) + .field("b", Schema.INT32_SCHEMA) + .field("c", Schema.INT32_SCHEMA) + .build(); + + final Struct value = new Struct(valueSchema); + value.put("a", 1); + value.put("b", 2); + value.put("c", 3); + + final SinkRecord record = new SinkRecord("", 0, null, null, valueSchema, value, 0); + final SinkRecord transformedRecord = xform.apply(record); + + final Schema 
expectedKeySchema = SchemaBuilder.struct() + .field("a", Schema.INT32_SCHEMA) + .field("b", Schema.INT32_SCHEMA) + .build(); + + final Struct expectedKey = new Struct(expectedKeySchema) + .put("a", 1) + .put("b", 2); + + assertEquals(expectedKeySchema, transformedRecord.keySchema()); + assertEquals(expectedKey, transformedRecord.key()); + } + +} From 06b47a8a2fbba9f54d14c8b0eccf5052fb3712fe Mon Sep 17 00:00:00 2001 From: Shikhar Bhushan Date: Fri, 13 Jan 2017 14:30:06 -0800 Subject: [PATCH 05/23] Clearer naming -- HoistField, ExtractField --- .../connect/transforms/{Extract.java => ExtractField.java} | 6 +++--- .../connect/transforms/{Hoist.java => HoistField.java} | 6 +++--- .../transforms/{ExtractTest.java => ExtractFieldTest.java} | 6 +++--- .../java/org/apache/kafka/connect/transforms/HoistTest.java | 4 ++-- tests/kafkatest/tests/connect/connect_distributed_test.py | 2 +- 5 files changed, 12 insertions(+), 12 deletions(-) rename connect/transforms/src/main/java/org/apache/kafka/connect/transforms/{Extract.java => ExtractField.java} (96%) rename connect/transforms/src/main/java/org/apache/kafka/connect/transforms/{Hoist.java => HoistField.java} (97%) rename connect/transforms/src/test/java/org/apache/kafka/connect/transforms/{ExtractTest.java => ExtractFieldTest.java} (92%) diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Extract.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ExtractField.java similarity index 96% rename from connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Extract.java rename to connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ExtractField.java index ba8fd7b735d32..ad9fd7576d55e 100644 --- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Extract.java +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ExtractField.java @@ -26,7 +26,7 @@ import java.util.Map; -public abstract class Extract> implements 
Transformation { +public abstract class ExtractField> implements Transformation { public static final String FIELD_CONFIG = "field"; @@ -74,7 +74,7 @@ public ConfigDef config() { protected abstract R newRecord(R record, Schema updatedSchema, Object updatedValue); - public static class Key> extends Extract { + public static class Key> extends ExtractField { @Override protected Schema operatingSchema(R record) { return record.keySchema(); @@ -91,7 +91,7 @@ protected R newRecord(R record, Schema updatedSchema, Object updatedValue) { } } - public static class Value> extends Extract { + public static class Value> extends ExtractField { @Override protected Schema operatingSchema(R record) { return record.valueSchema(); diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Hoist.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HoistField.java similarity index 97% rename from connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Hoist.java rename to connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HoistField.java index 294e6c4e0a37e..9b33e9ccab726 100644 --- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Hoist.java +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HoistField.java @@ -30,7 +30,7 @@ import java.util.Collections; import java.util.Map; -public abstract class Hoist> implements Transformation { +public abstract class HoistField> implements Transformation { public static final String FIELD_CONFIG = "field"; @@ -88,7 +88,7 @@ public ConfigDef config() { /** * Wraps the record key in a {@link Struct} when schema present, or a {@link Map} in schemaless mode, with the specified field name. 
*/ - public static class Key> extends Hoist { + public static class Key> extends HoistField { @Override protected Schema operatingSchema(R record) { return record.keySchema(); @@ -108,7 +108,7 @@ protected R newRecord(R record, Schema updatedSchema, Object updatedValue) { /** * Wraps the record value in a {@link Struct} when schema present, or a {@link Map} in schemaless mode, with the specified field name. */ - public static class Value> extends Hoist { + public static class Value> extends HoistField { @Override protected Schema operatingSchema(R record) { return record.valueSchema(); diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ExtractTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ExtractFieldTest.java similarity index 92% rename from connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ExtractTest.java rename to connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ExtractFieldTest.java index 0333838291f14..d72179559a50a 100644 --- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ExtractTest.java +++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ExtractFieldTest.java @@ -28,11 +28,11 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; -public class ExtractTest { +public class ExtractFieldTest { @Test public void schemaless() { - final Extract xform = new Extract.Key<>(); + final ExtractField xform = new ExtractField.Key<>(); xform.configure(Collections.singletonMap("field", "magic")); final SinkRecord record = new SinkRecord("test", 0, null, Collections.singletonMap("magic", 42), null, null, 0); @@ -44,7 +44,7 @@ public void schemaless() { @Test public void withSchema() { - final Extract xform = new Extract.Key<>(); + final ExtractField xform = new ExtractField.Key<>(); xform.configure(Collections.singletonMap("field", "magic")); final Schema keySchema = 
SchemaBuilder.struct().field("magic", Schema.INT32_SCHEMA).build(); diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HoistTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HoistTest.java index 70cc454a5cdeb..2e7b59a909af3 100644 --- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HoistTest.java +++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HoistTest.java @@ -31,7 +31,7 @@ public class HoistTest { @Test public void schemaless() { - final Hoist xform = new Hoist.Key<>(); + final HoistField xform = new HoistField.Key<>(); xform.configure(Collections.singletonMap("field", "magic")); final SinkRecord record = new SinkRecord("test", 0, null, 42, null, null, 0); @@ -43,7 +43,7 @@ public void schemaless() { @Test public void withSchema() { - final Hoist xform = new Hoist.Key<>(); + final HoistField xform = new HoistField.Key<>(); xform.configure(Collections.singletonMap("field", "magic")); final SinkRecord record = new SinkRecord("test", 0, Schema.INT32_SCHEMA, 42, null, null, 0); diff --git a/tests/kafkatest/tests/connect/connect_distributed_test.py b/tests/kafkatest/tests/connect/connect_distributed_test.py index 021f1807e7779..c298fb154e86e 100644 --- a/tests/kafkatest/tests/connect/connect_distributed_test.py +++ b/tests/kafkatest/tests/connect/connect_distributed_test.py @@ -460,7 +460,7 @@ def test_transformations(self): 'file': self.INPUT_FILE, 'topic': self.TOPIC, 'transforms': 'hoistToStruct,insertTimestampField', - 'transforms.hoistToStruct.type': 'org.apache.kafka.connect.transforms.Hoist$Value', + 'transforms.hoistToStruct.type': 'org.apache.kafka.connect.transforms.HoistField$Value', 'transforms.hoistToStruct.field': 'content', 'transforms.insertTimestampField.type': 'org.apache.kafka.connect.transforms.InsertField$Value', 'transforms.insertTimestampField.timestamp.field': ts_fieldname, From cdd88551f6957f6e7e3b3414158ee7f678708553 Mon Sep 17 
00:00:00 2001 From: Shikhar Bhushan Date: Tue, 17 Jan 2017 13:56:23 -0800 Subject: [PATCH 06/23] Address review comments; docgen coming along --- .../connect/tools/TransformationDoc.java | 77 +++++++++++++++++++ .../connect/transforms/ExtractField.java | 29 +++---- .../kafka/connect/transforms/HoistField.java | 15 ++-- .../kafka/connect/transforms/InsertField.java | 64 +++++++-------- .../connect/transforms/SetSchemaMetadata.java | 15 ++-- .../connect/transforms/TimestampRouter.java | 30 ++++---- .../kafka/connect/transforms/ValueToKey.java | 30 ++++---- .../connect/transforms/util/Requirements.java | 61 +++++++++++++++ 8 files changed, 228 insertions(+), 93 deletions(-) create mode 100644 connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java create mode 100644 connect/transforms/src/main/java/org/apache/kafka/connect/transforms/util/Requirements.java diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java new file mode 100644 index 0000000000000..3aba8c519ce15 --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.tools; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.transforms.ExtractField; +import org.apache.kafka.connect.transforms.HoistField; +import org.apache.kafka.connect.transforms.InsertField; +import org.apache.kafka.connect.transforms.SetSchemaMetadata; +import org.apache.kafka.connect.transforms.TimestampRouter; +import org.apache.kafka.connect.transforms.ValueToKey; + +import java.io.PrintStream; +import java.util.Arrays; +import java.util.List; + +public class TransformationDoc { + + private static final class DocInfo { + final String transformationName; + final String overview; + final ConfigDef configDef; + + private DocInfo(String transformationName, String overview, ConfigDef configDef) { + this.transformationName = transformationName; + this.overview = overview; + this.configDef = configDef; + } + } + + private static final List TRANSFORMATIONS = Arrays.asList( + new DocInfo(InsertField.class.getName(), InsertField.OVERVIEW_DOC, InsertField.CONFIG_DEF), + new DocInfo(ValueToKey.class.getName(), ValueToKey.OVERVIEW_DOC, ValueToKey.CONFIG_DEF), + new DocInfo(HoistField.class.getName(), HoistField.OVERVIEW_DOC, HoistField.CONFIG_DEF), + new DocInfo(ExtractField.class.getName(), ExtractField.OVERVIEW_DOC, ExtractField.CONFIG_DEF), + new DocInfo(SetSchemaMetadata.class.getName(), SetSchemaMetadata.OVERVIEW_DOC, SetSchemaMetadata.CONFIG_DEF), + new DocInfo(TimestampRouter.class.getName(), TimestampRouter.OVERVIEW_DOC, TimestampRouter.CONFIG_DEF) + ); + + private static void printHtml(PrintStream out) throws NoSuchFieldException, IllegalAccessException, 
InstantiationException { + for (final DocInfo docInfo : TRANSFORMATIONS) { + out.println("

"); + + out.print("

"); + out.print(docInfo.transformationName); + out.println("

"); + + out.println("

"); + out.println(docInfo.overview); + out.println("

"); + + out.println(docInfo.configDef.toHtmlTable()); + + out.println("
"); + } + } + + public static void main(String... args) throws Exception { + printHtml(System.out); + } + +} diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ExtractField.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ExtractField.java index ad9fd7576d55e..b7063136e9df1 100644 --- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ExtractField.java +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ExtractField.java @@ -21,18 +21,27 @@ import org.apache.kafka.connect.connector.ConnectRecord; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Struct; -import org.apache.kafka.connect.errors.DataException; import org.apache.kafka.connect.transforms.util.SimpleConfig; import java.util.Map; +import static org.apache.kafka.connect.transforms.util.Requirements.requireMap; +import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct; + public abstract class ExtractField> implements Transformation { - public static final String FIELD_CONFIG = "field"; + public static final String OVERVIEW_DOC = + "Extract the specified field from a Struct when schema present, or a Map in the case of schemaless data." + + "

Use the concrete transformation type designed for the record key (" + Key.class.getCanonicalName() + ") " + + "or value (" + Value.class.getCanonicalName() + ")."; + + private static final String FIELD_CONFIG = "field"; - private static final ConfigDef CONFIG_DEF = new ConfigDef() + public static final ConfigDef CONFIG_DEF = new ConfigDef() .define(FIELD_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.MEDIUM, "Field name to extract."); + private static final String PURPOSE = "field extraction"; + private String fieldName; @Override @@ -44,18 +53,12 @@ public void configure(Map props) { @Override public R apply(R record) { final Schema schema = operatingSchema(record); - final Object value = operatingValue(record); - if (schema == null) { - if (!(value instanceof Map)) { - throw new DataException("Only Map supported in schemaless mode"); - } - return newRecord(record, null, ((Map) value).get(fieldName)); + final Map value = requireMap(operatingValue(record), PURPOSE); + return newRecord(record, null, value.get(fieldName)); } else { - if (schema.type() != Schema.Type.STRUCT) { - throw new DataException("Only STRUCT schema type supported, was: " + schema.type()); - } - return newRecord(record, schema.field(fieldName).schema(), ((Struct) value).get(fieldName)); + final Struct value = requireStruct(operatingValue(record), PURPOSE); + return newRecord(record, schema.field(fieldName).schema(), value.get(fieldName)); } } diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HoistField.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HoistField.java index 9b33e9ccab726..1f2ed7c8e626f 100644 --- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HoistField.java +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HoistField.java @@ -32,9 +32,14 @@ public abstract class HoistField> implements Transformation { - public static final String 
FIELD_CONFIG = "field"; + public static final String OVERVIEW_DOC = + "Wrap data using the specified field name in a Struct when schema present, or a Map in the case of schemaless data." + + "

Use the concrete transformation type designed for the record key (" + Key.class.getCanonicalName() + ") " + + "or value (" + Value.class.getCanonicalName() + ")."; - private static final ConfigDef CONFIG_DEF = new ConfigDef() + private static final String FIELD_CONFIG = "field"; + + public static final ConfigDef CONFIG_DEF = new ConfigDef() .define(FIELD_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.MEDIUM, "Field name for the single field that will be created in the resulting Struct or Map."); @@ -85,9 +90,6 @@ public ConfigDef config() { protected abstract R newRecord(R record, Schema updatedSchema, Object updatedValue); - /** - * Wraps the record key in a {@link Struct} when schema present, or a {@link Map} in schemaless mode, with the specified field name. - */ public static class Key> extends HoistField { @Override protected Schema operatingSchema(R record) { @@ -105,9 +107,6 @@ protected R newRecord(R record, Schema updatedSchema, Object updatedValue) { } } - /** - * Wraps the record value in a {@link Struct} when schema present, or a {@link Map} in schemaless mode, with the specified field name. 
- */ public static class Value> extends HoistField { @Override protected Schema operatingSchema(R record) { diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java index 0ec6cd5e7b0e7..31f66d7770841 100644 --- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java @@ -27,17 +27,24 @@ import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.data.Timestamp; -import org.apache.kafka.connect.errors.DataException; -import org.apache.kafka.connect.sink.SinkRecord; import org.apache.kafka.connect.transforms.util.SimpleConfig; import java.util.Date; import java.util.HashMap; import java.util.Map; +import static org.apache.kafka.connect.transforms.util.Requirements.requireMap; +import static org.apache.kafka.connect.transforms.util.Requirements.requireSinkRecord; +import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct; + public abstract class InsertField> implements Transformation { - public interface ConfigName { + public static final String OVERVIEW_DOC = + "Insert field(s) using attributes from the record metadata or a configured static value." + + "

Use the concrete transformation type designed for the record key (" + Key.class.getCanonicalName() + ") " + + "or value (" + Value.class.getCanonicalName() + ")."; + + private interface ConfigName { String TOPIC_FIELD = "topic.field"; String PARTITION_FIELD = "partition.field"; String OFFSET_FIELD = "offset.field"; @@ -48,7 +55,7 @@ public interface ConfigName { private static final String OPTIONALITY_DOC = "Suffix with '!' to make this a required field, or '?' to keep it optional (the default)."; - private static final ConfigDef CONFIG_DEF = new ConfigDef() + public static final ConfigDef CONFIG_DEF = new ConfigDef() .define(ConfigName.TOPIC_FIELD, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, "Field name for Kafka topic.\n" + OPTIONALITY_DOC) .define(ConfigName.PARTITION_FIELD, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, @@ -62,6 +69,8 @@ public interface ConfigName { .define(ConfigName.STATIC_VALUE, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, "Static field value, if field name configured."); + private static final String PURPOSE = "field insertion"; + private static final Schema OPTIONAL_TIMESTAMP_SCHEMA = Timestamp.builder().optional().build(); private static final class InsertionSpec { @@ -113,24 +122,16 @@ public void configure(Map props) { public R apply(R record) { if (!applicable) return record; - final Schema schema = operatingSchema(record); - final Object value = operatingValue(record); - - if (value == null) - throw new DataException("null value"); - - if (schema == null) { - if (!(value instanceof Map)) - throw new DataException("Can only operate on Map value in schemaless mode: " + value.getClass().getName()); - return applySchemaless(record, (Map) value); + if (operatingSchema(record) == null) { + return applySchemaless(record); } else { - if (schema.type() != Schema.Type.STRUCT) - throw new DataException("Can only operate on Struct types: " + value.getClass().getName()); - return applyWithSchema(record, 
schema, (Struct) value); + return applyWithSchema(record); } } - private R applySchemaless(R record, Map value) { + private R applySchemaless(R record) { + final Map value = requireMap(operatingValue(record), PURPOSE); + final Map updatedValue = new HashMap<>(value); if (topicField != null) { @@ -140,9 +141,7 @@ private R applySchemaless(R record, Map value) { updatedValue.put(partitionField.name, record.kafkaPartition()); } if (offsetField != null) { - if (!(record instanceof SinkRecord)) - throw new DataException("Offset insertion is only supported for sink connectors, record is of type: " + record.getClass()); - updatedValue.put(offsetField.name, ((SinkRecord) record).kafkaOffset()); + updatedValue.put(offsetField.name, requireSinkRecord(record, PURPOSE).kafkaOffset()); } if (timestampField != null && record.timestamp() != null) { updatedValue.put(timestampField.name, record.timestamp()); @@ -150,14 +149,17 @@ private R applySchemaless(R record, Map value) { if (staticField != null && staticValue != null) { updatedValue.put(staticField.name, staticValue); } + return newRecord(record, null, updatedValue); } - private R applyWithSchema(R record, Schema schema, Struct value) { - Schema updatedSchema = schemaUpdateCache.get(schema); + private R applyWithSchema(R record) { + final Struct value = requireStruct(operatingValue(record), PURPOSE); + + Schema updatedSchema = schemaUpdateCache.get(value.schema()); if (updatedSchema == null) { - updatedSchema = makeUpdatedSchema(schema); - schemaUpdateCache.put(schema, updatedSchema); + updatedSchema = makeUpdatedSchema(value.schema()); + schemaUpdateCache.put(value.schema(), updatedSchema); } final Struct updatedValue = new Struct(updatedSchema); @@ -218,10 +220,7 @@ private void insertFields(R record, Struct value) { value.put(partitionField.name, record.kafkaPartition()); } if (offsetField != null) { - if (!(record instanceof SinkRecord)) { - throw new DataException("Offset insertion is only supported for sink connectors, 
record is of type: " + record.getClass()); - } - value.put(offsetField.name, ((SinkRecord) record).kafkaOffset()); + value.put(offsetField.name, requireSinkRecord(record, PURPOSE).kafkaOffset()); } if (timestampField != null && record.timestamp() != null) { value.put(timestampField.name, new Date(record.timestamp())); @@ -248,8 +247,7 @@ public ConfigDef config() { protected abstract R newRecord(R record, Schema updatedSchema, Object updatedValue); /** - * This transformation allows inserting configured attributes of the record metadata as fields in the record key. - * It also allows adding a static data field. + * */ public static class Key> extends InsertField { @@ -270,10 +268,6 @@ protected R newRecord(R record, Schema updatedSchema, Object updatedValue) { } - /** - * This transformation allows inserting configured attributes of the record metadata as fields in the record value. - * It also allows adding a static data field. - */ public static class Value> extends InsertField { @Override diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java index 0dc627a8836d3..890723c08e856 100644 --- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java @@ -21,19 +21,24 @@ import org.apache.kafka.connect.connector.ConnectRecord; import org.apache.kafka.connect.data.ConnectSchema; import org.apache.kafka.connect.data.Schema; -import org.apache.kafka.connect.errors.DataException; import org.apache.kafka.connect.transforms.util.SimpleConfig; import java.util.Map; +import static org.apache.kafka.connect.transforms.util.Requirements.requireSchema; + public abstract class SetSchemaMetadata> implements Transformation { - public interface ConfigName { + public static final String OVERVIEW_DOC = + "Set the 
schema name, version or both on the record's key (" + Key.class.getCanonicalName() + ")" + + " or value (" + Value.class.getCanonicalName() + ") schema."; + + private interface ConfigName { String SCHEMA_NAME = "schema.name"; String SCHEMA_VERSION = "schema.version"; } - private static final ConfigDef CONFIG_DEF = new ConfigDef() + public static final ConfigDef CONFIG_DEF = new ConfigDef() .define(ConfigName.SCHEMA_NAME, ConfigDef.Type.STRING, null, ConfigDef.Importance.HIGH, "Schema name to set.") .define(ConfigName.SCHEMA_VERSION, ConfigDef.Type.INT, null, ConfigDef.Importance.HIGH, "Schema version to set."); @@ -51,9 +56,7 @@ public void configure(Map configs) { public R apply(R record) { if (schemaName == null && schemaVersion == null) return record; // no-op final Schema schema = operatingSchema(record); - if (schema == null) { - throw new DataException("Cannot update metadata on null Schema"); - } + requireSchema(schema, "updating schema metadata"); final boolean isArray = schema.type() == Schema.Type.ARRAY; final boolean isMap = schema.type() == Schema.Type.MAP; final Schema updatedSchema = new ConnectSchema( diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampRouter.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampRouter.java index 1dd5345a6091a..af26ec218df8c 100644 --- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampRouter.java +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampRouter.java @@ -27,25 +27,25 @@ import java.util.Map; import java.util.TimeZone; -/** - * This transformation facilitates updating the record's topic field as a function of the original topic value and the record timestamp. - *

- * It is mainly useful for sink connectors, since the topic field is often used to determine the equivalent entity name in the destination system - * (e.g. database table or search index name). - */ public class TimestampRouter> implements Transformation { - public interface Keys { + public static final String OVERVIEW_DOC = + "This transformation facilitates updating the record's topic field as a function of the original topic value and the record timestamp." + + "

" + + "It is mainly useful for sink connectors, since the topic field is often used to determine the equivalent entity name in the destination system" + + "(e.g. database table or search index name)."; + + public static final ConfigDef CONFIG_DEF = new ConfigDef() + .define(ConfigName.TOPIC_FORMAT, ConfigDef.Type.STRING, "${topic}-${timestamp}", ConfigDef.Importance.HIGH, + "Format string which can contain ${topic} and ${timestamp} as placeholders for the topic and timestamp, respectively.") + .define(ConfigName.TIMESTAMP_FORMAT, ConfigDef.Type.STRING, "yyyyMMdd", ConfigDef.Importance.HIGH, + "Format string for the timestamp that is compatible with java.text.SimpleDateFormat."); + + private interface ConfigName { String TOPIC_FORMAT = "topic.format"; String TIMESTAMP_FORMAT = "timestamp.format"; } - private static final ConfigDef CONFIG_DEF = new ConfigDef() - .define(Keys.TOPIC_FORMAT, ConfigDef.Type.STRING, "${topic}-${timestamp}", ConfigDef.Importance.HIGH, - "Format string which can contain ``${topic}`` and ``${timestamp}`` as placeholders for the topic and timestamp, respectively.") - .define(Keys.TIMESTAMP_FORMAT, ConfigDef.Type.STRING, "yyyyMMdd", ConfigDef.Importance.HIGH, - "Format string for the timestamp that is compatible with java.text.SimpleDateFormat."); - private String topicFormat; private ThreadLocal timestampFormat; @@ -53,9 +53,9 @@ public interface Keys { public void configure(Map props) { final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props); - topicFormat = config.getString(Keys.TOPIC_FORMAT); + topicFormat = config.getString(ConfigName.TOPIC_FORMAT); - final String timestampFormatStr = config.getString(Keys.TIMESTAMP_FORMAT); + final String timestampFormatStr = config.getString(ConfigName.TIMESTAMP_FORMAT); timestampFormat = new ThreadLocal() { @Override protected SimpleDateFormat initialValue() { diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ValueToKey.java 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ValueToKey.java index 899b4e4053fc2..4a5d097edb44a 100644 --- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ValueToKey.java +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ValueToKey.java @@ -25,21 +25,27 @@ import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; -import org.apache.kafka.connect.errors.DataException; import org.apache.kafka.connect.transforms.util.SimpleConfig; import java.util.HashMap; import java.util.List; import java.util.Map; +import static org.apache.kafka.connect.transforms.util.Requirements.requireMap; +import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct; + public class ValueToKey> implements Transformation { + public static final String OVERVIEW_DOC = "Copy fields from the record value to the record key. The existing record key if any is clobbered."; + public static final String FIELDS_CONFIG = "fields"; - private static final ConfigDef CONFIG_DEF = new ConfigDef() + public static final ConfigDef CONFIG_DEF = new ConfigDef() .define(FIELDS_CONFIG, ConfigDef.Type.LIST, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.HIGH, "Field names on the record value to extract as the record key."); + private static final String PURPOSE = "copying fields from value to key"; + private List fields; private Cache valueToKeySchemaCache; @@ -53,22 +59,15 @@ public void configure(Map configs) { @Override public R apply(R record) { - final Schema valueSchema = record.valueSchema(); - if (valueSchema == null) { - if (!(record.value() instanceof Map)) { - throw new DataException("Only Map values supported for schemaless data"); - } + if (record.valueSchema() == null) { return applySchemaless(record); } else { - if (valueSchema.type() != Schema.Type.STRUCT) { - throw new DataException("Only STRUCT schema type supported, was: " 
+ valueSchema.type()); - } return applyWithSchema(record); } } private R applySchemaless(R record) { - final Map value = (Map) record.value(); + final Map value = requireMap(record.value(), PURPOSE); final Map key = new HashMap<>(fields.size()); for (String field : fields) { key.put(field, value.get(field)); @@ -77,10 +76,9 @@ private R applySchemaless(R record) { } private R applyWithSchema(R record) { - final Schema valueSchema = record.valueSchema(); - final Struct value = (Struct) record.value(); + final Struct value = requireStruct(record.value(), PURPOSE); - Schema keySchema = valueToKeySchemaCache.get(valueSchema); + Schema keySchema = valueToKeySchemaCache.get(value.schema()); if (keySchema == null) { final SchemaBuilder keySchemaBuilder = SchemaBuilder.struct(); for (String field : fields) { @@ -88,7 +86,7 @@ private R applyWithSchema(R record) { keySchemaBuilder.field(field, fieldSchema); } keySchema = keySchemaBuilder.build(); - valueToKeySchemaCache.put(valueSchema, keySchema); + valueToKeySchemaCache.put(value.schema(), keySchema); } final Struct key = new Struct(keySchema); @@ -96,7 +94,7 @@ private R applyWithSchema(R record) { key.put(field, value.get(field)); } - return record.newRecord(record.topic(), record.kafkaPartition(), keySchema, key, valueSchema, value, record.timestamp()); + return record.newRecord(record.topic(), record.kafkaPartition(), keySchema, key, value.schema(), value, record.timestamp()); } @Override diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/util/Requirements.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/util/Requirements.java new file mode 100644 index 0000000000000..b004f8ae91500 --- /dev/null +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/util/Requirements.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.transforms.util; + +import org.apache.kafka.connect.connector.ConnectRecord; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.errors.DataException; +import org.apache.kafka.connect.sink.SinkRecord; + +import java.util.Map; + +public class Requirements { + + public static void requireSchema(Schema schema, String purpose) { + if (schema == null) { + throw new DataException("Schema required for [" + purpose + "]"); + } + } + + public static Map requireMap(Object value, String purpose) { + if (!(value instanceof Map)) { + throw new DataException("Only Map objects supported in absence of schema for [" + purpose + "], found: " + nullSafeClassName(value)); + } + return (Map) value; + } + + public static Struct requireStruct(Object value, String purpose) { + if (!(value instanceof Struct)) { + throw new DataException("Only Struct objects supported for [" + purpose + "], found: " + nullSafeClassName(value)); + } + return (Struct) value; + } + + public static SinkRecord requireSinkRecord(ConnectRecord record, String purpose) { + if (!(record instanceof SinkRecord)) { + throw new DataException("Only SinkRecord supported for [" + purpose + "], found: " + nullSafeClassName(record)); + } + return (SinkRecord) record; + } + + private static String nullSafeClassName(Object x) { + return x == null ? 
"null" : x.getClass().getCanonicalName(); + } + +} From 04dc9f3aa2083f440db1b2db2f526e3c6b194ac4 Mon Sep 17 00:00:00 2001 From: Shikhar Bhushan Date: Tue, 17 Jan 2017 15:37:24 -0800 Subject: [PATCH 07/23] Auto-generate docs/generated/connect_transforms.html --- build.gradle | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/build.gradle b/build.gradle index 6f4d2501f7345..e191a51c3b89a 100644 --- a/build.gradle +++ b/build.gradle @@ -508,7 +508,7 @@ project(':core') { task siteDocsTar(dependsOn: ['genProtocolErrorDocs', 'genProtocolApiKeyDocs', 'genProtocolMessageDocs', 'genProducerConfigDocs', 'genConsumerConfigDocs', 'genKafkaConfigDocs', - 'genTopicConfigDocs', ':connect:runtime:genConnectConfigDocs', + 'genTopicConfigDocs', ':connect:runtime:genConnectConfigDocs', ':connect:runtime:genConnectTransformationDocs', ':streams:genStreamsConfigDocs'], type: Tar) { classifier = 'site-docs' compression = Compression.GZIP @@ -948,6 +948,13 @@ project(':connect:runtime') { if( !generatedDocsDir.exists() ) { generatedDocsDir.mkdirs() } standardOutput = new File(generatedDocsDir, "connect_config.html").newOutputStream() } + + task genConnectTransformationDocs(type: JavaExec) { + classpath = sourceSets.main.runtimeClasspath + main = 'org.apache.kafka.connect.tools.TransformationDoc' + if( !generatedDocsDir.exists() ) { generatedDocsDir.mkdirs() } + standardOutput = new File(generatedDocsDir, "connect_transforms.html").newOutputStream() + } } project(':connect:file') { From e90f989a357ff1a71af392c0e366e43e7b04dc75 Mon Sep 17 00:00:00 2001 From: Shikhar Bhushan Date: Tue, 17 Jan 2017 15:37:39 -0800 Subject: [PATCH 08/23] html tweaks --- .../connect/tools/TransformationDoc.java | 26 +++++++++++-------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java index 
3aba8c519ce15..8f0817935fe61 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java @@ -52,21 +52,25 @@ private DocInfo(String transformationName, String overview, ConfigDef configDef) new DocInfo(TimestampRouter.class.getName(), TimestampRouter.OVERVIEW_DOC, TimestampRouter.CONFIG_DEF) ); - private static void printHtml(PrintStream out) throws NoSuchFieldException, IllegalAccessException, InstantiationException { - for (final DocInfo docInfo : TRANSFORMATIONS) { - out.println("

"); + private static void printTransformationHtml(PrintStream out, DocInfo docInfo) { + out.println("
"); - out.print("

"); - out.print(docInfo.transformationName); - out.println("

"); + out.print("
"); + out.print(docInfo.transformationName); + out.println("
"); - out.println("

"); - out.println(docInfo.overview); - out.println("

"); + out.println(docInfo.overview); - out.println(docInfo.configDef.toHtmlTable()); + out.println("

"); - out.println("

"); + out.println(docInfo.configDef.toHtmlTable()); + + out.println("
"); + } + + private static void printHtml(PrintStream out) throws NoSuchFieldException, IllegalAccessException, InstantiationException { + for (final DocInfo docInfo : TRANSFORMATIONS) { + printTransformationHtml(out, docInfo); } } From 692901f7184004ab6046398b230e99d6ab99b22e Mon Sep 17 00:00:00 2001 From: Shikhar Bhushan Date: Tue, 17 Jan 2017 15:42:58 -0800 Subject: [PATCH 09/23] consistent doc style --- .../org/apache/kafka/connect/transforms/TimestampRouter.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampRouter.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampRouter.java index af26ec218df8c..f917a8d65f09a 100644 --- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampRouter.java +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampRouter.java @@ -30,9 +30,9 @@ public class TimestampRouter> implements Transformation { public static final String OVERVIEW_DOC = - "This transformation facilitates updating the record's topic field as a function of the original topic value and the record timestamp." + "Update the record's topic field as a function of the original topic value and the record timestamp." + "

" - + "It is mainly useful for sink connectors, since the topic field is often used to determine the equivalent entity name in the destination system" + + "This is mainly useful for sink connectors, since the topic field is often used to determine the equivalent entity name in the destination system" + "(e.g. database table or search index name)."; public static final ConfigDef CONFIG_DEF = new ConfigDef() From 5772e95bb782ec8c080dc6f0bdcb668e6e06dbfc Mon Sep 17 00:00:00 2001 From: Shikhar Bhushan Date: Tue, 17 Jan 2017 16:22:49 -0800 Subject: [PATCH 10/23] Add RegexRouter SMT --- .../connect/tools/TransformationDoc.java | 4 +- .../kafka/connect/transforms/RegexRouter.java | 74 +++++++++++++++++++ .../connect/transforms/RegexRouterTest.java | 70 ++++++++++++++++++ 3 files changed, 147 insertions(+), 1 deletion(-) create mode 100644 connect/transforms/src/main/java/org/apache/kafka/connect/transforms/RegexRouter.java create mode 100644 connect/transforms/src/test/java/org/apache/kafka/connect/transforms/RegexRouterTest.java diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java index 8f0817935fe61..2a8697fb968b8 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java @@ -21,6 +21,7 @@ import org.apache.kafka.connect.transforms.ExtractField; import org.apache.kafka.connect.transforms.HoistField; import org.apache.kafka.connect.transforms.InsertField; +import org.apache.kafka.connect.transforms.RegexRouter; import org.apache.kafka.connect.transforms.SetSchemaMetadata; import org.apache.kafka.connect.transforms.TimestampRouter; import org.apache.kafka.connect.transforms.ValueToKey; @@ -49,7 +50,8 @@ private DocInfo(String transformationName, String overview, ConfigDef configDef) new 
DocInfo(HoistField.class.getName(), HoistField.OVERVIEW_DOC, HoistField.CONFIG_DEF), new DocInfo(ExtractField.class.getName(), ExtractField.OVERVIEW_DOC, ExtractField.CONFIG_DEF), new DocInfo(SetSchemaMetadata.class.getName(), SetSchemaMetadata.OVERVIEW_DOC, SetSchemaMetadata.CONFIG_DEF), - new DocInfo(TimestampRouter.class.getName(), TimestampRouter.OVERVIEW_DOC, TimestampRouter.CONFIG_DEF) + new DocInfo(TimestampRouter.class.getName(), TimestampRouter.OVERVIEW_DOC, TimestampRouter.CONFIG_DEF), + new DocInfo(RegexRouter.class.getName(), RegexRouter.OVERVIEW_DOC, RegexRouter.CONFIG_DEF) ); private static void printTransformationHtml(PrintStream out, DocInfo docInfo) { diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/RegexRouter.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/RegexRouter.java new file mode 100644 index 0000000000000..394c74ad3cc52 --- /dev/null +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/RegexRouter.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.transforms; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.connector.ConnectRecord; +import org.apache.kafka.connect.transforms.util.SimpleConfig; + +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +public class RegexRouter> implements Transformation { + + public static final String OVERVIEW_DOC = "Update the record topic using the configured regular expression and replacement string." + + "

Under the hood, the regex is compiled to a java.util.regex.Pattern. " + + "If the pattern matches the input topic, java.util.regex.Matcher#replaceFirst() is used with the replacement string to obtain the new topic."; + + public static final ConfigDef CONFIG_DEF = new ConfigDef() + .define(ConfigName.REGEX, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.HIGH, + "Regular expression to use for matching.") + .define(ConfigName.REPLACEMENT, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.HIGH, + "Replacement string."); + + private interface ConfigName { + String REGEX = "regex"; + String REPLACEMENT = "replacement"; + } + + private Pattern regex; + private String replacement; + + @Override + public void configure(Map props) { + final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props); + regex = Pattern.compile(config.getString(ConfigName.REGEX)); + replacement = config.getString(ConfigName.REPLACEMENT); + } + + @Override + public R apply(R record) { + final Matcher matcher = regex.matcher(record.topic()); + if (matcher.matches()) { + final String topic = matcher.replaceFirst(replacement); + return record.newRecord(topic, record.kafkaPartition(), record.keySchema(), record.key(), record.valueSchema(), record.value(), record.timestamp()); + } + return record; + } + + @Override + public void close() { + } + + @Override + public ConfigDef config() { + return CONFIG_DEF; + } + +} diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/RegexRouterTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/RegexRouterTest.java new file mode 100644 index 0000000000000..c599265d57200 --- /dev/null +++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/RegexRouterTest.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.transforms; + +import org.apache.kafka.connect.sink.SinkRecord; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +public class RegexRouterTest { + + private static String apply(String regex, String replacement, String topic) { + final Map props = new HashMap<>(); + props.put("regex", regex); + props.put("replacement", replacement); + final RegexRouter router = new RegexRouter<>(); + router.configure(props); + return router.apply(new SinkRecord(topic, 0, null, null, null, null, 0)) + .topic(); + } + + @Test + public void staticReplacement() { + assertEquals("bar", apply("foo", "bar", "foo")); + } + + @Test + public void doesntMatch() { + assertEquals("orig", apply("foo", "bar", "orig")); + } + + @Test + public void identity() { + assertEquals("orig", apply("(.*)", "$1", "orig")); + } + + @Test + public void addPrefix() { + assertEquals("prefix-orig", apply("(.*)", "prefix-$1", "orig")); + } + + @Test + public void addSuffix() { + assertEquals("orig-suffix", apply("(.*)", "$1-suffix", "orig")); + } + + @Test + public void slice() { + assertEquals("index", apply("(.*)-(\\d\\d\\d\\d\\d\\d\\d\\d)", "$1", "index-20160117")); + } + +} From 2b995857593614e7d27a4ceb1e7c6cc441b87b85 Mon Sep 17 00:00:00 2001 From: Shikhar Bhushan Date: Tue, 17 Jan 2017 16:39:29 -0800 Subject: [PATCH 11/23] consistent naming --- .../connect/transforms/{HoistTest.java => HoistFieldTest.java} | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) rename connect/transforms/src/test/java/org/apache/kafka/connect/transforms/{HoistTest.java => HoistFieldTest.java} 
(98%) diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HoistTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HoistFieldTest.java similarity index 98% rename from connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HoistTest.java rename to connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HoistFieldTest.java index 2e7b59a909af3..b5f9d93d935e4 100644 --- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HoistTest.java +++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HoistFieldTest.java @@ -27,7 +27,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNull; -public class HoistTest { +public class HoistFieldTest { @Test public void schemaless() { From 106390937d7a902628aa3188e4e48c297d98d2b5 Mon Sep 17 00:00:00 2001 From: Shikhar Bhushan Date: Wed, 18 Jan 2017 12:52:05 -0800 Subject: [PATCH 12/23] MaskField SMT --- .../connect/tools/TransformationDoc.java | 2 + .../kafka/connect/transforms/MaskField.java | 170 ++++++++++++++++++ .../connect/transforms/MaskFieldTest.java | 156 ++++++++++++++++ 3 files changed, 328 insertions(+) create mode 100644 connect/transforms/src/main/java/org/apache/kafka/connect/transforms/MaskField.java create mode 100644 connect/transforms/src/test/java/org/apache/kafka/connect/transforms/MaskFieldTest.java diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java index 2a8697fb968b8..d840ea69ed38b 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java @@ -21,6 +21,7 @@ import org.apache.kafka.connect.transforms.ExtractField; import org.apache.kafka.connect.transforms.HoistField; import 
org.apache.kafka.connect.transforms.InsertField; +import org.apache.kafka.connect.transforms.MaskField; import org.apache.kafka.connect.transforms.RegexRouter; import org.apache.kafka.connect.transforms.SetSchemaMetadata; import org.apache.kafka.connect.transforms.TimestampRouter; @@ -46,6 +47,7 @@ private DocInfo(String transformationName, String overview, ConfigDef configDef) private static final List TRANSFORMATIONS = Arrays.asList( new DocInfo(InsertField.class.getName(), InsertField.OVERVIEW_DOC, InsertField.CONFIG_DEF), + new DocInfo(MaskField.class.getName(), MaskField.OVERVIEW_DOC, MaskField.CONFIG_DEF), new DocInfo(ValueToKey.class.getName(), ValueToKey.OVERVIEW_DOC, ValueToKey.CONFIG_DEF), new DocInfo(HoistField.class.getName(), HoistField.OVERVIEW_DOC, HoistField.CONFIG_DEF), new DocInfo(ExtractField.class.getName(), ExtractField.OVERVIEW_DOC, ExtractField.CONFIG_DEF), diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/MaskField.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/MaskField.java new file mode 100644 index 0000000000000..8c545762b8be7 --- /dev/null +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/MaskField.java @@ -0,0 +1,170 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.transforms; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.connector.ConnectRecord; +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.errors.DataException; +import org.apache.kafka.connect.transforms.util.SimpleConfig; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.kafka.connect.transforms.util.Requirements.requireMap; +import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct; + +public abstract class MaskField> implements Transformation { + + public static final String OVERVIEW_DOC = "Mask specified fields with a valid null value for the field type (i.e. 
0, false, empty string, and so on)."; + + private static final String FIELDS_CONFIG = "fields"; + + public static final ConfigDef CONFIG_DEF = new ConfigDef() + .define(FIELDS_CONFIG, ConfigDef.Type.LIST, "", ConfigDef.Importance.HIGH, "Names of fields to mask."); + + private static final String PURPOSE = "mask fields"; + + private static final Map PRIMITIVE_VALUE_MAPPING = new HashMap<>(); + + static { + PRIMITIVE_VALUE_MAPPING.put(Boolean.class.getCanonicalName(), Boolean.FALSE); + PRIMITIVE_VALUE_MAPPING.put(Byte.class.getCanonicalName(), (byte) 0); + PRIMITIVE_VALUE_MAPPING.put(Short.class.getCanonicalName(), (short) 0); + PRIMITIVE_VALUE_MAPPING.put(Integer.class.getCanonicalName(), 0); + PRIMITIVE_VALUE_MAPPING.put(Long.class.getCanonicalName(), 0L); + PRIMITIVE_VALUE_MAPPING.put(Float.class.getCanonicalName(), 0f); + PRIMITIVE_VALUE_MAPPING.put(Double.class.getCanonicalName(), 0d); + PRIMITIVE_VALUE_MAPPING.put(BigInteger.class.getCanonicalName(), BigInteger.ZERO); + PRIMITIVE_VALUE_MAPPING.put(BigDecimal.class.getCanonicalName(), BigDecimal.ZERO); + PRIMITIVE_VALUE_MAPPING.put(Date.class.getCanonicalName(), new Date(0)); + PRIMITIVE_VALUE_MAPPING.put(String.class.getCanonicalName(), ""); + } + + private Set fields; + + @Override + public void configure(Map props) { + final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props); + fields = new HashSet<>(config.getList(FIELDS_CONFIG)); + } + + @Override + public R apply(R record) { + if (fields.isEmpty()) return record; + + if (operatingSchema(record) == null) { + return applySchemaless(record); + } else { + return applyWithSchema(record); + } + } + + private R applySchemaless(R record) { + final Map value = requireMap(operatingValue(record), PURPOSE); + final HashMap updatedValue = new HashMap<>(value); + for (String field : fields) { + updatedValue.put(field, masked(value.get(field))); + } + return updatedRecord(record, updatedValue); + } + + private R applyWithSchema(R record) { + final Struct value = 
requireStruct(operatingValue(record), PURPOSE); + final Struct updatedValue = new Struct(value.schema()); + for (Field field : value.schema().fields()) { + final Object origFieldValue = value.get(field); + updatedValue.put(field, fields.contains(field.name()) ? masked(origFieldValue) : origFieldValue); + } + return updatedRecord(record, updatedValue); + } + + private static Object masked(Object value) { + if (value == null) + return null; + Object maskedValue = PRIMITIVE_VALUE_MAPPING.get(value.getClass().getCanonicalName()); + if (maskedValue == null) { + if (value instanceof List) + maskedValue = Collections.emptyList(); + else if (value instanceof Map) + maskedValue = Collections.emptyMap(); + else + throw new DataException("Cannot mask value of type: " + value.getClass()); + } + return maskedValue; + } + + @Override + public ConfigDef config() { + return CONFIG_DEF; + } + + @Override + public void close() { + } + + protected abstract Schema operatingSchema(R record); + + protected abstract Object operatingValue(R record); + + protected abstract R updatedRecord(R base, Object value); + + public static final class Key> extends MaskField { + @Override + protected Schema operatingSchema(R record) { + return record.keySchema(); + } + + @Override + protected Object operatingValue(R record) { + return record.key(); + } + + @Override + protected R updatedRecord(R record, Object updatedValue) { + return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), updatedValue, record.valueSchema(), record.value(), record.timestamp()); + } + } + + public static final class Value> extends MaskField { + @Override + protected Schema operatingSchema(R record) { + return record.valueSchema(); + } + + @Override + protected Object operatingValue(R record) { + return record.value(); + } + + @Override + protected R updatedRecord(R record, Object updatedValue) { + return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), 
record.valueSchema(), updatedValue, record.timestamp()); + } + } + +} diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/MaskFieldTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/MaskFieldTest.java new file mode 100644 index 0000000000000..c96058afbe6de --- /dev/null +++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/MaskFieldTest.java @@ -0,0 +1,156 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.transforms; + +import org.apache.kafka.connect.data.Decimal; +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.data.Time; +import org.apache.kafka.connect.data.Timestamp; +import org.apache.kafka.connect.sink.SinkRecord; +import org.junit.Test; + +import java.math.BigDecimal; +import java.math.BigInteger; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +public class MaskFieldTest { + + private static MaskField transform(List fields) { + final MaskField xform = new MaskField.Value<>(); + xform.configure(Collections.singletonMap("fields", fields)); + return xform; + } + + private static SinkRecord record(Schema schema, Object value) { + return new SinkRecord("", 0, null, null, schema, value, 0); + } + + @Test + public void schemaless() { + final Map value = new HashMap<>(); + value.put("magic", 42); + value.put("bool", true); + value.put("byte", (byte) 42); + value.put("short", (short) 42); + value.put("int", 42); + value.put("long", 42L); + value.put("float", 42f); + value.put("double", 42d); + value.put("string", "blabla"); + value.put("date", new Date()); + value.put("bigint", new BigInteger("42")); + value.put("bigdec", new BigDecimal("42.0")); + value.put("list", Collections.singletonList(42)); + value.put("map", Collections.singletonMap("key", 
"value")); + + final List maskFields = new ArrayList<>(value.keySet()); + maskFields.remove("magic"); + + final Map updatedValue = (Map) transform(maskFields).apply(record(null, value)).value(); + + assertEquals(42, updatedValue.get("magic")); + assertEquals(false, updatedValue.get("bool")); + assertEquals((byte) 0, updatedValue.get("byte")); + assertEquals((short) 0, updatedValue.get("short")); + assertEquals(0, updatedValue.get("int")); + assertEquals(0L, updatedValue.get("long")); + assertEquals(0f, updatedValue.get("float")); + assertEquals(0d, updatedValue.get("double")); + assertEquals("", updatedValue.get("string")); + assertEquals(new Date(0), updatedValue.get("date")); + assertEquals(BigInteger.ZERO, updatedValue.get("bigint")); + assertEquals(BigDecimal.ZERO, updatedValue.get("bigdec")); + assertEquals(Collections.emptyList(), updatedValue.get("list")); + assertEquals(Collections.emptyMap(), updatedValue.get("map")); + } + + @Test + public void withSchema() { + Schema schema = SchemaBuilder.struct() + .field("magic", Schema.INT32_SCHEMA) + .field("bool", Schema.BOOLEAN_SCHEMA) + .field("byte", Schema.INT8_SCHEMA) + .field("short", Schema.INT16_SCHEMA) + .field("int", Schema.INT32_SCHEMA) + .field("long", Schema.INT64_SCHEMA) + .field("float", Schema.FLOAT32_SCHEMA) + .field("double", Schema.FLOAT64_SCHEMA) + .field("string", Schema.STRING_SCHEMA) + .field("date", org.apache.kafka.connect.data.Date.SCHEMA) + .field("time", Time.SCHEMA) + .field("timestamp", Timestamp.SCHEMA) + .field("decimal", Decimal.schema(0)) + .field("array", SchemaBuilder.array(Schema.INT32_SCHEMA)) + .field("map", SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA)) + .build(); + + final Struct value = new Struct(schema); + value.put("magic", 42); + value.put("bool", true); + value.put("byte", (byte) 42); + value.put("short", (short) 42); + value.put("int", 42); + value.put("long", 42L); + value.put("float", 42f); + value.put("double", 42d); + value.put("string", "hmm"); + 
value.put("date", new Date()); + value.put("time", new Date()); + value.put("timestamp", new Date()); + value.put("decimal", new BigDecimal(42)); + value.put("array", Arrays.asList(1, 2, 3)); + value.put("map", Collections.singletonMap("what", "what")); + + final List maskFields = new ArrayList<>(schema.fields().size()); + for (Field field: schema.fields()) { + if (!field.name().equals("magic")) { + maskFields.add(field.name()); + } + } + + final Struct updatedValue = (Struct) transform(maskFields).apply(record(schema, value)).value(); + + assertEquals(42, updatedValue.get("magic")); + assertEquals(false, updatedValue.get("bool")); + assertEquals((byte) 0, updatedValue.get("byte")); + assertEquals((short) 0, updatedValue.get("short")); + assertEquals(0, updatedValue.get("int")); + assertEquals(0L, updatedValue.get("long")); + assertEquals(0f, updatedValue.get("float")); + assertEquals(0d, updatedValue.get("double")); + assertEquals("", updatedValue.get("string")); + assertEquals(new Date(0), updatedValue.get("date")); + assertEquals(new Date(0), updatedValue.get("time")); + assertEquals(new Date(0), updatedValue.get("timestamp")); + assertEquals(BigDecimal.ZERO, updatedValue.get("decimal")); + assertEquals(Collections.emptyList(), updatedValue.get("array")); + assertEquals(Collections.emptyMap(), updatedValue.get("map")); + } + +} From db7faf0026651fbe288a75d766e224644a69ffc7 Mon Sep 17 00:00:00 2001 From: Shikhar Bhushan Date: Wed, 18 Jan 2017 13:58:59 -0800 Subject: [PATCH 13/23] empty comment --- .../java/org/apache/kafka/connect/transforms/InsertField.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java index 31f66d7770841..c798be66f799a 100644 --- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java +++ 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java @@ -246,9 +246,6 @@ public ConfigDef config() { protected abstract R newRecord(R record, Schema updatedSchema, Object updatedValue); - /** - * - */ public static class Key> extends InsertField { @Override From b4aa9ee2a60d5bcc66bc161c055d07e0795c163b Mon Sep 17 00:00:00 2001 From: Shikhar Bhushan Date: Wed, 18 Jan 2017 14:26:07 -0800 Subject: [PATCH 14/23] Missed note in overview doc for MaskField SMT --- .../java/org/apache/kafka/connect/transforms/MaskField.java | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/MaskField.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/MaskField.java index 8c545762b8be7..35e7fce300c2a 100644 --- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/MaskField.java +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/MaskField.java @@ -40,7 +40,10 @@ public abstract class MaskField> implements Transformation { - public static final String OVERVIEW_DOC = "Mask specified fields with a valid null value for the field type (i.e. 0, false, empty string, and so on)."; + public static final String OVERVIEW_DOC = + "Mask specified fields with a valid null value for the field type (i.e. 0, false, empty string, and so on)." + + "

Use the concrete transformation type designed for the record key (" + Key.class.getCanonicalName() + ") " + + "or value (" + Value.class.getCanonicalName() + ")."; private static final String FIELDS_CONFIG = "fields"; From 3186c9f9312ab383713cbce28d84a4f9647479f1 Mon Sep 17 00:00:00 2001 From: Shikhar Bhushan Date: Wed, 18 Jan 2017 16:01:09 -0800 Subject: [PATCH 15/23] Use Class object rather than canon name in masked value mappings --- .../kafka/connect/transforms/MaskField.java | 26 +++++++++---------- 1 file changed, 13 insertions(+), 13 deletions(-) diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/MaskField.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/MaskField.java index 35e7fce300c2a..d280f6b4838cb 100644 --- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/MaskField.java +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/MaskField.java @@ -52,20 +52,20 @@ public abstract class MaskField> implements Transform private static final String PURPOSE = "mask fields"; - private static final Map PRIMITIVE_VALUE_MAPPING = new HashMap<>(); + private static final Map, Object> PRIMITIVE_VALUE_MAPPING = new HashMap<>(); static { - PRIMITIVE_VALUE_MAPPING.put(Boolean.class.getCanonicalName(), Boolean.FALSE); - PRIMITIVE_VALUE_MAPPING.put(Byte.class.getCanonicalName(), (byte) 0); - PRIMITIVE_VALUE_MAPPING.put(Short.class.getCanonicalName(), (short) 0); - PRIMITIVE_VALUE_MAPPING.put(Integer.class.getCanonicalName(), 0); - PRIMITIVE_VALUE_MAPPING.put(Long.class.getCanonicalName(), 0L); - PRIMITIVE_VALUE_MAPPING.put(Float.class.getCanonicalName(), 0f); - PRIMITIVE_VALUE_MAPPING.put(Double.class.getCanonicalName(), 0d); - PRIMITIVE_VALUE_MAPPING.put(BigInteger.class.getCanonicalName(), BigInteger.ZERO); - PRIMITIVE_VALUE_MAPPING.put(BigDecimal.class.getCanonicalName(), BigDecimal.ZERO); - PRIMITIVE_VALUE_MAPPING.put(Date.class.getCanonicalName(), new Date(0)); - 
PRIMITIVE_VALUE_MAPPING.put(String.class.getCanonicalName(), ""); + PRIMITIVE_VALUE_MAPPING.put(Boolean.class, Boolean.FALSE); + PRIMITIVE_VALUE_MAPPING.put(Byte.class, (byte) 0); + PRIMITIVE_VALUE_MAPPING.put(Short.class, (short) 0); + PRIMITIVE_VALUE_MAPPING.put(Integer.class, 0); + PRIMITIVE_VALUE_MAPPING.put(Long.class, 0L); + PRIMITIVE_VALUE_MAPPING.put(Float.class, 0f); + PRIMITIVE_VALUE_MAPPING.put(Double.class, 0d); + PRIMITIVE_VALUE_MAPPING.put(BigInteger.class, BigInteger.ZERO); + PRIMITIVE_VALUE_MAPPING.put(BigDecimal.class, BigDecimal.ZERO); + PRIMITIVE_VALUE_MAPPING.put(Date.class, new Date(0)); + PRIMITIVE_VALUE_MAPPING.put(String.class, ""); } private Set fields; @@ -109,7 +109,7 @@ private R applyWithSchema(R record) { private static Object masked(Object value) { if (value == null) return null; - Object maskedValue = PRIMITIVE_VALUE_MAPPING.get(value.getClass().getCanonicalName()); + Object maskedValue = PRIMITIVE_VALUE_MAPPING.get(value.getClass()); if (maskedValue == null) { if (value instanceof List) maskedValue = Collections.emptyList(); From 9bb590857ab589bcbe378a049382ff6cf8ee5224 Mon Sep 17 00:00:00 2001 From: Shikhar Bhushan Date: Thu, 19 Jan 2017 10:24:27 -0800 Subject: [PATCH 16/23] MaskField.fields -> maskedFields for clarity --- .../org/apache/kafka/connect/transforms/MaskField.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/MaskField.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/MaskField.java index d280f6b4838cb..8baa2c0651354 100644 --- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/MaskField.java +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/MaskField.java @@ -68,17 +68,17 @@ public abstract class MaskField> implements Transform PRIMITIVE_VALUE_MAPPING.put(String.class, ""); } - private Set fields; + private Set maskedFields; @Override public void 
configure(Map props) { final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props); - fields = new HashSet<>(config.getList(FIELDS_CONFIG)); + maskedFields = new HashSet<>(config.getList(FIELDS_CONFIG)); } @Override public R apply(R record) { - if (fields.isEmpty()) return record; + if (maskedFields.isEmpty()) return record; if (operatingSchema(record) == null) { return applySchemaless(record); @@ -90,7 +90,7 @@ public R apply(R record) { private R applySchemaless(R record) { final Map value = requireMap(operatingValue(record), PURPOSE); final HashMap updatedValue = new HashMap<>(value); - for (String field : fields) { + for (String field : maskedFields) { updatedValue.put(field, masked(value.get(field))); } return updatedRecord(record, updatedValue); @@ -101,7 +101,7 @@ private R applyWithSchema(R record) { final Struct updatedValue = new Struct(value.schema()); for (Field field : value.schema().fields()) { final Object origFieldValue = value.get(field); - updatedValue.put(field, fields.contains(field.name()) ? masked(origFieldValue) : origFieldValue); + updatedValue.put(field, maskedFields.contains(field.name()) ? 
masked(origFieldValue) : origFieldValue); } return updatedRecord(record, updatedValue); } From 8676dc53d0e74d5f04fe01e8b3d20c05f46b5c07 Mon Sep 17 00:00:00 2001 From: Shikhar Bhushan Date: Thu, 19 Jan 2017 10:24:46 -0800 Subject: [PATCH 17/23] Clearer overview doc for ValueToKey --- .../java/org/apache/kafka/connect/transforms/ValueToKey.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ValueToKey.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ValueToKey.java index 4a5d097edb44a..0015e26775a98 100644 --- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ValueToKey.java +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ValueToKey.java @@ -36,7 +36,7 @@ public class ValueToKey> implements Transformation { - public static final String OVERVIEW_DOC = "Copy fields from the record value to the record key. The existing record key if any is clobbered."; + public static final String OVERVIEW_DOC = "Replace the record key with a new key formed from a subset of fields in the record value."; public static final String FIELDS_CONFIG = "fields"; From 16c9259a90b14df20ff389e87d5b60c6864fd75b Mon Sep 17 00:00:00 2001 From: Shikhar Bhushan Date: Thu, 19 Jan 2017 10:45:54 -0800 Subject: [PATCH 18/23] Config validation, try to fail-fast on no-op configs --- .../kafka/connect/transforms/InsertField.java | 9 ++-- .../kafka/connect/transforms/MaskField.java | 6 +-- .../kafka/connect/transforms/RegexRouter.java | 3 +- .../connect/transforms/SetSchemaMetadata.java | 6 ++- .../kafka/connect/transforms/ValueToKey.java | 3 +- .../util/NonEmptyListValidator.java | 41 ++++++++++++++++++ .../transforms/util/RegexValidator.java | 43 +++++++++++++++++++ 7 files changed, 101 insertions(+), 10 deletions(-) create mode 100644 connect/transforms/src/main/java/org/apache/kafka/connect/transforms/util/NonEmptyListValidator.java create mode 
100644 connect/transforms/src/main/java/org/apache/kafka/connect/transforms/util/RegexValidator.java diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java index c798be66f799a..b148c6d6e567b 100644 --- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java @@ -21,6 +21,7 @@ import org.apache.kafka.common.cache.LRUCache; import org.apache.kafka.common.cache.SynchronizedCache; import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.connect.connector.ConnectRecord; import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; @@ -100,7 +101,6 @@ public static InsertionSpec parse(String spec) { private InsertionSpec timestampField; private InsertionSpec staticField; private String staticValue; - private boolean applicable; private Cache schemaUpdateCache; @@ -113,15 +113,16 @@ public void configure(Map props) { timestampField = InsertionSpec.parse(config.getString(ConfigName.TIMESTAMP_FIELD)); staticField = InsertionSpec.parse(config.getString(ConfigName.STATIC_FIELD)); staticValue = config.getString(ConfigName.STATIC_VALUE); - applicable = topicField != null || partitionField != null || offsetField != null || timestampField != null; + + if (topicField == null && partitionField == null && offsetField == null && timestampField == null && staticField == null) { + throw new ConfigException("No field insertion configured"); + } schemaUpdateCache = new SynchronizedCache<>(new LRUCache(16)); } @Override public R apply(R record) { - if (!applicable) return record; - if (operatingSchema(record) == null) { return applySchemaless(record); } else { diff --git 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/MaskField.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/MaskField.java index 8baa2c0651354..dc5fb7abfef5c 100644 --- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/MaskField.java +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/MaskField.java @@ -18,11 +18,13 @@ package org.apache.kafka.connect.transforms; import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.connect.connector.ConnectRecord; import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.errors.DataException; +import org.apache.kafka.connect.transforms.util.NonEmptyListValidator; import org.apache.kafka.connect.transforms.util.SimpleConfig; import java.math.BigDecimal; @@ -48,7 +50,7 @@ public abstract class MaskField> implements Transform private static final String FIELDS_CONFIG = "fields"; public static final ConfigDef CONFIG_DEF = new ConfigDef() - .define(FIELDS_CONFIG, ConfigDef.Type.LIST, "", ConfigDef.Importance.HIGH, "Names of fields to mask."); + .define(FIELDS_CONFIG, ConfigDef.Type.LIST, ConfigDef.NO_DEFAULT_VALUE, new NonEmptyListValidator(), ConfigDef.Importance.HIGH, "Names of fields to mask."); private static final String PURPOSE = "mask fields"; @@ -78,8 +80,6 @@ public void configure(Map props) { @Override public R apply(R record) { - if (maskedFields.isEmpty()) return record; - if (operatingSchema(record) == null) { return applySchemaless(record); } else { diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/RegexRouter.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/RegexRouter.java index 394c74ad3cc52..f16560e3c5748 100644 --- 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/RegexRouter.java +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/RegexRouter.java @@ -19,6 +19,7 @@ import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.connect.connector.ConnectRecord; +import org.apache.kafka.connect.transforms.util.RegexValidator; import org.apache.kafka.connect.transforms.util.SimpleConfig; import java.util.Map; @@ -32,7 +33,7 @@ public class RegexRouter> implements Transformationjava.util.regex.Matcher#replaceFirst() is used with the replacement string to obtain the new topic."; public static final ConfigDef CONFIG_DEF = new ConfigDef() - .define(ConfigName.REGEX, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.HIGH, + .define(ConfigName.REGEX, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, new RegexValidator(), ConfigDef.Importance.HIGH, "Regular expression to use for matching.") .define(ConfigName.REPLACEMENT, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.HIGH, "Replacement string."); diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java index 890723c08e856..e0c337cb4647e 100644 --- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java @@ -18,6 +18,7 @@ package org.apache.kafka.connect.transforms; import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.connect.connector.ConnectRecord; import org.apache.kafka.connect.data.ConnectSchema; import org.apache.kafka.connect.data.Schema; @@ -50,11 +51,14 @@ public void configure(Map configs) { final SimpleConfig config = new SimpleConfig(CONFIG_DEF, configs); schemaName = 
config.getString(ConfigName.SCHEMA_NAME); schemaVersion = config.getInt(ConfigName.SCHEMA_VERSION); + + if (schemaName == null && schemaVersion == null) { + throw new ConfigException("Neither schema name nor version configured"); + } } @Override public R apply(R record) { - if (schemaName == null && schemaVersion == null) return record; // no-op final Schema schema = operatingSchema(record); requireSchema(schema, "updating schema metadata"); final boolean isArray = schema.type() == Schema.Type.ARRAY; diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ValueToKey.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ValueToKey.java index 0015e26775a98..504da541f6574 100644 --- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ValueToKey.java +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ValueToKey.java @@ -25,6 +25,7 @@ import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.transforms.util.NonEmptyListValidator; import org.apache.kafka.connect.transforms.util.SimpleConfig; import java.util.HashMap; @@ -41,7 +42,7 @@ public class ValueToKey> implements Transformation public static final String FIELDS_CONFIG = "fields"; public static final ConfigDef CONFIG_DEF = new ConfigDef() - .define(FIELDS_CONFIG, ConfigDef.Type.LIST, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Importance.HIGH, + .define(FIELDS_CONFIG, ConfigDef.Type.LIST, ConfigDef.NO_DEFAULT_VALUE, new NonEmptyListValidator(), ConfigDef.Importance.HIGH, "Field names on the record value to extract as the record key."); private static final String PURPOSE = "copying fields from value to key"; diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/util/NonEmptyListValidator.java 
b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/util/NonEmptyListValidator.java new file mode 100644 index 0000000000000..dee62a02c7776 --- /dev/null +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/util/NonEmptyListValidator.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.kafka.connect.transforms.util; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; + +import java.util.List; + +public class NonEmptyListValidator implements ConfigDef.Validator { + + @Override + public void ensureValid(String name, Object value) { + if (((List) value).isEmpty()) { + throw new ConfigException(name, value, "Empty list"); + } + } + + @Override + public String toString() { + return "non-empty list"; + } + +} diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/util/RegexValidator.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/util/RegexValidator.java new file mode 100644 index 0000000000000..22ba686f3339e --- /dev/null +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/util/RegexValidator.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.kafka.connect.transforms.util; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; + +import java.util.regex.Pattern; + +public class RegexValidator implements ConfigDef.Validator { + + @Override + public void ensureValid(String name, Object value) { + try { + Pattern.compile((String) value); + } catch (Exception e) { + throw new ConfigException(name, value, "Invalid regex: " + e.getMessage()); + } + } + + @Override + public String toString() { + return "valid regex"; + } + +} From a374bf9a2806e3dc3cdd699debc4787216dc6bd3 Mon Sep 17 00:00:00 2001 From: Shikhar Bhushan Date: Thu, 19 Jan 2017 10:54:54 -0800 Subject: [PATCH 19/23] checkstyle --- .../kafka/connect/transforms/MaskField.java | 1 - .../util/NonEmptyListValidator.java | 34 +++++++++---------- .../transforms/util/RegexValidator.java | 34 +++++++++---------- 3 files changed, 32 insertions(+), 37 deletions(-) diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/MaskField.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/MaskField.java index dc5fb7abfef5c..212b8b2f7db97 100644 --- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/MaskField.java +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/MaskField.java @@ -18,7 +18,6 @@ package org.apache.kafka.connect.transforms; import org.apache.kafka.common.config.ConfigDef; -import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.connect.connector.ConnectRecord; import org.apache.kafka.connect.data.Field; import org.apache.kafka.connect.data.Schema; diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/util/NonEmptyListValidator.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/util/NonEmptyListValidator.java index dee62a02c7776..1abbbc8aaf081 100644 --- 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/util/NonEmptyListValidator.java +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/util/NonEmptyListValidator.java @@ -1,21 +1,19 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ package org.apache.kafka.connect.transforms.util; diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/util/RegexValidator.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/util/RegexValidator.java index 22ba686f3339e..9713b27446f47 100644 --- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/util/RegexValidator.java +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/util/RegexValidator.java @@ -1,21 +1,19 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ package org.apache.kafka.connect.transforms.util; From 7ee22e5bcd51699fe5d6ed304b0a4f460656f397 Mon Sep 17 00:00:00 2001 From: Shikhar Bhushan Date: Thu, 19 Jan 2017 14:40:12 -0800 Subject: [PATCH 20/23] ReplaceField SMT and some consistency/cleanups refactoring --- .../connect/tools/TransformationDoc.java | 2 + .../kafka/connect/transforms/InsertField.java | 56 ++--- .../kafka/connect/transforms/MaskField.java | 10 +- .../connect/transforms/ReplaceField.java | 230 ++++++++++++++++++ .../connect/transforms/SetSchemaMetadata.java | 8 +- .../connect/transforms/util/SchemaUtil.java | 40 +++ .../connect/transforms/ReplaceFieldTest.java | 92 +++++++ 7 files changed, 393 insertions(+), 45 deletions(-) create mode 100644 connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java create mode 100644 connect/transforms/src/main/java/org/apache/kafka/connect/transforms/util/SchemaUtil.java create mode 100644 connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java index d840ea69ed38b..6746042baa743 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java @@ -23,6 +23,7 @@ import org.apache.kafka.connect.transforms.InsertField; import org.apache.kafka.connect.transforms.MaskField; import org.apache.kafka.connect.transforms.RegexRouter; +import 
org.apache.kafka.connect.transforms.ReplaceField; import org.apache.kafka.connect.transforms.SetSchemaMetadata; import org.apache.kafka.connect.transforms.TimestampRouter; import org.apache.kafka.connect.transforms.ValueToKey; @@ -47,6 +48,7 @@ private DocInfo(String transformationName, String overview, ConfigDef configDef) private static final List TRANSFORMATIONS = Arrays.asList( new DocInfo(InsertField.class.getName(), InsertField.OVERVIEW_DOC, InsertField.CONFIG_DEF), + new DocInfo(ReplaceField.class.getName(), ReplaceField.OVERVIEW_DOC, ReplaceField.CONFIG_DEF), new DocInfo(MaskField.class.getName(), MaskField.OVERVIEW_DOC, MaskField.CONFIG_DEF), new DocInfo(ValueToKey.class.getName(), ValueToKey.OVERVIEW_DOC, ValueToKey.CONFIG_DEF), new DocInfo(HoistField.class.getName(), HoistField.OVERVIEW_DOC, HoistField.CONFIG_DEF), diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java index b148c6d6e567b..3c452c3fd57e6 100644 --- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java @@ -29,6 +29,7 @@ import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.data.Timestamp; import org.apache.kafka.connect.transforms.util.SimpleConfig; +import org.apache.kafka.connect.transforms.util.SchemaUtil; import java.util.Date; import java.util.HashMap; @@ -165,24 +166,31 @@ private R applyWithSchema(R record) { final Struct updatedValue = new Struct(updatedSchema); - copyFields(value, updatedValue); + for (Field field : value.schema().fields()) { + updatedValue.put(field.name(), value.get(field)); + } - insertFields(record, updatedValue); + if (topicField != null) { + updatedValue.put(topicField.name, record.topic()); + } + if (partitionField != null && record.kafkaPartition() != null) { + 
updatedValue.put(partitionField.name, record.kafkaPartition()); + } + if (offsetField != null) { + updatedValue.put(offsetField.name, requireSinkRecord(record, PURPOSE).kafkaOffset()); + } + if (timestampField != null && record.timestamp() != null) { + updatedValue.put(timestampField.name, new Date(record.timestamp())); + } + if (staticField != null && staticValue != null) { + updatedValue.put(staticField.name, staticValue); + } return newRecord(record, updatedSchema, updatedValue); } private Schema makeUpdatedSchema(Schema schema) { - final SchemaBuilder builder = SchemaBuilder.struct(); - - builder.name(schema.name()); - builder.version(schema.version()); - builder.doc(schema.doc()); - - final Map params = schema.parameters(); - if (params != null) { - builder.parameters(params); - } + final SchemaBuilder builder = SchemaUtil.copySchemaBasics(schema, SchemaBuilder.struct()); for (Field field : schema.fields()) { builder.field(field.name(), field.schema()); @@ -207,30 +215,6 @@ private Schema makeUpdatedSchema(Schema schema) { return builder.build(); } - private void copyFields(Struct value, Struct updatedValue) { - for (Field field : value.schema().fields()) { - updatedValue.put(field.name(), value.get(field)); - } - } - - private void insertFields(R record, Struct value) { - if (topicField != null) { - value.put(topicField.name, record.topic()); - } - if (partitionField != null && record.kafkaPartition() != null) { - value.put(partitionField.name, record.kafkaPartition()); - } - if (offsetField != null) { - value.put(offsetField.name, requireSinkRecord(record, PURPOSE).kafkaOffset()); - } - if (timestampField != null && record.timestamp() != null) { - value.put(timestampField.name, new Date(record.timestamp())); - } - if (staticField != null && staticValue != null) { - value.put(staticField.name, staticValue); - } - } - @Override public void close() { schemaUpdateCache = null; diff --git 
a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/MaskField.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/MaskField.java index 212b8b2f7db97..d7ef2aa8429ad 100644 --- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/MaskField.java +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/MaskField.java @@ -92,7 +92,7 @@ private R applySchemaless(R record) { for (String field : maskedFields) { updatedValue.put(field, masked(value.get(field))); } - return updatedRecord(record, updatedValue); + return newRecord(record, updatedValue); } private R applyWithSchema(R record) { @@ -102,7 +102,7 @@ private R applyWithSchema(R record) { final Object origFieldValue = value.get(field); updatedValue.put(field, maskedFields.contains(field.name()) ? masked(origFieldValue) : origFieldValue); } - return updatedRecord(record, updatedValue); + return newRecord(record, updatedValue); } private static Object masked(Object value) { @@ -133,7 +133,7 @@ public void close() { protected abstract Object operatingValue(R record); - protected abstract R updatedRecord(R base, Object value); + protected abstract R newRecord(R base, Object value); public static final class Key> extends MaskField { @Override @@ -147,7 +147,7 @@ protected Object operatingValue(R record) { } @Override - protected R updatedRecord(R record, Object updatedValue) { + protected R newRecord(R record, Object updatedValue) { return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), updatedValue, record.valueSchema(), record.value(), record.timestamp()); } } @@ -164,7 +164,7 @@ protected Object operatingValue(R record) { } @Override - protected R updatedRecord(R record, Object updatedValue) { + protected R newRecord(R record, Object updatedValue) { return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), record.valueSchema(), updatedValue, record.timestamp()); } } diff 
--git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java new file mode 100644 index 0000000000000..6faf84250af07 --- /dev/null +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java @@ -0,0 +1,230 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.transforms; + +import org.apache.kafka.common.cache.Cache; +import org.apache.kafka.common.cache.LRUCache; +import org.apache.kafka.common.cache.SynchronizedCache; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.connect.connector.ConnectRecord; +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.transforms.util.SchemaUtil; +import org.apache.kafka.connect.transforms.util.SimpleConfig; + +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.kafka.connect.transforms.util.Requirements.requireMap; +import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct; + +public abstract class ReplaceField> implements Transformation { + + public static final String OVERVIEW_DOC = "Filter or rename fields."; + + interface ConfigName { + String BLACKLIST = "blacklist"; + String WHITELIST = "whitelist"; + String RENAME = "renames"; + } + + public static final ConfigDef CONFIG_DEF = new ConfigDef() + .define(ConfigName.BLACKLIST, ConfigDef.Type.LIST, Collections.emptyList(), ConfigDef.Importance.MEDIUM, + "Fields to exclude. This takes precedence over the whitelist.") + .define(ConfigName.WHITELIST, ConfigDef.Type.LIST, Collections.emptyList(), ConfigDef.Importance.MEDIUM, + "Fields to include. 
If specified, only these fields will be used.") + .define(ConfigName.RENAME, ConfigDef.Type.LIST, Collections.emptyList(), new ConfigDef.Validator() { + @Override + public void ensureValid(String name, Object value) { + parseRenameMappings((List) value); + } + + @Override + public String toString() { + return "list of colon-delimited pairs, e.g. foo:bar,abc:xyz"; + } + }, ConfigDef.Importance.MEDIUM, "Field rename mappings."); + + private static final String PURPOSE = "field replacement"; + + private List blacklist; + private List whitelist; + private Map renames; + private Map reverseRenames; + + private Cache schemaUpdateCache; + + @Override + public void configure(Map configs) { + final SimpleConfig config = new SimpleConfig(CONFIG_DEF, configs); + blacklist = config.getList(ConfigName.BLACKLIST); + whitelist = config.getList(ConfigName.WHITELIST); + renames = parseRenameMappings(config.getList(ConfigName.RENAME)); + reverseRenames = invert(renames); + + schemaUpdateCache = new SynchronizedCache<>(new LRUCache(16)); + } + + static Map parseRenameMappings(List mappings) { + final Map m = new HashMap<>(); + for (String mapping : mappings) { + final String[] parts = mapping.split(":"); + if (parts.length != 2) { + throw new ConfigException(ConfigName.RENAME, mappings, "Invalid rename mapping: " + mapping); + } + m.put(parts[0], parts[1]); + } + return m; + } + + static Map invert(Map source) { + final Map m = new HashMap<>(); + for (Map.Entry e : source.entrySet()) { + m.put(e.getValue(), e.getKey()); + } + return m; + } + + boolean filter(String fieldName) { + return !blacklist.contains(fieldName) && (whitelist.isEmpty() || whitelist.contains(fieldName)); + } + + String renamed(String fieldName) { + final String mapping = renames.get(fieldName); + return mapping == null ? fieldName : mapping; + } + + String reverseRenamed(String fieldName) { + final String mapping = reverseRenames.get(fieldName); + return mapping == null ? 
fieldName : mapping; + } + + @Override + public R apply(R record) { + if (operatingSchema(record) == null) { + return applySchemaless(record); + } else { + return applyWithSchema(record); + } + } + + private R applySchemaless(R record) { + final Map value = requireMap(operatingValue(record), PURPOSE); + + final Map updatedValue = new HashMap<>(value.size()); + + for (Map.Entry e : value.entrySet()) { + final String fieldName = e.getKey(); + if (filter(fieldName)) { + final Object fieldValue = e.getValue(); + updatedValue.put(renamed(fieldName), fieldValue); + } + } + + return newRecord(record, null, updatedValue); + } + + private R applyWithSchema(R record) { + final Struct value = requireStruct(operatingValue(record), PURPOSE); + + Schema updatedSchema = schemaUpdateCache.get(value.schema()); + if (updatedSchema == null) { + updatedSchema = makeUpdatedSchema(value.schema()); + schemaUpdateCache.put(value.schema(), updatedSchema); + } + + final Struct updatedValue = new Struct(updatedSchema); + + for (Field field : updatedSchema.fields()) { + final Object fieldValue = value.get(reverseRenamed(field.name())); + updatedValue.put(field.name(), fieldValue); + } + + return newRecord(record, updatedSchema, updatedValue); + } + + private Schema makeUpdatedSchema(Schema schema) { + final SchemaBuilder builder = SchemaUtil.copySchemaBasics(schema, SchemaBuilder.struct()); + for (Field field : schema.fields()) { + if (filter(field.name())) { + builder.field(renamed(field.name()), field.schema()); + } + } + return builder.build(); + } + + @Override + public ConfigDef config() { + return CONFIG_DEF; + } + + @Override + public void close() { + schemaUpdateCache = null; + } + + protected abstract Schema operatingSchema(R record); + + protected abstract Object operatingValue(R record); + + protected abstract R newRecord(R record, Schema updatedSchema, Object updatedValue); + + public static class Key> extends ReplaceField { + + @Override + protected Schema operatingSchema(R 
record) { + return record.keySchema(); + } + + @Override + protected Object operatingValue(R record) { + return record.key(); + } + + @Override + protected R newRecord(R record, Schema updatedSchema, Object updatedValue) { + return record.newRecord(record.topic(), record.kafkaPartition(), updatedSchema, updatedValue, record.valueSchema(), record.value(), record.timestamp()); + } + + } + + public static class Value> extends ReplaceField { + + @Override + protected Schema operatingSchema(R record) { + return record.valueSchema(); + } + + @Override + protected Object operatingValue(R record) { + return record.value(); + } + + @Override + protected R newRecord(R record, Schema updatedSchema, Object updatedValue) { + return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), updatedSchema, updatedValue, record.timestamp()); + } + + } + +} diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java index e0c337cb4647e..f3076b445719b 100644 --- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/SetSchemaMetadata.java @@ -75,7 +75,7 @@ public R apply(R record) { isMap ? schema.keySchema() : null, isMap || isArray ? schema.valueSchema() : null ); - return updatedRecord(record, updatedSchema); + return newRecord(record, updatedSchema); } @Override @@ -89,7 +89,7 @@ public void close() { protected abstract Schema operatingSchema(R record); - protected abstract R updatedRecord(R record, Schema updatedSchema); + protected abstract R newRecord(R record, Schema updatedSchema); /** * Set the schema name, version or both on the record's key schema. 
@@ -101,7 +101,7 @@ protected Schema operatingSchema(R record) { } @Override - protected R updatedRecord(R record, Schema updatedSchema) { + protected R newRecord(R record, Schema updatedSchema) { return record.newRecord(record.topic(), record.kafkaPartition(), updatedSchema, record.key(), record.valueSchema(), record.value(), record.timestamp()); } } @@ -116,7 +116,7 @@ protected Schema operatingSchema(R record) { } @Override - protected R updatedRecord(R record, Schema updatedSchema) { + protected R newRecord(R record, Schema updatedSchema) { return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), updatedSchema, record.value(), record.timestamp()); } } diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/util/SchemaUtil.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/util/SchemaUtil.java new file mode 100644 index 0000000000000..da261e79b22de --- /dev/null +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/util/SchemaUtil.java @@ -0,0 +1,40 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.transforms.util; + +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; + +import java.util.Map; + +public class SchemaUtil { + + public static SchemaBuilder copySchemaBasics(Schema source, SchemaBuilder builder) { + builder.name(source.name()); + builder.version(source.version()); + builder.doc(source.doc()); + + final Map params = source.parameters(); + if (params != null) { + builder.parameters(params); + } + + return builder; + } + +} diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java new file mode 100644 index 0000000000000..9f9d4b7031b52 --- /dev/null +++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/ReplaceFieldTest.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.transforms; + +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.sink.SinkRecord; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; + +public class ReplaceFieldTest { + + @Test + public void schemaless() { + final ReplaceField xform = new ReplaceField.Value<>(); + + final Map props = new HashMap<>(); + props.put("blacklist", "dont"); + props.put("renames", "abc:xyz,foo:bar"); + + xform.configure(props); + + final Map value = new HashMap<>(); + value.put("dont", "whatever"); + value.put("abc", 42); + value.put("foo", true); + value.put("etc", "etc"); + + final SinkRecord record = new SinkRecord("test", 0, null, null, null, value, 0); + final SinkRecord transformedRecord = xform.apply(record); + + final Map updatedValue = (Map) transformedRecord.value(); + assertEquals(3, updatedValue.size()); + assertEquals(42, updatedValue.get("xyz")); + assertEquals(true, updatedValue.get("bar")); + assertEquals("etc", updatedValue.get("etc")); + } + + @Test + public void withSchema() { + final ReplaceField xform = new ReplaceField.Value<>(); + + final Map props = new HashMap<>(); + props.put("whitelist", "abc,foo"); + props.put("renames", "abc:xyz,foo:bar"); + + xform.configure(props); + + final Schema schema = SchemaBuilder.struct() + .field("dont", Schema.STRING_SCHEMA) + .field("abc", Schema.INT32_SCHEMA) + .field("foo", Schema.BOOLEAN_SCHEMA) + .field("etc", Schema.STRING_SCHEMA) + .build(); + + final Struct value = 
new Struct(schema); + value.put("dont", "whatever"); + value.put("abc", 42); + value.put("foo", true); + value.put("etc", "etc"); + + final SinkRecord record = new SinkRecord("test", 0, null, null, schema, value, 0); + final SinkRecord transformedRecord = xform.apply(record); + + final Struct updatedValue = (Struct) transformedRecord.value(); + + assertEquals(2, updatedValue.schema().fields().size()); + assertEquals(new Integer(42), updatedValue.getInt32("xyz")); + assertEquals(true, updatedValue.getBoolean("bar")); + } + +} From aa325db8eeafbdd674e50417632671852003ff5d Mon Sep 17 00:00:00 2001 From: Shikhar Bhushan Date: Thu, 19 Jan 2017 16:00:13 -0800 Subject: [PATCH 21/23] Minor InsertField tweak -- docs and config check --- .../kafka/connect/transforms/InsertField.java | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java index 3c452c3fd57e6..f32d6ed2e9465 100644 --- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertField.java @@ -55,19 +55,19 @@ private interface ConfigName { String STATIC_VALUE = "static.value"; } - private static final String OPTIONALITY_DOC = "Suffix with '!' to make this a required field, or '?' to keep it optional (the default)."; + private static final String OPTIONALITY_DOC = "Suffix with ! to make this a required field, or ? to keep it optional (the default)."; public static final ConfigDef CONFIG_DEF = new ConfigDef() .define(ConfigName.TOPIC_FIELD, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, - "Field name for Kafka topic.\n" + OPTIONALITY_DOC) + "Field name for Kafka topic. 
" + OPTIONALITY_DOC) .define(ConfigName.PARTITION_FIELD, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, - "Field name for Kafka partition.\n" + OPTIONALITY_DOC) + "Field name for Kafka partition. " + OPTIONALITY_DOC) .define(ConfigName.OFFSET_FIELD, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, - "Field name for Kafka offset - only applicable to sink connectors.\n" + OPTIONALITY_DOC) + "Field name for Kafka offset - only applicable to sink connectors.
" + OPTIONALITY_DOC) .define(ConfigName.TIMESTAMP_FIELD, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, - "Field name for record timestamp.\n" + OPTIONALITY_DOC) + "Field name for record timestamp. " + OPTIONALITY_DOC) .define(ConfigName.STATIC_FIELD, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, - "Field name for static data field.\n" + OPTIONALITY_DOC) + "Field name for static data field. " + OPTIONALITY_DOC) .define(ConfigName.STATIC_VALUE, ConfigDef.Type.STRING, null, ConfigDef.Importance.MEDIUM, "Static field value, if field name configured."); @@ -119,6 +119,10 @@ public void configure(Map props) { throw new ConfigException("No field insertion configured"); } + if (staticField != null && staticValue == null) { + throw new ConfigException(ConfigName.STATIC_VALUE, null, "No value specified for static field: " + staticField); + } + schemaUpdateCache = new SynchronizedCache<>(new LRUCache(16)); } From be6739638b11d14adc7ac63b3f554e3527b56068 Mon Sep 17 00:00:00 2001 From: Shikhar Bhushan Date: Thu, 19 Jan 2017 16:02:37 -0800 Subject: [PATCH 22/23] User doc for transformations in connect.html --- docs/connect.html | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/docs/connect.html b/docs/connect.html index 23e168cae0309..fb813bed20869 100644 --- a/docs/connect.html +++ b/docs/connect.html @@ -100,6 +100,22 @@

Configuring Connecto For any other options, you should consult the documentation for the connector. +

Transformations

+ + Connectors can be configured with transformations to make lightweight message-at-a-time modifications. They can be convenient for minor data massaging and routing changes. + + A transformation chain can be specified in the connector configuration. + +
    +
  • transforms - List of aliases for the transformation, implying the order in which the transformations will be applied.
  • +
  • transforms.$alias.type - Fully qualified class name for the transformation.
  • +
  • transforms.$alias.$transformationSpecificConfig - Configuration properties for the transformation.
  • +
+ + Several widely-applicable data and routing transformations are included with Kafka Connect: + + +

REST API

Since Kafka Connect is intended to be run as a service, it also provides a REST API for managing connectors. By default, this service runs on port 8083. The following are the currently supported endpoints: From 7eae01aee402a36da2e7d201a3fed414c778b321 Mon Sep 17 00:00:00 2001 From: Shikhar Bhushan Date: Fri, 20 Jan 2017 10:35:51 -0800 Subject: [PATCH 23/23] Better word choice --- docs/connect.html | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/connect.html b/docs/connect.html index fb813bed20869..1af5ed95f6051 100644 --- a/docs/connect.html +++ b/docs/connect.html @@ -107,7 +107,7 @@

Transformations -
  • transforms - List of aliases for the transformation, implying the order in which the transformations will be applied.
  • +
  • transforms - List of aliases for the transformation, specifying the order in which the transformations will be applied.
  • transforms.$alias.type - Fully qualified class name for the transformation.
  • transforms.$alias.$transformationSpecificConfig - Configuration properties for the transformation.