From d1af2b4c8686b03225af7e62f29a9f53f46ae2d4 Mon Sep 17 00:00:00 2001
From: Tom Bentley <tbentley@redhat.com>
Date: Tue, 3 Nov 2020 15:52:19 +0000
Subject: [PATCH 1/5] KIP-145: Add unimplemented SMTs, HeaderFrom, DropHeaders
 and InsertHeader

These SMTs were originally specified in KIP-145 but never implemented at
the time.

HeaderTo is not included since its original specification doesn't deal
with the fact that there can be >1 header with the same name, but a
field can only have a single value (which could be an array, but not if
the headers for the given name had different schemas).
---
 .../connect/tools/TransformationDoc.java      |   8 +-
 .../kafka/connect/transforms/DropHeaders.java |  66 ++++
 .../kafka/connect/transforms/HeaderFrom.java  | 219 +++++++++++
 .../connect/transforms/InsertHeader.java      |  70 ++++
 .../connect/transforms/DropHeadersTest.java   | 110 ++++++
 .../connect/transforms/HeaderFromTest.java    | 357 ++++++++++++++++++
 .../connect/transforms/InsertHeaderTest.java  |  96 +++++
 7 files changed, 925 insertions(+), 1 deletion(-)
 create mode 100644 connect/transforms/src/main/java/org/apache/kafka/connect/transforms/DropHeaders.java
 create mode 100644 connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HeaderFrom.java
 create mode 100644 connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertHeader.java
 create mode 100644 connect/transforms/src/test/java/org/apache/kafka/connect/transforms/DropHeadersTest.java
 create mode 100644 connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HeaderFromTest.java
 create mode 100644 connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertHeaderTest.java

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java
index 82c2663714c47..e29a7c6c3afef 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java
@@ -18,11 +18,14 @@
 import org.apache.kafka.common.config.ConfigDef;
 
 import org.apache.kafka.connect.transforms.Cast;
+import org.apache.kafka.connect.transforms.DropHeaders;
 import org.apache.kafka.connect.transforms.ExtractField;
 import org.apache.kafka.connect.transforms.Filter;
 import org.apache.kafka.connect.transforms.Flatten;
+import org.apache.kafka.connect.transforms.HeaderFrom;
 import org.apache.kafka.connect.transforms.HoistField;
 import org.apache.kafka.connect.transforms.InsertField;
+import org.apache.kafka.connect.transforms.InsertHeader;
 import org.apache.kafka.connect.transforms.MaskField;
 import org.apache.kafka.connect.transforms.RegexRouter;
 import org.apache.kafka.connect.transforms.ReplaceField;
@@ -62,7 +65,10 @@ private DocInfo(String transformationName, String overview, ConfigDef configDef)
         new DocInfo(Flatten.class.getName(), Flatten.OVERVIEW_DOC, Flatten.CONFIG_DEF),
         new DocInfo(Cast.class.getName(), Cast.OVERVIEW_DOC, Cast.CONFIG_DEF),
         new DocInfo(TimestampConverter.class.getName(), TimestampConverter.OVERVIEW_DOC, TimestampConverter.CONFIG_DEF),
-        new DocInfo(Filter.class.getName(), Filter.OVERVIEW_DOC, Filter.CONFIG_DEF)
+        new DocInfo(Filter.class.getName(), Filter.OVERVIEW_DOC, Filter.CONFIG_DEF),
+        new DocInfo(InsertHeader.class.getName(), InsertHeader.OVERVIEW_DOC, InsertHeader.CONFIG_DEF),
+        new DocInfo(DropHeaders.class.getName(), DropHeaders.OVERVIEW_DOC, DropHeaders.CONFIG_DEF),
+        new DocInfo(HeaderFrom.class.getName(), HeaderFrom.OVERVIEW_DOC, HeaderFrom.CONFIG_DEF)
     );
 
     private static void printTransformationHtml(PrintStream out, DocInfo docInfo) {

diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/DropHeaders.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/DropHeaders.java
new file mode 100644
index 0000000000000..afa67472e7815
---
/dev/null
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/DropHeaders.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+
+import java.util.List;
+import java.util.Map;
+
+public class DropHeaders<R extends ConnectRecord<R>> implements Transformation<R> {
+
+    public static final String OVERVIEW_DOC =
+            "Removes one or more headers from each record.";
+
+    public static final String HEADERS_FIELD = "header.names";
+
+    public static final ConfigDef CONFIG_DEF = new ConfigDef()
+            .define(HEADERS_FIELD, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH,
+                    "The name of the headers to be removed.");
+
+    private List<String> headers;
+
+    @Override
+    public R apply(R record) {
+        Headers updatedHeaders = record.headers().duplicate();
+        for (String name : headers) {
+            updatedHeaders.remove(name);
+        }
+        return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(),
+                record.valueSchema(), record.value(), record.timestamp(), updatedHeaders);
+    }
+
+
+    @Override
+    public ConfigDef config() {
+        return CONFIG_DEF;
+    }
+
+    @Override
+    public void close() {
+
+    }
+
+    @Override
+    public void configure(Map<String, ?> props) {
+        final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
+        headers = config.getList(HEADERS_FIELD);
+    }
+}

diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HeaderFrom.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HeaderFrom.java
new file mode 100644
index 0000000000000..94a8edb2398f1
--- /dev/null
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HeaderFrom.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.transforms.util.Requirements;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.kafka.common.config.ConfigDef.NO_DEFAULT_VALUE;
+
+public abstract class HeaderFrom<R extends ConnectRecord<R>> implements Transformation<R> {
+
+    public static final String FIELDS_FIELD = "fields";
+    public static final String HEADERS_FIELD = "headers";
+    public static final String OPERATION_FIELD = "operation";
+
+    public static final String OVERVIEW_DOC =
+            "Moves or copies fields in the key/value of a record into that record's headers. " +
+                    "Corresponding elements of <code>" + FIELDS_FIELD + "</code> and " +
+                    "<code>" + HEADERS_FIELD + "</code> together identify a field and the header it should be " +
+                    "moved or copied to. " +
+                    "Use the concrete transformation type designed for the record " +
+                    "key (<code>" + Key.class.getName() + "</code>) or value (<code>" + Value.class.getName() + "</code>).";
+
+    public static final ConfigDef CONFIG_DEF = new ConfigDef()
+            .define(FIELDS_FIELD, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH,
+                    "Field names in the record whose values are to be copied or moved to headers.")
+            .define(HEADERS_FIELD, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH,
+                    "Header names, in the same order as the field names listed in the fields configuration property.")
+            .define(OPERATION_FIELD, ConfigDef.Type.STRING, NO_DEFAULT_VALUE,
+                    ConfigDef.ValidString.in("move", "copy"), ConfigDef.Importance.HIGH,
+                    "Either <code>move</code> if the fields are to be moved to the headers (removed from the key/value), " +
+                            "or <code>copy</code> if the fields are to be copied to the headers (retained in the key/value).");
+
+    enum Operation {
+        MOVE("move"),
+        COPY("copy");
+
+        private final String name;
+
+        Operation(String name) {
+            this.name = name;
+        }
+
+        static Operation fromName(String name) {
+            switch (name) {
+                case "move":
+                    return MOVE;
+                case "copy":
+                    return COPY;
+                default:
+                    throw new IllegalArgumentException();
+            }
+        }
+
+        public String toString() {
+            return name;
+        }
+    }
+
+    private List<String> fields;
+
+    private List<String> headers;
+
+    private Operation operation;
+
+    @Override
+    public R apply(R record) {
+        Object operatingValue = operatingValue(record);
+        Schema operatingSchema = operatingSchema(record);
+
+        if (operatingSchema == null) {
+            return applySchemaless(record, operatingValue);
+        } else {
+            return applyWithSchema(record, operatingValue, operatingSchema);
+        }
+    }
+
+    private R applyWithSchema(R record, Object operatingValue, Schema operatingSchema) {
+        Headers updatedHeaders = record.headers().duplicate();
+        Struct value = Requirements.requireStruct(operatingValue, "header " + operation);
+        final Schema updatedSchema;
+        if (operation == Operation.MOVE) {
+            updatedSchema = moveSchema(operatingSchema);
+        } else {
+            updatedSchema = operatingSchema;
+        }
+        final Struct updatedValue = new Struct(updatedSchema);
+        for (Field field : updatedSchema.fields()) {
+            updatedValue.put(field, value.get(field.name()));
+        }
+        for (int i = 0; i < fields.size(); i++) {
+            String fieldName = fields.get(i);
+            String headerName = headers.get(i);
+            Object fieldValue = value.get(fieldName);
+            Schema fieldSchema = operatingSchema.field(fieldName).schema();
+            updatedHeaders.add(headerName, fieldValue, fieldSchema);
+        }
+        return newRecord(record, updatedSchema, updatedValue, updatedHeaders);
+    }
+
+    private Schema moveSchema(Schema operatingSchema) {
+        final Schema updatedSchema;
+        final SchemaBuilder builder = SchemaUtil.copySchemaBasics(operatingSchema, SchemaBuilder.struct());
+        for (Field field : operatingSchema.fields()) {
+            if (!fields.contains(field.name())) {
+                builder.field(field.name(), field.schema());
+            }
+        }
+        updatedSchema = builder.build();
+        return updatedSchema;
+    }
+
+    private R applySchemaless(R record, Object operatingValue) {
+        Headers updatedHeaders = record.headers().duplicate();
+        Map<String, Object> value = Requirements.requireMap(operatingValue, "header " + operation);
+        Map<String, Object> updatedValue = new HashMap<>(value);
+        for (int i = 0; i < fields.size(); i++) {
+            String fieldName = fields.get(i);
+            Object fieldValue = value.get(fieldName);
+            String headerName = headers.get(i);
+            if (operation == Operation.MOVE) {
+                updatedValue.remove(fieldName);
+            }
+            updatedHeaders.add(headerName, fieldValue, null);
+        }
+        return newRecord(record, null, updatedValue, updatedHeaders);
+    }
+
+    protected abstract Object operatingValue(R record);
+    protected abstract Schema operatingSchema(R record);
+    protected abstract R newRecord(R record, Schema updatedSchema, Object updatedValue, Iterable<Header> updatedHeaders);
+
+    public static class Key<R extends ConnectRecord<R>> extends HeaderFrom<R> {
+
+        @Override
+        public Object operatingValue(R record) {
+            return record.key();
+        }
+
+        @Override
+        protected Schema operatingSchema(R record) {
+            return record.keySchema();
+        }
+
+        @Override
+        protected R newRecord(R record, Schema updatedSchema, Object updatedValue, Iterable<Header> updatedHeaders) {
+            return record.newRecord(record.topic(), record.kafkaPartition(), updatedSchema, updatedValue,
+                    record.valueSchema(), record.value(), record.timestamp(), updatedHeaders);
+        }
+    }
+
+    public static class Value<R extends ConnectRecord<R>> extends HeaderFrom<R> {
+
+        @Override
+        public Object operatingValue(R record) {
+            return record.value();
+        }
+
+        @Override
+        protected Schema operatingSchema(R record) {
+            return record.valueSchema();
+        }
+
+        @Override
+        protected R newRecord(R record, Schema updatedSchema, Object updatedValue, Iterable<Header> updatedHeaders) {
+            return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(),
+                    updatedSchema, updatedValue, record.timestamp(), updatedHeaders);
+        }
+    }
+
+    @Override
+    public ConfigDef config() {
+        return CONFIG_DEF;
+    }
+
+    @Override
+    public void close() {
+
+    }
+
+    @Override
+    public void configure(Map<String, ?> props) {
+        final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
+        fields = config.getList(FIELDS_FIELD);
+        headers = config.getList(HEADERS_FIELD);
+        if (headers.size() != fields.size()) {
+            throw new ConfigException("'fields' config must have the same number of elements as 'headers' config.");
+        }
+        operation = Operation.fromName(config.getString(OPERATION_FIELD));
+    }
+}

diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertHeader.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertHeader.java
new file mode 100644
index 0000000000000..a5b3cf2f8c441
--- /dev/null
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertHeader.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+
+import java.util.Map;
+
+public class InsertHeader<R extends ConnectRecord<R>> implements Transformation<R> {
+
+    public static final String OVERVIEW_DOC =
+            "Add a header to each record.";
+
+    public static final String HEADER_FIELD = "header";
+    public static final String VALUE_LITERAL_FIELD = "value.literal";
+
+    public static final ConfigDef CONFIG_DEF = new ConfigDef()
+            .define(HEADER_FIELD, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
+                    "The name of the header.")
+            .define(VALUE_LITERAL_FIELD, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
+                    "The literal value that is to be set as the header value on all records.");
+
+    private String header;
+
+    private String literalValue;
+
+    @Override
+    public R apply(R record) {
+        Headers updatedHeaders = record.headers().duplicate();
+        updatedHeaders.add(header, literalValue, Schema.STRING_SCHEMA);
+        return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(),
+                record.valueSchema(), record.value(), record.timestamp(), updatedHeaders);
+    }
+
+
+    @Override
+    public ConfigDef config() {
+        return CONFIG_DEF;
+    }
+
+    @Override
+    public void close() {
+
+    }
+
+    @Override
+    public void configure(Map<String, ?> props) {
+        final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props);
+        header = config.getString(HEADER_FIELD);
+        literalValue = config.getString(VALUE_LITERAL_FIELD);
+    }
+}

diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/DropHeadersTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/DropHeadersTest.java
new file mode 100644
index 0000000000000..a362481e9c219
--- /dev/null
+++
b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/DropHeadersTest.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms;
+
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.header.ConnectHeaders;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonMap;
+import static org.junit.Assert.assertEquals;
+
+public class DropHeadersTest {
+
+    private DropHeaders<SourceRecord> xform = new DropHeaders<>();
+
+    private Map<String, Object> config(String... headers) {
+        Map<String, Object> result = new HashMap<>();
+        result.put(DropHeaders.HEADERS_FIELD, asList(headers));
+        return result;
+    }
+
+    @Test
+    public void dropExistingHeader() {
+        xform.configure(config("to-drop"));
+        ConnectHeaders expected = new ConnectHeaders();
+        expected.addString("existing", "existing-value");
+        ConnectHeaders headers = expected.duplicate();
+        headers.addString("to-drop", "existing-value");
+        SourceRecord original = sourceRecord(headers);
+        SourceRecord xformed = xform.apply(original);
+        assertNonHeaders(original, xformed);
+        assertEquals(expected, xformed.headers());
+    }
+
+    @Test
+    public void dropExistingHeaderWithMultipleValues() {
+        xform.configure(config("to-drop"));
+        ConnectHeaders expected = new ConnectHeaders();
+        expected.addString("existing", "existing-value");
+        ConnectHeaders headers = expected.duplicate();
+        headers.addString("to-drop", "existing-value");
+        headers.addString("to-drop", "existing-other-value");
+
+        SourceRecord original = sourceRecord(headers);
+        SourceRecord xformed = xform.apply(original);
+        assertNonHeaders(original, xformed);
+        assertEquals(expected, xformed.headers());
+    }
+
+    @Test
+    public void dropNonExistingHeader() {
+        xform.configure(config("to-drop"));
+        ConnectHeaders expected = new ConnectHeaders();
+        expected.addString("existing", "existing-value");
+        ConnectHeaders headers = expected.duplicate();
+
+        SourceRecord original = sourceRecord(headers);
+        SourceRecord xformed = xform.apply(original);
+        assertNonHeaders(original, xformed);
+        assertEquals(expected, xformed.headers());
+    }
+
+    private void assertNonHeaders(SourceRecord original, SourceRecord xformed) {
+        assertEquals(original.sourcePartition(), xformed.sourcePartition());
+        assertEquals(original.sourceOffset(), xformed.sourceOffset());
+        assertEquals(original.topic(), xformed.topic());
+        assertEquals(original.kafkaPartition(), xformed.kafkaPartition());
+        assertEquals(original.keySchema(), xformed.keySchema());
+        assertEquals(original.key(), xformed.key());
+        assertEquals(original.valueSchema(), xformed.valueSchema());
+        assertEquals(original.value(), xformed.value());
+        assertEquals(original.timestamp(), xformed.timestamp());
+    }
+
+    private SourceRecord sourceRecord(ConnectHeaders headers) {
+        Map<String, ?> sourcePartition = singletonMap("foo", "bar");
+        Map<String, ?> sourceOffset = singletonMap("baz", "quxx");
+        String topic = "topic";
+        Integer partition = 0;
+        Schema keySchema = null;
+        Object key = "key";
+        Schema valueSchema = null;
+        Object value = "value";
+        Long timestamp = 0L;
+
+        SourceRecord record = new SourceRecord(sourcePartition, sourceOffset, topic, partition,
+                keySchema, key, valueSchema, value, timestamp, headers);
+        return record;
+    }
+}
+

diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HeaderFromTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HeaderFromTest.java
new file mode 100644
index 0000000000000..e378ea2c97216
--- /dev/null
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HeaderFromTest.java
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms;
+
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.header.ConnectHeaders;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+import static java.util.Collections.singletonMap;
+import static org.apache.kafka.connect.data.Schema.STRING_SCHEMA;
+import static org.junit.Assert.assertEquals;
+
+@RunWith(Parameterized.class)
+public class HeaderFromTest {
+
+    private final boolean keyTransform;
+
+    static class RecordBuilder {
+        private final List<String> fields = new ArrayList<>(2);
+        private final List<Schema> fieldSchemas = new ArrayList<>(2);
+        private final List<Object> fieldValues = new ArrayList<>(2);
+        private final ConnectHeaders headers = new ConnectHeaders();
+
+        public RecordBuilder() {
+        }
+
+        public RecordBuilder withField(String name, Schema schema, Object value) {
+            fields.add(name);
+            fieldSchemas.add(schema);
+            fieldValues.add(value);
+            return this;
+        }
+
+        public RecordBuilder addHeader(String name, Schema schema, Object value) {
+            headers.add(name, new SchemaAndValue(schema, value));
+            return this;
+        }
+
+        public SourceRecord schemaless(boolean keyTransform) {
+            Map<String, Object> map = new HashMap<>();
+            for (int i = 0; i < this.fields.size(); i++) {
+                String fieldName = this.fields.get(i);
+                map.put(fieldName, this.fieldValues.get(i));
+
+            }
+            return sourceRecord(keyTransform, null, map);
+        }
+
+        private Schema schema() {
+            SchemaBuilder schemaBuilder = new SchemaBuilder(Schema.Type.STRUCT);
+            for (int i = 0; i < this.fields.size(); i++) {
+                String fieldName = this.fields.get(i);
+                schemaBuilder.field(fieldName, this.fieldSchemas.get(i));
+
+            }
+            return schemaBuilder.build();
+        }
+
+        private Struct struct(Schema schema) {
+            Struct struct = new Struct(schema);
+            for (int i = 0; i < this.fields.size(); i++) {
+                String fieldName = this.fields.get(i);
+                struct.put(fieldName, this.fieldValues.get(i));
+            }
+            return struct;
+        }
+
+        public SourceRecord withSchema(boolean keyTransform) {
+            Schema schema = schema();
+            Struct struct = struct(schema);
+            return sourceRecord(keyTransform, schema, struct);
+        }
+
+        private SourceRecord sourceRecord(boolean keyTransform, Schema keyOrValueSchema, Object keyOrValue) {
+            Map<String, ?> sourcePartition = singletonMap("foo", "bar");
+            Map<String, ?> sourceOffset = singletonMap("baz", "quxx");
+            String topic = "topic";
+            Integer partition = 0;
+            Long timestamp = 0L;
+
+            ConnectHeaders headers = this.headers;
+            if (keyOrValueSchema == null) {
+                // When doing a schemaless transformation we don't expect the header to have a schema
+                headers = new ConnectHeaders();
+                for (Header header : this.headers) {
+                    headers.add(header.key(), new SchemaAndValue(null, header.value()));
+                }
+            }
+            return new SourceRecord(sourcePartition, sourceOffset, topic, partition,
+                    keyTransform ? keyOrValueSchema : null,
+                    keyTransform ? keyOrValue : "key",
+                    !keyTransform ? keyOrValueSchema : null,
+                    !keyTransform ? keyOrValue : "value",
+                    timestamp, headers);
+        }
+
+        @Override
+        public String toString() {
+            return "RecordBuilder(" +
+                    "fields=" + fields +
+                    ", fieldSchemas=" + fieldSchemas +
+                    ", fieldValues=" + fieldValues +
+                    ", headers=" + headers +
+                    ')';
+        }
+    }
+
+    @Parameterized.Parameters(name = "{0}: testKey={1}, xformFields={3}, xformHeaders={4}, operation={5}")
+    public static Collection<Object[]> data() {
+
+        List<Object[]> result = new ArrayList<>();
+
+
+
+        for (Boolean testKeyTransform : asList(true, false)) {
+            result.add(
+                new Object[]{
+                    "basic copy",
+                    testKeyTransform,
+                    new RecordBuilder()
+                            .withField("field1", STRING_SCHEMA, "field1-value")
+                            .withField("field2", STRING_SCHEMA, "field2-value")
+                            .addHeader("header1", STRING_SCHEMA, "existing-value"),
+                    singletonList("field1"), singletonList("inserted1"), HeaderFrom.Operation.COPY,
+                    new RecordBuilder()
+                            .withField("field1", STRING_SCHEMA, "field1-value")
+                            .withField("field2", STRING_SCHEMA, "field2-value")
+                            .addHeader("header1", STRING_SCHEMA, "existing-value")
+                            .addHeader("inserted1", STRING_SCHEMA, "field1-value")
+                });
+            result.add(
+                new Object[]{
+                    "basic move",
+                    testKeyTransform,
+                    new RecordBuilder()
+                            .withField("field1", STRING_SCHEMA, "field1-value")
+                            .withField("field2", STRING_SCHEMA, "field2-value")
+                            .addHeader("header1", STRING_SCHEMA, "existing-value"),
+                    singletonList("field1"), singletonList("inserted1"), HeaderFrom.Operation.MOVE,
+                    new RecordBuilder()
+                            // field1 got moved
+                            .withField("field2", STRING_SCHEMA, "field2-value")
+                            .addHeader("header1", STRING_SCHEMA, "existing-value")
+                            .addHeader("inserted1", STRING_SCHEMA, "field1-value")
+                });
+            result.add(
+                new Object[]{
+                    "copy with preexisting header",
+                    testKeyTransform,
+                    new RecordBuilder()
+                            .withField("field1", STRING_SCHEMA, "field1-value")
+                            .withField("field2", STRING_SCHEMA, "field2-value")
+                            .addHeader("inserted1", STRING_SCHEMA, "existing-value"),
+                    singletonList("field1"), singletonList("inserted1"), HeaderFrom.Operation.COPY,
+                    new RecordBuilder()
+                            .withField("field1", STRING_SCHEMA, "field1-value")
+                            .withField("field2", STRING_SCHEMA, "field2-value")
+                            .addHeader("inserted1", STRING_SCHEMA, "existing-value")
+                            .addHeader("inserted1", STRING_SCHEMA, "field1-value")
+                });
+            result.add(
+                new Object[]{
+                    "move with preexisting header",
+                    testKeyTransform,
+                    new RecordBuilder()
+                            .withField("field1", STRING_SCHEMA, "field1-value")
+                            .withField("field2", STRING_SCHEMA, "field2-value")
+                            .addHeader("inserted1", STRING_SCHEMA, "existing-value"),
+                    singletonList("field1"), singletonList("inserted1"), HeaderFrom.Operation.MOVE,
+                    new RecordBuilder()
+                            // field1 got moved
+                            .withField("field2", STRING_SCHEMA, "field2-value")
+                            .addHeader("inserted1", STRING_SCHEMA, "existing-value")
+                            .addHeader("inserted1", STRING_SCHEMA, "field1-value")
+                });
+            Schema schema = new SchemaBuilder(Schema.Type.STRUCT).field("foo", STRING_SCHEMA).build();
+            Struct struct = new Struct(schema).put("foo", "foo-value");
+            result.add(
+                new Object[]{
+                    "copy with struct value",
+                    testKeyTransform,
+                    new RecordBuilder()
+                            .withField("field1", schema, struct)
+                            .withField("field2", STRING_SCHEMA, "field2-value")
+                            .addHeader("header1", STRING_SCHEMA, "existing-value"),
+                    singletonList("field1"), singletonList("inserted1"), HeaderFrom.Operation.COPY,
+                    new RecordBuilder()
+                            .withField("field1", schema, struct)
+                            .withField("field2", STRING_SCHEMA, "field2-value")
+                            .addHeader("header1", STRING_SCHEMA, "existing-value")
+                            .addHeader("inserted1", schema, struct)
+                });
+            result.add(
+                new Object[]{
+                    "move with struct value",
+                    testKeyTransform,
+                    new RecordBuilder()
+                            .withField("field1", schema, struct)
+                            .withField("field2", STRING_SCHEMA, "field2-value")
+                            .addHeader("header1", STRING_SCHEMA, "existing-value"),
+                    singletonList("field1"), singletonList("inserted1"), HeaderFrom.Operation.MOVE,
+                    new RecordBuilder()
+                            // field1 got moved
+                            .withField("field2", STRING_SCHEMA, "field2-value")
+                            .addHeader("header1", STRING_SCHEMA, "existing-value")
+                            .addHeader("inserted1", schema, struct)
+                });
+            result.add(
+                new Object[]{
+                    "two headers from same field",
+                    testKeyTransform,
+                    new RecordBuilder()
+                            .withField("field1", STRING_SCHEMA, "field1-value")
+                            .withField("field2", STRING_SCHEMA, "field2-value")
+                            .addHeader("header1", STRING_SCHEMA, "existing-value"),
+                    // two headers from the same field
+                    asList("field1", "field1"), asList("inserted1", "inserted2"), HeaderFrom.Operation.MOVE,
+                    new RecordBuilder()
+                            // field1 got moved
+                            .withField("field2", STRING_SCHEMA, "field2-value")
+                            .addHeader("header1", STRING_SCHEMA, "existing-value")
+                            .addHeader("inserted1", STRING_SCHEMA, "field1-value")
+                            .addHeader("inserted2", STRING_SCHEMA, "field1-value")
+                });
+            result.add(
+                new Object[]{
+                    "two fields to same header",
+                    testKeyTransform,
+                    new RecordBuilder()
+                            .withField("field1", STRING_SCHEMA, "field1-value")
+                            .withField("field2", STRING_SCHEMA, "field2-value")
+                            .addHeader("header1", STRING_SCHEMA, "existing-value"),
+                    // two headers from the same field
+                    asList("field1", "field2"), asList("inserted1", "inserted1"), HeaderFrom.Operation.MOVE,
+                    new RecordBuilder()
+                            // field1 and field2 got moved
+                            .addHeader("header1", STRING_SCHEMA, "existing-value")
+                            .addHeader("inserted1", STRING_SCHEMA, "field1-value")
+                            .addHeader("inserted1", STRING_SCHEMA, "field2-value")
+                });
+        }
+        return result;
+    }
+
+    private final HeaderFrom<SourceRecord> xform;
+
+    private final RecordBuilder originalRecordBuilder;
+    private final RecordBuilder expectedRecordBuilder;
+    private final List<String> transformFields;
+    private final List<String> headers;
+    private final HeaderFrom.Operation operation;
+
+    public HeaderFromTest(String description,
+                          boolean keyTransform,
+                          RecordBuilder originalBuilder,
+                          List<String> transformFields, List<String> headers, HeaderFrom.Operation operation,
+                          RecordBuilder expectedBuilder) {
+        this.keyTransform = keyTransform;
+        this.xform = keyTransform ? new HeaderFrom.Key<>() : new HeaderFrom.Value<>();
+        this.originalRecordBuilder = originalBuilder;
+        this.expectedRecordBuilder = expectedBuilder;
+        this.transformFields = transformFields;
+        this.headers = headers;
+        this.operation = operation;
+    }
+
+    private Map<String, Object> config() {
+        Map<String, Object> result = new HashMap<>();
+        result.put(HeaderFrom.HEADERS_FIELD, headers);
+        result.put(HeaderFrom.FIELDS_FIELD, transformFields);
+        result.put(HeaderFrom.OPERATION_FIELD, operation.toString());
+        return result;
+    }
+
+    @Test
+    public void schemaless() {
+        xform.configure(config());
+        ConnectHeaders headers = new ConnectHeaders();
+        headers.addString("existing", "existing-value");
+
+        SourceRecord originalRecord = originalRecordBuilder.schemaless(keyTransform);
+        SourceRecord expectedRecord = expectedRecordBuilder.schemaless(keyTransform);
+        SourceRecord xformed = xform.apply(originalRecord);
+        assertSameRecord(expectedRecord, xformed);
+    }
+
+    @Test
+    public void withSchema() {
+        xform.configure(config());
+        ConnectHeaders headers = new ConnectHeaders();
+        headers.addString("existing", "existing-value");
+        Headers expect = headers.duplicate();
+        for (int i = 0; i < this.headers.size(); i++) {
+            expect.add(this.headers.get(i), originalRecordBuilder.fieldValues.get(i), originalRecordBuilder.fieldSchemas.get(i));
+        }
+
+        SourceRecord originalRecord = originalRecordBuilder.withSchema(keyTransform);
+        SourceRecord expectedRecord = expectedRecordBuilder.withSchema(keyTransform);
+        SourceRecord xformed = xform.apply(originalRecord);
+        assertSameRecord(expectedRecord, xformed);
+    }
+
+    @Test(expected = ConfigException.class)
+    public void invalidConfig() {
+        Map<String, Object> config = config();
+        List<String> headers = new ArrayList<>(this.headers);
+        headers.add("unexpected");
+        config.put(HeaderFrom.HEADERS_FIELD, headers);
+        xform.configure(config);
+    }
+
+    private static void assertSameRecord(SourceRecord expected, SourceRecord xformed) {
+        assertEquals(expected.sourcePartition(), xformed.sourcePartition());
+        assertEquals(expected.sourceOffset(), xformed.sourceOffset());
+        assertEquals(expected.topic(), xformed.topic());
+        assertEquals(expected.kafkaPartition(), xformed.kafkaPartition());
+        assertEquals(expected.keySchema(), xformed.keySchema());
+        assertEquals(expected.key(), xformed.key());
+        assertEquals(expected.valueSchema(), xformed.valueSchema());
+        assertEquals(expected.value(), xformed.value());
+        assertEquals(expected.timestamp(), xformed.timestamp());
+        assertEquals(expected.headers(), xformed.headers());
+    }
+
+}
+

diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertHeaderTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertHeaderTest.java
new file mode 100644
index 0000000000000..8bc1823e43843
--- /dev/null
+++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertHeaderTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */ +package org.apache.kafka.connect.transforms; + +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.header.ConnectHeaders; +import org.apache.kafka.connect.header.Headers; +import org.apache.kafka.connect.source.SourceRecord; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +import static java.util.Collections.singletonMap; +import static org.junit.Assert.assertEquals; + +public class InsertHeaderTest { + + private InsertHeader xform = new InsertHeader<>(); + + private Map config(String header, String valueLiteral) { + Map result = new HashMap<>(); + result.put(InsertHeader.HEADER_FIELD, header); + result.put(InsertHeader.VALUE_LITERAL_FIELD, valueLiteral); + return result; + } + + @Test + public void insertionWithExistingOtherHeader() { + xform.configure(config("inserted", "inserted-value")); + ConnectHeaders headers = new ConnectHeaders(); + headers.addString("existing", "existing-value"); + Headers expect = headers.duplicate().addString("inserted", "inserted-value"); + + SourceRecord original = sourceRecord(headers); + SourceRecord xformed = xform.apply(original); + assertNonHeaders(original, xformed); + assertEquals(expect, xformed.headers()); + } + + @Test + public void insertionWithExistingSameHeader() { + xform.configure(config("existing", "inserted-value")); + ConnectHeaders headers = new ConnectHeaders(); + headers.addString("existing", "preexisting-value"); + Headers expect = headers.duplicate().addString("existing", "inserted-value"); + + SourceRecord original = sourceRecord(headers); + SourceRecord xformed = xform.apply(original); + assertNonHeaders(original, xformed); + assertEquals(expect, xformed.headers()); + } + + private void assertNonHeaders(SourceRecord original, SourceRecord xformed) { + assertEquals(original.sourcePartition(), xformed.sourcePartition()); + assertEquals(original.sourceOffset(), xformed.sourceOffset()); + assertEquals(original.topic(), xformed.topic()); + 
assertEquals(original.kafkaPartition(), xformed.kafkaPartition()); + assertEquals(original.keySchema(), xformed.keySchema()); + assertEquals(original.key(), xformed.key()); + assertEquals(original.valueSchema(), xformed.valueSchema()); + assertEquals(original.value(), xformed.value()); + assertEquals(original.timestamp(), xformed.timestamp()); + } + + private SourceRecord sourceRecord(ConnectHeaders headers) { + Map sourcePartition = singletonMap("foo", "bar"); + Map sourceOffset = singletonMap("baz", "quxx"); + String topic = "topic"; + Integer partition = 0; + Schema keySchema = null; + Object key = "key"; + Schema valueSchema = null; + Object value = "value"; + Long timestamp = 0L; + + SourceRecord record = new SourceRecord(sourcePartition, sourceOffset, topic, partition, + keySchema, key, valueSchema, value, timestamp, headers); + return record; + } +} + From 3233c5954d3a7dd0c3be50f3d02fc6f95acee799 Mon Sep 17 00:00:00 2001 From: Tom Bentley Date: Wed, 11 Nov 2020 09:08:21 +0000 Subject: [PATCH 2/5] Review comments --- .../kafka/connect/transforms/DropHeaders.java | 2 +- .../kafka/connect/transforms/HeaderFrom.java | 48 ++++++++++++------- .../connect/transforms/InsertHeader.java | 8 ++-- .../connect/transforms/InsertHeaderTest.java | 13 +++++ 4 files changed, 50 insertions(+), 21 deletions(-) diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/DropHeaders.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/DropHeaders.java index afa67472e7815..c92c80c446695 100644 --- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/DropHeaders.java +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/DropHeaders.java @@ -29,7 +29,7 @@ public class DropHeaders> implements Transformation> implements Transformation { @@ -40,6 +44,8 @@ public abstract class HeaderFrom> implements Transfor public static final String FIELDS_FIELD = "fields"; public static final String HEADERS_FIELD = 
"headers"; public static final String OPERATION_FIELD = "operation"; + private static final String MOVE_OPERATION = "move"; + private static final String COPY_OPERATION = "copy"; public static final String OVERVIEW_DOC = "Moves or copies fields in the key/value of a record into that record's headers. " + @@ -60,8 +66,8 @@ public abstract class HeaderFrom> implements Transfor "or copy if the fields are to be copied to the headers (retained in the key/value)."); enum Operation { - MOVE("move"), - COPY("copy"); + MOVE(MOVE_OPERATION), + COPY(COPY_OPERATION); private final String name; @@ -71,9 +77,9 @@ enum Operation { static Operation fromName(String name) { switch (name) { - case "move": + case MOVE_OPERATION: return MOVE; - case "copy": + case COPY_OPERATION: return COPY; default: throw new IllegalArgumentException(); @@ -91,6 +97,8 @@ public String toString() { private Operation operation; + private Cache moveSchemaCache = new SynchronizedCache<>(new LRUCache<>(16)); + @Override public R apply(R record) { Object operatingValue = operatingValue(record); @@ -107,19 +115,21 @@ private R applyWithSchema(R record, Object operatingValue, Schema operatingSchem Headers updatedHeaders = record.headers().duplicate(); Struct value = Requirements.requireStruct(operatingValue, "header " + operation); final Schema updatedSchema; + final Struct updatedValue; if (operation == Operation.MOVE) { updatedSchema = moveSchema(operatingSchema); + updatedValue = new Struct(updatedSchema); + for (Field field : updatedSchema.fields()) { + updatedValue.put(field, value.get(field.name())); + } } else { updatedSchema = operatingSchema; - } - final Struct updatedValue = new Struct(updatedSchema); - for (Field field : updatedSchema.fields()) { - updatedValue.put(field, value.get(field.name())); + updatedValue = value; } for (int i = 0; i < fields.size(); i++) { String fieldName = fields.get(i); String headerName = headers.get(i); - Object fieldValue = value.get(fieldName); + Object fieldValue = 
value.schema().field(fieldName) != null ? value.get(fieldName) : null; Schema fieldSchema = operatingSchema.field(fieldName).schema(); updatedHeaders.add(headerName, fieldValue, fieldSchema); } @@ -127,15 +137,18 @@ private R applyWithSchema(R record, Object operatingValue, Schema operatingSchem } private Schema moveSchema(Schema operatingSchema) { - final Schema updatedSchema; - final SchemaBuilder builder = SchemaUtil.copySchemaBasics(operatingSchema, SchemaBuilder.struct()); - for (Field field : operatingSchema.fields()) { - if (!fields.contains(field.name())) { - builder.field(field.name(), field.schema()); + Schema moveSchema = this.moveSchemaCache.get(operatingSchema); + if (moveSchema == null) { + final SchemaBuilder builder = SchemaUtil.copySchemaBasics(operatingSchema, SchemaBuilder.struct()); + for (Field field : operatingSchema.fields()) { + if (!fields.contains(field.name())) { + builder.field(field.name(), field.schema()); + } } + moveSchema = builder.build(); + moveSchemaCache.put(operatingSchema, moveSchema); } - updatedSchema = builder.build(); - return updatedSchema; + return moveSchema; } private R applySchemaless(R record, Object operatingValue) { @@ -212,7 +225,8 @@ public void configure(Map props) { fields = config.getList(FIELDS_FIELD); headers = config.getList(HEADERS_FIELD); if (headers.size() != fields.size()) { - throw new ConfigException("'fields' config must have the same number of elements as 'headers' config."); + throw new ConfigException(format("'%s' config must have the same number of elements as '%s' config.", + FIELDS_FIELD, HEADERS_FIELD)); } operation = Operation.fromName(config.getString(OPERATION_FIELD)); } diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertHeader.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertHeader.java index a5b3cf2f8c441..162ac75a58409 100644 --- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertHeader.java 
+++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertHeader.java @@ -19,6 +19,8 @@ import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.connect.connector.ConnectRecord; import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaAndValue; +import org.apache.kafka.connect.data.Values; import org.apache.kafka.connect.header.Headers; import org.apache.kafka.connect.transforms.util.SimpleConfig; @@ -40,12 +42,12 @@ public class InsertHeader> implements Transformation< private String header; - private String literalValue; + private SchemaAndValue literalValue; @Override public R apply(R record) { Headers updatedHeaders = record.headers().duplicate(); - updatedHeaders.add(header, literalValue, Schema.STRING_SCHEMA); + updatedHeaders.add(header, literalValue); return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), record.valueSchema(), record.value(), record.timestamp(), updatedHeaders); } @@ -65,6 +67,6 @@ public void close() { public void configure(Map props) { final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props); header = config.getString(HEADER_FIELD); - literalValue = config.getString(VALUE_LITERAL_FIELD); + literalValue = Values.parseString(config.getString(VALUE_LITERAL_FIELD)); } } diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertHeaderTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertHeaderTest.java index 8bc1823e43843..1c944768cf544 100644 --- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertHeaderTest.java +++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertHeaderTest.java @@ -65,6 +65,19 @@ public void insertionWithExistingSameHeader() { assertEquals(expect, xformed.headers()); } + @Test + public void insertionWithByteHeader() { + xform.configure(config("inserted", "1")); + ConnectHeaders headers = 
new ConnectHeaders(); + headers.addString("existing", "existing-value"); + Headers expect = headers.duplicate().addByte("inserted", (byte) 1); + + SourceRecord original = sourceRecord(headers); + SourceRecord xformed = xform.apply(original); + assertNonHeaders(original, xformed); + assertEquals(expect, xformed.headers()); + } + private void assertNonHeaders(SourceRecord original, SourceRecord xformed) { assertEquals(original.sourcePartition(), xformed.sourcePartition()); assertEquals(original.sourceOffset(), xformed.sourceOffset()); From 9e8ef0b30ff27329ed68650d5b0c899b8c6a0201 Mon Sep 17 00:00:00 2001 From: Tom Bentley Date: Mon, 16 Nov 2020 11:28:01 +0000 Subject: [PATCH 3/5] Docs --- docs/connect.html | 3 +++ 1 file changed, 3 insertions(+) diff --git a/docs/connect.html b/docs/connect.html index ef4cfe9668c7a..e522c8a088f22 100644 --- a/docs/connect.html +++ b/docs/connect.html @@ -183,6 +183,9 @@
predicate to selectively filter certain messages. +
  • InsertHeader - Add a header using static data
  • +
  • HeaderFrom - Copy or move fields in the key or value to the record headers
  • +
  • DropHeaders - Remove headers by name
  • Details on how to configure each transformation are listed below:

    From 6e75b8cf48611d253ea513795ab6f7a79ffee069 Mon Sep 17 00:00:00 2001 From: Tom Bentley Date: Wed, 7 Apr 2021 11:51:34 +0100 Subject: [PATCH 4/5] Review comments --- .../connect/tools/TransformationDoc.java | 1 - .../kafka/connect/transforms/DropHeaders.java | 26 ++++++++++++------- .../kafka/connect/transforms/HeaderFrom.java | 11 +++++--- .../connect/transforms/InsertHeader.java | 11 +++++--- .../connect/transforms/DropHeadersTest.java | 6 +++++ .../connect/transforms/HeaderFromTest.java | 20 +++++++++++++- .../connect/transforms/InsertHeaderTest.java | 11 ++++++++ 7 files changed, 69 insertions(+), 17 deletions(-) diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java index e29a7c6c3afef..5771a6b0ed757 100644 --- a/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/tools/TransformationDoc.java @@ -23,7 +23,6 @@ import org.apache.kafka.connect.transforms.Filter; import org.apache.kafka.connect.transforms.Flatten; import org.apache.kafka.connect.transforms.HeaderFrom; -import org.apache.kafka.connect.transforms.HeaderTo; import org.apache.kafka.connect.transforms.HoistField; import org.apache.kafka.connect.transforms.InsertField; import org.apache.kafka.connect.transforms.InsertHeader; diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/DropHeaders.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/DropHeaders.java index c92c80c446695..6d1e1a49811c2 100644 --- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/DropHeaders.java +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/DropHeaders.java @@ -18,11 +18,17 @@ import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.connect.connector.ConnectRecord; +import 
org.apache.kafka.connect.header.ConnectHeaders; +import org.apache.kafka.connect.header.Header; import org.apache.kafka.connect.header.Headers; +import org.apache.kafka.connect.transforms.util.NonEmptyListValidator; import org.apache.kafka.connect.transforms.util.SimpleConfig; -import java.util.List; +import java.util.HashSet; import java.util.Map; +import java.util.Set; + +import static org.apache.kafka.common.config.ConfigDef.NO_DEFAULT_VALUE; public class DropHeaders> implements Transformation { @@ -32,22 +38,25 @@ public class DropHeaders> implements Transformation headers; + private Set headers; @Override public R apply(R record) { - Headers updatedHeaders = record.headers().duplicate(); - for (String name : headers) { - updatedHeaders.remove(name); + Headers updatedHeaders = new ConnectHeaders(); + for (Header header : record.headers()) { + if (!headers.contains(header.key())) { + updatedHeaders.add(header); + } } return record.newRecord(record.topic(), record.kafkaPartition(), record.keySchema(), record.key(), record.valueSchema(), record.value(), record.timestamp(), updatedHeaders); } - @Override public ConfigDef config() { return CONFIG_DEF; @@ -55,12 +64,11 @@ public ConfigDef config() { @Override public void close() { - } @Override public void configure(Map props) { final SimpleConfig config = new SimpleConfig(CONFIG_DEF, props); - headers = config.getList(HEADERS_FIELD); + headers = new HashSet<>(config.getList(HEADERS_FIELD)); } } diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HeaderFrom.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HeaderFrom.java index 40033f89f7176..b32ad567e8f6a 100644 --- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HeaderFrom.java +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HeaderFrom.java @@ -28,6 +28,7 @@ import org.apache.kafka.connect.data.Struct; import org.apache.kafka.connect.header.Header; import 
org.apache.kafka.connect.header.Headers; +import org.apache.kafka.connect.transforms.util.NonEmptyListValidator; import org.apache.kafka.connect.transforms.util.Requirements; import org.apache.kafka.connect.transforms.util.SchemaUtil; import org.apache.kafka.connect.transforms.util.SimpleConfig; @@ -56,12 +57,16 @@ public abstract class HeaderFrom> implements Transfor "key (" + Key.class.getName() + ") or value (" + Value.class.getName() + ")."; public static final ConfigDef CONFIG_DEF = new ConfigDef() - .define(FIELDS_FIELD, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH, + .define(FIELDS_FIELD, ConfigDef.Type.LIST, + NO_DEFAULT_VALUE, new NonEmptyListValidator(), + ConfigDef.Importance.HIGH, "Field names in the record whose values are to be copied or moved to headers.") - .define(HEADERS_FIELD, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH, + .define(HEADERS_FIELD, ConfigDef.Type.LIST, + NO_DEFAULT_VALUE, new NonEmptyListValidator(), + ConfigDef.Importance.HIGH, "Header names, in the same order as the field names listed in the fields configuration property.") .define(OPERATION_FIELD, ConfigDef.Type.STRING, NO_DEFAULT_VALUE, - ConfigDef.ValidString.in("move", "copy"), ConfigDef.Importance.HIGH, + ConfigDef.ValidString.in(MOVE_OPERATION, COPY_OPERATION), ConfigDef.Importance.HIGH, "Either move if the fields are to be moved to the headers (removed from the key/value), " + "or copy if the fields are to be copied to the headers (retained in the key/value)."); diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertHeader.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertHeader.java index 162ac75a58409..88b20020ebac9 100644 --- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertHeader.java +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/InsertHeader.java @@ -18,7 +18,6 @@ import org.apache.kafka.common.config.ConfigDef; import 
org.apache.kafka.connect.connector.ConnectRecord; -import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaAndValue; import org.apache.kafka.connect.data.Values; import org.apache.kafka.connect.header.Headers; @@ -26,6 +25,8 @@ import java.util.Map; +import static org.apache.kafka.common.config.ConfigDef.NO_DEFAULT_VALUE; + public class InsertHeader> implements Transformation { public static final String OVERVIEW_DOC = @@ -35,9 +36,13 @@ public class InsertHeader> implements Transformation< public static final String VALUE_LITERAL_FIELD = "value.literal"; public static final ConfigDef CONFIG_DEF = new ConfigDef() - .define(HEADER_FIELD, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, + .define(HEADER_FIELD, ConfigDef.Type.STRING, + NO_DEFAULT_VALUE, new ConfigDef.NonNullValidator(), + ConfigDef.Importance.HIGH, "The name of the header.") - .define(VALUE_LITERAL_FIELD, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, + .define(VALUE_LITERAL_FIELD, ConfigDef.Type.STRING, + NO_DEFAULT_VALUE, new ConfigDef.NonNullValidator(), + ConfigDef.Importance.HIGH, "The literal value that is to be set as the header value on all records."); private String header; diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/DropHeadersTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/DropHeadersTest.java index a362481e9c219..fda45a0a1ce4c 100644 --- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/DropHeadersTest.java +++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/DropHeadersTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.connect.transforms; +import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.header.ConnectHeaders; import org.apache.kafka.connect.source.SourceRecord; @@ -79,6 +80,11 @@ public void dropNonExistingHeader() { assertEquals(expected, 
xformed.headers()); } + @Test(expected = ConfigException.class) + public void configRejectsEmptyList() { + xform.configure(config()); + } + private void assertNonHeaders(SourceRecord original, SourceRecord xformed) { assertEquals(original.sourcePartition(), xformed.sourcePartition()); assertEquals(original.sourceOffset(), xformed.sourceOffset()); diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HeaderFromTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HeaderFromTest.java index e378ea2c97216..de67ada8aa098 100644 --- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HeaderFromTest.java +++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HeaderFromTest.java @@ -36,6 +36,7 @@ import java.util.Map; import static java.util.Arrays.asList; +import static java.util.Collections.emptyList; import static java.util.Collections.singletonList; import static java.util.Collections.singletonMap; import static org.apache.kafka.connect.data.Schema.STRING_SCHEMA; @@ -332,7 +333,7 @@ public void withSchema() { } @Test(expected = ConfigException.class) - public void invalidConfig() { + public void invalidConfigExtraHeaderConfig() { Map config = config(); List headers = new ArrayList<>(this.headers); headers.add("unexpected"); @@ -340,6 +341,23 @@ public void invalidConfig() { xform.configure(config); } + @Test(expected = ConfigException.class) + public void invalidConfigExtraFieldConfig() { + Map config = config(); + List fields = new ArrayList<>(this.transformFields); + fields.add("unexpected"); + config.put(HeaderFrom.FIELDS_FIELD, fields); + xform.configure(config); + } + + @Test(expected = ConfigException.class) + public void invalidConfigEmptyHeadersAndFieldsConfig() { + Map config = config(); + config.put(HeaderFrom.HEADERS_FIELD, emptyList()); + config.put(HeaderFrom.FIELDS_FIELD, emptyList()); + xform.configure(config); + } + private static void 
assertSameRecord(SourceRecord expected, SourceRecord xformed) { assertEquals(expected.sourcePartition(), xformed.sourcePartition()); assertEquals(expected.sourceOffset(), xformed.sourceOffset()); diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertHeaderTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertHeaderTest.java index 1c944768cf544..66538df12b705 100644 --- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertHeaderTest.java +++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertHeaderTest.java @@ -16,6 +16,7 @@ */ package org.apache.kafka.connect.transforms; +import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.header.ConnectHeaders; import org.apache.kafka.connect.header.Headers; @@ -78,6 +79,16 @@ public void insertionWithByteHeader() { assertEquals(expect, xformed.headers()); } + @Test(expected = ConfigException.class) + public void configRejectsNullHeaderKey() { + xform.configure(config(null, "1")); + } + + @Test(expected = ConfigException.class) + public void configRejectsNullHeaderValue() { + xform.configure(config("inserted", null)); + } + private void assertNonHeaders(SourceRecord original, SourceRecord xformed) { assertEquals(original.sourcePartition(), xformed.sourcePartition()); assertEquals(original.sourceOffset(), xformed.sourceOffset()); From c7ef2dc934db78512a5192b1450edb38f9e4461f Mon Sep 17 00:00:00 2001 From: Tom Bentley Date: Wed, 7 Apr 2021 12:21:46 +0100 Subject: [PATCH 5/5] Convert tests to junit5 --- .../connect/transforms/DropHeadersTest.java | 9 +- .../connect/transforms/HeaderFromTest.java | 152 ++++++++---------- .../connect/transforms/InsertHeaderTest.java | 13 +- 3 files changed, 80 insertions(+), 94 deletions(-) diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/DropHeadersTest.java 
b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/DropHeadersTest.java index fda45a0a1ce4c..7d20c384e78ad 100644 --- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/DropHeadersTest.java +++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/DropHeadersTest.java @@ -20,14 +20,15 @@ import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.header.ConnectHeaders; import org.apache.kafka.connect.source.SourceRecord; -import org.junit.Test; +import org.junit.jupiter.api.Test; import java.util.HashMap; import java.util.Map; import static java.util.Arrays.asList; import static java.util.Collections.singletonMap; -import static org.junit.Assert.assertEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; public class DropHeadersTest { @@ -80,9 +81,9 @@ public void dropNonExistingHeader() { assertEquals(expected, xformed.headers()); } - @Test(expected = ConfigException.class) + @Test public void configRejectsEmptyList() { - xform.configure(config()); + assertThrows(ConfigException.class, () -> xform.configure(config())); } private void assertNonHeaders(SourceRecord original, SourceRecord xformed) { diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HeaderFromTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HeaderFromTest.java index de67ada8aa098..61e05757474b6 100644 --- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HeaderFromTest.java +++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HeaderFromTest.java @@ -25,12 +25,12 @@ import org.apache.kafka.connect.header.Header; import org.apache.kafka.connect.header.Headers; import org.apache.kafka.connect.source.SourceRecord; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; +import 
org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; +import org.junit.jupiter.params.provider.ValueSource; import java.util.ArrayList; -import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -40,13 +40,11 @@ import static java.util.Collections.singletonList; import static java.util.Collections.singletonMap; import static org.apache.kafka.connect.data.Schema.STRING_SCHEMA; -import static org.junit.Assert.assertEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; -@RunWith(Parameterized.class) public class HeaderFromTest { - private final boolean keyTransform; - static class RecordBuilder { private final List fields = new ArrayList<>(2); private final List fieldSchemas = new ArrayList<>(2); @@ -137,16 +135,13 @@ public String toString() { } } - @Parameterized.Parameters(name = "{0}: testKey={1}, xformFields={3}, xformHeaders={4}, operation={5}") - public static Collection data() { - - List result = new ArrayList<>(); - + public static List data() { + List result = new ArrayList<>(); for (Boolean testKeyTransform : asList(true, false)) { result.add( - new Object[]{ + Arguments.of( "basic copy", testKeyTransform, new RecordBuilder() @@ -159,9 +154,9 @@ public static Collection data() { .withField("field2", STRING_SCHEMA, "field2-value") .addHeader("header1", STRING_SCHEMA, "existing-value") .addHeader("inserted1", STRING_SCHEMA, "field1-value") - }); + )); result.add( - new Object[]{ + Arguments.of( "basic move", testKeyTransform, new RecordBuilder() @@ -174,9 +169,9 @@ public static Collection data() { .withField("field2", STRING_SCHEMA, "field2-value") .addHeader("header1", STRING_SCHEMA, "existing-value") .addHeader("inserted1", STRING_SCHEMA, "field1-value") - }); + )); result.add( - new Object[]{ + Arguments.of( "copy with preexisting 
header", testKeyTransform, new RecordBuilder() @@ -189,9 +184,9 @@ public static Collection data() { .withField("field2", STRING_SCHEMA, "field2-value") .addHeader("inserted1", STRING_SCHEMA, "existing-value") .addHeader("inserted1", STRING_SCHEMA, "field1-value") - }); + )); result.add( - new Object[]{ + Arguments.of( "move with preexisting header", testKeyTransform, new RecordBuilder() @@ -204,11 +199,11 @@ public static Collection data() { .withField("field2", STRING_SCHEMA, "field2-value") .addHeader("inserted1", STRING_SCHEMA, "existing-value") .addHeader("inserted1", STRING_SCHEMA, "field1-value") - }); + )); Schema schema = new SchemaBuilder(Schema.Type.STRUCT).field("foo", STRING_SCHEMA).build(); Struct struct = new Struct(schema).put("foo", "foo-value"); result.add( - new Object[]{ + Arguments.of( "copy with struct value", testKeyTransform, new RecordBuilder() @@ -221,9 +216,9 @@ public static Collection data() { .withField("field2", STRING_SCHEMA, "field2-value") .addHeader("header1", STRING_SCHEMA, "existing-value") .addHeader("inserted1", schema, struct) - }); + )); result.add( - new Object[]{ + Arguments.of( "move with struct value", testKeyTransform, new RecordBuilder() @@ -236,9 +231,9 @@ public static Collection data() { .withField("field2", STRING_SCHEMA, "field2-value") .addHeader("header1", STRING_SCHEMA, "existing-value") .addHeader("inserted1", schema, struct) - }); + )); result.add( - new Object[]{ + Arguments.of( "two headers from same field", testKeyTransform, new RecordBuilder() @@ -253,9 +248,9 @@ public static Collection data() { .addHeader("header1", STRING_SCHEMA, "existing-value") .addHeader("inserted1", STRING_SCHEMA, "field1-value") .addHeader("inserted2", STRING_SCHEMA, "field1-value") - }); + )); result.add( - new Object[]{ + Arguments.of( "two fields to same header", testKeyTransform, new RecordBuilder() @@ -269,34 +264,12 @@ public static Collection data() { .addHeader("header1", STRING_SCHEMA, "existing-value") 
.addHeader("inserted1", STRING_SCHEMA, "field1-value") .addHeader("inserted1", STRING_SCHEMA, "field2-value") - }); + )); } return result; } - private final HeaderFrom xform; - - private final RecordBuilder originalRecordBuilder; - private final RecordBuilder expectedRecordBuilder; - private final List transformFields; - private final List headers; - private final HeaderFrom.Operation operation; - - public HeaderFromTest(String description, - boolean keyTransform, - RecordBuilder originalBuilder, - List transformFields, List headers, HeaderFrom.Operation operation, - RecordBuilder expectedBuilder) { - this.keyTransform = keyTransform; - this.xform = keyTransform ? new HeaderFrom.Key<>() : new HeaderFrom.Value<>(); - this.originalRecordBuilder = originalBuilder; - this.expectedRecordBuilder = expectedBuilder; - this.transformFields = transformFields; - this.headers = headers; - this.operation = operation; - } - - private Map config() { + private Map config(List headers, List transformFields, HeaderFrom.Operation operation) { Map result = new HashMap<>(); result.put(HeaderFrom.HEADERS_FIELD, headers); result.put(HeaderFrom.FIELDS_FIELD, transformFields); @@ -304,58 +277,69 @@ private Map config() { return result; } - @Test - public void schemaless() { - xform.configure(config()); + @ParameterizedTest + @MethodSource("data") + public void schemaless(String description, + boolean keyTransform, + RecordBuilder originalBuilder, + List transformFields, List headers1, HeaderFrom.Operation operation, + RecordBuilder expectedBuilder) { + HeaderFrom xform = keyTransform ? 
new HeaderFrom.Key<>() : new HeaderFrom.Value<>(); + + xform.configure(config(headers1, transformFields, operation)); ConnectHeaders headers = new ConnectHeaders(); headers.addString("existing", "existing-value"); - SourceRecord originalRecord = originalRecordBuilder.schemaless(keyTransform); - SourceRecord expectedRecord = expectedRecordBuilder.schemaless(keyTransform); + SourceRecord originalRecord = originalBuilder.schemaless(keyTransform); + SourceRecord expectedRecord = expectedBuilder.schemaless(keyTransform); SourceRecord xformed = xform.apply(originalRecord); assertSameRecord(expectedRecord, xformed); } - @Test - public void withSchema() { - xform.configure(config()); + @ParameterizedTest + @MethodSource("data") + public void withSchema(String description, + boolean keyTransform, + RecordBuilder originalBuilder, + List transformFields, List headers1, HeaderFrom.Operation operation, + RecordBuilder expectedBuilder) { + HeaderFrom xform = keyTransform ? new HeaderFrom.Key<>() : new HeaderFrom.Value<>(); + xform.configure(config(headers1, transformFields, operation)); ConnectHeaders headers = new ConnectHeaders(); headers.addString("existing", "existing-value"); Headers expect = headers.duplicate(); - for (int i = 0; i < this.headers.size(); i++) { - expect.add(this.headers.get(i), originalRecordBuilder.fieldValues.get(i), originalRecordBuilder.fieldSchemas.get(i)); + for (int i = 0; i < headers1.size(); i++) { + expect.add(headers1.get(i), originalBuilder.fieldValues.get(i), originalBuilder.fieldSchemas.get(i)); } - SourceRecord originalRecord = originalRecordBuilder.withSchema(keyTransform); - SourceRecord expectedRecord = expectedRecordBuilder.withSchema(keyTransform); + SourceRecord originalRecord = originalBuilder.withSchema(keyTransform); + SourceRecord expectedRecord = expectedBuilder.withSchema(keyTransform); SourceRecord xformed = xform.apply(originalRecord); assertSameRecord(expectedRecord, xformed); } - @Test(expected = ConfigException.class) - 
public void invalidConfigExtraHeaderConfig() { - Map config = config(); - List headers = new ArrayList<>(this.headers); - headers.add("unexpected"); - config.put(HeaderFrom.HEADERS_FIELD, headers); - xform.configure(config); + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void invalidConfigExtraHeaderConfig(boolean keyTransform) { + Map config = config(singletonList("foo"), asList("foo", "bar"), HeaderFrom.Operation.COPY); + HeaderFrom xform = keyTransform ? new HeaderFrom.Key<>() : new HeaderFrom.Value<>(); + assertThrows(ConfigException.class, () -> xform.configure(config)); } - @Test(expected = ConfigException.class) - public void invalidConfigExtraFieldConfig() { - Map config = config(); - List fields = new ArrayList<>(this.transformFields); - fields.add("unexpected"); - config.put(HeaderFrom.FIELDS_FIELD, fields); - xform.configure(config); + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void invalidConfigExtraFieldConfig(boolean keyTransform) { + Map config = config(asList("foo", "bar"), singletonList("foo"), HeaderFrom.Operation.COPY); + HeaderFrom xform = keyTransform ? new HeaderFrom.Key<>() : new HeaderFrom.Value<>(); + assertThrows(ConfigException.class, () -> xform.configure(config)); } - @Test(expected = ConfigException.class) - public void invalidConfigEmptyHeadersAndFieldsConfig() { - Map config = config(); - config.put(HeaderFrom.HEADERS_FIELD, emptyList()); - config.put(HeaderFrom.FIELDS_FIELD, emptyList()); - xform.configure(config); + @ParameterizedTest + @ValueSource(booleans = {true, false}) + public void invalidConfigEmptyHeadersAndFieldsConfig(boolean keyTransform) { + Map config = config(emptyList(), emptyList(), HeaderFrom.Operation.COPY); + HeaderFrom xform = keyTransform ? 
new HeaderFrom.Key<>() : new HeaderFrom.Value<>(); + assertThrows(ConfigException.class, () -> xform.configure(config)); } private static void assertSameRecord(SourceRecord expected, SourceRecord xformed) { diff --git a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertHeaderTest.java b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertHeaderTest.java index 66538df12b705..97cbe5d4162b6 100644 --- a/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertHeaderTest.java +++ b/connect/transforms/src/test/java/org/apache/kafka/connect/transforms/InsertHeaderTest.java @@ -21,13 +21,14 @@ import org.apache.kafka.connect.header.ConnectHeaders; import org.apache.kafka.connect.header.Headers; import org.apache.kafka.connect.source.SourceRecord; -import org.junit.Test; +import org.junit.jupiter.api.Test; import java.util.HashMap; import java.util.Map; import static java.util.Collections.singletonMap; -import static org.junit.Assert.assertEquals; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertThrows; public class InsertHeaderTest { @@ -79,14 +80,14 @@ public void insertionWithByteHeader() { assertEquals(expect, xformed.headers()); } - @Test(expected = ConfigException.class) + @Test public void configRejectsNullHeaderKey() { - xform.configure(config(null, "1")); + assertThrows(ConfigException.class, () -> xform.configure(config(null, "1"))); } - @Test(expected = ConfigException.class) + @Test public void configRejectsNullHeaderValue() { - xform.configure(config("inserted", null)); + assertThrows(ConfigException.class, () -> xform.configure(config("inserted", null))); } private void assertNonHeaders(SourceRecord original, SourceRecord xformed) {