diff --git a/README.md b/README.md index 95705da3..f5a7293b 100644 --- a/README.md +++ b/README.md @@ -67,6 +67,7 @@ Kstreamplify adds extra features to Kafka Streams, simplifying development so yo * [By Key](#by-key) * [By Key and Value](#by-key-and-value) * [By Predicate](#by-predicate) + * [By Headers](#by-headers) * [OpenTelemetry](#opentelemetry) * [Custom Tags for Metrics](#custom-tags-for-metrics) * [Swagger](#swagger) @@ -808,29 +809,74 @@ For more details about prefixes, see the [Prefix](#prefix) section. The `DeduplicationUtils` class helps you deduplicate streams based on various criteria and within a specified time window. -All deduplication methods return a `KStream`, allowing you to handle errors and route them to `TopologyErrorHandler#catchErrors()`. +All `*withErrors` methods return a `KStream`, allowing you to handle errors and route them to `TopologyErrorHandler#catchErrors()`. +Methods without `*withErrors` return a plain `KStream` and can be used directly. **Note**: Only streams with `String` keys and Avro values are supported. 
-#### By Key +#### By Key + +##### With error handling ```java @Component public class MyKafkaStreams extends KafkaStreamsStarter { - @Override - public void topology(StreamsBuilder streamsBuilder) { - KStream myStream = streamsBuilder + @Override + public void topology(StreamsBuilder streamsBuilder) { + KStream myStream = streamsBuilder .stream("input_topic"); - DeduplicationUtils - .deduplicateKeys(streamsBuilder, myStream, Duration.ofDays(60)) - .to("output_topic"); - } + TopologyErrorHandler + .catchErrors( + DeduplicationUtils + .distinctKeysWithErrors(streamsBuilder, myStream, Duration.ofDays(60)) + ) + .to("output_topic", Produced.with(Serdes.String(), SerdesUtils.getValueSerdes())); + } +} +``` + +##### Without error handling + +```java +@Component +public class MyKafkaStreams extends KafkaStreamsStarter { + @Override + public void topology(StreamsBuilder streamsBuilder) { + KStream myStream = streamsBuilder + .stream("input_topic"); + + DeduplicationUtils + .distinctKeys(streamsBuilder, myStream, Duration.ofDays(60)) + .to("output_topic", Produced.with(Serdes.String(), SerdesUtils.getValueSerdes())); + } } ``` #### By Key and Value +##### With error handling + +```java +@Component +public class MyKafkaStreams extends KafkaStreamsStarter { + @Override + public void topology(StreamsBuilder streamsBuilder) { + KStream myStream = streamsBuilder + .stream("input_topic"); + + TopologyErrorHandler + .catchErrors( + DeduplicationUtils + .distinctByKeyValuesWithErrors(streamsBuilder, myStream, Duration.ofDays(60)) + ) + .to("output_topic", Produced.with(Serdes.String(), SerdesUtils.getValueSerdes())); + } +} +``` + +##### Without error handling + ```java @Component public class MyKafkaStreams extends KafkaStreamsStarter { @@ -840,7 +886,7 @@ public class MyKafkaStreams extends KafkaStreamsStarter { .stream("input_topic"); DeduplicationUtils - .deduplicateKeyValues(streamsBuilder, myStream, Duration.ofDays(60)) + .distinctByKeyValues(streamsBuilder, 
myStream, Duration.ofDays(60)) .to("output_topic"); } } @@ -848,6 +894,33 @@ public class MyKafkaStreams extends KafkaStreamsStarter { #### By Predicate +##### With error handling + +```java +@Component +public class MyKafkaStreams extends KafkaStreamsStarter { + @Override + public void topology(StreamsBuilder streamsBuilder) { + KStream myStream = streamsBuilder + .stream("input_topic"); + + TopologyErrorHandler + .catchErrors( + DeduplicationUtils + .distinctByPredicateWithErrors( + streamsBuilder, + myStream, + Duration.ofDays(60), + value -> value.getFirstName() + "#" + value.getLastName() + ) + ) + .to("output_topic", Produced.with(Serdes.String(), SerdesUtils.getValueSerdes())); + } +} +``` + +##### Without error handling + ```java @Component public class MyKafkaStreams extends KafkaStreamsStarter { @@ -857,7 +930,7 @@ public class MyKafkaStreams extends KafkaStreamsStarter { .stream("input_topic"); DeduplicationUtils - .deduplicateWithPredicate(streamsBuilder, myStream, Duration.ofDays(60), + .distinctByPredicate(streamsBuilder, myStream, Duration.ofDays(60), value -> value.getFirstName() + "#" + value.getLastName()) .to("output_topic"); } @@ -868,6 +941,35 @@ In the predicate approach, the provided predicate is used as the key in the wind #### By Headers +##### With error handling + +```java +import java.util.List; + +@Component +public class MyKafkaStreams extends KafkaStreamsStarter { + @Override + public void topology(StreamsBuilder streamsBuilder) { + KStream myStream = streamsBuilder + .stream("input_topic"); + + TopologyErrorHandler + .catchErrors( + DeduplicationUtils + .distinctByHeadersWithErrors( + streamsBuilder, + myStream, + Duration.ofDays(60), + List.of("header1", "header2") + ) + ) + .to("output_topic", Produced.with(Serdes.String(), SerdesUtils.getValueSerdes())); + } +} +``` + +##### Without error handling + ```java import java.util.List; @@ -879,7 +981,7 @@ public class MyKafkaStreams extends KafkaStreamsStarter { .stream("input_topic"); 
DeduplicationUtils - .deduplicateWithHeaders(streamsBuilder, myStream, Duration.ofDays(60), + .distinctByHeaders(streamsBuilder, myStream, Duration.ofDays(60), List.of("header1", "header2")) .to("output_topic"); } diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupHeadersProcessor.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupHeadersProcessor.java index 2cd124a4..6f445d94 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupHeadersProcessor.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupHeadersProcessor.java @@ -18,7 +18,6 @@ */ package com.michelin.kstreamplify.deduplication; -import com.michelin.kstreamplify.error.ProcessingResult; import java.nio.charset.StandardCharsets; import java.time.Duration; import java.time.Instant; @@ -34,12 +33,11 @@ import org.apache.kafka.streams.state.WindowStore; /** - * Transformer class for the deduplication mechanism on headers of a given topic. + * Processor class for the deduplication mechanism on headers of a given topic. * * @param The type of the value */ -public class DedupHeadersProcessor - implements Processor> { +public class DedupHeadersProcessor implements Processor { /** Window store name, initialized @ construction. */ private final String windowStoreName; @@ -47,8 +45,8 @@ public class DedupHeadersProcessor private final Duration retentionWindowDuration; /** Deduplication headers list. */ private final List deduplicationHeadersList; - /** Kstream context for this transformer. */ - private ProcessorContext> processorContext; + /** Kstream context for this processor. */ + private ProcessorContext processorContext; /** Window store containing all the records seen on the given window. 
*/ private WindowStore dedupWindowStore; @@ -66,41 +64,49 @@ public DedupHeadersProcessor( this.deduplicationHeadersList = deduplicationHeadersList; } + /** + * Get the header value for a given key + * + * @param headers headers of the record + * @param key the key to look for in the headers + * @return The header value for the given key, or an empty string if the header is not present or has no value. + */ + private static String getHeader(Headers headers, String key) { + Header header = headers.lastHeader(key); + if (header == null || header.value() == null) { + return StringUtils.EMPTY; + } + String value = new String(header.value(), StandardCharsets.UTF_8); + return StringUtils.defaultString(value); + } + @Override - public void init(ProcessorContext> context) { + public void init(ProcessorContext context) { this.processorContext = context; dedupWindowStore = this.processorContext.getStateStore(windowStoreName); } @Override public void process(Record message) { - try { - // Get the record timestamp - var currentInstant = Instant.ofEpochMilli(message.timestamp()); - String identifier = buildIdentifier(message.headers()); + // Get the record timestamp + var currentInstant = Instant.ofEpochMilli(message.timestamp()); + String identifier = buildIdentifier(message.headers()); - // Retrieve all the matching keys in the stateStore and return null if found it (signaling a duplicate) - try (var resultIterator = dedupWindowStore.backwardFetch( - identifier, - currentInstant.minus(retentionWindowDuration), - currentInstant.plus(retentionWindowDuration))) { - while (resultIterator != null && resultIterator.hasNext()) { - var currentKeyValue = resultIterator.next(); - if (identifier.equals(currentKeyValue.value)) { - return; - } + // Retrieve all the matching keys in the stateStore and return null if found it (signaling a duplicate) + try (var resultIterator = dedupWindowStore.backwardFetch( + identifier, + currentInstant.minus(retentionWindowDuration), + 
currentInstant.plus(retentionWindowDuration))) { + while (resultIterator != null && resultIterator.hasNext()) { + var currentKeyValue = resultIterator.next(); + if (identifier.equals(currentKeyValue.value)) { + return; } } - // First time we see this record, store entry in the window store and forward the record to the output - dedupWindowStore.put(identifier, identifier, message.timestamp()); - processorContext.forward(ProcessingResult.wrapRecordSuccess(message)); - } catch (Exception e) { - processorContext.forward(ProcessingResult.wrapRecordFailure( - e, - message, - "Could not figure out what to do with the current payload: " - + "An unlikely error occurred during deduplication transform")); } + // First time we see this record, store entry in the window store and forward the record to the output + dedupWindowStore.put(identifier, identifier, message.timestamp()); + processorContext.forward(message); } /** @@ -114,20 +120,4 @@ private String buildIdentifier(Headers headers) { .map(key -> getHeader(headers, key)) .collect(Collectors.joining("#")); } - - /** - * Get the header value for a given key - * - * @param headers headers of the record - * @param key the key to look for in the headers - * @return The header value for the given key, or an empty string if the header is not present or has no value. 
- */ - private static String getHeader(Headers headers, String key) { - Header header = headers.lastHeader(key); - if (header == null || header.value() == null) { - return StringUtils.EMPTY; - } - String value = new String(header.value(), StandardCharsets.UTF_8); - return StringUtils.defaultString(value); - } } diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupHeadersProcessorWithErrors.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupHeadersProcessorWithErrors.java new file mode 100644 index 00000000..b8623b7a --- /dev/null +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupHeadersProcessorWithErrors.java @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.michelin.kstreamplify.deduplication; + +import com.michelin.kstreamplify.error.ProcessingResult; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.time.Instant; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.avro.specific.SpecificRecord; +import org.apache.commons.lang3.StringUtils; +import org.apache.kafka.common.header.Header; +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.state.WindowStore; + +/** + * Processor class for the deduplication mechanism on headers of a given topic. + * + * @param The type of the value + */ +public class DedupHeadersProcessorWithErrors + implements Processor> { + + /** Window store name, initialized @ construction. */ + private final String windowStoreName; + /** Retention window for the state store. Used for fetching data. */ + private final Duration retentionWindowDuration; + /** Deduplication headers list. */ + private final List deduplicationHeadersList; + /** Kstream context for this processor. */ + private ProcessorContext> processorContext; + /** Window store containing all the records seen on the given window. */ + private WindowStore dedupWindowStore; + + /** + * Constructor. 
+ * + * @param windowStoreName Name of the deduplication state store + * @param retentionWindowDuration Retention window duration + * @param deduplicationHeadersList Deduplication headers list + */ + public DedupHeadersProcessorWithErrors( + String windowStoreName, Duration retentionWindowDuration, List deduplicationHeadersList) { + this.windowStoreName = windowStoreName; + this.retentionWindowDuration = retentionWindowDuration; + this.deduplicationHeadersList = deduplicationHeadersList; + } + + @Override + public void init(ProcessorContext> context) { + this.processorContext = context; + dedupWindowStore = this.processorContext.getStateStore(windowStoreName); + } + + @Override + public void process(Record message) { + try { + // Get the record timestamp + var currentInstant = Instant.ofEpochMilli(message.timestamp()); + String identifier = buildIdentifier(message.headers()); + + // Retrieve all the matching keys in the stateStore and return null if found it (signaling a duplicate) + try (var resultIterator = dedupWindowStore.backwardFetch( + identifier, + currentInstant.minus(retentionWindowDuration), + currentInstant.plus(retentionWindowDuration))) { + while (resultIterator != null && resultIterator.hasNext()) { + var currentKeyValue = resultIterator.next(); + if (identifier.equals(currentKeyValue.value)) { + return; + } + } + } + // First time we see this record, store entry in the window store and forward the record to the output + dedupWindowStore.put(identifier, identifier, message.timestamp()); + processorContext.forward(ProcessingResult.wrapRecordSuccess(message)); + } catch (Exception e) { + processorContext.forward(ProcessingResult.wrapRecordFailure( + e, + message, + "Could not figure out what to do with the current payload: " + + "An unlikely error occurred during deduplication transform")); + } + } + + /** + * Build an identifier for the record based on the headers and the keys provided. 
+ * + * @param headers The headers of the record + * @return The built identifier + */ + private String buildIdentifier(Headers headers) { + return deduplicationHeadersList.stream() + .map(key -> getHeader(headers, key)) + .collect(Collectors.joining("#")); + } + + /** + * Get the header value for a given key + * + * @param headers headers of the record + * @param key the key to look for in the headers + * @return The header value for the given key, or an empty string if the header is not present or has no value. + */ + private static String getHeader(Headers headers, String key) { + Header header = headers.lastHeader(key); + if (header == null || header.value() == null) { + return StringUtils.EMPTY; + } + String value = new String(header.value(), StandardCharsets.UTF_8); + return StringUtils.defaultString(value); + } +} diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupKeyProcessor.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupKeyProcessor.java index 7b4f1655..ad980181 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupKeyProcessor.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupKeyProcessor.java @@ -18,7 +18,6 @@ */ package com.michelin.kstreamplify.deduplication; -import com.michelin.kstreamplify.error.ProcessingResult; import java.time.Duration; import java.time.Instant; import org.apache.avro.specific.SpecificRecord; @@ -28,24 +27,20 @@ import org.apache.kafka.streams.state.WindowStore; /** - * Transformer class for the deduplication mechanism on keys of a given topic. + * Processor class for the deduplication mechanism on keys of a given topic. * * @param The type of the value */ -public class DedupKeyProcessor - implements Processor> { - - /** Context for this transformer. */ - private ProcessorContext> processorContext; - - /** Window store containing all the records seen on the given window. 
*/ - private WindowStore dedupWindowStore; +public class DedupKeyProcessor implements Processor { /** Window store name, initialized @ construction. */ private final String windowStoreName; - /** Retention window for the state store. Used for fetching data. */ private final Duration retentionWindowDuration; + /** Context for this processor. */ + private ProcessorContext processorContext; + /** Window store containing all the records seen on the given window. */ + private WindowStore dedupWindowStore; /** * Constructor. @@ -59,39 +54,31 @@ public DedupKeyProcessor(String windowStoreName, Duration retentionWindowDuratio } @Override - public void init(ProcessorContext> context) { + public void init(ProcessorContext context) { processorContext = context; dedupWindowStore = this.processorContext.getStateStore(windowStoreName); } @Override public void process(Record message) { - try { - // Get the record timestamp - var currentInstant = Instant.ofEpochMilli(message.timestamp()); + // Get the record timestamp + var currentInstant = Instant.ofEpochMilli(message.timestamp()); - // Retrieve all the matching keys in the stateStore and return null if found it (signaling a duplicate) - try (var resultIterator = dedupWindowStore.backwardFetch( - message.key(), - currentInstant.minus(retentionWindowDuration), - currentInstant.plus(retentionWindowDuration))) { - while (resultIterator != null && resultIterator.hasNext()) { - var currentKeyValue = resultIterator.next(); - if (message.key().equals(currentKeyValue.value)) { - return; - } + // Retrieve all the matching keys in the stateStore and return null if found it (signaling a duplicate) + try (var resultIterator = dedupWindowStore.backwardFetch( + message.key(), + currentInstant.minus(retentionWindowDuration), + currentInstant.plus(retentionWindowDuration))) { + while (resultIterator != null && resultIterator.hasNext()) { + var currentKeyValue = resultIterator.next(); + if (message.key().equals(currentKeyValue.value)) { + return; 
} } - - // First time we see this record, store entry in the window store and forward the record to the output - dedupWindowStore.put(message.key(), message.key(), message.timestamp()); - processorContext.forward(ProcessingResult.wrapRecordSuccess(message)); - } catch (Exception e) { - processorContext.forward(ProcessingResult.wrapRecordFailure( - e, - message, - "Could not figure out what to do with the current payload: " - + "An unlikely error occurred during deduplication transform")); } + + // First time we see this record, store entry in the window store and forward the record to the output + dedupWindowStore.put(message.key(), message.key(), message.timestamp()); + processorContext.forward(message); } } diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupKeyProcessorWithErrors.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupKeyProcessorWithErrors.java new file mode 100644 index 00000000..aeeaf065 --- /dev/null +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupKeyProcessorWithErrors.java @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.michelin.kstreamplify.deduplication; + +import com.michelin.kstreamplify.error.ProcessingResult; +import java.time.Duration; +import java.time.Instant; +import org.apache.avro.specific.SpecificRecord; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.state.WindowStore; + +/** + * Processor class for the deduplication mechanism on keys of a given topic. + * + * @param The type of the value + */ +public class DedupKeyProcessorWithErrors + implements Processor> { + + /** Context for this processor. */ + private ProcessorContext> processorContext; + + /** Window store containing all the records seen on the given window. */ + private WindowStore dedupWindowStore; + + /** Window store name, initialized @ construction. */ + private final String windowStoreName; + + /** Retention window for the state store. Used for fetching data. */ + private final Duration retentionWindowDuration; + + /** + * Constructor. 
+ * + * @param windowStoreName The name of the deduplication state store + * @param retentionWindowDuration The retention window duration + */ + public DedupKeyProcessorWithErrors(String windowStoreName, Duration retentionWindowDuration) { + this.windowStoreName = windowStoreName; + this.retentionWindowDuration = retentionWindowDuration; + } + + @Override + public void init(ProcessorContext> context) { + processorContext = context; + dedupWindowStore = this.processorContext.getStateStore(windowStoreName); + } + + @Override + public void process(Record message) { + try { + // Get the record timestamp + var currentInstant = Instant.ofEpochMilli(message.timestamp()); + + // Retrieve all the matching keys in the stateStore and return null if found it (signaling a duplicate) + try (var resultIterator = dedupWindowStore.backwardFetch( + message.key(), + currentInstant.minus(retentionWindowDuration), + currentInstant.plus(retentionWindowDuration))) { + while (resultIterator != null && resultIterator.hasNext()) { + var currentKeyValue = resultIterator.next(); + if (message.key().equals(currentKeyValue.value)) { + return; + } + } + } + + // First time we see this record, store entry in the window store and forward the record to the output + dedupWindowStore.put(message.key(), message.key(), message.timestamp()); + processorContext.forward(ProcessingResult.wrapRecordSuccess(message)); + } catch (Exception e) { + processorContext.forward(ProcessingResult.wrapRecordFailure( + e, + message, + "Could not figure out what to do with the current payload: " + + "An unlikely error occurred during deduplication transform")); + } + } +} diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupKeyValueProcessor.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupKeyValueProcessor.java index cfb0812c..7b8247b0 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupKeyValueProcessor.java +++ 
b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupKeyValueProcessor.java @@ -18,7 +18,6 @@ */ package com.michelin.kstreamplify.deduplication; -import com.michelin.kstreamplify.error.ProcessingResult; import java.time.Duration; import java.time.Instant; import org.apache.avro.specific.SpecificRecord; @@ -28,24 +27,20 @@ import org.apache.kafka.streams.state.WindowStore; /** - * Transformer class for the deduplication mechanism on both keys and values of a given topic. + * Processor class for the deduplication mechanism on both keys and values of a given topic. * * @param The type of the value */ -public class DedupKeyValueProcessor - implements Processor> { - - /** Kstream context for this transformer. */ - private ProcessorContext> processorContext; - - /** Window store containing all the records seen on the given window. */ - private WindowStore dedupWindowStore; +public class DedupKeyValueProcessor implements Processor { /** Window store name, initialized @ construction. */ private final String windowStoreName; - /** Retention window for the state store. Used for fetching data. */ private final Duration retentionWindowDuration; + /** Kstream context for this processor. */ + private ProcessorContext processorContext; + /** Window store containing all the records seen on the given window. */ + private WindowStore dedupWindowStore; /** * Constructor. 
@@ -59,7 +54,7 @@ public DedupKeyValueProcessor(String windowStoreName, Duration retentionWindowHo } @Override - public void init(ProcessorContext> context) { + public void init(ProcessorContext context) { this.processorContext = context; dedupWindowStore = this.processorContext.getStateStore(windowStoreName); @@ -67,32 +62,24 @@ public void init(ProcessorContext> context) { @Override public void process(Record message) { - try { - // Get the record timestamp - var currentInstant = Instant.ofEpochMilli(message.timestamp()); + // Get the record timestamp + var currentInstant = Instant.ofEpochMilli(message.timestamp()); - // Retrieve all the matching keys in the stateStore and return null if found it (signaling a duplicate) - try (var resultIterator = dedupWindowStore.backwardFetch( - message.key(), - currentInstant.minus(retentionWindowDuration), - currentInstant.plus(retentionWindowDuration))) { - while (resultIterator != null && resultIterator.hasNext()) { - var currentKeyValue = resultIterator.next(); - if (message.value().equals(currentKeyValue.value)) { - return; - } + // Retrieve all the matching keys in the stateStore and return null if found it (signaling a duplicate) + try (var resultIterator = dedupWindowStore.backwardFetch( + message.key(), + currentInstant.minus(retentionWindowDuration), + currentInstant.plus(retentionWindowDuration))) { + while (resultIterator != null && resultIterator.hasNext()) { + var currentKeyValue = resultIterator.next(); + if (message.value().equals(currentKeyValue.value)) { + return; } } - - // First time we see this record, store entry in the window store and forward the record to the output - dedupWindowStore.put(message.key(), message.value(), message.timestamp()); - processorContext.forward(ProcessingResult.wrapRecordSuccess(message)); - } catch (Exception e) { - processorContext.forward(ProcessingResult.wrapRecordFailure( - e, - message, - "Could not figure out what to do with the current payload: " - + "An unlikely error 
occurred during deduplication transform")); } + + // First time we see this record, store entry in the window store and forward the record to the output + dedupWindowStore.put(message.key(), message.value(), message.timestamp()); + processorContext.forward(message); } } diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupKeyValueProcessorWithErrors.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupKeyValueProcessorWithErrors.java new file mode 100644 index 00000000..8e47121d --- /dev/null +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupKeyValueProcessorWithErrors.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.michelin.kstreamplify.deduplication; + +import com.michelin.kstreamplify.error.ProcessingResult; +import java.time.Duration; +import java.time.Instant; +import org.apache.avro.specific.SpecificRecord; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.state.WindowStore; + +/** + * Processor class for the deduplication mechanism on both keys and values of a given topic. + * + * @param The type of the value + */ +public class DedupKeyValueProcessorWithErrors + implements Processor> { + + /** Kstream context for this processor. */ + private ProcessorContext> processorContext; + + /** Window store containing all the records seen on the given window. */ + private WindowStore dedupWindowStore; + + /** Window store name, initialized @ construction. */ + private final String windowStoreName; + + /** Retention window for the state store. Used for fetching data. */ + private final Duration retentionWindowDuration; + + /** + * Constructor. 
+ * + * @param windowStoreName The window store name + * @param retentionWindowHours The retention window duration + */ + public DedupKeyValueProcessorWithErrors(String windowStoreName, Duration retentionWindowHours) { + this.windowStoreName = windowStoreName; + this.retentionWindowDuration = retentionWindowHours; + } + + @Override + public void init(ProcessorContext> context) { + this.processorContext = context; + + dedupWindowStore = this.processorContext.getStateStore(windowStoreName); + } + + @Override + public void process(Record message) { + try { + // Get the record timestamp + var currentInstant = Instant.ofEpochMilli(message.timestamp()); + + // Retrieve all the matching keys in the stateStore and return null if found it (signaling a duplicate) + try (var resultIterator = dedupWindowStore.backwardFetch( + message.key(), + currentInstant.minus(retentionWindowDuration), + currentInstant.plus(retentionWindowDuration))) { + while (resultIterator != null && resultIterator.hasNext()) { + var currentKeyValue = resultIterator.next(); + if (message.value().equals(currentKeyValue.value)) { + return; + } + } + } + + // First time we see this record, store entry in the window store and forward the record to the output + dedupWindowStore.put(message.key(), message.value(), message.timestamp()); + processorContext.forward(ProcessingResult.wrapRecordSuccess(message)); + } catch (Exception e) { + processorContext.forward(ProcessingResult.wrapRecordFailure( + e, + message, + "Could not figure out what to do with the current payload: " + + "An unlikely error occurred during deduplication transform")); + } + } +} diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessor.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessor.java index 4f0e0bf1..899270a6 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessor.java 
+++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessor.java @@ -18,7 +18,6 @@ */ package com.michelin.kstreamplify.deduplication; -import com.michelin.kstreamplify.error.ProcessingResult; import java.time.Duration; import java.time.Instant; import java.util.function.Function; @@ -29,28 +28,23 @@ import org.apache.kafka.streams.state.WindowStore; /** - * Transformer class for the deduplication mechanism on predicate of a given topic. + * Processor class for the deduplication mechanism on predicate of a given topic. * * @param The type of the key * @param The type of the value */ -public class DedupWithPredicateProcessor - implements Processor> { - - /** Kstream context for this transformer. */ - private ProcessorContext> processorContext; - - /** Window store containing all the records seen on the given window. */ - private WindowStore dedupWindowStore; +public class DedupWithPredicateProcessor implements Processor { /** Window store name, initialized @ construction. */ private final String windowStoreName; - /** Retention window for the state store. Used for fetching data. */ private final Duration retentionWindowDuration; - /** Deduplication key extractor. */ private final Function deduplicationKeyExtractor; + /** Kstream context for this processor. */ + private ProcessorContext processorContext; + /** Window store containing all the records seen on the given window. */ + private WindowStore dedupWindowStore; /** * Constructor. 
@@ -67,41 +61,32 @@ public DedupWithPredicateProcessor( } @Override - public void init(ProcessorContext> context) { + public void init(ProcessorContext context) { this.processorContext = context; dedupWindowStore = this.processorContext.getStateStore(windowStoreName); } @Override public void process(Record message) { - try { - // Get the record timestamp - var currentInstant = Instant.ofEpochMilli(message.timestamp()); - String identifier = deduplicationKeyExtractor.apply(message.value()); + // Get the record timestamp + var currentInstant = Instant.ofEpochMilli(message.timestamp()); + String identifier = deduplicationKeyExtractor.apply(message.value()); - // Retrieve all the matching keys in the stateStore and return null if found it (signaling a duplicate) - try (var resultIterator = dedupWindowStore.backwardFetch( - identifier, - currentInstant.minus(retentionWindowDuration), - currentInstant.plus(retentionWindowDuration))) { - while (resultIterator != null && resultIterator.hasNext()) { - var currentKeyValue = resultIterator.next(); - if (identifier.equals(deduplicationKeyExtractor.apply(currentKeyValue.value))) { - return; - } + // Retrieve all the matching keys in the stateStore and return null if found it (signaling a duplicate) + try (var resultIterator = dedupWindowStore.backwardFetch( + identifier, + currentInstant.minus(retentionWindowDuration), + currentInstant.plus(retentionWindowDuration))) { + while (resultIterator != null && resultIterator.hasNext()) { + var currentKeyValue = resultIterator.next(); + if (identifier.equals(deduplicationKeyExtractor.apply(currentKeyValue.value))) { + return; } } - - // First time we see this record, store entry in the window store and forward the record to the output - dedupWindowStore.put(identifier, message.value(), message.timestamp()); - processorContext.forward(ProcessingResult.wrapRecordSuccess(message)); - - } catch (Exception e) { - processorContext.forward(ProcessingResult.wrapRecordFailure( - e, - message, - 
"Could not figure out what to do with the current payload: " - + "An unlikely error occurred during deduplication transform")); } + + // First time we see this record, store entry in the window store and forward the record to the output + dedupWindowStore.put(identifier, message.value(), message.timestamp()); + processorContext.forward(message); } } diff --git a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessorWithErrors.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessorWithErrors.java new file mode 100644 index 00000000..75dde8b3 --- /dev/null +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessorWithErrors.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.michelin.kstreamplify.deduplication; + +import com.michelin.kstreamplify.error.ProcessingResult; +import java.time.Duration; +import java.time.Instant; +import java.util.function.Function; +import org.apache.avro.specific.SpecificRecord; +import org.apache.kafka.streams.processor.api.Processor; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.state.WindowStore; + +/** + * Processor class for the deduplication mechanism on predicate of a given topic. + * + * @param The type of the key + * @param The type of the value + */ +public class DedupWithPredicateProcessorWithErrors + implements Processor> { + + /** Kstream context for this processor. */ + private ProcessorContext> processorContext; + + /** Window store containing all the records seen on the given window. */ + private WindowStore dedupWindowStore; + + /** Window store name, initialized @ construction. */ + private final String windowStoreName; + + /** Retention window for the state store. Used for fetching data. */ + private final Duration retentionWindowDuration; + + /** Deduplication key extractor. */ + private final Function deduplicationKeyExtractor; + + /** + * Constructor. 
+ * + * @param windowStoreName Name of the deduplication state store + * @param retentionWindowDuration Retention window duration + * @param deduplicationKeyExtractor Deduplication function + */ + public DedupWithPredicateProcessorWithErrors( + String windowStoreName, Duration retentionWindowDuration, Function deduplicationKeyExtractor) { + this.windowStoreName = windowStoreName; + this.retentionWindowDuration = retentionWindowDuration; + this.deduplicationKeyExtractor = deduplicationKeyExtractor; + } + + @Override + public void init(ProcessorContext> context) { + this.processorContext = context; + dedupWindowStore = this.processorContext.getStateStore(windowStoreName); + } + + @Override + public void process(Record message) { + try { + // Get the record timestamp + var currentInstant = Instant.ofEpochMilli(message.timestamp()); + String identifier = deduplicationKeyExtractor.apply(message.value()); + + // Retrieve all the matching keys in the stateStore and return null if found it (signaling a duplicate) + try (var resultIterator = dedupWindowStore.backwardFetch( + identifier, + currentInstant.minus(retentionWindowDuration), + currentInstant.plus(retentionWindowDuration))) { + while (resultIterator != null && resultIterator.hasNext()) { + var currentKeyValue = resultIterator.next(); + if (identifier.equals(deduplicationKeyExtractor.apply(currentKeyValue.value))) { + return; + } + } + } + + // First time we see this record, store entry in the window store and forward the record to the output + dedupWindowStore.put(identifier, message.value(), message.timestamp()); + processorContext.forward(ProcessingResult.wrapRecordSuccess(message)); + + } catch (Exception e) { + processorContext.forward(ProcessingResult.wrapRecordFailure( + e, + message, + "Could not figure out what to do with the current payload: " + + "An unlikely error occurred during deduplication transform")); + } + } +} diff --git 
a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DeduplicationUtils.java b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DeduplicationUtils.java index 08e10032..74ad3e1f 100644 --- a/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DeduplicationUtils.java +++ b/kstreamplify-core/src/main/java/com/michelin/kstreamplify/deduplication/DeduplicationUtils.java @@ -40,21 +40,39 @@ public final class DeduplicationUtils { private DeduplicationUtils() {} + /** @deprecated Since 1.7.0, use {@link #distinctKeysWithErrors(StreamsBuilder, KStream, Duration)} instead. */ + @Deprecated(since = "1.7.0", forRemoval = true) + public static KStream> deduplicateKeys( + StreamsBuilder streamsBuilder, KStream initialStream, Duration windowDuration) { + + return distinctKeysWithErrors( + streamsBuilder, + initialStream, + DEFAULT_DEDUP_NAME + DEFAULT_WINDOWSTORE, + DEFAULT_DEDUP_NAME + DEFAULT_REPARTITION, + windowDuration); + } + /** - * Deduplicate the input stream on the input key using a window store for the given period of time. This constructor - * should not be used if using the deduplicator multiple times in the same topology. - * - * @param streamsBuilder Stream builder instance for topology editing - * @param initialStream Stream containing the events that should be deduplicated - * @param windowDuration Window of time on which we should watch out for duplicates - * @param Generic Type of the Stream value. Key type is not implemented because using anything other than a - * String as the key is retarded. You can quote me on this. - * @return KStream with a processingResult + * @deprecated Since 1.7.0, use {@link #distinctKeysWithErrors(StreamsBuilder, KStream, String, String, Duration)} + * instead. 
*/ + @Deprecated(since = "1.7.0", forRemoval = true) public static KStream> deduplicateKeys( + StreamsBuilder streamsBuilder, + KStream initialStream, + String storeName, + String repartitionName, + Duration windowDuration) { + + return distinctKeysWithErrors(streamsBuilder, initialStream, storeName, repartitionName, windowDuration); + } + + /** See {@link #distinctKeysWithErrors(StreamsBuilder, KStream, String, String, Duration)} */ + public static KStream> distinctKeysWithErrors( StreamsBuilder streamsBuilder, KStream initialStream, Duration windowDuration) { - return deduplicateKeys( + return distinctKeysWithErrors( streamsBuilder, initialStream, DEFAULT_DEDUP_NAME + DEFAULT_WINDOWSTORE, @@ -63,18 +81,21 @@ public static KStream> } /** - * Deduplicate the input stream on the input key using a window store for the given period of time. + * Deduplicates records from the input stream using the record key. + * + *

Records with identical keys within the configured time window are considered duplicates and are filtered out. + * + *

A window store is used to track seen keys during the specified {@code windowDuration}. * - * @param streamsBuilder Stream builder instance for topology editing - * @param initialStream Stream containing the events that should be deduplicated - * @param storeName State store name - * @param repartitionName Repartition topic name - * @param windowDuration Window of time to keep in the window store - * @param Generic Type of the Stream value. Key type is not implemented because using anything other than a - * String as the key is retarded. You can quote me on this. - * @return Resulting de-duplicated Stream + * @param streamsBuilder the {@link StreamsBuilder} used to build the topology + * @param initialStream the input stream to deduplicate (must have String keys) + * @param storeName the name of the state store used for deduplication + * @param repartitionName the name of the repartition topic + * @param windowDuration the time window during which duplicates are filtered + * @param the value type of the stream + * @return a deduplicated stream containing {@link ProcessingResult} */ - public static KStream> deduplicateKeys( + public static KStream> distinctKeysWithErrors( StreamsBuilder streamsBuilder, KStream initialStream, String storeName, @@ -90,24 +111,65 @@ public static KStream> var repartitioned = initialStream.repartition(Repartitioned.with(Serdes.String(), SerdesUtils.getValueSerdes()) .withName(repartitionName)); - return repartitioned.process(() -> new DedupKeyProcessor<>(storeName, windowDuration), storeName); + + return repartitioned.process(() -> new DedupKeyProcessorWithErrors<>(storeName, windowDuration), storeName); + } + + /** See {@link #distinctKeys(StreamsBuilder, KStream, String, String, Duration)} */ + public static KStream distinctKeys( + StreamsBuilder streamsBuilder, KStream initialStream, Duration windowDuration) { + + return distinctKeys( + streamsBuilder, + initialStream, + DEFAULT_DEDUP_NAME + DEFAULT_WINDOWSTORE, + DEFAULT_DEDUP_NAME + 
DEFAULT_REPARTITION, + windowDuration); } /** - * Deduplicate the input stream on the input key and value using a window store for the given period of time. This - * constructor should not be used if using the deduplicator multiple times in the same topology. + * Deduplicates records from the input stream using the record key. + * + *

Records with identical keys within the configured time window are considered duplicates and are filtered out. + * + *

A window store is used to track seen keys during the specified {@code windowDuration}. * - * @param streamsBuilder Stream builder instance for topology editing - * @param initialStream Stream containing the events that should be deduplicated - * @param windowDuration Window of time on which we should watch out for duplicates - * @param Generic Type of the Stream value. Key type is not implemented because using anything other than a - * String as the key is retarded. You can quote me on this. - * @return KStream with a processingResult + * @param streamsBuilder the {@link StreamsBuilder} used to build the topology + * @param initialStream the input stream to deduplicate (must have String keys) + * @param storeName the name of the state store used for deduplication + * @param repartitionName the name of the repartition topic + * @param windowDuration the time window during which duplicates are filtered + * @param the value type of the stream + * @return a deduplicated stream containing + */ + public static KStream distinctKeys( + StreamsBuilder streamsBuilder, + KStream initialStream, + String storeName, + String repartitionName, + Duration windowDuration) { + + StoreBuilder> dedupWindowStore = Stores.windowStoreBuilder( + Stores.persistentWindowStore(storeName, windowDuration, windowDuration, false), + Serdes.String(), + Serdes.String()); + streamsBuilder.addStateStore(dedupWindowStore); + + var repartitioned = + initialStream.repartition(Repartitioned.with(Serdes.String(), SerdesUtils.getValueSerdes()) + .withName(repartitionName)); + + return repartitioned.process(() -> new DedupKeyProcessor<>(storeName, windowDuration), storeName); + } + + /** + * @deprecated Since 1.7.0, use {@link #distinctByKeyValuesWithErrors(StreamsBuilder, KStream, Duration)} instead. 
*/ + @Deprecated(since = "1.7.0", forRemoval = true) public static KStream> deduplicateKeyValues( StreamsBuilder streamsBuilder, KStream initialStream, Duration windowDuration) { - return deduplicateKeyValues( + return distinctByKeyValuesWithErrors( streamsBuilder, initialStream, DEFAULT_DEDUP_NAME + DEFAULT_WINDOWSTORE, @@ -116,18 +178,10 @@ public static KStream> } /** - * Deduplicate the input stream on the input key and Value using a window store for the given period of time. The - * input stream should have a String key. - * - * @param streamsBuilder Stream builder instance for topology editing - * @param initialStream Stream containing the events that should be deduplicated - * @param storeName State store name - * @param repartitionName Repartition topic name - * @param windowDuration Window of time to keep in the window store - * @param Generic Type of the Stream value. Key type is not implemented because using anything other than a - * String as the key is retarded. You can quote me on this. - * @return Resulting de-duplicated Stream + * @deprecated Since 1.7.0, use {@link #distinctByKeyValuesWithErrors(StreamsBuilder, KStream, String, String, + * Duration)} instead. 
*/ + @Deprecated(since = "1.7.0", forRemoval = true) public static KStream> deduplicateKeyValues( StreamsBuilder streamsBuilder, KStream initialStream, @@ -135,6 +189,42 @@ public static KStream> String repartitionName, Duration windowDuration) { + return distinctByKeyValuesWithErrors(streamsBuilder, initialStream, storeName, repartitionName, windowDuration); + } + + /** See {@link #distinctByKeyValuesWithErrors(StreamsBuilder, KStream, String, String, Duration)} */ + public static KStream> distinctByKeyValuesWithErrors( + StreamsBuilder streamsBuilder, KStream initialStream, Duration windowDuration) { + + return distinctByKeyValuesWithErrors( + streamsBuilder, + initialStream, + DEFAULT_DEDUP_NAME + DEFAULT_WINDOWSTORE, + DEFAULT_DEDUP_NAME + DEFAULT_REPARTITION, + windowDuration); + } + + /** + * Deduplicates records from the input stream using both key and value. + * + *

Records with identical key-value pairs within the configured time window are considered duplicates and are + * filtered out. + * + * @param streamsBuilder the {@link StreamsBuilder} used to build the topology + * @param initialStream the input stream to deduplicate + * @param storeName the name of the state store used for deduplication + * @param repartitionName the name of the repartition topic + * @param windowDuration the time window during which duplicates are filtered + * @param the value type of the stream + * @return a deduplicated stream containing {@link ProcessingResult} + */ + public static KStream> distinctByKeyValuesWithErrors( + StreamsBuilder streamsBuilder, + KStream initialStream, + String storeName, + String repartitionName, + Duration windowDuration) { + StoreBuilder> dedupWindowStore = Stores.windowStoreBuilder( Stores.persistentWindowStore(storeName, windowDuration, windowDuration, false), Serdes.String(), @@ -144,64 +234,132 @@ public static KStream> var repartitioned = initialStream.repartition(Repartitioned.with(Serdes.String(), SerdesUtils.getValueSerdes()) .withName(repartitionName)); - return repartitioned.process(() -> new DedupKeyValueProcessor<>(storeName, windowDuration), storeName); + + return repartitioned.process( + () -> new DedupKeyValueProcessorWithErrors<>(storeName, windowDuration), storeName); + } + + /** See {@link #distinctByKeyValues(StreamsBuilder, KStream, String, String, Duration)} */ + public static KStream distinctByKeyValues( + StreamsBuilder streamsBuilder, KStream initialStream, Duration windowDuration) { + + return distinctByKeyValues( + streamsBuilder, + initialStream, + DEFAULT_DEDUP_NAME + DEFAULT_WINDOWSTORE, + DEFAULT_DEDUP_NAME + DEFAULT_REPARTITION, + windowDuration); } /** - * Deduplicate the input stream by applying the deduplicationKeyExtractor function on each record to generate a - * unique signature for the record. Uses a window store for the given period of time. 
The input stream should have a - * String key. This constructor should not be used if using the deduplicator multiple times in the same topology. - * Use {@link DeduplicationUtils#deduplicateWithPredicate(StreamsBuilder, KStream, String storeName, String - * repartitionName, Duration, Function)} in this scenario. + * Deduplicates records from the input stream using both key and value. + * + *

Records with identical key-value pairs within the configured time window are considered duplicates and are + * filtered out. * - * @param streamsBuilder Stream builder instance for topology editing - * @param initialStream Stream containing the events that should be deduplicated - * @param windowDuration Window of time to keep in the window store - * @param deduplicationKeyExtractor Function that should extract a deduplication key in String format. This key acts - * like a comparison vector. A recommended approach is to concatenate all necessary fields in String format to - * provide a unique identifier for comparison between records. - * @param Generic Type of the Stream value. Key type is not implemented because using anything other than a - * String as the key is retarded. You can quote me on this. - * @return Resulting de-duplicated Stream + * @param streamsBuilder the {@link StreamsBuilder} used to build the topology + * @param initialStream the input stream to deduplicate + * @param storeName the name of the state store used for deduplication + * @param repartitionName the name of the repartition topic + * @param windowDuration the time window during which duplicates are filtered + * @param the value type of the stream + * @return a deduplicated stream containing */ + public static KStream distinctByKeyValues( + StreamsBuilder streamsBuilder, + KStream initialStream, + String storeName, + String repartitionName, + Duration windowDuration) { + + StoreBuilder> dedupWindowStore = Stores.windowStoreBuilder( + Stores.persistentWindowStore(storeName, windowDuration, windowDuration, false), + Serdes.String(), + SerdesUtils.getValueSerdes()); + streamsBuilder.addStateStore(dedupWindowStore); + + var repartitioned = + initialStream.repartition(Repartitioned.with(Serdes.String(), SerdesUtils.getValueSerdes()) + .withName(repartitionName)); + + return repartitioned.process(() -> new DedupKeyValueProcessor<>(storeName, windowDuration), storeName); + } + + /** + * 
@deprecated Since 1.7.0, use {@link #distinctByPredicateWithErrors(StreamsBuilder, KStream, Duration, Function)} + * instead. + */ + @Deprecated(since = "1.7.0", forRemoval = true) public static KStream> deduplicateWithPredicate( StreamsBuilder streamsBuilder, KStream initialStream, Duration windowDuration, - Function deduplicationKeyExtractor) { - return deduplicateWithPredicate( + Function extractor) { + + return distinctByPredicateWithErrors( streamsBuilder, initialStream, DEFAULT_DEDUP_NAME + DEFAULT_WINDOWSTORE, DEFAULT_DEDUP_NAME + DEFAULT_REPARTITION, windowDuration, - deduplicationKeyExtractor); + extractor); } /** - * Deduplicate the input stream by applying the deduplicationKeyExtractor function on each record to generate a - * unique signature for the record. Uses a window store for the given period of time. The input stream should have a - * String key. - * - * @param streamsBuilder Stream builder instance for topology editing - * @param initialStream Stream containing the events that should be deduplicated - * @param storeName State store name - * @param repartitionName Repartition topic name - * @param windowDuration Window of time to keep in the window store - * @param deduplicationKeyExtractor Function that should extract a deduplication key in String format. This key acts - * like a comparison vector. A recommended approach is to concatenate all necessary fields in String format to - * provide a unique identifier for comparison between records. - * @param Generic Type of the Stream value. Key type is not implemented because using anything other than a - * String as the key is retarded. You can quote me on this. - * @return Resulting de-duplicated Stream + * @deprecated Since 1.7.0, use {@link #distinctByPredicateWithErrors(StreamsBuilder, KStream, String, String, + * Duration, Function)} instead. 
*/ + @Deprecated(since = "1.7.0", forRemoval = true) public static KStream> deduplicateWithPredicate( StreamsBuilder streamsBuilder, KStream initialStream, String storeName, String repartitionName, Duration windowDuration, - Function deduplicationKeyExtractor) { + Function extractor) { + + return distinctByPredicateWithErrors( + streamsBuilder, initialStream, storeName, repartitionName, windowDuration, extractor); + } + + /** See {@link #distinctByPredicateWithErrors(StreamsBuilder, KStream, String, String, Duration, Function)} */ + public static KStream> distinctByPredicateWithErrors( + StreamsBuilder streamsBuilder, + KStream initialStream, + Duration windowDuration, + Function extractor) { + + return distinctByPredicateWithErrors( + streamsBuilder, + initialStream, + DEFAULT_DEDUP_NAME + DEFAULT_WINDOWSTORE, + DEFAULT_DEDUP_NAME + DEFAULT_REPARTITION, + windowDuration, + extractor); + } + + /** + * Deduplicates records from the input stream using a computed deduplication key. + * + *

The provided extractor builds a deduplication key for each record. Records with identical keys within the + * configured time window are considered duplicates and are filtered out. + * + * @param streamsBuilder the {@link StreamsBuilder} + * @param initialStream the input stream + * @param storeName state store name + * @param repartitionName repartition topic name + * @param windowDuration deduplication window + * @param extractor function building the deduplication key + * @param value type + * @return a deduplicated stream containing {@link ProcessingResult} + */ + public static KStream> distinctByPredicateWithErrors( + StreamsBuilder streamsBuilder, + KStream initialStream, + String storeName, + String repartitionName, + Duration windowDuration, + Function extractor) { StoreBuilder> dedupWindowStore = Stores.windowStoreBuilder( Stores.persistentWindowStore(storeName, windowDuration, windowDuration, false), @@ -212,39 +370,164 @@ public static KStream> var repartitioned = initialStream.repartition(Repartitioned.with(Serdes.String(), SerdesUtils.getValueSerdes()) .withName(repartitionName)); + return repartitioned.process( - () -> new DedupWithPredicateProcessor<>(storeName, windowDuration, deduplicationKeyExtractor), - storeName); + () -> new DedupWithPredicateProcessorWithErrors<>(storeName, windowDuration, extractor), storeName); + } + + /** See {@link #distinctByPredicateWithErrors(StreamsBuilder, KStream, String, String, Duration, Function)} */ + public static KStream> distinctByPredicate( + StreamsBuilder streamsBuilder, + KStream initialStream, + Duration windowDuration, + Function extractor) { + + return distinctByPredicateWithErrors( + streamsBuilder, + initialStream, + DEFAULT_DEDUP_NAME + DEFAULT_WINDOWSTORE, + DEFAULT_DEDUP_NAME + DEFAULT_REPARTITION, + windowDuration, + extractor); } /** - * Deduplicates records from the input stream based on a computed key derived from each record. 
+ * Deduplicates records from the input stream using a computed deduplication key. * - *

The provided {@code deduplicationHeadersExtractor} generates a list of String values that together form a - * unique identifier for a record. Records with the same identifier within the given time window are considered - * duplicates. + *

The provided extractor builds a deduplication key for each record. Records with identical keys within the + * configured time window are considered duplicates and are filtered out. * - *

A window store is used to track seen identifiers for the specified {@code windowDuration}. + * @param streamsBuilder the {@link StreamsBuilder} + * @param initialStream the input stream + * @param storeName state store name + * @param repartitionName repartition topic name + * @param windowDuration deduplication window + * @param extractor function building the deduplication key + * @param value type + * @return a deduplicated stream containing + */ + public static KStream distinctByPredicate( + StreamsBuilder streamsBuilder, + KStream initialStream, + String storeName, + String repartitionName, + Duration windowDuration, + Function extractor) { + + StoreBuilder> dedupWindowStore = Stores.windowStoreBuilder( + Stores.persistentWindowStore(storeName, windowDuration, windowDuration, false), + Serdes.String(), + SerdesUtils.getValueSerdes()); + streamsBuilder.addStateStore(dedupWindowStore); + + var repartitioned = + initialStream.repartition(Repartitioned.with(Serdes.String(), SerdesUtils.getValueSerdes()) + .withName(repartitionName)); + + return repartitioned.process( + () -> new DedupWithPredicateProcessor<>(storeName, windowDuration, extractor), storeName); + } + + /** + * @deprecated Since 1.7.0, use {@link #distinctByHeadersWithErrors(StreamsBuilder, KStream, Duration, List)} + * instead. + */ + @Deprecated(since = "1.7.0", forRemoval = true) + public static KStream> deduplicateWithHeaders( + StreamsBuilder streamsBuilder, + KStream initialStream, + Duration windowDuration, + List deduplicationHeadersList) { + + return distinctByHeadersWithErrors( + streamsBuilder, + initialStream, + DEFAULT_DEDUP_NAME + DEFAULT_WINDOWSTORE, + DEFAULT_DEDUP_NAME + DEFAULT_REPARTITION, + windowDuration, + deduplicationHeadersList); + } + + /** + * @deprecated since 1.7.0, use {@link #distinctByHeadersWithErrors(StreamsBuilder, KStream, String, String, + * Duration, List)} instead. 
+ */ + @Deprecated(since = "1.7.0", forRemoval = true) + public static KStream> deduplicateWithHeaders( + StreamsBuilder streamsBuilder, + KStream initialStream, + String storeName, + String repartitionName, + Duration windowDuration, + List deduplicationHeadersList) { + + return distinctByHeadersWithErrors( + streamsBuilder, initialStream, storeName, repartitionName, windowDuration, deduplicationHeadersList); + } + + /** See {@link #distinctByHeadersWithErrors(StreamsBuilder, KStream, String, String, Duration, List)} */ + public static KStream> distinctByHeadersWithErrors( + StreamsBuilder streamsBuilder, + KStream initialStream, + Duration windowDuration, + List deduplicationHeadersList) { + + return distinctByHeadersWithErrors( + streamsBuilder, + initialStream, + DEFAULT_DEDUP_NAME + DEFAULT_WINDOWSTORE, + DEFAULT_DEDUP_NAME + DEFAULT_REPARTITION, + windowDuration, + deduplicationHeadersList); + } + + /** + * Deduplicates records from the input stream using a composite key built from the provided headers. * - *

Note: This method uses internally generated store and repartition names. It should not be used multiple - * times in the same topology. In such cases, use {@link DeduplicationUtils#deduplicateWithHeaders(StreamsBuilder, - * KStream, String, String, Duration, List)}. + *

The {@code deduplicationHeadersList} defines which headers are used to build the deduplication key. Records + * with identical header values within the configured time window are considered duplicates and filtered out. + * + *

A window store is used to track seen keys during the specified {@code windowDuration}. * * @param streamsBuilder the {@link StreamsBuilder} used to build the topology * @param initialStream the input stream to deduplicate (must have String keys) + * @param storeName the name of the state store used for deduplication + * @param repartitionName the name of the repartition topic * @param windowDuration the time window during which duplicates are filtered - * @param deduplicationHeadersList list of header names to extract from each record for deduplication. The - * combination of these header values forms the unique identifier for deduplication. + * @param deduplicationHeadersList list of header names used to build the deduplication key * @param the value type of the stream * @return a deduplicated stream containing {@link ProcessingResult} */ - public static KStream> deduplicateWithHeaders( + public static KStream> distinctByHeadersWithErrors( + StreamsBuilder streamsBuilder, + KStream initialStream, + String storeName, + String repartitionName, + Duration windowDuration, + List deduplicationHeadersList) { + + StoreBuilder> dedupWindowStore = Stores.windowStoreBuilder( + Stores.persistentWindowStore(storeName, windowDuration, windowDuration, false), + Serdes.String(), + Serdes.String()); + streamsBuilder.addStateStore(dedupWindowStore); + + var repartitioned = + initialStream.repartition(Repartitioned.with(Serdes.String(), SerdesUtils.getValueSerdes()) + .withName(repartitionName)); + return repartitioned.process( + () -> new DedupHeadersProcessorWithErrors<>(storeName, windowDuration, deduplicationHeadersList), + storeName); + } + + /** See {@link #distinctByHeaders(StreamsBuilder, KStream, String, String, Duration, List)} */ + public static KStream distinctByHeaders( StreamsBuilder streamsBuilder, KStream initialStream, Duration windowDuration, List deduplicationHeadersList) { - return deduplicateWithHeaders( + return distinctByHeaders( streamsBuilder, 
initialStream, DEFAULT_DEDUP_NAME + DEFAULT_WINDOWSTORE, @@ -254,25 +537,23 @@ public static KStream> } /** - * Deduplicates records from the input stream based on a computed key derived from each record. + * Deduplicates records from the input stream using a composite key built from the provided headers. * - *

The {@code deduplicationHeadersExtractor} produces a list of String values used to build a unique identifier - * for each record. Records sharing the same identifier within the configured time window are considered duplicates. + *

The {@code deduplicationHeadersList} defines which headers are used to build the deduplication key. Records + * with identical header values within the configured time window are considered duplicates and filtered out. * - *

This variant allows specifying custom state store and repartition names, making it suitable for reuse within - * the same topology. + *

A window store is used to track seen keys during the specified {@code windowDuration}. * * @param streamsBuilder the {@link StreamsBuilder} used to build the topology * @param initialStream the input stream to deduplicate (must have String keys) * @param storeName the name of the state store used for deduplication * @param repartitionName the name of the repartition topic * @param windowDuration the time window during which duplicates are filtered - * @param deduplicationHeadersList list of header names to extract from each record for deduplication. The - * combination of these header values forms the unique identifier for deduplication. + * @param deduplicationHeadersList list of header names used to build the deduplication key * @param the value type of the stream - * @return a deduplicated stream containing {@link ProcessingResult} + * @return a deduplicated stream */ - public static KStream> deduplicateWithHeaders( + public static KStream distinctByHeaders( StreamsBuilder streamsBuilder, KStream initialStream, String storeName, diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupHeadersProcessorTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupHeadersProcessorTest.java index 57ef2c47..760f6497 100644 --- a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupHeadersProcessorTest.java +++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupHeadersProcessorTest.java @@ -18,6 +18,7 @@ */ package com.michelin.kstreamplify.deduplication; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; @@ -28,7 +29,6 @@ import static org.mockito.Mockito.when; import com.michelin.kstreamplify.avro.KafkaError; -import com.michelin.kstreamplify.error.ProcessingResult; import java.time.Duration; 
import java.util.List; import org.apache.kafka.streams.KeyValue; @@ -48,7 +48,7 @@ class DedupHeadersProcessorTest { private DedupHeadersProcessor processor; @Mock - private ProcessorContext> context; + private ProcessorContext context; @Mock private WindowStore windowStore; @@ -76,7 +76,7 @@ void shouldProcessNewRecord() { processor.process(message); verify(windowStore).put("value-1#value-3", "value-1#value-3", message.timestamp()); - verify(context).forward(argThat(arg -> arg.value().getValue().equals(message.value()))); + verify(context).forward(argThat(arg -> arg.value().equals(message.value()))); } @Test @@ -115,12 +115,6 @@ void shouldThrowException() { .thenThrow(new RuntimeException("Exception...")); doThrow(new RuntimeException("Exception...")).when(windowStore).put(anyString(), any(), anyLong()); - processor.process(message); - - verify(context).forward(argThat(arg -> arg.value() - .getError() - .getContextMessage() - .equals("Could not figure out what to do with the current payload: " - + "An unlikely error occurred during deduplication transform"))); + assertThrows(RuntimeException.class, () -> processor.process(message)); } } diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupHeadersProcessorWithErrorsTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupHeadersProcessorWithErrorsTest.java new file mode 100644 index 00000000..e0da9834 --- /dev/null +++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupHeadersProcessorWithErrorsTest.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.michelin.kstreamplify.deduplication; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.michelin.kstreamplify.avro.KafkaError; +import com.michelin.kstreamplify.error.ProcessingResult; +import java.time.Duration; +import java.util.List; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.state.WindowStore; +import org.apache.kafka.streams.state.WindowStoreIterator; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class DedupHeadersProcessorWithErrorsTest { + + private DedupHeadersProcessorWithErrors processor; + + @Mock + private ProcessorContext> context; + + @Mock + private WindowStore windowStore; + + @Mock + private WindowStoreIterator windowStoreIterator; + + @BeforeEach + void setUp() { + // Create an instance of DedupWithHeadersProcessor for testing + processor = new DedupHeadersProcessorWithErrors<>( + "testStore", Duration.ofHours(1), 
List.of("header-1", "header-3")); + // Stub the context.getStateStore method to return the mock store + when(context.getStateStore("testStore")).thenReturn(windowStore); + + processor.init(context); + } + + @Test + void shouldProcessNewRecord() { + final KafkaError kafkaError = new KafkaError(); + final Record message = new Record<>("key", kafkaError, 0); + message.headers().add("header-1", "value-1".getBytes()); + message.headers().add("header-2", "value-2".getBytes()); + message.headers().add("header-3", "value-3".getBytes()); + processor.process(message); + + verify(windowStore).put("value-1#value-3", "value-1#value-3", message.timestamp()); + verify(context).forward(argThat(arg -> arg.value().getValue().equals(message.value()))); + } + + @Test + void shouldProcessDuplicate() { + final KafkaError kafkaError = new KafkaError(); + final Record message = new Record<>("key", kafkaError, 0); + message.headers().add("header-1", "value-1".getBytes()); + message.headers().add("header-2", "value-2".getBytes()); + message.headers().add("header-3", "value-3".getBytes()); + + // Simulate hasNext() returning true once and then false + when(windowStoreIterator.hasNext()).thenReturn(true); + + // Simulate the condition to trigger the return statement + when(windowStoreIterator.next()).thenReturn(KeyValue.pair(0L, "value-1#value-3")); + + // Simulate the backwardFetch() method returning the mocked ResultIterator + when(windowStore.backwardFetch(any(), any(), any())).thenReturn(windowStoreIterator); + + // Call the process method + processor.process(message); + + verify(windowStore, never()).put(anyString(), any(), anyLong()); + verify(context, never()).forward(any()); + } + + @Test + void shouldThrowException() { + Record message = new Record<>("key", new KafkaError(), 0L); + message.headers().add("header-1", "value-1".getBytes()); + message.headers().add("header-2", "value-2".getBytes()); + message.headers().add("header-3", "value-3".getBytes()); + + 
when(windowStore.backwardFetch(any(), any(), any())) + .thenReturn(null) + .thenThrow(new RuntimeException("Exception...")); + doThrow(new RuntimeException("Exception...")).when(windowStore).put(anyString(), any(), anyLong()); + + processor.process(message); + + verify(context).forward(argThat(arg -> arg.value() + .getError() + .getContextMessage() + .equals("Could not figure out what to do with the current payload: " + + "An unlikely error occurred during deduplication transform"))); + } +} diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupKeyProcessorTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupKeyProcessorTest.java index 1158d087..ea982929 100644 --- a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupKeyProcessorTest.java +++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupKeyProcessorTest.java @@ -18,6 +18,7 @@ */ package com.michelin.kstreamplify.deduplication; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; @@ -28,7 +29,6 @@ import static org.mockito.Mockito.when; import com.michelin.kstreamplify.avro.KafkaError; -import com.michelin.kstreamplify.error.ProcessingResult; import java.time.Duration; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.api.ProcessorContext; @@ -47,7 +47,7 @@ class DedupKeyProcessorTest { private DedupKeyProcessor processor; @Mock - private ProcessorContext> context; + private ProcessorContext context; @Mock private WindowStore windowStore; @@ -74,7 +74,7 @@ void shouldProcessNewRecord() { processor.process(message); verify(windowStore).put("key", "key", message.timestamp()); - verify(context).forward(argThat(arg -> arg.value().getValue().equals(message.value()))); + 
verify(context).forward(argThat(arg -> arg.value().equals(message.value()))); } @Test @@ -107,12 +107,6 @@ void shouldThrowException() { .thenThrow(new RuntimeException("Exception...")); doThrow(new RuntimeException("Exception...")).when(windowStore).put(anyString(), any(), anyLong()); - processor.process(message); - - verify(context).forward(argThat(arg -> arg.value() - .getError() - .getContextMessage() - .equals("Could not figure out what to do with the current payload: " - + "An unlikely error occurred during deduplication transform"))); + assertThrows(RuntimeException.class, () -> processor.process(message)); } } diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupKeyProcessorWithErrorsTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupKeyProcessorWithErrorsTest.java new file mode 100644 index 00000000..e0259232 --- /dev/null +++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupKeyProcessorWithErrorsTest.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.michelin.kstreamplify.deduplication; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.michelin.kstreamplify.avro.KafkaError; +import com.michelin.kstreamplify.error.ProcessingResult; +import java.time.Duration; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.state.WindowStore; +import org.apache.kafka.streams.state.WindowStoreIterator; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class DedupKeyProcessorWithErrorsTest { + + private DedupKeyProcessorWithErrors processor; + + @Mock + private ProcessorContext> context; + + @Mock + private WindowStore windowStore; + + @Mock + private WindowStoreIterator windowStoreIterator; + + @BeforeEach + void setUp() { + // Create an instance of DedupWithPredicateProcessor for testing + processor = new DedupKeyProcessorWithErrors<>("testStore", Duration.ofHours(1)); + + // Stub the context.getStateStore method to return the mock store + when(context.getStateStore("testStore")).thenReturn(windowStore); + + processor.init(context); + } + + @Test + void shouldProcessNewRecord() { + final KafkaError kafkaError = new KafkaError(); + final Record message = new Record<>("key", kafkaError, 0); + + processor.process(message); + + verify(windowStore).put("key", "key", message.timestamp()); + verify(context).forward(argThat(arg -> 
arg.value().getValue().equals(message.value()))); + } + + @Test + void shouldProcessDuplicate() { + final KafkaError kafkaError = new KafkaError(); + final Record message = new Record<>("key", kafkaError, 0L); + + // Simulate hasNext() returning true once and then false + when(windowStoreIterator.hasNext()).thenReturn(true); + + // Simulate the condition to trigger the return statement + when(windowStoreIterator.next()).thenReturn(KeyValue.pair(0L, "key")); + + // Simulate the backwardFetch() method returning the mocked ResultIterator + when(windowStore.backwardFetch(any(), any(), any())).thenReturn(windowStoreIterator); + + // Call the process method + processor.process(message); + + verify(windowStore, never()).put(anyString(), any(), anyLong()); + verify(context, never()).forward(any()); + } + + @Test + void shouldThrowException() { + Record message = new Record<>("key", new KafkaError(), 0L); + + when(windowStore.backwardFetch(any(), any(), any())) + .thenReturn(null) + .thenThrow(new RuntimeException("Exception...")); + doThrow(new RuntimeException("Exception...")).when(windowStore).put(anyString(), any(), anyLong()); + + processor.process(message); + + verify(context).forward(argThat(arg -> arg.value() + .getError() + .getContextMessage() + .equals("Could not figure out what to do with the current payload: " + + "An unlikely error occurred during deduplication transform"))); + } +} diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupKeyValueProcessorTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupKeyValueProcessorTest.java index 5ccb1432..26eaa02a 100644 --- a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupKeyValueProcessorTest.java +++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupKeyValueProcessorTest.java @@ -18,6 +18,7 @@ */ package com.michelin.kstreamplify.deduplication; +import static 
org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; @@ -28,7 +29,6 @@ import static org.mockito.Mockito.when; import com.michelin.kstreamplify.avro.KafkaError; -import com.michelin.kstreamplify.error.ProcessingResult; import java.time.Duration; import org.apache.kafka.streams.KeyValue; import org.apache.kafka.streams.processor.api.ProcessorContext; @@ -47,7 +47,7 @@ class DedupKeyValueProcessorTest { private DedupKeyValueProcessor processor; @Mock - private ProcessorContext> context; + private ProcessorContext context; @Mock private WindowStore windowStore; @@ -74,7 +74,7 @@ void shouldProcessNewRecord() { processor.process(message); verify(windowStore).put(message.key(), message.value(), message.timestamp()); - verify(context).forward(argThat(arg -> arg.value().getValue().equals(message.value()))); + verify(context).forward(argThat(arg -> arg.value().equals(message.value()))); } @Test @@ -108,12 +108,6 @@ void shouldThrowException() { doThrow(new RuntimeException("Exception...")).when(windowStore).put(anyString(), any(), anyLong()); // Call the process method - processor.process(message); - - verify(context).forward(argThat(arg -> arg.value() - .getError() - .getContextMessage() - .equals("Could not figure out what to do with the current payload: " - + "An unlikely error occurred during deduplication transform"))); + assertThrows(RuntimeException.class, () -> processor.process(message)); } } diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupKeyValueProcessorWithErrorsTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupKeyValueProcessorWithErrorsTest.java new file mode 100644 index 00000000..1e930677 --- /dev/null +++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupKeyValueProcessorWithErrorsTest.java @@ -0,0 
+1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package com.michelin.kstreamplify.deduplication; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.michelin.kstreamplify.avro.KafkaError; +import com.michelin.kstreamplify.error.ProcessingResult; +import java.time.Duration; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.state.WindowStore; +import org.apache.kafka.streams.state.WindowStoreIterator; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class DedupKeyValueProcessorWithErrorsTest { + + private 
DedupKeyValueProcessorWithErrors processor; + + @Mock + private ProcessorContext> context; + + @Mock + private WindowStore windowStore; + + @Mock + private WindowStoreIterator windowStoreIterator; + + @BeforeEach + void setUp() { + // Create an instance of DedupWithPredicateProcessor for testing + processor = new DedupKeyValueProcessorWithErrors<>("testStore", Duration.ofHours(1)); + + // Stub the context.getStateStore method to return the mock store + when(context.getStateStore("testStore")).thenReturn(windowStore); + + processor.init(context); + } + + @Test + void shouldProcessNewRecord() { + final KafkaError kafkaError = new KafkaError(); + final Record message = new Record<>("key", kafkaError, 0); + + processor.process(message); + + verify(windowStore).put(message.key(), message.value(), message.timestamp()); + verify(context).forward(argThat(arg -> arg.value().getValue().equals(message.value()))); + } + + @Test + void shouldProcessDuplicate() { + final KafkaError kafkaError = new KafkaError(); + final Record message = new Record<>("key", kafkaError, 0L); + + // Simulate hasNext() returning true once and then false + when(windowStoreIterator.hasNext()).thenReturn(true); + + // Simulate the condition to trigger the return statement + when(windowStoreIterator.next()).thenReturn(KeyValue.pair(0L, kafkaError)); + + // Simulate the backwardFetch() method returning the mocked ResultIterator + when(windowStore.backwardFetch(any(), any(), any())).thenReturn(windowStoreIterator); + + // Call the process method + processor.process(message); + + verify(windowStore, never()).put(anyString(), any(), anyLong()); + verify(context, never()).forward(any()); + } + + @Test + void shouldThrowException() { + final Record message = new Record<>("key", new KafkaError(), 0L); + + when(windowStore.backwardFetch(any(), any(), any())) + .thenReturn(null) + .thenThrow(new RuntimeException("Exception...")); + doThrow(new RuntimeException("Exception...")).when(windowStore).put(anyString(), 
any(), anyLong()); + + // Call the process method + processor.process(message); + + verify(context).forward(argThat(arg -> arg.value() + .getError() + .getContextMessage() + .equals("Could not figure out what to do with the current payload: " + + "An unlikely error occurred during deduplication transform"))); + } +} diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessorTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessorTest.java index 8481d965..d4d263f5 100644 --- a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessorTest.java +++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessorTest.java @@ -18,6 +18,7 @@ */ package com.michelin.kstreamplify.deduplication; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyLong; import static org.mockito.ArgumentMatchers.anyString; @@ -28,7 +29,6 @@ import static org.mockito.Mockito.when; import com.michelin.kstreamplify.avro.KafkaError; -import com.michelin.kstreamplify.error.ProcessingResult; import java.time.Duration; import org.apache.avro.specific.SpecificRecord; import org.apache.kafka.streams.KeyValue; @@ -47,7 +47,7 @@ class DedupWithPredicateProcessorTest { private DedupWithPredicateProcessor processor; @Mock - private ProcessorContext> context; + private ProcessorContext context; @Mock private WindowStore windowStore; @@ -74,7 +74,7 @@ void shouldProcessNewRecord() { processor.process(message); verify(windowStore).put("", message.value(), message.timestamp()); - verify(context).forward(argThat(arg -> arg.value().getValue().equals(message.value()))); + verify(context).forward(argThat(arg -> arg.value().equals(message.value()))); } @Test @@ -108,13 +108,7 @@ void shouldThrowException() { doThrow(new 
RuntimeException("Exception...")).when(windowStore).put(anyString(), any(), anyLong()); // Call the process method - processor.process(message); - - verify(context).forward(argThat(arg -> arg.value() - .getError() - .getContextMessage() - .equals("Could not figure out what to do with the current payload: " - + "An unlikely error occurred during deduplication transform"))); + assertThrows(RuntimeException.class, () -> processor.process(message)); } static class KeyExtractorStub { diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessorWithErrorsTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessorWithErrorsTest.java new file mode 100644 index 00000000..c51e98b9 --- /dev/null +++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/deduplication/DedupWithPredicateProcessorWithErrorsTest.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package com.michelin.kstreamplify.deduplication; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyLong; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.argThat; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.michelin.kstreamplify.avro.KafkaError; +import com.michelin.kstreamplify.error.ProcessingResult; +import java.time.Duration; +import org.apache.avro.specific.SpecificRecord; +import org.apache.kafka.streams.KeyValue; +import org.apache.kafka.streams.processor.api.ProcessorContext; +import org.apache.kafka.streams.processor.api.Record; +import org.apache.kafka.streams.state.WindowStore; +import org.apache.kafka.streams.state.WindowStoreIterator; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +class DedupWithPredicateProcessorWithErrorsTest { + private DedupWithPredicateProcessorWithErrors processor; + + @Mock + private ProcessorContext> context; + + @Mock + private WindowStore windowStore; + + @Mock + private WindowStoreIterator windowStoreIterator; + + @BeforeEach + void setUp() { + // Create an instance of DedupWithPredicateProcessor for testing + processor = new DedupWithPredicateProcessorWithErrors<>( + "testStore", Duration.ofHours(1), KeyExtractorStub::extract); + + // Stub the context.getStateStore method to return the mock store + when(context.getStateStore("testStore")).thenReturn(windowStore); + + processor.init(context); + } + + @Test + void shouldProcessNewRecord() { + final KafkaError kafkaError = new KafkaError(); + final Record message = new Record<>("key", kafkaError, 0); + + processor.process(message); + + 
verify(windowStore).put("", message.value(), message.timestamp()); + verify(context).forward(argThat(arg -> arg.value().getValue().equals(message.value()))); + } + + @Test + void shouldProcessDuplicate() { + final KafkaError kafkaError = new KafkaError(); + final Record message = new Record<>("key", kafkaError, 0L); + + // Simulate hasNext() returning true once and then false + when(windowStoreIterator.hasNext()).thenReturn(true); + + // Simulate the condition to trigger the return statement + when(windowStoreIterator.next()).thenReturn(KeyValue.pair(0L, kafkaError)); + + // Simulate the backwardFetch() method returning the mocked ResultIterator + when(windowStore.backwardFetch(any(), any(), any())).thenReturn(windowStoreIterator); + + // Call the process method + processor.process(message); + + verify(windowStore, never()).put(anyString(), any(), anyLong()); + verify(context, never()).forward(any()); + } + + @Test + void shouldThrowException() { + Record message = new Record<>("key", new KafkaError(), 0L); + + when(windowStore.backwardFetch(any(), any(), any())) + .thenReturn(null) + .thenThrow(new RuntimeException("Exception...")); + doThrow(new RuntimeException("Exception...")).when(windowStore).put(anyString(), any(), anyLong()); + + // Call the process method + processor.process(message); + + verify(context).forward(argThat(arg -> arg.value() + .getError() + .getContextMessage() + .equals("Could not figure out what to do with the current payload: " + + "An unlikely error occurred during deduplication transform"))); + } + + static class KeyExtractorStub { + public static String extract(V v) { + return ""; + } + } +} diff --git a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/initializer/KafkaStreamsStarterTest.java b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/initializer/KafkaStreamsStarterTest.java index 9720dfa8..27c0b430 100644 --- a/kstreamplify-core/src/test/java/com/michelin/kstreamplify/initializer/KafkaStreamsStarterTest.java 
+++ b/kstreamplify-core/src/test/java/com/michelin/kstreamplify/initializer/KafkaStreamsStarterTest.java @@ -92,19 +92,19 @@ static class KafkaStreamsStarterStub extends KafkaStreamsStarter { public void topology(StreamsBuilder streamsBuilder) { var streams = TopicWithSerdeStub.inputTopicWithSerde().stream(streamsBuilder); - DeduplicationUtils.deduplicateKeys( + DeduplicationUtils.distinctKeysWithErrors( streamsBuilder, streams, "deduplicateKeysStoreName", "deduplicateKeysRepartitionName", Duration.ZERO); - DeduplicationUtils.deduplicateKeyValues( + DeduplicationUtils.distinctByKeyValuesWithErrors( streamsBuilder, streams, "deduplicateKeyValuesStoreName", "deduplicateKeyValuesRepartitionName", Duration.ZERO); - DeduplicationUtils.deduplicateWithPredicate(streamsBuilder, streams, Duration.ofMillis(1), null); + DeduplicationUtils.distinctByPredicateWithErrors(streamsBuilder, streams, Duration.ofMillis(1), null); var enrichedStreams = streams.mapValues(KafkaStreamsStarterStub::enrichValue); var enrichedStreams2 = streams.mapValues(KafkaStreamsStarterStub::enrichValue2);